Type hints for tests

This commit is contained in:
Unrud 2021-07-26 20:56:47 +02:00 committed by Unrud
parent 698ae875ce
commit 60f25bf19a
12 changed files with 501 additions and 379 deletions

View File

@ -22,13 +22,19 @@ Tests for Radicale.
import base64 import base64
import logging import logging
import shutil
import sys import sys
import tempfile
import xml.etree.ElementTree as ET
from io import BytesIO from io import BytesIO
from typing import Any, Dict, List, Optional, Tuple, Union
import defusedxml.ElementTree as DefusedET import defusedxml.ElementTree as DefusedET
import radicale import radicale
from radicale import xmlutils from radicale import app, config, xmlutils
RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]]
# Enable debug output # Enable debug output
radicale.log.logger.setLevel(logging.DEBUG) radicale.log.logger.setLevel(logging.DEBUG)
@ -37,40 +43,70 @@ radicale.log.logger.setLevel(logging.DEBUG)
class BaseTest: class BaseTest:
"""Base class for tests.""" """Base class for tests."""
def request(self, method, path, data=None, login=None, **args): colpath: str
configuration: config.Configuration
application: app.Application
def setup(self) -> None:
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration.update({
"storage": {"filesystem_folder": self.colpath,
# Disable syncing to disk for better performance
"_filesystem_fsync": "False"},
# Set incorrect authentication delay to a short duration
"auth": {"delay": "0.001"}}, "test", privileged=True)
self.application = app.Application(self.configuration)
def teardown(self) -> None:
shutil.rmtree(self.colpath)
def request(self, method: str, path: str, data: Optional[str] = None,
**kwargs) -> Tuple[int, Dict[str, str], str]:
"""Send a request.""" """Send a request."""
for key in args: login = kwargs.pop("login", None)
args[key.upper()] = args[key] if login is not None and not isinstance(login, str):
raise TypeError("login argument must be %r, not %r" %
(str, type(login)))
environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()}
for k, v in environ.items():
if not isinstance(v, str):
raise TypeError("type of %r is %r, expected %r" %
(k, type(v), str))
encoding: str = self.configuration.get("encoding", "request")
if login: if login:
args["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode( environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode(
login.encode()).decode() login.encode(encoding)).decode()
args["REQUEST_METHOD"] = method.upper() environ["REQUEST_METHOD"] = method.upper()
args["PATH_INFO"] = path environ["PATH_INFO"] = path
if data: if data:
data = data.encode() data_bytes = data.encode(encoding)
args["wsgi.input"] = BytesIO(data) environ["wsgi.input"] = BytesIO(data_bytes)
args["CONTENT_LENGTH"] = str(len(data)) environ["CONTENT_LENGTH"] = str(len(data_bytes))
args["wsgi.errors"] = sys.stderr environ["wsgi.errors"] = sys.stderr
status = headers = None status = headers = None
def start_response(status_, headers_): def start_response(status_: str, headers_: List[Tuple[str, str]]
) -> None:
nonlocal status, headers nonlocal status, headers
status = status_ status = status_
headers = headers_ headers = headers_
answer = self.application(args, start_response) answers = list(self.application(environ, start_response))
assert status is not None and headers is not None
return (int(status.split()[0]), dict(headers), return (int(status.split()[0]), dict(headers),
answer[0].decode() if answer else None) answers[0].decode() if answers else "")
@staticmethod @staticmethod
def parse_responses(text): def parse_responses(text: str) -> RESPONSES:
xml = DefusedET.fromstring(text) xml = DefusedET.fromstring(text)
assert xml.tag == xmlutils.make_clark("D:multistatus") assert xml.tag == xmlutils.make_clark("D:multistatus")
path_responses = {} path_responses: Dict[str, Union[
int, Dict[str, Tuple[int, ET.Element]]]] = {}
for response in xml.findall(xmlutils.make_clark("D:response")): for response in xml.findall(xmlutils.make_clark("D:response")):
href = response.find(xmlutils.make_clark("D:href")) href = response.find(xmlutils.make_clark("D:href"))
assert href.text not in path_responses assert href.text not in path_responses
prop_respones = {} prop_respones: Dict[str, Tuple[int, ET.Element]] = {}
for propstat in response.findall( for propstat in response.findall(
xmlutils.make_clark("D:propstat")): xmlutils.make_clark("D:propstat")):
status = propstat.find(xmlutils.make_clark("D:status")) status = propstat.find(xmlutils.make_clark("D:status"))
@ -92,70 +128,90 @@ class BaseTest:
return path_responses return path_responses
@staticmethod @staticmethod
def _check_status(status, good_status, check=True): def _check_status(status: int, good_status: int,
if check is True: check: Union[bool, int] = True) -> bool:
assert status == good_status if check is not False:
elif check is not False: expected = good_status if check is True else check
assert status == check assert status == expected, "%d != %d" % (status, expected)
return status == good_status return status == good_status
def get(self, path, check=True, **args): def get(self, path: str, check: Union[bool, int] = True, **kwargs
status, _, answer = self.request("GET", path, **args) ) -> Tuple[int, str]:
assert "data" not in kwargs
status, _, answer = self.request("GET", path, **kwargs)
self._check_status(status, 200, check) self._check_status(status, 200, check)
return status, answer return status, answer
def post(self, path, data=None, check=True, **args): def post(self, path: str, data: str = None, check: Union[bool, int] = True,
status, _, answer = self.request("POST", path, data, **args) **kwargs) -> Tuple[int, str]:
status, _, answer = self.request("POST", path, data, **kwargs)
self._check_status(status, 200, check) self._check_status(status, 200, check)
return status, answer return status, answer
def put(self, path, data, check=True, **args): def put(self, path: str, data: str, check: Union[bool, int] = True,
status, _, answer = self.request("PUT", path, data, **args) **kwargs) -> Tuple[int, str]:
status, _, answer = self.request("PUT", path, data, **kwargs)
self._check_status(status, 201, check) self._check_status(status, 201, check)
return status, answer return status, answer
def propfind(self, path, data=None, check=True, **args): def propfind(self, path: str, data: Optional[str] = None,
status, _, answer = self.request("PROPFIND", path, data, **args) check: Union[bool, int] = True, **kwargs
) -> Tuple[int, RESPONSES]:
status, _, answer = self.request("PROPFIND", path, data, **kwargs)
if not self._check_status(status, 207, check): if not self._check_status(status, 207, check):
return status, None return status, {}
assert answer is not None
responses = self.parse_responses(answer) responses = self.parse_responses(answer)
if args.get("HTTP_DEPTH", "0") == "0": if kwargs.get("HTTP_DEPTH", "0") == "0":
assert len(responses) == 1 and path in responses assert len(responses) == 1 and path in responses
return status, responses return status, responses
def proppatch(self, path, data=None, check=True, **args): def proppatch(self, path: str, data: Optional[str] = None,
status, _, answer = self.request("PROPPATCH", path, data, **args) check: Union[bool, int] = True, **kwargs
) -> Tuple[int, RESPONSES]:
status, _, answer = self.request("PROPPATCH", path, data, **kwargs)
if not self._check_status(status, 207, check): if not self._check_status(status, 207, check):
return status, None return status, {}
assert answer is not None
responses = self.parse_responses(answer) responses = self.parse_responses(answer)
assert len(responses) == 1 and path in responses assert len(responses) == 1 and path in responses
return status, responses return status, responses
def report(self, path, data, check=True, **args): def report(self, path: str, data: str, check: Union[bool, int] = True,
status, _, answer = self.request("REPORT", path, data, **args) **kwargs) -> Tuple[int, RESPONSES]:
status, _, answer = self.request("REPORT", path, data, **kwargs)
if not self._check_status(status, 207, check): if not self._check_status(status, 207, check):
return status, None return status, {}
assert answer is not None
return status, self.parse_responses(answer) return status, self.parse_responses(answer)
def delete(self, path, check=True, **args): def delete(self, path: str, check: Union[bool, int] = True, **kwargs
status, _, answer = self.request("DELETE", path, **args) ) -> Tuple[int, RESPONSES]:
assert "data" not in kwargs
status, _, answer = self.request("DELETE", path, **kwargs)
if not self._check_status(status, 200, check): if not self._check_status(status, 200, check):
return status, None return status, {}
assert answer is not None
responses = self.parse_responses(answer) responses = self.parse_responses(answer)
assert len(responses) == 1 and path in responses assert len(responses) == 1 and path in responses
return status, responses return status, responses
def mkcalendar(self, path, data=None, check=True, **args): def mkcalendar(self, path: str, data: Optional[str] = None,
status, _, answer = self.request("MKCALENDAR", path, data, **args) check: Union[bool, int] = True, **kwargs
) -> Tuple[int, str]:
status, _, answer = self.request("MKCALENDAR", path, data, **kwargs)
self._check_status(status, 201, check) self._check_status(status, 201, check)
return status, answer return status, answer
def mkcol(self, path, data=None, check=True, **args): def mkcol(self, path: str, data: Optional[str] = None,
status, _, _ = self.request("MKCOL", path, data, **args) check: Union[bool, int] = True, **kwargs) -> int:
status, _, _ = self.request("MKCOL", path, data, **kwargs)
self._check_status(status, 201, check) self._check_status(status, 201, check)
return status return status
def create_addressbook(self, path, check=True, **args): def create_addressbook(self, path: str, check: Union[bool, int] = True,
**kwargs) -> int:
assert "data" not in kwargs
return self.mkcol(path, """\ return self.mkcol(path, """\
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav"> <create xmlns="DAV:" xmlns:CR="urn:ietf:params:xml:ns:carddav">
@ -167,4 +223,4 @@ class BaseTest:
</resourcetype> </resourcetype>
</prop> </prop>
</set> </set>
</create>""", check=check, **args) </create>""", check=check, **kwargs)

