/head: changeset 69

More updates
This commit is contained in:
jgoerzen 2002-07-04 04:59:19 +01:00
parent 8efda69ef0
commit 1549691ec7
5 changed files with 67 additions and 18 deletions

View File

@ -18,6 +18,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from offlineimap import imaplib, imaputil, imapserver, repository, folder, mbnames, threadutil from offlineimap import imaplib, imaputil, imapserver, repository, folder, mbnames, threadutil
from offlineimap.threadutil import InstanceLimitedThread
import re, os, os.path, offlineimap, sys import re, os, os.path, offlineimap, sys
from ConfigParser import ConfigParser from ConfigParser import ConfigParser
from threading import * from threading import *
@ -47,7 +48,9 @@ server = None
remoterepos = None remoterepos = None
localrepos = None localrepos = None
passwords = {} passwords = {}
accountsemaphore = BoundedSemaphore(config.getint("general", "maxsyncaccounts"))
threadutil.initInstanceLimit("ACCOUNTLIMIT", config.getint("general",
"maxsyncaccounts"))
# We have to gather passwords here -- don't want to have two threads # We have to gather passwords here -- don't want to have two threads
# asking for passwords simultaneously. # asking for passwords simultaneously.
@ -57,6 +60,9 @@ for account in accounts:
passwords[account] = config.get(account, "remotepass") passwords[account] = config.get(account, "remotepass")
else: else:
passwords[account] = ui.getpass(account, config) passwords[account] = ui.getpass(account, config)
for instancename in ["FOLDER_" + account, "MSGCOPY_" + account]:
threadutil.initInstanceLimit(instancename,
config.getint(account, "maxconnections"))
mailboxes = [] mailboxes = []
mailboxlock = Lock() mailboxlock = Lock()
@ -70,7 +76,6 @@ def syncaccount(accountname, *args):
print args print args
# We don't need an account lock because syncitall() goes through # We don't need an account lock because syncitall() goes through
# each account once, then waits for all to finish. # each account once, then waits for all to finish.
accountsemaphore.acquire()
try: try:
ui.acct(accountname) ui.acct(accountname)
accountmetadata = os.path.join(metadatadir, accountname) accountmetadata = os.path.join(metadatadir, accountname)
@ -101,17 +106,19 @@ def syncaccount(accountname, *args):
folderthreads = [] folderthreads = []
for remotefolder in remoterepos.getfolders(): for remotefolder in remoterepos.getfolders():
server.connectionwait() server.connectionwait()
thread = Thread(target = syncfolder, thread = InstanceLimitedThread(\
name = "syncfolder-%s-%s" % \ instancename = 'FOLDER_' + accountname,
(accountname, remotefolder.getvisiblename()), target = syncfolder,
args = (accountname, remoterepos, name = "syncfolder-%s-%s" % \
remotefolder, localrepos, statusrepos)) (accountname, remotefolder.getvisiblename()),
args = (accountname, remoterepos, remotefolder, localrepos,
statusrepos))
thread.start() thread.start()
folderthreads.append(thread) folderthreads.append(thread)
threadutil.threadsreset(folderthreads) threadutil.threadsreset(folderthreads)
server.close() server.close()
finally: finally:
accountsemaphore.release() pass
def syncfolder(accountname, remoterepos, remotefolder, localrepos, def syncfolder(accountname, remoterepos, remotefolder, localrepos,
statusrepos): statusrepos):
@ -166,10 +173,10 @@ def syncitall():
mailboxes = [] # Reset. mailboxes = [] # Reset.
threads = [] threads = []
for accountname in accounts: for accountname in accounts:
threadutil.semaphorewait(accountsemaphore) thread = InstanceLimitedThread(instancename = 'ACCOUNTLIMIT',
thread = Thread(target = syncaccount, target = syncaccount,
name = "syncaccount-%s" % accountname, name = "syncaccount-%s" % accountname,
args = (accountname,)) args = (accountname,))
thread.start() thread.start()
threads.append(thread) threads.append(thread)
# Wait for the threads to finish. # Wait for the threads to finish.

View File

