From f633b48a7a02286a59eda7ede7b3008d43b6ae4b Mon Sep 17 00:00:00 2001
From: Unrud
Date: Fri, 2 Jun 2017 12:44:39 +0200
Subject: [PATCH] Real sync-collection support for the multifilesystem backend

---
 config              |   3 +
 radicale/config.py  |   4 ++
 radicale/storage.py | 172 +++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/config b/config
index 958fb46..20ad0da 100644
--- a/config
+++ b/config
@@ -110,6 +110,9 @@
 # power fails!
 #filesystem_fsync = True
 
+# Delete sync tokens that are older than this age (seconds)
+#max_sync_token_age = 2592000
+
 # Close the lock file when no more clients are waiting.
 # This option is not very useful in general, but on Windows files that are
 # opened cannot be deleted.
diff --git a/radicale/config.py b/radicale/config.py
index 3db0e5b..ba0ae2a 100644
--- a/radicale/config.py
+++ b/radicale/config.py
@@ -135,6 +135,10 @@ INITIAL_CONFIG = OrderedDict([
                 "/var/lib/radicale/collections"),
             "help": "path where collections are stored",
             "type": str}),
+        ("max_sync_token_age", {
+            "value": 2592000,  # 30 days
+            "help": "delete sync tokens older than this age (seconds)",
+            "type": int}),
         ("filesystem_fsync", {
             "value": "True",
             "help": "sync all changes to filesystem during requests",
diff --git a/radicale/storage.py b/radicale/storage.py
index 9225694..bfe7b0e 100644
--- a/radicale/storage.py
+++ b/radicale/storage.py
@@ -25,11 +25,13 @@ entry.
 
 """
 
+import binascii
 import contextlib
 import datetime
 import errno
 import json
 import os
+import pickle
 import posixpath
 import shlex
 import stat
@@ -39,7 +41,7 @@ import time
 from contextlib import contextmanager
 from hashlib import md5
 from importlib import import_module
-from itertools import groupby
+from itertools import chain, groupby
 from random import getrandbits
 from tempfile import NamedTemporaryFile, TemporaryDirectory
 
@@ -641,6 +643,12 @@ class Collection(BaseCollection):
         cls._sync_directory(to_collection._filesystem_path)
         if item.collection._filesystem_path != to_collection._filesystem_path:
             cls._sync_directory(item.collection._filesystem_path)
+        # Track the change
+        to_collection._update_history_etag(to_href, item)
+        item.collection._update_history_etag(item.href, None)
+        to_collection._clean_history_cache()
+        if item.collection._filesystem_path != to_collection._filesystem_path:
+            item.collection._clean_history_cache()
 
     @classmethod
     def _clean_cache(cls, folder, names, max_age=None):
@@ -670,6 +678,162 @@ class Collection(BaseCollection):
         if modified:
             cls._sync_directory(folder)
 
+    def _update_history_etag(self, href, item):
+        """Updates and retrieves the history etag from the history cache.
+
+        The history cache contains a file for each current and deleted item
+        of the collection. These files contain the etag of the item (empty
+        string for deleted items) and a history etag, which is a hash over
+        the previous history etag and the etag, separated by "/".
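+
+        For example (hypothetical values): if the stored history etag is
+        "h1" and an item's new etag is "e2", the new history etag becomes
+        the md5 digest of "h1/e2"; a deleted item hashes in the empty
+        string, so deletions advance the chain as well.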
+ """ + history_folder = os.path.join(self._filesystem_path, + ".Radicale.cache", "history") + try: + with open(os.path.join(history_folder, href), "rb") as f: + cache_etag, history_etag = pickle.load(f) + except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e: + if isinstance(e, (pickle.UnpicklingError, ValueError)): + self.logger.warning( + "Failed to load history cache entry %r in %r: %s", + href, self.path, e, exc_info=True) + # Delete the damaged file + try: + os.remove(os.path.join(history_folder, href)) + except (FileNotFoundError, PermissionError): + pass + cache_etag = "" + # Initialize with random data to prevent collisions with cleaned + # expired items. + history_etag = binascii.hexlify(os.urandom(16)).decode("ascii") + etag = item.etag if item else "" + if etag != cache_etag: + self._makedirs_synced(history_folder) + history_etag = get_etag(history_etag + "/" + etag).strip("\"") + try: + # Race: Other processes might have created and locked the file. + with self._atomic_write(os.path.join(history_folder, href), + "wb") as f: + pickle.dump([etag, history_etag], f) + except PermissionError: + pass + return history_etag + + def _get_deleted_history_hrefs(self): + """Returns the hrefs of all deleted items that are still in the + history cache.""" + history_folder = os.path.join(self._filesystem_path, + ".Radicale.cache", "history") + try: + for href in os.listdir(history_folder): + if not is_safe_filesystem_path_component(href): + continue + if os.path.isfile(os.path.join(self._filesystem_path, href)): + continue + yield href + except FileNotFoundError: + pass + + def _clean_history_cache(self): + # Delete all expired cache entries of deleted items. + history_folder = os.path.join(self._filesystem_path, + ".Radicale.cache", "history") + self._clean_cache(history_folder, self._get_deleted_history_hrefs(), + max_age=self.configuration.getint( + "storage", "max_sync_token_age")) + + def sync(self, old_token=None): + # The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME + # where TOKEN_NAME is the md5 hash of all history etags of present and + # past items of the collection. + def check_token_name(token_name): + if len(token_name) != 32: + return False + for c in token_name: + if c not in "0123456789abcdef": + return False + return True + + old_token_name = None + if old_token: + # Extract the token name from the sync token + if not old_token.startswith("http://radicale.org/ns/sync/"): + raise ValueError("Malformed token: %s" % old_token) + old_token_name = old_token[len("http://radicale.org/ns/sync/"):] + if not check_token_name(old_token_name): + raise ValueError("Malformed token: %s" % old_token) + # Get the current state and sync-token of the collection. 
+        state = {}
+        token_name_hash = md5()
+        # Find the history of all existing and deleted items
+        for href, item in chain(
+                ((item.href, item) for item in self.pre_filtered_list(())),
+                ((href, None) for href in self._get_deleted_history_hrefs())):
+            history_etag = self._update_history_etag(href, item)
+            state[href] = history_etag
+            token_name_hash.update((href + "/" + history_etag).encode("utf-8"))
+        token_name = token_name_hash.hexdigest()
+        token = "http://radicale.org/ns/sync/%s" % token_name
+        if token_name == old_token_name:
+            # Nothing changed
+            return token, ()
+        token_folder = os.path.join(self._filesystem_path,
+                                    ".Radicale.cache", "sync-token")
+        token_path = os.path.join(token_folder, token_name)
+        old_state = {}
+        if old_token_name:
+            # Load the old token state
+            old_token_path = os.path.join(token_folder, old_token_name)
+            try:
+                # Race: Another process might have deleted the file.
+                with open(old_token_path, "rb") as f:
+                    old_state = pickle.load(f)
+            except (FileNotFoundError, pickle.UnpicklingError,
+                    ValueError) as e:
+                if isinstance(e, (pickle.UnpicklingError, ValueError)):
+                    self.logger.warning(
+                        "Failed to load stored sync token %r in %r: %s",
+                        old_token_name, self.path, e, exc_info=True)
+                    # Delete the damaged file
+                    try:
+                        os.remove(old_token_path)
+                    except (FileNotFoundError, PermissionError):
+                        pass
+                raise ValueError("Token not found: %s" % old_token)
+        # Write the new token state or update the modification time of
+        # the existing token state
+        if not os.path.exists(token_path):
+            self._makedirs_synced(token_folder)
+            try:
+                # Race: Other processes might have created and locked the file.
+                with self._atomic_write(token_path, "wb") as f:
+                    pickle.dump(state, f)
+            except PermissionError:
+                pass
+            else:
+                # Clean up old sync tokens and expired history cache entries
+                self._clean_cache(token_folder, os.listdir(token_folder),
+                                  max_age=self.configuration.getint(
+                                      "storage", "max_sync_token_age"))
+                self._clean_history_cache()
+        else:
+            # Try to update the modification time
+            try:
+                # Race: Another process might have deleted the file.
+                os.utime(token_path)
+            except FileNotFoundError:
+                pass
+        changes = []
+        # Find all new, changed and deleted items (deleted items are
+        # reported while they are still in the history cache)
+        for href, history_etag in state.items():
+            if history_etag != old_state.get(href):
+                changes.append(href)
+        # Find all deleted items that are no longer in the history cache
+        for href, history_etag in old_state.items():
+            if href not in state:
+                changes.append(href)
+        return token, changes
+
     def list(self):
         for href in os.listdir(self._filesystem_path):
             if not is_safe_filesystem_path_component(href):
@@ -710,6 +874,9 @@ class Collection(BaseCollection):
             item = Item(self, vobject_item, href)
             with self._atomic_write(path, newline="") as fd:
                 fd.write(item.serialize())
+            # Track the change
+            self._update_history_etag(href, item)
+            self._clean_history_cache()
             return item
 
     def delete(self, href=None):
@@ -735,6 +902,9 @@ class Collection(BaseCollection):
                 raise ComponentNotFoundError(href)
             os.remove(path)
             self._sync_directory(os.path.dirname(path))
+            # Track the change
+            self._update_history_etag(href, None)
+            self._clean_history_cache()
 
     def get_meta(self, key=None):
         if os.path.exists(self._props_path):
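
Usage sketch (not part of the patch): a minimal example of driving the
new sync() method, e.g. from a test. The setup below is an assumption
based on the Radicale 2.x API (config.load(), storage.load() and the
collection class's acquire_lock()/discover()); only sync() itself and
its token format come from this patch, and the collection path is a
placeholder.

    import logging

    from radicale import config, storage

    configuration = config.load()
    collection_class = storage.load(configuration,
                                    logging.getLogger("radicale"))

    # Reading requires the shared lock; "user/calendar/" is hypothetical.
    with collection_class.acquire_lock("r"):
        collection = next(collection_class.discover("user/calendar/"))

        # Without an old token, every item is reported as changed and the
        # current token (http://radicale.org/ns/sync/<md5>) is returned.
        token, changes = collection.sync()

        # With the old token, only the hrefs of items added, modified or
        # deleted in between are reported.
        new_token, changes = collection.sync(token)
        for href in changes:
            print("changed or deleted:", href)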