Lock the item cache when filling it
Prevents performance losses when many clients request the same missing items in parallel.
This commit is contained in:
		@@ -1217,6 +1217,38 @@ class Collection(BaseCollection):
 | 
			
		||||
            pass
 | 
			
		||||
        return content
 | 
			
		||||
 | 
			
		||||
    # Class-level registry of cache locks. _acquire_cache_lock stores one
    # FileBackedRwLock per key of the form ns + "/" + collection path and
    # deletes the entry again once the lock is no longer in use.
    _cache_locks = {}
 | 
			
		||||
    # Guards every lookup, insertion and removal in _cache_locks.
    _cache_locks_lock = threading.Lock()
 | 
			
		||||
 | 
			
		||||
    # Context manager that holds a "w" (write) lock for this collection's
    # cache while the body of the ``with`` statement runs.
    #
    # ``ns`` is a namespace string. It is combined with ``self.path`` to form
    # the key into the ``_cache_locks`` registry and, when non-empty, it is
    # appended to the lock file name (".Radicale.lock.<ns>").
    #
    # The lock object is looked up, or created on first use, while holding
    # ``_cache_locks_lock``. If the "storage"/"filesystem_locking" option is
    # enabled, the lock is given a file path inside the ".Radicale.cache"
    # folder of the collection; otherwise it is created with ``None`` as its
    # path (presumably an in-process-only lock -- confirm against
    # FileBackedRwLock).
    #
    # On exit the lock is released and, if nothing else uses it any more, its
    # registry entry is removed.
    @contextmanager
 | 
			
		||||
    def _acquire_cache_lock(self, ns=""):
 | 
			
		||||
        with contextlib.ExitStack() as lock_stack:
 | 
			
		||||
            with contextlib.ExitStack() as locks_lock_stack:
 | 
			
		||||
                locks_lock_stack.enter_context(self._cache_locks_lock)
 | 
			
		||||
                lock_id = ns + "/" + self.path
 | 
			
		||||
                lock = self._cache_locks.get(lock_id)
 | 
			
		||||
                if not lock:
 | 
			
		||||
                    cache_folder = os.path.join(self._filesystem_path,
 | 
			
		||||
                                                ".Radicale.cache")
 | 
			
		||||
                    self._makedirs_synced(cache_folder)
 | 
			
		||||
                    lock_path = None
 | 
			
		||||
                    if self.configuration.getboolean(
 | 
			
		||||
                            "storage", "filesystem_locking"):
 | 
			
		||||
                        lock_path = os.path.join(
 | 
			
		||||
                            cache_folder,
 | 
			
		||||
                            ".Radicale.lock" + (".%s" % ns if ns else ""))
 | 
			
		||||
                    lock = FileBackedRwLock(lock_path)
 | 
			
		||||
                    self._cache_locks[lock_id] = lock
 | 
			
		||||
                # The callback handed to acquire_lock empties
                # locks_lock_stack and closes it, which releases
                # _cache_locks_lock. acquire_lock calls it after taking the
                # lock's own internal mutex, so the registry is not kept
                # locked while this thread waits for the cache lock.
                lock_stack.enter_context(lock.acquire_lock(
 | 
			
		||||
                    "w", lambda: locks_lock_stack.pop_all().close()))
 | 
			
		||||
            try:
 | 
			
		||||
                yield
 | 
			
		||||
            finally:
 | 
			
		||||
                # Release the cache lock while holding the registry lock so
                # that no other thread can look up the entry between the
                # release, the in_use() check and the removal below.
                with self._cache_locks_lock:
 | 
			
		||||
                    lock_stack.pop_all().close()
 | 
			
		||||
                    if not lock.in_use():
 | 
			
		||||
                        del self._cache_locks[lock_id]
 | 
			
		||||
 | 
			
		||||
    def _load_item_cache(self, href):
 | 
			
		||||
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
 | 
			
		||||
                                    "item")
 | 
			
		||||
