diff --git a/head/offlineimap.py b/head/offlineimap.py index 6440dea..b416a09 100644 --- a/head/offlineimap.py +++ b/head/offlineimap.py @@ -58,9 +58,19 @@ for account in accounts: else: passwords[account] = ui.getpass(accountname, config) -def syncitall(): - mailboxes = [] - for accountname in accounts: +mailboxes = [] +mailboxlock = Lock() +def addmailbox(accountname, remotefolder): + mailboxlock.acquire() + mailboxes.append({'accountname' : accountname, + 'foldername': remotefolder.getvisiblename()}) + mailboxlock.release() + +def syncaccount(accountname): + # We don't need an account lock because syncitall() goes through + # each account once, then waits for all to finish. + accountsemaphore.acquire() + try: ui.acct(accountname) accountmetadata = os.path.join(metadatadir, accountname) if not os.path.exists(accountmetadata): @@ -133,7 +143,21 @@ def syncitall(): localfolder.syncmessagesto(statusfolder) statusfolder.save() server.close() + finally: + accountsemaphore.release() +def syncitall(): + mailboxes = [] # Reset. + threads = [] + for accountname in accounts: + threadutil.semaphorewait(accountsemaphore) + thread = Thread(target = syncaccount, + name = "syncaccount %s" % accountname, + args = (accountname)) + thread.start() + threads.append(thread) + # Wait for the threads to finish. + threadutil.threadreset(threads) mbnames.genmbnames(config, mailboxes) diff --git a/head/offlineimap/folder/Base.py b/head/offlineimap/folder/Base.py index c50603e..726fca0 100644 --- a/head/offlineimap/folder/Base.py +++ b/head/offlineimap/folder/Base.py @@ -23,6 +23,16 @@ class BaseFolder: """Returns name""" return self.name + def suggeststhreads(self): + """Returns true if this folder suggests using threads for actions; + false otherwise. Probably only IMAP will return true.""" + return 0 + + def waitforthread(self): + """For threading folders, waits until there is a resource available + before firing off a thread. For all others, returns immediately.""" + pass + def getvisiblename(self): return self.name @@ -152,26 +162,40 @@ class BaseFolder: # Did not find any server to take this message. Ignore. pass + def copymessageto(self, uid, applyto): + __main__.ui.copyingmessage(uid, self, applyto) + message = self.getmessage(uid) + flags = self.getmessageflags(uid) + for object in applyto: + newuid = object.savemessage(uid, message, flags) + if newuid > 0 and newuid != uid: + # Change the local uid. + self.savemessage(newuid, message, flags) + self.deletemessage(uid) + uid = newuid + + def syncmessagesto_copy(self, dest, applyto): """Pass 2 of folder synchronization. Look for messages present in self but not in dest. If any, add them to dest.""" + threads = [] for uid in self.getmessagelist().keys(): if uid < 0: # Ignore messages that pass 1 missed. continue if not uid in dest.getmessagelist(): - __main__.ui.copyingmessage(uid, self, applyto) - message = self.getmessage(uid) - flags = self.getmessageflags(uid) - for object in applyto: - newuid = object.savemessage(uid, message, flags) - if newuid > 0 and newuid != uid: - # Change the local uid. - self.savemessage(newuid, message, flags) - self.deletemessage(uid) - uid = newuid + if self.suggeststhreads(): + self.waitforthread() + thread = Thread(target = self.copymessageto, + args = (uid, applyto)) + thread.start() + threads.append(thread) + else: + self.copymessageto(uid, applyto) + for thread in threads: + thread.join() def syncmessagesto_delete(self, dest, applyto): """Pass 3 of folder synchronization. diff --git a/head/offlineimap/folder/IMAP.py b/head/offlineimap/folder/IMAP.py index 0bcd905..dfd4682 100644 --- a/head/offlineimap/folder/IMAP.py +++ b/head/offlineimap/folder/IMAP.py @@ -27,30 +27,43 @@ class IMAPFolder(BaseFolder): self.root = imapserver.root self.sep = imapserver.delim self.imapserver = imapserver - self.imapobj = self.imapserver.makeconnection() self.messagelist = None self.visiblename = visiblename + def suggeststhreads(self): + return 1 + + def waitforthread(self): + self.imapserver.connectionwait() + def getvisiblename(self): return self.visiblename def getuidvalidity(self): - x = self.imapobj.status(self.getfullname(), '(UIDVALIDITY)')[1][0] + imapobj = self.imapserver.acquireconnection() + try: + x = imapobj.status(self.getfullname(), '(UIDVALIDITY)')[1][0] + finally: + self.imapserver.releaseconnection(imapobj) uidstring = imaputil.imapsplit(x)[1] return long(imaputil.flagsplit(uidstring)[1]) def cachemessagelist(self): - assert(self.imapobj.select(self.getfullname())[0] == 'OK') - self.messagelist = {} - response = self.imapobj.status(self.getfullname(), '(MESSAGES)')[1][0] - result = imaputil.imapsplit(response)[1] - maxmsgid = long(imaputil.flags2hash(result)['MESSAGES']) - if (maxmsgid < 1): - # No messages? return. - return + imapobj = self.imapserver.acquireconnection() + try: + imapobj.select(self.getfullname()) + self.messagelist = {} + response = self.imapobj.status(self.getfullname(), '(MESSAGES)')[1][0] + result = imaputil.imapsplit(response)[1] + maxmsgid = long(imaputil.flags2hash(result)['MESSAGES']) + if (maxmsgid < 1): + # No messages? return. + return - # Now, get the flags and UIDs for these. - response = self.imapobj.fetch('1:%d' % maxmsgid, '(FLAGS UID)')[1] + # Now, get the flags and UIDs for these. + response = imapobj.fetch('1:%d' % maxmsgid, '(FLAGS UID)')[1] + finally: + self.imapserver.releaseconnection(imapobj) for messagestr in response: # Discard the message number. messagestr = imaputil.imapsplit(messagestr)[1] @@ -63,51 +76,66 @@ class IMAPFolder(BaseFolder): return self.messagelist def getmessage(self, uid): - assert(self.imapobj.select(self.getfullname())[0] == 'OK') - return self.imapobj.uid('fetch', '%d' % uid, '(BODY.PEEK[])')[1][0][1].replace("\r\n", "\n") + imapobj = self.imapserver.acquireconnection() + try: + imapobj.select(self.getfullname()) + return imapobj.uid('fetch', '%d' % uid, '(BODY.PEEK[])')[1][0][1].replace("\r\n", "\n") + finally: + self.imapserver.releaseconnection(imapobj) def getmessageflags(self, uid): - return self.getmessagelist()[uid]['flags'] + return self.messagelist[uid]['flags'] def savemessage(self, uid, content, flags): - # This backend always assigns a new uid, so the uid arg is ignored. + imapobj = self.imapserver.acquireconnection() + try: + # This backend always assigns a new uid, so the uid arg is ignored. + # In order to get the new uid, we need to save off the message ID. - # In order to get the new uid, we need to save off the message ID. + message = rfc822.Message(StringIO(content)) + mid = imapobj._quote(message.getheader('Message-Id')) + date = imaplib.Time2Internaldate(rfc822.parsedate(message.getheader('Date'))) - message = rfc822.Message(StringIO(content)) - mid = self.imapobj._quote(message.getheader('Message-Id')) - date = imaplib.Time2Internaldate(rfc822.parsedate(message.getheader('Date'))) + if content.find("\r\n") == -1: # Convert line endings if not already + content = content.replace("\n", "\r\n") - if content.find("\r\n") == -1: # Convert line endings if not already - content = content.replace("\n", "\r\n") - - assert(self.imapobj.append(self.getfullname(), - imaputil.flagsmaildir2imap(flags), - date, content)[0] == 'OK') - # Checkpoint. Let it write out the messages, etc. - assert(self.imapobj.check()[0] == 'OK') - # Now find the UID it got. - matchinguids = self.imapobj.uid('search', None, - '(HEADER Message-Id %s)' % mid)[1][0] - matchinguids = matchinguids.split(' ') - matchinguids.sort() - uid = long(matchinguids[-1]) - self.messagelist[uid] = {'uid': uid, 'flags': flags} - return uid + assert(imapobj.append(self.getfullname(), + imaputil.flagsmaildir2imap(flags), + date, content)[0] == 'OK') + # Checkpoint. Let it write out the messages, etc. + assert(imapobj.check()[0] == 'OK') + # Now find the UID it got. + matchinguids = imapobj.uid('search', None, + '(HEADER Message-Id %s)' % mid)[1][0] + matchinguids = matchinguids.split(' ') + matchinguids.sort() + uid = long(matchinguids[-1]) + self.messagelist[uid] = {'uid': uid, 'flags': flags} + return uid + finally: + self.imapserver.releaseconnection(imapobj) def savemessageflags(self, uid, flags): - assert(self.imapobj.select(self.getfullname())[0] == 'OK') - result = self.imapobj.uid('store', '%d' % uid, 'FLAGS', - imaputil.flagsmaildir2imap(flags))[1][0] + imapobj = self.imapserver.acquireconnection(imapobj) + try: + imapobj.select(self.getfullname()) + result = imapobj.uid('store', '%d' % uid, 'FLAGS', + imaputil.flagsmaildir2imap(flags))[1][0] + finally: + self.imapserver.releaseconnection(imapobj) flags = imaputil.flags2hash(imaputil.imapsplit(result)[1])['FLAGS'] self.messagelist[uid]['flags'] = imaputil.flagsimap2maildir(flags) def addmessagesflags(self, uidlist, flags): - assert(self.imapobj.select(self.getfullname())[0] == 'OK') - r = self.imapobj.uid('store', - ','.join([str(uid) for uid in uidlist]), - '+FLAGS', - imaputil.flagsmaildir2imap(flags))[1] + imapobj = self.imapserver.acquireconnection(imapobj) + try: + imapobj.select(self.getfullname()) + r = imapobj.uid('store', + ','.join([str(uid) for uid in uidlist]), + '+FLAGS', + imaputil.flagsmaildir2imap(flags))[1] + finally: + self.imapserver.releaseconnection(imapobj) resultcount = 0 for result in r: resultcount += 1 @@ -124,10 +152,14 @@ class IMAPFolder(BaseFolder): uidlist = [uid for uid in uidlist if uid in self.messagelist] if not len(uidlist): return - + self.addmessagesflags(uidlist, ['T']) - assert(self.imapobj.select(self.getfullname())[0] == 'OK') - assert(self.imapobj.expunge()[0] == 'OK') + imapobj = self.imapserver.acquireconnection() + try: + imapobj.select(self.getfullname()) + assert(imapobj.expunge()[0] == 'OK') + finally: + self.imapserver.releaseconnection(imapobj) for uid in uidlist: del(self.messagelist[uid]) diff --git a/head/offlineimap/imapserver.py b/head/offlineimap/imapserver.py index 3a89d03..c7025f6 100644 --- a/head/offlineimap/imapserver.py +++ b/head/offlineimap/imapserver.py @@ -28,6 +28,7 @@ class UsefulIMAPMixIn: return None def select(self, mailbox='INBOX', readonly=None): + print "Mixin select" if self.getselectedfolder() == mailbox and not readonly: # No change; return. return diff --git a/head/offlineimap/threadutil.py b/head/offlineimap/threadutil.py index 9b047f4..7ad5a10 100644 --- a/head/offlineimap/threadutil.py +++ b/head/offlineimap/threadutil.py @@ -16,9 +16,9 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -from Threading import * +from threading import * -def semaphorewait(semaphore, originalstate): +def semaphorereset(semaphore, originalstate): """Wait until the semaphore gets back to its original state -- all acquired resources released.""" for i in range(originalstate): @@ -27,3 +27,10 @@ def semaphorewait(semaphore, originalstate): for i in range(originalstate): semaphore.release() +def semaphorewait(semaphore): + semaphore.acquire() + semaphore.release() + +def threadsreset(threadlist): + for thread in threadlist: + thread.join()