/head: changeset 65

Another work on syncing
This commit is contained in:
jgoerzen 2002-07-04 02:35:05 +01:00
parent 993f85bfa6
commit 45e6279680
5 changed files with 150 additions and 62 deletions

View File

@ -58,9 +58,19 @@ for account in accounts:
else: else:
passwords[account] = ui.getpass(accountname, config) passwords[account] = ui.getpass(accountname, config)
def syncitall(): mailboxes = []
mailboxes = [] mailboxlock = Lock()
for accountname in accounts: def addmailbox(accountname, remotefolder):
mailboxlock.acquire()
mailboxes.append({'accountname' : accountname,
'foldername': remotefolder.getvisiblename()})
mailboxlock.release()
def syncaccount(accountname):
# We don't need an account lock because syncitall() goes through
# each account once, then waits for all to finish.
accountsemaphore.acquire()
try:
ui.acct(accountname) ui.acct(accountname)
accountmetadata = os.path.join(metadatadir, accountname) accountmetadata = os.path.join(metadatadir, accountname)
if not os.path.exists(accountmetadata): if not os.path.exists(accountmetadata):
@ -133,7 +143,21 @@ def syncitall():
localfolder.syncmessagesto(statusfolder) localfolder.syncmessagesto(statusfolder)
statusfolder.save() statusfolder.save()
server.close() server.close()
finally:
accountsemaphore.release()
def syncitall():
mailboxes = [] # Reset.
threads = []
for accountname in accounts:
threadutil.semaphorewait(accountsemaphore)
thread = Thread(target = syncaccount,
name = "syncaccount %s" % accountname,
args = (accountname))
thread.start()
threads.append(thread)
# Wait for the threads to finish.
threadutil.threadreset(threads)
mbnames.genmbnames(config, mailboxes) mbnames.genmbnames(config, mailboxes)

View File

@ -23,6 +23,16 @@ class BaseFolder:
"""Returns name""" """Returns name"""
return self.name return self.name
def suggeststhreads(self):
"""Returns true if this folder suggests using threads for actions;
false otherwise. Probably only IMAP will return true."""
return 0
def waitforthread(self):
"""For threading folders, waits until there is a resource available
before firing off a thread. For all others, returns immediately."""
pass
def getvisiblename(self): def getvisiblename(self):
return self.name return self.name
@ -152,26 +162,40 @@ class BaseFolder:
# Did not find any server to take this message. Ignore. # Did not find any server to take this message. Ignore.
pass pass
def copymessageto(self, uid, applyto):
__main__.ui.copyingmessage(uid, self, applyto)
message = self.getmessage(uid)
flags = self.getmessageflags(uid)
for object in applyto:
newuid = object.savemessage(uid, message, flags)
if newuid > 0 and newuid != uid:
# Change the local uid.
self.savemessage(newuid, message, flags)
self.deletemessage(uid)
uid = newuid
def syncmessagesto_copy(self, dest, applyto): def syncmessagesto_copy(self, dest, applyto):
"""Pass 2 of folder synchronization. """Pass 2 of folder synchronization.
Look for messages present in self but not in dest. If any, add Look for messages present in self but not in dest. If any, add
them to dest.""" them to dest."""
threads = []
for uid in self.getmessagelist().keys(): for uid in self.getmessagelist().keys():
if uid < 0: # Ignore messages that pass 1 missed. if uid < 0: # Ignore messages that pass 1 missed.
continue continue
if not uid in dest.getmessagelist(): if not uid in dest.getmessagelist():
__main__.ui.copyingmessage(uid, self, applyto) if self.suggeststhreads():
message = self.getmessage(uid) self.waitforthread()
flags = self.getmessageflags(uid) thread = Thread(target = self.copymessageto,
for object in applyto: args = (uid, applyto))
newuid = object.savemessage(uid, message, flags) thread.start()
if newuid > 0 and newuid != uid: threads.append(thread)
# Change the local uid. else:
self.savemessage(newuid, message, flags) self.copymessageto(uid, applyto)
self.deletemessage(uid) for thread in threads:
uid = newuid thread.join()
def syncmessagesto_delete(self, dest, applyto): def syncmessagesto_delete(self, dest, applyto):
"""Pass 3 of folder synchronization. """Pass 3 of folder synchronization.

View File

