diff --git a/offlineimap/accounts.py b/offlineimap/accounts.py index e00db6b..3549097 100644 --- a/offlineimap/accounts.py +++ b/offlineimap/accounts.py @@ -401,7 +401,8 @@ class SyncableAccount(Account): def syncfolder(account, remotefolder, quick): """Synchronizes given remote folder for the specified account. - Filtered folders on the remote side will not invoke this function.""" + Filtered folders on the remote side will not invoke this function. However, + this might be called in a concurrently.""" def check_uid_validity(localfolder, remotefolder, statusfolder): # If either the local or the status folder has messages and @@ -430,7 +431,7 @@ def syncfolder(account, remotefolder, quick): fd.close() def cachemessagelists_upto_date(localfolder, remotefolder, date): - """ Returns messages with uid > min(uids of messages newer than date).""" + """Returns messages with uid > min(uids of messages newer than date).""" localfolder.cachemessagelist(min_date=date) check_uid_validity(localfolder, remotefolder, statusfolder) @@ -451,7 +452,7 @@ def syncfolder(account, remotefolder, quick): min_date=time.gmtime(time.mktime(date) + 24*60*60)) def cachemessagelists_startdate(new, partial, date): - """ Retrieve messagelists when startdate has been set for + """Retrieve messagelists when startdate has been set for the folder 'partial'. Idea: suppose you want to clone the messages after date in one @@ -466,8 +467,7 @@ def syncfolder(account, remotefolder, quick): might not correspond. But, if we're cloning a folder into a new one, [min_uid, ...] does correspond to [1, ...]. - This is just for IMAP-IMAP. For Maildir-IMAP, use maxage instead. - """ + This is just for IMAP-IMAP. For Maildir-IMAP, use maxage instead.""" new.cachemessagelist() min_uid = partial.retrieve_min_uid() @@ -587,8 +587,8 @@ def syncfolder(account, remotefolder, quick): if e.severity > OfflineImapError.ERROR.FOLDER: raise else: - ui.error(e, exc_info()[2], msg = "Aborting sync, folder '%s' " - "[acc: '%s']" % (localfolder, account)) + ui.error(e, exc_info()[2], msg="Aborting sync, folder '%s' " + "[acc: '%s']"% (localfolder, account)) except Exception as e: ui.error(e, msg = "ERROR in syncfolder for %s folder %s: %s"% (account, remotefolder.getvisiblename(), traceback.format_exc())) diff --git a/offlineimap/folder/LocalStatusSQLite.py b/offlineimap/folder/LocalStatusSQLite.py index 8d3e9ee..09325ed 100644 --- a/offlineimap/folder/LocalStatusSQLite.py +++ b/offlineimap/folder/LocalStatusSQLite.py @@ -15,6 +15,7 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import os +import sqlite3 as sqlite from sys import exc_info from threading import Lock try: @@ -22,8 +23,35 @@ try: except: pass #fail only if needed later on, not on import +import six + from .Base import BaseFolder +class DatabaseFileLock(object): + """Lock at database file level.""" + + def __init__(self): + self._lock = Lock() + self._counter = 0 + + def __enter__(self): + self._lock.acquire() + + def __exit__(self, typ, value, tb): + self._lock.release() + + def registerNewUser(self): + self._counter += 1 + + def removeOneUser(self): + self._counter -= 1 + + def getLock(self): + return self._lock + + def shouldClose(self): + return self._counter < 1 + class LocalStatusSQLiteFolder(BaseFolder): """LocalStatus backend implemented with an SQLite database @@ -43,11 +71,10 @@ class LocalStatusSQLiteFolder(BaseFolder): # Current version of our db format. cur_version = 2 # Keep track on how many threads need access to the database. - threads_open_count = 0 - threads_open_lock = Lock() + locks = {} # Key: filename, value: DatabaseFileLock instance. def __init__(self, name, repository): - self.sep = '.' # Needs to be set before super.__init__() + self.sep = '.' # Needs to be set before super().__init__(). super(LocalStatusSQLiteFolder, self).__init__(name, repository) self.root = repository.root self.filename = os.path.join(self.getroot(), self.getfolderbasename()) @@ -62,22 +89,24 @@ class LocalStatusSQLiteFolder(BaseFolder): raise UserWarning("SQLite database path '%s' is not a directory."% dirname) + self.connection = None # This lock protects against concurrent writes in same connection. self._dblock = Lock() - self.connection = None def openfiles(self): + # Make sure sqlite is in multithreading SERIALIZE mode. + assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.' + # Protect the creation/upgrade of database accross threads. - with LocalStatusSQLiteFolder.threads_open_lock: + if self.filename not in LocalStatusSQLiteFolder.locks: + LocalStatusSQLiteFolder.locks[self.filename] = DatabaseFileLock() + databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename] + with databaseFileLock.getLock(): # Try to establish connection, no need for threadsafety in __init__. try: - self.connection = sqlite.connect(self.filename, check_same_thread=False) - LocalStatusSQLiteFolder.threads_open_count += 1 - except NameError: - # sqlite import had failed. - raise UserWarning("SQLite backend chosen, but cannot connect " - "with available bindings to '%s'. Is the sqlite3 package " - "installed?."% self.filename), None, exc_info()[2] + self.connection = sqlite.connect(self.filename, + check_same_thread=False) + databaseFileLock.registerNewUser() except sqlite.OperationalError as e: # Operation had failed. raise UserWarning("cannot open database file '%s': %s.\nYou might " @@ -85,9 +114,6 @@ class LocalStatusSQLiteFolder(BaseFolder): "with the 'sqlite<3>' command."% (self.filename, e)), None, exc_info()[2] - # Make sure sqlite is in multithreading SERIALIZE mode. - assert sqlite.threadsafety == 1, 'Your sqlite is not multithreading safe.' - # Test if db version is current enough and if db is readable. try: cursor = self.connection.execute( @@ -120,32 +146,32 @@ class LocalStatusSQLiteFolder(BaseFolder): self.__sql_write('DELETE FROM status') - def __sql_write(self, sql, vars=None, executemany=False): + def __sql_write(self, sql, args=None, executemany=False): """Execute some SQL, retrying if the db was locked. :param sql: the SQL string passed to execute() - :param vars: the variable values to `sql`. E.g. (1,2) or {uid:1, + :param args: the variable values to `sql`. E.g. (1,2) or {uid:1, flags:'T'}. See sqlite docs for possibilities. :param executemany: bool indicating whether we want to perform conn.executemany() or conn.execute(). - :returns: the Cursor() or raises an Exception""" + :returns: None or raises an Exception.""" success = False while not success: - self._dblock.acquire() try: - if vars is None: - if executemany: - cursor = self.connection.executemany(sql) + with LocalStatusSQLiteFolder.locks[self.filename].getLock(): + if args is None: + if executemany: + self.connection.executemany(sql) + else: + self.connection.execute(sql) else: - cursor = self.connection.execute(sql) - else: - if executemany: - cursor = self.connection.executemany(sql, vars) - else: - cursor = self.connection.execute(sql, vars) - success = True - self.connection.commit() + if executemany: + self.connection.executemany(sql, args) + else: + self.connection.execute(sql, args) + success = True + self.connection.commit() except sqlite.OperationalError as e: if e.args[0] == 'cannot commit - no transaction is active': pass @@ -154,9 +180,6 @@ class LocalStatusSQLiteFolder(BaseFolder): success = False else: raise - finally: - self._dblock.release() - return cursor def __upgrade_db(self, from_ver): """Upgrade the sqlite format from version 'from_ver' to current""" @@ -231,9 +254,10 @@ class LocalStatusSQLiteFolder(BaseFolder): self.messagelist[uid]['mtime'] = row[2] def closefiles(self): - with LocalStatusSQLiteFolder.threads_open_lock: - LocalStatusSQLiteFolder.threads_open_count -= 1 - if self.threads_open_count < 1: + databaseFileLock = LocalStatusSQLiteFolder.locks[self.filename] + with databaseFileLock.getLock(): + databaseFileLock.removeOneUser() + if databaseFileLock.shouldClose(): try: self.connection.close() except: diff --git a/offlineimap/repository/LocalStatus.py b/offlineimap/repository/LocalStatus.py index 5d33cdc..48bce1a 100644 --- a/offlineimap/repository/LocalStatus.py +++ b/offlineimap/repository/LocalStatus.py @@ -91,6 +91,11 @@ class LocalStatusRepository(BaseRepository): # Create an empty StatusFolder folder = self._instanciatefolder(foldername) + # First delete any existing data to make sure we won't consider obsolete + # data. This might happen if the user removed the folder (maildir) and + # it is re-created afterwards. + folder.purge() + folder.openfiles() folder.save() folder.closefiles()