Only support file based locking

This commit is contained in:
Unrud 2018-08-16 07:59:58 +02:00
parent c7e65fbb7f
commit e96410c6e7
5 changed files with 52 additions and 168 deletions

5
config

@ -117,11 +117,6 @@
# Delete sync token that are older (seconds) # Delete sync token that are older (seconds)
#max_sync_token_age = 2592000 #max_sync_token_age = 2592000
# Close the lock file when no more clients are waiting.
# This option is not very useful in general, but on Windows files that are
# opened cannot be deleted.
#filesystem_close_lock_file = False
# Command that is run after changes to storage # Command that is run after changes to storage
# Example: ([ -d .git ] || git init) && git add -A && (git diff --cached --quiet || git commit -m "Changes by "%(user)s) # Example: ([ -d .git ] || git init) && git add -A && (git diff --cached --quiet || git commit -m "Changes by "%(user)s)
#hook = #hook =

@ -173,14 +173,6 @@ INITIAL_CONFIG = OrderedDict([
"value": "True", "value": "True",
"help": "sync all changes to filesystem during requests", "help": "sync all changes to filesystem during requests",
"type": bool}), "type": bool}),
("filesystem_locking", {
"value": "True",
"help": "lock the storage while accessing it",
"type": bool}),
("filesystem_close_lock_file", {
"value": "False",
"help": "close the lock file when no more clients are waiting",
"type": bool}),
("hook", { ("hook", {
"value": "", "value": "",
"help": "command that is run after changes to storage", "help": "command that is run after changes to storage",

@ -805,16 +805,8 @@ class Collection(BaseCollection):
folder = os.path.expanduser(cls.configuration.get( folder = os.path.expanduser(cls.configuration.get(
"storage", "filesystem_folder")) "storage", "filesystem_folder"))
cls._makedirs_synced(folder) cls._makedirs_synced(folder)
lock_path = None lock_path = os.path.join(folder, ".Radicale.lock")
if cls.configuration.getboolean("storage", "filesystem_locking"): cls._lock = FileBackedRwLock(lock_path)
lock_path = os.path.join(folder, ".Radicale.lock")
close_lock_file = cls.configuration.getboolean(
"storage", "filesystem_close_lock_file")
cls._lock = FileBackedRwLock(lock_path, close_lock_file)
# init cache lock
cls._cache_locks = {}
cls._cache_locks_lock = threading.Lock()
def __init__(self, path, principal=None, folder=None, def __init__(self, path, principal=None, folder=None,
filesystem_path=None): filesystem_path=None):
@ -1382,36 +1374,15 @@ class Collection(BaseCollection):
pass pass
return content return content
@contextmanager
def _acquire_cache_lock(self, ns=""): def _acquire_cache_lock(self, ns=""):
if "/" in ns: if self._lock.locked == "w":
raise ValueError("ns must not include '/'") return contextlib.ExitStack()
with contextlib.ExitStack() as lock_stack: cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache")
with contextlib.ExitStack() as locks_lock_stack: self._makedirs_synced(cache_folder)
locks_lock_stack.enter_context(self._cache_locks_lock) lock_path = os.path.join(cache_folder,
lock_id = ns + "/" + self.path ".Radicale.lock" + (".%s" % ns if ns else ""))
lock = self._cache_locks.get(lock_id) lock = FileBackedRwLock(lock_path)
if not lock: return lock.acquire("w")
cache_folder = os.path.join(self._filesystem_path,
".Radicale.cache")
self._makedirs_synced(cache_folder)
lock_path = None
if self.configuration.getboolean(
"storage", "filesystem_locking"):
lock_path = os.path.join(
cache_folder,
".Radicale.lock" + (".%s" % ns if ns else ""))
lock = FileBackedRwLock(lock_path)
self._cache_locks[lock_id] = lock
lock_stack.enter_context(lock.acquire_lock(
"w", lambda: locks_lock_stack.pop_all().close()))
try:
yield
finally:
with self._cache_locks_lock:
lock_stack.pop_all().close()
if not lock.in_use():
del self._cache_locks[lock_id]
def _load_item_cache(self, href, input_hash): def _load_item_cache(self, href, input_hash):
cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache", cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
@ -1470,12 +1441,11 @@ class Collection(BaseCollection):
self._load_item_cache(href, input_hash) self._load_item_cache(href, input_hash)
vobject_item = None vobject_item = None
if input_hash != cache_hash: if input_hash != cache_hash:
with contextlib.ExitStack() as lock_stack: with self._acquire_cache_lock("item"):
# Lock the item cache to prevent multpile processes from # Lock the item cache to prevent multpile processes from
# generating the same data in parallel. # generating the same data in parallel.
# This improves the performance for multiple requests. # This improves the performance for multiple requests.
if self._lock.locked() == "r": if self._lock.locked == "r":
lock_stack.enter_context(self._acquire_cache_lock("item"))
# Check if another process created the file in the meantime # Check if another process created the file in the meantime
cache_hash, uid, etag, text, name, tag, start, end = \ cache_hash, uid, etag, text, name, tag, start, end = \
self._load_item_cache(href, input_hash) self._load_item_cache(href, input_hash)
@ -1592,7 +1562,7 @@ class Collection(BaseCollection):
def get_meta(self, key=None): def get_meta(self, key=None):
# reuse cached value if the storage is read-only # reuse cached value if the storage is read-only
if self._lock.locked() == "w" or self._meta_cache is None: if self._lock.locked == "w" or self._meta_cache is None:
try: try:
try: try:
with open(self._props_path, encoding=self._encoding) as f: with open(self._props_path, encoding=self._encoding) as f:
@ -1621,14 +1591,14 @@ class Collection(BaseCollection):
@property @property
def etag(self): def etag(self):
# reuse cached value if the storage is read-only # reuse cached value if the storage is read-only
if self._lock.locked() == "w" or self._etag_cache is None: if self._lock.locked == "w" or self._etag_cache is None:
self._etag_cache = super().etag self._etag_cache = super().etag
return self._etag_cache return self._etag_cache
@classmethod @classmethod
@contextmanager @contextmanager
def acquire_lock(cls, mode, user=None): def acquire_lock(cls, mode, user=None):
with cls._lock.acquire_lock(mode): with cls._lock.acquire(mode):
yield yield
# execute hook # execute hook
hook = cls.configuration.get("storage", "hook") hook = cls.configuration.get("storage", "hook")
@ -1653,123 +1623,54 @@ class Collection(BaseCollection):
class FileBackedRwLock: class FileBackedRwLock:
"""A readers-Writer lock that can additionally lock a file. """A readers-Writer lock that locks a file."""
All requests are processed in FIFO order. def __init__(self, path):
"""
def __init__(self, path=None, close_lock_file=True):
"""Initilize a lock.
``path`` the file that is used for locking (optional)
``close_lock_file`` close the lock file, when unlocked and no requests
are pending
"""
self._path = path self._path = path
self._close_lock_file = close_lock_file
self._lock = threading.Lock()
self._waiters = []
self._lock_file = None
self._lock_file_locked = False
self._readers = 0 self._readers = 0
self._writer = False self._writer = False
self._lock = threading.Lock()
@property
def locked(self): def locked(self):
if self._writer:
return "w"
if self._readers:
return "r"
return ""
def in_use(self):
with self._lock: with self._lock:
return self._waiters or self._readers or self._writer if self._readers > 0:
return "r"
if self._writer:
return "w"
return ""
@contextmanager @contextmanager
def acquire_lock(self, mode, sync_callback=None): def acquire(self, mode):
def condition(): if mode not in "rw":
if mode == "r": raise ValueError("Invalid mode: %r" % mode)
return not self._writer with open(self._path, "w+") as lock_file:
if os.name == "nt":
handle = msvcrt.get_osfhandle(lock_file.fileno())
flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
overlapped = Overlapped()
if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
raise RuntimeError("Locking the storage failed: %s" %
ctypes.FormatError())
elif os.name == "posix":
_cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
try:
fcntl.flock(lock_file.fileno(), _cmd)
except OSError as e:
raise RuntimeError("Locking the storage failed: %s" %
e) from e
else: else:
return not self._writer and self._readers == 0 raise RuntimeError("Locking the storage failed: "
"Unsupported operating system")
# Use a primitive lock which only works within one process as a
# precondition for inter-process file-based locking
with self._lock:
if sync_callback:
sync_callback()
if self._waiters or not condition():
# Use FIFO for access requests
waiter = threading.Condition(lock=self._lock)
self._waiters.append(waiter)
while True:
waiter.wait()
if condition():
break
self._waiters.pop(0)
if mode == "r":
self._readers += 1
# Notify additional potential readers
if self._waiters:
self._waiters[0].notify()
else:
self._writer = True
if self._path and not self._lock_file_locked:
if not self._lock_file:
self._lock_file = open(self._path, "w+")
if os.name == "nt":
handle = msvcrt.get_osfhandle(self._lock_file.fileno())
flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0
overlapped = Overlapped()
if not lock_file_ex(handle, flags, 0, 1, 0, overlapped):
raise RuntimeError("Locking the storage failed "
"(can be disabled in the config): "
"%s" % ctypes.FormatError())
elif os.name == "posix":
_cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH
try:
fcntl.flock(self._lock_file.fileno(), _cmd)
except OSError as e:
raise RuntimeError("Locking the storage failed "
"(can be disabled in the config): "
"%s" % e) from e
else:
raise RuntimeError("Locking the storage failed "
"(can be disabled in the config): "
"Unsupported operating system")
self._lock_file_locked = True
try:
yield
finally:
with self._lock: with self._lock:
if mode == "r": if mode == "r":
self._readers -= 1 self._readers += 1
else: else:
self._writer = True
try:
yield
finally:
with self._lock:
if mode == "r":
self._readers -= 1
self._writer = False self._writer = False
if self._lock_file_locked and self._readers == 0:
if os.name == "nt":
handle = msvcrt.get_osfhandle(self._lock_file.fileno())
overlapped = Overlapped()
if not unlock_file_ex(handle, 0, 1, 0, overlapped):
raise RuntimeError("Unlocking the storage failed: "
"%s" % ctypes.FormatError())
elif os.name == "posix":
try:
fcntl.flock(self._lock_file.fileno(),
fcntl.LOCK_UN)
except OSError as e:
raise RuntimeError("Unlocking the storage failed: "
"%s" % e) from e
else:
raise RuntimeError("Unlocking the storage failed: "
"Unsupported operating system")
if self._close_lock_file and not self._waiters:
self._lock_file.close()
self._lock_file = None
self._lock_file_locked = False
if self._waiters:
self._waiters[0].notify()

@ -1428,8 +1428,6 @@ class BaseFileSystemTest(BaseTest):
self.configuration["storage"]["filesystem_folder"] = self.colpath self.configuration["storage"]["filesystem_folder"] = self.colpath
# Disable syncing to disk for better performance # Disable syncing to disk for better performance
self.configuration["storage"]["filesystem_fsync"] = "False" self.configuration["storage"]["filesystem_fsync"] = "False"
# Required on Windows, doesn't matter on Unix
self.configuration["storage"]["filesystem_close_lock_file"] = "True"
self.application = Application(self.configuration) self.application = Application(self.configuration)
def teardown(self): def teardown(self):

@ -37,8 +37,6 @@ class TestBaseAuthRequests(BaseTest):
self.configuration["storage"]["filesystem_folder"] = self.colpath self.configuration["storage"]["filesystem_folder"] = self.colpath
# Disable syncing to disk for better performance # Disable syncing to disk for better performance
self.configuration["storage"]["filesystem_fsync"] = "False" self.configuration["storage"]["filesystem_fsync"] = "False"
# Required on Windows, doesn't matter on Unix
self.configuration["storage"]["filesystem_close_lock_file"] = "True"
def teardown(self): def teardown(self):
shutil.rmtree(self.colpath) shutil.rmtree(self.colpath)