prevent synchronization of identical folders from multiple threads
Use a mutex to lock the sync transaction, uniquely identified by the remote folder name. Serializing the syncs prevents any IDLE thread from syncing the same folder of a concurrent autorefresh thread. Othewise, we could download new messages twice, or read "one message past the last one" which in some cases creates a duplicated messages. Also prevents IDLE threads from reading new messages before the autorefresh thread finishes synchronization. Github-ref: #421 Originally-submitted-by: Michael Hohmuth <hohmuth@sax.de> Signed-off-by: Nicolas Sebrecht <nicolas.s-dev@laposte.net>
This commit is contained in:
		| @@ -15,7 +15,7 @@ | |||||||
| #    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA | #    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA | ||||||
|  |  | ||||||
| from subprocess import Popen, PIPE | from subprocess import Popen, PIPE | ||||||
| from threading import Event | from threading import Event, Lock | ||||||
| import os | import os | ||||||
| import time | import time | ||||||
| from sys import exc_info | from sys import exc_info | ||||||
| @@ -29,6 +29,9 @@ from offlineimap.ui import getglobalui | |||||||
| from offlineimap.threadutil import InstanceLimitedThread | from offlineimap.threadutil import InstanceLimitedThread | ||||||
|  |  | ||||||
| FOLDER_NAMESPACE = 'LIMITED_FOLDER_' | FOLDER_NAMESPACE = 'LIMITED_FOLDER_' | ||||||
|  | # Key: account name, Value: Dict of Key: remotefolder name, Value: lock. | ||||||
|  | SYNC_MUTEXES = {} | ||||||
|  | SYNC_MUTEXES_LOCK = Lock() | ||||||
|  |  | ||||||
| try: | try: | ||||||
|     import fcntl |     import fcntl | ||||||
| @@ -430,11 +433,35 @@ class SyncableAccount(Account): | |||||||
|             self.ui.error(e, exc_info()[2], msg="Calling hook") |             self.ui.error(e, exc_info()[2], msg="Calling hook") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | #XXX: This function should likely be refactored. This should not be passed the | ||||||
|  | # account instance. | ||||||
| def syncfolder(account, remotefolder, quick): | def syncfolder(account, remotefolder, quick): | ||||||
|     """Synchronizes given remote folder for the specified account. |     """Synchronizes given remote folder for the specified account. | ||||||
|  |  | ||||||
|     Filtered folders on the remote side will not invoke this function. However, |     Filtered folders on the remote side will not invoke this function. | ||||||
|     this might be called in a concurrently.""" |  | ||||||
|  |     When called in concurrently for the same localfolder, syncs are | ||||||
|  |     serialized.""" | ||||||
|  |  | ||||||
|  |     def acquire_mutex(): | ||||||
|  |         account_name = account.getname() | ||||||
|  |         localfolder_name = localfolder.getfullname() | ||||||
|  |  | ||||||
|  |         with SYNC_MUTEXES_LOCK: | ||||||
|  |             if SYNC_MUTEXES.get(account_name) is None: | ||||||
|  |                 SYNC_MUTEXES[account_name] = {} | ||||||
|  |             # The localfolder full name is good to uniquely identify the sync | ||||||
|  |             # transaction. | ||||||
|  |             if SYNC_MUTEXES[account_name].get(localfolder_name) is None: | ||||||
|  |                 #XXX: This lock could be an external file lock so we can remove | ||||||
|  |                 # the lock at the account level. | ||||||
|  |                 SYNC_MUTEXES[account_name][localfolder_name] = Lock() | ||||||
|  |  | ||||||
|  |         # Acquire the lock. | ||||||
|  |         SYNC_MUTEXES[account_name][localfolder_name].acquire() | ||||||
|  |  | ||||||
|  |     def release_mutex(): | ||||||
|  |         SYNC_MUTEXES[account.getname()][localfolder.getfullname()].release() | ||||||
|  |  | ||||||
|     def check_uid_validity(): |     def check_uid_validity(): | ||||||
|         # If either the local or the status folder has messages and |         # If either the local or the status folder has messages and | ||||||
| @@ -528,6 +555,9 @@ def syncfolder(account, remotefolder, quick): | |||||||
|         # Load local folder. |         # Load local folder. | ||||||
|         localfolder = account.get_local_folder(remotefolder) |         localfolder = account.get_local_folder(remotefolder) | ||||||
|  |  | ||||||
|  |         # Acquire the mutex to start syncing. | ||||||
|  |         acquire_mutex() | ||||||
|  |  | ||||||
|         # Add the folder to the mbnames mailboxes. |         # Add the folder to the mbnames mailboxes. | ||||||
|         mbnames.add(account.name, localrepos.getlocalroot(), |         mbnames.add(account.name, localrepos.getlocalroot(), | ||||||
|             localfolder.getname()) |             localfolder.getname()) | ||||||
| @@ -616,3 +646,5 @@ def syncfolder(account, remotefolder, quick): | |||||||
|             if folder in locals(): |             if folder in locals(): | ||||||
|                 locals()[folder].dropmessagelistcache() |                 locals()[folder].dropmessagelistcache() | ||||||
|         statusfolder.closefiles() |         statusfolder.closefiles() | ||||||
|  |         # Release the mutex of this sync transaction. | ||||||
|  |         release_mutex() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Nicolas Sebrecht
					Nicolas Sebrecht