Extract method configure

This commit is contained in:
Unrud 2021-12-10 20:54:04 +01:00
parent 208ae11683
commit 4b5165dc42
6 changed files with 48 additions and 84 deletions

View File

@ -32,7 +32,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union
import defusedxml.ElementTree as DefusedET import defusedxml.ElementTree as DefusedET
import radicale import radicale
from radicale import app, config, xmlutils from radicale import app, config, types, xmlutils
RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]] RESPONSES = Dict[str, Union[int, Dict[str, Tuple[int, ET.Element]]]]
@ -50,12 +50,15 @@ class BaseTest:
def setup(self) -> None: def setup(self) -> None:
self.configuration = config.load() self.configuration = config.load()
self.colpath = tempfile.mkdtemp() self.colpath = tempfile.mkdtemp()
self.configuration.update({ self.configure({
"storage": {"filesystem_folder": self.colpath, "storage": {"filesystem_folder": self.colpath,
# Disable syncing to disk for better performance # Disable syncing to disk for better performance
"_filesystem_fsync": "False"}, "_filesystem_fsync": "False"},
# Set incorrect authentication delay to a short duration # Set incorrect authentication delay to a short duration
"auth": {"delay": "0.001"}}, "test", privileged=True) "auth": {"delay": "0.001"}})
def configure(self, config_: types.CONFIG) -> None:
self.configuration.update(config_, "test", privileged=True)
self.application = app.Application(self.configuration) self.application = app.Application(self.configuration)
def teardown(self) -> None: def teardown(self) -> None:

View File