@ -18,6 +18,8 @@
import __main__ import __main__
from threading import * from threading import *
from offlineimap import threadutil
from offlineimap.threadutil import InstanceLimitedThread
class BaseFolder: class BaseFolder:
def getname(self): def getname(self):
@ -34,6 +36,11 @@ class BaseFolder:
before firing off a thread. For all others, returns immediately.""" before firing off a thread. For all others, returns immediately."""
pass pass
def getcopyinstancelimit(self):
"""For threading folders, returns the instancelimitname for
InstanceLimitedThreads."""
raise NotImplementedException
def getvisiblename(self): def getvisiblename(self):
return self.name return self.name
@ -189,8 +196,10 @@ class BaseFolder:
if not uid in dest.getmessagelist(): if not uid in dest.getmessagelist():
if self.suggeststhreads(): if self.suggeststhreads():
self.waitforthread() self.waitforthread()
thread = Thread(target = self.copymessageto, thread = InstanceLimitedThread(\
args = (uid, applyto)) self.getcopyinstancelimit(),
target = self.copymessageto,
args = (uid, applyto))
thread.start() thread.start()
threads.append(thread) threads.append(thread)
else: else:

View File

@ -22,13 +22,14 @@ import rfc822
from StringIO import StringIO from StringIO import StringIO
class IMAPFolder(BaseFolder): class IMAPFolder(BaseFolder):
def __init__(self, imapserver, name, visiblename): def __init__(self, imapserver, name, visiblename, accountname):
self.name = imaputil.dequote(name) self.name = imaputil.dequote(name)
self.root = imapserver.root self.root = imapserver.root
self.sep = imapserver.delim self.sep = imapserver.delim
self.imapserver = imapserver self.imapserver = imapserver
self.messagelist = None self.messagelist = None
self.visiblename = visiblename self.visiblename = visiblename
self.accountname = accountname
def suggeststhreads(self): def suggeststhreads(self):
return 1 return 1
@ -36,6 +37,9 @@ class IMAPFolder(BaseFolder):
def waitforthread(self): def waitforthread(self):
self.imapserver.connectionwait() self.imapserver.connectionwait()
def getcopyinstancelimit(self):
return 'MSGCOPY_' + self.accountname
def getvisiblename(self): def getvisiblename(self):
return self.visiblename return self.visiblename
@ -116,7 +120,7 @@ class IMAPFolder(BaseFolder):
self.imapserver.releaseconnection(imapobj) self.imapserver.releaseconnection(imapobj)
def savemessageflags(self, uid, flags): def savemessageflags(self, uid, flags):
imapobj = self.imapserver.acquireconnection(imapobj) imapobj = self.imapserver.acquireconnection()
try: try:
imapobj.select(self.getfullname()) imapobj.select(self.getfullname())
result = imapobj.uid('store', '%d' % uid, 'FLAGS', result = imapobj.uid('store', '%d' % uid, 'FLAGS',

View File

@ -38,7 +38,8 @@ class IMAPRepository(BaseRepository):
def getfolder(self, foldername): def getfolder(self, foldername):
return folder.IMAP.IMAPFolder(self.imapserver, foldername, return folder.IMAP.IMAPFolder(self.imapserver, foldername,
self.nametrans(foldername)) self.nametrans(foldername),
accountname)
def getfolders(self): def getfolders(self):
if self.folders != None: if self.folders != None:
@ -54,7 +55,8 @@ class IMAPRepository(BaseRepository):
if '\\Noselect' in imaputil.flagsplit(flags): if '\\Noselect' in imaputil.flagsplit(flags):
continue continue
retval.append(folder.IMAP.IMAPFolder(self.imapserver, name, retval.append(folder.IMAP.IMAPFolder(self.imapserver, name,
self.nametrans(imaputil.dequote(name)))) self.nametrans(imaputil.dequote(name)),
self.accountname))
retval.sort(lambda x, y: cmp(x.getvisiblename(), y.getvisiblename())) retval.sort(lambda x, y: cmp(x.getvisiblename(), y.getvisiblename()))
self.folders = retval self.folders = retval
return retval return retval

View File

@ -34,3 +34,30 @@ def semaphorewait(semaphore):
def threadsreset(threadlist): def threadsreset(threadlist):
for thread in threadlist: for thread in threadlist:
thread.join() thread.join()
instancelimitedsems = {}
instancelimitedlock = Lock()
def initInstanceLimit(instancename, instancemax):
instancelimitedlock.acquire()
if not instancelimitedsems.has_key(instancename):
instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
instancelimitedlock.release()
class InstanceLimitedThread(Thread):
def __init__(self, instancename, *args, **kwargs):
self.instancename = instancename
apply(Thread.__init__, (self,) + args, kwargs)
def start(self):
instancelimitedsems[self.instancename].acquire()
Thread.start(self)
def run(self):
try:
Thread.run(self)
finally:
instancelimitedsems[self.instancename].release()