View File

@ -28,7 +28,8 @@ from radicale import auth
class Auth(auth.BaseAuth): class Auth(auth.BaseAuth):
def login(self, login, password):
def login(self, login: str, password: str) -> str:
if login == "tmp": if login == "tmp":
return login return login
return "" return ""

View File

@ -23,7 +23,8 @@ from radicale import pathutils, rights
class Rights(rights.BaseRights): class Rights(rights.BaseRights):
def authorization(self, user, path):
def authorization(self, user: str, path: str) -> str:
sane_path = pathutils.strip_path(path) sane_path = pathutils.strip_path(path)
if sane_path not in ("tmp", "other"): if sane_path not in ("tmp", "other"):
return "" return ""

View File

@ -27,8 +27,10 @@ from radicale.storage import BaseCollection, multifilesystem
class Collection(multifilesystem.Collection): class Collection(multifilesystem.Collection):
sync = BaseCollection.sync sync = BaseCollection.sync
class Storage(multifilesystem.Storage): class Storage(multifilesystem.Storage):
_collection_class = Collection _collection_class = Collection

View File

@ -21,13 +21,16 @@ Custom web plugin.
from http import client from http import client
from radicale import httputils, web from radicale import httputils, types, web
class Web(web.BaseWeb): class Web(web.BaseWeb):
def get(self, environ, base_prefix, path, user):
def get(self, environ: types.WSGIEnviron, base_prefix: str, path: str,
user: str) -> types.WSGIResponse:
return client.OK, {"Content-Type": "text/plain"}, "custom" return client.OK, {"Content-Type": "text/plain"}, "custom"
def post(self, environ, base_prefix, path, user): def post(self, environ: types.WSGIEnviron, base_prefix: str, path: str,
user: str) -> types.WSGIResponse:
content = httputils.read_request_body(self.configuration, environ) content = httputils.read_request_body(self.configuration, environ)
return client.OK, {"Content-Type": "text/plain"}, "echo:" + content return client.OK, {"Content-Type": "text/plain"}, "echo:" + content

View File

