230 lines
8.2 KiB
Python
Raw Normal View History

# Copyright (C) 2002-2011 John Goerzen & contributors
# Thread support module
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from threading import Lock, Thread, BoundedSemaphore
from Queue import Queue, Empty
import traceback
from thread import get_ident # python < 2.6 support
import os.path
import sys
from offlineimap.ui import getglobalui
######################################################################
# General utilities
######################################################################
def semaphorereset(semaphore, originalstate):
    """Block until every slot of 'semaphore' has been given back.

    All 'originalstate' slots are acquired one after another and are then
    released again, so the semaphore's count is the same on return as it
    was once the last acquire succeeded.

    :param semaphore: the semaphore to drain and refill.
    :param originalstate: how many slots to acquire and then release.
    """
    slots = range(originalstate)
    # Grab every slot; each acquire() blocks until one is available.
    for _ in slots:
        semaphore.acquire()
    # All of them are ours now -- hand them straight back.
    for _ in slots:
        semaphore.release()
class threadlist:
    """A list of threads whose every access is serialized by a lock.

    'list' holds the threads in insertion order and 'lock' guards all
    reads and writes of it done through the methods below.
    """

    def __init__(self):
        self.lock = Lock()
        self.list = []

    def _with_lock(self, action):
        """Call 'action(self.list)' while holding the lock; return its result.

        The lock is released again even if 'action' raises.
        """
        self.lock.acquire()
        try:
            return action(self.list)
        finally:
            self.lock.release()

    def add(self, thread):
        """Append 'thread' to the end of the list."""
        self._with_lock(lambda threads: threads.append(thread))

    def remove(self, thread):
        """Remove 'thread' from the list.

        :raises ValueError: if 'thread' is not in the list.
        """
        self._with_lock(lambda threads: threads.remove(thread))

    def pop(self):
        """Remove and return the most recently added thread.

        :returns: the thread, or None if the list is empty.
        """
        def take_last(threads):
            if threads:
                return threads.pop()
            return None
        return self._with_lock(take_last)

    def reset(self):
        """Pop every thread off the list, joining each one in turn.

        Returns once pop() yields nothing more.
        """
        thread = self.pop()
        while thread:
            # join() runs without the lock held, so other threads can
            # still add to or remove from the list meanwhile.
            thread.join()
            thread = self.pop()
######################################################################
# Exit-notify threads
######################################################################
# Queue of finished ExitNotifyThread objects waiting to be picked up by
# exitnotifymonitorloop().  It holds at most 100 entries; threads add
# themselves with a blocking put() at the end of ExitNotifyThread.run().
exitthreads = Queue(100)
def exitnotifymonitorloop(callback):
    """An infinite "monitoring" loop watching for finished ExitNotifyThread's.

    Finished threads are taken off the module-level 'exitthreads' queue and
    handed to 'callback' one at a time.  The loop never ends by itself; it
    is only left when 'callback' raises (for example SystemExit).

    :param callback: the function to call when a thread terminated. That
                     function is called with a single argument -- the
                     ExitNotifyThread that has terminated. The monitor will
                     not continue to monitor for other threads until
                     'callback' returns, so if it intends to perform long
                     calculations, it should start a new thread itself -- but
                     NOT an ExitNotifyThread, or else an infinite loop
                     may result.
                     Furthermore, the monitor will hold the lock all the
                     while the other thread is waiting.
    :type callback:  a callable function
    """
    while True:
        # Loop forever and call 'callback' for each thread that exited
        try:
            # we need a timeout in the get() call, so that ctrl-c can throw
            # a SIGINT (http://bugs.python.org/issue1360). A timeout with empty
            # Queue will raise `Empty`.
            thrd = exitthreads.get(True, 60)
        except Empty:
            # Nothing finished within the timeout; go back to waiting.
            continue
        # Called outside the try block on purpose: an `Empty` raised from
        # within the callback is a real error and must not be mistaken for
        # a queue timeout and silently dropped.
        callback(thrd)
def threadexited(thread):
    """Called when a thread exits.

    Dispatches on how 'thread' ended:

    - normal exit without a special message: notify the UI and return;
    - exit message 'SYNC_WITH_TIMER_TERMINATE': ask the UI to terminate;
    - SystemExit raised in the thread: raise SystemExit here as well;
    - any other exception: report it to the UI, then exit with status 100.

    :param thread: the ExitNotifyThread that has finished.
    """
    ui = getglobalui()

    if thread.getExitCause() != 'EXCEPTION':
        if thread.getExitMessage() != 'SYNC_WITH_TIMER_TERMINATE':
            ui.threadExited(thread)
            return
        ui.terminate()
        # Just in case...
        sys.exit(100)

    failure = thread.getExitException()
    if isinstance(failure, SystemExit):
        # Bring a SystemExit into the main thread.
        # Do not send it back to UI layer right now.
        # Maybe later send it to ui.terminate?
        raise SystemExit
    ui.threadException(thread)      # Expected to terminate
    sys.exit(100)                   # Just in case...
