require UIDs in CalDAV/CardDAV / check for duplicated UIDs / try to use UIDs as filenames
commit c7e65fbb7f
parent 7964d288a5
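The commit does three related things: PUT requests for calendar and address-book objects must now carry a UID (a missing UID is only tolerated, and generated, when a whole collection is uploaded), collections are checked for duplicated UIDs, and the filesystem backend tries to use an item's UID as its file name before falling back to a random one. Below is a rough, self-contained sketch of that file-name strategy, for illustration only; the real logic lives in Collection._upload_all_nonatomic and find_available_name further down in this diff, and the helper names used here (choose_href, random_name) are made up for the example:

    import re
    from random import getrandbits


    def random_name(suffix=""):
        # Roughly what storage.find_available_name produces: a UUID-shaped random string.
        r = "%032x" % getrandbits(128)
        return "%s-%s-%s-%s-%s%s" % (r[:8], r[8:12], r[12:16], r[16:20], r[20:], suffix)


    def choose_href(uid, existing, suffix=".ics"):
        """Pick a file name for an item, preferring its UID (illustration only)."""
        candidate = uid if uid.lower().endswith(suffix) else uid + suffix
        # Accept the UID only if it looks like a safe file name and is not taken yet.
        if candidate not in existing and re.fullmatch(r"[\w.@-]+", candidate):
            return candidate
        # Otherwise fall back to a collision-free random name.
        while True:
            candidate = random_name(suffix)
            if candidate not in existing:
                return candidate


    print(choose_href("event1", {"todo1.ics"}))   # -> event1.ics
    print(choose_href("bad/uid", {"todo1.ics"}))  # -> random UUID-like name + ".ics"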
@@ -919,14 +919,8 @@ class Application:
                         tag = "VADDRESSBOOK"
                 else:
                     tag = parent_item.get_meta("tag")
-                if tag == "VCALENDAR" and len(items) > 1:
-                    raise RuntimeError("VCALENDAR collection contains %d "
-                                       "components" % len(items))
-                for i in items:
-                    storage.check_and_sanitize_item(
-                        i, is_collection=write_whole_collection, uid=item.uid
-                        if not write_whole_collection and item else None,
-                        tag=tag)
+                storage.check_and_sanitize_items(
+                    items, is_collection=write_whole_collection, tag=tag)
             except Exception as e:
                 logger.warning(
                     "Bad PUT request on %r: %s", path, e, exc_info=True)
@@ -954,6 +948,13 @@ class Application:
                     "Bad PUT request on %r: %s", path, e, exc_info=True)
                 return BAD_REQUEST
             else:
+                uid = storage.get_uid_from_object(items[0])
+                if (item and item.uid != uid or
+                        not item and parent_item.has_uid(uid)):
+                    return self._webdav_error_response(
+                        "C" if tag == "VCALENDAR" else "CR",
+                        "no-uid-conflict")
+
                 href = posixpath.basename(path.strip("/"))
                 try:
                     if tag and not parent_item.get_meta("tag"):
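The "no-uid-conflict" name passed to self._webdav_error_response above is the CalDAV/CardDAV precondition defined in RFC 4791 and RFC 6352 ("C" and "CR" presumably select the caldav or carddav XML namespace). For orientation, the error body such a response carries looks roughly like the following; this is an assumption about the serialized form, not code copied from Radicale, whose own XML helpers produce the exact output:

    # Approximate shape of the precondition error returned on a UID conflict
    # (illustrative only; the CalDAV namespace variant is shown).
    NO_UID_CONFLICT_CALDAV = (
        '<?xml version="1.0" encoding="utf-8"?>\n'
        '<D:error xmlns:D="DAV:" xmlns:C="urn:ietf:params:xml:ns:caldav">\n'
        '    <C:no-uid-conflict/>\n'
        '</D:error>\n')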
@@ -119,21 +119,33 @@ def load(configuration):
     return CollectionCopy


-def check_and_sanitize_item(vobject_item, is_collection=False, uid=None,
-                            tag=None):
+def check_and_sanitize_items(vobject_items, is_collection=False, tag=None):
     """Check vobject items for common errors and add missing UIDs.

     ``is_collection`` indicates that vobject_item contains unrelated
     components.

-    If ``uid`` is not set, the UID is generated randomly.
-
     The ``tag`` of the collection.

     """
     if tag and tag not in ("VCALENDAR", "VADDRESSBOOK"):
         raise ValueError("Unsupported collection tag: %r" % tag)
-    if vobject_item.name == "VCALENDAR" and tag == "VCALENDAR":
+    if not is_collection and len(vobject_items) != 1:
+        raise ValueError("Item contains %d components" % len(vobject_items))
+    if tag == "VCALENDAR":
+        if len(vobject_items) > 1:
+            raise RuntimeError("VCALENDAR collection contains %d "
+                               "components" % len(vobject_items))
+        vobject_item = vobject_items[0]
+        if vobject_item.name != "VCALENDAR":
+            raise ValueError("Item type %r not supported in %r "
+                             "collection" % (vobject_item.name, tag))
+        component_uids = set()
+        for component in vobject_item.components():
+            if component.name in ("VTODO", "VEVENT", "VJOURNAL"):
+                component_uid = get_uid(component)
+                if component_uid:
+                    component_uids.add(component_uid)
         component_name = None
         object_uid = None
         object_uid_set = False
@@ -152,10 +164,17 @@ def check_and_sanitize_item(vobject_item, is_collection=False, uid=None,
             if not object_uid_set or is_collection:
                 object_uid_set = True
                 object_uid = component_uid
-                if component_uid is None:
-                    component.add("UID").value = uid or random_uuid4()
-                elif not component_uid:
-                    component.uid.value = uid or random_uuid4()
+                if not component_uid:
+                    if not is_collection:
+                        raise ValueError("%s component without UID in object" %
+                                         component_name)
+                    component_uid = find_available_name(
+                        component_uids.__contains__)
+                    component_uids.add(component_uid)
+                    if hasattr(component, "uid"):
+                        component.uid.value = component_uid
+                    else:
+                        component.add("UID").value = component_uid
             elif not object_uid or not component_uid:
                 raise ValueError("Multiple %s components without UID in "
                                  "object" % component_name)
@@ -169,19 +188,37 @@ def check_and_sanitize_item(vobject_item, is_collection=False, uid=None,
            except Exception as e:
                raise ValueError("invalid recurrence rules in %s" %
                                 component.name) from e
-    elif vobject_item.name == "VCARD" and tag == "VADDRESSBOOK":
+    elif tag == "VADDRESSBOOK":
         # https://tools.ietf.org/html/rfc6352#section-5.1
-        object_uid = get_uid(vobject_item)
-        if object_uid is None:
-            vobject_item.add("UID").value = uid or random_uuid4()
-        elif not object_uid:
-            vobject_item.uid.value = uid or random_uuid4()
-    elif vobject_item.name == "VLIST" and tag == "VADDRESSBOOK":
-        # Custom format used by SOGo Connector to store lists of contacts
-        pass
+        object_uids = set()
+        for vobject_item in vobject_items:
+            if vobject_item.name == "VCARD":
+                object_uid = get_uid(vobject_item)
+                if object_uid:
+                    object_uids.add(object_uid)
+        for vobject_item in vobject_items:
+            if vobject_item.name == "VLIST":
+                # Custom format used by SOGo Connector to store lists of
+                # contacts
+                continue
+            if vobject_item.name != "VCARD":
+                raise ValueError("Item type %r not supported in %r "
+                                 "collection" % (vobject_item.name, tag))
+            object_uid = get_uid(vobject_item)
+            if not object_uid:
+                if not is_collection:
+                    raise ValueError("%s object without UID" %
+                                     vobject_item.name)
+                object_uid = find_available_name(object_uids.__contains__)
+                object_uids.add(object_uid)
+                if hasattr(vobject_item, "uid"):
+                    vobject_item.uid.value = object_uid
+                else:
+                    vobject_item.add("UID").value = object_uid
     else:
-        raise ValueError("Item type %r not supported in %s collection" %
-                         (vobject_item.name, repr(tag) if tag else "generic"))
+        for i in vobject_items:
+            raise ValueError("Item type %r not supported in %s collection" %
+                             (i.name, repr(tag) if tag else "generic"))


 def check_and_sanitize_props(props):
@@ -191,10 +228,17 @@ def check_and_sanitize_props(props):
         raise ValueError("Unsupported collection tag: %r" % tag)


-def random_uuid4():
-    """Generate a pseudo-random UUID"""
-    r = "%016x" % getrandbits(128)
-    return "%s-%s-%s-%s-%s" % (r[:8], r[8:12], r[12:16], r[16:20], r[20:])
+def find_available_name(exists_fn, suffix=""):
+    """Generate a pseudo-random UID"""
+    # Prevent infinite loop
+    for _ in range(1000):
+        r = "%016x" % getrandbits(128)
+        name = "%s-%s-%s-%s-%s%s" % (
+            r[:8], r[8:12], r[12:16], r[16:20], r[20:], suffix)
+        if not exists_fn(name):
+            return name
+    # something is wrong with the PRNG
+    raise RuntimeError("No unique random sequence found")


 def scandir(path, only_dirs=False, only_files=False):
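find_available_name only needs a membership test and an optional suffix, so the callers added in this diff pass bound methods such as hrefs.__contains__ or object_uids.__contains__. A minimal usage sketch, assuming the patched module is importable as radicale.storage:

    from radicale.storage import find_available_name

    taken = {"event1.ics", "todo1.ics"}
    # Keeps drawing UUID-shaped names until one is not already taken.
    href = find_available_name(taken.__contains__, suffix=".ics")
    assert href.endswith(".ics") and href not in taken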
@@ -612,6 +656,13 @@ class BaseCollection:
         """
         return self.get(href) is not None

+    def has_uid(self, uid):
+        """Check if a UID exists in the collection."""
+        for item in self.get_all():
+            if item.uid == uid:
+                return True
+        return False
+
     def upload(self, href, vobject_item):
         """Upload a new or replace an existing item."""
         raise NotImplementedError
@@ -791,7 +842,8 @@ class Collection(BaseCollection):
         return os.path.join(filesystem_folder, "collection-root")

     @contextmanager
-    def _atomic_write(self, path, mode="w", newline=None, sync_directory=True):
+    def _atomic_write(self, path, mode="w", newline=None, sync_directory=True,
+                      replace_fn=os.replace):
         directory = os.path.dirname(path)
         tmp = NamedTemporaryFile(
             mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
@@ -805,7 +857,7 @@ class Collection(BaseCollection):
                 raise RuntimeError("Fsync'ing file %r failed: %s" %
                                    (path, e)) from e
             tmp.close()
-            os.replace(tmp.name, path)
+            replace_fn(tmp.name, path)
         except BaseException:
             tmp.close()
             os.remove(tmp.name)
@@ -813,16 +865,6 @@ class Collection(BaseCollection):
         if sync_directory:
             self._sync_directory(directory)

-    @staticmethod
-    def _find_available_file_name(exists_fn, suffix=""):
-        # Prevent infinite loop
-        for _ in range(1000):
-            file_name = random_uuid4() + suffix
-            if not exists_fn(file_name):
-                return file_name
-        # something is wrong with the PRNG
-        raise RuntimeError("No unique random sequence found")
-
     @classmethod
     def _fsync(cls, fd):
         if cls.configuration.getboolean("storage", "filesystem_fsync"):
@@ -947,17 +989,29 @@ class Collection(BaseCollection):
             with exception_cm(path):
                 saved_item_errors = item_errors
                 collection = None
+                uids = set()
+                has_child_collections = False
                 for item in cls.discover(path, "1", exception_cm):
                     if not collection:
                         collection = item
                         collection.get_meta()
                         continue
                     if isinstance(item, BaseCollection):
+                        has_child_collections = True
                         remaining_paths.append(item.path)
+                    elif item.uid in uids:
+                        cls.logger.error(
+                            "Invalid item %r in %r: UID conflict %r",
+                            item.href, path.strip("/"), item.uid)
                     else:
+                        uids.add(item.uid)
                         logger.debug("Verified item %r in %r", item.href, path)
                 if item_errors == saved_item_errors:
                     collection.sync()
+                if has_child_collections and collection.get_meta("tag"):
+                    cls.logger.error("Invalid collection %r: %r must not have "
+                                     "child collections", path.strip("/"),
+                                     collection.get_meta("tag"))
         return item_errors == 0 and collection_errors == 0

     @classmethod
@@ -992,26 +1046,16 @@ class Collection(BaseCollection):
                     items.extend(
                         getattr(collection, "%s_list" % content, []))
                 items_by_uid = groupby(sorted(items, key=get_uid), get_uid)
-                vobject_items = {}
-                for uid, items in items_by_uid:
-                    new_collection = vobject.iCalendar()
-                    for item in items:
-                        new_collection.add(item)
-                    # href must comply to is_safe_filesystem_path_component
-                    # and no file name collisions must exist between hrefs
-                    href = self._find_available_file_name(
-                        vobject_items.get, suffix=".ics")
-                    vobject_items[href] = new_collection
-                self._upload_all_nonatomic(vobject_items)
+
+                def vobject_items():
+                    for uid, items in items_by_uid:
+                        collection = vobject.iCalendar()
+                        for item in items:
+                            collection.add(item)
+                        yield collection
+                self._upload_all_nonatomic(vobject_items(), suffix=".ics")
             elif props.get("tag") == "VADDRESSBOOK":
-                vobject_items = {}
-                for card in collection:
-                    # href must comply to is_safe_filesystem_path_component
-                    # and no file name collisions must exist between hrefs
-                    href = self._find_available_file_name(
-                        vobject_items.get, suffix=".vcf")
-                    vobject_items[href] = card
-                self._upload_all_nonatomic(vobject_items)
+                self._upload_all_nonatomic(collection, suffix=".vcf")

             # This operation is not atomic on the filesystem level but it's
             # very unlikely that one rename operations succeeds while the
@@ -1023,37 +1067,64 @@ class Collection(BaseCollection):

         return cls(sane_path)

-    def upload_all_nonatomic(self, vobject_items):
-        """DEPRECATED: Use ``_upload_all_nonatomic``"""
-        return self._upload_all_nonatomic(vobject_items)
-
-    def _upload_all_nonatomic(self, vobject_items):
+    def _upload_all_nonatomic(self, vobject_items, suffix=""):
         """Upload a new set of items.

-        This takes a mapping of href and vobject items and
+        This takes a list of vobject items and
         uploads them nonatomic and without existence checks.

         """
         cache_folder = os.path.join(self._filesystem_path,
                                     ".Radicale.cache", "item")
         self._makedirs_synced(cache_folder)
-        for href, vobject_item in vobject_items.items():
-            if not is_safe_filesystem_path_component(href):
-                raise UnsafePathError(href)
+        hrefs = set()
+        for vobject_item in vobject_items:
+            uid = get_uid_from_object(vobject_item)
             try:
                 cache_content = self._item_cache_content(vobject_item)
                 _, _, _, text, _, _, _, _ = cache_content
             except Exception as e:
                 raise ValueError(
                     "Failed to store item %r in temporary collection %r: %s" %
-                    (href, self.path, e)) from e
+                    (uid, self.path, e)) from e
+            href_candidates = []
+            if os.name in ("nt", "posix"):
+                href_candidates.append(
+                    lambda: uid if uid.lower().endswith(suffix.lower())
+                    else uid + suffix)
+            href_candidates.extend((
+                lambda: get_etag(uid).strip('"') + suffix,
+                lambda: find_available_name(hrefs.__contains__, suffix)))
+            href = None
+
+            def replace_fn(source, target):
+                nonlocal href
+                while href_candidates:
+                    href = href_candidates.pop(0)()
+                    if href in hrefs:
+                        continue
+                    if not is_safe_filesystem_path_component(href):
+                        if not href_candidates:
+                            raise UnsafePathError(href)
+                        continue
+                    try:
+                        return os.replace(source, path_to_filesystem(
+                            self._filesystem_path, href))
+                    except OSError as e:
+                        if href_candidates and (
+                                os.name == "posix" and e.errno == 22 or
+                                os.name == "nt" and e.errno == 123):
+                            continue
+                        raise
+
+            with self._atomic_write(os.path.join(self._filesystem_path, "ign"),
+                                    newline="", sync_directory=False,
+                                    replace_fn=replace_fn) as f:
+                f.write(text)
+            hrefs.add(href)
             with self._atomic_write(os.path.join(cache_folder, href), "wb",
                                     sync_directory=False) as f:
                 pickle.dump(cache_content, f)
-            path = path_to_filesystem(self._filesystem_path, href)
-            with self._atomic_write(
-                    path, newline="", sync_directory=False) as f:
-                f.write(text)
         self._sync_directory(cache_folder)
         self._sync_directory(self._filesystem_path)

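A note on the replace_fn trick in _upload_all_nonatomic above: the temporary file is written once, and the target name is only chosen at replace time; if the operating system rejects a candidate name (errno 22, EINVAL, on POSIX, error code 123 on Windows), the next candidate is tried, with a random filesystem-safe name as the last resort. The following is a self-contained sketch of that retry pattern, for illustration only; replace_with_fallbacks is a made-up name, not Radicale's:

    import os


    def replace_with_fallbacks(source, candidates):
        """Try os.replace() against each candidate target until one is accepted."""
        for i, target in enumerate(candidates):
            try:
                return os.replace(source, target)
            except OSError as e:
                # Mirror the check in the diff above: these codes mean the *name*
                # was rejected, so trying a different candidate can still succeed.
                invalid_name = (os.name == "posix" and e.errno == 22 or
                                os.name == "nt" and e.errno == 123)
                if invalid_name and i + 1 < len(candidates):
                    continue
                raise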
@@ -1412,12 +1483,9 @@ class Collection(BaseCollection):
            try:
                vobject_items = tuple(vobject.readComponents(
                    raw_text.decode(self._encoding)))
-               if len(vobject_items) != 1:
-                   raise RuntimeError("Content contains %d components"
-                                      % len(vobject_items))
-               vobject_item = vobject_items[0]
-               check_and_sanitize_item(vobject_item, uid=uid,
-                                       tag=self.get_meta("tag"))
+               check_and_sanitize_items(vobject_items,
+                                        tag=self.get_meta("tag"))
+               vobject_item = vobject_items[0]
                cache_hash, uid, etag, text, name, tag, start, end = \
                    self._store_item_cache(
                        href, vobject_item, input_hash)

radicale/tests/static/event1_modified.ics (new file, 34 lines)
@@ -0,0 +1,34 @@
+BEGIN:VCALENDAR
+PRODID:-//Mozilla.org/NONSGML Mozilla Calendar V1.1//EN
+VERSION:2.0
+BEGIN:VTIMEZONE
+TZID:Europe/Paris
+X-LIC-LOCATION:Europe/Paris
+BEGIN:DAYLIGHT
+TZOFFSETFROM:+0100
+TZOFFSETTO:+0200
+TZNAME:CEST
+DTSTART:19700329T020000
+RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=3
+END:DAYLIGHT
+BEGIN:STANDARD
+TZOFFSETFROM:+0200
+TZOFFSETTO:+0100
+TZNAME:CET
+DTSTART:19701025T030000
+RRULE:FREQ=YEARLY;BYDAY=-1SU;BYMONTH=10
+END:STANDARD
+END:VTIMEZONE
+BEGIN:VEVENT
+CREATED:20130902T150157Z
+LAST-MODIFIED:20130902T150158Z
+DTSTAMP:20130902T150159Z
+UID:event1
+SUMMARY:Event
+ORGANIZER:mailto:unclesam@example.com
+ATTENDEE;ROLE=REQ-PARTICIPANT;PARTSTAT=TENTATIVE;CN=Jane Doe:MAILTO:janedoe@example.com
+ATTENDEE;ROLE=REQ-PARTICIPANT;DELEGATED-FROM="MAILTO:bob@host.com";PARTSTAT=ACCEPTED;CN=John Doe:MAILTO:johndoe@example.com
+DTSTART;TZID=Europe/Paris:20130901T180000
+DTEND;TZID=Europe/Paris:20130901T190000
+END:VEVENT
+END:VCALENDAR
@@ -1508,6 +1508,62 @@ class TestMultiFileSystem(BaseFileSystemTest, BaseRequestsMixIn):
         assert answer1 == answer2
         assert os.path.exists(os.path.join(cache_folder, "event1.ics"))

+    @pytest.mark.skipif(os.name not in ("nt", "posix"),
+                        reason="Only supported on 'nt' and 'posix'")
+    def test_put_whole_calendar_uids_used_as_file_names(self):
+        """Test if UIDs are used as file names."""
+        BaseRequestsMixIn.test_put_whole_calendar(self)
+        for uid in ("todo", "event"):
+            status, _, answer = self.request(
+                "GET", "/calendar.ics/%s.ics" % uid)
+            assert status == 200
+            assert "\r\nUID:%s\r\n" % uid in answer
+
+    @pytest.mark.skipif(os.name not in ("nt", "posix"),
+                        reason="Only supported on 'nt' and 'posix'")
+    def test_put_whole_calendar_random_uids_used_as_file_names(self):
+        """Test if UIDs are used as file names."""
+        BaseRequestsMixIn.test_put_whole_calendar_without_uids(self)
+        status, _, answer = self.request("GET", "/calendar.ics")
+        assert status == 200
+        uids = []
+        for line in answer.split("\r\n"):
+            if line.startswith("UID:"):
+                uids.append(line[len("UID:"):])
+        for uid in uids:
+            status, _, answer = self.request(
+                "GET", "/calendar.ics/%s.ics" % uid)
+            assert status == 200
+            assert "\r\nUID:%s\r\n" % uid in answer
+
+    @pytest.mark.skipif(os.name not in ("nt", "posix"),
+                        reason="Only supported on 'nt' and 'posix'")
+    def test_put_whole_addressbook_uids_used_as_file_names(self):
+        """Test if UIDs are used as file names."""
+        BaseRequestsMixIn.test_put_whole_addressbook(self)
+        for uid in ("contact1", "contact2"):
+            status, _, answer = self.request(
+                "GET", "/contacts.vcf/%s.vcf" % uid)
+            assert status == 200
+            assert "\r\nUID:%s\r\n" % uid in answer
+
+    @pytest.mark.skipif(os.name not in ("nt", "posix"),
+                        reason="Only supported on 'nt' and 'posix'")
+    def test_put_whole_addressbook_random_uids_used_as_file_names(self):
+        """Test if UIDs are used as file names."""
+        BaseRequestsMixIn.test_put_whole_addressbook_without_uids(self)
+        status, _, answer = self.request("GET", "/contacts.vcf")
+        assert status == 200
+        uids = []
+        for line in answer.split("\r\n"):
+            if line.startswith("UID:"):
+                uids.append(line[len("UID:"):])
+        for uid in uids:
+            status, _, answer = self.request(
+                "GET", "/contacts.vcf/%s.vcf" % uid)
+            assert status == 200
+            assert "\r\nUID:%s\r\n" % uid in answer
+

 class TestCustomStorageSystem(BaseFileSystemTest):
     """Test custom backend loading."""