diff --git a/offlineimap/accounts.py b/offlineimap/accounts.py index 2871be5..2dc288d 100644 --- a/offlineimap/accounts.py +++ b/offlineimap/accounts.py @@ -425,7 +425,8 @@ class SyncableAccount(Account): def syncfolder(account, remotefolder, quick): """Synchronizes given remote folder for the specified account. - Filtered folders on the remote side will not invoke this function.""" + Filtered folders on the remote side will not invoke this function. However, + this might be called in a concurrently.""" def check_uid_validity(localfolder, remotefolder, statusfolder): # If either the local or the status folder has messages and @@ -454,7 +455,7 @@ def syncfolder(account, remotefolder, quick): fd.close() def cachemessagelists_upto_date(localfolder, remotefolder, date): - """ Returns messages with uid > min(uids of messages newer than date).""" + """Returns messages with uid > min(uids of messages newer than date).""" localfolder.cachemessagelist(min_date=date) check_uid_validity(localfolder, remotefolder, statusfolder) @@ -474,7 +475,7 @@ def syncfolder(account, remotefolder, quick): min_date=time.gmtime(time.mktime(date) + 24*60*60)) def cachemessagelists_startdate(new, partial, date): - """ Retrieve messagelists when startdate has been set for + """Retrieve messagelists when startdate has been set for the folder 'partial'. Idea: suppose you want to clone the messages after date in one @@ -489,8 +490,7 @@ def syncfolder(account, remotefolder, quick): might not correspond. But, if we're cloning a folder into a new one, [min_uid, ...] does correspond to [1, ...]. - This is just for IMAP-IMAP. For Maildir-IMAP, use maxage instead. - """ + This is just for IMAP-IMAP. For Maildir-IMAP, use maxage instead.""" new.cachemessagelist() min_uid = partial.retrieve_min_uid() @@ -602,8 +602,8 @@ def syncfolder(account, remotefolder, quick): if e.severity > OfflineImapError.ERROR.FOLDER: raise else: - ui.error(e, exc_info()[2], msg = "Aborting sync, folder '%s' " - "[acc: '%s']" % (localfolder, account)) + ui.error(e, exc_info()[2], msg="Aborting sync, folder '%s' " + "[acc: '%s']"% (localfolder, account)) except Exception as e: ui.error(e, msg = "ERROR in syncfolder for %s folder %s: %s"% (account, remotefolder.getvisiblename(), traceback.format_exc())) diff --git a/offlineimap/folder/LocalStatusSQLite.py b/offlineimap/folder/LocalStatusSQLite.py index e2478b4..5899a48 100644 --- a/offlineimap/folder/LocalStatusSQLite.py +++ b/offlineimap/folder/LocalStatusSQLite.py @@ -16,13 +16,39 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import os -import six import sqlite3 as sqlite from sys import exc_info from threading import Lock +import six + from .Base import BaseFolder +class DatabaseFileLock(object): + """Lock at database file level.""" + + def __init__(self): + self._lock = Lock() + self._counter = 0 + + def __enter__(self): + self._lock.acquire() + + def __exit__(self, typ, value, tb): + self._lock.release() + + def registerNewUser(self): + self._counter += 1 + + def removeOneUser(self): + self._counter -= 1 + + def getLock(self): + return self._lock + + def shouldClose(self): + return self._counter < 1 + class LocalStatusSQLiteFolder(BaseFolder): """LocalStatus backend implemented with an SQLite database @@ -42,11 +68,10 @@ class LocalStatusSQLiteFolder(BaseFolder): # Current version of our db format. cur_version = 2 # Keep track on how many threads need access to the database. - threads_open_count = 0 - threads_open_lock = Lock() + locks = {} # Key: filename, value: DatabaseFileLock instance. def __init__(self, name, repository): - self.sep = '.' # Needs to be set before super.__init__() + self.sep = '.' # Needs to be set before super().__init__(). super(LocalStatusSQLiteFolder, self).__init__(name, repository) self.root = repository.root self.filename = os.path.join(self.getroot(), self.getfolderbasename()) @@ -60,19 +85,24 @@ class LocalStatusSQLiteFolder(BaseFolder): raise UserWarning("SQLite database path '%s' is not a directory."% dirname) + self.connection = None # This lock protects against concurrent writes in same connection. self._dblock = Lock() - self.connection = None def openfiles(self): - # Protect the creation/upgrade of database accross threads. - with LocalStatusSQLiteFolder.threads_open_lock: - # Try to establish connection, no need for threadsafety in __init__. + # Make sure sqlite is in multithreading SERIALIZE mode. + assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.' + # Protect the creation/upgrade of database accross threads. + if self.filename not in LocalStatusSQLiteFolder.locks: + LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock() + databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename] + with databaseFileLock.getLock(): + # Try to establish connection, no need for threadsafety in __init__. try: self.connection = sqlite.connect(self.filename, check_same_thread=False) - LocalStatusSQLiteFolder.threads_open_count += 1 + databaseFileLock.registerNewUser() except sqlite.OperationalError as e: # Operation had failed. six.reraise(UserWarning, @@ -83,9 +113,6 @@ class LocalStatusSQLiteFolder(BaseFolder): (self.filename, e)), exc_info()[2]) - # Make sure sqlite is in multithreading SERIALIZE mode. - assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.' - # Test if db version is current enough and if db is readable. try: cursor = self.connection.execute( @@ -118,32 +145,32 @@ class LocalStatusSQLiteFolder(BaseFolder): def isnewfolder(self): return self._newfolder - def __sql_write(self, sql, vars=None, executemany=False): + def __sql_write(self, sql, args=None, executemany=False): """Execute some SQL, retrying if the db was locked. :param sql: the SQL string passed to execute() - :param vars: the variable values to `sql`. E.g. (1,2) or {uid:1, + :param args: the variable values to `sql`. E.g. (1,2) or {uid:1, flags:'T'}. See sqlite docs for possibilities. :param executemany: bool indicating whether we want to perform conn.executemany() or conn.execute(). - :returns: the Cursor() or raises an Exception""" + :returns: None or raises an Exception.""" success = False while not success: - self._dblock.acquire() try: - if vars is None: - if executemany: - cursor = self.connection.executemany(sql) + with LocalStatusSQLiteFolder.locks[self.filename].getLock(): + if args is None: + if executemany: + self.connection.executemany(sql) + else: + self.connection.execute(sql) else: - cursor = self.connection.execute(sql) - else: - if executemany: - cursor = self.connection.executemany(sql, vars) - else: - cursor = self.connection.execute(sql, vars) - success = True - self.connection.commit() + if executemany: + self.connection.executemany(sql, args) + else: + self.connection.execute(sql, args) + success = True + self.connection.commit() except sqlite.OperationalError as e: if e.args[0] == 'cannot commit - no transaction is active': pass @@ -152,9 +179,6 @@ class LocalStatusSQLiteFolder(BaseFolder): success = False else: raise - finally: - self._dblock.release() - return cursor def __upgrade_db(self, from_ver): """Upgrade the sqlite format from version 'from_ver' to current""" @@ -229,9 +253,10 @@ class LocalStatusSQLiteFolder(BaseFolder): self.messagelist[uid]['mtime'] = row[2] def closefiles(self): - with LocalStatusSQLiteFolder.threads_open_lock: - LocalStatusSQLiteFolder.threads_open_count -= 1 - if self.threads_open_count < 1: + databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename] + with databaseFileLock.getLock(): + databaseFileLock.removeOneUser() + if databaseFileLock.shouldClose(): try: self.connection.close() except: diff --git a/offlineimap/repository/LocalStatus.py b/offlineimap/repository/LocalStatus.py index becbda2..f23020f 100644 --- a/offlineimap/repository/LocalStatus.py +++ b/offlineimap/repository/LocalStatus.py @@ -95,6 +95,7 @@ class LocalStatusRepository(BaseRepository): # data. This might happen if the user removed the folder (maildir) and # it is re-created afterwards. folder.purge() + folder.openfiles() folder.save() folder.closefiles()