assert sanitized and stripped paths
This commit is contained in:
@ -324,8 +324,8 @@ class Application(
|
||||
if permissions and self.Rights.authorized(user, path, permissions):
|
||||
return True
|
||||
if parent_permissions:
|
||||
parent_path = pathutils.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(path.strip("/")))
|
||||
parent_path = pathutils.unstrip_path(
|
||||
posixpath.dirname(pathutils.strip_path(path)), True)
|
||||
if self.Rights.authorized(user, parent_path, parent_permissions):
|
||||
return True
|
||||
return False
|
||||
|
@ -28,7 +28,7 @@ import posixpath
|
||||
from http import client
|
||||
from urllib.parse import quote
|
||||
|
||||
from radicale import httputils, storage, xmlutils
|
||||
from radicale import httputils, pathutils, storage, xmlutils
|
||||
from radicale.log import logger
|
||||
|
||||
|
||||
@ -66,7 +66,7 @@ class ApplicationGetMixin:
|
||||
def do_GET(self, environ, base_prefix, path, user):
|
||||
"""Manage GET request."""
|
||||
# Redirect to .web if the root URL is requested
|
||||
if not path.strip("/"):
|
||||
if not pathutils.strip_path(path):
|
||||
web_path = ".web"
|
||||
if not environ.get("PATH_INFO"):
|
||||
web_path = posixpath.join(posixpath.basename(base_prefix),
|
||||
|
@ -63,8 +63,8 @@ class ApplicationMkcalendarMixin:
|
||||
if item:
|
||||
return self.webdav_error_response(
|
||||
"D", "resource-must-be-null")
|
||||
parent_path = pathutils.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(path.strip("/")))
|
||||
parent_path = pathutils.unstrip_path(
|
||||
posixpath.dirname(pathutils.strip_path(path)), True)
|
||||
parent_item = next(self.Collection.discover(parent_path), None)
|
||||
if not parent_item:
|
||||
return httputils.CONFLICT
|
||||
|
@ -64,8 +64,8 @@ class ApplicationMkcolMixin:
|
||||
item = next(self.Collection.discover(path), None)
|
||||
if item:
|
||||
return httputils.METHOD_NOT_ALLOWED
|
||||
parent_path = pathutils.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(path.strip("/")))
|
||||
parent_path = pathutils.unstrip_path(
|
||||
posixpath.dirname(pathutils.strip_path(path)), True)
|
||||
parent_item = next(self.Collection.discover(parent_path), None)
|
||||
if not parent_item:
|
||||
return httputils.CONFLICT
|
||||
|
@ -66,8 +66,8 @@ class ApplicationMoveMixin:
|
||||
to_item = next(self.Collection.discover(to_path), None)
|
||||
if isinstance(to_item, storage.BaseCollection):
|
||||
return httputils.FORBIDDEN
|
||||
to_parent_path = pathutils.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(to_path.strip("/")))
|
||||
to_parent_path = pathutils.unstrip_path(
|
||||
posixpath.dirname(pathutils.strip_path(to_path)), True)
|
||||
to_collection = next(
|
||||
self.Collection.discover(to_parent_path), None)
|
||||
if not to_collection:
|
||||
@ -83,7 +83,7 @@ class ApplicationMoveMixin:
|
||||
to_collection.has_uid(item.uid)):
|
||||
return self.webdav_error_response(
|
||||
"C" if tag == "VCALENDAR" else "CR", "no-uid-conflict")
|
||||
to_href = posixpath.basename(to_path.strip("/"))
|
||||
to_href = posixpath.basename(pathutils.strip_path(to_path))
|
||||
try:
|
||||
self.Collection.move(item, to_collection, to_href)
|
||||
except ValueError as e:
|
||||
|
@ -95,9 +95,10 @@ def xml_propfind_response(base_prefix, path, item, props, user, write=False,
|
||||
href = ET.Element(xmlutils.make_tag("D", "href"))
|
||||
if is_collection:
|
||||
# Some clients expect collections to end with /
|
||||
uri = "/%s/" % item.path if item.path else "/"
|
||||
uri = pathutils.unstrip_path(item.path, True)
|
||||
else:
|
||||
uri = "/" + posixpath.join(collection.path, item.href)
|
||||
uri = pathutils.unstrip_path(
|
||||
posixpath.join(collection.path, item.href))
|
||||
|
||||
href.text = xmlutils.make_href(base_prefix, uri)
|
||||
response.append(href)
|
||||
@ -335,7 +336,7 @@ class ApplicationPropfindMixin:
|
||||
"""Get items from request that user is allowed to access."""
|
||||
for item in items:
|
||||
if isinstance(item, storage.BaseCollection):
|
||||
path = pathutils.sanitize_path("/%s/" % item.path)
|
||||
path = pathutils.unstrip_path(item.path, True)
|
||||
if item.get_meta("tag"):
|
||||
permissions = self.Rights.authorized(user, path, "rw")
|
||||
target = "collection with tag %r" % item.path
|
||||
@ -343,7 +344,7 @@ class ApplicationPropfindMixin:
|
||||
permissions = self.Rights.authorized(user, path, "RW")
|
||||
target = "collection %r" % item.path
|
||||
else:
|
||||
path = pathutils.sanitize_path("/%s/" % item.collection.path)
|
||||
path = pathutils.unstrip_path(item.collection.path, True)
|
||||
permissions = self.Rights.authorized(user, path, "rw")
|
||||
target = "item %r from %r" % (item.href, item.collection.path)
|
||||
if rights.intersect_permissions(permissions, "Ww"):
|
||||
|
@ -52,8 +52,8 @@ class ApplicationPutMixin:
|
||||
logger.debug("client timed out", exc_info=True)
|
||||
return httputils.REQUEST_TIMEOUT
|
||||
# Prepare before locking
|
||||
parent_path = pathutils.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(path.strip("/")))
|
||||
parent_path = pathutils.unstrip_path(
|
||||
posixpath.dirname(pathutils.strip_path(path)), True)
|
||||
permissions = self.Rights.authorized(user, path, "Ww")
|
||||
parent_permissions = self.Rights.authorized(user, parent_path, "w")
|
||||
|
||||
@ -69,7 +69,7 @@ class ApplicationPutMixin:
|
||||
vobject_items, tags.get(content_type))
|
||||
if not tag:
|
||||
raise ValueError("Can't determine collection tag")
|
||||
collection_path = pathutils.sanitize_path(path).strip("/")
|
||||
collection_path = pathutils.strip_path(path)
|
||||
elif (write_whole_collection is not None and
|
||||
not write_whole_collection or
|
||||
not permissions and parent_permissions):
|
||||
@ -78,7 +78,7 @@ class ApplicationPutMixin:
|
||||
tag = storage.predict_tag_of_parent_collection(
|
||||
vobject_items)
|
||||
collection_path = posixpath.dirname(
|
||||
pathutils.sanitize_path(path).strip("/"))
|
||||
pathutils.strip_path(path))
|
||||
props = None
|
||||
stored_exc_info = None
|
||||
items = []
|
||||
@ -218,7 +218,7 @@ class ApplicationPutMixin:
|
||||
"C" if tag == "VCALENDAR" else "CR",
|
||||
"no-uid-conflict")
|
||||
|
||||
href = posixpath.basename(path.strip("/"))
|
||||
href = posixpath.basename(pathutils.strip_path(path))
|
||||
try:
|
||||
etag = parent_item.upload(href, prepared_item).etag
|
||||
except ValueError as e:
|
||||
|
@ -100,7 +100,8 @@ def xml_report(base_prefix, path, xml_request, collection, unlock_storage_fn):
|
||||
old_sync_token, e, exc_info=True)
|
||||
return (client.CONFLICT,
|
||||
xmlutils.webdav_error("D", "valid-sync-token"))
|
||||
hreferences = ("/" + posixpath.join(collection.path, n) for n in names)
|
||||
hreferences = (pathutils.unstrip_path(
|
||||
posixpath.join(collection.path, n)) for n in names)
|
||||
# Append current sync token to response
|
||||
sync_token_element = ET.Element(xmlutils.make_tag("D", "sync-token"))
|
||||
sync_token_element.text = sync_token
|
||||
@ -142,7 +143,8 @@ def xml_report(base_prefix, path, xml_request, collection, unlock_storage_fn):
|
||||
|
||||
for name, item in collection.get_multi(get_names()):
|
||||
if not item:
|
||||
uri = "/" + posixpath.join(collection.path, name)
|
||||
uri = pathutils.unstrip_path(
|
||||
posixpath.join(collection.path, name))
|
||||
response = xml_item_response(base_prefix, uri,
|
||||
found_item=False)
|
||||
multistatus.append(response)
|
||||
@ -223,7 +225,8 @@ def xml_report(base_prefix, path, xml_request, collection, unlock_storage_fn):
|
||||
else:
|
||||
not_found_props.append(element)
|
||||
|
||||
uri = "/" + posixpath.join(collection.path, item.href)
|
||||
uri = pathutils.unstrip_path(
|
||||
posixpath.join(collection.path, item.href))
|
||||
multistatus.append(xml_item_response(
|
||||
base_prefix, uri, found_props=found_props,
|
||||
not_found_props=not_found_props, found_item=True))
|
||||
|
Reference in New Issue
Block a user