@ -27,7 +27,7 @@ from typing import Iterable, Tuple, Union
import pytest import pytest
from radicale import Application, xmlutils from radicale import xmlutils
from radicale.tests import BaseTest from radicale.tests import BaseTest
@ -58,11 +58,9 @@ class TestBaseAuthRequests(BaseTest):
encoding: str = self.configuration.get("encoding", "stock") encoding: str = self.configuration.get("encoding", "stock")
with open(htpasswd_file_path, "w", encoding=encoding) as f: with open(htpasswd_file_path, "w", encoding=encoding) as f:
f.write(htpasswd_content) f.write(htpasswd_content)
self.configuration.update({ self.configure({"auth": {"type": "htpasswd",
"auth": {"type": "htpasswd", "htpasswd_filename": htpasswd_file_path,
"htpasswd_filename": htpasswd_file_path, "htpasswd_encryption": htpasswd_encryption}})
"htpasswd_encryption": htpasswd_encryption}}, "test")
self.application = Application(self.configuration)
if test_matrix == "ascii": if test_matrix == "ascii":
test_matrix = (("tmp", "bepo", True), ("tmp", "tmp", False), test_matrix = (("tmp", "bepo", True), ("tmp", "tmp", False),
("tmp", "", False), ("unk", "unk", False), ("tmp", "", False), ("unk", "unk", False),
@ -121,8 +119,7 @@ class TestBaseAuthRequests(BaseTest):
self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n") self._test_htpasswd("plain", "#comment\n #comment\n \ntmp:bepo\n\n")
def test_remote_user(self) -> None: def test_remote_user(self) -> None:
self.configuration.update({"auth": {"type": "remote_user"}}, "test") self.configure({"auth": {"type": "remote_user"}})
self.application = Application(self.configuration)
_, responses = self.propfind("/", """\ _, responses = self.propfind("/", """\
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<propfind xmlns="DAV:"> <propfind xmlns="DAV:">
@ -139,9 +136,7 @@ class TestBaseAuthRequests(BaseTest):
assert href_element is not None and href_element.text == "/test/" assert href_element is not None and href_element.text == "/test/"
def test_http_x_remote_user(self) -> None: def test_http_x_remote_user(self) -> None:
self.configuration.update( self.configure({"auth": {"type": "http_x_remote_user"}})
{"auth": {"type": "http_x_remote_user"}}, "test")
self.application = Application(self.configuration)
_, responses = self.propfind("/", """\ _, responses = self.propfind("/", """\
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<propfind xmlns="DAV:"> <propfind xmlns="DAV:">
@ -159,7 +154,5 @@ class TestBaseAuthRequests(BaseTest):
def test_custom(self) -> None: def test_custom(self) -> None:
"""Custom authentication.""" """Custom authentication."""
self.configuration.update( self.configure({"auth": {"type": "radicale.tests.custom.auth"}})
{"auth": {"type": "radicale.tests.custom.auth"}}, "test")
self.application = Application(self.configuration)
self.propfind("/tmp/", login="tmp:") self.propfind("/tmp/", login="tmp:")

View File

@ -31,7 +31,7 @@ import defusedxml.ElementTree as DefusedET
import pytest import pytest
import radicale.tests.custom.storage_simple_sync import radicale.tests.custom.storage_simple_sync
from radicale import Application, config, storage, xmlutils from radicale import config, storage, xmlutils
from radicale.tests import RESPONSES, BaseTest from radicale.tests import RESPONSES, BaseTest
from radicale.tests.helpers import get_file_content from radicale.tests.helpers import get_file_content
@ -1544,12 +1544,10 @@ class BaseRequestsMixIn(BaseTest):
def test_authentication(self) -> None: def test_authentication(self) -> None:
"""Test if server sends authentication request.""" """Test if server sends authentication request."""
self.configuration.update({ self.configure({"auth": {"type": "htpasswd",
"auth": {"type": "htpasswd", "htpasswd_filename": os.devnull,
"htpasswd_filename": os.devnull, "htpasswd_encryption": "plain"},
"htpasswd_encryption": "plain"}, "rights": {"type": "owner_only"}})
"rights": {"type": "owner_only"}}, "test")
self.application = Application(self.configuration)
status, headers, _ = self.request("MKCOL", "/user/") status, headers, _ = self.request("MKCOL", "/user/")
assert status in (401, 403) assert status in (401, 403)
assert headers.get("WWW-Authenticate") assert headers.get("WWW-Authenticate")
@ -1580,8 +1578,7 @@ class BaseRequestsMixIn(BaseTest):
self.propfind("/") self.propfind("/")
def test_custom_headers(self) -> None: def test_custom_headers(self) -> None:
self.configuration.update({"headers": {"test": "123"}}, "test") self.configure({"headers": {"test": "123"}})
self.application = Application(self.configuration)
# Test if header is set on success # Test if header is set on success
status, headers, _ = self.request("OPTIONS", "/") status, headers, _ = self.request("OPTIONS", "/")
assert status == 200 assert status == 200
@ -1615,11 +1612,9 @@ class BaseStorageTest(BaseTest):
user: .* user: .*
collection: .* collection: .*
permissions: RrWw""") permissions: RrWw""")
self.configuration.update({ self.configure({"storage": {"type": self.storage_type},
"storage": {"type": self.storage_type}, "rights": {"file": rights_file_path,
"rights": {"file": rights_file_path, "type": "from_file"}})
"type": "from_file"}}, "test", privileged=True)
self.application = Application(self.configuration)
class TestMultiFileSystem(BaseStorageTest, BaseRequestsMixIn): class TestMultiFileSystem(BaseStorageTest, BaseRequestsMixIn):
@ -1630,33 +1625,25 @@ class TestMultiFileSystem(BaseStorageTest, BaseRequestsMixIn):
def test_folder_creation(self) -> None: def test_folder_creation(self) -> None:
"""Verify that the folder is created.""" """Verify that the folder is created."""
folder = os.path.join(self.colpath, "subfolder") folder = os.path.join(self.colpath, "subfolder")
self.configuration.update( self.configure({"storage": {"filesystem_folder": folder}})
{"storage": {"filesystem_folder": folder}}, "test")
self.application = Application(self.configuration)
assert os.path.isdir(folder) assert os.path.isdir(folder)
def test_fsync(self) -> None: def test_fsync(self) -> None:
"""Create a directory and file with syncing enabled.""" """Create a directory and file with syncing enabled."""
self.configuration.update({"storage": {"_filesystem_fsync": "True"}}, self.configure({"storage": {"_filesystem_fsync": "True"}})
"test", privileged=True)
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/") self.mkcalendar("/calendar.ics/")
def test_hook(self) -> None: def test_hook(self) -> None:
"""Run hook.""" """Run hook."""
self.configuration.update({"storage": { self.configure({"storage": {"hook": "mkdir %s" % os.path.join(
"hook": ("mkdir %s" % os.path.join( "collection-root", "created_by_hook")}})
"collection-root", "created_by_hook"))}}, "test")
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/") self.mkcalendar("/calendar.ics/")
self.propfind("/created_by_hook/") self.propfind("/created_by_hook/")
def test_hook_read_access(self) -> None: def test_hook_read_access(self) -> None:
"""Verify that hook is not run for read accesses.""" """Verify that hook is not run for read accesses."""
self.configuration.update({"storage": { self.configure({"storage": {"hook": "mkdir %s" % os.path.join(
"hook": ("mkdir %s" % os.path.join( "collection-root", "created_by_hook")}})
"collection-root", "created_by_hook"))}}, "test")
self.application = Application(self.configuration)
self.propfind("/") self.propfind("/")
self.propfind("/created_by_hook/", check=404) self.propfind("/created_by_hook/", check=404)
@ -1664,24 +1651,20 @@ class TestMultiFileSystem(BaseStorageTest, BaseRequestsMixIn):
reason="flock command not found") reason="flock command not found")
def test_hook_storage_locked(self) -> None: def test_hook_storage_locked(self) -> None:
"""Verify that the storage is locked when the hook runs.""" """Verify that the storage is locked when the hook runs."""
self.configuration.update({"storage": {"hook": ( self.configure({"storage": {"hook": (
"flock -n .Radicale.lock || exit 0; exit 1")}}, "test") "flock -n .Radicale.lock || exit 0; exit 1")}})
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/") self.mkcalendar("/calendar.ics/")
def test_hook_principal_collection_creation(self) -> None: def test_hook_principal_collection_creation(self) -> None:
"""Verify that the hooks runs when a new user is created.""" """Verify that the hooks runs when a new user is created."""
self.configuration.update({"storage": { self.configure({"storage": {"hook": "mkdir %s" % os.path.join(
"hook": ("mkdir %s" % os.path.join( "collection-root", "created_by_hook")}})
"collection-root", "created_by_hook"))}}, "test")
self.application = Application(self.configuration)
self.propfind("/", login="user:") self.propfind("/", login="user:")
self.propfind("/created_by_hook/") self.propfind("/created_by_hook/")
def test_hook_fail(self) -> None: def test_hook_fail(self) -> None:
"""Verify that a request fails if the hook fails.""" """Verify that a request fails if the hook fails."""
self.configuration.update({"storage": {"hook": "exit 1"}}, "test") self.configure({"storage": {"hook": "exit 1"}})
self.application = Application(self.configuration)
self.mkcalendar("/calendar.ics/", check=500) self.mkcalendar("/calendar.ics/", check=500)
def test_item_cache_rebuild(self) -> None: def test_item_cache_rebuild(self) -> None:

View File

@ -20,7 +20,6 @@ Radicale tests with simple requests and rights.
import os import os
from radicale import Application
from radicale.tests import BaseTest from radicale.tests import BaseTest
from radicale.tests.helpers import get_file_content from radicale.tests.helpers import get_file_content
@ -35,12 +34,11 @@ class TestBaseRightsRequests(BaseTest):
htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") htpasswd_file_path = os.path.join(self.colpath, ".htpasswd")
with open(htpasswd_file_path, "w") as f: with open(htpasswd_file_path, "w") as f:
f.write("tmp:bepo\nother:bepo") f.write("tmp:bepo\nother:bepo")
self.configuration.update({ self.configure({
"rights": {"type": rights_type}, "rights": {"type": rights_type},
"auth": {"type": "htpasswd" if with_auth else "none", "auth": {"type": "htpasswd" if with_auth else "none",
"htpasswd_filename": htpasswd_file_path, "htpasswd_filename": htpasswd_file_path,
"htpasswd_encryption": "plain"}}, "test") "htpasswd_encryption": "plain"}})
self.application = Application(self.configuration)
for u in ("tmp", "other"): for u in ("tmp", "other"):
# Indirect creation of principal collection # Indirect creation of principal collection
self.propfind("/%s/" % u, login="%s:bepo" % u) self.propfind("/%s/" % u, login="%s:bepo" % u)
@ -113,8 +111,7 @@ permissions: RrWw
user: .* user: .*
collection: custom(/.*)? collection: custom(/.*)?
permissions: Rr""") permissions: Rr""")
self.configuration.update( self.configure({"rights": {"file": rights_file_path}})
{"rights": {"file": rights_file_path}}, "test")
self._test_rights("from_file", "", "/other/", "r", 401) self._test_rights("from_file", "", "/other/", "r", 401)
self._test_rights("from_file", "tmp", "/other/", "r", 403) self._test_rights("from_file", "tmp", "/other/", "r", 403)
self._test_rights("from_file", "", "/custom/sub", "r", 404) self._test_rights("from_file", "", "/custom/sub", "r", 404)
@ -134,10 +131,8 @@ permissions: RrWw
user: .* user: .*
collection: public/[^/]* collection: public/[^/]*
permissions: i""") permissions: i""")
self.configuration.update( self.configure({"rights": {"type": "from_file",
{"rights": {"type": "from_file", "file": rights_file_path}})
"file": rights_file_path}}, "test")
self.application = Application(self.configuration)
self.mkcalendar("/tmp/calendar", login="tmp:bepo") self.mkcalendar("/tmp/calendar", login="tmp:bepo")
self.mkcol("/public", login="tmp:bepo") self.mkcol("/public", login="tmp:bepo")
self.mkcalendar("/public/calendar", login="tmp:bepo") self.mkcalendar("/public/calendar", login="tmp:bepo")
@ -160,7 +155,6 @@ permissions: i""")
Items are allowed at "/.../.../...". Items are allowed at "/.../.../...".
""" """
self.application = Application(self.configuration)
self.mkcalendar("/", check=401) self.mkcalendar("/", check=401)
self.mkcalendar("/user/", check=401) self.mkcalendar("/user/", check=401)
self.mkcol("/user/") self.mkcol("/user/")
@ -171,7 +165,6 @@ permissions: i""")
def test_put_collections_and_items(self) -> None: def test_put_collections_and_items(self) -> None:
"""Test rights for creation of calendars and items with PUT.""" """Test rights for creation of calendars and items with PUT."""
self.application = Application(self.configuration)
self.put("/user/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR", check=401) self.put("/user/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR", check=401)
self.mkcol("/user/") self.mkcol("/user/")
self.put("/user/calendar/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR") self.put("/user/calendar/", "BEGIN:VCALENDAR\r\nEND:VCALENDAR")