@ -26,19 +26,21 @@ This module offers helpers to use in tests.
import os import os
EXAMPLES_FOLDER = os.path.join(os.path.dirname(__file__), "static") from radicale import config, types
EXAMPLES_FOLDER: str = os.path.join(os.path.dirname(__file__), "static")
def get_file_path(file_name): def get_file_path(file_name: str) -> str:
return os.path.join(EXAMPLES_FOLDER, file_name) return os.path.join(EXAMPLES_FOLDER, file_name)
def get_file_content(file_name): def get_file_content(file_name: str) -> str:
with open(get_file_path(file_name), encoding="utf-8") as fd: with open(get_file_path(file_name), encoding="utf-8") as f:
return fd.read() return f.read()
def configuration_to_dict(configuration): def configuration_to_dict(configuration: config.Configuration) -> types.CONFIG:
"""Convert configuration to a dict with raw values.""" """Convert configuration to a dict with raw values."""
return {section: {option: configuration.get_raw(section, option) return {section: {option: configuration.get_raw(section, option)
for option in configuration.options(section) for option in configuration.options(section)

View File

@ -22,13 +22,12 @@ Radicale tests with simple requests and authentication.
""" """
import os import os
import shutil
import sys import sys
import tempfile from typing import Iterable, Tuple, Union
import pytest import pytest
from radicale import Application, config, xmlutils from radicale import Application, xmlutils
from radicale.tests import BaseTest from radicale.tests import BaseTest
@ -38,21 +37,10 @@ class TestBaseAuthRequests(BaseTest):
We should setup auth for each type before creating the Application object. We should setup auth for each type before creating the Application object.
""" """
def setup(self):
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration.update({
"storage": {"filesystem_folder": self.colpath,
# Disable syncing to disk for better performance
"_filesystem_fsync": "False"},
# Set incorrect authentication delay to a very low value
"auth": {"delay": "0.002"}}, "test", privileged=True)
def teardown(self): def _test_htpasswd(self, htpasswd_encryption: str, htpasswd_content: str,
shutil.rmtree(self.colpath) test_matrix: Union[str, Iterable[Tuple[str, str, bool]]]
= "ascii") -> None:
def _test_htpasswd(self, htpasswd_encryption, htpasswd_content,
test_matrix="ascii"):
"""Test htpasswd authentication with user "tmp" and password "bepo" for """Test htpasswd authentication with user "tmp" and password "bepo" for
``test_matrix`` "ascii" or user "😀" and password "🔑" for ``test_matrix`` "ascii" or user "😀" and password "🔑" for
``test_matrix`` "unicode".""" ``test_matrix`` "unicode"."""
@ -67,7 +55,7 @@ class TestBaseAuthRequests(BaseTest):
except MissingBackendError: except MissingBackendError:
pytest.skip("bcrypt backend for passlib is not installed") pytest.skip("bcrypt backend for passlib is not installed")
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
encoding = self.configuration.get("encoding", "stock") encoding: str = self.configuration.get("encoding", "stock")
with open(htpasswd_file_path, "w", encoding=encoding) as f: with open(htpasswd_file_path, "w", encoding=encoding) as f:
f.write(htpasswd_content) f.write(htpasswd_content)
self.configuration.update({ self.configuration.update({
@ -83,54 +71,56 @@ class TestBaseAuthRequests(BaseTest):
test_matrix = (("😀", "🔑", True), ("😀", "🌹", False), test_matrix = (("😀", "🔑", True), ("😀", "🌹", False),
("😁", "🔑", False), ("😀", "", False), ("😁", "🔑", False), ("😀", "", False),
("", "🔑", False), ("", "", False)) ("", "🔑", False), ("", "", False))
elif isinstance(test_matrix, str):
raise ValueError("Unknown test matrix %r" % test_matrix)
for user, password, valid in test_matrix: for user, password, valid in test_matrix:
self.propfind("/", check=207 if valid else 401, self.propfind("/", check=207 if valid else 401,
login="%s:%s" % (user, password)) login="%s:%s" % (user, password))
def test_htpasswd_plain(self): def test_htpasswd_plain(self) -> None:
self._test_htpasswd("plain", "tmp:bepo") self._test_htpasswd("plain", "tmp:bepo")
def test_htpasswd_plain_password_split(self): def test_htpasswd_plain_password_split(self) -> None:
self._test_htpasswd("plain", "tmp:be:po", ( self._test_htpasswd("plain", "tmp:be:po", (
("tmp", "be:po", True), ("tmp", "bepo", False))) ("tmp", "be:po", True), ("tmp", "bepo", False)))
def test_htpasswd_plain_unicode(self): def test_htpasswd_plain_unicode(self) -> None:
self._test_htpasswd("plain", "😀:🔑", "unicode") self._test_htpasswd("plain", "😀:🔑", "unicode")
def test_htpasswd_md5(self): def test_htpasswd_md5(self) -> None:
self._test_htpasswd("md5", "tmp:$apr1$BI7VKCZh$GKW4vq2hqDINMr8uv7lDY/") self._test_htpasswd("md5", "tmp:$apr1$BI7VKCZh$GKW4vq2hqDINMr8uv7lDY/")
def test_htpasswd_md5_unicode(self): def test_htpasswd_md5_unicode(self):
self._test_htpasswd( self._test_htpasswd(
"md5", "😀:$apr1$w4ev89r1$29xO8EvJmS2HEAadQ5qy11", "unicode") "md5", "😀:$apr1$w4ev89r1$29xO8EvJmS2HEAadQ5qy11", "unicode")
def test_htpasswd_bcrypt(self): def test_htpasswd_bcrypt(self) -> None:
self._test_htpasswd("bcrypt", "tmp:$2y$05$oD7hbiQFQlvCM7zoalo/T.MssV3V" self._test_htpasswd("bcrypt", "tmp:$2y$05$oD7hbiQFQlvCM7zoalo/T.MssV3V"
"NTRI3w5KDnj8NTUKJNWfVpvRq") "NTRI3w5KDnj8NTUKJNWfVpvRq")
def test_htpasswd_bcrypt_unicode(self): def test_htpasswd_bcrypt_unicode(self) -> None:
self._test_htpasswd("bcrypt", "😀:$2y$10$Oyz5aHV4MD9eQJbk6GPemOs4T6edK" self._test_htpasswd("bcrypt", "😀:$2y$10$Oyz5aHV4MD9eQJbk6GPemOs4T6edK"
"6U9Sqlzr.W1mMVCS8wJUftnW", "unicode") "6U9Sqlzr.W1mMVCS8wJUftnW", "unicode")
def test_htpasswd_multi(self): def test_htpasswd_multi(self) -> None:
self._test_htpasswd("plain", "ign:ign\ntmp:bepo") self._test_htpasswd("plain", "ign:ign\ntmp:bepo")
@pytest.mark.skipif(sys.platform == "win32", reason="leading and trailing " @pytest.mark.skipif(sys.platform == "win32", reason="leading and trailing "
"whitespaces not allowed in file names") "whitespaces not allowed in file names")
def test_htpasswd_whitespace_user(self): def test_htpasswd_whitespace_user(self) -> None:
for user in (" tmp", "tmp ", " tmp "): for user in (" tmp", "tmp ", " tmp "):
self._test_htpasswd("plain", "%s:bepo" % user, ( self._test_htpasswd("plain", "%s:bepo" % user, (
(user, "bepo", True), ("tmp", "bepo", False))) (user, "bepo", True), ("tmp", "bepo", False)))
def test_htpasswd_whitespace_password(self): def test_htpasswd_whitespace_password(self) -> None:
for password in (" bepo", "bepo ", " bepo "): for password in (" bepo", "bepo ", " bepo "):
self._test_htpasswd("plain", "tmp:%s" % password, ( self._test_htpasswd("plain", "tmp:%s" % password, (
("tmp", password, True), ("tmp", "bepo", False))) ("tmp", password, True), ("tmp", "bepo", False)))
def test_htpasswd_comment(self): def test_htpasswd_comment(self) -> None:
self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n") self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n")
def test_remote_user(self): def test_remote_user(self) -> None:
self.configuration.update({"auth": {"type": "remote_user"}}, "test") self.configuration.update({"auth": {"type": "remote_user"}}, "test")
self.application = Application(self.configuration) self.application = Application(self.configuration)
_, responses = self.propfind("/", """\ _, responses = self.propfind("/", """\
@ -140,11 +130,15 @@ class TestBaseAuthRequests(BaseTest):
<current-user-principal /> <current-user-principal />
</prop> </prop>
</propfind>""", REMOTE_USER="test") </propfind>""", REMOTE_USER="test")
status, prop = responses["/"]["D:current-user-principal"] assert responses is not None
response = responses["/"]
assert not isinstance(response, int)
status, prop = response["D:current-user-principal"]
assert status == 200 assert status == 200
assert prop.find(xmlutils.make_clark("D:href")).text == "/test/" href_element = prop.find(xmlutils.make_clark("D:href"))
assert href_element is not None and href_element.text == "/test/"
def test_http_x_remote_user(self): def test_http_x_remote_user(self) -> None:
self.configuration.update( self.configuration.update(
{"auth": {"type": "http_x_remote_user"}}, "test") {"auth": {"type": "http_x_remote_user"}}, "test")
self.application = Application(self.configuration) self.application = Application(self.configuration)
@ -155,11 +149,15 @@ class TestBaseAuthRequests(BaseTest):
<current-user-principal /> <current-user-principal />
</prop> </prop>
</propfind>""", HTTP_X_REMOTE_USER="test") </propfind>""", HTTP_X_REMOTE_USER="test")
status, prop = responses["/"]["D:current-user-principal"] assert responses is not None
response = responses["/"]
assert not isinstance(response, int)
status, prop = response["D:current-user-principal"]
assert status == 200 assert status == 200
assert prop.find(xmlutils.make_clark("D:href")).text == "/test/" href_element = prop.find(xmlutils.make_clark("D:href"))
assert href_element is not None and href_element.text == "/test/"
def test_custom(self): def test_custom(self) -> None:
"""Custom authentication.""" """Custom authentication."""
self.configuration.update( self.configuration.update(
{"auth": {"type": "radicale.tests.custom.auth"}}, "test") {"auth": {"type": "radicale.tests.custom.auth"}}, "test")

File diff suppressed because it is too large Load Diff

View File

@ -18,23 +18,26 @@ import os
import shutil import shutil
import tempfile import tempfile
from configparser import RawConfigParser from configparser import RawConfigParser
from typing import List, Tuple
import pytest import pytest
from radicale import config from radicale import config, types
from radicale.tests.helpers import configuration_to_dict from radicale.tests.helpers import configuration_to_dict
class TestConfig: class TestConfig:
"""Test the configuration.""" """Test the configuration."""
def setup(self): colpath: str
def setup(self) -> None:
self.colpath = tempfile.mkdtemp() self.colpath = tempfile.mkdtemp()
def teardown(self): def teardown(self) -> None:
shutil.rmtree(self.colpath) shutil.rmtree(self.colpath)
def _write_config(self, config_dict, name): def _write_config(self, config_dict: types.CONFIG, name: str) -> str:
parser = RawConfigParser() parser = RawConfigParser()
parser.read_dict(config_dict) parser.read_dict(config_dict)
config_path = os.path.join(self.colpath, name) config_path = os.path.join(self.colpath, name)
@ -42,7 +45,7 @@ class TestConfig:
parser.write(f) parser.write(f)
return config_path return config_path
def test_parse_compound_paths(self): def test_parse_compound_paths(self) -> None:
assert len(config.parse_compound_paths()) == 0 assert len(config.parse_compound_paths()) == 0
assert len(config.parse_compound_paths("")) == 0 assert len(config.parse_compound_paths("")) == 0
assert len(config.parse_compound_paths(None, "")) == 0 assert len(config.parse_compound_paths(None, "")) == 0
@ -62,16 +65,16 @@ class TestConfig:
assert os.path.basename(paths[i][0]) == name assert os.path.basename(paths[i][0]) == name
assert paths[i][1] is ignore_if_missing assert paths[i][1] is ignore_if_missing
def test_load_empty(self): def test_load_empty(self) -> None:
config_path = self._write_config({}, "config") config_path = self._write_config({}, "config")
config.load([(config_path, False)]) config.load([(config_path, False)])
def test_load_full(self): def test_load_full(self) -> None:
config_path = self._write_config( config_path = self._write_config(
configuration_to_dict(config.load()), "config") configuration_to_dict(config.load()), "config")
config.load([(config_path, False)]) config.load([(config_path, False)])
def test_load_missing(self): def test_load_missing(self) -> None:
config_path = os.path.join(self.colpath, "does_not_exist") config_path = os.path.join(self.colpath, "does_not_exist")
config.load([(config_path, True)]) config.load([(config_path, True)])
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
@ -79,18 +82,20 @@ class TestConfig:
e = exc_info.value e = exc_info.value
assert "Failed to load config file %r" % config_path in str(e) assert "Failed to load config file %r" % config_path in str(e)
def test_load_multiple(self): def test_load_multiple(self) -> None:
config_path1 = self._write_config({ config_path1 = self._write_config({
"server": {"hosts": "192.0.2.1:1111"}}, "config1") "server": {"hosts": "192.0.2.1:1111"}}, "config1")
config_path2 = self._write_config({ config_path2 = self._write_config({
"server": {"max_connections": 1111}}, "config2") "server": {"max_connections": 1111}}, "config2")
configuration = config.load([(config_path1, False), configuration = config.load([(config_path1, False),
(config_path2, False)]) (config_path2, False)])
assert len(configuration.get("server", "hosts")) == 1 server_hosts: List[Tuple[str, int]] = configuration.get(
assert configuration.get("server", "hosts")[0] == ("192.0.2.1", 1111) "server", "hosts")
assert len(server_hosts) == 1
assert server_hosts[0] == ("192.0.2.1", 1111)
assert configuration.get("server", "max_connections") == 1111 assert configuration.get("server", "max_connections") == 1111
def test_copy(self): def test_copy(self) -> None:
configuration1 = config.load() configuration1 = config.load()
configuration1.update({"server": {"max_connections": "1111"}}, "test") configuration1.update({"server": {"max_connections": "1111"}}, "test")
configuration2 = configuration1.copy() configuration2 = configuration1.copy()
@ -98,14 +103,14 @@ class TestConfig:
assert configuration1.get("server", "max_connections") == 1111 assert configuration1.get("server", "max_connections") == 1111
assert configuration2.get("server", "max_connections") == 1112 assert configuration2.get("server", "max_connections") == 1112
def test_invalid_section(self): def test_invalid_section(self) -> None:
configuration = config.load() configuration = config.load()
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
configuration.update({"does_not_exist": {"x": "x"}}, "test") configuration.update({"does_not_exist": {"x": "x"}}, "test")
e = exc_info.value e = exc_info.value
assert "Invalid section 'does_not_exist'" in str(e) assert "Invalid section 'does_not_exist'" in str(e)
def test_invalid_option(self): def test_invalid_option(self) -> None:
configuration = config.load() configuration = config.load()
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
configuration.update({"server": {"x": "x"}}, "test") configuration.update({"server": {"x": "x"}}, "test")
@ -113,7 +118,7 @@ class TestConfig:
assert "Invalid option 'x'" in str(e) assert "Invalid option 'x'" in str(e)
assert "section 'server'" in str(e) assert "section 'server'" in str(e)
def test_invalid_option_plugin(self): def test_invalid_option_plugin(self) -> None:
configuration = config.load() configuration = config.load()
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
configuration.update({"auth": {"x": "x"}}, "test") configuration.update({"auth": {"x": "x"}}, "test")
@ -121,7 +126,7 @@ class TestConfig:
assert "Invalid option 'x'" in str(e) assert "Invalid option 'x'" in str(e)
assert "section 'auth'" in str(e) assert "section 'auth'" in str(e)
def test_invalid_value(self): def test_invalid_value(self) -> None:
configuration = config.load() configuration = config.load()
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
configuration.update({"server": {"max_connections": "x"}}, "test") configuration.update({"server": {"max_connections": "x"}}, "test")
@ -131,7 +136,7 @@ class TestConfig:
assert "section 'server" in str(e) assert "section 'server" in str(e)
assert "'x'" in str(e) assert "'x'" in str(e)
def test_privileged(self): def test_privileged(self) -> None:
configuration = config.load() configuration = config.load()
configuration.update({"server": {"_internal_server": "True"}}, configuration.update({"server": {"_internal_server": "True"}},
"test", privileged=True) "test", privileged=True)
@ -141,9 +146,9 @@ class TestConfig:
e = exc_info.value e = exc_info.value
assert "Invalid option '_internal_server'" in str(e) assert "Invalid option '_internal_server'" in str(e)
def test_plugin_schema(self): def test_plugin_schema(self) -> None:
plugin_schema = {"auth": {"new_option": {"value": "False", plugin_schema: types.CONFIG_SCHEMA = {
"type": bool}}} "auth": {"new_option": {"value": "False", "type": bool}}}
configuration = config.load() configuration = config.load()
configuration.update({"auth": {"type": "new_plugin"}}, "test") configuration.update({"auth": {"type": "new_plugin"}}, "test")
plugin_configuration = configuration.copy(plugin_schema) plugin_configuration = configuration.copy(plugin_schema)
@ -152,26 +157,26 @@ class TestConfig:
plugin_configuration = configuration.copy(plugin_schema) plugin_configuration = configuration.copy(plugin_schema)
assert plugin_configuration.get("auth", "new_option") is True assert plugin_configuration.get("auth", "new_option") is True
def test_plugin_schema_duplicate_option(self): def test_plugin_schema_duplicate_option(self) -> None:
plugin_schema = {"auth": {"type": {"value": "False", plugin_schema: types.CONFIG_SCHEMA = {
"type": bool}}} "auth": {"type": {"value": "False", "type": bool}}}
configuration = config.load() configuration = config.load()
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
configuration.copy(plugin_schema) configuration.copy(plugin_schema)
e = exc_info.value e = exc_info.value
assert "option already exists in 'auth': 'type'" in str(e) assert "option already exists in 'auth': 'type'" in str(e)
def test_plugin_schema_invalid(self): def test_plugin_schema_invalid(self) -> None:
plugin_schema = {"server": {"new_option": {"value": "False", plugin_schema: types.CONFIG_SCHEMA = {
"type": bool}}} "server": {"new_option": {"value": "False", "type": bool}}}
configuration = config.load() configuration = config.load()
with pytest.raises(Exception) as exc_info: with pytest.raises(Exception) as exc_info:
configuration.copy(plugin_schema) configuration.copy(plugin_schema)
e = exc_info.value e = exc_info.value
assert "not a plugin section: 'server" in str(e) assert "not a plugin section: 'server" in str(e)
def test_plugin_schema_option_invalid(self): def test_plugin_schema_option_invalid(self) -> None:
plugin_schema = {"auth": {}} plugin_schema: types.CONFIG_SCHEMA = {"auth": {}}
configuration = config.load() configuration = config.load()
configuration.update({"auth": {"type": "new_plugin", configuration.update({"auth": {"type": "new_plugin",
"new_option": False}}, "test") "new_option": False}}, "test")

View File

@ -19,10 +19,8 @@ Radicale tests with simple requests and rights.
""" """
import os import os
import shutil
import tempfile
from radicale import Application, config from radicale import Application
from radicale.tests import BaseTest from radicale.tests import BaseTest
from radicale.tests.helpers import get_file_content from radicale.tests.helpers import get_file_content
@ -30,20 +28,8 @@ from radicale.tests.helpers import get_file_content
class TestBaseRightsRequests(BaseTest): class TestBaseRightsRequests(BaseTest):
"""Tests basic requests with rights.""" """Tests basic requests with rights."""
def setup(self): def _test_rights(self, rights_type: str, user: str, path: str, mode: str,
self.configuration = config.load() expected_status: int, with_auth: bool = True) -> None:
self.colpath = tempfile.mkdtemp()
self.configuration.update({
"storage": {"filesystem_folder": self.colpath,
# Disable syncing to disk for better performance
"_filesystem_fsync": "False"}},
"test", privileged=True)
def teardown(self):
shutil.rmtree(self.colpath)
def _test_rights(self, rights_type, user, path, mode, expected_status,
with_auth=True):
assert mode in ("r", "w") assert mode in ("r", "w")
assert user in ("", "tmp") assert user in ("", "tmp")
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
@ -61,7 +47,7 @@ class TestBaseRightsRequests(BaseTest):
(self.propfind if mode == "r" else self.proppatch)( (self.propfind if mode == "r" else self.proppatch)(
path, check=expected_status, login="tmp:bepo" if user else None) path, check=expected_status, login="tmp:bepo" if user else None)
def test_owner_only(self): def test_owner_only(self) -> None:
self._test_rights("owner_only", "", "/", "r", 401) self._test_rights("owner_only", "", "/", "r", 401)
self._test_rights("owner_only", "", "/", "w", 401) self._test_rights("owner_only", "", "/", "w", 401)
self._test_rights("owner_only", "", "/tmp/", "r", 401) self._test_rights("owner_only", "", "/tmp/", "r", 401)
@ -73,13 +59,13 @@ class TestBaseRightsRequests(BaseTest):
self._test_rights("owner_only", "tmp", "/other/", "r", 403) self._test_rights("owner_only", "tmp", "/other/", "r", 403)
self._test_rights("owner_only", "tmp", "/other/", "w", 403) self._test_rights("owner_only", "tmp", "/other/", "w", 403)
def test_owner_only_without_auth(self): def test_owner_only_without_auth(self) -> None:
self._test_rights("owner_only", "", "/", "r", 207, False) self._test_rights("owner_only", "", "/", "r", 207, False)
self._test_rights("owner_only", "", "/", "w", 401, False) self._test_rights("owner_only", "", "/", "w", 401, False)
self._test_rights("owner_only", "", "/tmp/", "r", 207, False) self._test_rights("owner_only", "", "/tmp/", "r", 207, False)
self._test_rights("owner_only", "", "/tmp/", "w", 207, False) self._test_rights("owner_only", "", "/tmp/", "w", 207, False)
def test_owner_write(self): def test_owner_write(self) -> None:
self._test_rights("owner_write", "", "/", "r", 401) self._test_rights("owner_write", "", "/", "r", 401)
self._test_rights("owner_write", "", "/", "w", 401) self._test_rights("owner_write", "", "/", "w", 401)
self._test_rights("owner_write", "", "/tmp/", "r", 401) self._test_rights("owner_write", "", "/tmp/", "r", 401)
@ -91,13 +77,13 @@ class TestBaseRightsRequests(BaseTest):
self._test_rights("owner_write", "tmp", "/other/", "r", 207) self._test_rights("owner_write", "tmp", "/other/", "r", 207)
self._test_rights("owner_write", "tmp", "/other/", "w", 403) self._test_rights("owner_write", "tmp", "/other/", "w", 403)
def test_owner_write_without_auth(self): def test_owner_write_without_auth(self) -> None:
self._test_rights("owner_write", "", "/", "r", 207, False) self._test_rights("owner_write", "", "/", "r", 207, False)
self._test_rights("owner_write", "", "/", "w", 401, False) self._test_rights("owner_write", "", "/", "w", 401, False)
self._test_rights("owner_write", "", "/tmp/", "r", 207, False) self._test_rights("owner_write", "", "/tmp/", "r", 207, False)
self._test_rights("owner_write", "", "/tmp/", "w", 207, False) self._test_rights("owner_write", "", "/tmp/", "w", 207, False)
def test_authenticated(self): def test_authenticated(self) -> None:
self._test_rights("authenticated", "", "/", "r", 401) self._test_rights("authenticated", "", "/", "r", 401)
self._test_rights("authenticated", "", "/", "w", 401) self._test_rights("authenticated", "", "/", "w", 401)
self._test_rights("authenticated", "", "/tmp/", "r", 401) self._test_rights("authenticated", "", "/tmp/", "r", 401)
@ -109,13 +95,13 @@ class TestBaseRightsRequests(BaseTest):
self._test_rights("authenticated", "tmp", "/other/", "r", 207) self._test_rights("authenticated", "tmp", "/other/", "r", 207)
self._test_rights("authenticated", "tmp", "/other/", "w", 207) self._test_rights("authenticated", "tmp", "/other/", "w", 207)
def test_authenticated_without_auth(self): def test_authenticated_without_auth(self) -> None:
self._test_rights("authenticated", "", "/", "r", 207, False) self._test_rights("authenticated", "", "/", "r", 207, False)
self._test_rights("authenticated", "", "/", "w", 207, False) self._test_rights("authenticated", "", "/", "w", 207, False)
self._test_rights("authenticated", "", "/tmp/", "r", 207, False) self._test_rights("authenticated", "", "/tmp/", "r", 207, False)
self._test_rights("authenticated", "", "/tmp/", "w", 207, False) self._test_rights("authenticated", "", "/tmp/", "w", 207, False)
def test_from_file(self): def test_from_file(self) -> None:
rights_file_path = os.path.join(self.colpath, "rights") rights_file_path = os.path.join(self.colpath, "rights")
with open(rights_file_path, "w") as f: with open(rights_file_path, "w") as f:
f.write("""\ f.write("""\
@ -160,13 +146,13 @@ permissions: i""")
self.get("/public/calendar") self.get("/public/calendar")
self.get("/public/calendar/1.ics", check=401) self.get("/public/calendar/1.ics", check=401)
def test_custom(self): def test_custom(self) -> None:
"""Custom rights management.""" """Custom rights management."""
self._test_rights("radicale.tests.custom.rights", "", "/", "r", 401) self._test_rights("radicale.tests.custom.rights", "", "/", "r", 401)
self._test_rights( self._test_rights(
"radicale.tests.custom.rights", "", "/tmp/", "r", 207) "radicale.tests.custom.rights", "", "/tmp/", "r", 207)
def test_collections_and_items(self): def test_collections_and_items(self) -> None:
"""Test rights for creation of collections, calendars and items. """Test rights for creation of collections, calendars and items.
Collections are allowed at "/" and "/.../". Collections are allowed at "/" and "/.../".
@ -183,7 +169,7 @@ permissions: i""")
self.mkcol("/user/calendar/item", check=401) self.mkcol("/user/calendar/item", check=401)
self.mkcalendar("/user/calendar/item", check=401) self.mkcalendar("/user/calendar/item", check=401)
def test_put_collections_and_items(self): def test_put_collections_and_items(self) -> None:
"""Test rights for creation of calendars and items with PUT.""" """Test rights for creation of calendars and items with PUT."""
self.application = Application(self.configuration) self.application = Application(self.configuration)
self.put("/user/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR", check=401) self.put("/user/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR", check=401)

View File

@ -21,15 +21,14 @@ Test the internal server.
import errno import errno
import os import os
import shutil
import socket import socket
import ssl import ssl
import subprocess import subprocess
import sys import sys
import tempfile
import threading import threading
import time import time
from configparser import RawConfigParser from configparser import RawConfigParser
from typing import Callable, Dict, NoReturn, Optional, Tuple, cast
from urllib import request from urllib import request
from urllib.error import HTTPError, URLError from urllib.error import HTTPError, URLError
@ -41,34 +40,43 @@ from radicale.tests.helpers import configuration_to_dict, get_file_path
class DisabledRedirectHandler(request.HTTPRedirectHandler): class DisabledRedirectHandler(request.HTTPRedirectHandler):
def http_error_301(self, req, fp, code, msg, headers):
# HACK: typeshed annotation are wrong for `fp` and `msg`
# (https://github.com/python/typeshed/pull/5728)
# `headers` is incompatible with `http.client.HTTPMessage`
# (https://github.com/python/typeshed/issues/5729)
def http_error_301(self, req: request.Request, fp, code: int,
msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp) raise HTTPError(req.full_url, code, msg, headers, fp)
def http_error_302(self, req, fp, code, msg, headers): def http_error_302(self, req: request.Request, fp, code: int,
msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp) raise HTTPError(req.full_url, code, msg, headers, fp)
def http_error_303(self, req, fp, code, msg, headers): def http_error_303(self, req: request.Request, fp, code: int,
msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp) raise HTTPError(req.full_url, code, msg, headers, fp)
def http_error_307(self, req, fp, code, msg, headers): def http_error_307(self, req: request.Request, fp, code: int,
msg, headers) -> NoReturn:
raise HTTPError(req.full_url, code, msg, headers, fp) raise HTTPError(req.full_url, code, msg, headers, fp)
class TestBaseServerRequests(BaseTest): class TestBaseServerRequests(BaseTest):
"""Test the internal server.""" """Test the internal server."""
def setup(self): shutdown_socket: socket.socket
self.configuration = config.load() thread: threading.Thread
self.colpath = tempfile.mkdtemp() opener: request.OpenerDirector
def setup(self) -> None:
super().setup()
self.shutdown_socket, shutdown_socket_out = socket.socketpair() self.shutdown_socket, shutdown_socket_out = socket.socketpair()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
# Find available port # Find available port
sock.bind(("127.0.0.1", 0)) sock.bind(("127.0.0.1", 0))
self.sockname = sock.getsockname() self.sockname = sock.getsockname()
self.configuration.update({ self.configuration.update({
"storage": {"filesystem_folder": self.colpath,
# Disable syncing to disk for better performance
"_filesystem_fsync": "False"},
"server": {"hosts": "[%s]:%d" % self.sockname}, "server": {"hosts": "[%s]:%d" % self.sockname},
# Enable debugging for new processes # Enable debugging for new processes
"logging": {"level": "debug"}}, "logging": {"level": "debug"}},
@ -82,40 +90,57 @@ class TestBaseServerRequests(BaseTest):
request.HTTPSHandler(context=ssl_context), request.HTTPSHandler(context=ssl_context),
DisabledRedirectHandler) DisabledRedirectHandler)
def teardown(self): def teardown(self) -> None:
self.shutdown_socket.close() self.shutdown_socket.close()
try: try:
self.thread.join() self.thread.join()
except RuntimeError: # Thread never started except RuntimeError: # Thread never started
pass pass
shutil.rmtree(self.colpath) super().teardown()
def request(self, method, path, data=None, is_alive_fn=None, **headers): def request(self, method: str, path: str, data: Optional[str] = None,
**kwargs) -> Tuple[int, Dict[str, str], str]:
"""Send a request.""" """Send a request."""
login = kwargs.pop("login", None)
if login is not None and not isinstance(login, str):
raise TypeError("login argument must be %r, not %r" %
(str, type(login)))
if login:
raise NotImplementedError
is_alive_fn: Optional[Callable[[], bool]] = kwargs.pop(
"is_alive_fn", None)
headers: Dict[str, str] = kwargs
for k, v in headers.items():
if not isinstance(v, str):
raise TypeError("type of %r is %r, expected %r" %
(k, type(v), str))
if is_alive_fn is None: if is_alive_fn is None:
is_alive_fn = self.thread.is_alive is_alive_fn = self.thread.is_alive
scheme = ("https" if self.configuration.get("server", "ssl") else encoding: str = self.configuration.get("encoding", "request")
"http") scheme = "https" if self.configuration.get("server", "ssl") else "http"
data_bytes = None
if data:
data_bytes = data.encode(encoding)
req = request.Request( req = request.Request(
"%s://[%s]:%d%s" % (scheme, *self.sockname, path), "%s://[%s]:%d%s" % (scheme, *self.sockname, path),
data=data, headers=headers, method=method) data=data_bytes, headers=headers, method=method)
while True: while True:
assert is_alive_fn() assert is_alive_fn()
try: try:
with self.opener.open(req) as f: with self.opener.open(req) as f:
return f.getcode(), f.info(), f.read().decode() return f.getcode(), dict(f.info()), f.read().decode()
except HTTPError as e: except HTTPError as e:
return e.code, e.headers, e.read().decode() return e.code, dict(e.headers), e.read().decode()
except URLError as e: except URLError as e:
if not isinstance(e.reason, ConnectionRefusedError): if not isinstance(e.reason, ConnectionRefusedError):
raise raise
time.sleep(0.1) time.sleep(0.1)
def test_root(self): def test_root(self) -> None:
self.thread.start() self.thread.start()
self.get("/", check=302) self.get("/", check=302)
def test_ssl(self): def test_ssl(self) -> None:
self.configuration.update({ self.configuration.update({
"server": {"ssl": "True", "server": {"ssl": "True",
"certificate": get_file_path("cert.pem"), "certificate": get_file_path("cert.pem"),
@ -123,7 +148,7 @@ class TestBaseServerRequests(BaseTest):
self.thread.start() self.thread.start()
self.get("/", check=302) self.get("/", check=302)
def test_bind_fail(self): def test_bind_fail(self) -> None:
for address_family, address in [(socket.AF_INET, "::1"), for address_family, address in [(socket.AF_INET, "::1"),
(socket.AF_INET6, "127.0.0.1")]: (socket.AF_INET6, "127.0.0.1")]:
with socket.socket(address_family, socket.SOCK_STREAM) as sock: with socket.socket(address_family, socket.SOCK_STREAM) as sock:
@ -143,7 +168,7 @@ class TestBaseServerRequests(BaseTest):
errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT, errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT,
errno.EPROTONOSUPPORT)) errno.EPROTONOSUPPORT))
def test_ipv6(self): def test_ipv6(self) -> None:
try: try:
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
# Only allow IPv6 connections to the IPv6 socket # Only allow IPv6 connections to the IPv6 socket
@ -162,7 +187,7 @@ class TestBaseServerRequests(BaseTest):
self.thread.start() self.thread.start()
self.get("/", check=302) self.get("/", check=302)
def test_command_line_interface(self): def test_command_line_interface(self) -> None:
config_args = [] config_args = []
for section, values in config.DEFAULT_CONFIG_SCHEMA.items(): for section, values in config.DEFAULT_CONFIG_SCHEMA.items():
if section.startswith("_"): if section.startswith("_"):
@ -172,13 +197,14 @@ class TestBaseServerRequests(BaseTest):
continue continue
long_name = "--%s-%s" % (section, option.replace("_", "-")) long_name = "--%s-%s" % (section, option.replace("_", "-"))
if data["type"] == bool: if data["type"] == bool:
if not self.configuration.get(section, option): if not cast(bool, self.configuration.get(section, option)):
long_name = "--no%s" % long_name[1:] long_name = "--no%s" % long_name[1:]
config_args.append(long_name) config_args.append(long_name)
else: else:
config_args.append(long_name) config_args.append(long_name)
config_args.append( raw_value = self.configuration.get_raw(section, option)
self.configuration.get_raw(section, option)) assert isinstance(raw_value, str)
config_args.append(raw_value)
p = subprocess.Popen( p = subprocess.Popen(
[sys.executable, "-m", "radicale"] + config_args, [sys.executable, "-m", "radicale"] + config_args,
env={**os.environ, "PYTHONPATH": os.pathsep.join(sys.path)}) env={**os.environ, "PYTHONPATH": os.pathsep.join(sys.path)})
@ -190,7 +216,7 @@ class TestBaseServerRequests(BaseTest):
if os.name == "posix": if os.name == "posix":
assert p.returncode == 0 assert p.returncode == 0
def test_wsgi_server(self): def test_wsgi_server(self) -> None:
config_path = os.path.join(self.colpath, "config") config_path = os.path.join(self.colpath, "config")
parser = RawConfigParser() parser = RawConfigParser()
parser.read_dict(configuration_to_dict(self.configuration)) parser.read_dict(configuration_to_dict(self.configuration))
@ -199,9 +225,10 @@ class TestBaseServerRequests(BaseTest):
env = os.environ.copy() env = os.environ.copy()
env["PYTHONPATH"] = os.pathsep.join(sys.path) env["PYTHONPATH"] = os.pathsep.join(sys.path)
env["RADICALE_CONFIG"] = config_path env["RADICALE_CONFIG"] = config_path
raw_server_hosts = self.configuration.get_raw("server", "hosts")
assert isinstance(raw_server_hosts, str)
p = subprocess.Popen([ p = subprocess.Popen([
sys.executable, "-m", "waitress", sys.executable, "-m", "waitress", "--listen", raw_server_hosts,
"--listen", self.configuration.get_raw("server", "hosts"),
"radicale:application"], env=env) "radicale:application"], env=env)
try: try:
self.get("/", is_alive_fn=lambda: p.poll() is None, check=302) self.get("/", is_alive_fn=lambda: p.poll() is None, check=302)

View File

@ -19,30 +19,14 @@ Test web plugin.
""" """
import shutil from radicale import Application
import tempfile
from radicale import Application, config
from radicale.tests import BaseTest from radicale.tests import BaseTest
class TestBaseWebRequests(BaseTest): class TestBaseWebRequests(BaseTest):
"""Test web plugin.""" """Test web plugin."""
def setup(self): def test_internal(self) -> None:
self.configuration = config.load()
self.colpath = tempfile.mkdtemp()
self.configuration.update({
"storage": {"filesystem_folder": self.colpath,
# Disable syncing to disk for better performance
"_filesystem_fsync": "False"}},
"test", privileged=True)
self.application = Application(self.configuration)
def teardown(self):
shutil.rmtree(self.colpath)
def test_internal(self):
status, headers, _ = self.request("GET", "/.web") status, headers, _ = self.request("GET", "/.web")
assert status == 302 assert status == 302
assert headers.get("Location") == ".web/" assert headers.get("Location") == ".web/"
@ -50,7 +34,7 @@ class TestBaseWebRequests(BaseTest):
assert answer assert answer
self.post("/.web", check=405) self.post("/.web", check=405)
def test_none(self): def test_none(self) -> None:
self.configuration.update({"web": {"type": "none"}}, "test") self.configuration.update({"web": {"type": "none"}}, "test")
self.application = Application(self.configuration) self.application = Application(self.configuration)
_, answer = self.get("/.web") _, answer = self.get("/.web")
@ -58,7 +42,7 @@ class TestBaseWebRequests(BaseTest):
self.get("/.web/", check=404) self.get("/.web/", check=404)
self.post("/.web", check=405) self.post("/.web", check=405)
def test_custom(self): def test_custom(self) -> None:
"""Custom web plugin.""" """Custom web plugin."""
self.configuration.update({ self.configuration.update({
"web": {"type": "radicale.tests.custom.web"}}, "test") "web": {"type": "radicale.tests.custom.web"}}, "test")