Simplify Rights plugin interface
This commit is contained in:
parent
8ca01a4989
commit
f6a3a19680
@ -264,7 +264,7 @@ class Application(
|
|||||||
# Create principal collection
|
# Create principal collection
|
||||||
if user:
|
if user:
|
||||||
principal_path = "/%s/" % user
|
principal_path = "/%s/" % user
|
||||||
if self._rights.authorized(user, principal_path, "W"):
|
if "W" in self._rights.authorization(user, principal_path):
|
||||||
with self._storage.acquire_lock("r", user):
|
with self._storage.acquire_lock("r", user):
|
||||||
principal = next(
|
principal = next(
|
||||||
self._storage.discover(principal_path, depth="1"),
|
self._storage.discover(principal_path, depth="1"),
|
||||||
@ -328,12 +328,14 @@ class Application(
|
|||||||
else:
|
else:
|
||||||
permissions = ""
|
permissions = ""
|
||||||
parent_permissions = permission
|
parent_permissions = permission
|
||||||
if permissions and self._rights.authorized(user, path, permissions):
|
if permissions and rights.intersect(
|
||||||
|
self._rights.authorization(user, path), permissions):
|
||||||
return True
|
return True
|
||||||
if parent_permissions:
|
if parent_permissions:
|
||||||
parent_path = pathutils.unstrip_path(
|
parent_path = pathutils.unstrip_path(
|
||||||
posixpath.dirname(pathutils.strip_path(path)), True)
|
posixpath.dirname(pathutils.strip_path(path)), True)
|
||||||
if self._rights.authorized(user, parent_path, parent_permissions):
|
if rights.intersect(self._rights.authorization(user, parent_path),
|
||||||
|
parent_permissions):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -30,7 +30,7 @@ from radicale.log import logger
|
|||||||
class ApplicationMkcalendarMixin:
|
class ApplicationMkcalendarMixin:
|
||||||
def do_MKCALENDAR(self, environ, base_prefix, path, user):
|
def do_MKCALENDAR(self, environ, base_prefix, path, user):
|
||||||
"""Manage MKCALENDAR request."""
|
"""Manage MKCALENDAR request."""
|
||||||
if not self._rights.authorized(user, path, "w"):
|
if "w" not in self._rights.authorization(user, path):
|
||||||
return httputils.NOT_ALLOWED
|
return httputils.NOT_ALLOWED
|
||||||
try:
|
try:
|
||||||
xml_content = self._read_xml_content(environ)
|
xml_content = self._read_xml_content(environ)
|
||||||
|
@ -23,14 +23,15 @@ from http import client
|
|||||||
|
|
||||||
from radicale import httputils
|
from radicale import httputils
|
||||||
from radicale import item as radicale_item
|
from radicale import item as radicale_item
|
||||||
from radicale import pathutils, storage, xmlutils
|
from radicale import pathutils, rights, storage, xmlutils
|
||||||
from radicale.log import logger
|
from radicale.log import logger
|
||||||
|
|
||||||
|
|
||||||
class ApplicationMkcolMixin:
|
class ApplicationMkcolMixin:
|
||||||
def do_MKCOL(self, environ, base_prefix, path, user):
|
def do_MKCOL(self, environ, base_prefix, path, user):
|
||||||
"""Manage MKCOL request."""
|
"""Manage MKCOL request."""
|
||||||
permissions = self._rights.authorized(user, path, "Ww")
|
permissions = rights.intersect(
|
||||||
|
self._rights.authorization(user, path), "Ww")
|
||||||
if not permissions:
|
if not permissions:
|
||||||
return httputils.NOT_ALLOWED
|
return httputils.NOT_ALLOWED
|
||||||
try:
|
try:
|
||||||
|
@ -314,19 +314,22 @@ class ApplicationPropfindMixin:
|
|||||||
if isinstance(item, storage.BaseCollection):
|
if isinstance(item, storage.BaseCollection):
|
||||||
path = pathutils.unstrip_path(item.path, True)
|
path = pathutils.unstrip_path(item.path, True)
|
||||||
if item.get_meta("tag"):
|
if item.get_meta("tag"):
|
||||||
permissions = self._rights.authorized(user, path, "rw")
|
permissions = rights.intersect(
|
||||||
|
self._rights.authorization(user, path), "rw")
|
||||||
target = "collection with tag %r" % item.path
|
target = "collection with tag %r" % item.path
|
||||||
else:
|
else:
|
||||||
permissions = self._rights.authorized(user, path, "RW")
|
permissions = rights.intersect(
|
||||||
|
self._rights.authorization(user, path), "RW")
|
||||||
target = "collection %r" % item.path
|
target = "collection %r" % item.path
|
||||||
else:
|
else:
|
||||||
path = pathutils.unstrip_path(item.collection.path, True)
|
path = pathutils.unstrip_path(item.collection.path, True)
|
||||||
permissions = self._rights.authorized(user, path, "rw")
|
permissions = rights.intersect(
|
||||||
|
self._rights.authorization(user, path), "rw")
|
||||||
target = "item %r from %r" % (item.href, item.collection.path)
|
target = "item %r from %r" % (item.href, item.collection.path)
|
||||||
if rights.intersect_permissions(permissions, "Ww"):
|
if rights.intersect(permissions, "Ww"):
|
||||||
permission = "w"
|
permission = "w"
|
||||||
status = "write"
|
status = "write"
|
||||||
elif rights.intersect_permissions(permissions, "Rr"):
|
elif rights.intersect(permissions, "Rr"):
|
||||||
permission = "r"
|
permission = "r"
|
||||||
status = "read"
|
status = "read"
|
||||||
else:
|
else:
|
||||||
|
@ -27,7 +27,7 @@ import vobject
|
|||||||
|
|
||||||
from radicale import httputils
|
from radicale import httputils
|
||||||
from radicale import item as radicale_item
|
from radicale import item as radicale_item
|
||||||
from radicale import pathutils, storage, xmlutils
|
from radicale import pathutils, rights, storage, xmlutils
|
||||||
from radicale.log import logger
|
from radicale.log import logger
|
||||||
|
|
||||||
MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()}
|
MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()}
|
||||||
@ -128,8 +128,10 @@ class ApplicationPutMixin:
|
|||||||
content_type = environ.get("CONTENT_TYPE", "").split(";")[0]
|
content_type = environ.get("CONTENT_TYPE", "").split(";")[0]
|
||||||
parent_path = pathutils.unstrip_path(
|
parent_path = pathutils.unstrip_path(
|
||||||
posixpath.dirname(pathutils.strip_path(path)), True)
|
posixpath.dirname(pathutils.strip_path(path)), True)
|
||||||
permissions = self._rights.authorized(user, path, "Ww")
|
permissions = rights.intersect(
|
||||||
parent_permissions = self._rights.authorized(user, parent_path, "w")
|
self._rights.authorization(user, path), "Ww")
|
||||||
|
parent_permissions = rights.intersect(
|
||||||
|
self._rights.authorization(user, parent_path), "w")
|
||||||
try:
|
try:
|
||||||
vobject_items = tuple(vobject.readComponents(content or ""))
|
vobject_items = tuple(vobject.readComponents(content or ""))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -157,10 +159,10 @@ class ApplicationPutMixin:
|
|||||||
tag = parent_item.get_meta("tag")
|
tag = parent_item.get_meta("tag")
|
||||||
|
|
||||||
if write_whole_collection:
|
if write_whole_collection:
|
||||||
if not self._rights.authorized(
|
if ("w" if tag else "W") not in self._rights.authorization(
|
||||||
user, path, "w" if tag else "W"):
|
user, path):
|
||||||
return httputils.NOT_ALLOWED
|
return httputils.NOT_ALLOWED
|
||||||
elif not self._rights.authorized(user, parent_path, "w"):
|
elif "w" not in self._rights.authorization(user, parent_path):
|
||||||
return httputils.NOT_ALLOWED
|
return httputils.NOT_ALLOWED
|
||||||
|
|
||||||
etag = environ.get("HTTP_IF_MATCH", "")
|
etag = environ.get("HTTP_IF_MATCH", "")
|
||||||
|
@ -40,7 +40,12 @@ def load(configuration):
|
|||||||
return utils.load_plugin(INTERNAL_TYPES, "rights", "Rights", configuration)
|
return utils.load_plugin(INTERNAL_TYPES, "rights", "Rights", configuration)
|
||||||
|
|
||||||
|
|
||||||
def intersect_permissions(a, b="RrWw"):
|
def intersect(a, b):
|
||||||
|
"""Intersect two lists of rights.
|
||||||
|
|
||||||
|
Returns all rights that are both in ``a`` and ``b``.
|
||||||
|
|
||||||
|
"""
|
||||||
return "".join(set(a).intersection(set(b)))
|
return "".join(set(a).intersection(set(b)))
|
||||||
|
|
||||||
|
|
||||||
@ -55,16 +60,14 @@ class BaseRights:
|
|||||||
"""
|
"""
|
||||||
self.configuration = configuration
|
self.configuration = configuration
|
||||||
|
|
||||||
def authorized(self, user, path, permissions):
|
def authorization(self, user, path):
|
||||||
"""Check if the user is allowed to read or write the collection.
|
"""Get granted rights of ``user`` for the collection ``path``.
|
||||||
|
|
||||||
If ``user`` is empty, check for anonymous rights.
|
If ``user`` is empty, check for anonymous rights.
|
||||||
|
|
||||||
``path`` is sanitized.
|
``path`` is sanitized.
|
||||||
|
|
||||||
``permissions`` can include "R", "r", "W", "w"
|
Returns granted rights (e.g. ``"RW"``).
|
||||||
|
|
||||||
Returns granted rights.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
@ -29,12 +29,12 @@ class Rights(rights.BaseRights):
|
|||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
self._verify_user = self.configuration.get("auth", "type") != "none"
|
self._verify_user = self.configuration.get("auth", "type") != "none"
|
||||||
|
|
||||||
def authorized(self, user, path, permissions):
|
def authorization(self, user, path):
|
||||||
if self._verify_user and not user:
|
if self._verify_user and not user:
|
||||||
return ""
|
return ""
|
||||||
sane_path = pathutils.strip_path(path)
|
sane_path = pathutils.strip_path(path)
|
||||||
if "/" not in sane_path:
|
if "/" not in sane_path:
|
||||||
return rights.intersect_permissions(permissions, "RW")
|
return "RW"
|
||||||
if sane_path.count("/") == 1:
|
if sane_path.count("/") == 1:
|
||||||
return rights.intersect_permissions(permissions, "rw")
|
return "rw"
|
||||||
return ""
|
return ""
|
||||||
|
@ -45,7 +45,7 @@ class Rights(rights.BaseRights):
|
|||||||
super().__init__(configuration)
|
super().__init__(configuration)
|
||||||
self._filename = configuration.get("rights", "file")
|
self._filename = configuration.get("rights", "file")
|
||||||
|
|
||||||
def authorized(self, user, path, permissions):
|
def authorization(self, user, path):
|
||||||
user = user or ""
|
user = user or ""
|
||||||
sane_path = pathutils.strip_path(path)
|
sane_path = pathutils.strip_path(path)
|
||||||
# Prevent "regex injection"
|
# Prevent "regex injection"
|
||||||
@ -75,8 +75,7 @@ class Rights(rights.BaseRights):
|
|||||||
logger.debug("Rule %r:%r matches %r:%r from section %r",
|
logger.debug("Rule %r:%r matches %r:%r from section %r",
|
||||||
user, sane_path, user_pattern,
|
user, sane_path, user_pattern,
|
||||||
collection_pattern, section)
|
collection_pattern, section)
|
||||||
return rights.intersect_permissions(
|
return rights_config.get(section, "permissions")
|
||||||
permissions, rights_config.get(section, "permissions"))
|
|
||||||
logger.debug("Rule %r:%r doesn't match %r:%r from section %r",
|
logger.debug("Rule %r:%r doesn't match %r:%r from section %r",
|
||||||
user, sane_path, user_pattern, collection_pattern,
|
user, sane_path, user_pattern, collection_pattern,
|
||||||
section)
|
section)
|
||||||
|
@ -22,20 +22,20 @@ calendars and address books.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import radicale.rights.authenticated as authenticated
|
import radicale.rights.authenticated as authenticated
|
||||||
from radicale import pathutils, rights
|
from radicale import pathutils
|
||||||
|
|
||||||
|
|
||||||
class Rights(authenticated.Rights):
|
class Rights(authenticated.Rights):
|
||||||
def authorized(self, user, path, permissions):
|
def authorization(self, user, path):
|
||||||
if self._verify_user and not user:
|
if self._verify_user and not user:
|
||||||
return ""
|
return ""
|
||||||
sane_path = pathutils.strip_path(path)
|
sane_path = pathutils.strip_path(path)
|
||||||
if not sane_path:
|
if not sane_path:
|
||||||
return rights.intersect_permissions(permissions, "R")
|
return "R"
|
||||||
if self._verify_user and user != sane_path.split("/", maxsplit=1)[0]:
|
if self._verify_user and user != sane_path.split("/", maxsplit=1)[0]:
|
||||||
return ""
|
return ""
|
||||||
if "/" not in sane_path:
|
if "/" not in sane_path:
|
||||||
return rights.intersect_permissions(permissions, "RW")
|
return "RW"
|
||||||
if sane_path.count("/") == 1:
|
if sane_path.count("/") == 1:
|
||||||
return rights.intersect_permissions(permissions, "rw")
|
return "rw"
|
||||||
return ""
|
return ""
|
||||||
|
@ -22,24 +22,22 @@ address books but only grants write access to their own.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import radicale.rights.authenticated as authenticated
|
import radicale.rights.authenticated as authenticated
|
||||||
from radicale import pathutils, rights
|
from radicale import pathutils
|
||||||
|
|
||||||
|
|
||||||
class Rights(authenticated.Rights):
|
class Rights(authenticated.Rights):
|
||||||
def authorized(self, user, path, permissions):
|
def authorization(self, user, path):
|
||||||
if self._verify_user and not user:
|
if self._verify_user and not user:
|
||||||
return ""
|
return ""
|
||||||
sane_path = pathutils.strip_path(path)
|
sane_path = pathutils.strip_path(path)
|
||||||
if not sane_path:
|
if not sane_path:
|
||||||
return rights.intersect_permissions(permissions, "R")
|
return "R"
|
||||||
if self._verify_user:
|
if self._verify_user:
|
||||||
owned = user == sane_path.split("/", maxsplit=1)[0]
|
owned = user == sane_path.split("/", maxsplit=1)[0]
|
||||||
else:
|
else:
|
||||||
owned = True
|
owned = True
|
||||||
if "/" not in sane_path:
|
if "/" not in sane_path:
|
||||||
return rights.intersect_permissions(permissions,
|
return "RW" if owned else "R"
|
||||||
"RW" if owned else "R")
|
|
||||||
if sane_path.count("/") == 1:
|
if sane_path.count("/") == 1:
|
||||||
return rights.intersect_permissions(permissions,
|
return "rw" if owned else "r"
|
||||||
"rw" if owned else "r")
|
|
||||||
return ""
|
return ""
|
||||||
|
@ -23,8 +23,8 @@ from radicale import pathutils, rights
|
|||||||
|
|
||||||
|
|
||||||
class Rights(rights.BaseRights):
|
class Rights(rights.BaseRights):
|
||||||
def authorized(self, user, path, permissions):
|
def authorization(self, user, path):
|
||||||
sane_path = pathutils.strip_path(path)
|
sane_path = pathutils.strip_path(path)
|
||||||
if sane_path not in ("tmp", "other"):
|
if sane_path not in ("tmp", "other"):
|
||||||
return ""
|
return ""
|
||||||
return rights.intersect_permissions(permissions)
|
return "RrWw"
|
||||||
|
Loading…
Reference in New Issue
Block a user