From 2c45b1998c5c5a695770ae1a82439a0faa3bd1d5 Mon Sep 17 00:00:00 2001 From: Unrud Date: Sat, 21 May 2016 00:38:42 +0200 Subject: [PATCH 1/8] Implement locking of whole storage --- radicale/__init__.py | 61 ++++++++++++++++---------- radicale/storage.py | 101 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+), 24 deletions(-) diff --git a/radicale/__init__.py b/radicale/__init__.py index 35ab75d..9894d9f 100644 --- a/radicale/__init__.py +++ b/radicale/__init__.py @@ -273,32 +273,45 @@ class Application: is_authenticated = self.is_authenticated(user, password) is_valid_user = is_authenticated or not user - if is_valid_user: - items = self.Collection.discover( - path, environ.get("HTTP_DEPTH", "0")) - read_allowed_items, write_allowed_items = ( - self.collect_allowed_items(items, user)) - else: - read_allowed_items, write_allowed_items = None, None + lock = None + try: + if is_valid_user: + if function in (self.do_GET, self.do_HEAD, + self.do_OPTIONS, self.do_PROPFIND, + self.do_REPORT): + lock_mode = "r" + else: + lock_mode = "w" + lock = self.Collection.acquire_lock(lock_mode) - # Get content - content_length = int(environ.get("CONTENT_LENGTH") or 0) - if content_length: - content = self.decode( - environ["wsgi.input"].read(content_length), environ) - self.logger.debug("Request content:\n%s" % content) - else: - content = None + items = self.Collection.discover( + path, environ.get("HTTP_DEPTH", "0")) + read_allowed_items, write_allowed_items = ( + self.collect_allowed_items(items, user)) + else: + read_allowed_items, write_allowed_items = None, None - if is_valid_user and ( - (read_allowed_items or write_allowed_items) or - (is_authenticated and function == self.do_PROPFIND) or - function == self.do_OPTIONS): - status, headers, answer = function( - environ, read_allowed_items, write_allowed_items, content, - user) - else: - status, headers, answer = NOT_ALLOWED + # Get content + content_length = int(environ.get("CONTENT_LENGTH") or 0) + if content_length: + content = self.decode( + environ["wsgi.input"].read(content_length), environ) + self.logger.debug("Request content:\n%s" % content) + else: + content = None + + if is_valid_user and ( + (read_allowed_items or write_allowed_items) or + (is_authenticated and function == self.do_PROPFIND) or + function == self.do_OPTIONS): + status, headers, answer = function( + environ, read_allowed_items, write_allowed_items, content, + user) + else: + status, headers, answer = NOT_ALLOWED + finally: + if lock: + lock.release() if (status, headers, answer) == NOT_ALLOWED and not is_authenticated: # Unknown or unauthorized user diff --git a/radicale/storage.py b/radicale/storage.py index 982026f..7e4c2d8 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -29,6 +29,8 @@ import json import os import posixpath import shutil +import stat +import threading import time from contextlib import contextmanager from hashlib import md5 @@ -37,6 +39,35 @@ from uuid import uuid4 import vobject +if os.name == "nt": + import ctypes + import ctypes.wintypes + import msvcrt + + LOCKFILE_EXCLUSIVE_LOCK = 2 + if ctypes.sizeof(ctypes.c_void_p) == 4: + ULONG_PTR = ctypes.c_uint32 + else: + ULONG_PTR = ctypes.c_uint64 + + class Overlapped(ctypes.Structure): + _fields_ = [("internal", ULONG_PTR), + ("internal_high", ULONG_PTR), + ("offset", ctypes.wintypes.DWORD), + ("offset_high", ctypes.wintypes.DWORD), + ("h_event", ctypes.wintypes.HANDLE)] + + lock_file_ex = ctypes.windll.kernel32.LockFileEx + lock_file_ex.argtypes = [ctypes.wintypes.HANDLE, + ctypes.wintypes.DWORD, + ctypes.wintypes.DWORD, + ctypes.wintypes.DWORD, + ctypes.wintypes.DWORD, + ctypes.POINTER(Overlapped)] + lock_file_ex.restype = ctypes.wintypes.BOOL +elif os.name == "posix": + import fcntl + def load(configuration, logger): """Load the storage manager chosen in configuration.""" @@ -245,6 +276,18 @@ class BaseCollection: """Get the unicode string representing the whole collection.""" raise NotImplementedError + @classmethod + def acquire_lock(cls, mode): + """Lock the whole storage. + + ``mode`` must either be "r" for shared access or "w" for exclusive + access. + + Returns an object which has a method ``release``. + + """ + raise NotImplementedError + class Collection(BaseCollection): """Collection stored in several files per calendar.""" @@ -474,3 +517,61 @@ class Collection(BaseCollection): elif self.get_meta("tag") == "VADDRESSBOOK": return "".join([item.serialize() for item in items]) return "" + + _lock = threading.Lock() + + @classmethod + def acquire_lock(cls, mode): + class Lock: + def __init__(self, release_method): + self._release_method = release_method + + def release(self): + self._release_method() + + if mode not in ("r", "w"): + raise ValueError("Invalid lock mode: %s" % mode) + folder = os.path.expanduser( + cls.configuration.get("storage", "filesystem_folder")) + if not os.path.exists(folder): + os.makedirs(folder, exist_ok=True) + lock_path = os.path.join(folder, "Radicale.lock") + lock_file = open(lock_path, "w+") + # set access rights to a necessary minimum to prevent locking by + # arbitrary users + try: + os.chmod(lock_path, stat.S_IWUSR | stat.S_IRUSR) + except OSError: + cls.logger.debug("Failed to set permissions on lock file") + locked = False + if os.name == "nt": + handle = msvcrt.get_osfhandle(lock_file.fileno()) + flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0 + overlapped = Overlapped() + if lock_file_ex(handle, flags, 0, 1, 0, overlapped): + locked = True + elif os.name == "posix": + operation = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH + # According to documentation flock() is emulated with fcntl() on + # some platforms. fcntl() locks are not associated with an open + # file descriptor. The same file can be locked multiple times + # within the same process and if any fd of the file is closed, + # all locks are released. + # flock() does not work on NFS shares. + try: + fcntl.flock(lock_file.fileno(), operation) + except OSError: + pass + else: + locked = True + if locked: + lock = Lock(lock_file.close) + else: + cls.logger.debug("Locking not supported") + lock_file.close() + # Fallback to primitive lock which only works within one process + # and doesn't distinguish between shared and exclusive access. + # TODO: use readers–writer lock + cls._lock.acquire() + lock = Lock(cls._lock.release) + return lock From ff3b31fed6d1c090e4957a4c3d9432a375f2d216 Mon Sep 17 00:00:00 2001 From: Unrud Date: Sat, 21 May 2016 00:52:22 +0200 Subject: [PATCH 2/8] Use threading in integrated webserver --- radicale/__init__.py | 9 +++++++++ radicale/__main__.py | 7 ++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/radicale/__init__.py b/radicale/__init__.py index 9894d9f..17866f7 100644 --- a/radicale/__init__.py +++ b/radicale/__init__.py @@ -30,6 +30,7 @@ import os import pprint import base64 import socket +import socketserver import ssl import wsgiref.simple_server import re @@ -92,6 +93,14 @@ class HTTPSServer(HTTPServer): self.server_activate() +class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer): + pass + + +class ThreadedHTTPSServer(socketserver.ThreadingMixIn, HTTPSServer): + pass + + class RequestHandler(wsgiref.simple_server.WSGIRequestHandler): """HTTP requests handler.""" def log_message(self, *args, **kwargs): diff --git a/radicale/__main__.py b/radicale/__main__.py index 73f7f2b..c60b834 100644 --- a/radicale/__main__.py +++ b/radicale/__main__.py @@ -33,7 +33,8 @@ import ssl from wsgiref.simple_server import make_server from . import ( - Application, config, HTTPServer, HTTPSServer, log, RequestHandler, VERSION) + Application, config, ThreadedHTTPServer, ThreadedHTTPSServer, log, + RequestHandler, VERSION) # This is a script, many branches and variables @@ -152,7 +153,7 @@ def run(): # Create collection servers servers = {} if configuration.getboolean("server", "ssl"): - server_class = HTTPSServer + server_class = ThreadedHTTPSServer server_class.certificate = configuration.get("server", "certificate") server_class.key = configuration.get("server", "key") server_class.cyphers = configuration.get("server", "cyphers") @@ -168,7 +169,7 @@ def run(): "Error while reading SSL %s %r: %s" % ( name, filename, exception)) else: - server_class = HTTPServer + server_class = ThreadedHTTPServer if not configuration.getboolean("server", "dns_lookup"): RequestHandler.address_string = lambda self: self.client_address[0] From bca6cec6b3999f03212d8d115b24891ce6c10854 Mon Sep 17 00:00:00 2001 From: Unrud Date: Sun, 22 May 2016 08:47:19 +0200 Subject: [PATCH 3/8] Use context manager for locking --- radicale/__init__.py | 60 +++++++++++++++++++------------------------- radicale/storage.py | 9 ++++--- 2 files changed, 31 insertions(+), 38 deletions(-) diff --git a/radicale/__init__.py b/radicale/__init__.py index 17866f7..e740bb2 100644 --- a/radicale/__init__.py +++ b/radicale/__init__.py @@ -282,45 +282,37 @@ class Application: is_authenticated = self.is_authenticated(user, password) is_valid_user = is_authenticated or not user - lock = None - try: - if is_valid_user: - if function in (self.do_GET, self.do_HEAD, - self.do_OPTIONS, self.do_PROPFIND, - self.do_REPORT): - lock_mode = "r" - else: - lock_mode = "w" - lock = self.Collection.acquire_lock(lock_mode) + # Get content + content_length = int(environ.get("CONTENT_LENGTH") or 0) + if content_length: + content = self.decode( + environ["wsgi.input"].read(content_length), environ) + self.logger.debug("Request content:\n%s" % content) + else: + content = None + if is_valid_user: + if function in (self.do_GET, self.do_HEAD, + self.do_OPTIONS, self.do_PROPFIND, + self.do_REPORT): + lock_mode = "r" + else: + lock_mode = "w" + with self.Collection.acquire_lock(lock_mode): items = self.Collection.discover( path, environ.get("HTTP_DEPTH", "0")) read_allowed_items, write_allowed_items = ( self.collect_allowed_items(items, user)) - else: - read_allowed_items, write_allowed_items = None, None - - # Get content - content_length = int(environ.get("CONTENT_LENGTH") or 0) - if content_length: - content = self.decode( - environ["wsgi.input"].read(content_length), environ) - self.logger.debug("Request content:\n%s" % content) - else: - content = None - - if is_valid_user and ( - (read_allowed_items or write_allowed_items) or - (is_authenticated and function == self.do_PROPFIND) or - function == self.do_OPTIONS): - status, headers, answer = function( - environ, read_allowed_items, write_allowed_items, content, - user) - else: - status, headers, answer = NOT_ALLOWED - finally: - if lock: - lock.release() + if (read_allowed_items or write_allowed_items or + is_authenticated and function == self.do_PROPFIND or + function == self.do_OPTIONS): + status, headers, answer = function( + environ, read_allowed_items, write_allowed_items, + content, user) + else: + status, headers, answer = NOT_ALLOWED + else: + status, headers, answer = NOT_ALLOWED if (status, headers, answer) == NOT_ALLOWED and not is_authenticated: # Unknown or unauthorized user diff --git a/radicale/storage.py b/radicale/storage.py index 7e4c2d8..c386f70 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -277,14 +277,13 @@ class BaseCollection: raise NotImplementedError @classmethod + @contextmanager def acquire_lock(cls, mode): - """Lock the whole storage. + """Set a context manager to lock the whole storage. ``mode`` must either be "r" for shared access or "w" for exclusive access. - Returns an object which has a method ``release``. - """ raise NotImplementedError @@ -521,6 +520,7 @@ class Collection(BaseCollection): _lock = threading.Lock() @classmethod + @contextmanager def acquire_lock(cls, mode): class Lock: def __init__(self, release_method): @@ -574,4 +574,5 @@ class Collection(BaseCollection): # TODO: use readers–writer lock cls._lock.acquire() lock = Lock(cls._lock.release) - return lock + yield + lock.release() From eb9218354c88dde62be73bca940e69faabb2ba44 Mon Sep 17 00:00:00 2001 From: Unrud Date: Sun, 22 May 2016 09:16:18 +0200 Subject: [PATCH 4/8] Always use readers-writer lock in storage locking --- radicale/storage.py | 51 ++++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/radicale/storage.py b/radicale/storage.py index c386f70..d6bdd81 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -517,20 +517,31 @@ class Collection(BaseCollection): return "".join([item.serialize() for item in items]) return "" - _lock = threading.Lock() + _lock = threading.Condition() + _readers = 0 + _writer = False @classmethod @contextmanager def acquire_lock(cls, mode): - class Lock: - def __init__(self, release_method): - self._release_method = release_method - - def release(self): - self._release_method() + def condition(): + if mode == "r": + return not cls._writer + else: + return not cls._writer and cls._readers == 0 if mode not in ("r", "w"): raise ValueError("Invalid lock mode: %s" % mode) + # Use a primitive lock which only works within one process as a + # precondition for inter-process file-based locking + with cls._lock: + cls._lock.wait_for(condition) + if mode == "r": + cls._readers += 1 + # notify additional potential readers + cls._lock.notify() + else: + cls._writer = True folder = os.path.expanduser( cls.configuration.get("storage", "filesystem_folder")) if not os.path.exists(folder): @@ -543,13 +554,12 @@ class Collection(BaseCollection): os.chmod(lock_path, stat.S_IWUSR | stat.S_IRUSR) except OSError: cls.logger.debug("Failed to set permissions on lock file") - locked = False if os.name == "nt": handle = msvcrt.get_osfhandle(lock_file.fileno()) flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0 overlapped = Overlapped() - if lock_file_ex(handle, flags, 0, 1, 0, overlapped): - locked = True + if not lock_file_ex(handle, flags, 0, 1, 0, overlapped): + cls.logger.debug("Locking not supported") elif os.name == "posix": operation = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH # According to documentation flock() is emulated with fcntl() on @@ -561,18 +571,11 @@ class Collection(BaseCollection): try: fcntl.flock(lock_file.fileno(), operation) except OSError: - pass - else: - locked = True - if locked: - lock = Lock(lock_file.close) - else: - cls.logger.debug("Locking not supported") - lock_file.close() - # Fallback to primitive lock which only works within one process - # and doesn't distinguish between shared and exclusive access. - # TODO: use readers–writer lock - cls._lock.acquire() - lock = Lock(cls._lock.release) + cls.logger.debug("Locking not supported") yield - lock.release() + with cls._lock: + if mode == "r": + cls._readers -= 1 + else: + cls._writer = False + cls._lock.notify() From 49bc0728e3c2dc0d57f73e0d7af33bc9cda65e0e Mon Sep 17 00:00:00 2001 From: Unrud Date: Sun, 22 May 2016 09:47:36 +0200 Subject: [PATCH 5/8] Only one lock file per process (lockf() works now) lockf() is more portable than flock() --- radicale/storage.py | 59 ++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/radicale/storage.py b/radicale/storage.py index d6bdd81..7ffeca0 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -518,6 +518,7 @@ class Collection(BaseCollection): return "" _lock = threading.Condition() + _lock_file = None _readers = 0 _writer = False @@ -542,40 +543,38 @@ class Collection(BaseCollection): cls._lock.notify() else: cls._writer = True - folder = os.path.expanduser( - cls.configuration.get("storage", "filesystem_folder")) - if not os.path.exists(folder): - os.makedirs(folder, exist_ok=True) - lock_path = os.path.join(folder, "Radicale.lock") - lock_file = open(lock_path, "w+") - # set access rights to a necessary minimum to prevent locking by - # arbitrary users - try: - os.chmod(lock_path, stat.S_IWUSR | stat.S_IRUSR) - except OSError: - cls.logger.debug("Failed to set permissions on lock file") - if os.name == "nt": - handle = msvcrt.get_osfhandle(lock_file.fileno()) - flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0 - overlapped = Overlapped() - if not lock_file_ex(handle, flags, 0, 1, 0, overlapped): - cls.logger.debug("Locking not supported") - elif os.name == "posix": - operation = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH - # According to documentation flock() is emulated with fcntl() on - # some platforms. fcntl() locks are not associated with an open - # file descriptor. The same file can be locked multiple times - # within the same process and if any fd of the file is closed, - # all locks are released. - # flock() does not work on NFS shares. - try: - fcntl.flock(lock_file.fileno(), operation) - except OSError: - cls.logger.debug("Locking not supported") + if not cls._lock_file: + folder = os.path.expanduser( + cls.configuration.get("storage", "filesystem_folder")) + if not os.path.exists(folder): + os.makedirs(folder, exist_ok=True) + lock_path = os.path.join(folder, "Radicale.lock") + cls._lock_file = open(lock_path, "w+") + # set access rights to a necessary minimum to prevent locking + # by arbitrary users + try: + os.chmod(lock_path, stat.S_IWUSR | stat.S_IRUSR) + except OSError: + cls.logger.debug("Failed to set permissions on lock file") + if os.name == "nt": + handle = msvcrt.get_osfhandle(cls._lock_file.fileno()) + flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0 + overlapped = Overlapped() + if not lock_file_ex(handle, flags, 0, 1, 0, overlapped): + cls.logger.debug("Locking not supported") + elif os.name == "posix": + _cmd = fcntl.LOCK_EX if mode == "w" else fcntl.LOCK_SH + try: + fcntl.lockf(cls._lock_file.fileno(), _cmd) + except OSError: + cls.logger.debug("Locking not supported") yield with cls._lock: if mode == "r": cls._readers -= 1 else: cls._writer = False + if cls._readers == 0: + cls._lock_file.close() + cls._lock_file = None cls._lock.notify() From 1ea9a33101dbac442f30c688ea53e63bb9175cd1 Mon Sep 17 00:00:00 2001 From: Unrud Date: Sun, 22 May 2016 10:07:47 +0200 Subject: [PATCH 6/8] Reuse lock file --- radicale/storage.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/radicale/storage.py b/radicale/storage.py index 7ffeca0..68a54ff 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -65,6 +65,13 @@ if os.name == "nt": ctypes.wintypes.DWORD, ctypes.POINTER(Overlapped)] lock_file_ex.restype = ctypes.wintypes.BOOL + unlock_file_ex = ctypes.windll.kernel32.UnlockFileEx + unlock_file_ex.argtypes = [ctypes.wintypes.HANDLE, + ctypes.wintypes.DWORD, + ctypes.wintypes.DWORD, + ctypes.wintypes.DWORD, + ctypes.POINTER(Overlapped)] + unlock_file_ex.restype = ctypes.wintypes.BOOL elif os.name == "posix": import fcntl @@ -519,6 +526,7 @@ class Collection(BaseCollection): _lock = threading.Condition() _lock_file = None + _lock_file_locked = False _readers = 0 _writer = False @@ -556,6 +564,7 @@ class Collection(BaseCollection): os.chmod(lock_path, stat.S_IWUSR | stat.S_IRUSR) except OSError: cls.logger.debug("Failed to set permissions on lock file") + if not cls._lock_file_locked: if os.name == "nt": handle = msvcrt.get_osfhandle(cls._lock_file.fileno()) flags = LOCKFILE_EXCLUSIVE_LOCK if mode == "w" else 0 @@ -568,6 +577,7 @@ class Collection(BaseCollection): fcntl.lockf(cls._lock_file.fileno(), _cmd) except OSError: cls.logger.debug("Locking not supported") + cls._lock_file_locked = True yield with cls._lock: if mode == "r": @@ -575,6 +585,15 @@ class Collection(BaseCollection): else: cls._writer = False if cls._readers == 0: - cls._lock_file.close() - cls._lock_file = None + if os.name == "nt": + handle = msvcrt.get_osfhandle(cls._lock_file.fileno()) + overlapped = Overlapped() + if not unlock_file_ex(handle, 0, 1, 0, overlapped): + cls.logger.debug("Unlocking not supported") + elif os.name == "posix": + try: + fcntl.lockf(cls._lock_file.fileno(), fcntl.LOCK_UN) + except OSError: + cls.logger.debug("Unlocking not supported") + cls._lock_file_locked = False cls._lock.notify() From 14daa88772daee5594b48751c066bccfefe1629d Mon Sep 17 00:00:00 2001 From: Unrud Date: Sun, 22 May 2016 22:30:46 +0200 Subject: [PATCH 7/8] Use FIFO for storage locking This prevents starvation of writers in the current process --- radicale/storage.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/radicale/storage.py b/radicale/storage.py index 68a54ff..c795aeb 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -524,7 +524,8 @@ class Collection(BaseCollection): return "".join([item.serialize() for item in items]) return "" - _lock = threading.Condition() + _lock = threading.Lock() + _waiters = [] _lock_file = None _lock_file_locked = False _readers = 0 @@ -544,11 +545,20 @@ class Collection(BaseCollection): # Use a primitive lock which only works within one process as a # precondition for inter-process file-based locking with cls._lock: - cls._lock.wait_for(condition) + if cls._waiters or not condition(): + # use FIFO for access requests + waiter = threading.Condition(lock=cls._lock) + cls._waiters.append(waiter) + while True: + waiter.wait() + if condition(): + break + cls._waiters.pop(0) if mode == "r": cls._readers += 1 # notify additional potential readers - cls._lock.notify() + if cls._waiters: + cls._waiters[0].notify() else: cls._writer = True if not cls._lock_file: @@ -596,4 +606,5 @@ class Collection(BaseCollection): except OSError: cls.logger.debug("Unlocking not supported") cls._lock_file_locked = False - cls._lock.notify() + if cls._waiters: + cls._waiters[0].notify() From 6b1acd14d70b2e029aa56905518b3185a66d91c2 Mon Sep 17 00:00:00 2001 From: Unrud Date: Sun, 22 May 2016 22:41:19 +0200 Subject: [PATCH 8/8] Limit duration of file locks This prevents starvation of writers in other processes --- radicale/storage.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/radicale/storage.py b/radicale/storage.py index c795aeb..2223966 100644 --- a/radicale/storage.py +++ b/radicale/storage.py @@ -92,6 +92,7 @@ def load(configuration, logger): MIMETYPES = {"VADDRESSBOOK": "text/vcard", "VCALENDAR": "text/calendar"} +MAX_FILE_LOCK_DURATION = 0.25 def get_etag(text): @@ -528,6 +529,7 @@ class Collection(BaseCollection): _waiters = [] _lock_file = None _lock_file_locked = False + _lock_file_time = 0 _readers = 0 _writer = False @@ -535,6 +537,11 @@ class Collection(BaseCollection): @contextmanager def acquire_lock(cls, mode): def condition(): + # Prevent starvation of writers in other processes + if cls._lock_file_locked: + time_delta = time.time() - cls._lock_file_time + if time_delta < 0 or time_delta > MAX_FILE_LOCK_DURATION: + return False if mode == "r": return not cls._writer else: @@ -588,6 +595,7 @@ class Collection(BaseCollection): except OSError: cls.logger.debug("Locking not supported") cls._lock_file_locked = True + cls._lock_file_time = time.time() yield with cls._lock: if mode == "r":