backport: sqlite: properly serialize operations on the databases

1. There is one database per folder and SQLite requires writes to be
serialized. Instead of locking at the LocalStatusSQLiteFolder object level,
introduce a new DatabaseFileLock object which is shared across threads. This
fixes the concurrent-write issues that some users might experience as
duplications or flags restored to their previous state.

2. Close the database only when we are sure no other threads will use the
connection on a *per-file* basis. Previous fix 677afb8d8f is wrong
because the same lock is shared for all the databases.

Github-fix: https://github.com/OfflineIMAP/offlineimap/issues/350
Signed-off-by: Nicolas Sebrecht <nicolas.s-dev@laposte.net>
This commit is contained in:
Nicolas Sebrecht 2016-07-25 14:39:11 +02:00
parent 41fdd4ee78
commit 038a433f69
3 changed files with 72 additions and 43 deletions

View File

@ -401,7 +401,8 @@ class SyncableAccount(Account):
def syncfolder(account, remotefolder, quick): def syncfolder(account, remotefolder, quick):
"""Synchronizes given remote folder for the specified account. """Synchronizes given remote folder for the specified account.
Filtered folders on the remote side will not invoke this function.""" Filtered folders on the remote side will not invoke this function. However,
this might be called concurrently."""
def check_uid_validity(localfolder, remotefolder, statusfolder): def check_uid_validity(localfolder, remotefolder, statusfolder):
# If either the local or the status folder has messages and # If either the local or the status folder has messages and
@ -430,7 +431,7 @@ def syncfolder(account, remotefolder, quick):
fd.close() fd.close()
def cachemessagelists_upto_date(localfolder, remotefolder, date): def cachemessagelists_upto_date(localfolder, remotefolder, date):
""" Returns messages with uid > min(uids of messages newer than date).""" """Returns messages with uid > min(uids of messages newer than date)."""
localfolder.cachemessagelist(min_date=date) localfolder.cachemessagelist(min_date=date)
check_uid_validity(localfolder, remotefolder, statusfolder) check_uid_validity(localfolder, remotefolder, statusfolder)
@ -451,7 +452,7 @@ def syncfolder(account, remotefolder, quick):
min_date=time.gmtime(time.mktime(date) + 24*60*60)) min_date=time.gmtime(time.mktime(date) + 24*60*60))
def cachemessagelists_startdate(new, partial, date): def cachemessagelists_startdate(new, partial, date):
""" Retrieve messagelists when startdate has been set for """Retrieve messagelists when startdate has been set for
the folder 'partial'. the folder 'partial'.
Idea: suppose you want to clone the messages after date in one Idea: suppose you want to clone the messages after date in one
@ -466,8 +467,7 @@ def syncfolder(account, remotefolder, quick):
might not correspond. But, if we're cloning a folder into a new one, might not correspond. But, if we're cloning a folder into a new one,
[min_uid, ...] does correspond to [1, ...]. [min_uid, ...] does correspond to [1, ...].
This is just for IMAP-IMAP. For Maildir-IMAP, use maxage instead. This is just for IMAP-IMAP. For Maildir-IMAP, use maxage instead."""
"""
new.cachemessagelist() new.cachemessagelist()
min_uid = partial.retrieve_min_uid() min_uid = partial.retrieve_min_uid()
@ -587,8 +587,8 @@ def syncfolder(account, remotefolder, quick):
if e.severity > OfflineImapError.ERROR.FOLDER: if e.severity > OfflineImapError.ERROR.FOLDER:
raise raise
else: else:
ui.error(e, exc_info()[2], msg = "Aborting sync, folder '%s' " ui.error(e, exc_info()[2], msg="Aborting sync, folder '%s' "
"[acc: '%s']" % (localfolder, account)) "[acc: '%s']"% (localfolder, account))
except Exception as e: except Exception as e:
ui.error(e, msg = "ERROR in syncfolder for %s folder %s: %s"% ui.error(e, msg = "ERROR in syncfolder for %s folder %s: %s"%
(account, remotefolder.getvisiblename(), traceback.format_exc())) (account, remotefolder.getvisiblename(), traceback.format_exc()))

View File