@@ -1267,25 +1299,38 @@ class Collection(BaseCollection):
 | 
			
		||||
            href)
 | 
			
		||||
        vobject_item = None
 | 
			
		||||
        if input_hash != cache_hash:
 | 
			
		||||
            try:
 | 
			
		||||
                vobject_items = tuple(vobject.readComponents(
 | 
			
		||||
                    raw_text.decode(self._encoding)))
 | 
			
		||||
                if len(vobject_items) != 1:
 | 
			
		||||
                    raise RuntimeError(
 | 
			
		||||
                        "Content contains %d components" % len(vobject_items))
 | 
			
		||||
                vobject_item = vobject_items[0]
 | 
			
		||||
                check_and_sanitize_item(vobject_item, uid=uid,
 | 
			
		||||
                                        tag=self.get_meta("tag"))
 | 
			
		||||
            # Lock the item cache to prevent multiple processes from generating
 | 
			
		||||
            # the same data in parallel. This is only needed for performance.
 | 
			
		||||
            if self._lock.locked() == "w":
 | 
			
		||||
                # The access is already exclusive, use dummy context manager.
 | 
			
		||||
                lock = contextlib.suppress()
 | 
			
		||||
            else:
 | 
			
		||||
                lock = self._acquire_cache_lock("item")
 | 
			
		||||
            with lock:
 | 
			
		||||
                # Check if another process created the file in the meantime.
 | 
			
		||||
                cache_hash, uid, etag, text, tag, start, end = \
 | 
			
		||||
                    self._store_item_cache(href, vobject_item, input_hash)
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                raise RuntimeError("Failed to load item %r in %r: %s" %
 | 
			
		||||
                                   (href, self.path, e)) from e
 | 
			
		||||
            # Clean cache entries once after the data in the file system was
 | 
			
		||||
            # edited externally.
 | 
			
		||||
            if not self._item_cache_cleaned:
 | 
			
		||||
                self._item_cache_cleaned = True
 | 
			
		||||
                self._clean_item_cache()
 | 
			
		||||
                    self._load_item_cache(href)
 | 
			
		||||
                if input_hash != cache_hash:
 | 
			
		||||
                    try:
 | 
			
		||||
                        vobject_items = tuple(vobject.readComponents(
 | 
			
		||||
                            raw_text.decode(self._encoding)))
 | 
			
		||||
                        if len(vobject_items) != 1:
 | 
			
		||||
                            raise RuntimeError("Content contains %d components"
 | 
			
		||||
                                               % len(vobject_items))
 | 
			
		||||
                        vobject_item = vobject_items[0]
 | 
			
		||||
                        check_and_sanitize_item(vobject_item, uid=uid,
 | 
			
		||||
                                                tag=self.get_meta("tag"))
 | 
			
		||||
                        cache_hash, uid, etag, text, tag, start, end = \
 | 
			
		||||
                            self._store_item_cache(
 | 
			
		||||
                                href, vobject_item, input_hash)
 | 
			
		||||
                    except Exception as e:
 | 
			
		||||
                        raise RuntimeError("Failed to load item %r in %r: %s" %
 | 
			
		||||
                                           (href, self.path, e)) from e
 | 
			
		||||
                    # Clean cache entries once after the data in the file
 | 
			
		||||
                    # system was edited externally.
 | 
			
		||||
                    if not self._item_cache_cleaned:
 | 
			
		||||
                        self._item_cache_cleaned = True
 | 
			
		||||
                        self._clean_item_cache()
 | 
			
		||||
        last_modified = time.strftime(
 | 
			
		||||
            "%a, %d %b %Y %H:%M:%S GMT",
 | 
			
		||||
            time.gmtime(os.path.getmtime(path)))
 | 
			
		||||
@@ -1471,8 +1516,12 @@ class FileBackedRwLock:
 | 
			
		||||
            return "r"
 | 
			
		||||
        return ""
 | 
			
		||||
 | 
			
		||||
    # Return a truthy value when any of _waiters, _readers or _writer is
    # truthy, i.e. (judging by the attribute names) when the lock is held or
    # awaited. The result is the first truthy attribute, or the last one if
    # all are falsy, so callers should rely on its truthiness only.
    def in_use(self):
 | 
			
		||||
        # Read the state while holding the lock's internal mutex.
        with self._lock:
 | 
			
		||||
            return self._waiters or self._readers or self._writer
 | 
			
		||||
 | 
			
		||||
    @contextmanager
 | 
			
		||||
    def acquire_lock(self, mode):
 | 
			
		||||
    def acquire_lock(self, mode, sync_callback=None):
 | 
			
		||||
        def condition():
 | 
			
		||||
            if mode == "r":
 | 
			
		||||
                return not self._writer
 | 
			
		||||
@@ -1482,6 +1531,8 @@ class FileBackedRwLock:
 | 
			
		||||
        # Use a primitive lock which only works within one process as a
 | 
			
		||||
        # precondition for inter-process file-based locking
 | 
			
		||||
        with self._lock:
 | 
			
		||||
            if sync_callback:
 | 
			
		||||
                sync_callback()
 | 
			
		||||
            if self._waiters or not condition():
 | 
			
		||||
                # Use FIFO for access requests
 | 
			
		||||
                waiter = threading.Condition(lock=self._lock)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user