Rework configuration

This commit is contained in:
Unrud 2019-06-17 04:13:25 +02:00
parent 63e6d091b9
commit b7590f8c84
19 changed files with 609 additions and 220 deletions

View File

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -47,9 +47,12 @@ def _init_application(config_path, wsgi_errors):
log.setup()
with log.register_stream(wsgi_errors):
_application_config_path = config_path
configuration = config.load([config_path] if config_path else [],
ignore_missing_paths=False)
configuration = config.load(config.parse_compound_paths(
config.DEFAULT_CONFIG_PATH,
config_path))
log.set_level(configuration.get("logging", "level"))
# Inspect configuration after logger is configured
configuration.inspect()
_application = Application(configuration)

View File

@ -1,6 +1,6 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2011-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -23,6 +23,7 @@ This module can be executed from a command line with ``$python -m radicale``.
"""
import argparse
import contextlib
import os
import signal
import socket
@ -47,10 +48,14 @@ def run():
help="print debug information")
groups = {}
for section, values in config.INITIAL_CONFIG.items():
for section, values in config.DEFAULT_CONFIG_SCHEMA.items():
if values.get("_internal", False):
continue
group = parser.add_argument_group(section)
groups[group] = []
for option, data in values.items():
if option.startswith("_"):
continue
kwargs = data.copy()
long_name = "--{0}-{1}".format(
section, option.replace("_", "-"))
@ -75,6 +80,7 @@ def run():
kwargs["help"], long_name)
group.add_argument(*opposite_args, **kwargs)
else:
del kwargs["type"]
group.add_argument(*args, **kwargs)
args = parser.parse_args()
@ -82,36 +88,40 @@ def run():
# Preliminary configure logging
if args.debug:
args.logging_level = "debug"
if args.logging_level is not None:
log.set_level(args.logging_level)
with contextlib.suppress(ValueError):
log.set_level(config.DEFAULT_CONFIG_SCHEMA["logging"]["level"]["type"](
args.logging_level))
# Update Radicale configuration according to arguments
arguments_config = {}
for group, actions in groups.items():
section = group.title
section_config = {}
for action in actions:
value = getattr(args, action)
if value is not None:
section_config[action.split('_', 1)[1]] = value
if section_config:
arguments_config[section] = section_config
if args.config is not None:
config_paths = [args.config] if args.config else []
ignore_missing_paths = False
else:
config_paths = ["/etc/radicale/config",
os.path.expanduser("~/.config/radicale/config")]
if "RADICALE_CONFIG" in os.environ:
config_paths.append(os.environ["RADICALE_CONFIG"])
ignore_missing_paths = True
try:
configuration = config.load(config_paths,
ignore_missing_paths=ignore_missing_paths)
configuration = config.load(config.parse_compound_paths(
config.DEFAULT_CONFIG_PATH,
os.environ.get("RADICALE_CONFIG"),
args.config))
if arguments_config:
configuration.update(
arguments_config, "arguments", internal=False)
except Exception as e:
logger.fatal("Invalid configuration: %s", e, exc_info=True)
exit(1)
# Update Radicale configuration according to arguments
for group, actions in groups.items():
section = group.title
for action in actions:
value = getattr(args, action)
if value is not None:
configuration.set(section, action.split('_', 1)[1], value)
# Configure logging
log.set_level(configuration.get("logging", "level"))
# Inspect configuration after logger is configured
configuration.inspect()
if args.verify_storage:
logger.info("Verifying storage")
try:

View File

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -80,8 +80,7 @@ class Application(
request_environ = dict(environ)
# Mask passwords
mask_passwords = self.configuration.getboolean(
"logging", "mask_passwords")
mask_passwords = self.configuration.get("logging", "mask_passwords")
authorization = request_environ.get("HTTP_AUTHORIZATION", "")
if mask_passwords and authorization.startswith("Basic"):
request_environ["HTTP_AUTHORIZATION"] = "Basic **masked**"
@ -162,7 +161,6 @@ class Application(
headers["Content-Length"] = str(len(answer))
# Add extra headers set in configuration
if self.configuration.has_section("headers"):
for key in self.configuration.options("headers"):
headers[key] = self.configuration.get("headers", key)
@ -244,7 +242,7 @@ class Application(
elif login:
logger.info("Failed login attempt: %r", login)
# Random delay to avoid timing oracles and bruteforce attacks
delay = self.configuration.getfloat("auth", "delay")
delay = self.configuration.get("auth", "delay")
if delay > 0:
random_delay = delay * (0.5 + random.random())
logger.debug("Sleeping %.3f seconds", random_delay)
@ -275,11 +273,11 @@ class Application(
logger.warning("Access to principal path %r denied by "
"rights backend", principal_path)
if self.configuration.getboolean("internal", "internal_server"):
if self.configuration.get("internal", "internal_server"):
# Verify content length
content_length = int(environ.get("CONTENT_LENGTH") or 0)
if content_length:
max_content_length = self.configuration.getint(
max_content_length = self.configuration.get(
"server", "max_content_length")
if max_content_length and content_length > max_content_length:
logger.info("Request body too large: %d", content_length)

View File

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -21,7 +21,6 @@ import base64
import functools
import hashlib
import hmac
import os
from radicale import auth
@ -29,8 +28,7 @@ from radicale import auth
class Auth(auth.BaseAuth):
def __init__(self, configuration):
super().__init__(configuration)
self.filename = os.path.expanduser(
configuration.get("auth", "htpasswd_filename"))
self.filename = configuration.get("auth", "htpasswd_filename")
self.encryption = configuration.get("auth", "htpasswd_encryption")
if self.encryption == "ssha":

View File

@ -2,7 +2,7 @@
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -27,9 +27,14 @@ Give a configparser-like interface to read and write configuration.
import math
import os
from collections import OrderedDict
from configparser import RawConfigParser as ConfigParser
from configparser import RawConfigParser
from radicale import auth, rights, storage, web
from radicale.log import logger
DEFAULT_CONFIG_PATH = os.pathsep.join([
"?/etc/radicale/config",
"?~/.config/radicale/config"])
def positive_int(value):
@ -52,18 +57,43 @@ def positive_float(value):
def logging_level(value):
if value not in ("debug", "info", "warning", "error", "critical"):
raise ValueError("unsupported level: %s" % value)
raise ValueError("unsupported level: %r" % value)
return value
def filepath(value):
if not value:
return ""
value = os.path.expanduser(value)
if os.name == "nt":
value = os.path.expandvars(value)
return os.path.abspath(value)
def list_of_ip_address(value):
def ip_address(value):
try:
address, port = value.strip().rsplit(":", 1)
return address.strip("[] "), int(port)
except ValueError:
raise ValueError("malformed IP address: %r" % value)
return [ip_address(s.strip()) for s in value.split(",")]
def _convert_to_bool(value):
if value.lower() not in RawConfigParser.BOOLEAN_STATES:
raise ValueError("Not a boolean: %r" % value)
return RawConfigParser.BOOLEAN_STATES[value.lower()]
# Default configuration
INITIAL_CONFIG = OrderedDict([
DEFAULT_CONFIG_SCHEMA = OrderedDict([
("server", OrderedDict([
("hosts", {
"value": "127.0.0.1:5232",
"help": "set server hostnames including ports",
"aliases": ["-H", "--hosts"],
"type": str}),
"type": list_of_ip_address}),
("max_connections", {
"value": "8",
"help": "maximum number of parallel connections",
@ -86,17 +116,17 @@ INITIAL_CONFIG = OrderedDict([
"value": "/etc/ssl/radicale.cert.pem",
"help": "set certificate file",
"aliases": ["-c", "--certificate"],
"type": str}),
"type": filepath}),
("key", {
"value": "/etc/ssl/radicale.key.pem",
"help": "set private key file",
"aliases": ["-k", "--key"],
"type": str}),
"type": filepath}),
("certificate_authority", {
"value": "",
"help": "set CA certificate for validating clients",
"aliases": ["--certificate-authority"],
"type": str}),
"type": filepath}),
("protocol", {
"value": "PROTOCOL_TLSv1_2",
"help": "SSL protocol used",
@ -127,7 +157,7 @@ INITIAL_CONFIG = OrderedDict([
("htpasswd_filename", {
"value": "/etc/radicale/users",
"help": "htpasswd filename",
"type": str}),
"type": filepath}),
("htpasswd_encryption", {
"value": "bcrypt",
"help": "htpasswd encryption method",
@ -149,7 +179,7 @@ INITIAL_CONFIG = OrderedDict([
("file", {
"value": "/etc/radicale/rights",
"help": "file for rights management from_file",
"type": str})])),
"type": filepath})])),
("storage", OrderedDict([
("type", {
"value": "multifilesystem",
@ -157,14 +187,13 @@ INITIAL_CONFIG = OrderedDict([
"type": str,
"internal": storage.INTERNAL_TYPES}),
("filesystem_folder", {
"value": os.path.expanduser(
"/var/lib/radicale/collections"),
"value": "/var/lib/radicale/collections",
"help": "path where collections are stored",
"type": str}),
"type": filepath}),
("max_sync_token_age", {
"value": "2592000", # 30 days
"help": "delete sync token that are older",
"type": int}),
"type": positive_int}),
("hook", {
"value": "",
"help": "command that is run after changes to storage",
@ -183,9 +212,11 @@ INITIAL_CONFIG = OrderedDict([
("mask_passwords", {
"value": "True",
"help": "mask passwords in logs",
"type": bool})]))])
# Default configuration for "internal" settings
INTERNAL_CONFIG = OrderedDict([
"type": bool})])),
("headers", OrderedDict([
("_allow_extra", True)])),
("internal", OrderedDict([
("_internal", True),
("filesystem_fsync", {
"value": "True",
"help": "sync all changes to filesystem during requests",
@ -193,52 +224,196 @@ INTERNAL_CONFIG = OrderedDict([
("internal_server", {
"value": "False",
"help": "the internal server is used",
"type": bool})])
"type": bool})]))])
def load(paths=(), ignore_missing_paths=True):
config = ConfigParser()
for section, values in INITIAL_CONFIG.items():
config.add_section(section)
for key, data in values.items():
config.set(section, key, data["value"])
for path in paths:
if path or not ignore_missing_paths:
def parse_compound_paths(*compound_paths):
"""Parse a compound path and return the individual paths.
Paths in a compound path are joined by ``os.pathsep``. If a path starts
with ``?`` the return value ``IGNORE_IF_MISSING`` is set.
When multiple ``compound_paths`` are passed, the last argument that is
not ``None`` is used.
Returns a dict of the format ``[(PATH, IGNORE_IF_MISSING), ...]``
"""
compound_path = ""
for p in compound_paths:
if p is not None:
compound_path = p
paths = []
for path in compound_path.split(os.pathsep):
ignore_if_missing = path.startswith("?")
if ignore_if_missing:
path = path[1:]
path = filepath(path)
if path:
paths.append((path, ignore_if_missing))
return paths
def load(paths=()):
"""Load configuration from files.
``paths`` a list of the format ``[(PATH, IGNORE_IF_MISSING), ...]``.
"""
configuration = Configuration(DEFAULT_CONFIG_SCHEMA)
for path, ignore_if_missing in paths:
parser = RawConfigParser()
config_source = "config file %r" % path
try:
if not config.read(path) and not ignore_missing_paths:
if not parser.read(path):
config = Configuration.SOURCE_MISSING
if not ignore_if_missing:
raise RuntimeError("No such file: %r" % path)
else:
config = {s: {o: parser[s][o] for o in parser.options(s)}
for s in parser.sections()}
except Exception as e:
raise RuntimeError(
"Failed to load config file %r: %s" % (path, e)) from e
# Check the configuration
for section in config.sections():
if section == "headers":
"Failed to load %s: %s" % (config_source, e)) from e
configuration.update(config, config_source, internal=False)
return configuration
class Configuration:
SOURCE_MISSING = {}
def __init__(self, schema):
"""Initialize configuration.
``schema`` a dict that describes the configuration format.
See ``DEFAULT_CONFIG_SCHEMA``.
"""
self._schema = schema
self._values = {}
self._configs = []
values = {}
for section in schema:
values[section] = {}
for option in schema[section]:
if option.startswith("_"):
continue
if section not in INITIAL_CONFIG:
raise RuntimeError("Invalid section %r in config" % section)
allow_extra_options = ("type" in INITIAL_CONFIG[section] and
config.get(section, "type") not in
INITIAL_CONFIG[section]["type"].get("internal",
()))
values[section][option] = schema[section][option]["value"]
self.update(values, "default config")
def update(self, config, source, internal=True):
"""Update the configuration.
``config`` a dict of the format {SECTION: {OPTION: VALUE, ...}, ...}.
Set to ``Configuration.SOURCE_MISSING`` to indicate a missing
configuration source for inspection.
``source`` a description of the configuration source
``internal`` allows updating "_internal" sections and skips the source
during inspection.
"""
new_values = {}
for section in config:
if (section not in self._schema or not internal and
self._schema[section].get("_internal", False)):
raise RuntimeError(
"Invalid section %r in %s" % (section, source))
new_values[section] = {}
if "_allow_extra" in self._schema[section]:
allow_extra_options = self._schema[section]["_allow_extra"]
elif "type" in self._schema[section]:
if "type" in config[section]:
plugin_type = config[section]["type"]
else:
plugin_type = self.get(section, "type")
allow_extra_options = plugin_type not in self._schema[section][
"type"].get("internal", [])
else:
allow_extra_options = False
for option in config[section]:
if option not in INITIAL_CONFIG[section]:
if allow_extra_options:
continue
if option in self._schema[section]:
type_ = self._schema[section][option]["type"]
elif allow_extra_options:
type_ = str
else:
raise RuntimeError("Invalid option %r in section %r in "
"config" % (option, section))
type_ = INITIAL_CONFIG[section][option]["type"]
"%s" % (option, section, source))
raw_value = config[section][option]
try:
if type_ == bool:
config.getboolean(section, option)
else:
type_(config.get(section, option))
raw_value = _convert_to_bool(raw_value)
new_values[section][option] = type_(raw_value)
except Exception as e:
raise RuntimeError(
"Invalid %s value for option %r in section %r in config: "
"%r" % (type_.__name__, option, section,
config.get(section, option))) from e
# Add internal configuration
config.add_section("internal")
for key, data in INTERNAL_CONFIG.items():
config.set("internal", key, data["value"])
return config
"Invalid %s value for option %r in section %r in %s: "
"%r" % (type_.__name__, option, section, source,
raw_value)) from e
self._configs.append((config, source, internal))
for section in new_values:
if section not in self._values:
self._values[section] = {}
for option in new_values[section]:
self._values[section][option] = new_values[section][option]
def get(self, section, option):
"""Get the value of ``option`` in ``section``."""
return self._values[section][option]
def get_raw(self, section, option):
"""Get the raw value of ``option`` in ``section``."""
fconfig = self._configs[0]
for config, _, _ in reversed(self._configs):
if section in config and option in config[section]:
fconfig = config
break
return fconfig[section][option]
def sections(self):
"""List all sections."""
return self._values.keys()
def options(self, section):
"""List all options in ``section``"""
return self._values[section].keys()
def copy(self, plugin_schema=None):
"""Create a copy of the configuration
``plugin_schema`` is a optional dict that contains additional options
for usage with a plugin. See ``DEFAULT_CONFIG_SCHEMA``.
"""
if plugin_schema is None:
schema = self._schema
skip = 1 # skip default config
else:
skip = 0
schema = self._schema.copy()
for section, options in plugin_schema.items():
if (section not in schema or "type" not in schema[section] or
"internal" not in schema[section]["type"]):
raise ValueError("not a plugin section: %r" % section)
schema[section] = schema[section].copy()
schema[section]["type"] = schema[section]["type"].copy()
schema[section]["type"]["internal"] = [
self.get(section, "type")]
for option, value in options.items():
if option in schema[section]:
raise ValueError("option already exists in %r: %r" % (
section, option))
schema[section][option] = value
copy = self.__class__(schema)
for config, source, allow_internal in self._configs[skip:]:
copy.update(config, source, allow_internal)
return copy
def inspect(self):
"""Inspect all external config sources and write problems to logger."""
for config, source, internal in self._configs:
if internal:
continue
if config is self.SOURCE_MISSING:
logger.info("Skipped missing %s", source)
else:
logger.info("Parsed %s", source)

View File

@ -1,6 +1,6 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2011-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -172,7 +172,7 @@ def setup():
register_stream = handler.register_stream
log_record_factory = IdentLogRecordFactory(logging.getLogRecordFactory())
logging.setLogRecordFactory(log_record_factory)
set_level(logging.DEBUG)
set_level(logging.WARNING)
def set_level(level):

View File

@ -1,6 +1,6 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -16,7 +16,6 @@
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import configparser
import os.path
import re
from radicale import pathutils, rights
@ -26,7 +25,7 @@ from radicale.log import logger
class Rights(rights.BaseRights):
def __init__(self, configuration):
super().__init__(configuration)
self.filename = os.path.expanduser(configuration.get("rights", "file"))
self.filename = configuration.get("rights", "file")
def authorized(self, user, path, permissions):
user = user or ""

View File

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -32,7 +32,6 @@ import ssl
import sys
import threading
import wsgiref.simple_server
from configparser import ConfigParser
from urllib.parse import unquote
from radicale import Application
@ -247,24 +246,21 @@ def serve(configuration, shutdown_socket=None):
"""Serve radicale from configuration."""
logger.info("Starting Radicale")
# Copy configuration before modifying
config_copy = ConfigParser()
config_copy.read_dict(configuration)
configuration = config_copy
configuration["internal"]["internal_server"] = "True"
configuration = configuration.copy()
configuration.update({"internal": {"internal_server": "True"}}, "server")
# Create collection servers
servers = {}
if configuration.getboolean("server", "ssl"):
if configuration.get("server", "ssl"):
server_class = ParallelHTTPSServer
else:
server_class = ParallelHTTPServer
class ServerCopy(server_class):
"""Copy, avoids overriding the original class attributes."""
ServerCopy.client_timeout = configuration.getint("server", "timeout")
ServerCopy.max_connections = configuration.getint(
"server", "max_connections")
if configuration.getboolean("server", "ssl"):
ServerCopy.client_timeout = configuration.get("server", "timeout")
ServerCopy.max_connections = configuration.get("server", "max_connections")
if configuration.get("server", "ssl"):
ServerCopy.certificate = configuration.get("server", "certificate")
ServerCopy.key = configuration.get("server", "key")
ServerCopy.certificate_authority = configuration.get(
@ -285,7 +281,7 @@ def serve(configuration, shutdown_socket=None):
class RequestHandlerCopy(RequestHandler):
"""Copy, avoids overriding the original class attributes."""
if not configuration.getboolean("server", "dns_lookup"):
if not configuration.get("server", "dns_lookup"):
RequestHandlerCopy.address_string = lambda self: self.client_address[0]
if systemd:
@ -301,13 +297,7 @@ def serve(configuration, shutdown_socket=None):
server_addresses.append(socket.fromfd(
fd, ServerCopy.address_family, ServerCopy.socket_type))
else:
for host in configuration.get("server", "hosts").split(","):
try:
address, port = host.strip().rsplit(":", 1)
address, port = address.strip("[] "), int(port)
except ValueError as e:
raise RuntimeError(
"Failed to parse address %r: %s" % (host, e)) from e
for address, port in configuration.get("server", "hosts"):
server_addresses.append((address, port))
application = Application(configuration)
@ -321,7 +311,7 @@ def serve(configuration, shutdown_socket=None):
servers[server.socket] = server
logger.info("Listening to %r on port %d%s",
server.server_name, server.server_port, " using SSL"
if configuration.getboolean("server", "ssl") else "")
if configuration.get("server", "ssl") else "")
# Main loop: wait for requests on any of the servers or program shutdown
sockets = list(servers.keys())

View File

@ -1,7 +1,7 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -48,8 +48,7 @@ class Collection(
@classmethod
def static_init(cls):
folder = os.path.expanduser(cls.configuration.get(
"storage", "filesystem_folder"))
folder = cls.configuration.get("storage", "filesystem_folder")
cls._makedirs_synced(folder)
super().static_init()
@ -66,8 +65,8 @@ class Collection(
@classmethod
def _get_collection_root_folder(cls):
filesystem_folder = os.path.expanduser(
cls.configuration.get("storage", "filesystem_folder"))
filesystem_folder = cls.configuration.get(
"storage", "filesystem_folder")
return os.path.join(filesystem_folder, "collection-root")
@contextlib.contextmanager
@ -96,7 +95,7 @@ class Collection(
@classmethod
def _fsync(cls, fd):
if cls.configuration.getboolean("internal", "filesystem_fsync"):
if cls.configuration.get("internal", "filesystem_fsync"):
pathutils.fsync(fd)
@classmethod
@ -106,7 +105,7 @@ class Collection(
This only works on POSIX and does nothing on other systems.
"""
if not cls.configuration.getboolean("internal", "filesystem_fsync"):
if not cls.configuration.get("internal", "filesystem_fsync"):
return
if os.name == "posix":
try:

