Real sync-collection support for the multifilesystem backend

This commit is contained in:
Unrud 2017-06-02 12:44:39 +02:00
parent 0ffa97eca0
commit f633b48a7a
3 changed files with 178 additions and 1 deletions

3
config
View File

@ -110,6 +110,9 @@
# power fails!
#filesystem_fsync = True
# Delete sync token that are older (seconds)
#max_sync_token_age = 2592000
# Close the lock file when no more clients are waiting.
# This option is not very useful in general, but on Windows files that are
# opened cannot be deleted.

View File

@ -135,6 +135,10 @@ INITIAL_CONFIG = OrderedDict([
"/var/lib/radicale/collections"),
"help": "path where collections are stored",
"type": str}),
("max_sync_token_age", {
"value": 2592000, # 30 days
"help": "delete sync token that are older",
"type": int}),
("filesystem_fsync", {
"value": "True",
"help": "sync all changes to filesystem during requests",

View File

@ -25,11 +25,13 @@ entry.
"""
import binascii
import contextlib
import datetime
import errno
import json
import os
import pickle
import posixpath
import shlex
import stat
@ -39,7 +41,7 @@ import time
from contextlib import contextmanager
from hashlib import md5
from importlib import import_module
from itertools import groupby
from itertools import chain, groupby
from random import getrandbits
from tempfile import NamedTemporaryFile, TemporaryDirectory
@ -641,6 +643,12 @@ class Collection(BaseCollection):
cls._sync_directory(to_collection._filesystem_path)
if item.collection._filesystem_path != to_collection._filesystem_path:
cls._sync_directory(item.collection._filesystem_path)
# Track the change
to_collection._update_history_etag(to_href, item)
item.collection._update_history_etag(item.href, None)
to_collection._clean_history_cache()
if item.collection._filesystem_path != to_collection._filesystem_path:
item.collection._clean_history_cache()
@classmethod
def _clean_cache(cls, folder, names, max_age=None):
@ -670,6 +678,162 @@ class Collection(BaseCollection):
if modified:
cls._sync_directory(folder)
def _update_history_etag(self, href, item):
"""Updates and retrieves the history etag from the history cache.
The history cache contains a file for each current and deleted item
of the collection. These files contain the etag of the item (empty
string for deleted items) and a history etag, which is a hash over
the previous history etag and the etag separated by "/".
"""
history_folder = os.path.join(self._filesystem_path,
".Radicale.cache", "history")
try:
with open(os.path.join(history_folder, href), "rb") as f:
cache_etag, history_etag = pickle.load(f)
except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
if isinstance(e, (pickle.UnpicklingError, ValueError)):
self.logger.warning(
"Failed to load history cache entry %r in %r: %s",
href, self.path, e, exc_info=True)
# Delete the damaged file
try:
os.remove(os.path.join(history_folder, href))
except (FileNotFoundError, PermissionError):
pass
cache_etag = ""
# Initialize with random data to prevent collisions with cleaned
# expired items.
history_etag = binascii.hexlify(os.urandom(16)).decode("ascii")
etag = item.etag if item else ""
if etag != cache_etag:
self._makedirs_synced(history_folder)
history_etag = get_etag(history_etag + "/" + etag).strip("\"")
try:
# Race: Other processes might have created and locked the file.
with self._atomic_write(os.path.join(history_folder, href),
"wb") as f:
pickle.dump([etag, history_etag], f)
except PermissionError:
pass
return history_etag
def _get_deleted_history_hrefs(self):
"""Returns the hrefs of all deleted items that are still in the
history cache."""
history_folder = os.path.join(self._filesystem_path,
".Radicale.cache", "history")
try:
for href in os.listdir(history_folder):
if not is_safe_filesystem_path_component(href):
continue
if os.path.isfile(os.path.join(self._filesystem_path, href)):
continue
yield href
except FileNotFoundError:
pass
def _clean_history_cache(self):
# Delete all expired cache entries of deleted items.
history_folder = os.path.join(self._filesystem_path,
".Radicale.cache", "history")
self._clean_cache(history_folder, self._get_deleted_history_hrefs(),
max_age=self.configuration.getint(
"storage", "max_sync_token_age"))
def sync(self, old_token=None):
# The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME
# where TOKEN_NAME is the md5 hash of all history etags of present and
# past items of the collection.
def check_token_name(token_name):
if len(token_name) != 32:
return False
for c in token_name:
if c not in "0123456789abcdef":
return False
return True
old_token_name = None
if old_token:
# Extract the token name from the sync token
if not old_token.startswith("http://radicale.org/ns/sync/"):
raise ValueError("Malformed token: %s" % old_token)
old_token_name = old_token[len("http://radicale.org/ns/sync/"):]
if not check_token_name(old_token_name):
raise ValueError("Malformed token: %s" % old_token)
# Get the current state and sync-token of the collection.
state = {}
token_name_hash = md5()
# Find the history of all existing and deleted items
for href, item in chain(
((item.href, item) for item in self.pre_filtered_list(())),
((href, None) for href in self._get_deleted_history_hrefs())):
history_etag = self._update_history_etag(href, item)
state[href] = history_etag
token_name_hash.update((href + "/" + history_etag).encode("utf-8"))
token_name = token_name_hash.hexdigest()
token = "http://radicale.org/ns/sync/%s" % token_name
if token_name == old_token_name:
# Nothing changed
return token, ()
token_folder = os.path.join(self._filesystem_path,
".Radicale.cache", "sync-token")
token_path = os.path.join(token_folder, token_name)
old_state = {}
if old_token_name:
# load the old token state
old_token_path = os.path.join(token_folder, old_token_name)
try:
# Race: Another process might have deleted the file.
with open(old_token_path, "rb") as f:
old_state = pickle.load(f)
except (FileNotFoundError, pickle.UnpicklingError,
ValueError) as e:
if isinstance(e, (pickle.UnpicklingError, ValueError)):
self.logger.warning(
"Failed to load stored sync token %r in %r: %s",
old_token_name, self.path, e, exc_info=True)
# Delete the damaged file
try:
os.remove(old_token_path)
except (FileNotFoundError, PermissionError):
pass
raise ValueError("Token not found: %s" % old_token)
# write the new token state or update the modification time of
# existing token state
if not os.path.exists(token_path):
self._makedirs_synced(token_folder)
try:
# Race: Other processes might have created and locked the file.
with self._atomic_write(token_path, "wb") as f:
pickle.dump(state, f)
except PermissionError:
pass
else:
# clean up old sync tokens and item cache
self._clean_cache(token_folder, os.listdir(token_folder),
max_age=self.configuration.getint(
"storage", "max_sync_token_age"))
self._clean_history_cache()
else:
# Try to update the modification time
try:
# Race: Another process might have deleted the file.
os.utime(token_path)
except FileNotFoundError:
pass
changes = []
# Find all new, changed and deleted (that are still in the item cache)
# items
for href, history_etag in state.items():
if history_etag != old_state.get(href):
changes.append(href)
# Find all deleted items that are no longer in the item cache
for href, history_etag in old_state.items():
if href not in state:
changes.append(href)
return token, changes
def list(self):
for href in os.listdir(self._filesystem_path):
if not is_safe_filesystem_path_component(href):
@ -710,6 +874,9 @@ class Collection(BaseCollection):
item = Item(self, vobject_item, href)
with self._atomic_write(path, newline="") as fd:
fd.write(item.serialize())
# Track the change
self._update_history_etag(href, item)
self._clean_history_cache()
return item
def delete(self, href=None):
@ -735,6 +902,9 @@ class Collection(BaseCollection):
raise ComponentNotFoundError(href)
os.remove(path)
self._sync_directory(os.path.dirname(path))
# Track the change
self._update_history_etag(href, None)
self._clean_history_cache()
def get_meta(self, key=None):
if os.path.exists(self._props_path):