Improve HTTP status codes

This commit is contained in:
Unrud 2018-08-14 18:39:09 +02:00
parent b0a0debf17
commit 546a52e34a

View File

@ -59,14 +59,23 @@ VERSION = pkg_resources.get_distribution('radicale').version
NOT_ALLOWED = ( NOT_ALLOWED = (
client.FORBIDDEN, (("Content-Type", "text/plain"),), client.FORBIDDEN, (("Content-Type", "text/plain"),),
"Access to the requested resource forbidden.") "Access to the requested resource forbidden.")
FORBIDDEN = (
client.FORBIDDEN, (("Content-Type", "text/plain"),),
"Action on the requested resource refused.")
BAD_REQUEST = ( BAD_REQUEST = (
client.BAD_REQUEST, (("Content-Type", "text/plain"),), "Bad Request") client.BAD_REQUEST, (("Content-Type", "text/plain"),), "Bad Request")
NOT_FOUND = ( NOT_FOUND = (
client.NOT_FOUND, (("Content-Type", "text/plain"),), client.NOT_FOUND, (("Content-Type", "text/plain"),),
"The requested resource could not be found.") "The requested resource could not be found.")
CONFLICT = (
client.CONFLICT, (("Content-Type", "text/plain"),),
"Conflict in the request.")
WEBDAV_PRECONDITION_FAILED = ( WEBDAV_PRECONDITION_FAILED = (
client.CONFLICT, (("Content-Type", "text/plain"),), client.CONFLICT, (("Content-Type", "text/plain"),),
"WebDAV precondition failed.") "WebDAV precondition failed.")
METHOD_NOT_ALLOWED = (
client.METHOD_NOT_ALLOWED, (("Content-Type", "text/plain"),),
"The method is not allowed on the requested resource.")
PRECONDITION_FAILED = ( PRECONDITION_FAILED = (
client.PRECONDITION_FAILED, client.PRECONDITION_FAILED,
(("Content-Type", "text/plain"),), "Precondition failed.") (("Content-Type", "text/plain"),), "Precondition failed.")
@ -564,6 +573,14 @@ class Application:
xml_declaration=True) xml_declaration=True)
return f.getvalue() return f.getvalue()
def _webdav_error_response(self, namespace, name,
status=WEBDAV_PRECONDITION_FAILED[0]):
"""Generate XML error response."""
headers = {"Content-Type": "text/xml; charset=%s" % self.encoding}
content = self._write_xml_content(
xmlutils.webdav_error(namespace, name))
return status, headers, content
def do_DELETE(self, environ, base_prefix, path, user): def do_DELETE(self, environ, base_prefix, path, user):
"""Manage DELETE request.""" """Manage DELETE request."""
if not self._access(user, path, "w"): if not self._access(user, path, "w"):
@ -644,7 +661,16 @@ class Application:
with self.Collection.acquire_lock("w", user): with self.Collection.acquire_lock("w", user):
item = next(self.Collection.discover(path), None) item = next(self.Collection.discover(path), None)
if item: if item:
return WEBDAV_PRECONDITION_FAILED return self._webdav_error_response(
"D", "resource-must-be-null")
parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(path.strip("/")))
parent_item = next(self.Collection.discover(parent_path), None)
if not parent_item:
return CONFLICT
if (not isinstance(parent_item, storage.BaseCollection) or
parent_item.get_meta("tag")):
return FORBIDDEN
props = xmlutils.props_from_request(xml_content) props = xmlutils.props_from_request(xml_content)
props["tag"] = "VCALENDAR" props["tag"] = "VCALENDAR"
# TODO: use this? # TODO: use this?
@ -674,7 +700,15 @@ class Application:
with self.Collection.acquire_lock("w", user): with self.Collection.acquire_lock("w", user):
item = next(self.Collection.discover(path), None) item = next(self.Collection.discover(path), None)
if item: if item:
return WEBDAV_PRECONDITION_FAILED return METHOD_NOT_ALLOWED
parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(path.strip("/")))
parent_item = next(self.Collection.discover(parent_path), None)
if not parent_item:
return CONFLICT
if (not isinstance(parent_item, storage.BaseCollection) or
parent_item.get_meta("tag")):
return FORBIDDEN
props = xmlutils.props_from_request(xml_content) props = xmlutils.props_from_request(xml_content)
try: try:
storage.check_and_sanitize_props(props) storage.check_and_sanitize_props(props)
@ -713,18 +747,29 @@ class Application:
if not item: if not item:
return NOT_FOUND return NOT_FOUND
if isinstance(item, storage.BaseCollection): if isinstance(item, storage.BaseCollection):
return WEBDAV_PRECONDITION_FAILED # TODO: support moving collections
return METHOD_NOT_ALLOWED
to_item = next(self.Collection.discover(to_path), None) to_item = next(self.Collection.discover(to_path), None)
if (isinstance(to_item, storage.BaseCollection) or if isinstance(to_item, storage.BaseCollection):
to_item and environ.get("HTTP_OVERWRITE", "F") != "T"): return FORBIDDEN
return WEBDAV_PRECONDITION_FAILED
to_parent_path = storage.sanitize_path( to_parent_path = storage.sanitize_path(
"/%s/" % posixpath.dirname(to_path.strip("/"))) "/%s/" % posixpath.dirname(to_path.strip("/")))
to_collection = next( to_collection = next(
self.Collection.discover(to_parent_path), None) self.Collection.discover(to_parent_path), None)
if not to_collection: if not to_collection:
return WEBDAV_PRECONDITION_FAILED return CONFLICT
tag = item.collection.get_meta("tag")
if not tag or tag != to_collection.get_meta("tag"):
return FORBIDDEN
if to_item and environ.get("HTTP_OVERWRITE", "F") != "T":
return PRECONDITION_FAILED
if (to_item and item.uid != to_item.uid or
not to_item and
to_collection.path != item.collection.path and
to_collection.has_uid(item.uid)):
return self._webdav_error_response(
"C" if tag == "VCALENDAR" else "CR", "no-uid-conflict")
to_href = posixpath.basename(to_path.strip("/")) to_href = posixpath.basename(to_path.strip("/"))
try: try:
self.Collection.move(item, to_collection, to_href) self.Collection.move(item, to_collection, to_href)
@ -732,7 +777,7 @@ class Application:
self.logger.warning( self.logger.warning(
"Bad MOVE request on %r: %s", path, e, exc_info=True) "Bad MOVE request on %r: %s", path, e, exc_info=True)
return BAD_REQUEST return BAD_REQUEST
return client.CREATED, {}, None return client.NO_CONTENT if to_item else client.CREATED, {}, None
def do_OPTIONS(self, environ, base_prefix, path, user): def do_OPTIONS(self, environ, base_prefix, path, user):
"""Manage OPTIONS request.""" """Manage OPTIONS request."""
@ -773,7 +818,6 @@ class Application:
base_prefix, path, xml_content, read_items, write_items, user) base_prefix, path, xml_content, read_items, write_items, user)
if status == client.FORBIDDEN: if status == client.FORBIDDEN:
return NOT_ALLOWED return NOT_ALLOWED
else:
return status, headers, self._write_xml_content(xml_answer) return status, headers, self._write_xml_content(xml_answer)
def do_PROPPATCH(self, environ, base_prefix, path, user): def do_PROPPATCH(self, environ, base_prefix, path, user):
@ -791,8 +835,10 @@ class Application:
return REQUEST_TIMEOUT return REQUEST_TIMEOUT
with self.Collection.acquire_lock("w", user): with self.Collection.acquire_lock("w", user):
item = next(self.Collection.discover(path), None) item = next(self.Collection.discover(path), None)
if not item:
return NOT_FOUND
if not isinstance(item, storage.BaseCollection): if not isinstance(item, storage.BaseCollection):
return WEBDAV_PRECONDITION_FAILED return FORBIDDEN
headers = {"DAV": DAV_HEADERS, headers = {"DAV": DAV_HEADERS,
"Content-Type": "text/xml; charset=%s" % self.encoding} "Content-Type": "text/xml; charset=%s" % self.encoding}
try: try:
@ -823,13 +869,12 @@ class Application:
"/%s/" % posixpath.dirname(path.strip("/"))) "/%s/" % posixpath.dirname(path.strip("/")))
item = next(self.Collection.discover(path), None) item = next(self.Collection.discover(path), None)
parent_item = next(self.Collection.discover(parent_path), None) parent_item = next(self.Collection.discover(parent_path), None)
if not parent_item:
return CONFLICT
write_whole_collection = ( write_whole_collection = (
isinstance(item, storage.BaseCollection) or isinstance(item, storage.BaseCollection) or
not parent_item or ( not parent_item.get_meta("tag"))
not next(parent_item.list(), None) and
parent_item.get_meta("tag") not in (
"VADDRESSBOOK", "VCALENDAR")))
if write_whole_collection: if write_whole_collection:
if not self.Rights.authorized(user, path, "w"): if not self.Rights.authorized(user, path, "w"):
return NOT_ALLOWED return NOT_ALLOWED
@ -851,10 +896,7 @@ class Application:
try: try:
items = tuple(vobject.readComponents(content or "")) items = tuple(vobject.readComponents(content or ""))
if not write_whole_collection and len(items) != 1: if write_whole_collection:
raise RuntimeError(
"Item contains %d components" % len(items))
if write_whole_collection or not parent_item.get_meta("tag"):
content_type = environ.get("CONTENT_TYPE", content_type = environ.get("CONTENT_TYPE",
"").split(";")[0] "").split(";")[0]
tags = {value: key tags = {value: key