Remove global state about configuration and logs

Many things have been changed to make this possible, probably leading to
many hidden bugs waiting to be found.

Related to #122.
This commit is contained in:
Guillaume Ayoub
2016-04-22 11:37:02 +09:00
parent 8ac19ae0fc
commit 2f97d7d1e1
15 changed files with 576 additions and 488 deletions

View File

@ -39,24 +39,21 @@ Leading or ending slashes are trimmed from collection's path.
import os.path
import re
import sys
from configparser import ConfigParser
from io import StringIO
from . import config, log
from importlib import import_module
def _load():
def load(configuration, logger):
"""Load the rights manager chosen in configuration."""
rights_type = config.get("rights", "type")
rights_type = configuration.get("rights", "type")
if rights_type == "None":
sys.modules[__name__].authorized = (
lambda user, collection, permission: True)
return lambda user, collection, permission: True
elif rights_type in DEFINED_RIGHTS or rights_type == "from_file":
pass # authorized is already defined
return Rights(configuration, logger).authorized
else:
__import__(rights_type)
sys.modules[__name__].authorized = sys.modules[rights_type].authorized
module = import_module(rights_type)
return module.Rights(configuration, logger).authorized
DEFINED_RIGHTS = {
@ -84,53 +81,57 @@ permission:rw
"""}
def _read_from_sections(user, collection_url, permission):
"""Get regex sections."""
filename = os.path.expanduser(config.get("rights", "file"))
rights_type = config.get("rights", "type").lower()
# Prevent "regex injection"
user_escaped = re.escape(user)
collection_url_escaped = re.escape(collection_url)
regex = ConfigParser({"login": user_escaped, "path": collection_url_escaped})
if rights_type in DEFINED_RIGHTS:
log.LOGGER.debug("Rights type '%s'" % rights_type)
regex.readfp(StringIO(DEFINED_RIGHTS[rights_type]))
elif rights_type == "from_file":
log.LOGGER.debug("Reading rights from file %s" % filename)
if not regex.read(filename):
log.LOGGER.error("File '%s' not found for rights" % filename)
return False
else:
log.LOGGER.error("Unknown rights type '%s'" % rights_type)
class BaseRights:
def __init__(self, configuration, logger):
self.configuration = configuration
self.logger = logger
def authorized(self, user, collection, permission):
"""Check if the user is allowed to read or write the collection.
If the user is empty, check for anonymous rights.
"""
raise NotImplementedError
class Rights(BaseRights):
def __init__(self, configuration, logger):
super().__init__()
self.filename = os.path.expanduser(configuration.get("rights", "file"))
self.rights_type = configuration.get("rights", "type").lower()
def authorized(self, user, collection, permission):
collection_url = collection.path.rstrip("/") or "/"
if collection_url in (".well-known/carddav", ".well-known/caldav"):
return permission == "r"
# Prevent "regex injection"
user_escaped = re.escape(user)
collection_url_escaped = re.escape(collection_url)
regex = ConfigParser(
{"login": user_escaped, "path": collection_url_escaped})
if self.rights_type in DEFINED_RIGHTS:
self.logger.debug("Rights type '%s'" % self.rights_type)
regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type]))
else:
self.logger.debug("Reading rights from file '%s'" % self.filename)
if not regex.read(self.filename):
self.logger.error(
"File '%s' not found for rights" % self.filename)
return False
for section in regex.sections():
re_user = regex.get(section, "user")
re_collection = regex.get(section, "collection")
self.logger.debug(
"Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
user, collection_url, re_user, re_collection, section))
user_match = re.match(re_user, user)
if user_match:
re_collection = re_collection.format(*user_match.groups())
if re.match(re_collection, collection_url):
self.logger.debug("Section '%s' matches" % section)
return permission in regex.get(section, "permission")
else:
self.logger.debug("Section '%s' does not match" % section)
return False
for section in regex.sections():
re_user = regex.get(section, "user")
re_collection = regex.get(section, "collection")
log.LOGGER.debug(
"Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
user, collection_url, re_user, re_collection, section))
user_match = re.match(re_user, user)
if user_match:
re_collection = re_collection.format(*user_match.groups())
if re.match(re_collection, collection_url):
log.LOGGER.debug("Section '%s' matches" % section)
return permission in regex.get(section, "permission")
else:
log.LOGGER.debug("Section '%s' does not match" % section)
return False
def authorized(user, collection, permission):
"""Check if the user is allowed to read or write the collection.
If the user is empty, check for anonymous rights.
"""
collection_url = collection.path.rstrip("/") or "/"
if collection_url in (".well-known/carddav", ".well-known/caldav"):
return permission == "r"
rights_type = config.get("rights", "type").lower()
return (
rights_type == "none" or
_read_from_sections(user or "", collection_url, permission))