View File

@ -1,7 +1,7 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -83,5 +83,5 @@ class CollectionHistoryMixin:
history_folder = os.path.join(self._filesystem_path,
".Radicale.cache", "history")
self._clean_cache(history_folder, self._get_deleted_history_hrefs(),
max_age=self.configuration.getint(
max_age=self.configuration.get(
"storage", "max_sync_token_age"))

View File

@ -1,7 +1,7 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -30,8 +30,7 @@ class CollectionLockMixin:
@classmethod
def static_init(cls):
super().static_init()
folder = os.path.expanduser(cls.configuration.get(
"storage", "filesystem_folder"))
folder = cls.configuration.get("storage", "filesystem_folder")
lock_path = os.path.join(folder, ".Radicale.lock")
cls._lock = pathutils.RwLock(lock_path)
@ -53,8 +52,7 @@ class CollectionLockMixin:
# execute hook
hook = cls.configuration.get("storage", "hook")
if mode == "w" and hook:
folder = os.path.expanduser(cls.configuration.get(
"storage", "filesystem_folder"))
folder = cls.configuration.get("storage", "filesystem_folder")
logger.debug("Running hook")
debug = logger.isEnabledFor(logging.DEBUG)
p = subprocess.Popen(

View File

@ -1,7 +1,7 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2014 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -96,7 +96,7 @@ class CollectionSyncMixin:
else:
# clean up old sync tokens and item cache
self._clean_cache(token_folder, os.listdir(token_folder),
max_age=self.configuration.getint(
max_age=self.configuration.get(
"storage", "max_sync_token_age"))
self._clean_history()
else:

View File

@ -2,7 +2,7 @@
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -39,3 +39,14 @@ def get_file_content(file_name):
return fd.read()
except IOError:
print("Couldn't open the file %s" % file_name)
def configuration_to_dict(configuration):
d = {}
for section in configuration.sections():
if configuration._schema[section].get("_internal", False):
continue
d[section] = {}
for option in configuration.options(section):
d[section][option] = configuration.get_raw(section, option)
return d

View File

@ -1,7 +1,7 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2012-2016 Jean-Marc Martins
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -42,11 +42,12 @@ class TestBaseAuthRequests(BaseTest):
def setup(self):
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration["storage"]["filesystem_folder"] = self.colpath
self.configuration.update({
"storage": {"filesystem_folder": self.colpath},
# Disable syncing to disk for better performance
self.configuration["internal"]["filesystem_fsync"] = "False"
"internal": {"filesystem_fsync": "False"},
# Set incorrect authentication delay to a very low value
self.configuration["auth"]["delay"] = "0.002"
"auth": {"delay": "0.002"}}, "test")
def teardown(self):
shutil.rmtree(self.colpath)
@ -57,9 +58,10 @@ class TestBaseAuthRequests(BaseTest):
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
with open(htpasswd_file_path, "w") as f:
f.write(htpasswd_content)
self.configuration["auth"]["type"] = "htpasswd"
self.configuration["auth"]["htpasswd_filename"] = htpasswd_file_path
self.configuration["auth"]["htpasswd_encryption"] = htpasswd_encryption
self.configuration.update({
"auth": {"type": "htpasswd",
"htpasswd_filename": htpasswd_file_path,
"htpasswd_encryption": htpasswd_encryption}}, "test")
self.application = Application(self.configuration)
if test_matrix is None:
test_matrix = (
@ -129,7 +131,7 @@ class TestBaseAuthRequests(BaseTest):
self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n")
def test_remote_user(self):
self.configuration["auth"]["type"] = "remote_user"
self.configuration.update({"auth": {"type": "remote_user"}}, "test")
self.application = Application(self.configuration)
status, _, answer = self.request(
"PROPFIND", "/",
@ -143,7 +145,8 @@ class TestBaseAuthRequests(BaseTest):
assert ">/test/<" in answer
def test_http_x_remote_user(self):
self.configuration["auth"]["type"] = "http_x_remote_user"
self.configuration.update(
{"auth": {"type": "http_x_remote_user"}}, "test")
self.application = Application(self.configuration)
status, _, answer = self.request(
"PROPFIND", "/",
@ -158,7 +161,8 @@ class TestBaseAuthRequests(BaseTest):
def test_custom(self):
"""Custom authentication."""
self.configuration["auth"]["type"] = "tests.custom.auth"
self.configuration.update(
{"auth": {"type": "tests.custom.auth"}}, "test")
self.application = Application(self.configuration)
status, _, answer = self.request(
"PROPFIND", "/tmp", HTTP_AUTHORIZATION="Basic %s" %

View File

@ -1,6 +1,6 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2012-2017 Guillaume Ayoub
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -1404,10 +1404,11 @@ class BaseRequestsMixIn:
def test_authentication(self):
"""Test if server sends authentication request."""
self.configuration["auth"]["type"] = "htpasswd"
self.configuration["auth"]["htpasswd_filename"] = os.devnull
self.configuration["auth"]["htpasswd_encryption"] = "plain"
self.configuration["rights"]["type"] = "owner_only"
self.configuration.update({
"auth": {"type": "htpasswd",
"htpasswd_filename": os.devnull,
"htpasswd_encryption": "plain"},
"rights": {"type": "owner_only"}}, "test")
self.application = Application(self.configuration)
status, headers, _ = self.request("MKCOL", "/user/")
assert status in (401, 403)
@ -1431,9 +1432,8 @@ class BaseRequestsMixIn:
assert status == 207
def test_custom_headers(self):
if not self.configuration.has_section("headers"):
self.configuration.add_section("headers")
self.configuration.set("headers", "test", "123")
self.configuration.update({"headers": {"test": "123"}}, "test")
self.application = Application(self.configuration)
# Test if header is set on success
status, headers, _ = self.request("OPTIONS", "/")
assert status == 200
@ -1461,11 +1461,7 @@ class BaseFileSystemTest(BaseTest):
def setup(self):
self.configuration = config.load()
self.configuration["storage"]["type"] = self.storage_type
self.colpath = tempfile.mkdtemp()
self.configuration["storage"]["filesystem_folder"] = self.colpath
# Disable syncing to disk for better performance
self.configuration["internal"]["filesystem_fsync"] = "False"
# Allow access to anything for tests
rights_file_path = os.path.join(self.colpath, "rights")
with open(rights_file_path, "w") as f:
@ -1474,8 +1470,13 @@ class BaseFileSystemTest(BaseTest):
user: .*
collection: .*
permissions: RrWw""")
self.configuration["rights"]["file"] = rights_file_path
self.configuration["rights"]["type"] = "from_file"
self.configuration.update({
"storage": {"type": self.storage_type,
"filesystem_folder": self.colpath},
# Disable syncing to disk for better performance
"internal": {"filesystem_fsync": "False"},
"rights": {"file": rights_file_path,
"type": "from_file"}}, "test")
self.application = Application(self.configuration)
def teardown(self):
@ -1488,14 +1489,18 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
def test_fsync(self):
"""Create a directory and file with syncing enabled."""
self.configuration["internal"]["filesystem_fsync"] = "True"
self.configuration.update({
"internal": {"filesystem_fsync": "True"}}, "test")
self.application = Application(self.configuration)
status, _, _ = self.request("MKCALENDAR", "/calendar.ics/")
assert status == 201
def test_hook(self):
"""Run hook."""
self.configuration["storage"]["hook"] = (
self.configuration.update({"storage": {"hook": (
"mkdir %s" % os.path.join("collection-root", "created_by_hook"))
}}, "test")
self.application = Application(self.configuration)
status, _, _ = self.request("MKCALENDAR", "/calendar.ics/")
assert status == 201
status, _, _ = self.request("PROPFIND", "/created_by_hook/")
@ -1503,8 +1508,10 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
def test_hook_read_access(self):
"""Verify that hook is not run for read accesses."""
self.configuration["storage"]["hook"] = (
self.configuration.update({"storage": {"hook": (
"mkdir %s" % os.path.join("collection-root", "created_by_hook"))
}}, "test")
self.application = Application(self.configuration)
status, _, _ = self.request("PROPFIND", "/")
assert status == 207
status, _, _ = self.request("PROPFIND", "/created_by_hook/")
@ -1514,15 +1521,18 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
reason="flock command not found")
def test_hook_storage_locked(self):
"""Verify that the storage is locked when the hook runs."""
self.configuration["storage"]["hook"] = (
"flock -n .Radicale.lock || exit 0; exit 1")
self.configuration.update({"storage": {"hook": (
"flock -n .Radicale.lock || exit 0; exit 1")}}, "test")
self.application = Application(self.configuration)
status, _, _ = self.request("MKCALENDAR", "/calendar.ics/")
assert status == 201
def test_hook_principal_collection_creation(self):
"""Verify that the hooks runs when a new user is created."""
self.configuration["storage"]["hook"] = (
self.configuration.update({"storage": {"hook": (
"mkdir %s" % os.path.join("collection-root", "created_by_hook"))
}}, "test")
self.application = Application(self.configuration)
status, _, _ = self.request("PROPFIND", "/", HTTP_AUTHORIZATION=(
"Basic " + base64.b64encode(b"user:").decode()))
assert status == 207
@ -1531,7 +1541,8 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
def test_hook_fail(self):
"""Verify that a request fails if the hook fails."""
self.configuration["storage"]["hook"] = "exit 1"
self.configuration.update({"storage": {"hook": "exit 1"}}, "test")
self.application = Application(self.configuration)
status, _, _ = self.request("MKCALENDAR", "/calendar.ics/")
assert status != 201

View File

@ -0,0 +1,182 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale. If not, see <http://www.gnu.org/licenses/>.
import os
import shutil
import tempfile
from configparser import RawConfigParser
import pytest
from radicale import config
from .helpers import configuration_to_dict
class TestConfig:
"""Test the configuration."""
def setup(self):
self.colpath = tempfile.mkdtemp()
def teardown(self):
shutil.rmtree(self.colpath)
def _write_config(self, config_dict, name):
parser = RawConfigParser()
parser.read_dict(config_dict)
config_path = os.path.join(self.colpath, name)
with open(config_path, "w") as f:
parser.write(f)
return config_path
def test_parse_compound_paths(self):
assert len(config.parse_compound_paths()) == 0
assert len(config.parse_compound_paths("")) == 0
assert len(config.parse_compound_paths(None, "")) == 0
assert len(config.parse_compound_paths("config", "")) == 0
assert len(config.parse_compound_paths("config", None)) == 1
assert len(config.parse_compound_paths(os.pathsep.join(["", ""]))) == 0
assert len(config.parse_compound_paths(os.pathsep.join([
"", "config", ""]))) == 1
paths = config.parse_compound_paths(os.pathsep.join([
"config1", "?config2", "config3"]))
assert len(paths) == 3
for i, (name, ignore_if_missing) in enumerate([
("config1", False), ("config2", True), ("config3", False)]):
assert os.path.isabs(paths[i][0])
assert os.path.basename(paths[i][0]) == name
assert paths[i][1] is ignore_if_missing
def test_load_empty(self):
config_path = self._write_config({}, "config")
config.load([(config_path, False)])
def test_load_full(self):
config_path = self._write_config(
configuration_to_dict(config.load()), "config")
config.load([(config_path, False)])
def test_load_missing(self):
config_path = os.path.join(self.colpath, "does_not_exist")
config.load([(config_path, True)])
with pytest.raises(Exception) as exc_info:
config.load([(config_path, False)])
e = exc_info.value
assert ("Failed to load config file %r" % config_path) in str(e)
def test_load_multiple(self):
config_path1 = self._write_config({
"server": {"hosts": "192.0.2.1:1111"}}, "config1")
config_path2 = self._write_config({
"server": {"max_connections": 1111}}, "config2")
configuration = config.load([(config_path1, False),
(config_path2, False)])
assert len(configuration.get("server", "hosts")) == 1
assert configuration.get("server", "hosts")[0] == ("192.0.2.1", 1111)
assert configuration.get("server", "max_connections") == 1111
def test_copy(self):
configuration1 = config.load()
configuration1.update({"server": {"max_connections": "1111"}}, "test")
configuration2 = configuration1.copy()
configuration2.update({"server": {"max_connections": "1112"}}, "test")
assert configuration1.get("server", "max_connections") == 1111
assert configuration2.get("server", "max_connections") == 1112
def test_invalid_section(self):
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"does_not_exist": {"x": "x"}}, "test")
e = exc_info.value
assert "Invalid section 'does_not_exist'" in str(e)
def test_invalid_option(self):
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"server": {"x": "x"}}, "test")
e = exc_info.value
assert "Invalid option 'x'" in str(e)
assert "section 'server'" in str(e)
def test_invalid_option_plugin(self):
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"auth": {"x": "x"}}, "test")
e = exc_info.value
assert "Invalid option 'x'" in str(e)
assert "section 'auth'" in str(e)
def test_invalid_value(self):
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.update({"server": {"max_connections": "x"}}, "test")
e = exc_info.value
assert "Invalid positive_int" in str(e)
assert "option 'max_connections" in str(e)
assert "section 'server" in str(e)
assert "'x'" in str(e)
def test_internal(self):
configuration = config.load()
configuration.update({"internal": {"internal_server": "True"}}, "test")
with pytest.raises(Exception) as exc_info:
configuration.update({"internal": {"internal_server": "True"}},
"test", internal=False)
e = exc_info.value
assert "Invalid section 'internal'" in str(e)
def test_plugin_schema(self):
PLUGIN_SCHEMA = {"auth": {"new_option": {"value": "False",
"type": bool}}}
configuration = config.load()
configuration.update({"auth": {"type": "new_plugin"}}, "test")
plugin_configuration = configuration.copy(PLUGIN_SCHEMA)
assert plugin_configuration.get("auth", "new_option") is False
configuration.update({"auth": {"new_option": "True"}}, "test")
plugin_configuration = configuration.copy(PLUGIN_SCHEMA)
assert plugin_configuration.get("auth", "new_option") is True
def test_plugin_schema_duplicate_option(self):
PLUGIN_SCHEMA = {"auth": {"type": {"value": "False",
"type": bool}}}
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.copy(PLUGIN_SCHEMA)
e = exc_info.value
assert "option already exists in 'auth': 'type'" in str(e)
def test_plugin_schema_invalid(self):
PLUGIN_SCHEMA = {"server": {"new_option": {"value": "False",
"type": bool}}}
configuration = config.load()
with pytest.raises(Exception) as exc_info:
configuration.copy(PLUGIN_SCHEMA)
e = exc_info.value
assert "not a plugin section: 'server" in str(e)
def test_plugin_schema_option_invalid(self):
PLUGIN_SCHEMA = {"auth": {}}
configuration = config.load()
configuration.update({"auth": {"type": "new_plugin",
"new_option": False}}, "test")
with pytest.raises(Exception) as exc_info:
configuration.copy(PLUGIN_SCHEMA)
e = exc_info.value
assert "Invalid option 'new_option'" in str(e)
assert "section 'auth'" in str(e)

View File

@ -1,5 +1,5 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2017-2018 Unrud <unrud@outlook.com>
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -35,9 +35,10 @@ class TestBaseRightsRequests(BaseTest):
def setup(self):
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration["storage"]["filesystem_folder"] = self.colpath
self.configuration.update({
"storage": {"filesystem_folder": self.colpath},
# Disable syncing to disk for better performance
self.configuration["internal"]["filesystem_fsync"] = "False"
"internal": {"filesystem_fsync": "False"}}, "test")
def teardown(self):
shutil.rmtree(self.colpath)
@ -49,11 +50,11 @@ class TestBaseRightsRequests(BaseTest):
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
with open(htpasswd_file_path, "w") as f:
f.write("tmp:bepo\nother:bepo")
self.configuration["rights"]["type"] = rights_type
if with_auth:
self.configuration["auth"]["type"] = "htpasswd"
self.configuration["auth"]["htpasswd_filename"] = htpasswd_file_path
self.configuration["auth"]["htpasswd_encryption"] = "plain"
self.configuration.update({
"rights": {"type": rights_type},
"auth": {"type": "htpasswd" if with_auth else "none",
"htpasswd_filename": htpasswd_file_path,
"htpasswd_encryption": "plain"}}, "test")
self.application = Application(self.configuration)
for u in ("tmp", "other"):
status, _, _ = self.request(
@ -132,7 +133,8 @@ permissions: RrWw
user: .*
collection: custom(/.*)?
permissions: Rr""")
self.configuration["rights"]["file"] = rights_file_path
self.configuration.update(
{"rights": {"file": rights_file_path}}, "test")
self._test_rights("from_file", "", "/other", "r", 401)
self._test_rights("from_file", "tmp", "/other", "r", 403)
self._test_rights("from_file", "", "/custom/sub", "r", 404)

View File

@ -1,5 +1,5 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2018 Unrud <unrud@outlook.com>
# Copyright © 2018-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -28,7 +28,7 @@ import sys
import tempfile
import threading
import time
from configparser import ConfigParser
from configparser import RawConfigParser
from urllib import request
from urllib.error import HTTPError, URLError
@ -36,7 +36,7 @@ import pytest
from radicale import config, server
from .helpers import get_file_path
from .helpers import configuration_to_dict, get_file_path
try:
import gunicorn
@ -57,17 +57,18 @@ class TestBaseServerRequests:
def setup(self):
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration["storage"]["filesystem_folder"] = self.colpath
# Enable debugging for new processes
self.configuration["logging"]["level"] = "debug"
# Disable syncing to disk for better performance
self.configuration["internal"]["filesystem_fsync"] = "False"
self.shutdown_socket, shutdown_socket_out = socket.socketpair()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Find available port
sock.bind(("127.0.0.1", 0))
self.sockname = sock.getsockname()
self.configuration["server"]["hosts"] = "[%s]:%d" % self.sockname
self.configuration.update({
"storage": {"filesystem_folder": self.colpath},
"server": {"hosts": "[%s]:%d" % self.sockname},
# Enable debugging for new processes
"logging": {"level": "debug"},
# Disable syncing to disk for better performance
"internal": {"filesystem_fsync": "False"}}, "test")
self.thread = threading.Thread(target=server.serve, args=(
self.configuration, shutdown_socket_out))
ssl_context = ssl.create_default_context()
@ -89,8 +90,8 @@ class TestBaseServerRequests:
"""Send a request."""
if is_alive_fn is None:
is_alive_fn = self.thread.is_alive
scheme = ("https" if self.configuration.getboolean("server", "ssl")
else "http")
scheme = ("https" if self.configuration.get("server", "ssl") else
"http")
req = request.Request(
"%s://[%s]:%d%s" % (scheme, *self.sockname, path),
data=data, headers=headers, method=method)
@ -112,9 +113,10 @@ class TestBaseServerRequests:
assert status == 302
def test_ssl(self):
self.configuration["server"]["ssl"] = "True"
self.configuration["server"]["certificate"] = get_file_path("cert.pem")
self.configuration["server"]["key"] = get_file_path("key.pem")
self.configuration.update({
"server": {"ssl": "True",
"certificate": get_file_path("cert.pem"),
"key": get_file_path("key.pem")}}, "test")
self.thread.start()
status, _, _ = self.request("GET", "/")
assert status == 302
@ -129,7 +131,8 @@ class TestBaseServerRequests:
except OSError:
pytest.skip("IPv6 not supported")
self.sockname = sock.getsockname()[:2]
self.configuration["server"]["hosts"] = "[%s]:%d" % self.sockname
self.configuration.update({
"server": {"hosts": "[%s]:%d" % self.sockname}}, "test")
savedEaiAddrfamily = server.EAI_ADDRFAMILY
if os.name == "nt" and server.EAI_ADDRFAMILY is None:
# HACK: incomplete errno conversion in WINE
@ -143,17 +146,22 @@ class TestBaseServerRequests:
def test_command_line_interface(self):
config_args = []
for section, values in config.INITIAL_CONFIG.items():
for section, values in config.DEFAULT_CONFIG_SCHEMA.items():
if values.get("_internal", False):
continue
for option, data in values.items():
if option.startswith("_"):
continue
long_name = "--{0}-{1}".format(
section, option.replace("_", "-"))
if data["type"] == bool:
if not self.configuration.getboolean(section, option):
if not self.configuration.get(section, option):
long_name = "--no{0}".format(long_name[1:])
config_args.append(long_name)
else:
config_args.append(long_name)
config_args.append(self.configuration.get(section, option))
config_args.append(
self.configuration.get_raw(section, option))
env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join(sys.path)
p = subprocess.Popen(
@ -170,18 +178,17 @@ class TestBaseServerRequests:
@pytest.mark.skipif(not gunicorn, reason="gunicorn module not found")
def test_wsgi_server(self):
config = ConfigParser()
config.read_dict(self.configuration)
assert config.remove_section("internal")
config_path = os.path.join(self.colpath, "config")
parser = RawConfigParser()
parser.read_dict(configuration_to_dict(self.configuration))
with open(config_path, "w") as f:
config.write(f)
parser.write(f)
env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join(sys.path)
p = subprocess.Popen([
sys.executable,
"-c", "from gunicorn.app.wsgiapp import run; run()",
"--bind", self.configuration["server"]["hosts"],
"--bind", self.configuration.get_raw("server", "hosts"),
"--env", "RADICALE_CONFIG=%s" % config_path, "radicale"], env=env)
try:
status, _, _ = self.request(

View File

@ -1,5 +1,5 @@
# This file is part of Radicale Server - Calendar Server
# Copyright © 2018 Unrud <unrud@outlook.com>
# Copyright © 2018-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -33,9 +33,10 @@ class TestBaseWebRequests(BaseTest):
def setup(self):
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration["storage"]["filesystem_folder"] = self.colpath
self.configuration.update({
"storage": {"filesystem_folder": self.colpath},
# Disable syncing to disk for better performance
self.configuration["internal"]["filesystem_fsync"] = "False"
"internal": {"filesystem_fsync": "False"}}, "test")
self.application = Application(self.configuration)
def teardown(self):
@ -50,7 +51,7 @@ class TestBaseWebRequests(BaseTest):
assert answer
def test_none(self):
self.configuration["web"]["type"] = "none"
self.configuration.update({"web": {"type": "none"}}, "test")
self.application = Application(self.configuration)
status, _, answer = self.request("GET", "/.web")
assert status == 200
@ -60,7 +61,8 @@ class TestBaseWebRequests(BaseTest):
def test_custom(self):
"""Custom web plugin."""
self.configuration["web"]["type"] = "tests.custom.web"
self.configuration.update({
"web": {"type": "tests.custom.web"}}, "test")
self.application = Application(self.configuration)
status, _, answer = self.request("GET", "/.web")
assert status == 200