From 5ce2c624022ee00d007959f20d23eb6184b7958b Mon Sep 17 00:00:00 2001 From: Unrud Date: Wed, 31 May 2017 00:36:18 +0200 Subject: [PATCH 1/2] Implement rights types directly This is faster and easier to understand. --- radicale/rights.py | 94 ++++++++++++++++++++++------------------------ 1 file changed, 44 insertions(+), 50 deletions(-) diff --git a/radicale/rights.py b/radicale/rights.py index 72837e6..2b7def8 100644 --- a/radicale/rights.py +++ b/radicale/rights.py @@ -41,51 +41,29 @@ import os.path import re from configparser import ConfigParser from importlib import import_module -from io import StringIO from . import storage def load(configuration, logger): """Load the rights manager chosen in configuration.""" - auth_type = configuration.get("auth", "type") rights_type = configuration.get("rights", "type") - if auth_type == "None" or rights_type == "None": - return lambda user, collection, permission: True - elif rights_type in DEFINED_RIGHTS or rights_type == "from_file": - return Rights(configuration, logger).authorized + if configuration.get("auth", "type") == "None": + rights_type = "None" + logger.info("Rights type is %r", rights_type) + if rights_type == "None": + rights_class = NoneRights + elif rights_type == "authenticated": + rights_class = AuthenticatedRights + elif rights_type == "owner_write": + rights_class = OwnerWriteRights + elif rights_type == "owner_only": + rights_class = OwnerOnlyRights + elif rights_type == "from_file": + rights_class = Rights else: - module = import_module(rights_type) - return module.Rights(configuration, logger).authorized - - -DEFINED_RIGHTS = { - "authenticated": """ -[rw] -user:.+ -collection:.* -permission:rw - """, - "owner_write": """ -[w] -user:.+ -collection:%(login)s(/.*)? -permission:rw -[r] -user:.+ -collection:.* -permission:r - """, - "owner_only": """ -[rw] -user:.+ -collection:%(login)s(/.*)? -permission:rw -[r] -user:.+ -collection: -permission:r - """} + rights_class = import_module(rights_type).Rights + return rights_class(configuration, logger).authorized class BaseRights: @@ -102,32 +80,48 @@ class BaseRights: raise NotImplementedError +class NoneRights(BaseRights): + def authorized(self, user, path, permission): + return True + + +class AuthenticatedRights(BaseRights): + def authorized(self, user, path, permission): + return bool(user) + + +class OwnerWriteRights(BaseRights): + def authorized(self, user, path, permission): + sane_path = storage.sanitize_path(path).strip("/") + return bool(user) and (permission == "r" or + user == sane_path.split("/", maxsplit=1)[0]) + + +class OwnerOnlyRights(BaseRights): + def authorized(self, user, path, permission): + sane_path = storage.sanitize_path(path).strip("/") + return bool(user) and ( + permission == "r" and not sane_path.strip("/") or + user == sane_path.split("/", maxsplit=1)[0]) + + class Rights(BaseRights): def __init__(self, configuration, logger): super().__init__(configuration, logger) self.filename = os.path.expanduser(configuration.get("rights", "file")) - self.rights_type = configuration.get("rights", "type").lower() def authorized(self, user, path, permission): user = user or "" - if user and not storage.is_safe_path_component(user): - # Prevent usernames like "user/calendar.ics" - raise ValueError("Refused unsafe username: %s", user) sane_path = storage.sanitize_path(path).strip("/") # Prevent "regex injection" user_escaped = re.escape(user) sane_path_escaped = re.escape(sane_path) regex = ConfigParser( {"login": user_escaped, "path": sane_path_escaped}) - if self.rights_type in DEFINED_RIGHTS: - self.logger.debug("Rights type '%s'", self.rights_type) - regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type])) - else: - self.logger.debug("Reading rights from file '%s'", self.filename) - if not regex.read(self.filename): - self.logger.error( - "File '%s' not found for rights", self.filename) - return False + if not regex.read(self.filename): + raise RuntimeError("Failed to read rights from file %r", + self.filename) + return False for section in regex.sections(): re_user = regex.get(section, "user") From 3e715a9aff39ad6c1c95b089cf47af18ddeab289 Mon Sep 17 00:00:00 2001 From: Unrud Date: Wed, 31 May 2017 00:36:20 +0200 Subject: [PATCH 2/2] Test rights management --- radicale/tests/custom/rights.py | 27 +++++++ radicale/tests/test_rights.py | 139 ++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+) create mode 100644 radicale/tests/custom/rights.py create mode 100644 radicale/tests/test_rights.py diff --git a/radicale/tests/custom/rights.py b/radicale/tests/custom/rights.py new file mode 100644 index 0000000..8fdda24 --- /dev/null +++ b/radicale/tests/custom/rights.py @@ -0,0 +1,27 @@ +# This file is part of Radicale Server - Calendar Server +# Copyright (C) 2017 Unrud +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Radicale. If not, see . + +""" +Custom rights management. + +""" + +from radicale import rights + + +class Rights(rights.BaseRights): + def authorized(self, user, path, permission): + return path.strip("/") in ("tmp", "other") diff --git a/radicale/tests/test_rights.py b/radicale/tests/test_rights.py new file mode 100644 index 0000000..f332f5b --- /dev/null +++ b/radicale/tests/test_rights.py @@ -0,0 +1,139 @@ +# This file is part of Radicale Server - Calendar Server +# Copyright (C) 2017 Unrud +# +# This library is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Radicale. If not, see . + +""" +Radicale tests with simple requests and rights. +""" + +import base64 +import os +import shutil +import tempfile + +from radicale import Application, config + +from .test_base import BaseTest + + +class TestBaseAuthRequests(BaseTest): + """Tests basic requests with rights.""" + + def setup(self): + self.configuration = config.load() + self.colpath = tempfile.mkdtemp() + self.configuration.set("storage", "filesystem_folder", self.colpath) + # Disable syncing to disk for better performance + self.configuration.set("storage", "filesystem_fsync", "False") + # Required on Windows, doesn't matter on Unix + self.configuration.set("storage", "close_lock_file", "True") + + def teardown(self): + shutil.rmtree(self.colpath) + + def _test_rights(self, rights_type, user, path, mode, expected_status): + assert mode in ("r", "w") + assert user in ("", "tmp") + htpasswd_file_path = os.path.join(self.colpath, ".htpasswd") + with open(htpasswd_file_path, "w") as f: + f.write("tmp:bepo\nother:bepo") + self.configuration.set("rights", "type", rights_type) + self.configuration.set("auth", "type", "htpasswd") + self.configuration.set("auth", "htpasswd_filename", htpasswd_file_path) + self.configuration.set("auth", "htpasswd_encryption", "plain") + self.application = Application(self.configuration, self.logger) + for u in ("tmp", "other"): + status, _, _ = self.request( + "PROPFIND", "/%s" % u, HTTP_AUTHORIZATION="Basic %s" % + base64.b64encode(("%s:bepo" % u).encode()).decode()) + assert status == 207 + status, _, _ = self.request( + "PROPFIND" if mode == "r" else "PROPPATCH", path, + HTTP_AUTHORIZATION="Basic %s" % base64.b64encode( + ("tmp:bepo").encode()).decode() if user else "") + assert status == expected_status + + def test_owner_only(self): + self._test_rights("owner_only", "", "/", "r", 401) + self._test_rights("owner_only", "", "/", "w", 401) + self._test_rights("owner_only", "", "/tmp", "r", 401) + self._test_rights("owner_only", "", "/tmp", "w", 401) + self._test_rights("owner_only", "tmp", "/", "r", 207) + self._test_rights("owner_only", "tmp", "/", "w", 403) + self._test_rights("owner_only", "tmp", "/tmp", "r", 207) + self._test_rights("owner_only", "tmp", "/tmp", "w", 207) + self._test_rights("owner_only", "tmp", "/other", "r", 403) + self._test_rights("owner_only", "tmp", "/other", "w", 403) + + def test_owner_write(self): + self._test_rights("owner_write", "", "/", "r", 401) + self._test_rights("owner_write", "", "/", "w", 401) + self._test_rights("owner_write", "", "/tmp", "r", 401) + self._test_rights("owner_write", "", "/tmp", "w", 401) + self._test_rights("owner_write", "tmp", "/", "r", 207) + self._test_rights("owner_write", "tmp", "/", "w", 403) + self._test_rights("owner_write", "tmp", "/tmp", "r", 207) + self._test_rights("owner_write", "tmp", "/tmp", "w", 207) + self._test_rights("owner_write", "tmp", "/other", "r", 207) + self._test_rights("owner_write", "tmp", "/other", "w", 403) + + def test_authenticated(self): + self._test_rights("authenticated", "", "/", "r", 401) + self._test_rights("authenticated", "", "/", "w", 401) + self._test_rights("authenticated", "", "/tmp", "r", 401) + self._test_rights("authenticated", "", "/tmp", "w", 401) + self._test_rights("authenticated", "tmp", "/", "r", 207) + self._test_rights("authenticated", "tmp", "/", "w", 207) + self._test_rights("authenticated", "tmp", "/tmp", "r", 207) + self._test_rights("authenticated", "tmp", "/tmp", "w", 207) + self._test_rights("authenticated", "tmp", "/other", "r", 207) + self._test_rights("authenticated", "tmp", "/other", "w", 207) + + def test_none(self): + self._test_rights("None", "", "/", "r", 207) + self._test_rights("None", "", "/", "w", 207) + self._test_rights("None", "", "/tmp", "r", 207) + self._test_rights("None", "", "/tmp", "w", 207) + self._test_rights("None", "tmp", "/", "r", 207) + self._test_rights("None", "tmp", "/", "w", 207) + self._test_rights("None", "tmp", "/tmp", "r", 207) + self._test_rights("None", "tmp", "/tmp", "w", 207) + self._test_rights("None", "tmp", "/other", "r", 207) + self._test_rights("None", "tmp", "/other", "w", 207) + + def test_from_file(self): + rights_file_path = os.path.join(self.colpath, "rights") + with open(rights_file_path, "w") as f: + f.write("""\ +[owner] +user: .+ +collection: %(login)s(/.*)? +permission: rw +[custom] +user: .* +collection: custom(/.*)? +permission: r""") + self.configuration.set("rights", "file", rights_file_path) + self._test_rights("from_file", "", "/other", "r", 401) + self._test_rights("from_file", "tmp", "/other", "r", 403) + self._test_rights("from_file", "", "/custom/sub", "r", 404) + self._test_rights("from_file", "tmp", "/custom/sub", "r", 404) + self._test_rights("from_file", "", "/custom/sub", "w", 401) + self._test_rights("from_file", "tmp", "/custom/sub", "w", 403) + + def test_custom(self): + """Custom rights management.""" + self._test_rights("tests.custom.rights", "", "/", "r", 401) + self._test_rights("tests.custom.rights", "", "/tmp", "r", 207)