/head: changeset 78

Revamped thread system to handle exceptions better
This commit is contained in:
jgoerzen 2002-07-05 03:34:39 +01:00
parent 2486a7796c
commit daf4fd99ac
6 changed files with 185 additions and 16 deletions

View File

@ -1,3 +1,9 @@
offlineimap (2.0.1) unstable; urgency=low
* Fixed a bug with not properly propogating foldersep changes.
-- John Goerzen <jgoerzen@complete.org> Thu, 4 Jul 2002 09:07:06 -0500
offlineimap (2.0.0) unstable; urgency=low offlineimap (2.0.0) unstable; urgency=low
* This code is now multithreaded. New config file options control the * This code is now multithreaded. New config file options control the

View File

@ -18,7 +18,7 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from offlineimap import imaplib, imaputil, imapserver, repository, folder, mbnames, threadutil from offlineimap import imaplib, imaputil, imapserver, repository, folder, mbnames, threadutil
from offlineimap.threadutil import InstanceLimitedThread from offlineimap.threadutil import InstanceLimitedThread, ExitNotifyThread
import re, os, os.path, offlineimap, sys import re, os, os.path, offlineimap, sys
from ConfigParser import ConfigParser from ConfigParser import ConfigParser
from threading import * from threading import *
@ -111,6 +111,7 @@ def syncaccount(accountname, *args):
(accountname, remotefolder.getvisiblename()), (accountname, remotefolder.getvisiblename()),
args = (accountname, remoterepos, remotefolder, localrepos, args = (accountname, remoterepos, remotefolder, localrepos,
statusrepos)) statusrepos))
thread.setDaemon(1)
thread.start() thread.start()
folderthreads.append(thread) folderthreads.append(thread)
threadutil.threadsreset(folderthreads) threadutil.threadsreset(folderthreads)
@ -178,18 +179,45 @@ def syncitall():
target = syncaccount, target = syncaccount,
name = "syncaccount-%s" % accountname, name = "syncaccount-%s" % accountname,
args = (accountname,)) args = (accountname,))
thread.setDaemon(1)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
# Wait for the threads to finish. # Wait for the threads to finish.
threadutil.threadsreset(threads) threadutil.threadsreset(threads)
mbnames.genmbnames(config, mailboxes) mbnames.genmbnames(config, mailboxes)
syncitall() def sync_with_timer():
if config.has_option('general', 'autorefresh'): currentThread().setExitMessage('SYNC_WITH_TIMER_TERMINATE')
refreshperiod = config.getint('general', 'autorefresh') * 60 syncitall()
while 1: if config.has_option('general', 'autorefresh'):
if ui.sleep(refreshperiod) == 2: refreshperiod = config.getint('general', 'autorefresh') * 60
break while 1:
else: if ui.sleep(refreshperiod) == 2:
syncitall() break
else:
syncitall()
def threadexited(thread):
if thread.getExitCause() == 'EXCEPTION':
ui.threadException(thread) # Expected to terminate
sys.exit(100) # Just in case...
os._exit(100)
elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
ui.terminate()
# Just in case...
sys.exit(100)
os._exit(100)
else:
ui.threadExited(thread)
threadutil.initexitnotify()
t = ExitNotifyThread(target=sync_with_timer, name='sync_with_timer')
t.setDaemon(1)
t.start()
try:
threadutil.exitnotifymonitorloop(threadexited)
except:
ui.mainException() # Also expected to terminate.

View File

@ -200,6 +200,7 @@ class BaseFolder:
self.getcopyinstancelimit(), self.getcopyinstancelimit(),
target = self.copymessageto, target = self.copymessageto,
args = (uid, applyto)) args = (uid, applyto))
thread.setDaemon(1)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
else: else:

View File

