diff --git a/config b/config
index 8f332e6..9fa2f1e 100644
--- a/config
+++ b/config
@@ -114,6 +114,9 @@
 # power fails!
 #filesystem_fsync = True
 
+# Delete sync tokens that are older than this value (in seconds)
+#max_sync_token_age = 2592000
+
 # Close the lock file when no more clients are waiting.
 # This option is not very useful in general, but on Windows files that are
 # opened cannot be deleted.
diff --git a/radicale/__init__.py b/radicale/__init__.py
index 07c1533..5d3def1 100644
--- a/radicale/__init__.py
+++ b/radicale/__init__.py
@@ -828,7 +828,8 @@ class Application:
         else:
             collection = item.collection
         headers = {"Content-Type": "text/xml; charset=%s" % self.encoding}
-        xml_answer = xmlutils.report(
+        status, xml_answer = xmlutils.report(
             base_prefix, path, xml_content, collection)
-        return (client.MULTI_STATUS, headers,
-                self._write_xml_content(xml_answer))
+        if status == client.PRECONDITION_FAILED:
+            return PRECONDITION_FAILED
+        return (status, headers, self._write_xml_content(xml_answer))
diff --git a/radicale/config.py b/radicale/config.py
index a275af6..5c55ca0 100644
--- a/radicale/config.py
+++ b/radicale/config.py
@@ -160,6 +160,10 @@ INITIAL_CONFIG = OrderedDict([
                 "/var/lib/radicale/collections"),
             "help": "path where collections are stored",
             "type": str}),
+        ("max_sync_token_age", {
+            "value": 2592000,  # 30 days
+            "help": "delete sync tokens that are older (in seconds)",
+            "type": int}),
         ("filesystem_fsync", {
             "value": "True",
             "help": "sync all changes to filesystem during requests",
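Note: the three hunks above wire the new option through the sample config file, the REPORT handler, and the config schema. For reference, a minimal excerpt of a config file that sets it (the `filesystem_folder` line is shown only for context; 2592000 seconds is the documented 30-day default):

```ini
[storage]
filesystem_folder = /var/lib/radicale/collections
# Delete sync tokens that are older than this value (in seconds)
max_sync_token_age = 2592000
```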
diff --git a/radicale/storage.py b/radicale/storage.py
index 240993e..bfe7b0e 100644
--- a/radicale/storage.py
+++ b/radicale/storage.py
@@ -25,11 +25,13 @@ entry.
 
 """
 
+import binascii
 import contextlib
 import datetime
 import errno
 import json
 import os
+import pickle
 import posixpath
 import shlex
 import stat
@@ -39,7 +41,7 @@ import time
 from contextlib import contextmanager
 from hashlib import md5
 from importlib import import_module
-from itertools import groupby
+from itertools import chain, groupby
 from random import getrandbits
 from tempfile import NamedTemporaryFile, TemporaryDirectory
 
@@ -300,6 +302,24 @@ class BaseCollection:
         """
         raise NotImplementedError
 
+    def sync(self, old_token=None):
+        """Get the current sync token and changed items for synchronization.
+
+        ``old_token`` is an old sync token that is used as the base of
+        the delta update. If the token is missing, all items are
+        returned. ValueError is raised for invalid or expired tokens.
+
+        WARNING: This simple default implementation treats all sync tokens
+                 as invalid. It adheres to the specification but some
+                 clients (e.g. InfCloud) don't like it. Subclasses should
+                 provide a more sophisticated implementation.
+
+        """
+        token = "http://radicale.org/ns/sync/%s" % self.etag.strip("\"")
+        if old_token:
+            raise ValueError("Sync tokens are not supported")
+        return token, self.list()
+
     def list(self):
         """List collection items."""
         raise NotImplementedError
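The default implementation above defines the `sync()` contract that the filesystem backend refines below. A minimal usage sketch, assuming `collection` is an instance of any `BaseCollection` subclass (variable names are illustrative):

```python
# Initial sync: no token yet, so every item href is returned.
token, hrefs = collection.sync()

# Incremental sync: pass the stored token and receive only the hrefs of
# items that were added, changed or deleted since then.
try:
    token, changed = collection.sync(token)
except ValueError:
    # Invalid or expired token (this default implementation always lands
    # here): fall back to a full resync, as a client does after HTTP 412.
    token, changed = collection.sync()
```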
@@ -405,8 +425,8 @@
     def _atomic_write(self, path, mode="w", newline=None):
         directory = os.path.dirname(path)
         tmp = NamedTemporaryFile(
-            mode=mode, dir=directory, encoding=self.encoding,
-            delete=False, prefix=".Radicale.tmp-", newline=newline)
+            mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
+            newline=newline, encoding=None if "b" in mode else self.encoding)
         try:
             yield tmp
             self._fsync(tmp.fileno())
@@ -623,6 +643,196 @@
         cls._sync_directory(to_collection._filesystem_path)
         if item.collection._filesystem_path != to_collection._filesystem_path:
             cls._sync_directory(item.collection._filesystem_path)
+        # Track the change
+        to_collection._update_history_etag(to_href, item)
+        item.collection._update_history_etag(item.href, None)
+        to_collection._clean_history_cache()
+        if item.collection._filesystem_path != to_collection._filesystem_path:
+            item.collection._clean_history_cache()
+
+    @classmethod
+    def _clean_cache(cls, folder, names, max_age=None):
+        """Delete all ``names`` in ``folder`` that are older than ``max_age``.
+        """
+        age_limit = time.time() - max_age if max_age is not None else None
+        modified = False
+        for name in names:
+            if not is_safe_filesystem_path_component(name):
+                continue
+            if age_limit is not None:
+                try:
+                    # Race: Another process might have deleted the file.
+                    mtime = os.path.getmtime(os.path.join(folder, name))
+                except FileNotFoundError:
+                    continue
+                if mtime > age_limit:
+                    continue
+            cls.logger.debug("Found expired item in cache: %r", name)
+            # Race: Another process might have deleted or locked the
+            # file.
+            try:
+                os.remove(os.path.join(folder, name))
+            except (FileNotFoundError, PermissionError):
+                continue
+            modified = True
+        if modified:
+            cls._sync_directory(folder)
+
+    def _update_history_etag(self, href, item):
+        """Updates and retrieves the history etag from the history cache.
+
+        The history cache contains a file for each current and deleted item
+        of the collection. These files contain the etag of the item (empty
+        string for deleted items) and a history etag, which is a hash over
+        the previous history etag and the etag separated by "/".
+        """
+        history_folder = os.path.join(self._filesystem_path,
+                                      ".Radicale.cache", "history")
+        try:
+            with open(os.path.join(history_folder, href), "rb") as f:
+                cache_etag, history_etag = pickle.load(f)
+        except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
+            if isinstance(e, (pickle.UnpicklingError, ValueError)):
+                self.logger.warning(
+                    "Failed to load history cache entry %r in %r: %s",
+                    href, self.path, e, exc_info=True)
+                # Delete the damaged file
+                try:
+                    os.remove(os.path.join(history_folder, href))
+                except (FileNotFoundError, PermissionError):
+                    pass
+            cache_etag = ""
+            # Initialize with random data to prevent collisions with cleaned
+            # expired items.
+            history_etag = binascii.hexlify(os.urandom(16)).decode("ascii")
+        etag = item.etag if item else ""
+        if etag != cache_etag:
+            self._makedirs_synced(history_folder)
+            history_etag = get_etag(history_etag + "/" + etag).strip("\"")
+            try:
+                # Race: Other processes might have created and locked the file.
+                with self._atomic_write(os.path.join(history_folder, href),
+                                        "wb") as f:
+                    pickle.dump([etag, history_etag], f)
+            except PermissionError:
+                pass
+        return history_etag
+
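To make the chaining concrete, here is a worked illustration (not part of the patch) of how one item's history etag evolves across two writes and a deletion. It uses the module-level `get_etag` helper from this file; `etag_v1` and `etag_v2` are stand-ins for the item's quoted etags after each upload:

```python
import binascii
import os

from radicale.storage import get_etag

etag_v1 = '"d41d8cd98f00b204e9800998ecf8427e"'  # stand-in item etags
etag_v2 = '"0cc175b9c0f1b6a831c399e269772661"'

history_etag = binascii.hexlify(os.urandom(16)).decode("ascii")   # random seed
history_etag = get_etag(history_etag + "/" + etag_v1).strip('"')  # created
history_etag = get_etag(history_etag + "/" + etag_v2).strip('"')  # modified
history_etag = get_etag(history_etag + "/" + "").strip('"')       # deleted
```

Because each value hashes in its predecessor, an item that is modified and then restored to its original content still ends up with a new history etag, which is what makes the modify-undo and move-undo cases in the tests below detectable.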
+    def _get_deleted_history_hrefs(self):
+        """Returns the hrefs of all deleted items that are still in the
+        history cache."""
+        history_folder = os.path.join(self._filesystem_path,
+                                      ".Radicale.cache", "history")
+        try:
+            for href in os.listdir(history_folder):
+                if not is_safe_filesystem_path_component(href):
+                    continue
+                if os.path.isfile(os.path.join(self._filesystem_path, href)):
+                    continue
+                yield href
+        except FileNotFoundError:
+            pass
+
+    def _clean_history_cache(self):
+        # Delete all expired cache entries of deleted items.
+        history_folder = os.path.join(self._filesystem_path,
+                                      ".Radicale.cache", "history")
+        self._clean_cache(history_folder, self._get_deleted_history_hrefs(),
+                          max_age=self.configuration.getint(
+                              "storage", "max_sync_token_age"))
+
+    def sync(self, old_token=None):
+        # The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME
+        # where TOKEN_NAME is the md5 hash of all history etags of present and
+        # past items of the collection.
+        def check_token_name(token_name):
+            if len(token_name) != 32:
+                return False
+            for c in token_name:
+                if c not in "0123456789abcdef":
+                    return False
+            return True
+
+        old_token_name = None
+        if old_token:
+            # Extract the token name from the sync token
+            if not old_token.startswith("http://radicale.org/ns/sync/"):
+                raise ValueError("Malformed token: %s" % old_token)
+            old_token_name = old_token[len("http://radicale.org/ns/sync/"):]
+            if not check_token_name(old_token_name):
+                raise ValueError("Malformed token: %s" % old_token)
+        # Get the current state and sync token of the collection.
+        state = {}
+        token_name_hash = md5()
+        # Find the history of all existing and deleted items
+        for href, item in chain(
+                ((item.href, item) for item in self.pre_filtered_list(())),
+                ((href, None) for href in self._get_deleted_history_hrefs())):
+            history_etag = self._update_history_etag(href, item)
+            state[href] = history_etag
+            token_name_hash.update((href + "/" + history_etag).encode("utf-8"))
+        token_name = token_name_hash.hexdigest()
+        token = "http://radicale.org/ns/sync/%s" % token_name
+        if token_name == old_token_name:
+            # Nothing changed
+            return token, ()
+        token_folder = os.path.join(self._filesystem_path,
+                                    ".Radicale.cache", "sync-token")
+        token_path = os.path.join(token_folder, token_name)
+        old_state = {}
+        if old_token_name:
+            # Load the old token state
+            old_token_path = os.path.join(token_folder, old_token_name)
+            try:
+                # Race: Another process might have deleted the file.
+                with open(old_token_path, "rb") as f:
+                    old_state = pickle.load(f)
+            except (FileNotFoundError, pickle.UnpicklingError,
+                    ValueError) as e:
+                if isinstance(e, (pickle.UnpicklingError, ValueError)):
+                    self.logger.warning(
+                        "Failed to load stored sync token %r in %r: %s",
+                        old_token_name, self.path, e, exc_info=True)
+                    # Delete the damaged file
+                    try:
+                        os.remove(old_token_path)
+                    except (FileNotFoundError, PermissionError):
+                        pass
+                raise ValueError("Token not found: %s" % old_token)
+        # Write the new token state or update the modification time of
+        # the existing token state
+        if not os.path.exists(token_path):
+            self._makedirs_synced(token_folder)
+            try:
+                # Race: Other processes might have created and locked the file.
+                with self._atomic_write(token_path, "wb") as f:
+                    pickle.dump(state, f)
+            except PermissionError:
+                pass
+            else:
+                # Clean up old sync tokens and item cache
+                self._clean_cache(token_folder, os.listdir(token_folder),
+                                  max_age=self.configuration.getint(
+                                      "storage", "max_sync_token_age"))
+                self._clean_history_cache()
+        else:
+            # Try to update the modification time
+            try:
+                # Race: Another process might have deleted the file.
+                os.utime(token_path)
+            except FileNotFoundError:
+                pass
+        changes = []
+        # Find all new, changed and deleted items (that are still in the
+        # item cache)
+        for href, history_etag in state.items():
+            if history_etag != old_state.get(href):
+                changes.append(href)
+        # Find all deleted items that are no longer in the item cache
+        for href, history_etag in old_state.items():
+            if href not in state:
+                changes.append(href)
+        return token, changes
 
     def list(self):
         for href in os.listdir(self._filesystem_path):
@@ -664,6 +874,9 @@ class Collection(BaseCollection):
         item = Item(self, vobject_item, href)
         with self._atomic_write(path, newline="") as fd:
             fd.write(item.serialize())
+        # Track the change
+        self._update_history_etag(href, item)
+        self._clean_history_cache()
         return item
 
     def delete(self, href=None):
@@ -689,6 +902,9 @@
             raise ComponentNotFoundError(href)
         os.remove(path)
         self._sync_directory(os.path.dirname(path))
+        # Track the change
+        self._update_history_etag(href, None)
+        self._clean_history_cache()
 
     def get_meta(self, key=None):
         if os.path.exists(self._props_path):
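The resulting on-disk state is easy to inspect when debugging. A sketch (not part of the patch; `collection_path` is a placeholder for a collection's filesystem path):

```python
import os
import pickle

collection_path = "/var/lib/radicale/collections/collection-root/user/calendar.ics"
cache = os.path.join(collection_path, ".Radicale.cache")

# history/<href> holds a pickled [etag, history_etag] pair per item; the
# etag is the empty string once the item has been deleted.
for href in os.listdir(os.path.join(cache, "history")):
    with open(os.path.join(cache, "history", href), "rb") as f:
        etag, history_etag = pickle.load(f)
    print(href, etag or "(deleted)", history_etag)

# sync-token/<token_name> holds the pickled {href: history_etag} state
# behind a token of the form http://radicale.org/ns/sync/<token_name>.
for name in os.listdir(os.path.join(cache, "sync-token")):
    with open(os.path.join(cache, "sync-token", name), "rb") as f:
        state = pickle.load(f)
    print("http://radicale.org/ns/sync/%s" % name, "->", len(state), "items")
```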
does not support sync-token") + assert xml.find("{DAV:}response") is not None + assert xml.find("{DAV:}response/{DAV:}status") is None + + def test_report_sync_collection_delete(self): + """Test sync-collection report with a deleted item""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + event = get_file_content("event1.ics") + event_path = posixpath.join(calendar_path, "event.ics") + self.request("PUT", event_path, event) + sync_token, xml = self._report_sync_token(calendar_path) + self.request("DELETE", event_path) + sync_token, xml = self._report_sync_token(calendar_path, sync_token) + if not sync_token: + pytest.skip("storage backend does not support sync-token") + assert "404" in xml.find("{DAV:}response/{DAV:}status").text + + def test_report_sync_collection_create_delete(self): + """Test sync-collection report with a created and deleted item""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + sync_token, xml = self._report_sync_token(calendar_path) + event = get_file_content("event1.ics") + event_path = posixpath.join(calendar_path, "event.ics") + self.request("PUT", event_path, event) + self.request("DELETE", event_path) + sync_token, xml = self._report_sync_token(calendar_path, sync_token) + if not sync_token: + pytest.skip("storage backend does not support sync-token") + assert "404" in xml.find("{DAV:}response/{DAV:}status").text + + def test_report_sync_collection_modify_undo(self): + """Test sync-collection report with a modified and changed back item""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + event1 = get_file_content("event1.ics") + event2 = get_file_content("event2.ics") + event_path = posixpath.join(calendar_path, "event1.ics") + self.request("PUT", event_path, event1) + sync_token, xml = self._report_sync_token(calendar_path) + self.request("PUT", event_path, event2) + self.request("PUT", event_path, event1) + sync_token, xml = self._report_sync_token(calendar_path, sync_token) + if not sync_token: + pytest.skip("storage backend does not support sync-token") + assert xml.find("{DAV:}response") is not None + assert xml.find("{DAV:}response/{DAV:}status") is None + + def test_report_sync_collection_move(self): + """Test sync-collection report a moved item""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + event = get_file_content("event1.ics") + event1_path = posixpath.join(calendar_path, "event1.ics") + event2_path = posixpath.join(calendar_path, "event2.ics") + self.request("PUT", event1_path, event) + sync_token, xml = self._report_sync_token(calendar_path) + status, headers, answer = self.request( + "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="") + sync_token, xml = self._report_sync_token(calendar_path, sync_token) + if not sync_token: + pytest.skip("storage backend does not support sync-token") + for response in xml.findall("{DAV:}response"): + if response.find("{DAV:}status") is None: + assert response.find("{DAV:}href").text == event2_path + else: + assert "404" in response.find("{DAV:}status").text + assert response.find("{DAV:}href").text == event1_path + + def test_report_sync_collection_move_undo(self): + """Test sync-collection report with a moved and moved back item""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + event = get_file_content("event1.ics") + event1_path = posixpath.join(calendar_path, "event1.ics") + event2_path = posixpath.join(calendar_path, "event2.ics") + 
self.request("PUT", event1_path, event) + sync_token, xml = self._report_sync_token(calendar_path) + status, headers, answer = self.request( + "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="") + status, headers, answer = self.request( + "MOVE", event2_path, HTTP_DESTINATION=event1_path, HTTP_HOST="") + sync_token, xml = self._report_sync_token(calendar_path, sync_token) + if not sync_token: + pytest.skip("storage backend does not support sync-token") + created = deleted = 0 + for response in xml.findall("{DAV:}response"): + if response.find("{DAV:}status") is None: + assert response.find("{DAV:}href").text == event1_path + created += 1 + else: + assert "404" in response.find("{DAV:}status").text + assert response.find("{DAV:}href").text == event2_path + deleted += 1 + assert created == 1 and deleted == 1 + + def test_report_sync_collection_invalid_sync_token(self): + """Test sync-collection report with an invalid sync token""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + sync_token, xml = self._report_sync_token( + calendar_path, "http://radicale.org/ns/sync/INVALID") + assert not sync_token + + def test_propfind_sync_token(self): + """Retrieve the sync-token with a propfind request""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + sync_token, xml = self._report_sync_token(calendar_path) + event = get_file_content("event1.ics") + event_path = posixpath.join(calendar_path, "event.ics") + self.request("PUT", event_path, event) + new_sync_token, xml = self._report_sync_token(calendar_path, + sync_token) + assert sync_token != new_sync_token + + def test_propfind_same_as_sync_collection_sync_token(self): + """Compare sync-token property with sync-collection sync-token""" + calendar_path = "/calendar.ics/" + self.request("MKCALENDAR", calendar_path) + sync_token, xml = self._report_sync_token(calendar_path) + new_sync_token, xml = self._report_sync_token(calendar_path, + sync_token) + assert sync_token == new_sync_token + def test_authorization(self): authorization = "Basic " + base64.b64encode(b"user:").decode() status, headers, answer = self.request( diff --git a/radicale/xmlutils.py b/radicale/xmlutils.py index 406b760..3c34fa8 100644 --- a/radicale/xmlutils.py +++ b/radicale/xmlutils.py @@ -617,6 +617,7 @@ def _propfind_response(base_prefix, path, item, props, user, write=False, if is_collection: prop200.append(ET.Element(_tag("CS", "getctag"))) + prop200.append(ET.Element(_tag("D", "sync-token"))) prop200.append(ET.Element(_tag("C", "calendar-timezone"))) prop200.append(ET.Element(_tag("D", "displayname"))) prop200.append(ET.Element(_tag("ICAL", "calendar-color"))) @@ -732,6 +733,11 @@ def _propfind_response(base_prefix, path, item, props, user, write=False, element.text = item.etag else: is404 = True + elif tag == _tag("D", "sync-token"): + if is_leaf: + element.text, _ = item.sync() + else: + is404 = True else: human_tag = _tag_from_clark(tag) meta = item.get_meta(human_tag) @@ -841,7 +847,7 @@ def report(base_prefix, path, xml_request, collection): # support for them) and stops working if an error code is returned. 
@@ -841,7 +847,7 @@
         # support for them) and stops working if an error code is returned.
         collection.logger.warning("Unsupported REPORT method %r on %r "
                                   "requested", root.tag, path)
-        return multistatus
+        return client.MULTI_STATUS, multistatus
     prop_element = root.find(_tag("D", "prop"))
     props = (
         [prop.tag for prop in prop_element]
@@ -860,6 +866,25 @@
             else:
                 collection.logger.warning("Skipping invalid path %r in REPORT "
                                           "request on %r", href_path, path)
+    elif root.tag == _tag("D", "sync-collection"):
+        old_sync_token_element = root.find(_tag("D", "sync-token"))
+        old_sync_token = ""
+        if old_sync_token_element is not None and old_sync_token_element.text:
+            old_sync_token = old_sync_token_element.text.strip()
+        collection.logger.debug("Client provided sync token: %r",
+                                old_sync_token)
+        try:
+            sync_token, names = collection.sync(old_sync_token)
+        except ValueError as e:
+            # Invalid sync token
+            collection.logger.info("Client provided invalid sync token %r: "
+                                   "%s", old_sync_token, e, exc_info=True)
+            return client.PRECONDITION_FAILED, None
+        hreferences = ("/" + posixpath.join(collection.path, n) for n in names)
+        # Append current sync token to response
+        sync_token_element = ET.Element(_tag("D", "sync-token"))
+        sync_token_element.text = sync_token
+        multistatus.append(sync_token_element)
     else:
         hreferences = (path,)
     filters = (
@@ -931,7 +956,7 @@
                     base_prefix, uri, found_props=found_props,
                     not_found_props=not_found_props, found_item=True))
 
-    return multistatus
+    return client.MULTI_STATUS, multistatus
 
 
 def _item_response(base_prefix, href, found_props=(), not_found_props=(),
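For reference, this is the wire-level exchange that the `sync-collection` branch of `report()` implements (RFC 6578); the hrefs and token values below are invented:

```python
# REPORT /calendar.ics/ with the client's last sync token (the
# <sync-token> element is omitted on the first sync):
request_body = """<?xml version="1.0" encoding="utf-8" ?>
<sync-collection xmlns="DAV:">
  <sync-token>http://radicale.org/ns/sync/30aed19ccbbbde3dbd53fb8a967c2d9d</sync-token>
  <prop>
    <getetag/>
  </prop>
</sync-collection>"""

# A valid token yields a 207 multistatus with one <response> per changed
# href (deleted items carry a 404 status) plus the new token:
#
#   <multistatus xmlns="DAV:">
#     <response>
#       <href>/calendar.ics/event.ics</href>
#       <propstat>
#         <prop><getetag>"..."</getetag></prop>
#         <status>HTTP/1.1 200 OK</status>
#       </propstat>
#     </response>
#     <response>
#       <href>/calendar.ics/deleted.ics</href>
#       <status>HTTP/1.1 404 Not Found</status>
#     </response>
#     <sync-token>http://radicale.org/ns/sync/a3cca2b2aa1e3b5b3b5aad99a8529074</sync-token>
#   </multistatus>
#
# An unknown or malformed token is answered with 412 Precondition Failed,
# after which the client restarts with an empty token.
```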