@ -15,6 +15,7 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import os import os
import sqlite3 as sqlite
from sys import exc_info from sys import exc_info
from threading import Lock from threading import Lock
try: try:
@ -22,8 +23,35 @@ try:
except: except:
pass #fail only if needed later on, not on import pass #fail only if needed later on, not on import
import six
from .Base import BaseFolder from .Base import BaseFolder
class DatabaseFileLock(object):
    """Lock at database file level.

    Bundles a mutex with a counter of registered users. One instance is
    meant to be shared by everything that works on the same database file,
    so that operations on that file can be serialized and the last user can
    tell when the connection may be closed.

    The counter is not protected by this object itself: callers are
    expected to hold the lock (via ``getLock()`` or a ``with`` statement on
    the instance) while calling ``registerNewUser()``, ``removeOneUser()``
    and ``shouldClose()``.
    """

    def __init__(self):
        self._lock = Lock()
        self._counter = 0  # Number of currently registered users.

    def __enter__(self):
        """Acquire the underlying lock and return this instance.

        Returning ``self`` makes ``with lock as l:`` bind the
        DatabaseFileLock instead of ``None``.
        """
        self._lock.acquire()
        return self

    def __exit__(self, typ, value, tb):
        """Release the underlying lock; exceptions are never suppressed."""
        self._lock.release()

    def registerNewUser(self):
        """Record that one more user works on the database file."""
        self._counter += 1

    def removeOneUser(self):
        """Record that one user is done with the database file."""
        self._counter -= 1

    def getLock(self):
        """Return the underlying ``threading.Lock`` instance."""
        return self._lock

    def shouldClose(self):
        """Return True when no registered user is left."""
        return self._counter < 1
