Stricter parsing of filters

This commit is contained in:
Unrud 2017-08-29 20:08:30 +02:00
parent 963e28d661
commit 31ba4eb876
2 changed files with 89 additions and 62 deletions

View File

@ -927,6 +927,11 @@ class Application:
else:
collection = item.collection
headers = {"Content-Type": "text/xml; charset=%s" % self.encoding}
status, xml_answer = xmlutils.report(
base_prefix, path, xml_content, collection)
try:
status, xml_answer = xmlutils.report(
base_prefix, path, xml_content, collection)
except ValueError as e:
self.logger.warning(
"Bad REPORT request on %r: %s", path, e, exc_info=True)
return BAD_REQUEST
return (status, headers, self._write_xml_content(xml_answer))

View File

@ -162,72 +162,76 @@ def _comp_match(item, filter_, scope="collection"):
See rfc4791-9.7.1.
"""
# TODO: Filtering VALARM and VFREEBUSY is not implemented
# HACK: the filters are tested separately against all components
filter_length = len(filter_)
if scope == "collection":
tag = item.collection.get_meta("tag")
tag = item.name
else:
for component in item.components():
if component.name in ("VTODO", "VEVENT", "VJOURNAL"):
tag = component.name
break
else:
return False
tag = item.component_name
if not tag:
return False
name = filter_.get("name")
if filter_length == 0:
# Point #1 of rfc4791-9.7.1
return filter_.get("name") == tag
else:
if filter_length == 1:
if filter_[0].tag == _tag("C", "is-not-defined"):
# Point #2 of rfc4791-9.7.1
return filter_.get("name") != tag
if filter_[0].tag == _tag("C", "time-range"):
# Point #3 of rfc4791-9.7.1
return name == tag
if filter_length == 1:
if filter_[0].tag == _tag("C", "is-not-defined"):
# Point #2 of rfc4791-9.7.1
return name != tag
if name != tag:
return False
# Point #3 and #4 of rfc4791-9.7.1
components = ([item.item] if scope == "collection"
else list(getattr(item, "%s_list" % tag.lower())))
for child in filter_:
if child.tag == _tag("C", "prop-filter"):
if not any(_prop_match(comp, child, "C")
for comp in components):
return False
elif child.tag == _tag("C", "time-range"):
if not _time_range_match(item.item, filter_[0], tag):
return False
filter_ = filter_[1:]
# Point #4 of rfc4791-9.7.1
return all(
_prop_match(item, child) if child.tag == _tag("C", "prop-filter")
else _comp_match(item, child, scope="component")
for child in filter_)
elif child.tag == _tag("C", "comp-filter"):
if not _comp_match(item, child, scope="component"):
return False
else:
raise ValueError("Unexpected %r in comp-filter" % child.tag)
return True
def _prop_match(item, filter_):
def _prop_match(vobject_item, filter_, ns):
"""Check whether the ``item`` matches the prop ``filter_``.
See rfc4791-9.7.2 and rfc6352-10.5.1.
"""
filter_length = len(filter_)
if item.collection.get_meta("tag") == "VCALENDAR":
for component in item.components():
if component.name in ("VTODO", "VEVENT", "VJOURNAL"):
vobject_item = component
break
else:
vobject_item = item.item
if filter_length == 0:
name = filter_.get("name").lower()
if len(filter_) == 0:
# Point #1 of rfc4791-9.7.2
return filter_.get("name").lower() in vobject_item.contents
else:
name = filter_.get("name").lower()
if filter_length == 1:
if filter_[0].tag == _tag("C", "is-not-defined"):
# Point #2 of rfc4791-9.7.2
return name not in vobject_item.contents
if filter_[0].tag == _tag("C", "time-range"):
# Point #3 of rfc4791-9.7.2
if not _time_range_match(vobject_item, filter_[0], name):
return name in vobject_item.contents
if len(filter_) == 1:
if filter_[0].tag == _tag("C", "is-not-defined"):
# Point #2 of rfc4791-9.7.2
return name not in vobject_item.contents
if name not in vobject_item.contents:
return False
# Point #3 and #4 of rfc4791-9.7.2
for child in filter_:
if ns == "C" and child.tag == _tag("C", "time-range"):
if not _time_range_match(vobject_item, child, name):
return False
filter_ = filter_[1:]
elif filter_[0].tag == _tag("C", "text-match"):
# Point #4 of rfc4791-9.7.2
if not _text_match(vobject_item, filter_[0], name):
elif child.tag == _tag(ns, "text-match"):
if not _text_match(vobject_item, child, name, ns):
return False
filter_ = filter_[1:]
return all(
_param_filter_match(vobject_item, param_filter, name)
for param_filter in filter_)
elif child.tag == _tag(ns, "param-filter"):
if not _param_filter_match(vobject_item, child, name, ns):
return False
else:
raise ValueError("Unexpected %r in prop-filter" % child.tag)
return True
def _time_range_match(vobject_item, filter_, child_name):
@ -510,7 +514,7 @@ def _visit_time_ranges(vobject_item, child_name, range_fn, infinity_fn):
return
def _text_match(vobject_item, filter_, child_name, attrib_name=None):
def _text_match(vobject_item, filter_, child_name, ns, attrib_name=None):
"""Check whether the ``item`` matches the text-match ``filter_``.
See rfc4791-9.7.5.
@ -520,7 +524,9 @@ def _text_match(vobject_item, filter_, child_name, attrib_name=None):
# for DAV servers are actually pretty useless. Texts are lowered to
# be case-insensitive, almost as the "i;ascii-casemap" value.
text = next(filter_.itertext()).lower()
match_type = filter_.get("match-type", match_type)
match_type = "contains"
if ns == "CR":
match_type = filter_.get("match-type", match_type)
def match(value):
value = value.lower()
@ -547,7 +553,7 @@ def _text_match(vobject_item, filter_, child_name, attrib_name=None):
return condition
def _param_filter_match(vobject_item, filter_, parent_name):
def _param_filter_match(vobject_item, filter_, parent_name, ns):
"""Check whether the ``item`` matches the param-filter ``filter_``.
See rfc4791-9.7.3.
@ -557,10 +563,10 @@ def _param_filter_match(vobject_item, filter_, parent_name):
children = getattr(vobject_item, "%s_list" % parent_name, [])
condition = any(name in child.params for child in children)
if len(filter_):
if filter_[0].tag == _tag("C", "text-match"):
if filter_[0].tag == _tag(ns, "text-match"):
return condition and _text_match(
vobject_item, filter_[0], parent_name, name)
elif filter_[0].tag == _tag("C", "is-not-defined"):
vobject_item, filter_[0], parent_name, ns, name)
elif filter_[0].tag == _tag(ns, "is-not-defined"):
return not condition
else:
return condition
@ -1183,16 +1189,32 @@ def report(base_prefix, path, xml_request, collection):
if collection_requested:
yield from collection.get_all_filtered(filters)
def match(item, filter_):
tag = collection.get_meta("tag")
if (tag == "VCALENDAR" and filter_.tag != _tag("C", filter_)):
if len(filter_) == 0:
return True
if len(filter_) > 1:
raise ValueError("Filter with %d children" % len(filter_))
if filter_[0].tag != _tag("C", "comp-filter"):
raise ValueError("Unexpected %r in filter" % filter_[0].tag)
return _comp_match(item, filter_[0])
if tag == "VADDRESSBOOK" and filter_.tag != _tag("CR", filter_):
for child in filter_:
if child.tag != _tag("CR", "prop-filter"):
raise ValueError("Unexpected %r in filter" % child.tag)
return all(_prop_match(item.item, f, "CR") for f in filter_)
raise ValueError("unsupported filter %r for %r" % (filter_.tag, tag))
for item, filters_matched in retrieve_items(collection, hreferences,
multistatus):
if filters and not filters_matched:
match = (
_comp_match if collection.get_meta("tag") == "VCALENDAR"
else _prop_match)
try:
if not all(match(item, filter_[0]) for filter_ in filters
if filter_):
if not all(match(item, filter_) for filter_ in filters):
continue
except ValueError as e:
raise ValueError("Failed to filter item %r from %r: %s" %
(item.href, collection.path, e)) from e
except Exception as e:
raise RuntimeError("Failed to filter item %r from %r: %s" %
(item.href, collection.path, e)) from e