From 066b5994d11b8e1b0995126d83ed575c57262fd3 Mon Sep 17 00:00:00 2001 From: Unrud Date: Thu, 4 Aug 2016 06:08:08 +0200 Subject: [PATCH] Improve rights checking and request handlers * Access rights are checked before the storage is locked and collections are loaded. * DELETE sends 410 instead of doing nothing or crashing if the target doesn't exist. * GET always returns 404 if the target doesn't exist. * GET doesn't crash if a collection without tag property is requested. * MKCOL and MKCALENDAR send 409 if the target already exists. * MOVE checks if the target collection of an item actually exists and sends 409 otherwise. * PUT doesn't crash if a whole collection that doesn't exist yet is uploaded and ``content-type`` is ``text/vcard`` or ``text/calendar``. * PUT distinguishes between simple items and whole collections by the following criteria: Target is a collection; Parent exists; Parent has the tag property set; Parent contains other items. Before only the first two criteria where used, which was very unrelieable. #384 * PROPPATCH is only allowed on collections and 409 is send otherwise. * ``Rights.authorized`` takes a path instead of a collection. * ``Collection.discover`` only returns items in ``path``, that actually exist. #442 --- radicale/__init__.py | 498 +++++++++++++++++------------------- radicale/rights.py | 14 +- radicale/storage.py | 58 ++--- radicale/tests/test_base.py | 30 +++ radicale/xmlutils.py | 11 +- 5 files changed, 293 insertions(+), 318 deletions(-) diff --git a/radicale/__init__.py b/radicale/__init__.py index 8cee0de..d382894 100644 --- a/radicale/__init__.py +++ b/radicale/__init__.py @@ -29,6 +29,7 @@ should have been included in this package. import base64 import contextlib import os +import posixpath import pprint import shlex import socket @@ -38,6 +39,7 @@ import subprocess import threading import wsgiref.simple_server import zlib +from contextlib import contextmanager from http import client from urllib.parse import unquote, urlparse @@ -184,60 +186,31 @@ class Application: def collect_allowed_items(self, items, user): """Get items from request that user is allowed to access.""" - read_last_collection_allowed = None - write_last_collection_allowed = None read_allowed_items = [] write_allowed_items = [] - for item in items: if isinstance(item, self.Collection): - if self.authorized(user, item, "r"): - self.logger.debug( - "%s has read access to collection %s" % - (user or "Anonymous", item.path or "/")) - read_last_collection_allowed = True - read_allowed_items.append(item) - else: - self.logger.debug( - "%s has NO read access to collection %s" % - (user or "Anonymous", item.path or "/")) - read_last_collection_allowed = False - - if self.authorized(user, item, "w"): - self.logger.debug( - "%s has write access to collection %s" % - (user or "Anonymous", item.path or "/")) - write_last_collection_allowed = True - write_allowed_items.append(item) - else: - self.logger.debug( - "%s has NO write access to collection %s" % - (user or "Anonymous", item.path or "/")) - write_last_collection_allowed = False + path = item.path else: - # item is not a collection, it's the child of the last - # collection we've met in the loop. Only add this item - # if this last collection was allowed. - if read_last_collection_allowed: - self.logger.debug( - "%s has read access to item %s" % - (user or "Anonymous", item.href)) - read_allowed_items.append(item) - else: - self.logger.debug( - "%s has NO read access to item %s" % - (user or "Anonymous", item.href)) - - if write_last_collection_allowed: - self.logger.debug( - "%s has write access to item %s" % - (user or "Anonymous", item.href)) - write_allowed_items.append(item) - else: - self.logger.debug( - "%s has NO write access to item %s" % - (user or "Anonymous", item.href)) - + path = item.collection.path + if self.authorized(user, path, "r"): + self.logger.debug( + "%s has read access to collection %s" % + (user or "Anonymous", path or "/")) + read_allowed_items.append(item) + else: + self.logger.debug( + "%s has NO read access to collection %s" % + (user or "Anonymous", path or "/")) + if self.authorized(user, path, "w"): + self.logger.debug( + "%s has write access to collection %s" % + (user or "Anonymous", path or "/")) + write_allowed_items.append(item) + else: + self.logger.debug( + "%s has NO write access to collection %s" % + (user or "Anonymous", path or "/")) return read_allowed_items, write_allowed_items def __call__(self, environ, start_response): @@ -305,12 +278,11 @@ class Application: # Create principal collection if user and is_authenticated: principal_path = "/%s/" % user - collection = self.Collection(principal_path, True) - if self.authorized(user, collection, "w"): + if self.authorized(user, principal_path, "w"): with self.Collection.acquire_lock("r"): principal = next( self.Collection.discover(principal_path), None) - if not principal or principal.path != user: + if not principal: with self.Collection.acquire_lock("w"): self.Collection.create_collection(principal_path) @@ -333,34 +305,7 @@ class Application: content = None if is_valid_user: - if function in ( - self.do_GET, self.do_HEAD, self.do_OPTIONS, - self.do_PROPFIND, self.do_REPORT): - lock_mode = "r" - else: - lock_mode = "w" - with self.Collection.acquire_lock(lock_mode): - items = self.Collection.discover( - path, environ.get("HTTP_DEPTH", "0")) - read_allowed_items, write_allowed_items = ( - self.collect_allowed_items(items, user)) - if (read_allowed_items or write_allowed_items or - is_authenticated and function == self.do_PROPFIND or - function == self.do_OPTIONS): - status, headers, answer = function( - environ, read_allowed_items, write_allowed_items, - content, user) - hook = self.configuration.get("storage", "hook") - if lock_mode == "w" and hook: - self.logger.debug("Running hook") - folder = os.path.expanduser( - self.configuration.get( - "storage", "filesystem_folder")) - subprocess.check_call( - hook % {"user": shlex.quote(user or "Anonymous")}, - shell=True, cwd=folder) - else: - status, headers, answer = NOT_ALLOWED + status, headers, answer = function(environ, path, content, user) else: status, headers, answer = NOT_ALLOWED @@ -397,144 +342,154 @@ class Application: return response(status, headers, answer) - def do_DELETE(self, environ, read_collections, write_collections, content, - user): + def _access(self, user, path, permission, item=None): + """Checks if ``user`` can access ``path`` or the parent collection + with ``permission``. + + ``permission`` must either be "r" or "w". + + If ``item`` is given, only access to that class of item is checked. + + """ + path = storage.sanitize_path(path) + parent_path = storage.sanitize_path( + "/%s/" % posixpath.dirname(path.strip("/"))) + allowed = False + if not item or isinstance(item, self.Collection): + allowed |= self.authorized(user, path, permission) + if not item or not isinstance(item, self.Collection): + allowed |= self.authorized(user, parent_path, permission) + return allowed + + @contextmanager + def _lock_collection(self, lock_mode, user): + """Lock the collection with ``permission`` and execute hook.""" + with self.Collection.acquire_lock(lock_mode) as value: + yield value + hook = self.configuration.get("storage", "hook") + if lock_mode == "w" and hook: + self.logger.debug("Running hook") + folder = os.path.expanduser(self.configuration.get( + "storage", "filesystem_folder")) + subprocess.check_call( + hook % {"user": shlex.quote(user or "Anonymous")}, + shell=True, cwd=folder) + + def do_DELETE(self, environ, path, content, user): """Manage DELETE request.""" - if not write_collections: + if not self._access(user, path, "w"): return NOT_ALLOWED - - collection = write_collections[0] - - if collection.path == environ["PATH_INFO"].strip("/"): - # Path matching the collection, the collection must be deleted - item = collection - else: - # Try to get an item matching the path - name = xmlutils.name_from_path(environ["PATH_INFO"], collection) - item = collection.get(name) - - if item: + with self._lock_collection("w", user): + item = next(self.Collection.discover(path, depth="0"), None) + if not self._access(user, path, "w", item): + return NOT_ALLOWED + if not item: + return client.GONE, {}, None if_match = environ.get("HTTP_IF_MATCH", "*") - if if_match in ("*", item.etag): - # No ETag precondition or precondition verified, delete item - answer = xmlutils.delete(environ["PATH_INFO"], collection) - return client.OK, {}, answer + if if_match not in ("*", item.etag): + # ETag precondition not verified, do not delete item + return client.PRECONDITION_FAILED, {}, None + if isinstance(item, self.Collection): + answer = xmlutils.delete(path, item) + else: + answer = xmlutils.delete(path, item.collection, item.href) + return client.OK, {}, answer - # No item or ETag precondition not verified, do not delete item - return client.PRECONDITION_FAILED, {}, None - - def do_GET(self, environ, read_collections, write_collections, content, - user): + def do_GET(self, environ, path, content, user): """Manage GET request.""" # Display a "Radicale works!" message if the root URL is requested - if environ["PATH_INFO"] == "/": + if not path.strip("/"): headers = {"Content-type": "text/html"} answer = "\nRadicaleRadicale works!" return client.OK, headers, answer - - if not read_collections: + if not self._access(user, path, "r"): return NOT_ALLOWED - - collection = read_collections[0] - - item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection) - - if item_name: - # Get collection item - item = collection.get(item_name) - if item: - answer = item.serialize() - etag = item.etag - else: - return client.NOT_FOUND, {}, None - else: - # Get whole collection - answer = collection.serialize() - if answer is None: + with self._lock_collection("r", user): + item = next(self.Collection.discover(path, depth="0"), None) + if not self._access(user, path, "r", item): + return NOT_ALLOWED + if not item: return client.NOT_FOUND, {}, None + if isinstance(item, self.Collection): + collection = item else: - etag = collection.etag - - if answer: + collection = item.collection + content_type = storage.MIMETYPES.get(collection.get_meta("tag"), + "text/plain") headers = { - "Content-Type": storage.MIMETYPES[collection.get_meta("tag")], + "Content-Type": content_type, "Last-Modified": collection.last_modified, - "ETag": etag} - else: - headers = {} - return client.OK, headers, answer + "ETag": item.etag} + answer = item.serialize() + return client.OK, headers, answer - def do_HEAD(self, environ, read_collections, write_collections, content, - user): + def do_HEAD(self, environ, path, content, user): """Manage HEAD request.""" - status, headers, answer = self.do_GET( - environ, read_collections, write_collections, content, user) + status, headers, answer = self.do_GET(environ, path, content, user) return status, headers, None - def do_MKCALENDAR(self, environ, read_collections, write_collections, - content, user): + def do_MKCALENDAR(self, environ, path, content, user): """Manage MKCALENDAR request.""" - if not write_collections: + if not self.authorized(user, path, "w"): return NOT_ALLOWED + with self._lock_collection("w", user): + item = next(self.Collection.discover(path, depth="0"), None) + if item: + return client.CONFLICT, {}, None + props = xmlutils.props_from_request(content) + # TODO: use this? + # timezone = props.get("C:calendar-timezone") + collection = self.Collection.create_collection(path, + tag="VCALENDAR") + for key, value in props.items(): + collection.set_meta(key, value) + return client.CREATED, {}, None - collection = write_collections[0] - - props = xmlutils.props_from_request(content) - # TODO: use this? - # timezone = props.get("C:calendar-timezone") - collection = self.Collection.create_collection( - environ["PATH_INFO"], tag="VCALENDAR") - for key, value in props.items(): - collection.set_meta(key, value) - return client.CREATED, {}, None - - def do_MKCOL(self, environ, read_collections, write_collections, content, - user): + def do_MKCOL(self, environ, path, content, user): """Manage MKCOL request.""" - if not write_collections: + if not self.authorized(user, path, "w"): return NOT_ALLOWED + with self._lock_collection("w", user): + item = next(self.Collection.discover(path, depth="0"), None) + if item: + return client.CONFLICT, {}, None + props = xmlutils.props_from_request(content) + collection = self.Collection.create_collection(path) + for key, value in props.items(): + collection.set_meta(key, value) + return client.CREATED, {}, None - collection = write_collections[0] - - props = xmlutils.props_from_request(content) - collection = self.Collection.create_collection(environ["PATH_INFO"]) - for key, value in props.items(): - collection.set_meta(key, value) - return client.CREATED, {}, None - - def do_MOVE(self, environ, read_collections, write_collections, content, - user): + def do_MOVE(self, environ, path, content, user): """Manage MOVE request.""" - if not write_collections: + to_url = urlparse(environ["HTTP_DESTINATION"]) + if to_url.netloc != environ["HTTP_HOST"]: + # Remote destination server, not supported + return client.BAD_GATEWAY, {}, None + to_path = storage.sanitize_path(to_url.path) + if (not self._access(user, path, "w") or + not self._access(user, to_path, "w")): return NOT_ALLOWED + with self._lock_collection("w", user): + item = next(self.Collection.discover(path, depth="0"), None) + if (not self._access(user, path, "w", item) or + not self._access(user, to_path, "w", item)): + return NOT_ALLOWED + if not item: + return client.GONE, {}, None + to_item = next(self.Collection.discover(to_path, depth="0"), None) + to_parent_path = storage.sanitize_path( + "/%s/" % posixpath.dirname(to_path.strip("/"))) + to_href = posixpath.basename(to_path.strip("/")) + to_collection = next(self.Collection.discover( + to_parent_path, depth="0"), None) + if (isinstance(item, self.Collection) or not to_collection or + to_item or to_path.strip("/").startswith(path.strip("/"))): + return client.CONFLICT, {}, None + to_collection.upload(to_href, item.item) + item.collection.delete(item.href) + return client.CREATED, {}, None - from_collection = write_collections[0] - from_name = xmlutils.name_from_path( - environ["PATH_INFO"], from_collection) - item = from_collection.get(from_name) - if item: - # Move the item - to_url_parts = urlparse(environ["HTTP_DESTINATION"]) - if to_url_parts.netloc == environ["HTTP_HOST"]: - to_url = to_url_parts.path - to_path, to_name = to_url.rstrip("/").rsplit("/", 1) - for to_collection in self.Collection.discover( - to_path, depth="0"): - if to_collection in write_collections: - to_collection.upload(to_name, item) - from_collection.delete(from_name) - return client.CREATED, {}, None - else: - return NOT_ALLOWED - else: - # Remote destination server, not supported - return client.BAD_GATEWAY, {}, None - else: - # No item found - return client.GONE, {}, None - - def do_OPTIONS(self, environ, read_collections, write_collections, - content, user): + def do_OPTIONS(self, environ, path, content, user): """Manage OPTIONS request.""" headers = { "Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, " @@ -542,94 +497,97 @@ class Application: "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"} return client.OK, headers, None - def do_PROPFIND(self, environ, read_collections, write_collections, - content, user): + def do_PROPFIND(self, environ, path, content, user): """Manage PROPFIND request.""" - if not read_collections: - return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED - headers = { - "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol", - "Content-Type": "text/xml"} - answer = xmlutils.propfind( - environ["PATH_INFO"], content, read_collections, write_collections, - user) - return client.MULTI_STATUS, headers, answer + with self._lock_collection("r", user): + items = self.Collection.discover(path, + environ.get("HTTP_DEPTH", "0")) + read_items, write_items = self.collect_allowed_items(items, user) + if not read_items and not write_items: + return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED + headers = { + "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol", + "Content-Type": "text/xml"} + answer = xmlutils.propfind( + path, content, read_items, write_items, user) + return client.MULTI_STATUS, headers, answer - def do_PROPPATCH(self, environ, read_collections, write_collections, - content, user): + def do_PROPPATCH(self, environ, path, content, user): """Manage PROPPATCH request.""" - if not write_collections: + if not self.authorized(user, path, "w"): return NOT_ALLOWED + with self._lock_collection("w", user): + item = next(self.Collection.discover(path, depth="0"), None) + if not isinstance(item, self.Collection): + return client.CONFLICT, {}, None + answer = xmlutils.proppatch(path, content, item) + headers = { + "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol", + "Content-Type": "text/xml"} + return client.MULTI_STATUS, headers, answer - collection = write_collections[0] - - answer = xmlutils.proppatch(environ["PATH_INFO"], content, collection) - headers = { - "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol", - "Content-Type": "text/xml"} - return client.MULTI_STATUS, headers, answer - - def do_PUT(self, environ, read_collections, write_collections, content, - user): + def do_PUT(self, environ, path, content, user): """Manage PUT request.""" - if not write_collections: + if not self._access(user, path, "w"): return NOT_ALLOWED - - collection = write_collections[0] - - content_type = environ.get("CONTENT_TYPE") - if content_type: - tags = {value: key for key, value in storage.MIMETYPES.items()} - tag = tags.get(content_type.split(";")[0]) - if tag: - collection.set_meta("tag", tag) - headers = {} - item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection) - item = collection.get(item_name) - - etag = environ.get("HTTP_IF_MATCH", "") - match = environ.get("HTTP_IF_NONE_MATCH", "") == "*" - if (not item and not etag) or ( - item and ((etag or item.etag) == item.etag) and not match): - # PUT allowed in 3 cases - # Case 1: No item and no ETag precondition: Add new item - # Case 2: Item and ETag precondition verified: Modify item - # Case 3: Item and no Etag precondition: Force modifying item + with self._lock_collection("w", user): + parent_path = storage.sanitize_path( + "/%s/" % posixpath.dirname(path.strip("/"))) + item = next(self.Collection.discover(path, depth="0"), None) + parent_item = next(self.Collection.discover( + parent_path, depth="0"), None) + write_whole_collection = ( + isinstance(item, self.Collection) or + not parent_item or + not next(parent_item.list(), None) and + parent_item.get_meta("tag") not in ("VADDRESSBOOK", + "VCALENDAR")) + if (write_whole_collection and + not self.authorized(user, path, "w") or + not write_whole_collection and + not self.authorized(user, parent_path, "w")): + return NOT_ALLOWED + etag = environ.get("HTTP_IF_MATCH", "") + match = environ.get("HTTP_IF_NONE_MATCH", "") == "*" + if ((not item and etag) or (item and etag and item.etag != etag) or + (item and match)): + return client.PRECONDITION_FAILED, {}, None items = list(vobject.readComponents(content or "")) - if item: - # PUT is modifying an existing item - if items: - new_item = collection.update(item_name, items[0]) - else: - new_item = None - elif item_name: - # PUT is adding a new item - if items: - new_item = collection.upload(item_name, items[0]) - else: - new_item = None + content_type = environ.get("CONTENT_TYPE") + tag = None + if content_type: + tags = {value: key for key, value in storage.MIMETYPES.items()} + tag = tags.get(content_type.split(";")[0]) + if write_whole_collection: + if item: + # Delete old collection + item.delete() + new_item = self.Collection.create_collection(path, items, tag) else: - # PUT is replacing the whole collection - collection.delete() - new_item = self.Collection.create_collection( - environ["PATH_INFO"], items) - if new_item: - headers["ETag"] = new_item.etag - status = client.CREATED - else: - # PUT rejected in all other cases - status = client.PRECONDITION_FAILED - return status, headers, None + if tag: + parent_item.set_meta("tag", tag) + href = posixpath.basename(path.strip("/")) + if item: + new_item = parent_item.update(href, items[0]) + else: + new_item = parent_item.upload(href, items[0]) + headers = {"ETag": new_item.etag} + return client.CREATED, headers, None - def do_REPORT(self, environ, read_collections, write_collections, content, - user): + def do_REPORT(self, environ, path, content, user): """Manage REPORT request.""" - if not read_collections: + if not self._access(user, path, "w"): return NOT_ALLOWED - - collection = read_collections[0] - - headers = {"Content-Type": "text/xml"} - - answer = xmlutils.report(environ["PATH_INFO"], content, collection) - return client.MULTI_STATUS, headers, answer + with self._lock_collection("r", user): + item = next(self.Collection.discover(path, depth="0"), None) + if not self._access(user, path, "w", item): + return NOT_ALLOWED + if not item: + return client.NOT_FOUND, {}, None + if isinstance(item, self.Collection): + collection = item + else: + collection = item.collection + headers = {"Content-Type": "text/xml"} + answer = xmlutils.report(path, content, collection) + return client.MULTI_STATUS, headers, answer diff --git a/radicale/rights.py b/radicale/rights.py index c70ad46..95e66e6 100644 --- a/radicale/rights.py +++ b/radicale/rights.py @@ -107,19 +107,17 @@ class Rights(BaseRights): self.filename = os.path.expanduser(configuration.get("rights", "file")) self.rights_type = configuration.get("rights", "type").lower() - def authorized(self, user, collection, permission): + def authorized(self, user, path, permission): user = user or "" if user and not storage.is_safe_path_component(user): # Prevent usernames like "user/calendar.ics" raise ValueError("Unsafe username") - collection_url = collection.path.rstrip("/") - if collection_url in (".well-known/carddav", ".well-known/caldav"): - return permission == "r" + sane_path = storage.sanitize_path(path).strip("/") # Prevent "regex injection" user_escaped = re.escape(user) - collection_url_escaped = re.escape(collection_url) + sane_path_escaped = re.escape(sane_path) regex = ConfigParser( - {"login": user_escaped, "path": collection_url_escaped}) + {"login": user_escaped, "path": sane_path_escaped}) if self.rights_type in DEFINED_RIGHTS: self.logger.debug("Rights type '%s'" % self.rights_type) regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type])) @@ -135,11 +133,11 @@ class Rights(BaseRights): re_collection = regex.get(section, "collection") self.logger.debug( "Test if '%s:%s' matches against '%s:%s' from section '%s'" % ( - user, collection_url, re_user, re_collection, section)) + user, sane_path, re_user, re_collection, section)) user_match = re.fullmatch(re_user, user) if user_match: re_collection = re_collection.format(*user_match.groups()) - if re.fullmatch(re_collection, collection_url): + if re.fullmatch(re_collection, sane_path): self.logger.debug("Section '%s' matches" % section) return permission in regex.get(section, "permission") else: diff --git a/radicale/storage.py b/radicale/storage.py index 10cffc1..20c358f 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -211,9 +211,7 @@ class BaseCollection: returned. If ``depth`` is anything but "0", it is considered as "1" and direct - children are included in the result. If ``include_container`` is - ``True`` (the default), the containing object is included in the - result. + children are included in the result. The ``path`` is relative. @@ -368,41 +366,39 @@ class Collection(BaseCollection): attributes = sane_path.split("/") if not attributes[0]: attributes.pop() - - # Try to guess if the path leads to a collection or an item folder = os.path.expanduser( cls.configuration.get("storage", "filesystem_folder")) - if not os.path.isdir(path_to_filesystem(folder, sane_path)): - # path is not a collection - if attributes and os.path.isfile(path_to_filesystem(folder, - sane_path)): - # path is an item - attributes.pop() - elif attributes and os.path.isdir(path_to_filesystem( - folder, *attributes[:-1])): - # path parent is a collection - attributes.pop() - # TODO: else: return? + try: + filesystem_path = path_to_filesystem(folder, sane_path) + except ValueError: + # Path is unsafe + return + href = None + if not os.path.isdir(filesystem_path): + if attributes and os.path.isfile(filesystem_path): + href = attributes.pop() + else: + return path = "/".join(attributes) - principal = len(attributes) == 1 collection = cls(path, principal) + if href: + yield collection.get(href) + return yield collection - if depth != "0": - # TODO: fix this - items = list(collection.list()) - if items: - for item in items: - yield collection.get(item[0]) - _, directories, _ = next(os.walk(collection._filesystem_path)) - for sub_path in directories: - if not is_safe_filesystem_path_component(sub_path): - cls.logger.debug("Skipping collection: %s", sub_path) - continue - full_path = os.path.join(collection._filesystem_path, sub_path) - if os.path.exists(full_path): - yield cls(posixpath.join(path, sub_path)) + if depth == "0": + return + for item in collection.list(): + yield collection.get(item[0]) + for href in os.listdir(filesystem_path): + if not is_safe_filesystem_path_component(href): + cls.logger.debug("Skipping collection: %s", href) + continue + child_filesystem_path = path_to_filesystem(filesystem_path, href) + if os.path.isdir(child_filesystem_path): + child_principal = len(attributes) == 0 + yield cls(child_filesystem_path, child_principal) @classmethod def create_collection(cls, href, collection=None, tag=None): diff --git a/radicale/tests/test_base.py b/radicale/tests/test_base.py index 84bad8f..f846f0d 100644 --- a/radicale/tests/test_base.py +++ b/radicale/tests/test_base.py @@ -133,6 +133,36 @@ class BaseRequests: status, headers, answer = self.request("GET", "/calendar.ics/") assert "VEVENT" not in answer + def test_mkcalendar(self): + """Make a calendar.""" + self.request("MKCALENDAR", "/calendar.ics/") + status, headers, answer = self.request("GET", "/calendar.ics/") + assert status == 200 + + def test_move(self): + """Move a item.""" + self.request("MKCALENDAR", "/calendar.ics/") + event = get_file_content("event1.ics") + path1 = "/calendar.ics/event1.ics" + path2 = "/calendar.ics/event2.ics" + status, headers, answer = self.request("PUT", path1, event) + status, headers, answer = self.request( + "MOVE", path1, HTTP_DESTINATION=path2, HTTP_HOST="") + assert status == 201 + status, headers, answer = self.request("GET", path1) + assert status == 404 + status, headers, answer = self.request("GET", path2) + assert status == 200 + + def test_head(self): + status, headers, answer = self.request("HEAD", "/") + assert status == 200 + + def test_options(self): + status, headers, answer = self.request("OPTIONS", "/") + assert status == 200 + assert "DAV" in headers + def test_multiple_events_with_same_uid(self): """Add two events with the same UID.""" self.request("MKCOL", "/calendar.ics/") diff --git a/radicale/xmlutils.py b/radicale/xmlutils.py index 9134102..a48a3e5 100644 --- a/radicale/xmlutils.py +++ b/radicale/xmlutils.py @@ -472,20 +472,13 @@ def props_from_request(root, actions=("set", "remove")): return result -def delete(path, collection): +def delete(path, collection, href=None): """Read and answer DELETE requests. Read rfc4918-9.6 for info. """ - # Reading request - if collection.path == path.strip("/"): - # Delete the whole collection - collection.delete() - else: - # Remove an item from the collection - collection.delete(name_from_path(path, collection)) - + collection.delete(href) # Writing answer multistatus = ET.Element(_tag("D", "multistatus")) response = ET.Element(_tag("D", "response"))