diff --git a/radicale/__init__.py b/radicale/__init__.py
index 8cee0de..d382894 100644
--- a/radicale/__init__.py
+++ b/radicale/__init__.py
@@ -29,6 +29,7 @@ should have been included in this package.
import base64
import contextlib
import os
+import posixpath
import pprint
import shlex
import socket
@@ -38,6 +39,7 @@ import subprocess
import threading
import wsgiref.simple_server
import zlib
+from contextlib import contextmanager
from http import client
from urllib.parse import unquote, urlparse
@@ -184,60 +186,31 @@ class Application:
def collect_allowed_items(self, items, user):
"""Get items from request that user is allowed to access."""
- read_last_collection_allowed = None
- write_last_collection_allowed = None
read_allowed_items = []
write_allowed_items = []
-
for item in items:
if isinstance(item, self.Collection):
- if self.authorized(user, item, "r"):
- self.logger.debug(
- "%s has read access to collection %s" %
- (user or "Anonymous", item.path or "/"))
- read_last_collection_allowed = True
- read_allowed_items.append(item)
- else:
- self.logger.debug(
- "%s has NO read access to collection %s" %
- (user or "Anonymous", item.path or "/"))
- read_last_collection_allowed = False
-
- if self.authorized(user, item, "w"):
- self.logger.debug(
- "%s has write access to collection %s" %
- (user or "Anonymous", item.path or "/"))
- write_last_collection_allowed = True
- write_allowed_items.append(item)
- else:
- self.logger.debug(
- "%s has NO write access to collection %s" %
- (user or "Anonymous", item.path or "/"))
- write_last_collection_allowed = False
+ path = item.path
else:
- # item is not a collection, it's the child of the last
- # collection we've met in the loop. Only add this item
- # if this last collection was allowed.
- if read_last_collection_allowed:
- self.logger.debug(
- "%s has read access to item %s" %
- (user or "Anonymous", item.href))
- read_allowed_items.append(item)
- else:
- self.logger.debug(
- "%s has NO read access to item %s" %
- (user or "Anonymous", item.href))
-
- if write_last_collection_allowed:
- self.logger.debug(
- "%s has write access to item %s" %
- (user or "Anonymous", item.href))
- write_allowed_items.append(item)
- else:
- self.logger.debug(
- "%s has NO write access to item %s" %
- (user or "Anonymous", item.href))
-
+ path = item.collection.path
+ if self.authorized(user, path, "r"):
+ self.logger.debug(
+ "%s has read access to collection %s" %
+ (user or "Anonymous", path or "/"))
+ read_allowed_items.append(item)
+ else:
+ self.logger.debug(
+ "%s has NO read access to collection %s" %
+ (user or "Anonymous", path or "/"))
+ if self.authorized(user, path, "w"):
+ self.logger.debug(
+ "%s has write access to collection %s" %
+ (user or "Anonymous", path or "/"))
+ write_allowed_items.append(item)
+ else:
+ self.logger.debug(
+ "%s has NO write access to collection %s" %
+ (user or "Anonymous", path or "/"))
return read_allowed_items, write_allowed_items
def __call__(self, environ, start_response):
@@ -305,12 +278,11 @@ class Application:
# Create principal collection
if user and is_authenticated:
principal_path = "/%s/" % user
- collection = self.Collection(principal_path, True)
- if self.authorized(user, collection, "w"):
+ if self.authorized(user, principal_path, "w"):
with self.Collection.acquire_lock("r"):
principal = next(
self.Collection.discover(principal_path), None)
- if not principal or principal.path != user:
+ if not principal:
with self.Collection.acquire_lock("w"):
self.Collection.create_collection(principal_path)
@@ -333,34 +305,7 @@ class Application:
content = None
if is_valid_user:
- if function in (
- self.do_GET, self.do_HEAD, self.do_OPTIONS,
- self.do_PROPFIND, self.do_REPORT):
- lock_mode = "r"
- else:
- lock_mode = "w"
- with self.Collection.acquire_lock(lock_mode):
- items = self.Collection.discover(
- path, environ.get("HTTP_DEPTH", "0"))
- read_allowed_items, write_allowed_items = (
- self.collect_allowed_items(items, user))
- if (read_allowed_items or write_allowed_items or
- is_authenticated and function == self.do_PROPFIND or
- function == self.do_OPTIONS):
- status, headers, answer = function(
- environ, read_allowed_items, write_allowed_items,
- content, user)
- hook = self.configuration.get("storage", "hook")
- if lock_mode == "w" and hook:
- self.logger.debug("Running hook")
- folder = os.path.expanduser(
- self.configuration.get(
- "storage", "filesystem_folder"))
- subprocess.check_call(
- hook % {"user": shlex.quote(user or "Anonymous")},
- shell=True, cwd=folder)
- else:
- status, headers, answer = NOT_ALLOWED
+ status, headers, answer = function(environ, path, content, user)
else:
status, headers, answer = NOT_ALLOWED
@@ -397,144 +342,154 @@ class Application:
return response(status, headers, answer)
- def do_DELETE(self, environ, read_collections, write_collections, content,
- user):
+ def _access(self, user, path, permission, item=None):
+ """Checks if ``user`` can access ``path`` or the parent collection
+ with ``permission``.
+
+ ``permission`` must either be "r" or "w".
+
+ If ``item`` is given, only access to that class of item is checked.
+
+ """
+ path = storage.sanitize_path(path)
+ parent_path = storage.sanitize_path(
+ "/%s/" % posixpath.dirname(path.strip("/")))
+ allowed = False
+ if not item or isinstance(item, self.Collection):
+ allowed |= self.authorized(user, path, permission)
+ if not item or not isinstance(item, self.Collection):
+ allowed |= self.authorized(user, parent_path, permission)
+ return allowed
+
+ @contextmanager
+ def _lock_collection(self, lock_mode, user):
+ """Lock the collection with ``permission`` and execute hook."""
+ with self.Collection.acquire_lock(lock_mode) as value:
+ yield value
+ hook = self.configuration.get("storage", "hook")
+ if lock_mode == "w" and hook:
+ self.logger.debug("Running hook")
+ folder = os.path.expanduser(self.configuration.get(
+ "storage", "filesystem_folder"))
+ subprocess.check_call(
+ hook % {"user": shlex.quote(user or "Anonymous")},
+ shell=True, cwd=folder)
+
+ def do_DELETE(self, environ, path, content, user):
"""Manage DELETE request."""
- if not write_collections:
+ if not self._access(user, path, "w"):
return NOT_ALLOWED
-
- collection = write_collections[0]
-
- if collection.path == environ["PATH_INFO"].strip("/"):
- # Path matching the collection, the collection must be deleted
- item = collection
- else:
- # Try to get an item matching the path
- name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
- item = collection.get(name)
-
- if item:
+ with self._lock_collection("w", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if not self._access(user, path, "w", item):
+ return NOT_ALLOWED
+ if not item:
+ return client.GONE, {}, None
if_match = environ.get("HTTP_IF_MATCH", "*")
- if if_match in ("*", item.etag):
- # No ETag precondition or precondition verified, delete item
- answer = xmlutils.delete(environ["PATH_INFO"], collection)
- return client.OK, {}, answer
+ if if_match not in ("*", item.etag):
+ # ETag precondition not verified, do not delete item
+ return client.PRECONDITION_FAILED, {}, None
+ if isinstance(item, self.Collection):
+ answer = xmlutils.delete(path, item)
+ else:
+ answer = xmlutils.delete(path, item.collection, item.href)
+ return client.OK, {}, answer
- # No item or ETag precondition not verified, do not delete item
- return client.PRECONDITION_FAILED, {}, None
-
- def do_GET(self, environ, read_collections, write_collections, content,
- user):
+ def do_GET(self, environ, path, content, user):
"""Manage GET request."""
# Display a "Radicale works!" message if the root URL is requested
- if environ["PATH_INFO"] == "/":
+ if not path.strip("/"):
headers = {"Content-type": "text/html"}
answer = "\n
RadicaleRadicale works!"
return client.OK, headers, answer
-
- if not read_collections:
+ if not self._access(user, path, "r"):
return NOT_ALLOWED
-
- collection = read_collections[0]
-
- item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
-
- if item_name:
- # Get collection item
- item = collection.get(item_name)
- if item:
- answer = item.serialize()
- etag = item.etag
- else:
- return client.NOT_FOUND, {}, None
- else:
- # Get whole collection
- answer = collection.serialize()
- if answer is None:
+ with self._lock_collection("r", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if not self._access(user, path, "r", item):
+ return NOT_ALLOWED
+ if not item:
return client.NOT_FOUND, {}, None
+ if isinstance(item, self.Collection):
+ collection = item
else:
- etag = collection.etag
-
- if answer:
+ collection = item.collection
+ content_type = storage.MIMETYPES.get(collection.get_meta("tag"),
+ "text/plain")
headers = {
- "Content-Type": storage.MIMETYPES[collection.get_meta("tag")],
+ "Content-Type": content_type,
"Last-Modified": collection.last_modified,
- "ETag": etag}
- else:
- headers = {}
- return client.OK, headers, answer
+ "ETag": item.etag}
+ answer = item.serialize()
+ return client.OK, headers, answer
- def do_HEAD(self, environ, read_collections, write_collections, content,
- user):
+ def do_HEAD(self, environ, path, content, user):
"""Manage HEAD request."""
- status, headers, answer = self.do_GET(
- environ, read_collections, write_collections, content, user)
+ status, headers, answer = self.do_GET(environ, path, content, user)
return status, headers, None
- def do_MKCALENDAR(self, environ, read_collections, write_collections,
- content, user):
+ def do_MKCALENDAR(self, environ, path, content, user):
"""Manage MKCALENDAR request."""
- if not write_collections:
+ if not self.authorized(user, path, "w"):
return NOT_ALLOWED
+ with self._lock_collection("w", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if item:
+ return client.CONFLICT, {}, None
+ props = xmlutils.props_from_request(content)
+ # TODO: use this?
+ # timezone = props.get("C:calendar-timezone")
+ collection = self.Collection.create_collection(path,
+ tag="VCALENDAR")
+ for key, value in props.items():
+ collection.set_meta(key, value)
+ return client.CREATED, {}, None
- collection = write_collections[0]
-
- props = xmlutils.props_from_request(content)
- # TODO: use this?
- # timezone = props.get("C:calendar-timezone")
- collection = self.Collection.create_collection(
- environ["PATH_INFO"], tag="VCALENDAR")
- for key, value in props.items():
- collection.set_meta(key, value)
- return client.CREATED, {}, None
-
- def do_MKCOL(self, environ, read_collections, write_collections, content,
- user):
+ def do_MKCOL(self, environ, path, content, user):
"""Manage MKCOL request."""
- if not write_collections:
+ if not self.authorized(user, path, "w"):
return NOT_ALLOWED
+ with self._lock_collection("w", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if item:
+ return client.CONFLICT, {}, None
+ props = xmlutils.props_from_request(content)
+ collection = self.Collection.create_collection(path)
+ for key, value in props.items():
+ collection.set_meta(key, value)
+ return client.CREATED, {}, None
- collection = write_collections[0]
-
- props = xmlutils.props_from_request(content)
- collection = self.Collection.create_collection(environ["PATH_INFO"])
- for key, value in props.items():
- collection.set_meta(key, value)
- return client.CREATED, {}, None
-
- def do_MOVE(self, environ, read_collections, write_collections, content,
- user):
+ def do_MOVE(self, environ, path, content, user):
"""Manage MOVE request."""
- if not write_collections:
+ to_url = urlparse(environ["HTTP_DESTINATION"])
+ if to_url.netloc != environ["HTTP_HOST"]:
+ # Remote destination server, not supported
+ return client.BAD_GATEWAY, {}, None
+ to_path = storage.sanitize_path(to_url.path)
+ if (not self._access(user, path, "w") or
+ not self._access(user, to_path, "w")):
return NOT_ALLOWED
+ with self._lock_collection("w", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if (not self._access(user, path, "w", item) or
+ not self._access(user, to_path, "w", item)):
+ return NOT_ALLOWED
+ if not item:
+ return client.GONE, {}, None
+ to_item = next(self.Collection.discover(to_path, depth="0"), None)
+ to_parent_path = storage.sanitize_path(
+ "/%s/" % posixpath.dirname(to_path.strip("/")))
+ to_href = posixpath.basename(to_path.strip("/"))
+ to_collection = next(self.Collection.discover(
+ to_parent_path, depth="0"), None)
+ if (isinstance(item, self.Collection) or not to_collection or
+ to_item or to_path.strip("/").startswith(path.strip("/"))):
+ return client.CONFLICT, {}, None
+ to_collection.upload(to_href, item.item)
+ item.collection.delete(item.href)
+ return client.CREATED, {}, None
- from_collection = write_collections[0]
- from_name = xmlutils.name_from_path(
- environ["PATH_INFO"], from_collection)
- item = from_collection.get(from_name)
- if item:
- # Move the item
- to_url_parts = urlparse(environ["HTTP_DESTINATION"])
- if to_url_parts.netloc == environ["HTTP_HOST"]:
- to_url = to_url_parts.path
- to_path, to_name = to_url.rstrip("/").rsplit("/", 1)
- for to_collection in self.Collection.discover(
- to_path, depth="0"):
- if to_collection in write_collections:
- to_collection.upload(to_name, item)
- from_collection.delete(from_name)
- return client.CREATED, {}, None
- else:
- return NOT_ALLOWED
- else:
- # Remote destination server, not supported
- return client.BAD_GATEWAY, {}, None
- else:
- # No item found
- return client.GONE, {}, None
-
- def do_OPTIONS(self, environ, read_collections, write_collections,
- content, user):
+ def do_OPTIONS(self, environ, path, content, user):
"""Manage OPTIONS request."""
headers = {
"Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, "
@@ -542,94 +497,97 @@ class Application:
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"}
return client.OK, headers, None
- def do_PROPFIND(self, environ, read_collections, write_collections,
- content, user):
+ def do_PROPFIND(self, environ, path, content, user):
"""Manage PROPFIND request."""
- if not read_collections:
- return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED
- headers = {
- "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
- "Content-Type": "text/xml"}
- answer = xmlutils.propfind(
- environ["PATH_INFO"], content, read_collections, write_collections,
- user)
- return client.MULTI_STATUS, headers, answer
+ with self._lock_collection("r", user):
+ items = self.Collection.discover(path,
+ environ.get("HTTP_DEPTH", "0"))
+ read_items, write_items = self.collect_allowed_items(items, user)
+ if not read_items and not write_items:
+ return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED
+ headers = {
+ "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
+ "Content-Type": "text/xml"}
+ answer = xmlutils.propfind(
+ path, content, read_items, write_items, user)
+ return client.MULTI_STATUS, headers, answer
- def do_PROPPATCH(self, environ, read_collections, write_collections,
- content, user):
+ def do_PROPPATCH(self, environ, path, content, user):
"""Manage PROPPATCH request."""
- if not write_collections:
+ if not self.authorized(user, path, "w"):
return NOT_ALLOWED
+ with self._lock_collection("w", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if not isinstance(item, self.Collection):
+ return client.CONFLICT, {}, None
+ answer = xmlutils.proppatch(path, content, item)
+ headers = {
+ "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
+ "Content-Type": "text/xml"}
+ return client.MULTI_STATUS, headers, answer
- collection = write_collections[0]
-
- answer = xmlutils.proppatch(environ["PATH_INFO"], content, collection)
- headers = {
- "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
- "Content-Type": "text/xml"}
- return client.MULTI_STATUS, headers, answer
-
- def do_PUT(self, environ, read_collections, write_collections, content,
- user):
+ def do_PUT(self, environ, path, content, user):
"""Manage PUT request."""
- if not write_collections:
+ if not self._access(user, path, "w"):
return NOT_ALLOWED
-
- collection = write_collections[0]
-
- content_type = environ.get("CONTENT_TYPE")
- if content_type:
- tags = {value: key for key, value in storage.MIMETYPES.items()}
- tag = tags.get(content_type.split(";")[0])
- if tag:
- collection.set_meta("tag", tag)
- headers = {}
- item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
- item = collection.get(item_name)
-
- etag = environ.get("HTTP_IF_MATCH", "")
- match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
- if (not item and not etag) or (
- item and ((etag or item.etag) == item.etag) and not match):
- # PUT allowed in 3 cases
- # Case 1: No item and no ETag precondition: Add new item
- # Case 2: Item and ETag precondition verified: Modify item
- # Case 3: Item and no Etag precondition: Force modifying item
+ with self._lock_collection("w", user):
+ parent_path = storage.sanitize_path(
+ "/%s/" % posixpath.dirname(path.strip("/")))
+ item = next(self.Collection.discover(path, depth="0"), None)
+ parent_item = next(self.Collection.discover(
+ parent_path, depth="0"), None)
+ write_whole_collection = (
+ isinstance(item, self.Collection) or
+ not parent_item or
+ not next(parent_item.list(), None) and
+ parent_item.get_meta("tag") not in ("VADDRESSBOOK",
+ "VCALENDAR"))
+ if (write_whole_collection and
+ not self.authorized(user, path, "w") or
+ not write_whole_collection and
+ not self.authorized(user, parent_path, "w")):
+ return NOT_ALLOWED
+ etag = environ.get("HTTP_IF_MATCH", "")
+ match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
+ if ((not item and etag) or (item and etag and item.etag != etag) or
+ (item and match)):
+ return client.PRECONDITION_FAILED, {}, None
items = list(vobject.readComponents(content or ""))
- if item:
- # PUT is modifying an existing item
- if items:
- new_item = collection.update(item_name, items[0])
- else:
- new_item = None
- elif item_name:
- # PUT is adding a new item
- if items:
- new_item = collection.upload(item_name, items[0])
- else:
- new_item = None
+ content_type = environ.get("CONTENT_TYPE")
+ tag = None
+ if content_type:
+ tags = {value: key for key, value in storage.MIMETYPES.items()}
+ tag = tags.get(content_type.split(";")[0])
+ if write_whole_collection:
+ if item:
+ # Delete old collection
+ item.delete()
+ new_item = self.Collection.create_collection(path, items, tag)
else:
- # PUT is replacing the whole collection
- collection.delete()
- new_item = self.Collection.create_collection(
- environ["PATH_INFO"], items)
- if new_item:
- headers["ETag"] = new_item.etag
- status = client.CREATED
- else:
- # PUT rejected in all other cases
- status = client.PRECONDITION_FAILED
- return status, headers, None
+ if tag:
+ parent_item.set_meta("tag", tag)
+ href = posixpath.basename(path.strip("/"))
+ if item:
+ new_item = parent_item.update(href, items[0])
+ else:
+ new_item = parent_item.upload(href, items[0])
+ headers = {"ETag": new_item.etag}
+ return client.CREATED, headers, None
- def do_REPORT(self, environ, read_collections, write_collections, content,
- user):
+ def do_REPORT(self, environ, path, content, user):
"""Manage REPORT request."""
- if not read_collections:
+ if not self._access(user, path, "w"):
return NOT_ALLOWED
-
- collection = read_collections[0]
-
- headers = {"Content-Type": "text/xml"}
-
- answer = xmlutils.report(environ["PATH_INFO"], content, collection)
- return client.MULTI_STATUS, headers, answer
+ with self._lock_collection("r", user):
+ item = next(self.Collection.discover(path, depth="0"), None)
+ if not self._access(user, path, "w", item):
+ return NOT_ALLOWED
+ if not item:
+ return client.NOT_FOUND, {}, None
+ if isinstance(item, self.Collection):
+ collection = item
+ else:
+ collection = item.collection
+ headers = {"Content-Type": "text/xml"}
+ answer = xmlutils.report(path, content, collection)
+ return client.MULTI_STATUS, headers, answer
diff --git a/radicale/rights.py b/radicale/rights.py
index c70ad46..95e66e6 100644
--- a/radicale/rights.py
+++ b/radicale/rights.py
@@ -107,19 +107,17 @@ class Rights(BaseRights):
self.filename = os.path.expanduser(configuration.get("rights", "file"))
self.rights_type = configuration.get("rights", "type").lower()
- def authorized(self, user, collection, permission):
+ def authorized(self, user, path, permission):
user = user or ""
if user and not storage.is_safe_path_component(user):
# Prevent usernames like "user/calendar.ics"
raise ValueError("Unsafe username")
- collection_url = collection.path.rstrip("/")
- if collection_url in (".well-known/carddav", ".well-known/caldav"):
- return permission == "r"
+ sane_path = storage.sanitize_path(path).strip("/")
# Prevent "regex injection"
user_escaped = re.escape(user)
- collection_url_escaped = re.escape(collection_url)
+ sane_path_escaped = re.escape(sane_path)
regex = ConfigParser(
- {"login": user_escaped, "path": collection_url_escaped})
+ {"login": user_escaped, "path": sane_path_escaped})
if self.rights_type in DEFINED_RIGHTS:
self.logger.debug("Rights type '%s'" % self.rights_type)
regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type]))
@@ -135,11 +133,11 @@ class Rights(BaseRights):
re_collection = regex.get(section, "collection")
self.logger.debug(
"Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
- user, collection_url, re_user, re_collection, section))
+ user, sane_path, re_user, re_collection, section))
user_match = re.fullmatch(re_user, user)
if user_match:
re_collection = re_collection.format(*user_match.groups())
- if re.fullmatch(re_collection, collection_url):
+ if re.fullmatch(re_collection, sane_path):
self.logger.debug("Section '%s' matches" % section)
return permission in regex.get(section, "permission")
else:
diff --git a/radicale/storage.py b/radicale/storage.py
index 10cffc1..20c358f 100644
--- a/radicale/storage.py
+++ b/radicale/storage.py
@@ -211,9 +211,7 @@ class BaseCollection:
returned.
If ``depth`` is anything but "0", it is considered as "1" and direct
- children are included in the result. If ``include_container`` is
- ``True`` (the default), the containing object is included in the
- result.
+ children are included in the result.
The ``path`` is relative.
@@ -368,41 +366,39 @@ class Collection(BaseCollection):
attributes = sane_path.split("/")
if not attributes[0]:
attributes.pop()
-
- # Try to guess if the path leads to a collection or an item
folder = os.path.expanduser(
cls.configuration.get("storage", "filesystem_folder"))
- if not os.path.isdir(path_to_filesystem(folder, sane_path)):
- # path is not a collection
- if attributes and os.path.isfile(path_to_filesystem(folder,
- sane_path)):
- # path is an item
- attributes.pop()
- elif attributes and os.path.isdir(path_to_filesystem(
- folder, *attributes[:-1])):
- # path parent is a collection
- attributes.pop()
- # TODO: else: return?
+ try:
+ filesystem_path = path_to_filesystem(folder, sane_path)
+ except ValueError:
+ # Path is unsafe
+ return
+ href = None
+ if not os.path.isdir(filesystem_path):
+ if attributes and os.path.isfile(filesystem_path):
+ href = attributes.pop()
+ else:
+ return
path = "/".join(attributes)
-
principal = len(attributes) == 1
collection = cls(path, principal)
+ if href:
+ yield collection.get(href)
+ return
yield collection
- if depth != "0":
- # TODO: fix this
- items = list(collection.list())
- if items:
- for item in items:
- yield collection.get(item[0])
- _, directories, _ = next(os.walk(collection._filesystem_path))
- for sub_path in directories:
- if not is_safe_filesystem_path_component(sub_path):
- cls.logger.debug("Skipping collection: %s", sub_path)
- continue
- full_path = os.path.join(collection._filesystem_path, sub_path)
- if os.path.exists(full_path):
- yield cls(posixpath.join(path, sub_path))
+ if depth == "0":
+ return
+ for item in collection.list():
+ yield collection.get(item[0])
+ for href in os.listdir(filesystem_path):
+ if not is_safe_filesystem_path_component(href):
+ cls.logger.debug("Skipping collection: %s", href)
+ continue
+ child_filesystem_path = path_to_filesystem(filesystem_path, href)
+ if os.path.isdir(child_filesystem_path):
+ child_principal = len(attributes) == 0
+ yield cls(child_filesystem_path, child_principal)
@classmethod
def create_collection(cls, href, collection=None, tag=None):
diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py
index 84bad8f..f846f0d 100644
--- a/radicale/tests/test_base.py
+++ b/radicale/tests/test_base.py
@@ -133,6 +133,36 @@ class BaseRequests:
status, headers, answer = self.request("GET", "/calendar.ics/")
assert "VEVENT" not in answer
+ def test_mkcalendar(self):
+ """Make a calendar."""
+ self.request("MKCALENDAR", "/calendar.ics/")
+ status, headers, answer = self.request("GET", "/calendar.ics/")
+ assert status == 200
+
+ def test_move(self):
+ """Move a item."""
+ self.request("MKCALENDAR", "/calendar.ics/")
+ event = get_file_content("event1.ics")
+ path1 = "/calendar.ics/event1.ics"
+ path2 = "/calendar.ics/event2.ics"
+ status, headers, answer = self.request("PUT", path1, event)
+ status, headers, answer = self.request(
+ "MOVE", path1, HTTP_DESTINATION=path2, HTTP_HOST="")
+ assert status == 201
+ status, headers, answer = self.request("GET", path1)
+ assert status == 404
+ status, headers, answer = self.request("GET", path2)
+ assert status == 200
+
+ def test_head(self):
+ status, headers, answer = self.request("HEAD", "/")
+ assert status == 200
+
+ def test_options(self):
+ status, headers, answer = self.request("OPTIONS", "/")
+ assert status == 200
+ assert "DAV" in headers
+
def test_multiple_events_with_same_uid(self):
"""Add two events with the same UID."""
self.request("MKCOL", "/calendar.ics/")
diff --git a/radicale/xmlutils.py b/radicale/xmlutils.py
index 9134102..a48a3e5 100644
--- a/radicale/xmlutils.py
+++ b/radicale/xmlutils.py
@@ -472,20 +472,13 @@ def props_from_request(root, actions=("set", "remove")):
return result
-def delete(path, collection):
+def delete(path, collection, href=None):
"""Read and answer DELETE requests.
Read rfc4918-9.6 for info.
"""
- # Reading request
- if collection.path == path.strip("/"):
- # Delete the whole collection
- collection.delete()
- else:
- # Remove an item from the collection
- collection.delete(name_from_path(path, collection))
-
+ collection.delete(href)
# Writing answer
multistatus = ET.Element(_tag("D", "multistatus"))
response = ET.Element(_tag("D", "response"))