View File

@ -76,11 +76,9 @@ class TestBaseServerRequests(BaseTest):
# Find available port # Find available port
sock.bind(("127.0.0.1", 0)) sock.bind(("127.0.0.1", 0))
self.sockname = sock.getsockname() self.sockname = sock.getsockname()
self.configuration.update({ self.configure({"server": {"hosts": "[%s]:%d" % self.sockname},
"server": {"hosts": "[%s]:%d" % self.sockname}, # Enable debugging for new processes
# Enable debugging for new processes "logging": {"level": "debug"}})
"logging": {"level": "debug"}},
"test", privileged=True)
self.thread = threading.Thread(target=server.serve, args=( self.thread = threading.Thread(target=server.serve, args=(
self.configuration, shutdown_socket_out)) self.configuration, shutdown_socket_out))
ssl_context = ssl.create_default_context() ssl_context = ssl.create_default_context()
@ -141,10 +139,9 @@ class TestBaseServerRequests(BaseTest):
self.get("/", check=302) self.get("/", check=302)
def test_ssl(self) -> None: def test_ssl(self) -> None:
self.configuration.update({ self.configure({"server": {"ssl": "True",
"server": {"ssl": "True", "certificate": get_file_path("cert.pem"),
"certificate": get_file_path("cert.pem"), "key": get_file_path("key.pem")}})
"key": get_file_path("key.pem")}}, "test")
self.thread.start() self.thread.start()
self.get("/", check=302) self.get("/", check=302)
@ -182,13 +179,12 @@ class TestBaseServerRequests(BaseTest):
errno.EPROTONOSUPPORT): errno.EPROTONOSUPPORT):
pytest.skip("IPv6 not supported") pytest.skip("IPv6 not supported")
raise raise
self.configuration.update({ self.configure({"server": {"hosts": "[%s]:%d" % self.sockname}})
"server": {"hosts": "[%s]:%d" % self.sockname}}, "test")
self.thread.start() self.thread.start()
self.get("/", check=302) self.get("/", check=302)
def test_command_line_interface(self, with_bool_options=False) -> None: def test_command_line_interface(self, with_bool_options=False) -> None:
self.configuration.update({"headers": {"Test-Server": "test"}}) self.configure({"headers": {"Test-Server": "test"}})
config_args = [] config_args = []
for section in self.configuration.sections(): for section in self.configuration.sections():
if section.startswith("_"): if section.startswith("_"):

View File

@ -19,7 +19,6 @@ Test web plugin.
""" """
from radicale import Application
from radicale.tests import BaseTest from radicale.tests import BaseTest
@ -35,8 +34,7 @@ class TestBaseWebRequests(BaseTest):
self.post("/.web", check=405) self.post("/.web", check=405)
def test_none(self) -> None: def test_none(self) -> None:
self.configuration.update({"web": {"type": "none"}}, "test") self.configure({"web": {"type": "none"}})
self.application = Application(self.configuration)
_, answer = self.get("/.web") _, answer = self.get("/.web")
assert answer assert answer
self.get("/.web/", check=404) self.get("/.web/", check=404)
@ -44,9 +42,7 @@ class TestBaseWebRequests(BaseTest):
def test_custom(self) -> None: def test_custom(self) -> None:
"""Custom web plugin.""" """Custom web plugin."""
self.configuration.update({ self.configure({"web": {"type": "radicale.tests.custom.web"}})
"web": {"type": "radicale.tests.custom.web"}}, "test")
self.application = Application(self.configuration)
_, answer = self.get("/.web") _, answer = self.get("/.web")
assert answer == "custom" assert answer == "custom"
_, answer = self.post("/.web", "body content") _, answer = self.post("/.web", "body content")