commit
e25373fa85
@ -50,7 +50,24 @@ from . import auth, rights, storage, xmlutils
|
||||
|
||||
VERSION = "2.0.0rc0"
|
||||
|
||||
NOT_ALLOWED = (client.FORBIDDEN, {}, None)
|
||||
NOT_ALLOWED = (client.FORBIDDEN, {"Content-type": "text/plain"},
|
||||
"Access to the requested resource forbidden.")
|
||||
NOT_FOUND = (client.NOT_FOUND, {"Content-type": "text/plain"},
|
||||
"The requested resource could not be found.")
|
||||
WEBDAV_PRECONDITION_FAILED = (client.CONFLICT, {"Content-type": "text/plain"},
|
||||
"WebDAV precondition failed.")
|
||||
PRECONDITION_FAILED = (client.PRECONDITION_FAILED,
|
||||
{"Content-type": "text/plain"}, "Precondition failed.")
|
||||
REQUEST_TIMEOUT = (client.REQUEST_TIMEOUT, {"Content-type": "text/plain"},
|
||||
"Connection timed out.")
|
||||
REQUEST_ENTITY_TOO_LARGE = (client.REQUEST_ENTITY_TOO_LARGE,
|
||||
{"Content-type": "text/plain"},
|
||||
"Request body too large.")
|
||||
REMOTE_DESTINATION = (client.BAD_GATEWAY, {"Content-type": "text/plain"},
|
||||
"Remote destination not supported.")
|
||||
DIRECTORY_LISTING = (client.FORBIDDEN, {"Content-type": "text/plain"},
|
||||
"Directory listings are not supported.")
|
||||
|
||||
DAV_HEADERS = "1, 2, 3, calendar-access, addressbook, extended-mkcol"
|
||||
|
||||
|
||||
@ -296,7 +313,7 @@ class Application:
|
||||
|
||||
# If "/.well-known" is not available, clients query "/"
|
||||
if path == "/.well-known" or path.startswith("/.well-known/"):
|
||||
return response(client.NOT_FOUND, {})
|
||||
return response(*NOT_FOUND)
|
||||
|
||||
if user and not storage.is_safe_path_component(user):
|
||||
# Prevent usernames like "user/calendar.ics"
|
||||
@ -326,13 +343,13 @@ class Application:
|
||||
if max_content_length and content_length > max_content_length:
|
||||
self.logger.debug(
|
||||
"Request body too large: %d", content_length)
|
||||
return response(client.REQUEST_ENTITY_TOO_LARGE)
|
||||
return response(*REQUEST_ENTITY_TOO_LARGE)
|
||||
|
||||
if is_valid_user:
|
||||
try:
|
||||
status, headers, answer = function(environ, path, user)
|
||||
except socket.timeout:
|
||||
return response(client.REQUEST_TIMEOUT)
|
||||
return response(*REQUEST_TIMEOUT)
|
||||
else:
|
||||
status, headers, answer = NOT_ALLOWED
|
||||
|
||||
@ -342,10 +359,9 @@ class Application:
|
||||
self.logger.info("%s refused" % (user or "Anonymous user"))
|
||||
status = client.UNAUTHORIZED
|
||||
realm = self.configuration.get("server", "realm")
|
||||
headers = {
|
||||
headers.update ({
|
||||
"WWW-Authenticate":
|
||||
"Basic realm=\"%s\"" % realm}
|
||||
answer = None
|
||||
"Basic realm=\"%s\"" % realm})
|
||||
|
||||
# Set content length
|
||||
if answer:
|
||||
@ -407,11 +423,11 @@ class Application:
|
||||
if not self._access(user, path, "w", item):
|
||||
return NOT_ALLOWED
|
||||
if not item:
|
||||
return client.GONE, {}, None
|
||||
return NOT_FOUND
|
||||
if_match = environ.get("HTTP_IF_MATCH", "*")
|
||||
if if_match not in ("*", item.etag):
|
||||
# ETag precondition not verified, do not delete item
|
||||
return client.PRECONDITION_FAILED, {}, None
|
||||
return PRECONDITION_FAILED
|
||||
if isinstance(item, self.Collection):
|
||||
answer = xmlutils.delete(path, item)
|
||||
else:
|
||||
@ -430,9 +446,11 @@ class Application:
|
||||
if not self._access(user, path, "r", item):
|
||||
return NOT_ALLOWED
|
||||
if not item:
|
||||
return client.NOT_FOUND, {}, None
|
||||
return NOT_FOUND
|
||||
if isinstance(item, self.Collection):
|
||||
collection = item
|
||||
if collection.get_meta("tag") not in ("VADDRESSBOOK", "VCALENDAR"):
|
||||
return DIRECTORY_LISTING
|
||||
else:
|
||||
collection = item.collection
|
||||
content_type = xmlutils.MIMETYPES.get(
|
||||
@ -457,7 +475,7 @@ class Application:
|
||||
with self.Collection.acquire_lock("w", user):
|
||||
item = next(self.Collection.discover(path), None)
|
||||
if item:
|
||||
return client.CONFLICT, {}, None
|
||||
return WEBDAV_PRECONDITION_FAILED
|
||||
props = xmlutils.props_from_request(content)
|
||||
props["tag"] = "VCALENDAR"
|
||||
# TODO: use this?
|
||||
@ -473,7 +491,7 @@ class Application:
|
||||
with self.Collection.acquire_lock("w", user):
|
||||
item = next(self.Collection.discover(path), None)
|
||||
if item:
|
||||
return client.CONFLICT, {}, None
|
||||
return WEBDAV_PRECONDITION_FAILED
|
||||
props = xmlutils.props_from_request(content)
|
||||
self.Collection.create_collection(path, props=props)
|
||||
return client.CREATED, {}, None
|
||||
@ -483,7 +501,7 @@ class Application:
|
||||
to_url = urlparse(environ["HTTP_DESTINATION"])
|
||||
if to_url.netloc != environ["HTTP_HOST"]:
|
||||
# Remote destination server, not supported
|
||||
return client.BAD_GATEWAY, {}, None
|
||||
return REMOTE_DESTINATION
|
||||
if not self._access(user, path, "w"):
|
||||
return NOT_ALLOWED
|
||||
to_path = storage.sanitize_path(to_url.path)
|
||||
@ -497,20 +515,20 @@ class Application:
|
||||
if not self._access(user, to_path, "w", item):
|
||||
return NOT_ALLOWED
|
||||
if not item:
|
||||
return client.GONE, {}, None
|
||||
return NOT_FOUND
|
||||
if isinstance(item, self.Collection):
|
||||
return client.CONFLICT, {}, None
|
||||
return WEBDAV_PRECONDITION_FAILED
|
||||
|
||||
to_item = next(self.Collection.discover(to_path), None)
|
||||
if (isinstance(to_item, self.Collection) or
|
||||
to_item and environ.get("HTTP_OVERWRITE", "F") != "T"):
|
||||
return client.CONFLICT, {}, None
|
||||
return WEBDAV_PRECONDITION_FAILED
|
||||
to_parent_path = storage.sanitize_path(
|
||||
"/%s/" % posixpath.dirname(to_path.strip("/")))
|
||||
to_collection = next(
|
||||
self.Collection.discover(to_parent_path), None)
|
||||
if not to_collection:
|
||||
return client.CONFLICT, {}, None
|
||||
return WEBDAV_PRECONDITION_FAILED
|
||||
to_href = posixpath.basename(to_path.strip("/"))
|
||||
self.Collection.move(item, to_collection, to_href)
|
||||
return client.CREATED, {}, None
|
||||
@ -536,7 +554,7 @@ class Application:
|
||||
if not self._access(user, path, "r", item):
|
||||
return NOT_ALLOWED
|
||||
if not item:
|
||||
return client.NOT_FOUND, {}, None
|
||||
return NOT_FOUND
|
||||
# put item back
|
||||
items = itertools.chain([item], items)
|
||||
read_items, write_items = self.collect_allowed_items(items, user)
|
||||
@ -556,7 +574,7 @@ class Application:
|
||||
with self.Collection.acquire_lock("w", user):
|
||||
item = next(self.Collection.discover(path), None)
|
||||
if not isinstance(item, self.Collection):
|
||||
return client.CONFLICT, {}, None
|
||||
return WEBDAV_PRECONDITION_FAILED
|
||||
headers = {"DAV": DAV_HEADERS, "Content-Type": "text/xml"}
|
||||
answer = xmlutils.proppatch(path, content, item)
|
||||
return client.MULTI_STATUS, headers, answer
|
||||
@ -587,15 +605,15 @@ class Application:
|
||||
etag = environ.get("HTTP_IF_MATCH", "")
|
||||
if not item and etag:
|
||||
# Etag asked but no item found: item has been removed
|
||||
return client.PRECONDITION_FAILED, {}, None
|
||||
return PRECONDITION_FAILED
|
||||
if item and etag and item.etag != etag:
|
||||
# Etag asked but item not matching: item has changed
|
||||
return client.PRECONDITION_FAILED, {}, None
|
||||
return PRECONDITION_FAILED
|
||||
|
||||
match = environ.get("HTTP_IF_NONE_MATCH", "") == "*"
|
||||
if item and match:
|
||||
# Creation asked but item found: item can't be replaced
|
||||
return client.PRECONDITION_FAILED, {}, None
|
||||
return PRECONDITION_FAILED
|
||||
|
||||
items = list(vobject.readComponents(content or ""))
|
||||
content_type = environ.get("CONTENT_TYPE", "").split(";")[0]
|
||||
@ -623,7 +641,7 @@ class Application:
|
||||
if not self._access(user, path, "w", item):
|
||||
return NOT_ALLOWED
|
||||
if not item:
|
||||
return client.NOT_FOUND, {}, None
|
||||
return NOT_FOUND
|
||||
if isinstance(item, self.Collection):
|
||||
collection = item
|
||||
else:
|
||||
|
@ -736,8 +736,8 @@ class BaseRequestsMixIn:
|
||||
def test_principal_collection_creation(self):
|
||||
"""Verify existence of the principal collection."""
|
||||
status, headers, answer = self.request(
|
||||
"GET", "/user/", REMOTE_USER="user")
|
||||
assert status == 200
|
||||
"PROPFIND", "/user/", REMOTE_USER="user")
|
||||
assert status == 207
|
||||
|
||||
def test_existence_of_root_collections(self):
|
||||
"""Verify that the root collection always exists."""
|
||||
@ -762,8 +762,8 @@ class BaseRequestsMixIn:
|
||||
"created_by_hook"))
|
||||
status, headers, answer = self.request("MKCOL", "/calendar.ics/")
|
||||
assert status == 201
|
||||
status, headers, answer = self.request("GET", "/created_by_hook/")
|
||||
assert status == 200
|
||||
status, headers, answer = self.request("PROPFIND", "/created_by_hook/")
|
||||
assert status == 207
|
||||
|
||||
def test_hook_read_access(self):
|
||||
"""Verify that hook is not run for read accesses."""
|
||||
@ -791,8 +791,8 @@ class BaseRequestsMixIn:
|
||||
"created_by_hook"))
|
||||
status, headers, answer = self.request("GET", "/", REMOTE_USER="user")
|
||||
assert status == 200
|
||||
status, headers, answer = self.request("GET", "/created_by_hook/")
|
||||
assert status == 200
|
||||
status, headers, answer = self.request("PROPFIND", "/created_by_hook/")
|
||||
assert status == 207
|
||||
|
||||
def test_hook_fail(self):
|
||||
"""Verify that a request fails if the hook fails."""
|
||||
|
Loading…
Reference in New Issue
Block a user