class LocalStatusSQLiteFolder(BaseFolder): class LocalStatusSQLiteFolder(BaseFolder):
"""LocalStatus backend implemented with an SQLite database """LocalStatus backend implemented with an SQLite database
@ -43,11 +71,10 @@ class LocalStatusSQLiteFolder(BaseFolder):
# Current version of our db format. # Current version of our db format.
cur_version = 2 cur_version = 2
# Keep track on how many threads need access to the database. # Keep track on how many threads need access to the database.
threads_open_count = 0 locks = {} # Key: filename, value: DatabaseFileLock instance.
threads_open_lock = Lock()
def __init__(self, name, repository): def __init__(self, name, repository):
self.sep = '.' # Needs to be set before super.__init__() self.sep = '.' # Needs to be set before super().__init__().
super(LocalStatusSQLiteFolder, self).__init__(name, repository) super(LocalStatusSQLiteFolder, self).__init__(name, repository)
self.root = repository.root self.root = repository.root
self.filename = os.path.join(self.getroot(), self.getfolderbasename()) self.filename = os.path.join(self.getroot(), self.getfolderbasename())
@ -62,22 +89,24 @@ class LocalStatusSQLiteFolder(BaseFolder):
raise UserWarning("SQLite database path '%s' is not a directory."% raise UserWarning("SQLite database path '%s' is not a directory."%
dirname) dirname)
self.connection = None
# This lock protects against concurrent writes in same connection. # This lock protects against concurrent writes in same connection.
self._dblock = Lock() self._dblock = Lock()
self.connection = None
def openfiles(self): def openfiles(self):
# Make sure sqlite is in multithreading SERIALIZE mode.
assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.'
# Protect the creation/upgrade of database across threads. # Protect the creation/upgrade of database across threads.
with LocalStatusSQLiteFolder.threads_open_lock: if self.filename not in LocalStatusSQLiteFolder.locks:
LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock()
databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
with databaseFileLock.getLock():
# Try to establish connection, no need for threadsafety in __init__. # Try to establish connection, no need for threadsafety in __init__.
try: try:
self.connection = sqlite.connect(self.filename, check_same_thread=False) self.connection = sqlite.connect(self.filename,
LocalStatusSQLiteFolder.threads_open_count += 1 check_same_thread=False)
except NameError: databaseFileLock.registerNewUser()
# sqlite import had failed.
raise UserWarning("SQLite backend chosen, but cannot connect "
"with available bindings to '%s'. Is the sqlite3 package "
"installed?."% self.filename), None, exc_info()[2]
except sqlite.OperationalError as e: except sqlite.OperationalError as e:
# Operation had failed. # Operation had failed.
raise UserWarning("cannot open database file '%s': %s.\nYou might " raise UserWarning("cannot open database file '%s': %s.\nYou might "
@ -85,9 +114,6 @@ class LocalStatusSQLiteFolder(BaseFolder):
"with the 'sqlite<3>' command."% "with the 'sqlite<3>' command."%
(self.filename, e)), None, exc_info()[2] (self.filename, e)), None, exc_info()[2]
# Make sure sqlite is in multithreading SERIALIZE mode.
assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.'
# Test if db version is current enough and if db is readable. # Test if db version is current enough and if db is readable.
try: try:
cursor = self.connection.execute( cursor = self.connection.execute(
@ -120,32 +146,32 @@ class LocalStatusSQLiteFolder(BaseFolder):
self.__sql_write('DELETE FROM status') self.__sql_write('DELETE FROM status')
def __sql_write(self, sql, vars=None, executemany=False): def __sql_write(self, sql, args=None, executemany=False):
"""Execute some SQL, retrying if the db was locked. """Execute some SQL, retrying if the db was locked.
:param sql: the SQL string passed to execute() :param sql: the SQL string passed to execute()
:param vars: the variable values to `sql`. E.g. (1,2) or {uid:1, :param args: the variable values to `sql`. E.g. (1,2) or {uid:1,
flags:'T'}. See sqlite docs for possibilities. flags:'T'}. See sqlite docs for possibilities.
:param executemany: bool indicating whether we want to :param executemany: bool indicating whether we want to
perform conn.executemany() or conn.execute(). perform conn.executemany() or conn.execute().
:returns: the Cursor() or raises an Exception""" :returns: None or raises an Exception."""
success = False success = False
while not success: while not success:
self._dblock.acquire()
try: try:
if vars is None: with LocalStatusSQLiteFolder.locks[self.filename].getLock():
if executemany: if args is None:
cursor = self.connection.executemany(sql) if executemany:
self.connection.executemany(sql)
else:
self.connection.execute(sql)
else: else:
cursor = self.connection.execute(sql) if executemany:
else: self.connection.executemany(sql, args)
if executemany: else:
cursor = self.connection.executemany(sql, vars) self.connection.execute(sql, args)
else: success = True
cursor = self.connection.execute(sql, vars) self.connection.commit()
success = True
self.connection.commit()
except sqlite.OperationalError as e: except sqlite.OperationalError as e:
if e.args[0] == 'cannot commit - no transaction is active': if e.args[0] == 'cannot commit - no transaction is active':
pass pass
@ -154,9 +180,6 @@ class LocalStatusSQLiteFolder(BaseFolder):
success = False success = False
else: else:
raise raise
finally:
self._dblock.release()
return cursor
def __upgrade_db(self, from_ver): def __upgrade_db(self, from_ver):
"""Upgrade the sqlite format from version 'from_ver' to current""" """Upgrade the sqlite format from version 'from_ver' to current"""
@ -231,9 +254,10 @@ class LocalStatusSQLiteFolder(BaseFolder):
self.messagelist[uid]['mtime'] = row[2] self.messagelist[uid]['mtime'] = row[2]
def closefiles(self): def closefiles(self):
with LocalStatusSQLiteFolder.threads_open_lock: databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename]
LocalStatusSQLiteFolder.threads_open_count -= 1 with databaseFileLock.getLock():
if self.threads_open_count < 1: databaseFileLock.removeOneUser()
if databaseFileLock.shouldClose():
try: try:
self.connection.close() self.connection.close()
except: except:

View File

@ -91,6 +91,11 @@ class LocalStatusRepository(BaseRepository):
# Create an empty StatusFolder # Create an empty StatusFolder
folder = self._instanciatefolder(foldername) folder = self._instanciatefolder(foldername)
# First delete any existing data to make sure we won't consider obsolete
# data. This might happen if the user removed the folder (maildir) and
# it is re-created afterwards.
folder.purge()
folder.openfiles()
folder.save() folder.save()
folder.closefiles() folder.closefiles()