sqlite: properly serialize operations on the databases
1. There is one database per folder and sqlite requires to serialize the
writings. Instead of locking at LocalStatusSQLiteFolder instance level,
introduce a new DatabaseFileLock object which is shared across threads. This
fixes the concurrent writes issues that some users might experience by
duplications or flags restored to the previous state.
2. Close the database only when we are sure no other threads will use the
connection on a *per-file* basis. Previous fix 677afb8d8f
is wrong
because the same lock is shared for all the database files.
Github-fix: https://github.com/OfflineIMAP/offlineimap/issues/350
Signed-off-by: Nicolas Sebrecht <nicolas.s-dev@laposte.net>
This commit is contained in:
@ -16,13 +16,39 @@
|
||||
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
|
||||
import os
|
||||
import six
|
||||
import sqlite3 as sqlite
|
||||
from sys import exc_info
|
||||
from threading import Lock
|
||||
|
||||
import six
|
||||
|
||||
from .Base import BaseFolder
|
||||
|
||||
class DatabaseFileLock(object):
|
||||
"""Lock at database file level."""
|
||||
|
||||
def __init__(self):
|
||||
self._lock = Lock()
|
||||
self._counter = 0
|
||||
|
||||
def __enter__(self):
|
||||
self._lock.acquire()
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
self._lock.release()
|
||||
|
||||
def registerNewUser(self):
|
||||
self._counter += 1
|
||||
|
||||
def removeOneUser(self):
|
||||
self._counter -= 1
|
||||
|
||||
def getLock(self):
|
||||
return self._lock
|
||||
|
||||
def shouldClose(self):
|
||||
return self._counter < 1
|
||||
|
||||
|
||||
class LocalStatusSQLiteFolder(BaseFolder):
|
||||
"""LocalStatus backend implemented with an SQLite database
|
||||
@ -42,11 +68,10 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
||||
# Current version of our db format.
|
||||
cur_version = 2
|
||||
# Keep track on how many threads need access to the database.
|
||||
threads_open_count = 0
|
||||
threads_open_lock = Lock()
|
||||
locks = {} # Key: filename, value: DatabaseFileLock instance.
|
||||
|
||||
def __init__(self, name, repository):
|
||||
self.sep = '.' # Needs to be set before super.__init__()
|
||||
self.sep = '.' # Needs to be set before super().__init__().
|
||||
super(LocalStatusSQLiteFolder, self).__init__(name, repository)
|
||||
self.root = repository.root
|
||||
self.filename = os.path.join(self.getroot(), self.getfolderbasename())
|
||||
@ -60,19 +85,24 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
||||
raise UserWarning("SQLite database path '%s' is not a directory."%
|
||||
dirname)
|
||||
|
||||
self.connection = None
|
||||
# This lock protects against concurrent writes in same connection.
|
||||
self._dblock = Lock()
|
||||
self.connection = None
|
||||
|
||||
def openfiles(self):
|
||||
# Protect the creation/upgrade of database accross threads.
|
||||
with LocalStatusSQLiteFolder.threads_open_lock:
|
||||
# Try to establish connection, no need for threadsafety in __init__.
|
||||
# Make sure sqlite is in multithreading SERIALIZE mode.
|
||||
assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.'
|
||||
|
||||
# Protect the creation/upgrade of database accross threads.
|
||||
if self.filename not in LocalStatusSQLiteFolder.locks:
|
||||
LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock()
|
||||
databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
|
||||
with databaseFileLock.getLock():
|
||||
# Try to establish connection, no need for threadsafety in __init__.
|
||||
try:
|
||||
self.connection = sqlite.connect(self.filename,
|
||||
check_same_thread=False)
|
||||
LocalStatusSQLiteFolder.threads_open_count += 1
|
||||
databaseFileLock.registerNewUser()
|
||||
except sqlite.OperationalError as e:
|
||||
# Operation had failed.
|
||||
six.reraise(UserWarning,
|
||||
@ -83,9 +113,6 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
||||
(self.filename, e)),
|
||||
exc_info()[2])
|
||||
|
||||
# Make sure sqlite is in multithreading SERIALIZE mode.
|
||||
assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.'
|
||||
|
||||
# Test if db version is current enough and if db is readable.
|
||||
try:
|
||||
cursor = self.connection.execute(
|
||||
@ -118,32 +145,32 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
||||
def isnewfolder(self):
|
||||
return self._newfolder
|
||||
|
||||
def __sql_write(self, sql, vars=None, executemany=False):
|
||||
def __sql_write(self, sql, args=None, executemany=False):
|
||||
"""Execute some SQL, retrying if the db was locked.
|
||||
|
||||
:param sql: the SQL string passed to execute()
|
||||
:param vars: the variable values to `sql`. E.g. (1,2) or {uid:1,
|
||||
:param args: the variable values to `sql`. E.g. (1,2) or {uid:1,
|
||||
flags:'T'}. See sqlite docs for possibilities.
|
||||
:param executemany: bool indicating whether we want to
|
||||
perform conn.executemany() or conn.execute().
|
||||
:returns: the Cursor() or raises an Exception"""
|
||||
:returns: None or raises an Exception."""
|
||||
|
||||
success = False
|
||||
while not success:
|
||||
self._dblock.acquire()
|
||||
try:
|
||||
if vars is None:
|
||||
if executemany:
|
||||
cursor = self.connection.executemany(sql)
|
||||
with LocalStatusSQLiteFolder.locks[self.filename].getLock():
|
||||
if args is None:
|
||||
if executemany:
|
||||
self.connection.executemany(sql)
|
||||
else:
|
||||
self.connection.execute(sql)
|
||||
else:
|
||||
cursor = self.connection.execute(sql)
|
||||
else:
|
||||
if executemany:
|
||||
cursor = self.connection.executemany(sql, vars)
|
||||
else:
|
||||
cursor = self.connection.execute(sql, vars)
|
||||
success = True
|
||||
self.connection.commit()
|
||||
if executemany:
|
||||
self.connection.executemany(sql, args)
|
||||
else:
|
||||
self.connection.execute(sql, args)
|
||||
success = True
|
||||
self.connection.commit()
|
||||
except sqlite.OperationalError as e:
|
||||
if e.args[0] == 'cannot commit - no transaction is active':
|
||||
pass
|
||||
@ -152,9 +179,6 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
||||
success = False
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
self._dblock.release()
|
||||
return cursor
|
||||
|
||||
def __upgrade_db(self, from_ver):
|
||||
"""Upgrade the sqlite format from version 'from_ver' to current"""
|
||||
@ -229,9 +253,10 @@ class LocalStatusSQLiteFolder(BaseFolder):
|
||||
self.messagelist[uid]['mtime'] = row[2]
|
||||
|
||||
def closefiles(self):
|
||||
with LocalStatusSQLiteFolder.threads_open_lock:
|
||||
LocalStatusSQLiteFolder.threads_open_count -= 1
|
||||
if self.threads_open_count < 1:
|
||||
databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
|
||||
with databaseFileLock.getLock():
|
||||
databaseFileLock.removeOneUser()
|
||||
if databaseFileLock.shouldClose():
|
||||
try:
|
||||
self.connection.close()
|
||||
except:
|
||||
|
Reference in New Issue
Block a user