class ExitNotifyThread(Thread):
    """This class is designed to alert a "monitor" to the fact that a thread has
    exited and to provide for the ability for it to find out why.

    When run() finishes it records how the thread ended (see the getExit*()
    methods) and then puts the thread object on the module-level
    'exitthreads' queue.
    """

    # Class variable that is set to the profile directory if required.
    profiledir = None

    # Class-level defaults for the exit information.  They make every
    # getExit*() method return None -- instead of raising AttributeError --
    # when the corresponding value was never set, e.g. before run() has
    # finished, or for the exception and stack trace after a normal exit.
    exitcause = None
    exitexception = None
    exitstacktrace = None
    exitmessage = None

    def run(self):
        """Run the thread's target, record the outcome and notify the monitor.

        With 'profiledir' set, the target runs under a profiler and the
        statistics are written to '<profiledir>/<threadid>_<name>.prof'.
        """
        self.threadid = get_ident()
        try:
            if not ExitNotifyThread.profiledir: # normal case
                Thread.run(self)
            else:
                try:
                    import cProfile as profile
                except ImportError:
                    import profile
                prof = profile.Profile()
                try:
                    prof = prof.runctx("Thread.run(self)", globals(), locals())
                except SystemExit:
                    # NOTE(review): when profiling, a SystemExit raised by
                    # the target is swallowed here so the statistics can
                    # still be dumped; the exit is then recorded as 'NORMAL'.
                    pass
                prof.dump_stats(os.path.join(ExitNotifyThread.profiledir,
                                "%s_%s.prof" % (self.threadid, self.getName())))
        except:
            # Deliberately catches everything, SystemExit included: the
            # exception is stored on the thread rather than propagated.
            self.setExitCause('EXCEPTION')
            # 'sys' is tested before use; if it is falsy the exception
            # details cannot be collected and the getters return None.
            if sys:
                self.setExitException(sys.exc_info()[1])
                tb = traceback.format_exc()
                self.setExitStackTrace(tb)
        else:
            self.setExitCause('NORMAL')

        # Hand ourselves to the monitor; put() blocks while the queue is full.
        if exitthreads:
            exitthreads.put(self, True)

    def setExitCause(self, cause):
        """Record why the thread ended ('EXCEPTION' or 'NORMAL')."""
        self.exitcause = cause

    def getExitCause(self):
        """Returns the cause of the exit, one of:
        'EXCEPTION' -- the thread aborted because of an exception
        'NORMAL' -- normal termination.
        None is returned as long as no cause has been recorded."""
        return self.exitcause

    def setExitException(self, exc):
        """Record the exception object that ended the thread."""
        self.exitexception = exc

    def getExitException(self):
        """If getExitCause() is 'EXCEPTION', holds the value from
        sys.exc_info()[1] for this exception.  None if none was recorded."""
        return self.exitexception

    def setExitStackTrace(self, st):
        """Record the formatted stack trace of the terminating exception."""
        self.exitstacktrace = st

    def getExitStackTrace(self):
        """If getExitCause() is 'EXCEPTION', returns a string representing
        the stack trace for this exception.  None if none was recorded."""
        return self.exitstacktrace

    def setExitMessage(self, msg):
        """Sets the exit message to be fetched by a subsequent call to
        getExitMessage.  This message may be any object or type except
        None."""
        self.exitmessage = msg

    def getExitMessage(self):
        """For any exit cause, returns the message previously set by
        a call to setExitMessage(), or None if there was no such message
        set."""
        return self.exitmessage

    @classmethod
    def set_profiledir(cls, directory):
        """If set, will output profile information to 'directory'"""
        cls.profiledir = directory
######################################################################
# Instance-limited threads
######################################################################
# Maps an instance name to the BoundedSemaphore that limits how many
# InstanceLimitedThread's of that name may be started at the same time.
instancelimitedsems = {}
# Guards the check-then-insert on 'instancelimitedsems' in initInstanceLimit().
instancelimitedlock = Lock()

def initInstanceLimit(instancename, instancemax):
    """Initialize the instance-limited thread implementation to permit
    up to instancemax threads with the given instancename.

    Calling this again for an 'instancename' that is already registered
    changes nothing: the existing semaphore, and thus its limit, is kept.

    :param instancename: key under which the semaphore is registered.
    :param instancemax: initial value of the BoundedSemaphore.
    :raises ValueError: from BoundedSemaphore when 'instancemax' is negative.
    """
    instancelimitedlock.acquire()
    try:
        if instancename not in instancelimitedsems:
            instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
    finally:
        # Always give the lock back, even if creating the semaphore failed;
        # otherwise every later call would block forever.
        instancelimitedlock.release()
class InstanceLimitedThread(ExitNotifyThread):
    """An ExitNotifyThread that occupies one slot of a named semaphore.

    start() takes a slot from the semaphore registered under 'instancename'
    in 'instancelimitedsems' (see initInstanceLimit()) and run() gives it
    back once the thread's work is over.
    """

    def __init__(self, instancename, *args, **kwargs):
        """:param instancename: key of this thread's semaphore in
                             'instancelimitedsems'.

        All remaining arguments are passed on to the parent constructor.
        """
        self.instancename = instancename
        super(InstanceLimitedThread, self).__init__(*args, **kwargs)

    def start(self):
        """Take a slot (blocking until one is free), then start the thread.

        :raises KeyError: if no semaphore is registered for 'instancename'.
        """
        semaphore = instancelimitedsems[self.instancename]
        semaphore.acquire()
        try:
            ExitNotifyThread.start(self)
        except Exception:
            # The thread did not start, so run() will never release the
            # slot we just took; give it back here before propagating.
            semaphore.release()
            raise

    def run(self):
        """Run the thread and release its slot afterwards, whatever happens."""
        try:
            ExitNotifyThread.run(self)
        finally:
            # The table and the entry are tested before use so that nothing
            # is released if either one is empty/unset at this point.
            if instancelimitedsems and instancelimitedsems[self.instancename]:
                instancelimitedsems[self.instancename].release()