Merge pull request #565 from Unrud/synctoken

Support for sync-token and sync-collection
Unrud 2017-06-02 13:01:54 +02:00 committed by GitHub
commit 78a62aee86
6 changed files with 429 additions and 8 deletions

config

@@ -114,6 +114,9 @@
 # power fails!
 #filesystem_fsync = True
 
+# Delete sync tokens that are older than this age (seconds)
+#max_sync_token_age = 2592000
+
 # Close the lock file when no more clients are waiting.
 # This option is not very useful in general, but on Windows files that are
 # opened cannot be deleted.
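
The default of 2592000 seconds is 30 days (30 * 24 * 3600). For illustration, a minimal sketch of the mtime-based expiry check this option drives; the helper name is hypothetical, the real logic is Collection._clean_cache in the storage.py diff below:

    import os
    import time

    def is_expired(path, max_age=2592000):
        # Hypothetical helper mirroring the cache cleanup: a file counts as
        # expired once its modification time is older than max_age seconds.
        try:
            mtime = os.path.getmtime(path)
        except FileNotFoundError:
            return False  # another process already removed it
        return mtime < time.time() - max_age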

radicale/__init__.py

@@ -828,7 +828,8 @@ class Application:
         else:
             collection = item.collection
         headers = {"Content-Type": "text/xml; charset=%s" % self.encoding}
-        xml_answer = xmlutils.report(
+        status, xml_answer = xmlutils.report(
             base_prefix, path, xml_content, collection)
-        return (client.MULTI_STATUS, headers,
-                self._write_xml_content(xml_answer))
+        if status == client.PRECONDITION_FAILED:
+            return PRECONDITION_FAILED
+        return (status, headers, self._write_xml_content(xml_answer))
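
xmlutils.report now returns a (status, xml_answer) pair instead of a bare XML tree, so the application can answer 412 Precondition Failed when a client presents an invalid or expired sync token (RFC 6578 defines the DAV:valid-sync-token precondition). A minimal sketch of the new calling convention; the function and its arguments are illustrative, not the actual Application code:

    from http import client

    def finish_report(status, xml_answer, headers, write_xml):
        # Illustrative: a 412 tells the client to discard its sync token
        # and start a fresh synchronization run.
        if status == client.PRECONDITION_FAILED:
            return status, headers, None
        return status, headers, write_xml(xml_answer)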

radicale/config.py

@@ -160,6 +160,10 @@ INITIAL_CONFIG = OrderedDict([
                 "/var/lib/radicale/collections"),
             "help": "path where collections are stored",
             "type": str}),
+        ("max_sync_token_age", {
+            "value": 2592000,  # 30 days
+            "help": "delete sync tokens that are older than this age",
+            "type": int}),
         ("filesystem_fsync", {
             "value": "True",
             "help": "sync all changes to filesystem during requests",

radicale/storage.py

@@ -25,11 +25,13 @@ entry.
 """
 
+import binascii
 import contextlib
 import datetime
 import errno
 import json
 import os
+import pickle
 import posixpath
 import shlex
 import stat
@@ -39,7 +41,7 @@ import time
 from contextlib import contextmanager
 from hashlib import md5
 from importlib import import_module
-from itertools import groupby
+from itertools import chain, groupby
 from random import getrandbits
 from tempfile import NamedTemporaryFile, TemporaryDirectory
@@ -300,6 +302,24 @@ class BaseCollection:
         """
         raise NotImplementedError
 
+    def sync(self, old_token=None):
+        """Get the current sync token and changed items for synchronization.
+
+        ``old_token`` is an old sync token, which is used as the base of
+        the delta update. If the sync token is missing, all items are
+        returned. ValueError is raised for invalid or old tokens.
+
+        WARNING: This simple default implementation treats all sync tokens
+                 as invalid. It adheres to the specification but some
+                 clients (e.g. InfCloud) don't like it. Subclasses should
+                 provide a more sophisticated implementation.
+
+        """
+        token = "http://radicale.org/ns/sync/%s" % self.etag.strip("\"")
+        if old_token:
+            raise ValueError("Sync tokens are not supported")
+        return token, self.list()
+
     def list(self):
         """List collection items."""
         raise NotImplementedError
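
For orientation, a consumer of the storage API would drive sync() roughly like this (a minimal sketch; delta_update and its control flow are illustrative, not part of the patch):

    def delta_update(collection, stored_token=None):
        # Ask for changes since stored_token; fall back to a full resync
        # when the token is invalid or has expired.
        try:
            token, changed_hrefs = collection.sync(stored_token)
        except ValueError:
            token, changed_hrefs = collection.sync()
        return token, list(changed_hrefs)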
@@ -405,8 +425,8 @@ class Collection(BaseCollection):
     def _atomic_write(self, path, mode="w", newline=None):
         directory = os.path.dirname(path)
         tmp = NamedTemporaryFile(
-            mode=mode, dir=directory, encoding=self.encoding,
-            delete=False, prefix=".Radicale.tmp-", newline=newline)
+            mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
+            newline=newline, encoding=None if "b" in mode else self.encoding)
         try:
             yield tmp
             self._fsync(tmp.fileno())
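
The encoding change matters because the cache code below writes pickles through _atomic_write(path, "wb"): the io layer rejects an encoding argument in binary mode. A standalone demonstration:

    from tempfile import NamedTemporaryFile

    try:
        NamedTemporaryFile(mode="wb", encoding="utf-8")
    except ValueError as e:
        print(e)  # binary mode doesn't take an encoding argument

    with NamedTemporaryFile(mode="wb", encoding=None) as f:
        f.write(b"ok")  # encoding must be None for binary modes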
@@ -623,6 +643,196 @@ class Collection(BaseCollection):
             cls._sync_directory(to_collection._filesystem_path)
             if item.collection._filesystem_path != to_collection._filesystem_path:
                 cls._sync_directory(item.collection._filesystem_path)
+        # Track the change
+        to_collection._update_history_etag(to_href, item)
+        item.collection._update_history_etag(item.href, None)
+        to_collection._clean_history_cache()
+        if item.collection._filesystem_path != to_collection._filesystem_path:
+            item.collection._clean_history_cache()
+
+    @classmethod
+    def _clean_cache(cls, folder, names, max_age=None):
+        """Delete all ``names`` in ``folder`` that are older than ``max_age``."""
+        age_limit = time.time() - max_age if max_age is not None else None
+        modified = False
+        for name in names:
+            if not is_safe_filesystem_path_component(name):
+                continue
+            if age_limit is not None:
+                try:
+                    # Race: Another process might have deleted the file.
+                    mtime = os.path.getmtime(os.path.join(folder, name))
+                except FileNotFoundError:
+                    continue
+                if mtime > age_limit:
+                    continue
+            cls.logger.debug("Found expired item in cache: %r", name)
+            # Race: Another process might have deleted or locked the file.
+            try:
+                os.remove(os.path.join(folder, name))
+            except (FileNotFoundError, PermissionError):
+                continue
+            modified = True
+        if modified:
+            cls._sync_directory(folder)
+
+    def _update_history_etag(self, href, item):
+        """Updates and retrieves the history etag from the history cache.
+
+        The history cache contains a file for each current and deleted item
+        of the collection. These files contain the etag of the item (empty
+        string for deleted items) and a history etag, which is a hash over
+        the previous history etag and the etag separated by "/".
+        """
+        history_folder = os.path.join(self._filesystem_path,
+                                      ".Radicale.cache", "history")
+        try:
+            with open(os.path.join(history_folder, href), "rb") as f:
+                cache_etag, history_etag = pickle.load(f)
+        except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e:
+            if isinstance(e, (pickle.UnpicklingError, ValueError)):
+                self.logger.warning(
+                    "Failed to load history cache entry %r in %r: %s",
+                    href, self.path, e, exc_info=True)
+                # Delete the damaged file
+                try:
+                    os.remove(os.path.join(history_folder, href))
+                except (FileNotFoundError, PermissionError):
+                    pass
+            cache_etag = ""
+            # Initialize with random data to prevent collisions with cleaned
+            # expired items.
+            history_etag = binascii.hexlify(os.urandom(16)).decode("ascii")
+        etag = item.etag if item else ""
+        if etag != cache_etag:
+            self._makedirs_synced(history_folder)
+            history_etag = get_etag(history_etag + "/" + etag).strip("\"")
+            try:
+                # Race: Other processes might have created and locked the
+                # file.
+                with self._atomic_write(os.path.join(history_folder, href),
+                                        "wb") as f:
+                    pickle.dump([etag, history_etag], f)
+            except PermissionError:
+                pass
+        return history_etag
+
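To make the chaining concrete: every update hashes the previous history etag together with the item's current etag, so the history etag keeps changing even when an item is modified and then reverted to its old content. A small self-contained sketch; get_etag mirrors the quoted-md5 helper in this module:

    from hashlib import md5

    def get_etag(text):
        # Same shape as radicale.storage.get_etag: quoted md5 hex digest.
        return '"%s"' % md5(text.encode("utf-8")).hexdigest()

    history_etag = "0" * 32  # stand-in for the random initial value
    for item_etag in ('"v1"', '"v2"', '"v1"'):
        history_etag = get_etag(history_etag + "/" + item_etag).strip('"')
        print(history_etag)
    # The third digest differs from the first although the item etag is
    # "v1" again, which is how modify-then-undo still shows up as a change.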
+    def _get_deleted_history_hrefs(self):
+        """Returns the hrefs of all deleted items that are still in the
+        history cache."""
+        history_folder = os.path.join(self._filesystem_path,
+                                      ".Radicale.cache", "history")
+        try:
+            for href in os.listdir(history_folder):
+                if not is_safe_filesystem_path_component(href):
+                    continue
+                if os.path.isfile(os.path.join(self._filesystem_path, href)):
+                    continue
+                yield href
+        except FileNotFoundError:
+            pass
+
+    def _clean_history_cache(self):
+        # Delete all expired cache entries of deleted items.
+        history_folder = os.path.join(self._filesystem_path,
+                                      ".Radicale.cache", "history")
+        self._clean_cache(history_folder, self._get_deleted_history_hrefs(),
+                          max_age=self.configuration.getint(
+                              "storage", "max_sync_token_age"))
+
+    def sync(self, old_token=None):
+        # The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME
+        # where TOKEN_NAME is the md5 hash of all history etags of present
+        # and past items of the collection.
+        def check_token_name(token_name):
+            if len(token_name) != 32:
+                return False
+            for c in token_name:
+                if c not in "0123456789abcdef":
+                    return False
+            return True
+
+        old_token_name = None
+        if old_token:
+            # Extract the token name from the sync token
+            if not old_token.startswith("http://radicale.org/ns/sync/"):
+                raise ValueError("Malformed token: %s" % old_token)
+            old_token_name = old_token[len("http://radicale.org/ns/sync/"):]
+            if not check_token_name(old_token_name):
+                raise ValueError("Malformed token: %s" % old_token)
+        # Get the current state and sync token of the collection.
+        state = {}
+        token_name_hash = md5()
+        # Find the history of all existing and deleted items
+        for href, item in chain(
+                ((item.href, item) for item in self.pre_filtered_list(())),
+                ((href, None) for href in self._get_deleted_history_hrefs())):
+            history_etag = self._update_history_etag(href, item)
+            state[href] = history_etag
+            token_name_hash.update((href + "/" + history_etag).encode("utf-8"))
+        token_name = token_name_hash.hexdigest()
+        token = "http://radicale.org/ns/sync/%s" % token_name
+        if token_name == old_token_name:
+            # Nothing changed
+            return token, ()
+        token_folder = os.path.join(self._filesystem_path,
+                                    ".Radicale.cache", "sync-token")
+        token_path = os.path.join(token_folder, token_name)
+        old_state = {}
+        if old_token_name:
+            # Load the old token state
+            old_token_path = os.path.join(token_folder, old_token_name)
+            try:
+                # Race: Another process might have deleted the file.
+                with open(old_token_path, "rb") as f:
+                    old_state = pickle.load(f)
+            except (FileNotFoundError, pickle.UnpicklingError,
+                    ValueError) as e:
+                if isinstance(e, (pickle.UnpicklingError, ValueError)):
+                    self.logger.warning(
+                        "Failed to load stored sync token %r in %r: %s",
+                        old_token_name, self.path, e, exc_info=True)
+                    # Delete the damaged file
+                    try:
+                        os.remove(old_token_path)
+                    except (FileNotFoundError, PermissionError):
+                        pass
+                raise ValueError("Token not found: %s" % old_token)
+        # Write the new token state or update the modification time of the
+        # existing token state
+        if not os.path.exists(token_path):
+            self._makedirs_synced(token_folder)
+            try:
+                # Race: Other processes might have created and locked the
+                # file.
+                with self._atomic_write(token_path, "wb") as f:
+                    pickle.dump(state, f)
+            except PermissionError:
+                pass
+            else:
+                # Clean up old sync tokens and expired history entries
+                self._clean_cache(token_folder, os.listdir(token_folder),
+                                  max_age=self.configuration.getint(
+                                      "storage", "max_sync_token_age"))
+                self._clean_history_cache()
+        else:
+            # Try to update the modification time
+            try:
+                # Race: Another process might have deleted the file.
+                os.utime(token_path)
+            except FileNotFoundError:
+                pass
+        changes = []
+        # Find all new, changed and deleted items (that are still in the
+        # history cache)
+        for href, history_etag in state.items():
+            if history_etag != old_state.get(href):
+                changes.append(href)
+        # Find all deleted items that are no longer in the history cache
+        for href, history_etag in old_state.items():
+            if href not in state:
+                changes.append(href)
+        return token, changes
+
     def list(self):
         for href in os.listdir(self._filesystem_path):
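
The token name is therefore a pure function of the collection's current and historical state. A condensed sketch of the computation, where state maps href to history etag as built in sync() above (note that the digest depends on iteration order):

    from hashlib import md5

    def token_from_state(state):
        # Illustrative recomputation of the token name from an
        # href -> history-etag mapping, mirroring sync() above.
        h = md5()
        for href, history_etag in state.items():
            h.update((href + "/" + history_etag).encode("utf-8"))
        return "http://radicale.org/ns/sync/%s" % h.hexdigest()

    print(token_from_state({"event.ics": "0" * 32}))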
@@ -664,6 +874,9 @@ class Collection(BaseCollection):
         item = Item(self, vobject_item, href)
         with self._atomic_write(path, newline="") as fd:
             fd.write(item.serialize())
+        # Track the change
+        self._update_history_etag(href, item)
+        self._clean_history_cache()
         return item
 
     def delete(self, href=None):
@@ -689,6 +902,9 @@ class Collection(BaseCollection):
             raise ComponentNotFoundError(href)
             os.remove(path)
             self._sync_directory(os.path.dirname(path))
+            # Track the change
+            self._update_history_etag(href, None)
+            self._clean_history_cache()
 
     def get_meta(self, key=None):
         if os.path.exists(self._props_path):
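
Taken together, the bookkeeping added above lives in two folders below each collection; the layout is inferred from the paths in this diff, and the file names are examples:

    collection/
        event.ics
        .Radicale.cache/
            history/        one pickle per current or deleted href:
                            [etag, history_etag]
            sync-token/     one pickle per issued token name:
                            {href: history_etag}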

radicale/tests/test_base.py

@@ -24,6 +24,7 @@ import os
 import posixpath
 import shutil
 import tempfile
+import xml.etree.ElementTree as ET
 
 import pytest
 
 from radicale import Application, config
@@ -763,6 +764,177 @@ class BaseRequestsMixIn:
         assert status == 207
         assert "href>%s<" % event_path in answer
 
+    def _report_sync_token(self, calendar_path, sync_token=None):
+        sync_token_xml = (
+            "<sync-token><![CDATA[%s]]></sync-token>" % sync_token
+            if sync_token else "<sync-token />")
+        status, headers, answer = self.request(
+            "REPORT", calendar_path,
+            """<?xml version="1.0" encoding="utf-8" ?>
+               <sync-collection xmlns="DAV:">
+                 <prop>
+                   <getetag />
+                 </prop>
+                 %s
+               </sync-collection>""" % sync_token_xml)
+        if sync_token and status == 412:
+            return None, None
+        assert status == 207
+        xml = ET.fromstring(answer)
+        sync_token = xml.find("{DAV:}sync-token").text.strip()
+        assert sync_token
+        return sync_token, xml
+
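For reference, the wire exchange this helper drives looks roughly as follows (RFC 6578); the token values and the etag are illustrative:

    REPORT request body:

    <?xml version="1.0" encoding="utf-8" ?>
    <sync-collection xmlns="DAV:">
      <prop><getetag /></prop>
      <sync-token>http://radicale.org/ns/sync/00000000000000000000000000000000</sync-token>
    </sync-collection>

    207 Multi-Status response body (shape):

    <multistatus xmlns="DAV:">
      <response>
        <href>/calendar.ics/event.ics</href>
        <propstat>
          <prop><getetag>"(etag)"</getetag></prop>
          <status>HTTP/1.1 200 OK</status>
        </propstat>
      </response>
      <sync-token>http://radicale.org/ns/sync/ffffffffffffffffffffffffffffffff</sync-token>
    </multistatus>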
+    def test_report_sync_collection_no_change(self):
+        """Test sync-collection report without modifying the collection"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        event = get_file_content("event1.ics")
+        event_path = posixpath.join(calendar_path, "event.ics")
+        self.request("PUT", event_path, event)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        assert xml.find("{DAV:}response") is not None
+        new_sync_token, xml = self._report_sync_token(calendar_path,
+                                                      sync_token)
+        assert sync_token == new_sync_token
+        assert xml.find("{DAV:}response") is None
+
+    def test_report_sync_collection_add(self):
+        """Test sync-collection report with an added item"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        event = get_file_content("event1.ics")
+        event_path = posixpath.join(calendar_path, "event.ics")
+        self.request("PUT", event_path, event)
+        sync_token, xml = self._report_sync_token(calendar_path, sync_token)
+        if not sync_token:
+            pytest.skip("storage backend does not support sync-token")
+        assert xml.find("{DAV:}response") is not None
+        assert xml.find("{DAV:}response/{DAV:}status") is None
+
+    def test_report_sync_collection_delete(self):
+        """Test sync-collection report with a deleted item"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        event = get_file_content("event1.ics")
+        event_path = posixpath.join(calendar_path, "event.ics")
+        self.request("PUT", event_path, event)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        self.request("DELETE", event_path)
+        sync_token, xml = self._report_sync_token(calendar_path, sync_token)
+        if not sync_token:
+            pytest.skip("storage backend does not support sync-token")
+        assert "404" in xml.find("{DAV:}response/{DAV:}status").text
+
+    def test_report_sync_collection_create_delete(self):
+        """Test sync-collection report with a created and deleted item"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        event = get_file_content("event1.ics")
+        event_path = posixpath.join(calendar_path, "event.ics")
+        self.request("PUT", event_path, event)
+        self.request("DELETE", event_path)
+        sync_token, xml = self._report_sync_token(calendar_path, sync_token)
+        if not sync_token:
+            pytest.skip("storage backend does not support sync-token")
+        assert "404" in xml.find("{DAV:}response/{DAV:}status").text
+
+    def test_report_sync_collection_modify_undo(self):
+        """Test sync-collection report with a modified and reverted item"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        event1 = get_file_content("event1.ics")
+        event2 = get_file_content("event2.ics")
+        event_path = posixpath.join(calendar_path, "event1.ics")
+        self.request("PUT", event_path, event1)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        self.request("PUT", event_path, event2)
+        self.request("PUT", event_path, event1)
+        sync_token, xml = self._report_sync_token(calendar_path, sync_token)
+        if not sync_token:
+            pytest.skip("storage backend does not support sync-token")
+        assert xml.find("{DAV:}response") is not None
+        assert xml.find("{DAV:}response/{DAV:}status") is None
+
+    def test_report_sync_collection_move(self):
+        """Test sync-collection report with a moved item"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        event = get_file_content("event1.ics")
+        event1_path = posixpath.join(calendar_path, "event1.ics")
+        event2_path = posixpath.join(calendar_path, "event2.ics")
+        self.request("PUT", event1_path, event)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        status, headers, answer = self.request(
+            "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="")
+        sync_token, xml = self._report_sync_token(calendar_path, sync_token)
+        if not sync_token:
+            pytest.skip("storage backend does not support sync-token")
+        for response in xml.findall("{DAV:}response"):
+            if response.find("{DAV:}status") is None:
+                assert response.find("{DAV:}href").text == event2_path
+            else:
+                assert "404" in response.find("{DAV:}status").text
+                assert response.find("{DAV:}href").text == event1_path
+
+    def test_report_sync_collection_move_undo(self):
+        """Test sync-collection report with a moved and moved back item"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        event = get_file_content("event1.ics")
+        event1_path = posixpath.join(calendar_path, "event1.ics")
+        event2_path = posixpath.join(calendar_path, "event2.ics")
+        self.request("PUT", event1_path, event)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        status, headers, answer = self.request(
+            "MOVE", event1_path, HTTP_DESTINATION=event2_path, HTTP_HOST="")
+        status, headers, answer = self.request(
+            "MOVE", event2_path, HTTP_DESTINATION=event1_path, HTTP_HOST="")
+        sync_token, xml = self._report_sync_token(calendar_path, sync_token)
+        if not sync_token:
+            pytest.skip("storage backend does not support sync-token")
+        created = deleted = 0
+        for response in xml.findall("{DAV:}response"):
+            if response.find("{DAV:}status") is None:
+                assert response.find("{DAV:}href").text == event1_path
+                created += 1
+            else:
+                assert "404" in response.find("{DAV:}status").text
+                assert response.find("{DAV:}href").text == event2_path
+                deleted += 1
+        assert created == 1 and deleted == 1
+
+    def test_report_sync_collection_invalid_sync_token(self):
+        """Test sync-collection report with an invalid sync token"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        sync_token, xml = self._report_sync_token(
+            calendar_path, "http://radicale.org/ns/sync/INVALID")
+        assert not sync_token
+
+    def test_propfind_sync_token(self):
+        """Retrieve the sync-token with a propfind request"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        event = get_file_content("event1.ics")
+        event_path = posixpath.join(calendar_path, "event.ics")
+        self.request("PUT", event_path, event)
+        new_sync_token, xml = self._report_sync_token(calendar_path,
+                                                      sync_token)
+        assert sync_token != new_sync_token
+
+    def test_propfind_same_as_sync_collection_sync_token(self):
+        """Compare sync-token property with sync-collection sync-token"""
+        calendar_path = "/calendar.ics/"
+        self.request("MKCALENDAR", calendar_path)
+        sync_token, xml = self._report_sync_token(calendar_path)
+        new_sync_token, xml = self._report_sync_token(calendar_path,
+                                                      sync_token)
+        assert sync_token == new_sync_token
+
     def test_authorization(self):
         authorization = "Basic " + base64.b64encode(b"user:").decode()
         status, headers, answer = self.request(

radicale/xmlutils.py

@@ -617,6 +617,7 @@ def _propfind_response(base_prefix, path, item, props, user, write=False,
     if is_collection:
         prop200.append(ET.Element(_tag("CS", "getctag")))
+        prop200.append(ET.Element(_tag("D", "sync-token")))
         prop200.append(ET.Element(_tag("C", "calendar-timezone")))
         prop200.append(ET.Element(_tag("D", "displayname")))
         prop200.append(ET.Element(_tag("ICAL", "calendar-color")))
@@ -732,6 +733,11 @@ def _propfind_response(base_prefix, path, item, props, user, write=False,
                 element.text = item.etag
             else:
                 is404 = True
+        elif tag == _tag("D", "sync-token"):
+            if is_leaf:
+                element.text, _ = item.sync()
+            else:
+                is404 = True
         else:
             human_tag = _tag_from_clark(tag)
             meta = item.get_meta(human_tag)
@@ -841,7 +847,7 @@ def report(base_prefix, path, xml_request, collection):
         # support for them) and stops working if an error code is returned.
         collection.logger.warning("Unsupported REPORT method %r on %r "
                                   "requested", root.tag, path)
-        return multistatus
+        return client.MULTI_STATUS, multistatus
     prop_element = root.find(_tag("D", "prop"))
     props = (
         [prop.tag for prop in prop_element]
@@ -860,6 +866,25 @@ def report(base_prefix, path, xml_request, collection):
             else:
                 collection.logger.warning("Skipping invalid path %r in REPORT "
                                           "request on %r", href_path, path)
+    elif root.tag == _tag("D", "sync-collection"):
+        old_sync_token_element = root.find(_tag("D", "sync-token"))
+        old_sync_token = ""
+        if old_sync_token_element is not None and old_sync_token_element.text:
+            old_sync_token = old_sync_token_element.text.strip()
+        collection.logger.debug("Client provided sync token: %r",
+                                old_sync_token)
+        try:
+            sync_token, names = collection.sync(old_sync_token)
+        except ValueError as e:
+            # Invalid sync token
+            collection.logger.info("Client provided invalid sync token %r: %s",
+                                   old_sync_token, e, exc_info=True)
+            return client.PRECONDITION_FAILED, None
+        hreferences = ("/" + posixpath.join(collection.path, n) for n in names)
+        # Append current sync token to response
+        sync_token_element = ET.Element(_tag("D", "sync-token"))
+        sync_token_element.text = sync_token
+        multistatus.append(sync_token_element)
     else:
         hreferences = (path,)
     filters = (
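
A condensed, standalone sketch of what this branch assembles; collection_path, names and the token value are stand-ins for the real objects:

    import posixpath
    import xml.etree.ElementTree as ET

    collection_path = "user/calendar.ics"
    names = ["event.ics"]
    hreferences = ("/" + posixpath.join(collection_path, n) for n in names)
    multistatus = ET.Element("{DAV:}multistatus")
    sync_token_element = ET.SubElement(multistatus, "{DAV:}sync-token")
    sync_token_element.text = "http://radicale.org/ns/sync/" + "0" * 32
    print(list(hreferences))
    print(ET.tostring(multistatus).decode())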
@@ -931,7 +956,7 @@ def report(base_prefix, path, xml_request, collection):
                 base_prefix, uri, found_props=found_props,
                 not_found_props=not_found_props, found_item=True))
-    return multistatus
+    return client.MULTI_STATUS, multistatus
 
 def _item_response(base_prefix, href, found_props=(), not_found_props=(),