Improve rights checking and request handlers

* Access rights are checked before the storage is locked and
    collections are loaded.
  * DELETE sends 410 instead of doing nothing or crashing if the target
    doesn't exist.
  * GET always returns 404 if the target doesn't exist.
  * GET doesn't crash if a collection without tag property is requested.
  * MKCOL and MKCALENDAR send 409 if the target already exists.
  * MOVE checks if the target collection of an item actually exists and
    sends 409 otherwise.
  * PUT doesn't crash if a whole collection that doesn't exist yet is
    uploaded and ``content-type`` is ``text/vcard`` or
    ``text/calendar``.
  * PUT distinguishes between simple items and whole collections by the
    following criteria: Target is a collection; Parent exists; Parent
    has the tag property set; Parent contains other items. Before only
    the first two criteria where used, which was very unrelieable. #384
  * PROPPATCH is only allowed on collections and 409 is send otherwise.
  * ``Rights.authorized`` takes a path instead of a collection.
  * ``Collection.discover`` only returns items in ``path``, that
    actually exist. #442
This commit is contained in:
Unrud 2016-08-04 06:08:08 +02:00
parent b71664b322
commit 066b5994d1
5 changed files with 293 additions and 318 deletions

View File

@ -29,6 +29,7 @@ should have been included in this package.
import base64 import base64
import contextlib import contextlib
import os import os
import posixpath
import pprint import pprint
import shlex import shlex
import socket import socket
@ -38,6 +39,7 @@ import subprocess
import threading import threading
import wsgiref.simple_server import wsgiref.simple_server
import zlib import zlib
from contextlib import contextmanager
from http import client from http import client
from urllib.parse import unquote, urlparse from urllib.parse import unquote, urlparse
@ -184,60 +186,31 @@ class Application:
def collect_allowed_items(self, items, user): def collect_allowed_items(self, items, user):
"""Get items from request that user is allowed to access.""" """Get items from request that user is allowed to access."""
read_last_collection_allowed = None
write_last_collection_allowed = None
read_allowed_items = [] read_allowed_items = []
write_allowed_items = [] write_allowed_items = []
for item in items: for item in items:
if isinstance(item, self.Collection): if isinstance(item, self.Collection):
if self.authorized(user, item, "r"): path = item.path
self.logger.debug(
"%s has read access to collection %s" %
(user or "Anonymous", item.path or "/"))
read_last_collection_allowed = True
read_allowed_items.append(item)
else:
self.logger.debug(
"%s has NO read access to collection %s" %
(user or "Anonymous", item.path or "/"))
read_last_collection_allowed = False
if self.authorized(user, item, "w"):
self.logger.debug(
"%s has write access to collection %s" %
(user or "Anonymous", item.path or "/"))
write_last_collection_allowed = True
write_allowed_items.append(item)
else:
self.logger.debug(
"%s has NO write access to collection %s" %
(user or "Anonymous", item.path or "/"))
write_last_collection_allowed = False
else: else:
# item is not a collection, it's the child of the last path = item.collection.path
# collection we've met in the loop. Only add this item if self.authorized(user, path, "r"):
# if this last collection was allowed. self.logger.debug(
if read_last_collection_allowed: "%s has read access to collection %s" %
self.logger.debug( (user or "Anonymous", path or "/"))
"%s has read access to item %s" % read_allowed_items.append(item)
(user or "Anonymous", item.href)) else:
read_allowed_items.append(item) self.logger.debug(
else: "%s has NO read access to collection %s" %
self.logger.debug( (user or "Anonymous", path or "/"))
"%s has NO read access to item %s" % if self.authorized(user, path, "w"):
(user or "Anonymous", item.href)) self.logger.debug(
"%s has write access to collection %s" %
if write_last_collection_allowed: (user or "Anonymous", path or "/"))
self.logger.debug( write_allowed_items.append(item)
"%s has write access to item %s" % else:
(user or "Anonymous", item.href)) self.logger.debug(
write_allowed_items.append(item) "%s has NO write access to collection %s" %
else: (user or "Anonymous", path or "/"))
self.logger.debug(
"%s has NO write access to item %s" %
(user or "Anonymous", item.href))
return read_allowed_items, write_allowed_items return read_allowed_items, write_allowed_items
def __call__(self, environ, start_response): def __call__(self, environ, start_response):
@ -305,12 +278,11 @@ class Application:
# Create principal collection # Create principal collection
if user and is_authenticated: if user and is_authenticated:
principal_path = "/%s/" % user principal_path = "/%s/" % user
collection = self.Collection(principal_path, True) if self.authorized(user, principal_path, "w"):
if self.authorized(user, collection, "w"):
with self.Collection.acquire_lock("r"): with self.Collection.acquire_lock("r"):
principal = next( principal = next(
self.Collection.discover(principal_path), None) self.Collection.discover(principal_path), None)
if not principal or principal.path != user: if not principal:
with self.Collection.acquire_lock("w"): with self.Collection.acquire_lock("w"):
self.Collection.create_collection(principal_path) self.Collection.create_collection(principal_path)
@ -333,34 +305,7 @@ class Application:
content = None content = None
if is_valid_user: if is_valid_user:
if function in ( status, headers, answer = function(environ, path, content, user)
self.do_GET, self.do_HEAD, self.do_OPTIONS,
self.do_PROPFIND, self.do_REPORT):
lock_mode = "r"
else:
lock_mode = "w"
with self.Collection.acquire_lock(lock_mode):
items = self.Collection.discover(
path, environ.get("HTTP_DEPTH", "0"))
read_allowed_items, write_allowed_items = (
self.collect_allowed_items(items, user))
if (read_allowed_items or write_allowed_items or
is_authenticated and function == self.do_PROPFIND or
function == self.do_OPTIONS):
status, headers, answer = function(
environ, read_allowed_items, write_allowed_items,
content, user)
hook = self.configuration.get("storage", "hook")
if lock_mode == "w" and hook:
self.logger.debug("Running hook")
folder = os.path.expanduser(
self.configuration.get(
"storage", "filesystem_folder"))
subprocess.check_call(
hook % {"user": shlex.quote(user or "Anonymous")},
shell=True, cwd=folder)
else:
status, headers, answer = NOT_ALLOWED
else: else:
status, headers, answer = NOT_ALLOWED status, headers, answer = NOT_ALLOWED
@ -397,144 +342,154 @@ class Application:
return response(status, headers, answer) return response(status, headers, answer)
def do_DELETE(self, environ, read_collections, write_collections, content, def _access(self, user, path, permission, item=None):
user): """Checks if ``user`` can access ``path`` or the parent collection
with ``permission``.
``permission`` must either be "r" or "w".
If ``item`` is given, only access to that class of item is checked.
"""
path = storage.sanitize_path(path)
parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(path.strip("/")))
allowed = False
if not item or isinstance(item, self.Collection):
allowed |= self.authorized(user, path, permission)
if not item or not isinstance(item, self.Collection):
allowed |= self.authorized(user, parent_path, permission)
return allowed
@contextmanager
def _lock_collection(self, lock_mode, user):
"""Lock the collection with ``permission`` and execute hook."""
with self.Collection.acquire_lock(lock_mode) as value:
yield value
hook = self.configuration.get("storage", "hook")
if lock_mode == "w" and hook:
self.logger.debug("Running hook")
folder = os.path.expanduser(self.configuration.get(
"storage", "filesystem_folder"))
subprocess.check_call(
hook % {"user": shlex.quote(user or "Anonymous")},
shell=True, cwd=folder)
def do_DELETE(self, environ, path, content, user):
"""Manage DELETE request.""" """Manage DELETE request."""
if not write_collections: if not self._access(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("w", user):
collection = write_collections[0] item = next(self.Collection.discover(path, depth="0"), None)
if not self._access(user, path, "w", item):
if collection.path == environ["PATH_INFO"].strip("/"): return NOT_ALLOWED
# Path matching the collection, the collection must be deleted if not item:
item = collection return client.GONE, {}, None
else:
# Try to get an item matching the path
name = xmlutils.name_from_path(environ["PATH_INFO"], collection)
item = collection.get(name)
if item:
if_match = environ.get("HTTP_IF_MATCH", "*") if_match = environ.get("HTTP_IF_MATCH", "*")
if if_match in ("*", item.etag): if if_match not in ("*", item.etag):
# No ETag precondition or precondition verified, delete item # ETag precondition not verified, do not delete item
answer = xmlutils.delete(environ["PATH_INFO"], collection) return client.PRECONDITION_FAILED, {}, None
return client.OK, {}, answer if isinstance(item, self.Collection):
answer = xmlutils.delete(path, item)
else:
answer = xmlutils.delete(path, item.collection, item.href)
return client.OK, {}, answer
# No item or ETag precondition not verified, do not delete item def do_GET(self, environ, path, content, user):
return client.PRECONDITION_FAILED, {}, None
def do_GET(self, environ, read_collections, write_collections, content,
user):
"""Manage GET request.""" """Manage GET request."""
# Display a "Radicale works!" message if the root URL is requested # Display a "Radicale works!" message if the root URL is requested
if environ["PATH_INFO"] == "/": if not path.strip("/"):
headers = {"Content-type": "text/html"} headers = {"Content-type": "text/html"}
answer = "<!DOCTYPE html>\n<title>Radicale</title>Radicale works!" answer = "<!DOCTYPE html>\n<title>Radicale</title>Radicale works!"
return client.OK, headers, answer return client.OK, headers, answer
if not self._access(user, path, "r"):
if not read_collections:
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("r", user):
collection = read_collections[0] item = next(self.Collection.discover(path, depth="0"), None)
if not self._access(user, path, "r", item):
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection) return NOT_ALLOWED
if not item:
if item_name:
# Get collection item
item = collection.get(item_name)
if item:
answer = item.serialize()
etag = item.etag
else:
return client.NOT_FOUND, {}, None
else:
# Get whole collection
answer = collection.serialize()
if answer is None:
return client.NOT_FOUND, {}, None return client.NOT_FOUND, {}, None
if isinstance(item, self.Collection):
collection = item
else: else:
etag = collection.etag collection = item.collection
content_type = storage.MIMETYPES.get(collection.get_meta("tag"),
if answer: "text/plain")
headers = { headers = {
"Content-Type": storage.MIMETYPES[collection.get_meta("tag")], "Content-Type": content_type,
"Last-Modified": collection.last_modified, "Last-Modified": collection.last_modified,
"ETag": etag} "ETag": item.etag}
else: answer = item.serialize()
headers = {} return client.OK, headers, answer
return client.OK, headers, answer
def do_HEAD(self, environ, read_collections, write_collections, content, def do_HEAD(self, environ, path, content, user):
user):
"""Manage HEAD request.""" """Manage HEAD request."""
status, headers, answer = self.do_GET( status, headers, answer = self.do_GET(environ, path, content, user)
environ, read_collections, write_collections, content, user)
return status, headers, None return status, headers, None
def do_MKCALENDAR(self, environ, read_collections, write_collections, def do_MKCALENDAR(self, environ, path, content, user):
content, user):
"""Manage MKCALENDAR request.""" """Manage MKCALENDAR request."""
if not write_collections: if not self.authorized(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("w", user):
item = next(self.Collection.discover(path, depth="0"), None)
if item:
return client.CONFLICT, {}, None
props = xmlutils.props_from_request(content)
# TODO: use this?
# timezone = props.get("C:calendar-timezone")
collection = self.Collection.create_collection(path,
tag="VCALENDAR")
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
collection = write_collections[0] def do_MKCOL(self, environ, path, content, user):
props = xmlutils.props_from_request(content)
# TODO: use this?
# timezone = props.get("C:calendar-timezone")
collection = self.Collection.create_collection(
environ["PATH_INFO"], tag="VCALENDAR")
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
def do_MKCOL(self, environ, read_collections, write_collections, content,
user):
"""Manage MKCOL request.""" """Manage MKCOL request."""
if not write_collections: if not self.authorized(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("w", user):
item = next(self.Collection.discover(path, depth="0"), None)
if item:
return client.CONFLICT, {}, None
props = xmlutils.props_from_request(content)
collection = self.Collection.create_collection(path)
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
collection = write_collections[0] def do_MOVE(self, environ, path, content, user):
props = xmlutils.props_from_request(content)
collection = self.Collection.create_collection(environ["PATH_INFO"])
for key, value in props.items():
collection.set_meta(key, value)
return client.CREATED, {}, None
def do_MOVE(self, environ, read_collections, write_collections, content,
user):
"""Manage MOVE request.""" """Manage MOVE request."""
if not write_collections: to_url = urlparse(environ["HTTP_DESTINATION"])
if to_url.netloc != environ["HTTP_HOST"]:
# Remote destination server, not supported
return client.BAD_GATEWAY, {}, None
to_path = storage.sanitize_path(to_url.path)
if (not self._access(user, path, "w") or
not self._access(user, to_path, "w")):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("w", user):
item = next(self.Collection.discover(path, depth="0"), None)
if (not self._access(user, path, "w", item) or
not self._access(user, to_path, "w", item)):
return NOT_ALLOWED
if not item:
return client.GONE, {}, None
to_item = next(self.Collection.discover(to_path, depth="0"), None)
to_parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(to_path.strip("/")))
to_href = posixpath.basename(to_path.strip("/"))
to_collection = next(self.Collection.discover(
to_parent_path, depth="0"), None)
if (isinstance(item, self.Collection) or not to_collection or
to_item or to_path.strip("/").startswith(path.strip("/"))):
return client.CONFLICT, {}, None
to_collection.upload(to_href, item.item)
item.collection.delete(item.href)
return client.CREATED, {}, None
from_collection = write_collections[0] def do_OPTIONS(self, environ, path, content, user):
from_name = xmlutils.name_from_path(
environ["PATH_INFO"], from_collection)
item = from_collection.get(from_name)
if item:
# Move the item
to_url_parts = urlparse(environ["HTTP_DESTINATION"])
if to_url_parts.netloc == environ["HTTP_HOST"]:
to_url = to_url_parts.path
to_path, to_name = to_url.rstrip("/").rsplit("/", 1)
for to_collection in self.Collection.discover(
to_path, depth="0"):
if to_collection in write_collections:
to_collection.upload(to_name, item)
from_collection.delete(from_name)
return client.CREATED, {}, None
else:
return NOT_ALLOWED
else:
# Remote destination server, not supported
return client.BAD_GATEWAY, {}, None
else:
# No item found
return client.GONE, {}, None
def do_OPTIONS(self, environ, read_collections, write_collections,
content, user):
"""Manage OPTIONS request.""" """Manage OPTIONS request."""
headers = { headers = {
"Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, " "Allow": ("DELETE, HEAD, GET, MKCALENDAR, MKCOL, MOVE, "
@ -542,94 +497,97 @@ class Application:
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"} "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol"}
return client.OK, headers, None return client.OK, headers, None
def do_PROPFIND(self, environ, read_collections, write_collections, def do_PROPFIND(self, environ, path, content, user):
content, user):
"""Manage PROPFIND request.""" """Manage PROPFIND request."""
if not read_collections: with self._lock_collection("r", user):
return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED items = self.Collection.discover(path,
headers = { environ.get("HTTP_DEPTH", "0"))
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol", read_items, write_items = self.collect_allowed_items(items, user)
"Content-Type": "text/xml"} if not read_items and not write_items:
answer = xmlutils.propfind( return (client.NOT_FOUND, {}, None) if user else NOT_ALLOWED
environ["PATH_INFO"], content, read_collections, write_collections, headers = {
user) "DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
return client.MULTI_STATUS, headers, answer "Content-Type": "text/xml"}
answer = xmlutils.propfind(
path, content, read_items, write_items, user)
return client.MULTI_STATUS, headers, answer
def do_PROPPATCH(self, environ, read_collections, write_collections, def do_PROPPATCH(self, environ, path, content, user):
content, user):
"""Manage PROPPATCH request.""" """Manage PROPPATCH request."""
if not write_collections: if not self.authorized(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("w", user):
item = next(self.Collection.discover(path, depth="0"), None)
if not isinstance(item, self.Collection):
return client.CONFLICT, {}, None
answer = xmlutils.proppatch(path, content, item)
headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"}
return client.MULTI_STATUS, headers, answer
collection = write_collections[0] def do_PUT(self, environ, path, content, user):
answer = xmlutils.proppatch(environ["PATH_INFO"], content, collection)
headers = {
"DAV": "1, 2, 3, calendar-access, addressbook, extended-mkcol",
"Content-Type": "text/xml"}
return client.MULTI_STATUS, headers, answer
def do_PUT(self, environ, read_collections, write_collections, content,
user):
"""Manage PUT request.""" """Manage PUT request."""
if not write_collections: if not self._access(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("w", user):
collection = write_collections[0] parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(path.strip("/")))
content_type = environ.get("CONTENT_TYPE") item = next(self.Collection.discover(path, depth="0"), None)
if content_type: parent_item = next(self.Collection.discover(
tags = {value: key for key, value in storage.MIMETYPES.items()} parent_path, depth="0"), None)
tag = tags.get(content_type.split(";")[0]) write_whole_collection = (
if tag: isinstance(item, self.Collection) or
collection.set_meta("tag", tag) not parent_item or
headers = {} not next(parent_item.list(), None) and
item_name = xmlutils.name_from_path(environ["PATH_INFO"], collection) parent_item.get_meta("tag") not in ("VADDRESSBOOK",
item = collection.get(item_name) "VCALENDAR"))
if (write_whole_collection and
etag = environ.get("HTTP_IF_MATCH", "") not self.authorized(user, path, "w") or
match = environ.get("HTTP_IF_NONE_MATCH", "") == "*" not write_whole_collection and
if (not item and not etag) or ( not self.authorized(user, parent_path, "w")):
item and ((etag or item.etag) == item.etag) and not match): return NOT_ALLOWED
# PUT allowed in 3 cases etag = environ.get("HTTP_IF_MATCH", "")
# Case 1: No item and no ETag precondition: Add new item match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
# Case 2: Item and ETag precondition verified: Modify item if ((not item and etag) or (item and etag and item.etag != etag) or
# Case 3: Item and no Etag precondition: Force modifying item (item and match)):
return client.PRECONDITION_FAILED, {}, None
items = list(vobject.readComponents(content or "")) items = list(vobject.readComponents(content or ""))
if item: content_type = environ.get("CONTENT_TYPE")
# PUT is modifying an existing item tag = None
if items: if content_type:
new_item = collection.update(item_name, items[0]) tags = {value: key for key, value in storage.MIMETYPES.items()}
else: tag = tags.get(content_type.split(";")[0])
new_item = None if write_whole_collection:
elif item_name: if item:
# PUT is adding a new item # Delete old collection
if items: item.delete()
new_item = collection.upload(item_name, items[0]) new_item = self.Collection.create_collection(path, items, tag)
else:
new_item = None
else: else:
# PUT is replacing the whole collection if tag:
collection.delete() parent_item.set_meta("tag", tag)
new_item = self.Collection.create_collection( href = posixpath.basename(path.strip("/"))
environ["PATH_INFO"], items) if item:
if new_item: new_item = parent_item.update(href, items[0])
headers["ETag"] = new_item.etag else:
status = client.CREATED new_item = parent_item.upload(href, items[0])
else: headers = {"ETag": new_item.etag}
# PUT rejected in all other cases return client.CREATED, headers, None
status = client.PRECONDITION_FAILED
return status, headers, None
def do_REPORT(self, environ, read_collections, write_collections, content, def do_REPORT(self, environ, path, content, user):
user):
"""Manage REPORT request.""" """Manage REPORT request."""
if not read_collections: if not self._access(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
with self._lock_collection("r", user):
collection = read_collections[0] item = next(self.Collection.discover(path, depth="0"), None)
if not self._access(user, path, "w", item):
headers = {"Content-Type": "text/xml"} return NOT_ALLOWED
if not item:
answer = xmlutils.report(environ["PATH_INFO"], content, collection) return client.NOT_FOUND, {}, None
return client.MULTI_STATUS, headers, answer if isinstance(item, self.Collection):
collection = item
else:
collection = item.collection
headers = {"Content-Type": "text/xml"}
answer = xmlutils.report(path, content, collection)
return client.MULTI_STATUS, headers, answer

View File

@ -107,19 +107,17 @@ class Rights(BaseRights):
self.filename = os.path.expanduser(configuration.get("rights", "file")) self.filename = os.path.expanduser(configuration.get("rights", "file"))
self.rights_type = configuration.get("rights", "type").lower() self.rights_type = configuration.get("rights", "type").lower()
def authorized(self, user, collection, permission): def authorized(self, user, path, permission):
user = user or "" user = user or ""
if user and not storage.is_safe_path_component(user): if user and not storage.is_safe_path_component(user):
# Prevent usernames like "user/calendar.ics" # Prevent usernames like "user/calendar.ics"
raise ValueError("Unsafe username") raise ValueError("Unsafe username")
collection_url = collection.path.rstrip("/") sane_path = storage.sanitize_path(path).strip("/")
if collection_url in (".well-known/carddav", ".well-known/caldav"):
return permission == "r"
# Prevent "regex injection" # Prevent "regex injection"
user_escaped = re.escape(user) user_escaped = re.escape(user)
collection_url_escaped = re.escape(collection_url) sane_path_escaped = re.escape(sane_path)
regex = ConfigParser( regex = ConfigParser(
{"login": user_escaped, "path": collection_url_escaped}) {"login": user_escaped, "path": sane_path_escaped})
if self.rights_type in DEFINED_RIGHTS: if self.rights_type in DEFINED_RIGHTS:
self.logger.debug("Rights type '%s'" % self.rights_type) self.logger.debug("Rights type '%s'" % self.rights_type)
regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type])) regex.readfp(StringIO(DEFINED_RIGHTS[self.rights_type]))
@ -135,11 +133,11 @@ class Rights(BaseRights):
re_collection = regex.get(section, "collection") re_collection = regex.get(section, "collection")
self.logger.debug( self.logger.debug(
"Test if '%s:%s' matches against '%s:%s' from section '%s'" % ( "Test if '%s:%s' matches against '%s:%s' from section '%s'" % (
user, collection_url, re_user, re_collection, section)) user, sane_path, re_user, re_collection, section))
user_match = re.fullmatch(re_user, user) user_match = re.fullmatch(re_user, user)
if user_match: if user_match:
re_collection = re_collection.format(*user_match.groups()) re_collection = re_collection.format(*user_match.groups())
if re.fullmatch(re_collection, collection_url): if re.fullmatch(re_collection, sane_path):
self.logger.debug("Section '%s' matches" % section) self.logger.debug("Section '%s' matches" % section)
return permission in regex.get(section, "permission") return permission in regex.get(section, "permission")
else: else:

View File

@ -211,9 +211,7 @@ class BaseCollection:
returned. returned.
If ``depth`` is anything but "0", it is considered as "1" and direct If ``depth`` is anything but "0", it is considered as "1" and direct
children are included in the result. If ``include_container`` is children are included in the result.
``True`` (the default), the containing object is included in the
result.
The ``path`` is relative. The ``path`` is relative.
@ -368,41 +366,39 @@ class Collection(BaseCollection):
attributes = sane_path.split("/") attributes = sane_path.split("/")
if not attributes[0]: if not attributes[0]:
attributes.pop() attributes.pop()
# Try to guess if the path leads to a collection or an item
folder = os.path.expanduser( folder = os.path.expanduser(
cls.configuration.get("storage", "filesystem_folder")) cls.configuration.get("storage", "filesystem_folder"))
if not os.path.isdir(path_to_filesystem(folder, sane_path)): try:
# path is not a collection filesystem_path = path_to_filesystem(folder, sane_path)
if attributes and os.path.isfile(path_to_filesystem(folder, except ValueError:
sane_path)): # Path is unsafe
# path is an item return
attributes.pop() href = None
elif attributes and os.path.isdir(path_to_filesystem( if not os.path.isdir(filesystem_path):
folder, *attributes[:-1])): if attributes and os.path.isfile(filesystem_path):
# path parent is a collection href = attributes.pop()
attributes.pop() else:
# TODO: else: return? return
path = "/".join(attributes) path = "/".join(attributes)
principal = len(attributes) == 1 principal = len(attributes) == 1
collection = cls(path, principal) collection = cls(path, principal)
if href:
yield collection.get(href)
return
yield collection yield collection
if depth != "0": if depth == "0":
# TODO: fix this return
items = list(collection.list()) for item in collection.list():
if items: yield collection.get(item[0])
for item in items: for href in os.listdir(filesystem_path):
yield collection.get(item[0]) if not is_safe_filesystem_path_component(href):
_, directories, _ = next(os.walk(collection._filesystem_path)) cls.logger.debug("Skipping collection: %s", href)
for sub_path in directories: continue
if not is_safe_filesystem_path_component(sub_path): child_filesystem_path = path_to_filesystem(filesystem_path, href)
cls.logger.debug("Skipping collection: %s", sub_path) if os.path.isdir(child_filesystem_path):
continue child_principal = len(attributes) == 0
full_path = os.path.join(collection._filesystem_path, sub_path) yield cls(child_filesystem_path, child_principal)
if os.path.exists(full_path):
yield cls(posixpath.join(path, sub_path))
@classmethod @classmethod
def create_collection(cls, href, collection=None, tag=None): def create_collection(cls, href, collection=None, tag=None):

View File

@ -133,6 +133,36 @@ class BaseRequests:
status, headers, answer = self.request("GET", "/calendar.ics/") status, headers, answer = self.request("GET", "/calendar.ics/")
assert "VEVENT" not in answer assert "VEVENT" not in answer
def test_mkcalendar(self):
"""Make a calendar."""
self.request("MKCALENDAR", "/calendar.ics/")
status, headers, answer = self.request("GET", "/calendar.ics/")
assert status == 200
def test_move(self):
"""Move a item."""
self.request("MKCALENDAR", "/calendar.ics/")
event = get_file_content("event1.ics")
path1 = "/calendar.ics/event1.ics"
path2 = "/calendar.ics/event2.ics"
status, headers, answer = self.request("PUT", path1, event)
status, headers, answer = self.request(
"MOVE", path1, HTTP_DESTINATION=path2, HTTP_HOST="")
assert status == 201
status, headers, answer = self.request("GET", path1)
assert status == 404
status, headers, answer = self.request("GET", path2)
assert status == 200
def test_head(self):
status, headers, answer = self.request("HEAD", "/")
assert status == 200
def test_options(self):
status, headers, answer = self.request("OPTIONS", "/")
assert status == 200
assert "DAV" in headers
def test_multiple_events_with_same_uid(self): def test_multiple_events_with_same_uid(self):
"""Add two events with the same UID.""" """Add two events with the same UID."""
self.request("MKCOL", "/calendar.ics/") self.request("MKCOL", "/calendar.ics/")

View File

@ -472,20 +472,13 @@ def props_from_request(root, actions=("set", "remove")):
return result return result
def delete(path, collection): def delete(path, collection, href=None):
"""Read and answer DELETE requests. """Read and answer DELETE requests.
Read rfc4918-9.6 for info. Read rfc4918-9.6 for info.
""" """
# Reading request collection.delete(href)
if collection.path == path.strip("/"):
# Delete the whole collection
collection.delete()
else:
# Remove an item from the collection
collection.delete(name_from_path(path, collection))
# Writing answer # Writing answer
multistatus = ET.Element(_tag("D", "multistatus")) multistatus = ET.Element(_tag("D", "multistatus"))
response = ET.Element(_tag("D", "response")) response = ET.Element(_tag("D", "response"))