From 60f25bf19a566181a54075f8ae0942a85c7f0d66 Mon Sep 17 00:00:00 2001 From: Unrud Date: Mon, 26 Jul 2021 20:56:47 +0200 Subject: [PATCH] Type hints for tests --- radicale/tests/__init__.py | 152 ++++--- radicale/tests/custom/auth.py | 3 +- radicale/tests/custom/rights.py | 3 +- radicale/tests/custom/storage_simple_sync.py | 2 + radicale/tests/custom/web.py | 9 +- radicale/tests/helpers.py | 14 +- radicale/tests/test_auth.py | 68 ++- radicale/tests/test_base.py | 415 +++++++++++-------- radicale/tests/test_config.py | 61 +-- radicale/tests/test_rights.py | 40 +- radicale/tests/test_server.py | 89 ++-- radicale/tests/test_web.py | 24 +- 12 files changed, 501 insertions(+), 379 deletions(-) diff --git a/radicale/tests/__init__.py b/radicale/tests/__init__.py index f8900b1..00e2b15 100644 --- a/radicale/tests/__init__.py +++ b/radicale/tests/__init__.py @@ -22,13 +22,19 @@ Tests for Radicale. import base64 import logging +import shutil import sys +import tempfile +import xml.etree.ElementTree as ET from io import BytesIO +from typing import Any, Dict, List, Optional, Tuple, Union import defusedxml.ElementTree as DefusedET import radicale -from radicale import xmlutils +from radicale import app, config, xmlutils + +RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]] # Enable debug output radicale.log.logger.setLevel(logging.DEBUG) @@ -37,40 +43,70 @@ radicale.log.logger.setLevel(logging.DEBUG) class BaseTest: """Base class for tests.""" - def request(self, method, path, data=None, login=None, **args): + colpath: str + configuration: config.Configuration + application: app.Application + + def setup(self) -> None: + self.configuration = config.load() + self.colpath = tempfile.mkdtemp() + self.configuration.update({ + "storage": {"filesystem_folder": self.colpath, + # Disable syncing to disk for better performance + "_filesystem_fsync": "False"}, + # Set incorrect authentication delay to a short duration + "auth": {"delay": "0.001"}}, "test", privileged=True) + self.application = app.Application(self.configuration) + + def teardown(self) -> None: + shutil.rmtree(self.colpath) + + def request(self, method: str, path: str, data: Optional[str] = None, + **kwargs) -> Tuple[int, Dict[str, str], str]: """Send a request.""" - for key in args: - args[key.upper()] = args[key] + login = kwargs.pop("login", None) + if login is not None and not isinstance(login, str): + raise TypeError("login argument must be %r, not %r" % + (str, type(login))) + environ: Dict[str, Any] = {k.upper(): v for k, v in kwargs.items()} + for k, v in environ.items(): + if not isinstance(v, str): + raise TypeError("type of %r is %r, expected %r" % + (k, type(v), str)) + encoding: str = self.configuration.get("encoding", "request") if login: - args["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode( - login.encode()).decode() - args["REQUEST_METHOD"] = method.upper() - args["PATH_INFO"] = path + environ["HTTP_AUTHORIZATION"] = "Basic " + base64.b64encode( + login.encode(encoding)).decode() + environ["REQUEST_METHOD"] = method.upper() + environ["PATH_INFO"] = path if data: - data = data.encode() - args["wsgi.input"] = BytesIO(data) - args["CONTENT_LENGTH"] = str(len(data)) - args["wsgi.errors"] = sys.stderr + data_bytes = data.encode(encoding) + environ["wsgi.input"] = BytesIO(data_bytes) + environ["CONTENT_LENGTH"] = str(len(data_bytes)) + environ["wsgi.errors"] = sys.stderr status = headers = None - def start_response(status_, headers_): + def start_response(status_: str, headers_: List[Tuple[str, str]] + ) -> None: nonlocal status, headers status = status_ headers = headers_ - answer = self.application(args, start_response) + answers = list(self.application(environ, start_response)) + assert status is not None and headers is not None return (int(status.split()[0]), dict(headers), - answer[0].decode() if answer else None) + answers[0].decode() if answers else "") @staticmethod - def parse_responses(text): + def parse_responses(text: str) -> RESPONSES: xml = DefusedET.fromstring(text) assert xml.tag == xmlutils.make_clark("D:multistatus") - path_responses = {} + path_responses: Dict[str, Union[ + int, Dict[str, Tuple[int, ET.Element]]]] = {} for response in xml.findall(xmlutils.make_clark("D:response")): href = response.find(xmlutils.make_clark("D:href")) assert href.text not in path_responses - prop_respones = {} + prop_respones: Dict[str, Tuple[int, ET.Element]] = {} for propstat in response.findall( xmlutils.make_clark("D:propstat")): status = propstat.find(xmlutils.make_clark("D:status")) @@ -92,70 +128,90 @@ class BaseTest: return path_responses @staticmethod - def _check_status(status, good_status, check=True): - if check is True: - assert status == good_status - elif check is not False: - assert status == check + def _check_status(status: int, good_status: int, + check: Union[bool, int] = True) -> bool: + if check is not False: + expected = good_status if check is True else check + assert status == expected, "%d != %d" % (status, expected) return status == good_status - def get(self, path, check=True, **args): - status, _, answer = self.request("GET", path, **args) + def get(self, path: str, check: Union[bool, int] = True, **kwargs + ) -> Tuple[int, str]: + assert "data" not in kwargs + status, _, answer = self.request("GET", path, **kwargs) self._check_status(status, 200, check) return status, answer - def post(self, path, data=None, check=True, **args): - status, _, answer = self.request("POST", path, data, **args) + def post(self, path: str, data: str = None, check: Union[bool, int] = True, + **kwargs) -> Tuple[int, str]: + status, _, answer = self.request("POST", path, data, **kwargs) self._check_status(status, 200, check) return status, answer - def put(self, path, data, check=True, **args): - status, _, answer = self.request("PUT", path, data, **args) + def put(self, path: str, data: str, check: Union[bool, int] = True, + **kwargs) -> Tuple[int, str]: + status, _, answer = self.request("PUT", path, data, **kwargs) self._check_status(status, 201, check) return status, answer - def propfind(self, path, data=None, check=True, **args): - status, _, answer = self.request("PROPFIND", path, data, **args) + def propfind(self, path: str, data: Optional[str] = None, + check: Union[bool, int] = True, **kwargs + ) -> Tuple[int, RESPONSES]: + status, _, answer = self.request("PROPFIND", path, data, **kwargs) if not self._check_status(status, 207, check): - return status, None + return status, {} + assert answer is not None responses = self.parse_responses(answer) - if args.get("HTTP_DEPTH", "0") == "0": + if kwargs.get("HTTP_DEPTH", "0") == "0": assert len(responses) == 1 and path in responses return status, responses - def proppatch(self, path, data=None, check=True, **args): - status, _, answer = self.request("PROPPATCH", path, data, **args) + def proppatch(self, path: str, data: Optional[str] = None, + check: Union[bool, int] = True, **kwargs + ) -> Tuple[int, RESPONSES]: + status, _, answer = self.request("PROPPATCH", path, data, **kwargs) if not self._check_status(status, 207, check): - return status, None + return status, {} + assert answer is not None responses = self.parse_responses(answer) assert len(responses) == 1 and path in responses return status, responses - def report(self, path, data, check=True, **args): - status, _, answer = self.request("REPORT", path, data, **args) + def report(self, path: str, data: str, check: Union[bool, int] = True, + **kwargs) -> Tuple[int, RESPONSES]: + status, _, answer = self.request("REPORT", path, data, **kwargs) if not self._check_status(status, 207, check): - return status, None + return status, {} + assert answer is not None return status, self.parse_responses(answer) - def delete(self, path, check=True, **args): - status, _, answer = self.request("DELETE", path, **args) + def delete(self, path: str, check: Union[bool, int] = True, **kwargs + ) -> Tuple[int, RESPONSES]: + assert "data" not in kwargs + status, _, answer = self.request("DELETE", path, **kwargs) if not self._check_status(status, 200, check): - return status, None + return status, {} + assert answer is not None responses = self.parse_responses(answer) assert len(responses) == 1 and path in responses return status, responses - def mkcalendar(self, path, data=None, check=True, **args): - status, _, answer = self.request("MKCALENDAR", path, data, **args) + def mkcalendar(self, path: str, data: Optional[str] = None, + check: Union[bool, int] = True, **kwargs + ) -> Tuple[int, str]: + status, _, answer = self.request("MKCALENDAR", path, data, **kwargs) self._check_status(status, 201, check) return status, answer - def mkcol(self, path, data=None, check=True, **args): - status, _, _ = self.request("MKCOL", path, data, **args) + def mkcol(self, path: str, data: Optional[str] = None, + check: Union[bool, int] = True, **kwargs) -> int: + status, _, _ = self.request("MKCOL", path, data, **kwargs) self._check_status(status, 201, check) return status - def create_addressbook(self, path, check=True, **args): + def create_addressbook(self, path: str, check: Union[bool, int] = True, + **kwargs) -> int: + assert "data" not in kwargs return self.mkcol(path, """\ @@ -167,4 +223,4 @@ class BaseTest: -""", check=check, **args) +""", check=check, **kwargs) diff --git a/radicale/tests/custom/auth.py b/radicale/tests/custom/auth.py index b0a1172..8e17adc 100644 --- a/radicale/tests/custom/auth.py +++ b/radicale/tests/custom/auth.py @@ -28,7 +28,8 @@ from radicale import auth class Auth(auth.BaseAuth): - def login(self, login, password): + + def login(self, login: str, password: str) -> str: if login == "tmp": return login return "" diff --git a/radicale/tests/custom/rights.py b/radicale/tests/custom/rights.py index 5696ad5..8f145dd 100644 --- a/radicale/tests/custom/rights.py +++ b/radicale/tests/custom/rights.py @@ -23,7 +23,8 @@ from radicale import pathutils, rights class Rights(rights.BaseRights): - def authorization(self, user, path): + + def authorization(self, user: str, path: str) -> str: sane_path = pathutils.strip_path(path) if sane_path not in ("tmp", "other"): return "" diff --git a/radicale/tests/custom/storage_simple_sync.py b/radicale/tests/custom/storage_simple_sync.py index 3046a82..7e023f0 100644 --- a/radicale/tests/custom/storage_simple_sync.py +++ b/radicale/tests/custom/storage_simple_sync.py @@ -27,8 +27,10 @@ from radicale.storage import BaseCollection, multifilesystem class Collection(multifilesystem.Collection): + sync = BaseCollection.sync class Storage(multifilesystem.Storage): + _collection_class = Collection diff --git a/radicale/tests/custom/web.py b/radicale/tests/custom/web.py index 784614f..2626e05 100644 --- a/radicale/tests/custom/web.py +++ b/radicale/tests/custom/web.py @@ -21,13 +21,16 @@ Custom web plugin. from http import client -from radicale import httputils, web +from radicale import httputils, types, web class Web(web.BaseWeb): - def get(self, environ, base_prefix, path, user): + + def get(self, environ: types.WSGIEnviron, base_prefix: str, path: str, + user: str) -> types.WSGIResponse: return client.OK, {"Content-Type": "text/plain"}, "custom" - def post(self, environ, base_prefix, path, user): + def post(self, environ: types.WSGIEnviron, base_prefix: str, path: str, + user: str) -> types.WSGIResponse: content = httputils.read_request_body(self.configuration, environ) return client.OK, {"Content-Type": "text/plain"}, "echo:" + content diff --git a/radicale/tests/helpers.py b/radicale/tests/helpers.py index face22b..3d2d13b 100644 --- a/radicale/tests/helpers.py +++ b/radicale/tests/helpers.py @@ -26,19 +26,21 @@ This module offers helpers to use in tests. import os -EXAMPLES_FOLDER = os.path.join(os.path.dirname(__file__), "static") +from radicale import config, types + +EXAMPLES_FOLDER: str = os.path.join(os.path.dirname(__file__), "static") -def get_file_path(file_name): +def get_file_path(file_name: str) -> str: return os.path.join(EXAMPLES_FOLDER, file_name) -def get_file_content(file_name): - with open(get_file_path(file_name), encoding="utf-8") as fd: - return fd.read() +def get_file_content(file_name: str) -> str: + with open(get_file_path(file_name), encoding="utf-8") as f: + return f.read() -def configuration_to_dict(configuration): +def configuration_to_dict(configuration: config.Configuration) -> types.CONFIG: """Convert configuration to a dict with raw values.""" return {section: {option: configuration.get_raw(section, option) for option in configuration.options(section) diff --git a/radicale/tests/test_auth.py b/radicale/tests/test_auth.py index 5379be7..d2f52da 100644 --- a/radicale/tests/test_auth.py +++ b/radicale/tests/test_auth.py @@ -22,13 +22,12 @@ Radicale tests with simple requests and authentication. """ import os -import shutil import sys -import tempfile +from typing import Iterable, Tuple, Union import pytest -from radicale import Application, config, xmlutils +from radicale import Application, xmlutils from radicale.tests import BaseTest @@ -38,21 +37,10 @@ class TestBaseAuthRequests(BaseTest): We should setup auth for each type before creating the Application object. """ - def setup(self): - self.configuration = config.load() - self.colpath = tempfile.mkdtemp() - self.configuration.update({ - "storage": {"filesystem_folder": self.colpath, - # Disable syncing to disk for better performance - "_filesystem_fsync": "False"}, - # Set incorrect authentication delay to a very low value - "auth": {"delay": "0.002"}}, "test", privileged=True) - def teardown(self): - shutil.rmtree(self.colpath) - - def _test_htpasswd(self, htpasswd_encryption, htpasswd_content, - test_matrix="ascii"): + def _test_htpasswd(self, htpasswd_encryption: str, htpasswd_content: str, + test_matrix: Union[str, Iterable[Tuple[str, str, bool]]] + = "ascii") -> None: """Test htpasswd authentication with user "tmp" and password "bepo" for ``test_matrix`` "ascii" or user "😀" and password "🔑" for ``test_matrix`` "unicode".""" @@ -67,7 +55,7 @@ class TestBaseAuthRequests(BaseTest): except MissingBackendError: pytest.skip("bcrypt backend for passlib is not installed") htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") - encoding = self.configuration.get("encoding", "stock") + encoding: str = self.configuration.get("encoding", "stock") with open(htpasswd_file_path, "w", encoding=encoding) as f: f.write(htpasswd_content) self.configuration.update({ @@ -83,54 +71,56 @@ class TestBaseAuthRequests(BaseTest): test_matrix = (("😀", "🔑", True), ("😀", "🌹", False), ("😁", "🔑", False), ("😀", "", False), ("", "🔑", False), ("", "", False)) + elif isinstance(test_matrix, str): + raise ValueError("Unknown test matrix %r" % test_matrix) for user, password, valid in test_matrix: self.propfind("/", check=207 if valid else 401, login="%s:%s" % (user, password)) - def test_htpasswd_plain(self): + def test_htpasswd_plain(self) -> None: self._test_htpasswd("plain", "tmp:bepo") - def test_htpasswd_plain_password_split(self): + def test_htpasswd_plain_password_split(self) -> None: self._test_htpasswd("plain", "tmp:be:po", ( ("tmp", "be:po", True), ("tmp", "bepo", False))) - def test_htpasswd_plain_unicode(self): + def test_htpasswd_plain_unicode(self) -> None: self._test_htpasswd("plain", "😀:🔑", "unicode") - def test_htpasswd_md5(self): + def test_htpasswd_md5(self) -> None: self._test_htpasswd("md5", "tmp:$apr1$BI7VKCZh$GKW4vq2hqDINMr8uv7lDY/") def test_htpasswd_md5_unicode(self): self._test_htpasswd( "md5", "😀:$apr1$w4ev89r1$29xO8EvJmS2HEAadQ5qy11", "unicode") - def test_htpasswd_bcrypt(self): + def test_htpasswd_bcrypt(self) -> None: self._test_htpasswd("bcrypt", "tmp:$2y$05$oD7hbiQFQlvCM7zoalo/T.MssV3V" "NTRI3w5KDnj8NTUKJNWfVpvRq") - def test_htpasswd_bcrypt_unicode(self): + def test_htpasswd_bcrypt_unicode(self) -> None: self._test_htpasswd("bcrypt", "😀:$2y$10$Oyz5aHV4MD9eQJbk6GPemOs4T6edK" "6U9Sqlzr.W1mMVCS8wJUftnW", "unicode") - def test_htpasswd_multi(self): + def test_htpasswd_multi(self) -> None: self._test_htpasswd("plain", "ign:ign\ntmp:bepo") @pytest.mark.skipif(sys.platform == "win32", reason="leading and trailing " "whitespaces not allowed in file names") - def test_htpasswd_whitespace_user(self): + def test_htpasswd_whitespace_user(self) -> None: for user in (" tmp", "tmp ", " tmp "): self._test_htpasswd("plain", "%s:bepo" % user, ( (user, "bepo", True), ("tmp", "bepo", False))) - def test_htpasswd_whitespace_password(self): + def test_htpasswd_whitespace_password(self) -> None: for password in (" bepo", "bepo ", " bepo "): self._test_htpasswd("plain", "tmp:%s" % password, ( ("tmp", password, True), ("tmp", "bepo", False))) - def test_htpasswd_comment(self): + def test_htpasswd_comment(self) -> None: self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n") - def test_remote_user(self): + def test_remote_user(self) -> None: self.configuration.update({"auth": {"type": "remote_user"}}, "test") self.application = Application(self.configuration) _, responses = self.propfind("/", """\ @@ -140,11 +130,15 @@ class TestBaseAuthRequests(BaseTest): """, REMOTE_USER="test") - status, prop = responses["/"]["D:current-user-principal"] + assert responses is not None + response = responses["/"] + assert not isinstance(response, int) + status, prop = response["D:current-user-principal"] assert status == 200 - assert prop.find(xmlutils.make_clark("D:href")).text == "/test/" + href_element = prop.find(xmlutils.make_clark("D:href")) + assert href_element is not None and href_element.text == "/test/" - def test_http_x_remote_user(self): + def test_http_x_remote_user(self) -> None: self.configuration.update( {"auth": {"type": "http_x_remote_user"}}, "test") self.application = Application(self.configuration) @@ -155,11 +149,15 @@ class TestBaseAuthRequests(BaseTest): """, HTTP_X_REMOTE_USER="test") - status, prop = responses["/"]["D:current-user-principal"] + assert responses is not None + response = responses["/"] + assert not isinstance(response, int) + status, prop = response["D:current-user-principal"] assert status == 200 - assert prop.find(xmlutils.make_clark("D:href")).text == "/test/" + href_element = prop.find(xmlutils.make_clark("D:href")) + assert href_element is not None and href_element.text == "/test/" - def test_custom(self): + def test_custom(self) -> None: """Custom authentication.""" self.configuration.update( {"auth": {"type": "radicale.tests.custom.auth"}}, "test") diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py index 5952615..4e8cd11 100644 --- a/radicale/tests/test_base.py +++ b/radicale/tests/test_base.py @@ -24,37 +24,39 @@ import os import posixpath import shutil import sys -import tempfile -from typing import Any, ClassVar +from typing import (Any, Callable, ClassVar, Iterable, List, Optional, Tuple, + Union) import defusedxml.ElementTree as DefusedET import pytest import radicale.tests.custom.storage_simple_sync from radicale import Application, config, storage, xmlutils -from radicale.tests import BaseTest +from radicale.tests import RESPONSES, BaseTest from radicale.tests.helpers import get_file_content +StorageType = Union[str, Callable[[config.Configuration], storage.BaseStorage]] -class BaseRequestsMixIn: + +class BaseRequestsMixIn(BaseTest): """Tests with simple requests.""" # Allow skipping sync-token tests, when not fully supported by the backend - full_sync_token_support = True + full_sync_token_support: ClassVar[bool] = True - def test_root(self): + def test_root(self) -> None: """GET request at "/".""" _, answer = self.get("/", check=302) assert answer == "Redirected to .web" - def test_script_name(self): + def test_script_name(self) -> None: """GET request at "/" with SCRIPT_NAME.""" _, answer = self.get("/", check=302, SCRIPT_NAME="/radicale") assert answer == "Redirected to .web" _, answer = self.get("", check=302, SCRIPT_NAME="/radicale") assert answer == "Redirected to radicale/.web" - def test_add_event(self): + def test_add_event(self) -> None: """Add an event.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -68,7 +70,7 @@ class BaseRequestsMixIn: assert "Event" in answer assert "UID:event" in answer - def test_add_event_without_uid(self): + def test_add_event_without_uid(self) -> None: """Add an event without UID.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics").replace("UID:event1\n", "") @@ -76,7 +78,7 @@ class BaseRequestsMixIn: path = "/calendar.ics/event.ics" self.put(path, event, check=400) - def test_add_event_duplicate_uid(self): + def test_add_event_duplicate_uid(self) -> None: """Add an event with an existing UID.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -88,7 +90,7 @@ class BaseRequestsMixIn: assert xml.tag == xmlutils.make_clark("D:error") assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None - def test_add_todo(self): + def test_add_todo(self) -> None: """Add a todo.""" self.mkcalendar("/calendar.ics/") todo = get_file_content("todo1.ics") @@ -102,7 +104,7 @@ class BaseRequestsMixIn: assert "Todo" in answer assert "UID:todo" in answer - def test_add_contact(self): + def test_add_contact(self) -> None: """Add a contact.""" self.create_addressbook("/contacts.vcf/") contact = get_file_content("contact1.vcf") @@ -117,7 +119,7 @@ class BaseRequestsMixIn: _, answer = self.get(path) assert "UID:contact1" in answer - def test_add_contact_without_uid(self): + def test_add_contact_without_uid(self) -> None: """Add a contact without UID.""" self.create_addressbook("/contacts.vcf/") contact = get_file_content("contact1.vcf").replace("UID:contact1\n", @@ -126,7 +128,7 @@ class BaseRequestsMixIn: path = "/contacts.vcf/contact.vcf" self.put(path, contact, check=400) - def test_update_event(self): + def test_update_event(self) -> None: """Update an event.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -139,7 +141,7 @@ class BaseRequestsMixIn: _, answer = self.get(path) assert "DTSTAMP:20130902T150159Z" in answer - def test_update_event_uid_event(self): + def test_update_event_uid_event(self) -> None: """Update an event with a different UID.""" self.mkcalendar("/calendar.ics/") event1 = get_file_content("event1.ics") @@ -152,7 +154,7 @@ class BaseRequestsMixIn: assert xml.tag == xmlutils.make_clark("D:error") assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None - def test_put_whole_calendar(self): + def test_put_whole_calendar(self) -> None: """Create and overwrite a whole calendar.""" self.put("/calendar.ics/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR") event1 = get_file_content("event1.ics") @@ -165,7 +167,7 @@ class BaseRequestsMixIn: assert "\r\nUID:event\r\n" in answer and "\r\nUID:todo\r\n" in answer assert "\r\nUID:event1\r\n" not in answer - def test_put_whole_calendar_without_uids(self): + def test_put_whole_calendar_without_uids(self) -> None: """Create a whole calendar without UID.""" event = get_file_content("event_multiple.ics") event = event.replace("UID:event\n", "").replace("UID:todo\n", "") @@ -182,15 +184,16 @@ class BaseRequestsMixIn: for uid2 in uids[i + 1:]: assert uid1 != uid2 - def test_put_whole_addressbook(self): + def test_put_whole_addressbook(self) -> None: """Create and overwrite a whole addressbook.""" contacts = get_file_content("contact_multiple.vcf") self.put("/contacts.vcf/", contacts) _, answer = self.get("/contacts.vcf/") - assert ("\r\nUID:contact1\r\n" in answer and - "\r\nUID:contact2\r\n" in answer) + assert answer is not None + assert "\r\nUID:contact1\r\n" in answer + assert "\r\nUID:contact2\r\n" in answer - def test_put_whole_addressbook_without_uids(self): + def test_put_whole_addressbook_without_uids(self) -> None: """Create a whole addressbook without UID.""" contacts = get_file_content("contact_multiple.vcf") contacts = contacts.replace("UID:contact1\n", "").replace( @@ -208,7 +211,7 @@ class BaseRequestsMixIn: for uid2 in uids[i + 1:]: assert uid1 != uid2 - def test_verify(self): + def test_verify(self) -> None: """Verify the storage.""" contacts = get_file_content("contact_multiple.vcf") self.put("/contacts.vcf/", contacts) @@ -217,7 +220,7 @@ class BaseRequestsMixIn: s = storage.load(self.configuration) assert s.verify() - def test_delete(self): + def test_delete(self) -> None: """Delete an event.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -228,14 +231,14 @@ class BaseRequestsMixIn: _, answer = self.get("/calendar.ics/") assert "VEVENT" not in answer - def test_mkcalendar(self): + def test_mkcalendar(self) -> None: """Make a calendar.""" self.mkcalendar("/calendar.ics/") _, answer = self.get("/calendar.ics/") assert "BEGIN:VCALENDAR" in answer assert "END:VCALENDAR" in answer - def test_mkcalendar_overwrite(self): + def test_mkcalendar_overwrite(self) -> None: """Try to overwrite an existing calendar.""" self.mkcalendar("/calendar.ics/") status, answer = self.mkcalendar("/calendar.ics/", check=False) @@ -245,41 +248,43 @@ class BaseRequestsMixIn: assert xml.find(xmlutils.make_clark( "D:resource-must-be-null")) is not None - def test_mkcalendar_intermediate(self): + def test_mkcalendar_intermediate(self) -> None: """Try make a calendar in a unmapped collection.""" status, _ = self.mkcalendar("/unmapped/calendar.ics/", check=False) assert status == 409 - def test_mkcol(self): + def test_mkcol(self) -> None: """Make a collection.""" self.mkcol("/user/") - def test_mkcol_overwrite(self): + def test_mkcol_overwrite(self) -> None: """Try to overwrite an existing collection.""" self.mkcol("/user/") status = self.mkcol("/user/", check=False) assert status == 405 - def test_mkcol_intermediate(self): + def test_mkcol_intermediate(self) -> None: """Try make a collection in a unmapped collection.""" status = self.mkcol("/unmapped/user/", check=False) assert status == 409 - def test_mkcol_make_calendar(self): + def test_mkcol_make_calendar(self) -> None: """Make a calendar with additional props.""" mkcol_make_calendar = get_file_content("mkcol_make_calendar.xml") self.mkcol("/calendar.ics/", mkcol_make_calendar) _, answer = self.get("/calendar.ics/") + assert answer is not None assert "BEGIN:VCALENDAR" in answer assert "END:VCALENDAR" in answer # Read additional properties propfind = get_file_content("propfind_calendar_color.xml") _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 1 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["ICAL:calendar-color"] assert status == 200 and prop.text == "#BADA55" - def test_move(self): + def test_move(self) -> None: """Move a item.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -292,7 +297,7 @@ class BaseRequestsMixIn: self.get(path1, check=404) self.get(path2) - def test_move_between_colections(self): + def test_move_between_colections(self) -> None: """Move a item.""" self.mkcalendar("/calendar1.ics/") self.mkcalendar("/calendar2.ics/") @@ -306,7 +311,7 @@ class BaseRequestsMixIn: self.get(path1, check=404) self.get(path2) - def test_move_between_colections_duplicate_uid(self): + def test_move_between_colections_duplicate_uid(self) -> None: """Move a item to a collection which already contains the UID.""" self.mkcalendar("/calendar1.ics/") self.mkcalendar("/calendar2.ics/") @@ -322,7 +327,7 @@ class BaseRequestsMixIn: assert xml.tag == xmlutils.make_clark("D:error") assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None - def test_move_between_colections_overwrite(self): + def test_move_between_colections_overwrite(self) -> None: """Move a item to a collection which already contains the item.""" self.mkcalendar("/calendar1.ics/") self.mkcalendar("/calendar2.ics/") @@ -338,7 +343,7 @@ class BaseRequestsMixIn: HTTP_HOST="", HTTP_OVERWRITE="T") assert status == 204 - def test_move_between_colections_overwrite_uid_conflict(self): + def test_move_between_colections_overwrite_uid_conflict(self) -> None: """Move a item to a collection which already contains the item with a different UID.""" self.mkcalendar("/calendar1.ics/") @@ -356,16 +361,16 @@ class BaseRequestsMixIn: assert xml.tag == xmlutils.make_clark("D:error") assert xml.find(xmlutils.make_clark("C:no-uid-conflict")) is not None - def test_head(self): + def test_head(self) -> None: status, _, _ = self.request("HEAD", "/") assert status == 302 - def test_options(self): + def test_options(self) -> None: status, headers, _ = self.request("OPTIONS", "/") assert status == 200 assert "DAV" in headers - def test_delete_collection(self): + def test_delete_collection(self) -> None: """Delete a collection.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -374,7 +379,7 @@ class BaseRequestsMixIn: assert responses["/calendar.ics/"] == 200 self.get("/calendar.ics/", check=404) - def test_delete_root_collection(self): + def test_delete_root_collection(self) -> None: """Delete the root collection.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -385,7 +390,7 @@ class BaseRequestsMixIn: self.get("/calendar.ics/", check=404) self.get("/event1.ics", 404) - def test_propfind(self): + def test_propfind(self) -> None: calendar_path = "/calendar.ics/" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -398,139 +403,163 @@ class BaseRequestsMixIn: assert len(responses) == 2 assert calendar_path in responses and event_path in responses - def test_propfind_propname(self): + def test_propfind_propname(self) -> None: self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") self.put("/calendar.ics/event.ics", event) propfind = get_file_content("propname.xml") _, responses = self.propfind("/calendar.ics/", propfind) - status, prop = responses["/calendar.ics/"]["D:sync-token"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) + status, prop = response["D:sync-token"] assert status == 200 and not prop.text _, responses = self.propfind("/calendar.ics/event.ics", propfind) - status, prop = responses["/calendar.ics/event.ics"]["D:getetag"] + response = responses["/calendar.ics/event.ics"] + assert not isinstance(response, int) + status, prop = response["D:getetag"] assert status == 200 and not prop.text - def test_propfind_allprop(self): + def test_propfind_allprop(self) -> None: self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") self.put("/calendar.ics/event.ics", event) propfind = get_file_content("allprop.xml") _, responses = self.propfind("/calendar.ics/", propfind) - status, prop = responses["/calendar.ics/"]["D:sync-token"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) + status, prop = response["D:sync-token"] assert status == 200 and prop.text _, responses = self.propfind("/calendar.ics/event.ics", propfind) - status, prop = responses["/calendar.ics/event.ics"]["D:getetag"] + response = responses["/calendar.ics/event.ics"] + assert not isinstance(response, int) + status, prop = response["D:getetag"] assert status == 200 and prop.text - def test_propfind_nonexistent(self): + def test_propfind_nonexistent(self) -> None: """Read a property that does not exist.""" self.mkcalendar("/calendar.ics/") propfind = get_file_content("propfind_calendar_color.xml") _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 1 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["ICAL:calendar-color"] assert status == 404 and not prop.text - def test_proppatch(self): + def test_proppatch(self) -> None: """Set/Remove a property and read it back.""" self.mkcalendar("/calendar.ics/") proppatch = get_file_content("proppatch_set_calendar_color.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 1 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text # Read property back propfind = get_file_content("propfind_calendar_color.xml") _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 1 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["ICAL:calendar-color"] assert status == 200 and prop.text == "#BADA55" propfind = get_file_content("allprop.xml") _, responses = self.propfind("/calendar.ics/", propfind) - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) + status, prop = response["ICAL:calendar-color"] assert status == 200 and prop.text == "#BADA55" # Remove property proppatch = get_file_content("proppatch_remove_calendar_color.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 1 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text # Read property back propfind = get_file_content("propfind_calendar_color.xml") _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 1 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["ICAL:calendar-color"] assert status == 404 - def test_proppatch_multiple1(self): + def test_proppatch_multiple1(self) -> None: """Set/Remove a multiple properties and read them back.""" self.mkcalendar("/calendar.ics/") propfind = get_file_content("propfind_multiple.xml") proppatch = get_file_content("proppatch_set_multiple1.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and not prop.text # Read properties back _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and prop.text == "#BADA55" - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and prop.text == "test" # Remove properties proppatch = get_file_content("proppatch_remove_multiple1.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and not prop.text # Read properties back _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 404 - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 404 - def test_proppatch_multiple2(self): + def test_proppatch_multiple2(self) -> None: """Set/Remove a multiple properties and read them back.""" self.mkcalendar("/calendar.ics/") propfind = get_file_content("propfind_multiple.xml") proppatch = get_file_content("proppatch_set_multiple2.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and not prop.text # Read properties back _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + assert len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and prop.text == "#BADA55" - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and prop.text == "test" # Remove properties proppatch = get_file_content("proppatch_remove_multiple2.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and not prop.text # Read properties back _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 404 - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 404 - def test_proppatch_set_and_remove(self): + def test_proppatch_set_and_remove(self) -> None: """Set and remove multiple properties in single request.""" self.mkcalendar("/calendar.ics/") propfind = get_file_content("propfind_multiple.xml") @@ -540,20 +569,22 @@ class BaseRequestsMixIn: # Remove and set properties in single request proppatch = get_file_content("proppatch_set_and_remove.xml") _, responses = self.proppatch("/calendar.ics/", proppatch) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 200 and not prop.text - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and not prop.text # Read properties back _, responses = self.propfind("/calendar.ics/", propfind) - assert len(responses["/calendar.ics/"]) == 2 - status, prop = responses["/calendar.ics/"]["ICAL:calendar-color"] + response = responses["/calendar.ics/"] + assert not isinstance(response, int) and len(response) == 2 + status, prop = response["ICAL:calendar-color"] assert status == 404 - status, prop = responses["/calendar.ics/"]["C:calendar-description"] + status, prop = response["C:calendar-description"] assert status == 200 and prop.text == "test2" - def test_put_whole_calendar_multiple_events_with_same_uid(self): + def test_put_whole_calendar_multiple_events_with_same_uid(self) -> None: """Add two events with the same UID.""" self.put("/calendar.ics/", get_file_content("event2.ics")) _, responses = self.report("/calendar.ics/", """\ @@ -564,13 +595,18 @@ class BaseRequestsMixIn: """) assert len(responses) == 1 - status, prop = responses["/calendar.ics/event2.ics"]["D:getetag"] + response = responses["/calendar.ics/event2.ics"] + assert not isinstance(response, int) + status, prop = response["D:getetag"] assert status == 200 and prop.text _, answer = self.get("/calendar.ics/") assert answer.count("BEGIN:VEVENT") == 2 - def _test_filter(self, filters, kind="event", test=None, items=(1,)): + def _test_filter(self, filters: Iterable[str], kind: str = "event", + test: Optional[str] = None, items: Iterable[int] = (1,) + ) -> List[str]: filter_template = "%s" + create_collection_fn: Callable[[str], Any] if kind in ("event", "journal", "todo"): create_collection_fn = self.mkcalendar path = "/calendar.ics/" @@ -603,18 +639,19 @@ class BaseRequestsMixIn: {2} """.format(namespace, report, filters_text)) + assert responses is not None paths = [] for path, props in responses.items(): - assert len(props) == 1 + assert not isinstance(props, int) and len(props) == 1 status, prop = props["D:getetag"] assert status == 200 and prop.text paths.append(path) return paths - def test_addressbook_empty_filter(self): + def test_addressbook_empty_filter(self) -> None: self._test_filter([""], kind="contact") - def test_addressbook_prop_filter(self): + def test_addressbook_prop_filter(self) -> None: assert "/contacts.vcf/contact1.vcf" in self._test_filter(["""\ tes """], "contact") - def test_addressbook_prop_filter_any(self): + def test_addressbook_prop_filter_any(self) -> None: assert "/contacts.vcf/contact1.vcf" in self._test_filter(["""\ test @@ -688,7 +725,7 @@ class BaseRequestsMixIn: test """], "contact") - def test_addressbook_prop_filter_all(self): + def test_addressbook_prop_filter_all(self) -> None: assert "/contacts.vcf/contact1.vcf" in self._test_filter(["""\ tes @@ -704,15 +741,15 @@ class BaseRequestsMixIn: test """], "contact", test="allof") - def test_calendar_empty_filter(self): + def test_calendar_empty_filter(self) -> None: self._test_filter([""]) - def test_calendar_tag_filter(self): + def test_calendar_tag_filter(self) -> None: """Report request with tag-based filter on calendar.""" assert "/calendar.ics/event1.ics" in self._test_filter(["""\ """]) - def test_item_tag_filter(self): + def test_item_tag_filter(self) -> None: """Report request with tag-based filter on an item.""" assert "/calendar.ics/event1.ics" in self._test_filter(["""\ @@ -723,7 +760,7 @@ class BaseRequestsMixIn: """]) - def test_item_not_tag_filter(self): + def test_item_not_tag_filter(self) -> None: """Report request with tag-based is-not filter on an item.""" assert "/calendar.ics/event1.ics" not in self._test_filter(["""\ @@ -738,7 +775,7 @@ class BaseRequestsMixIn: """]) - def test_item_prop_filter(self): + def test_item_prop_filter(self) -> None: """Report request with prop-based filter on an item.""" assert "/calendar.ics/event1.ics" in self._test_filter(["""\ @@ -753,7 +790,7 @@ class BaseRequestsMixIn: """]) - def test_item_not_prop_filter(self): + def test_item_not_prop_filter(self) -> None: """Report request with prop-based is-not filter on an item.""" assert "/calendar.ics/event1.ics" not in self._test_filter(["""\ @@ -772,7 +809,7 @@ class BaseRequestsMixIn: """]) - def test_mutiple_filters(self): + def test_mutiple_filters(self) -> None: """Report request with multiple filters on an item.""" assert "/calendar.ics/event1.ics" not in self._test_filter(["""\ @@ -812,7 +849,7 @@ class BaseRequestsMixIn: """]) - def test_text_match_filter(self): + def test_text_match_filter(self) -> None: """Report request with text-match filter on calendar.""" assert "/calendar.ics/event1.ics" in self._test_filter(["""\ @@ -847,7 +884,7 @@ class BaseRequestsMixIn: """]) - def test_param_filter(self): + def test_param_filter(self) -> None: """Report request with param-filter on calendar.""" assert "/calendar.ics/event1.ics" in self._test_filter(["""\ @@ -892,7 +929,7 @@ class BaseRequestsMixIn: """]) - def test_time_range_filter_events(self): + def test_time_range_filter_events(self) -> None: """Report request with time-range filter on events.""" answer = self._test_filter(["""\ @@ -1019,7 +1056,7 @@ class BaseRequestsMixIn: """], items=(9,)) assert "/calendar.ics/event9.ics" not in answer - def test_time_range_filter_events_rrule(self): + def test_time_range_filter_events_rrule(self) -> None: """Report request with time-range filter on events with rrules.""" answer = self._test_filter(["""\ @@ -1054,7 +1091,7 @@ class BaseRequestsMixIn: assert "/calendar.ics/event1.ics" not in answer assert "/calendar.ics/event2.ics" not in answer - def test_time_range_filter_todos(self): + def test_time_range_filter_todos(self) -> None: """Report request with time-range filter on todos.""" answer = self._test_filter(["""\ @@ -1113,7 +1150,7 @@ class BaseRequestsMixIn: """], "todo", items=range(1, 9)) assert "/calendar.ics/todo7.ics" in answer - def test_time_range_filter_todos_rrule(self): + def test_time_range_filter_todos_rrule(self) -> None: """Report request with time-range filter on todos with rrules.""" answer = self._test_filter(["""\ @@ -1157,7 +1194,7 @@ class BaseRequestsMixIn: """], "todo", items=(9,)) assert "/calendar.ics/todo9.ics" not in answer - def test_time_range_filter_journals(self): + def test_time_range_filter_journals(self) -> None: """Report request with time-range filter on journals.""" answer = self._test_filter(["""\ @@ -1205,7 +1242,7 @@ class BaseRequestsMixIn: assert "/calendar.ics/journal2.ics" in answer assert "/calendar.ics/journal3.ics" in answer - def test_time_range_filter_journals_rrule(self): + def test_time_range_filter_journals_rrule(self) -> None: """Report request with time-range filter on journals with rrules.""" answer = self._test_filter(["""\ @@ -1232,7 +1269,7 @@ class BaseRequestsMixIn: assert "/calendar.ics/journal1.ics" not in answer assert "/calendar.ics/journal2.ics" not in answer - def test_report_item(self): + def test_report_item(self) -> None: """Test report request on an item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1247,10 +1284,14 @@ class BaseRequestsMixIn: """) assert len(responses) == 1 - status, prop = responses[event_path]["D:getetag"] + response = responses[event_path] + assert not isinstance(response, int) + status, prop = response["D:getetag"] assert status == 200 and prop.text - def _report_sync_token(self, calendar_path, sync_token=None): + def _report_sync_token( + self, calendar_path: str, sync_token: Optional[str] = None + ) -> Tuple[str, RESPONSES]: sync_token_xml = ( "" % sync_token if sync_token else "") @@ -1267,7 +1308,7 @@ class BaseRequestsMixIn: assert xml.tag == xmlutils.make_clark("D:error") assert sync_token and xml.find( xmlutils.make_clark("D:valid-sync-token")) is not None - return None, None + return "", {} assert status == 207 assert xml.tag == xmlutils.make_clark("D:multistatus") sync_token = xml.find(xmlutils.make_clark("D:sync-token")).text.strip() @@ -1281,7 +1322,7 @@ class BaseRequestsMixIn: assert response in (200, 404) return sync_token, responses - def test_report_sync_collection_no_change(self): + def test_report_sync_collection_no_change(self) -> None: """Test sync-collection report without modifying the collection""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1296,7 +1337,7 @@ class BaseRequestsMixIn: return assert sync_token == new_sync_token and len(responses) == 0 - def test_report_sync_collection_add(self): + def test_report_sync_collection_add(self) -> None: """Test sync-collection report with an added item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1311,7 +1352,7 @@ class BaseRequestsMixIn: return assert len(responses) == 1 and responses[event_path] == 200 - def test_report_sync_collection_delete(self): + def test_report_sync_collection_delete(self) -> None: """Test sync-collection report with a deleted item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1327,7 +1368,7 @@ class BaseRequestsMixIn: return assert len(responses) == 1 and responses[event_path] == 404 - def test_report_sync_collection_create_delete(self): + def test_report_sync_collection_create_delete(self) -> None: """Test sync-collection report with a created and deleted item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1343,7 +1384,7 @@ class BaseRequestsMixIn: return assert len(responses) == 1 and responses[event_path] == 404 - def test_report_sync_collection_modify_undo(self): + def test_report_sync_collection_modify_undo(self) -> None: """Test sync-collection report with a modified and changed back item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1361,7 +1402,7 @@ class BaseRequestsMixIn: return assert len(responses) == 1 and responses[event_path] == 200 - def test_report_sync_collection_move(self): + def test_report_sync_collection_move(self) -> None: """Test sync-collection report a moved item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1381,7 +1422,7 @@ class BaseRequestsMixIn: assert len(responses) == 2 and (responses[event1_path] == 404 and responses[event2_path] == 200) - def test_report_sync_collection_move_undo(self): + def test_report_sync_collection_move_undo(self) -> None: """Test sync-collection report with a moved and moved back item""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1404,7 +1445,7 @@ class BaseRequestsMixIn: assert len(responses) == 2 and (responses[event1_path] == 200 and responses[event2_path] == 404) - def test_report_sync_collection_invalid_sync_token(self): + def test_report_sync_collection_invalid_sync_token(self) -> None: """Test sync-collection report with an invalid sync token""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) @@ -1412,34 +1453,40 @@ class BaseRequestsMixIn: calendar_path, "http://radicale.org/ns/sync/INVALID") assert not sync_token - def test_propfind_sync_token(self): + def test_propfind_sync_token(self) -> None: """Retrieve the sync-token with a propfind request""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) propfind = get_file_content("allprop.xml") _, responses = self.propfind(calendar_path, propfind) - status, sync_token = responses[calendar_path]["D:sync-token"] + response = responses[calendar_path] + assert not isinstance(response, int) + status, sync_token = response["D:sync-token"] assert status == 200 and sync_token.text event = get_file_content("event1.ics") event_path = posixpath.join(calendar_path, "event.ics") self.put(event_path, event) _, responses = self.propfind(calendar_path, propfind) - status, new_sync_token = responses[calendar_path]["D:sync-token"] + response = responses[calendar_path] + assert not isinstance(response, int) + status, new_sync_token = response["D:sync-token"] assert status == 200 and new_sync_token.text assert sync_token.text != new_sync_token.text - def test_propfind_same_as_sync_collection_sync_token(self): + def test_propfind_same_as_sync_collection_sync_token(self) -> None: """Compare sync-token property with sync-collection sync-token""" calendar_path = "/calendar.ics/" self.mkcalendar(calendar_path) propfind = get_file_content("allprop.xml") _, responses = self.propfind(calendar_path, propfind) - status, sync_token = responses[calendar_path]["D:sync-token"] + response = responses[calendar_path] + assert not isinstance(response, int) + status, sync_token = response["D:sync-token"] assert status == 200 and sync_token.text report_sync_token, _ = self._report_sync_token(calendar_path) assert sync_token.text == report_sync_token - def test_calendar_getcontenttype(self): + def test_calendar_getcontenttype(self) -> None: """Test report request on an item""" self.mkcalendar("/test/") for component in ("event", "todo", "journal"): @@ -1454,14 +1501,15 @@ class BaseRequestsMixIn: """) - assert len(responses) == 1 and len( - responses["/test/test.ics"]) == 1 - status, prop = responses["/test/test.ics"]["D:getcontenttype"] + assert len(responses) == 1 + response = responses["/test/test.ics"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["D:getcontenttype"] assert status == 200 and prop.text == ( "text/calendar;charset=utf-8;component=V%s" % component.upper()) - def test_addressbook_getcontenttype(self): + def test_addressbook_getcontenttype(self) -> None: """Test report request on an item""" self.create_addressbook("/test/") contact = get_file_content("contact1.vcf") @@ -1473,11 +1521,13 @@ class BaseRequestsMixIn: """) - assert len(responses) == 1 and len(responses["/test/test.vcf"]) == 1 - status, prop = responses["/test/test.vcf"]["D:getcontenttype"] + assert len(responses) == 1 + response = responses["/test/test.vcf"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["D:getcontenttype"] assert status == 200 and prop.text == "text/vcard;charset=utf-8" - def test_authorization(self): + def test_authorization(self) -> None: _, responses = self.propfind("/", """\ @@ -1485,12 +1535,14 @@ class BaseRequestsMixIn: """, login="user:") - assert len(responses["/"]) == 1 - status, prop = responses["/"]["D:current-user-principal"] + response = responses["/"] + assert not isinstance(response, int) and len(response) == 1 + status, prop = response["D:current-user-principal"] assert status == 200 and len(prop) == 1 - assert prop.find(xmlutils.make_clark("D:href")).text == "/user/" + element = prop.find(xmlutils.make_clark("D:href")) + assert element is not None and element.text == "/user/" - def test_authentication(self): + def test_authentication(self) -> None: """Test if server sends authentication request.""" self.configuration.update({ "auth": {"type": "htpasswd", @@ -1502,11 +1554,11 @@ class BaseRequestsMixIn: assert status in (401, 403) assert headers.get("WWW-Authenticate") - def test_principal_collection_creation(self): + def test_principal_collection_creation(self) -> None: """Verify existence of the principal collection.""" self.propfind("/user/", login="user:") - def test_authentication_current_user_principal_workaround(self): + def test_authentication_current_user_principal_hack(self) -> None: """Test if server sends authentication request when accessing current-user-principal prop (workaround for DAVx5).""" status, headers, _ = self.request("PROPFIND", "/", """\ @@ -1519,7 +1571,7 @@ class BaseRequestsMixIn: assert status in (401, 403) assert headers.get("WWW-Authenticate") - def test_existence_of_root_collections(self): + def test_existence_of_root_collections(self) -> None: """Verify that the root collection always exists.""" # Use PROPFIND because GET returns message self.propfind("/") @@ -1527,7 +1579,7 @@ class BaseRequestsMixIn: self.delete("/") self.propfind("/") - def test_custom_headers(self): + def test_custom_headers(self) -> None: self.configuration.update({"headers": {"test": "123"}}, "test") self.application = Application(self.configuration) # Test if header is set on success @@ -1541,7 +1593,7 @@ class BaseRequestsMixIn: @pytest.mark.skipif(sys.version_info < (3, 6), reason="Unsupported in Python < 3.6") - def test_timezone_seconds(self): + def test_timezone_seconds(self) -> None: """Verify that timezones with minutes and seconds work.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event_timezone_seconds.ics") @@ -1551,11 +1603,10 @@ class BaseRequestsMixIn: class BaseFileSystemTest(BaseTest): """Base class for filesystem backend tests.""" - storage_type: ClassVar[Any] + storage_type: ClassVar[StorageType] - def setup(self): - self.configuration = config.load() - self.colpath = tempfile.mkdtemp() + def setup(self) -> None: + super().setup() # Allow access to anything for tests rights_file_path = os.path.join(self.colpath, "rights") with open(rights_file_path, "w") as f: @@ -1565,23 +1616,18 @@ user: .* collection: .* permissions: RrWw""") self.configuration.update({ - "storage": {"type": self.storage_type, - "filesystem_folder": self.colpath, - # Disable syncing to disk for better performance - "_filesystem_fsync": "False"}, + "storage": {"type": self.storage_type}, "rights": {"file": rights_file_path, "type": "from_file"}}, "test", privileged=True) self.application = Application(self.configuration) - def teardown(self): - shutil.rmtree(self.colpath) - class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): """Test BaseRequests on multifilesystem.""" - storage_type = "multifilesystem" - def test_folder_creation(self): + storage_type: ClassVar[StorageType] = "multifilesystem" + + def test_folder_creation(self) -> None: """Verify that the folder is created.""" folder = os.path.join(self.colpath, "subfolder") self.configuration.update( @@ -1589,14 +1635,14 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): self.application = Application(self.configuration) assert os.path.isdir(folder) - def test_fsync(self): + def test_fsync(self) -> None: """Create a directory and file with syncing enabled.""" self.configuration.update({"storage": {"_filesystem_fsync": "True"}}, "test", privileged=True) self.application = Application(self.configuration) self.mkcalendar("/calendar.ics/") - def test_hook(self): + def test_hook(self) -> None: """Run hook.""" self.configuration.update({"storage": { "hook": ("mkdir %s" % os.path.join( @@ -1605,7 +1651,7 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): self.mkcalendar("/calendar.ics/") self.propfind("/created_by_hook/") - def test_hook_read_access(self): + def test_hook_read_access(self) -> None: """Verify that hook is not run for read accesses.""" self.configuration.update({"storage": { "hook": ("mkdir %s" % os.path.join( @@ -1616,14 +1662,14 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): @pytest.mark.skipif(not shutil.which("flock"), reason="flock command not found") - def test_hook_storage_locked(self): + def test_hook_storage_locked(self) -> None: """Verify that the storage is locked when the hook runs.""" self.configuration.update({"storage": {"hook": ( "flock -n .Radicale.lock || exit 0; exit 1")}}, "test") self.application = Application(self.configuration) self.mkcalendar("/calendar.ics/") - def test_hook_principal_collection_creation(self): + def test_hook_principal_collection_creation(self) -> None: """Verify that the hooks runs when a new user is created.""" self.configuration.update({"storage": { "hook": ("mkdir %s" % os.path.join( @@ -1632,13 +1678,13 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): self.propfind("/", login="user:") self.propfind("/created_by_hook/") - def test_hook_fail(self): + def test_hook_fail(self) -> None: """Verify that a request fails if the hook fails.""" self.configuration.update({"storage": {"hook": "exit 1"}}, "test") self.application = Application(self.configuration) self.mkcalendar("/calendar.ics/", check=500) - def test_item_cache_rebuild(self): + def test_item_cache_rebuild(self) -> None: """Delete the item cache and verify that it is rebuild.""" self.mkcalendar("/calendar.ics/") event = get_file_content("event1.ics") @@ -1655,7 +1701,7 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): @pytest.mark.skipif(os.name != "posix" and sys.platform != "win32", reason="Only supported on 'posix' and 'win32'") - def test_put_whole_calendar_uids_used_as_file_names(self): + def test_put_whole_calendar_uids_used_as_file_names(self) -> None: """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_calendar(self) for uid in ("todo", "event"): @@ -1664,21 +1710,23 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): @pytest.mark.skipif(os.name != "posix" and sys.platform != "win32", reason="Only supported on 'posix' and 'win32'") - def test_put_whole_calendar_random_uids_used_as_file_names(self): + def test_put_whole_calendar_random_uids_used_as_file_names(self) -> None: """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_calendar_without_uids(self) _, answer = self.get("/calendar.ics") + assert answer is not None uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) for uid in uids: _, answer = self.get("/calendar.ics/%s.ics" % uid) + assert answer is not None assert "\r\nUID:%s\r\n" % uid in answer @pytest.mark.skipif(os.name != "posix" and sys.platform != "win32", reason="Only supported on 'posix' and 'win32'") - def test_put_whole_addressbook_uids_used_as_file_names(self): + def test_put_whole_addressbook_uids_used_as_file_names(self) -> None: """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_addressbook(self) for uid in ("contact1", "contact2"): @@ -1687,27 +1735,33 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn): @pytest.mark.skipif(os.name != "posix" and sys.platform != "win32", reason="Only supported on 'posix' and 'win32'") - def test_put_whole_addressbook_random_uids_used_as_file_names(self): + def test_put_whole_addressbook_random_uids_used_as_file_names( + self) -> None: """Test if UIDs are used as file names.""" BaseRequestsMixIn.test_put_whole_addressbook_without_uids(self) _, answer = self.get("/contacts.vcf") + assert answer is not None uids = [] for line in answer.split("\r\n"): if line.startswith("UID:"): uids.append(line[len("UID:"):]) for uid in uids: _, answer = self.get("/contacts.vcf/%s.vcf" % uid) + assert answer is not None assert "\r\nUID:%s\r\n" % uid in answer class TestCustomStorageSystem(BaseFileSystemTest): """Test custom backend loading.""" - storage_type = "radicale.tests.custom.storage_simple_sync" - full_sync_token_support = False + + storage_type: ClassVar[StorageType] = ( + "radicale.tests.custom.storage_simple_sync") + full_sync_token_support: ClassVar[bool] = False + test_root = BaseRequestsMixIn.test_root _report_sync_token = BaseRequestsMixIn._report_sync_token # include tests related to sync token - s = None + s: str = "" for s in dir(BaseRequestsMixIn): if s.startswith("test_") and ("_sync_" in s or s.endswith("_sync")): locals()[s] = getattr(BaseRequestsMixIn, s) @@ -1716,5 +1770,8 @@ class TestCustomStorageSystem(BaseFileSystemTest): class TestCustomStorageSystemCallable(BaseFileSystemTest): """Test custom backend loading with ``callable``.""" - storage_type = radicale.tests.custom.storage_simple_sync.Storage + + storage_type: ClassVar[StorageType] = ( + radicale.tests.custom.storage_simple_sync.Storage) + test_add_event = BaseRequestsMixIn.test_add_event diff --git a/radicale/tests/test_config.py b/radicale/tests/test_config.py index e9177cb..384cbca 100644 --- a/radicale/tests/test_config.py +++ b/radicale/tests/test_config.py @@ -18,23 +18,26 @@ import os import shutil import tempfile from configparser import RawConfigParser +from typing import List, Tuple import pytest -from radicale import config +from radicale import config, types from radicale.tests.helpers import configuration_to_dict class TestConfig: """Test the configuration.""" - def setup(self): + colpath: str + + def setup(self) -> None: self.colpath = tempfile.mkdtemp() - def teardown(self): + def teardown(self) -> None: shutil.rmtree(self.colpath) - def _write_config(self, config_dict, name): + def _write_config(self, config_dict: types.CONFIG, name: str) -> str: parser = RawConfigParser() parser.read_dict(config_dict) config_path = os.path.join(self.colpath, name) @@ -42,7 +45,7 @@ class TestConfig: parser.write(f) return config_path - def test_parse_compound_paths(self): + def test_parse_compound_paths(self) -> None: assert len(config.parse_compound_paths()) == 0 assert len(config.parse_compound_paths("")) == 0 assert len(config.parse_compound_paths(None, "")) == 0 @@ -62,16 +65,16 @@ class TestConfig: assert os.path.basename(paths[i][0]) == name assert paths[i][1] is ignore_if_missing - def test_load_empty(self): + def test_load_empty(self) -> None: config_path = self._write_config({}, "config") config.load([(config_path, False)]) - def test_load_full(self): + def test_load_full(self) -> None: config_path = self._write_config( configuration_to_dict(config.load()), "config") config.load([(config_path, False)]) - def test_load_missing(self): + def test_load_missing(self) -> None: config_path = os.path.join(self.colpath, "does_not_exist") config.load([(config_path, True)]) with pytest.raises(Exception) as exc_info: @@ -79,18 +82,20 @@ class TestConfig: e = exc_info.value assert "Failed to load config file %r" % config_path in str(e) - def test_load_multiple(self): + def test_load_multiple(self) -> None: config_path1 = self._write_config({ "server": {"hosts": "192.0.2.1:1111"}}, "config1") config_path2 = self._write_config({ "server": {"max_connections": 1111}}, "config2") configuration = config.load([(config_path1, False), (config_path2, False)]) - assert len(configuration.get("server", "hosts")) == 1 - assert configuration.get("server", "hosts")[0] == ("192.0.2.1", 1111) + server_hosts: List[Tuple[str, int]] = configuration.get( + "server", "hosts") + assert len(server_hosts) == 1 + assert server_hosts[0] == ("192.0.2.1", 1111) assert configuration.get("server", "max_connections") == 1111 - def test_copy(self): + def test_copy(self) -> None: configuration1 = config.load() configuration1.update({"server": {"max_connections": "1111"}}, "test") configuration2 = configuration1.copy() @@ -98,14 +103,14 @@ class TestConfig: assert configuration1.get("server", "max_connections") == 1111 assert configuration2.get("server", "max_connections") == 1112 - def test_invalid_section(self): + def test_invalid_section(self) -> None: configuration = config.load() with pytest.raises(Exception) as exc_info: configuration.update({"does_not_exist": {"x": "x"}}, "test") e = exc_info.value assert "Invalid section 'does_not_exist'" in str(e) - def test_invalid_option(self): + def test_invalid_option(self) -> None: configuration = config.load() with pytest.raises(Exception) as exc_info: configuration.update({"server": {"x": "x"}}, "test") @@ -113,7 +118,7 @@ class TestConfig: assert "Invalid option 'x'" in str(e) assert "section 'server'" in str(e) - def test_invalid_option_plugin(self): + def test_invalid_option_plugin(self) -> None: configuration = config.load() with pytest.raises(Exception) as exc_info: configuration.update({"auth": {"x": "x"}}, "test") @@ -121,7 +126,7 @@ class TestConfig: assert "Invalid option 'x'" in str(e) assert "section 'auth'" in str(e) - def test_invalid_value(self): + def test_invalid_value(self) -> None: configuration = config.load() with pytest.raises(Exception) as exc_info: configuration.update({"server": {"max_connections": "x"}}, "test") @@ -131,7 +136,7 @@ class TestConfig: assert "section 'server" in str(e) assert "'x'" in str(e) - def test_privileged(self): + def test_privileged(self) -> None: configuration = config.load() configuration.update({"server": {"_internal_server": "True"}}, "test", privileged=True) @@ -141,9 +146,9 @@ class TestConfig: e = exc_info.value assert "Invalid option '_internal_server'" in str(e) - def test_plugin_schema(self): - plugin_schema = {"auth": {"new_option": {"value": "False", - "type": bool}}} + def test_plugin_schema(self) -> None: + plugin_schema: types.CONFIG_SCHEMA = { + "auth": {"new_option": {"value": "False", "type": bool}}} configuration = config.load() configuration.update({"auth": {"type": "new_plugin"}}, "test") plugin_configuration = configuration.copy(plugin_schema) @@ -152,26 +157,26 @@ class TestConfig: plugin_configuration = configuration.copy(plugin_schema) assert plugin_configuration.get("auth", "new_option") is True - def test_plugin_schema_duplicate_option(self): - plugin_schema = {"auth": {"type": {"value": "False", - "type": bool}}} + def test_plugin_schema_duplicate_option(self) -> None: + plugin_schema: types.CONFIG_SCHEMA = { + "auth": {"type": {"value": "False", "type": bool}}} configuration = config.load() with pytest.raises(Exception) as exc_info: configuration.copy(plugin_schema) e = exc_info.value assert "option already exists in 'auth': 'type'" in str(e) - def test_plugin_schema_invalid(self): - plugin_schema = {"server": {"new_option": {"value": "False", - "type": bool}}} + def test_plugin_schema_invalid(self) -> None: + plugin_schema: types.CONFIG_SCHEMA = { + "server": {"new_option": {"value": "False", "type": bool}}} configuration = config.load() with pytest.raises(Exception) as exc_info: configuration.copy(plugin_schema) e = exc_info.value assert "not a plugin section: 'server" in str(e) - def test_plugin_schema_option_invalid(self): - plugin_schema = {"auth": {}} + def test_plugin_schema_option_invalid(self) -> None: + plugin_schema: types.CONFIG_SCHEMA = {"auth": {}} configuration = config.load() configuration.update({"auth": {"type": "new_plugin", "new_option": False}}, "test") diff --git a/radicale/tests/test_rights.py b/radicale/tests/test_rights.py index 951cf55..e9b9278 100644 --- a/radicale/tests/test_rights.py +++ b/radicale/tests/test_rights.py @@ -19,10 +19,8 @@ Radicale tests with simple requests and rights. """ import os -import shutil -import tempfile -from radicale import Application, config +from radicale import Application from radicale.tests import BaseTest from radicale.tests.helpers import get_file_content @@ -30,20 +28,8 @@ from radicale.tests.helpers import get_file_content class TestBaseRightsRequests(BaseTest): """Tests basic requests with rights.""" - def setup(self): - self.configuration = config.load() - self.colpath = tempfile.mkdtemp() - self.configuration.update({ - "storage": {"filesystem_folder": self.colpath, - # Disable syncing to disk for better performance - "_filesystem_fsync": "False"}}, - "test", privileged=True) - - def teardown(self): - shutil.rmtree(self.colpath) - - def _test_rights(self, rights_type, user, path, mode, expected_status, - with_auth=True): + def _test_rights(self, rights_type: str, user: str, path: str, mode: str, + expected_status: int, with_auth: bool = True) -> None: assert mode in ("r", "w") assert user in ("", "tmp") htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") @@ -61,7 +47,7 @@ class TestBaseRightsRequests(BaseTest): (self.propfind if mode == "r" else self.proppatch)( path, check=expected_status, login="tmp:bepo" if user else None) - def test_owner_only(self): + def test_owner_only(self) -> None: self._test_rights("owner_only", "", "/", "r", 401) self._test_rights("owner_only", "", "/", "w", 401) self._test_rights("owner_only", "", "/tmp/", "r", 401) @@ -73,13 +59,13 @@ class TestBaseRightsRequests(BaseTest): self._test_rights("owner_only", "tmp", "/other/", "r", 403) self._test_rights("owner_only", "tmp", "/other/", "w", 403) - def test_owner_only_without_auth(self): + def test_owner_only_without_auth(self) -> None: self._test_rights("owner_only", "", "/", "r", 207, False) self._test_rights("owner_only", "", "/", "w", 401, False) self._test_rights("owner_only", "", "/tmp/", "r", 207, False) self._test_rights("owner_only", "", "/tmp/", "w", 207, False) - def test_owner_write(self): + def test_owner_write(self) -> None: self._test_rights("owner_write", "", "/", "r", 401) self._test_rights("owner_write", "", "/", "w", 401) self._test_rights("owner_write", "", "/tmp/", "r", 401) @@ -91,13 +77,13 @@ class TestBaseRightsRequests(BaseTest): self._test_rights("owner_write", "tmp", "/other/", "r", 207) self._test_rights("owner_write", "tmp", "/other/", "w", 403) - def test_owner_write_without_auth(self): + def test_owner_write_without_auth(self) -> None: self._test_rights("owner_write", "", "/", "r", 207, False) self._test_rights("owner_write", "", "/", "w", 401, False) self._test_rights("owner_write", "", "/tmp/", "r", 207, False) self._test_rights("owner_write", "", "/tmp/", "w", 207, False) - def test_authenticated(self): + def test_authenticated(self) -> None: self._test_rights("authenticated", "", "/", "r", 401) self._test_rights("authenticated", "", "/", "w", 401) self._test_rights("authenticated", "", "/tmp/", "r", 401) @@ -109,13 +95,13 @@ class TestBaseRightsRequests(BaseTest): self._test_rights("authenticated", "tmp", "/other/", "r", 207) self._test_rights("authenticated", "tmp", "/other/", "w", 207) - def test_authenticated_without_auth(self): + def test_authenticated_without_auth(self) -> None: self._test_rights("authenticated", "", "/", "r", 207, False) self._test_rights("authenticated", "", "/", "w", 207, False) self._test_rights("authenticated", "", "/tmp/", "r", 207, False) self._test_rights("authenticated", "", "/tmp/", "w", 207, False) - def test_from_file(self): + def test_from_file(self) -> None: rights_file_path = os.path.join(self.colpath, "rights") with open(rights_file_path, "w") as f: f.write("""\ @@ -160,13 +146,13 @@ permissions: i""") self.get("/public/calendar") self.get("/public/calendar/1.ics", check=401) - def test_custom(self): + def test_custom(self) -> None: """Custom rights management.""" self._test_rights("radicale.tests.custom.rights", "", "/", "r", 401) self._test_rights( "radicale.tests.custom.rights", "", "/tmp/", "r", 207) - def test_collections_and_items(self): + def test_collections_and_items(self) -> None: """Test rights for creation of collections, calendars and items. Collections are allowed at "/" and "/.../". @@ -183,7 +169,7 @@ permissions: i""") self.mkcol("/user/calendar/item", check=401) self.mkcalendar("/user/calendar/item", check=401) - def test_put_collections_and_items(self): + def test_put_collections_and_items(self) -> None: """Test rights for creation of calendars and items with PUT.""" self.application = Application(self.configuration) self.put("/user/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR", check=401) diff --git a/radicale/tests/test_server.py b/radicale/tests/test_server.py index 72ff52b..956bc85 100644 --- a/radicale/tests/test_server.py +++ b/radicale/tests/test_server.py @@ -21,15 +21,14 @@ Test the internal server. import errno import os -import shutil import socket import ssl import subprocess import sys -import tempfile import threading import time from configparser import RawConfigParser +from typing import Callable, Dict, NoReturn, Optional, Tuple, cast from urllib import request from urllib.error import HTTPError, URLError @@ -41,34 +40,43 @@ from radicale.tests.helpers import configuration_to_dict, get_file_path class DisabledRedirectHandler(request.HTTPRedirectHandler): - def http_error_301(self, req, fp, code, msg, headers): + + # HACK: typeshed annotation are wrong for `fp` and `msg` + # (https://github.com/python/typeshed/pull/5728) + # `headers` is incompatible with `http.client.HTTPMessage` + # (https://github.com/python/typeshed/issues/5729) + def http_error_301(self, req: request.Request, fp, code: int, + msg, headers) -> NoReturn: raise HTTPError(req.full_url, code, msg, headers, fp) - def http_error_302(self, req, fp, code, msg, headers): + def http_error_302(self, req: request.Request, fp, code: int, + msg, headers) -> NoReturn: raise HTTPError(req.full_url, code, msg, headers, fp) - def http_error_303(self, req, fp, code, msg, headers): + def http_error_303(self, req: request.Request, fp, code: int, + msg, headers) -> NoReturn: raise HTTPError(req.full_url, code, msg, headers, fp) - def http_error_307(self, req, fp, code, msg, headers): + def http_error_307(self, req: request.Request, fp, code: int, + msg, headers) -> NoReturn: raise HTTPError(req.full_url, code, msg, headers, fp) class TestBaseServerRequests(BaseTest): """Test the internal server.""" - def setup(self): - self.configuration = config.load() - self.colpath = tempfile.mkdtemp() + shutdown_socket: socket.socket + thread: threading.Thread + opener: request.OpenerDirector + + def setup(self) -> None: + super().setup() self.shutdown_socket, shutdown_socket_out = socket.socketpair() with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: # Find available port sock.bind(("127.0.0.1", 0)) self.sockname = sock.getsockname() self.configuration.update({ - "storage": {"filesystem_folder": self.colpath, - # Disable syncing to disk for better performance - "_filesystem_fsync": "False"}, "server": {"hosts": "[%s]:%d" % self.sockname}, # Enable debugging for new processes "logging": {"level": "debug"}}, @@ -82,40 +90,57 @@ class TestBaseServerRequests(BaseTest): request.HTTPSHandler(context=ssl_context), DisabledRedirectHandler) - def teardown(self): + def teardown(self) -> None: self.shutdown_socket.close() try: self.thread.join() except RuntimeError: # Thread never started pass - shutil.rmtree(self.colpath) + super().teardown() - def request(self, method, path, data=None, is_alive_fn=None, **headers): + def request(self, method: str, path: str, data: Optional[str] = None, + **kwargs) -> Tuple[int, Dict[str, str], str]: """Send a request.""" + login = kwargs.pop("login", None) + if login is not None and not isinstance(login, str): + raise TypeError("login argument must be %r, not %r" % + (str, type(login))) + if login: + raise NotImplementedError + is_alive_fn: Optional[Callable[[], bool]] = kwargs.pop( + "is_alive_fn", None) + headers: Dict[str, str] = kwargs + for k, v in headers.items(): + if not isinstance(v, str): + raise TypeError("type of %r is %r, expected %r" % + (k, type(v), str)) if is_alive_fn is None: is_alive_fn = self.thread.is_alive - scheme = ("https" if self.configuration.get("server", "ssl") else - "http") + encoding: str = self.configuration.get("encoding", "request") + scheme = "https" if self.configuration.get("server", "ssl") else "http" + data_bytes = None + if data: + data_bytes = data.encode(encoding) req = request.Request( "%s://[%s]:%d%s" % (scheme, *self.sockname, path), - data=data, headers=headers, method=method) + data=data_bytes, headers=headers, method=method) while True: assert is_alive_fn() try: with self.opener.open(req) as f: - return f.getcode(), f.info(), f.read().decode() + return f.getcode(), dict(f.info()), f.read().decode() except HTTPError as e: - return e.code, e.headers, e.read().decode() + return e.code, dict(e.headers), e.read().decode() except URLError as e: if not isinstance(e.reason, ConnectionRefusedError): raise time.sleep(0.1) - def test_root(self): + def test_root(self) -> None: self.thread.start() self.get("/", check=302) - def test_ssl(self): + def test_ssl(self) -> None: self.configuration.update({ "server": {"ssl": "True", "certificate": get_file_path("cert.pem"), @@ -123,7 +148,7 @@ class TestBaseServerRequests(BaseTest): self.thread.start() self.get("/", check=302) - def test_bind_fail(self): + def test_bind_fail(self) -> None: for address_family, address in [(socket.AF_INET, "::1"), (socket.AF_INET6, "127.0.0.1")]: with socket.socket(address_family, socket.SOCK_STREAM) as sock: @@ -143,7 +168,7 @@ class TestBaseServerRequests(BaseTest): errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT, errno.EPROTONOSUPPORT)) - def test_ipv6(self): + def test_ipv6(self) -> None: try: with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock: # Only allow IPv6 connections to the IPv6 socket @@ -162,7 +187,7 @@ class TestBaseServerRequests(BaseTest): self.thread.start() self.get("/", check=302) - def test_command_line_interface(self): + def test_command_line_interface(self) -> None: config_args = [] for section, values in config.DEFAULT_CONFIG_SCHEMA.items(): if section.startswith("_"): @@ -172,13 +197,14 @@ class TestBaseServerRequests(BaseTest): continue long_name = "--%s-%s" % (section, option.replace("_", "-")) if data["type"] == bool: - if not self.configuration.get(section, option): + if not cast(bool, self.configuration.get(section, option)): long_name = "--no%s" % long_name[1:] config_args.append(long_name) else: config_args.append(long_name) - config_args.append( - self.configuration.get_raw(section, option)) + raw_value = self.configuration.get_raw(section, option) + assert isinstance(raw_value, str) + config_args.append(raw_value) p = subprocess.Popen( [sys.executable, "-m", "radicale"] + config_args, env={**os.environ, "PYTHONPATH": os.pathsep.join(sys.path)}) @@ -190,7 +216,7 @@ class TestBaseServerRequests(BaseTest): if os.name == "posix": assert p.returncode == 0 - def test_wsgi_server(self): + def test_wsgi_server(self) -> None: config_path = os.path.join(self.colpath, "config") parser = RawConfigParser() parser.read_dict(configuration_to_dict(self.configuration)) @@ -199,9 +225,10 @@ class TestBaseServerRequests(BaseTest): env = os.environ.copy() env["PYTHONPATH"] = os.pathsep.join(sys.path) env["RADICALE_CONFIG"] = config_path + raw_server_hosts = self.configuration.get_raw("server", "hosts") + assert isinstance(raw_server_hosts, str) p = subprocess.Popen([ - sys.executable, "-m", "waitress", - "--listen", self.configuration.get_raw("server", "hosts"), + sys.executable, "-m", "waitress", "--listen", raw_server_hosts, "radicale:application"], env=env) try: self.get("/", is_alive_fn=lambda: p.poll() is None, check=302) diff --git a/radicale/tests/test_web.py b/radicale/tests/test_web.py index 5d2b792..2dc599e 100644 --- a/radicale/tests/test_web.py +++ b/radicale/tests/test_web.py @@ -19,30 +19,14 @@ Test web plugin. """ -import shutil -import tempfile - -from radicale import Application, config +from radicale import Application from radicale.tests import BaseTest class TestBaseWebRequests(BaseTest): """Test web plugin.""" - def setup(self): - self.configuration = config.load() - self.colpath = tempfile.mkdtemp() - self.configuration.update({ - "storage": {"filesystem_folder": self.colpath, - # Disable syncing to disk for better performance - "_filesystem_fsync": "False"}}, - "test", privileged=True) - self.application = Application(self.configuration) - - def teardown(self): - shutil.rmtree(self.colpath) - - def test_internal(self): + def test_internal(self) -> None: status, headers, _ = self.request("GET", "/.web") assert status == 302 assert headers.get("Location") == ".web/" @@ -50,7 +34,7 @@ class TestBaseWebRequests(BaseTest): assert answer self.post("/.web", check=405) - def test_none(self): + def test_none(self) -> None: self.configuration.update({"web": {"type": "none"}}, "test") self.application = Application(self.configuration) _, answer = self.get("/.web") @@ -58,7 +42,7 @@ class TestBaseWebRequests(BaseTest): self.get("/.web/", check=404) self.post("/.web", check=405) - def test_custom(self): + def test_custom(self) -> None: """Custom web plugin.""" self.configuration.update({ "web": {"type": "radicale.tests.custom.web"}}, "test")