@ -27,30 +27,43 @@ class IMAPFolder(BaseFolder):
self.root = imapserver.root self.root = imapserver.root
self.sep = imapserver.delim self.sep = imapserver.delim
self.imapserver = imapserver self.imapserver = imapserver
self.imapobj = self.imapserver.makeconnection()
self.messagelist = None self.messagelist = None
self.visiblename = visiblename self.visiblename = visiblename
def suggeststhreads(self):
return 1
def waitforthread(self):
self.imapserver.connectionwait()
def getvisiblename(self): def getvisiblename(self):
return self.visiblename return self.visiblename
def getuidvalidity(self): def getuidvalidity(self):
x = self.imapobj.status(self.getfullname(), '(UIDVALIDITY)')[1][0] imapobj = self.imapserver.acquireconnection()
try:
x = imapobj.status(self.getfullname(), '(UIDVALIDITY)')[1][0]
finally:
self.imapserver.releaseconnection(imapobj)
uidstring = imaputil.imapsplit(x)[1] uidstring = imaputil.imapsplit(x)[1]
return long(imaputil.flagsplit(uidstring)[1]) return long(imaputil.flagsplit(uidstring)[1])
def cachemessagelist(self): def cachemessagelist(self):
assert(self.imapobj.select(self.getfullname())[0] == 'OK') imapobj = self.imapserver.acquireconnection()
self.messagelist = {} try:
response = self.imapobj.status(self.getfullname(), '(MESSAGES)')[1][0] imapobj.select(self.getfullname())
result = imaputil.imapsplit(response)[1] self.messagelist = {}
maxmsgid = long(imaputil.flags2hash(result)['MESSAGES']) response = self.imapobj.status(self.getfullname(), '(MESSAGES)')[1][0]
if (maxmsgid < 1): result = imaputil.imapsplit(response)[1]
# No messages? return. maxmsgid = long(imaputil.flags2hash(result)['MESSAGES'])
return if (maxmsgid < 1):
# No messages? return.
return
# Now, get the flags and UIDs for these. # Now, get the flags and UIDs for these.
response = self.imapobj.fetch('1:%d' % maxmsgid, '(FLAGS UID)')[1] response = imapobj.fetch('1:%d' % maxmsgid, '(FLAGS UID)')[1]
finally:
self.imapserver.releaseconnection(imapobj)
for messagestr in response: for messagestr in response:
# Discard the message number. # Discard the message number.
messagestr = imaputil.imapsplit(messagestr)[1] messagestr = imaputil.imapsplit(messagestr)[1]
@ -63,51 +76,66 @@ class IMAPFolder(BaseFolder):
return self.messagelist return self.messagelist
def getmessage(self, uid): def getmessage(self, uid):
assert(self.imapobj.select(self.getfullname())[0] == 'OK') imapobj = self.imapserver.acquireconnection()
return self.imapobj.uid('fetch', '%d' % uid, '(BODY.PEEK[])')[1][0][1].replace("\r\n", "\n") try:
imapobj.select(self.getfullname())
return imapobj.uid('fetch', '%d' % uid, '(BODY.PEEK[])')[1][0][1].replace("\r\n", "\n")
finally:
self.imapserver.releaseconnection(imapobj)
def getmessageflags(self, uid): def getmessageflags(self, uid):
return self.getmessagelist()[uid]['flags'] return self.messagelist[uid]['flags']
def savemessage(self, uid, content, flags): def savemessage(self, uid, content, flags):
# This backend always assigns a new uid, so the uid arg is ignored. imapobj = self.imapserver.acquireconnection()
try:
# This backend always assigns a new uid, so the uid arg is ignored.
# In order to get the new uid, we need to save off the message ID.
# In order to get the new uid, we need to save off the message ID. message = rfc822.Message(StringIO(content))
mid = imapobj._quote(message.getheader('Message-Id'))
date = imaplib.Time2Internaldate(rfc822.parsedate(message.getheader('Date')))
message = rfc822.Message(StringIO(content)) if content.find("\r\n") == -1: # Convert line endings if not already
mid = self.imapobj._quote(message.getheader('Message-Id')) content = content.replace("\n", "\r\n")
date = imaplib.Time2Internaldate(rfc822.parsedate(message.getheader('Date')))
if content.find("\r\n") == -1: # Convert line endings if not already assert(imapobj.append(self.getfullname(),
content = content.replace("\n", "\r\n") imaputil.flagsmaildir2imap(flags),
date, content)[0] == 'OK')
assert(self.imapobj.append(self.getfullname(), # Checkpoint. Let it write out the messages, etc.
imaputil.flagsmaildir2imap(flags), assert(imapobj.check()[0] == 'OK')
date, content)[0] == 'OK') # Now find the UID it got.
# Checkpoint. Let it write out the messages, etc. matchinguids = imapobj.uid('search', None,
assert(self.imapobj.check()[0] == 'OK') '(HEADER Message-Id %s)' % mid)[1][0]
# Now find the UID it got. matchinguids = matchinguids.split(' ')
matchinguids = self.imapobj.uid('search', None, matchinguids.sort()
'(HEADER Message-Id %s)' % mid)[1][0] uid = long(matchinguids[-1])
matchinguids = matchinguids.split(' ') self.messagelist[uid] = {'uid': uid, 'flags': flags}
matchinguids.sort() return uid
uid = long(matchinguids[-1]) finally:
self.messagelist[uid] = {'uid': uid, 'flags': flags} self.imapserver.releaseconnection(imapobj)
return uid
def savemessageflags(self, uid, flags): def savemessageflags(self, uid, flags):
assert(self.imapobj.select(self.getfullname())[0] == 'OK') imapobj = self.imapserver.acquireconnection(imapobj)
result = self.imapobj.uid('store', '%d' % uid, 'FLAGS', try:
imaputil.flagsmaildir2imap(flags))[1][0] imapobj.select(self.getfullname())
result = imapobj.uid('store', '%d' % uid, 'FLAGS',
imaputil.flagsmaildir2imap(flags))[1][0]
finally:
self.imapserver.releaseconnection(imapobj)
flags = imaputil.flags2hash(imaputil.imapsplit(result)[1])['FLAGS'] flags = imaputil.flags2hash(imaputil.imapsplit(result)[1])['FLAGS']
self.messagelist[uid]['flags'] = imaputil.flagsimap2maildir(flags) self.messagelist[uid]['flags'] = imaputil.flagsimap2maildir(flags)
def addmessagesflags(self, uidlist, flags): def addmessagesflags(self, uidlist, flags):
assert(self.imapobj.select(self.getfullname())[0] == 'OK') imapobj = self.imapserver.acquireconnection(imapobj)
r = self.imapobj.uid('store', try:
','.join([str(uid) for uid in uidlist]), imapobj.select(self.getfullname())
'+FLAGS', r = imapobj.uid('store',
imaputil.flagsmaildir2imap(flags))[1] ','.join([str(uid) for uid in uidlist]),
'+FLAGS',
imaputil.flagsmaildir2imap(flags))[1]
finally:
self.imapserver.releaseconnection(imapobj)
resultcount = 0 resultcount = 0
for result in r: for result in r:
resultcount += 1 resultcount += 1
@ -124,10 +152,14 @@ class IMAPFolder(BaseFolder):
uidlist = [uid for uid in uidlist if uid in self.messagelist] uidlist = [uid for uid in uidlist if uid in self.messagelist]
if not len(uidlist): if not len(uidlist):
return return
self.addmessagesflags(uidlist, ['T']) self.addmessagesflags(uidlist, ['T'])
assert(self.imapobj.select(self.getfullname())[0] == 'OK') imapobj = self.imapserver.acquireconnection()
assert(self.imapobj.expunge()[0] == 'OK') try:
imapobj.select(self.getfullname())
assert(imapobj.expunge()[0] == 'OK')
finally:
self.imapserver.releaseconnection(imapobj)
for uid in uidlist: for uid in uidlist:
del(self.messagelist[uid]) del(self.messagelist[uid])

View File

@ -28,6 +28,7 @@ class UsefulIMAPMixIn:
return None return None
def select(self, mailbox='INBOX', readonly=None): def select(self, mailbox='INBOX', readonly=None):
print "Mixin select"
if self.getselectedfolder() == mailbox and not readonly: if self.getselectedfolder() == mailbox and not readonly:
# No change; return. # No change; return.
return return

View File

@ -16,9 +16,9 @@
# along with this program; if not, write to the Free Software # along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from Threading import * from threading import *
def semaphorewait(semaphore, originalstate): def semaphorereset(semaphore, originalstate):
"""Wait until the semaphore gets back to its original state -- all acquired """Wait until the semaphore gets back to its original state -- all acquired
resources released.""" resources released."""
for i in range(originalstate): for i in range(originalstate):
@ -27,3 +27,10 @@ def semaphorewait(semaphore, originalstate):
for i in range(originalstate): for i in range(originalstate):
semaphore.release() semaphore.release()
def semaphorewait(semaphore):
semaphore.acquire()
semaphore.release()
def threadsreset(threadlist):
for thread in threadlist:
thread.join()