Let rights plugins decide if access to item is granted
This commit is contained in:
parent
04c51d2ced
commit
5669433f58
@ -217,7 +217,7 @@ class Application:
|
||||
self.logger = logger
|
||||
self.Auth = auth.load(configuration, logger)
|
||||
self.Collection = storage.load(configuration, logger)
|
||||
self.authorized = rights.load(configuration, logger)
|
||||
self.Rights = rights.load(configuration, logger)
|
||||
self.web = web.load(configuration, logger)
|
||||
self.encoding = configuration.get("encoding", "request")
|
||||
|
||||
@ -269,30 +269,28 @@ class Application:
|
||||
read_allowed_items = []
|
||||
write_allowed_items = []
|
||||
for item in items:
|
||||
if not item:
|
||||
continue
|
||||
if isinstance(item, storage.BaseCollection):
|
||||
path = item.path
|
||||
path = storage.sanitize_path("/%s/" % item.path)
|
||||
can_read = self.Rights.authorized(user, path, "r")
|
||||
can_write = self.Rights.authorized(user, path, "w")
|
||||
target = "collection %r" % item.path
|
||||
else:
|
||||
path = item.collection.path
|
||||
if self.authorized(user, path, "r"):
|
||||
self.logger.debug(
|
||||
"%s has read access to collection %r",
|
||||
"%r" % user if user else "anonymous user", path)
|
||||
path = storage.sanitize_path("/%s/%s" % (item.collection.path,
|
||||
item.href))
|
||||
can_read = self.Rights.authorized_item(user, path, "r")
|
||||
can_write = self.Rights.authorized_item(user, path, "w")
|
||||
target = "item %r from %r" % (item.href, item.collection.path)
|
||||
text_status = []
|
||||
if can_read:
|
||||
text_status.append("read")
|
||||
read_allowed_items.append(item)
|
||||
else:
|
||||
self.logger.debug(
|
||||
"%s has NO read access to collection %r",
|
||||
"%r" % user if user else "anonymous user", path)
|
||||
if self.authorized(user, path, "w"):
|
||||
self.logger.debug(
|
||||
"%s has write access to collection %s",
|
||||
"%r" % user if user else "anonymous user", path)
|
||||
if can_write:
|
||||
text_status.append("write")
|
||||
write_allowed_items.append(item)
|
||||
else:
|
||||
self.logger.debug(
|
||||
"%s has NO write access to collection %s",
|
||||
"%r" % user if user else "anonymous user", path)
|
||||
"%s has %s access to %s",
|
||||
"%r" % user if user else "anonymous user",
|
||||
" and ".join(text_status) if text_status else "NO", target)
|
||||
return read_allowed_items, write_allowed_items
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
@ -434,7 +432,7 @@ class Application:
|
||||
# Create principal collection
|
||||
if user and is_authenticated:
|
||||
principal_path = "/%s/" % user
|
||||
if self.authorized(user, principal_path, "w"):
|
||||
if self.Rights.authorized(user, principal_path, "w"):
|
||||
with self.Collection.acquire_lock("r", user):
|
||||
principal = next(
|
||||
self.Collection.discover(principal_path, depth="1"),
|
||||
@ -489,14 +487,11 @@ class Application:
|
||||
If ``item`` is given, only access to that class of item is checked.
|
||||
|
||||
"""
|
||||
path = storage.sanitize_path(path)
|
||||
parent_path = storage.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(path.strip("/")))
|
||||
allowed = False
|
||||
if not item or isinstance(item, storage.BaseCollection):
|
||||
allowed |= self.authorized(user, path, permission)
|
||||
allowed |= self.Rights.authorized(user, path, permission)
|
||||
if not item or not isinstance(item, storage.BaseCollection):
|
||||
allowed |= self.authorized(user, parent_path, permission)
|
||||
allowed |= self.Rights.authorized_item(user, path, permission)
|
||||
return allowed
|
||||
|
||||
def _read_raw_content(self, environ):
|
||||
@ -604,7 +599,7 @@ class Application:
|
||||
|
||||
def do_MKCALENDAR(self, environ, base_prefix, path, user):
|
||||
"""Manage MKCALENDAR request."""
|
||||
if not self.authorized(user, path, "w"):
|
||||
if not self.Rights.authorized(user, path, "w"):
|
||||
return NOT_ALLOWED
|
||||
try:
|
||||
xml_content = self._read_xml_content(environ)
|
||||
@ -630,7 +625,7 @@ class Application:
|
||||
|
||||
def do_MKCOL(self, environ, base_prefix, path, user):
|
||||
"""Manage MKCOL request."""
|
||||
if not self.authorized(user, path, "w"):
|
||||
if not self.Rights.authorized(user, path, "w"):
|
||||
return NOT_ALLOWED
|
||||
try:
|
||||
xml_content = self._read_xml_content(environ)
|
||||
@ -741,7 +736,7 @@ class Application:
|
||||
|
||||
def do_PROPPATCH(self, environ, base_prefix, path, user):
|
||||
"""Manage PROPPATCH request."""
|
||||
if not self.authorized(user, path, "w"):
|
||||
if not self.Rights.authorized(user, path, "w"):
|
||||
return NOT_ALLOWED
|
||||
try:
|
||||
xml_content = self._read_xml_content(environ)
|
||||
@ -783,9 +778,9 @@ class Application:
|
||||
parent_item.get_meta("tag") not in (
|
||||
"VADDRESSBOOK", "VCALENDAR")))
|
||||
if write_whole_collection:
|
||||
if not self.authorized(user, path, "w"):
|
||||
if not self.Rights.authorized(user, path, "w"):
|
||||
return NOT_ALLOWED
|
||||
elif not self.authorized(user, parent_path, "w"):
|
||||
elif not self.Rights.authorized_item(user, path, "w"):
|
||||
return NOT_ALLOWED
|
||||
|
||||
etag = environ.get("HTTP_IF_MATCH", "")
|
||||
|
@ -39,6 +39,7 @@ Leading or ending slashes are trimmed from collection's path.
|
||||
|
||||
import configparser
|
||||
import os.path
|
||||
import posixpath
|
||||
import re
|
||||
from importlib import import_module
|
||||
|
||||
@ -67,7 +68,7 @@ def load(configuration, logger):
|
||||
raise RuntimeError("Failed to load rights module %r: %s" %
|
||||
(rights_type, e)) from e
|
||||
logger.info("Rights type is %r", rights_type)
|
||||
return rights_class(configuration, logger).authorized
|
||||
return rights_class(configuration, logger)
|
||||
|
||||
|
||||
class BaseRights:
|
||||
@ -75,7 +76,7 @@ class BaseRights:
|
||||
self.configuration = configuration
|
||||
self.logger = logger
|
||||
|
||||
def authorized(self, user, collection, permission):
|
||||
def authorized(self, user, path, permission):
|
||||
"""Check if the user is allowed to read or write the collection.
|
||||
|
||||
If the user is empty, check for anonymous rights.
|
||||
@ -83,6 +84,13 @@ class BaseRights:
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def authorized_item(self, user, path, permission):
|
||||
"""Check if the user is allowed to read or write the item."""
|
||||
path = storage.sanitize_path(path)
|
||||
parent_path = storage.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(path.strip("/")))
|
||||
return self.authorized(user, parent_path, permission)
|
||||
|
||||
|
||||
class NoneRights(BaseRights):
|
||||
def authorized(self, user, path, permission):
|
||||
@ -105,7 +113,7 @@ class OwnerOnlyRights(BaseRights):
|
||||
def authorized(self, user, path, permission):
|
||||
sane_path = storage.sanitize_path(path).strip("/")
|
||||
return bool(user) and (
|
||||
permission == "r" and not sane_path.strip("/") or
|
||||
permission == "r" and not sane_path or
|
||||
user == sane_path.split("/", maxsplit=1)[0])
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user