diff --git a/radicale/storage.py b/radicale/storage.py index f84cb68..f649807 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -1217,6 +1217,38 @@ class Collection(BaseCollection): pass return content + _cache_locks = {} + _cache_locks_lock = threading.Lock() + + @contextmanager + def _acquire_cache_lock(self, ns=""): + with contextlib.ExitStack() as lock_stack: + with contextlib.ExitStack() as locks_lock_stack: + locks_lock_stack.enter_context(self._cache_locks_lock) + lock_id = ns + "/" + self.path + lock = self._cache_locks.get(lock_id) + if not lock: + cache_folder = os.path.join(self._filesystem_path, + ".Radicale.cache") + self._makedirs_synced(cache_folder) + lock_path = None + if self.configuration.getboolean( + "storage", "filesystem_locking"): + lock_path = os.path.join( + cache_folder, + ".Radicale.lock" + (".%s" % ns if ns else "")) + lock = FileBackedRwLock(lock_path) + self._cache_locks[lock_id] = lock + lock_stack.enter_context(lock.acquire_lock( + "w", lambda: locks_lock_stack.pop_all().close())) + try: + yield + finally: + with self._cache_locks_lock: + lock_stack.pop_all().close() + if not lock.in_use(): + del self._cache_locks[lock_id] + def _load_item_cache(self, href): cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache", "item") @@ -1267,25 +1299,38 @@ class Collection(BaseCollection): href) vobject_item = None if input_hash != cache_hash: - try: - vobject_items = tuple(vobject.readComponents( - raw_text.decode(self._encoding))) - if len(vobject_items) != 1: - raise RuntimeError( - "Content contains %d components" % len(vobject_items)) - vobject_item = vobject_items[0] - check_and_sanitize_item(vobject_item, uid=uid, - tag=self.get_meta("tag")) + # Lock the item cache to prevent multpile processes from generating + # the same data in parallel. This is only needed for performance. + if self._lock.locked() == "w": + # The access is already exclusive, use dummy context manager. + lock = contextlib.suppress() + else: + lock = self._acquire_cache_lock("item") + with lock: + # Check if another process created the file in the meantime. cache_hash, uid, etag, text, tag, start, end = \ - self._store_item_cache(href, vobject_item, input_hash) - except Exception as e: - raise RuntimeError("Failed to load item %r in %r: %s" % - (href, self.path, e)) from e - # Clean cache entriesn once after the data in the file system was - # edited externally. - if not self._item_cache_cleaned: - self._item_cache_cleaned = True - self._clean_item_cache() + self._load_item_cache(href) + if input_hash != cache_hash: + try: + vobject_items = tuple(vobject.readComponents( + raw_text.decode(self._encoding))) + if len(vobject_items) != 1: + raise RuntimeError("Content contains %d components" + % len(vobject_items)) + vobject_item = vobject_items[0] + check_and_sanitize_item(vobject_item, uid=uid, + tag=self.get_meta("tag")) + cache_hash, uid, etag, text, tag, start, end = \ + self._store_item_cache( + href, vobject_item, input_hash) + except Exception as e: + raise RuntimeError("Failed to load item %r in %r: %s" % + (href, self.path, e)) from e + # Clean cache entries once after the data in the file + # system was edited externally. + if not self._item_cache_cleaned: + self._item_cache_cleaned = True + self._clean_item_cache() last_modified = time.strftime( "%a, %d %b %Y %H:%M:%S GMT", time.gmtime(os.path.getmtime(path))) @@ -1471,8 +1516,12 @@ class FileBackedRwLock: return "r" return "" + def in_use(self): + with self._lock: + return self._waiters or self._readers or self._writer + @contextmanager - def acquire_lock(self, mode): + def acquire_lock(self, mode, sync_callback=None): def condition(): if mode == "r": return not self._writer @@ -1482,6 +1531,8 @@ class FileBackedRwLock: # Use a primitive lock which only works within one process as a # precondition for inter-process file-based locking with self._lock: + if sync_callback: + sync_callback() if self._waiters or not condition(): # Use FIFO for access requests waiter = threading.Condition(lock=self._lock)