@ -17,6 +17,12 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from threading import * from threading import *
from StringIO import StringIO
import sys, traceback
######################################################################
# General utilities
######################################################################
def semaphorereset(semaphore, originalstate): def semaphorereset(semaphore, originalstate):
"""Wait until the semaphore gets back to its original state -- all acquired """Wait until the semaphore gets back to its original state -- all acquired
@ -35,28 +41,121 @@ def threadsreset(threadlist):
for thread in threadlist: for thread in threadlist:
thread.join() thread.join()
######################################################################
# Exit-notify threads
######################################################################
exitcondition = Condition(Lock())
exitthread = None
inited = 0
def initexitnotify():
"""Initialize the exit notify system. This MUST be called from the
SAME THREAD that will call monitorloop BEFORE it calls monitorloop.
This SHOULD be called before the main thread starts any other
ExitNotifyThreads, or else it may miss the ability to catch the exit
status from them!"""
exitcondition.acquire()
def exitnotifymonitorloop(callback):
"""Enter an infinite "monitoring" loop. The argument, callback,
defines the function to call when an ExitNotifyThread has terminated.
That function is called with a single argument -- the ExitNotifyThread
that has terminated. The monitor will not continue to monitor for
other threads until the function returns, so if it intends to perform
long calculations, it should start a new thread itself -- but NOT
an ExitNotifyThread, or else an infinite loop may result. Furthermore,
the monitor will hold the lock all the while the other thread is waiting.
"""
global exitcondition, exitthread
while 1: # Loop forever.
while exitthread == None:
exitcondition.wait(1)
callback(exitthread)
exitthread = None
class ExitNotifyThread(Thread):
"""This class is designed to alert a "monitor" to the fact that a thread has
exited and to provide for the ability for it to find out why."""
def run(self):
global exitcondition, exitthread
try:
Thread.run(self)
except:
self.setExitCause('EXCEPTION')
self.setExitException(sys.exc_info()[1])
sbuf = StringIO()
traceback.print_exc(file = sbuf)
self.setExitStackTrace(sbuf.getvalue())
else:
self.setExitCause('NORMAL')
if not hasattr(self, 'exitmessage'):
self.setExitMessage(None)
exitcondition.acquire()
exitthread = self
exitcondition.notify()
exitcondition.release()
def setExitCause(self, cause):
self.exitcause = cause
def getExitCause(self):
"""Returns the cause of the exit, one of:
'EXCEPTION' -- the thread aborted because of an exception
'NORMAL' -- normal termination."""
return self.exitcause
def setExitException(self, exc):
self.exitexception = exc
def getExitException(self):
"""If getExitCause() is 'EXCEPTION', holds the value from
sys.exc_info()[1] for this exception."""
return self.exitexception
def setExitStackTrace(self, st):
self.exitstacktrace = st
def getExitStackTrace(self):
"""If getExitCause() is 'EXCEPTION', returns a string representing
the stack trace for this exception."""
return self.exitstacktrace
def setExitMessage(self, msg):
"""Sets the exit message to be fetched by a subsequent call to
getExitMessage. This message may be any object or type except
None."""
self.exitmessage = msg
def getExitMessage(self):
"""For any exit cause, returns the message previously set by
a call to setExitMessage(), or None if there was no such message
set."""
return self.exitmessage
######################################################################
# Instance-limited threads
######################################################################
instancelimitedsems = {} instancelimitedsems = {}
instancelimitedlock = Lock() instancelimitedlock = Lock()
def initInstanceLimit(instancename, instancemax): def initInstanceLimit(instancename, instancemax):
"""Initialize the instance-limited thread implementation to permit
up to intancemax threads with the given instancename."""
instancelimitedlock.acquire() instancelimitedlock.acquire()
if not instancelimitedsems.has_key(instancename): if not instancelimitedsems.has_key(instancename):
instancelimitedsems[instancename] = BoundedSemaphore(instancemax) instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
instancelimitedlock.release() instancelimitedlock.release()
class InstanceLimitedThread(Thread): class InstanceLimitedThread(ExitNotifyThread):
def __init__(self, instancename, *args, **kwargs): def __init__(self, instancename, *args, **kwargs):
self.instancename = instancename self.instancename = instancename
apply(Thread.__init__, (self,) + args, kwargs) apply(ExitNotifyThread.__init__, (self,) + args, kwargs)
def start(self): def start(self):
instancelimitedsems[self.instancename].acquire() instancelimitedsems[self.instancename].acquire()
Thread.start(self) ExitNotifyThread.start(self)
def run(self): def run(self):
try: try:
Thread.run(self) ExitNotifyThread.run(self)
finally: finally:
instancelimitedsems[self.instancename].release() instancelimitedsems[self.instancename].release()

View File

@ -1,10 +1,12 @@
from UIBase import UIBase from UIBase import UIBase
from getpass import getpass from getpass import getpass
import select, sys import select, sys
from threading import *
class TTYUI(UIBase): class TTYUI(UIBase):
def __init__(self, verbose = 0): def __init__(self, verbose = 0):
self.verbose = 0 self.verbose = 0
self.iswaiting = 0
def _msg(s, msg): def _msg(s, msg):
print msg print msg
@ -28,11 +30,19 @@ class TTYUI(UIBase):
UIBase.messagelistloaded(s, repos, folder, count) UIBase.messagelistloaded(s, repos, folder, count)
def sleep(s, sleepsecs): def sleep(s, sleepsecs):
s.iswaiting = 1
try: try:
UIBase.sleep(s, sleepsecs) UIBase.sleep(s, sleepsecs)
except KeyboardInterrupt: finally:
s.iswaiting = 0
def mainException(s):
if isinstance(sys.exc_info()[1], KeyboardInterrupt) and \
s.iswaiting:
sys.stdout.write("Timer interrupted at user request; program terminating. \n") sys.stdout.write("Timer interrupted at user request; program terminating. \n")
return 2 s.terminate()
else:
UIBase.mainException(s)
def sleeping(s, sleepsecs, remainingsecs): def sleeping(s, sleepsecs, remainingsecs):
if remainingsecs > 0: if remainingsecs > 0:

View File

@ -18,7 +18,8 @@
from offlineimap import repository from offlineimap import repository
import offlineimap.version import offlineimap.version
import re, time import re, time, sys, traceback
from StringIO import StringIO
class UIBase: class UIBase:
################################################## UTILS ################################################## UTILS
@ -108,6 +109,30 @@ class UIBase:
s._msg("Deleting flags %s to message %d on %s" % \ s._msg("Deleting flags %s to message %d on %s" % \
(", ".join(flags), uid, ds)) (", ".join(flags), uid, ds))
################################################## Threads
def threadException(s, thread):
"""Called when a thread has terminated with an exception.
The argument is the ExitNotifyThread that has so terminated."""
s._msg("Thread '%s' terminated with exception:\n%s" % \
(thread.getName(), thread.getExitStackTrace()))
s.terminate(100)
def mainException(s):
sbuf = StringIO()
traceback.print_exc(file = sbuf)
s._msg("Main program terminated with exception:\n" +
sbuf.getvalue())
def terminate(s, exitstatus = 0):
"""Called to terminate the application."""
sys.exit(exitstatus)
def threadExited(s, thread):
"""Called when a thread has exited normally. Many UIs will
just ignore this."""
pass
################################################## Other ################################################## Other
def sleep(s, sleepsecs): def sleep(s, sleepsecs):