2003-04-18 03:18:34 +01:00
|
|
|
# Copyright (C) 2002, 2003 John Goerzen
|
2002-07-04 01:02:10 +01:00
|
|
|
# Thread support module
|
|
|
|
# <jgoerzen@complete.org>
|
|
|
|
#
|
|
|
|
# This program is free software; you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
2003-04-16 20:23:45 +01:00
|
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
|
|
# (at your option) any later version.
|
2002-07-04 01:02:10 +01:00
|
|
|
#
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with this program; if not, write to the Free Software
|
2006-08-12 05:15:55 +01:00
|
|
|
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
2002-07-04 01:02:10 +01:00
|
|
|
|
2002-07-04 02:35:05 +01:00
|
|
|
from threading import *
|
2002-07-05 03:34:39 +01:00
|
|
|
from StringIO import StringIO
|
2008-08-05 00:05:29 -05:00
|
|
|
from Queue import Queue, Empty
|
|
|
|
import sys, traceback, thread, time
|
2002-10-07 22:17:13 +01:00
|
|
|
from offlineimap.ui import UIBase # for getglobalui()
|
2002-07-23 02:48:15 +01:00
|
|
|
|
|
|
|
profiledir = None
|
|
|
|
|
|
|
|
def setprofiledir(newdir):
|
|
|
|
global profiledir
|
|
|
|
profiledir = newdir
|
2002-07-05 03:34:39 +01:00
|
|
|
|
|
|
|
######################################################################
|
|
|
|
# General utilities
|
|
|
|
######################################################################
|
2002-07-04 01:02:10 +01:00
|
|
|
|
2002-07-04 02:35:05 +01:00
|
|
|
def semaphorereset(semaphore, originalstate):
|
2002-07-04 01:02:10 +01:00
|
|
|
"""Wait until the semaphore gets back to its original state -- all acquired
|
|
|
|
resources released."""
|
|
|
|
for i in range(originalstate):
|
|
|
|
semaphore.acquire()
|
|
|
|
# Now release these.
|
|
|
|
for i in range(originalstate):
|
|
|
|
semaphore.release()
|
|
|
|
|
2002-07-04 02:35:05 +01:00
|
|
|
def semaphorewait(semaphore):
|
|
|
|
semaphore.acquire()
|
|
|
|
semaphore.release()
|
|
|
|
|
|
|
|
def threadsreset(threadlist):
|
2002-07-11 07:53:09 +01:00
|
|
|
for thr in threadlist:
|
|
|
|
thr.join()
|
2002-07-04 04:59:19 +01:00
|
|
|
|
2003-01-04 05:57:46 +01:00
|
|
|
class threadlist:
|
|
|
|
def __init__(self):
|
|
|
|
self.lock = Lock()
|
|
|
|
self.list = []
|
|
|
|
|
|
|
|
def add(self, thread):
|
|
|
|
self.lock.acquire()
|
|
|
|
try:
|
|
|
|
self.list.append(thread)
|
|
|
|
finally:
|
|
|
|
self.lock.release()
|
|
|
|
|
|
|
|
def remove(self, thread):
|
|
|
|
self.lock.acquire()
|
|
|
|
try:
|
|
|
|
self.list.remove(thread)
|
|
|
|
finally:
|
|
|
|
self.lock.release()
|
|
|
|
|
|
|
|
def pop(self):
|
|
|
|
self.lock.acquire()
|
|
|
|
try:
|
|
|
|
if not len(self.list):
|
|
|
|
return None
|
|
|
|
return self.list.pop()
|
|
|
|
finally:
|
|
|
|
self.lock.release()
|
|
|
|
|
|
|
|
def reset(self):
|
|
|
|
while 1:
|
|
|
|
thread = self.pop()
|
|
|
|
if not thread:
|
|
|
|
return
|
|
|
|
thread.join()
|
|
|
|
|
|
|
|
|
2002-07-05 03:34:39 +01:00
|
|
|
######################################################################
|
|
|
|
# Exit-notify threads
|
|
|
|
######################################################################
|
|
|
|
|
2008-08-05 00:05:29 -05:00
|
|
|
exitthreads = Queue(100)
|
2002-07-05 03:34:39 +01:00
|
|
|
inited = 0
|
|
|
|
|
|
|
|
def initexitnotify():
|
|
|
|
"""Initialize the exit notify system. This MUST be called from the
|
|
|
|
SAME THREAD that will call monitorloop BEFORE it calls monitorloop.
|
|
|
|
This SHOULD be called before the main thread starts any other
|
|
|
|
ExitNotifyThreads, or else it may miss the ability to catch the exit
|
|
|
|
status from them!"""
|
2002-07-11 10:42:27 +01:00
|
|
|
pass
|
2002-07-05 03:34:39 +01:00
|
|
|
|
|
|
|
def exitnotifymonitorloop(callback):
|
|
|
|
"""Enter an infinite "monitoring" loop. The argument, callback,
|
|
|
|
defines the function to call when an ExitNotifyThread has terminated.
|
|
|
|
That function is called with a single argument -- the ExitNotifyThread
|
|
|
|
that has terminated. The monitor will not continue to monitor for
|
|
|
|
other threads until the function returns, so if it intends to perform
|
|
|
|
long calculations, it should start a new thread itself -- but NOT
|
|
|
|
an ExitNotifyThread, or else an infinite loop may result. Furthermore,
|
|
|
|
the monitor will hold the lock all the while the other thread is waiting.
|
|
|
|
"""
|
2008-08-02 15:10:11 -05:00
|
|
|
global exitthreads
|
2002-07-05 03:34:39 +01:00
|
|
|
while 1: # Loop forever.
|
2008-08-05 00:05:29 -05:00
|
|
|
try:
|
|
|
|
thrd = exitthreads.get(False)
|
|
|
|
callback(thrd)
|
|
|
|
except Empty:
|
|
|
|
time.sleep(1)
|
2002-07-05 03:34:39 +01:00
|
|
|
|
2002-10-07 22:17:13 +01:00
|
|
|
def threadexited(thread):
|
|
|
|
"""Called when a thread exits."""
|
|
|
|
ui = UIBase.getglobalui()
|
|
|
|
if thread.getExitCause() == 'EXCEPTION':
|
|
|
|
if isinstance(thread.getExitException(), SystemExit):
|
|
|
|
# Bring a SystemExit into the main thread.
|
|
|
|
# Do not send it back to UI layer right now.
|
|
|
|
# Maybe later send it to ui.terminate?
|
|
|
|
raise SystemExit
|
|
|
|
ui.threadException(thread) # Expected to terminate
|
|
|
|
sys.exit(100) # Just in case...
|
|
|
|
os._exit(100)
|
2003-04-29 01:04:22 +01:00
|
|
|
elif thread.getExitMessage() == 'SYNC_WITH_TIMER_TERMINATE':
|
|
|
|
ui.terminate()
|
|
|
|
# Just in case...
|
|
|
|
sys.exit(100)
|
|
|
|
os._exit(100)
|
2002-10-07 22:17:13 +01:00
|
|
|
else:
|
|
|
|
ui.threadExited(thread)
|
|
|
|
|
2002-07-05 03:34:39 +01:00
|
|
|
class ExitNotifyThread(Thread):
|
|
|
|
"""This class is designed to alert a "monitor" to the fact that a thread has
|
|
|
|
exited and to provide for the ability for it to find out why."""
|
|
|
|
def run(self):
|
2008-08-02 15:10:11 -05:00
|
|
|
global exitthreads, profiledir
|
2002-07-11 07:53:09 +01:00
|
|
|
self.threadid = thread.get_ident()
|
2002-07-05 03:34:39 +01:00
|
|
|
try:
|
2002-07-23 02:48:15 +01:00
|
|
|
if not profiledir: # normal case
|
|
|
|
Thread.run(self)
|
|
|
|
else:
|
2005-02-10 05:37:33 +01:00
|
|
|
import profile
|
2002-07-23 02:48:15 +01:00
|
|
|
prof = profile.Profile()
|
|
|
|
try:
|
|
|
|
prof = prof.runctx("Thread.run(self)", globals(), locals())
|
|
|
|
except SystemExit:
|
|
|
|
pass
|
|
|
|
prof.dump_stats( \
|
|
|
|
profiledir + "/" + str(self.threadid) + "_" + \
|
|
|
|
self.getName() + ".prof")
|
2002-07-05 03:34:39 +01:00
|
|
|
except:
|
|
|
|
self.setExitCause('EXCEPTION')
|
|
|
|
self.setExitException(sys.exc_info()[1])
|
|
|
|
sbuf = StringIO()
|
|
|
|
traceback.print_exc(file = sbuf)
|
|
|
|
self.setExitStackTrace(sbuf.getvalue())
|
|
|
|
else:
|
|
|
|
self.setExitCause('NORMAL')
|
|
|
|
if not hasattr(self, 'exitmessage'):
|
|
|
|
self.setExitMessage(None)
|
2008-08-02 15:10:11 -05:00
|
|
|
|
|
|
|
exitthreads.put(self, True)
|
2002-07-05 03:34:39 +01:00
|
|
|
|
|
|
|
def setExitCause(self, cause):
|
|
|
|
self.exitcause = cause
|
|
|
|
def getExitCause(self):
|
|
|
|
"""Returns the cause of the exit, one of:
|
|
|
|
'EXCEPTION' -- the thread aborted because of an exception
|
|
|
|
'NORMAL' -- normal termination."""
|
|
|
|
return self.exitcause
|
|
|
|
def setExitException(self, exc):
|
|
|
|
self.exitexception = exc
|
|
|
|
def getExitException(self):
|
|
|
|
"""If getExitCause() is 'EXCEPTION', holds the value from
|
|
|
|
sys.exc_info()[1] for this exception."""
|
|
|
|
return self.exitexception
|
|
|
|
def setExitStackTrace(self, st):
|
|
|
|
self.exitstacktrace = st
|
|
|
|
def getExitStackTrace(self):
|
|
|
|
"""If getExitCause() is 'EXCEPTION', returns a string representing
|
|
|
|
the stack trace for this exception."""
|
|
|
|
return self.exitstacktrace
|
|
|
|
def setExitMessage(self, msg):
|
|
|
|
"""Sets the exit message to be fetched by a subsequent call to
|
|
|
|
getExitMessage. This message may be any object or type except
|
|
|
|
None."""
|
|
|
|
self.exitmessage = msg
|
|
|
|
def getExitMessage(self):
|
|
|
|
"""For any exit cause, returns the message previously set by
|
|
|
|
a call to setExitMessage(), or None if there was no such message
|
|
|
|
set."""
|
|
|
|
return self.exitmessage
|
|
|
|
|
|
|
|
|
|
|
|
######################################################################
|
|
|
|
# Instance-limited threads
|
|
|
|
######################################################################
|
|
|
|
|
2002-07-04 04:59:19 +01:00
|
|
|
instancelimitedsems = {}
|
|
|
|
instancelimitedlock = Lock()
|
|
|
|
|
|
|
|
def initInstanceLimit(instancename, instancemax):
|
2002-07-05 03:34:39 +01:00
|
|
|
"""Initialize the instance-limited thread implementation to permit
|
|
|
|
up to intancemax threads with the given instancename."""
|
2002-07-04 04:59:19 +01:00
|
|
|
instancelimitedlock.acquire()
|
|
|
|
if not instancelimitedsems.has_key(instancename):
|
|
|
|
instancelimitedsems[instancename] = BoundedSemaphore(instancemax)
|
|
|
|
instancelimitedlock.release()
|
|
|
|
|
2002-07-05 03:34:39 +01:00
|
|
|
class InstanceLimitedThread(ExitNotifyThread):
|
2002-07-04 04:59:19 +01:00
|
|
|
def __init__(self, instancename, *args, **kwargs):
|
|
|
|
self.instancename = instancename
|
|
|
|
|
2002-07-05 03:34:39 +01:00
|
|
|
apply(ExitNotifyThread.__init__, (self,) + args, kwargs)
|
2002-07-04 04:59:19 +01:00
|
|
|
|
|
|
|
def start(self):
|
|
|
|
instancelimitedsems[self.instancename].acquire()
|
2002-07-05 03:34:39 +01:00
|
|
|
ExitNotifyThread.start(self)
|
2002-07-04 04:59:19 +01:00
|
|
|
|
|
|
|
def run(self):
|
|
|
|
try:
|
2002-07-05 03:34:39 +01:00
|
|
|
ExitNotifyThread.run(self)
|
2002-07-04 04:59:19 +01:00
|
|
|
finally:
|
|
|
|
instancelimitedsems[self.instancename].release()
|
|
|
|
|
|
|
|
|
2003-01-06 22:58:29 +01:00
|
|
|
######################################################################
|
|
|
|
# Multi-lock -- capable of handling a single thread requesting a lock
|
|
|
|
# multiple times
|
|
|
|
######################################################################
|
|
|
|
|
|
|
|
class MultiLock:
|
|
|
|
def __init__(self):
|
|
|
|
self.lock = Lock()
|
|
|
|
self.statuslock = Lock()
|
|
|
|
self.locksheld = {}
|
|
|
|
|
|
|
|
def acquire(self):
|
|
|
|
"""Obtain a lock. Provides nice support for a single
|
|
|
|
thread trying to lock it several times -- as may be the case
|
|
|
|
if one I/O-using object calls others, while wanting to make it all
|
|
|
|
an atomic operation. Keeps a "lock request count" for the current
|
|
|
|
thread, and acquires the lock when it goes above zero, releases when
|
|
|
|
it goes below one.
|
|
|
|
|
|
|
|
This call is always blocking."""
|
|
|
|
|
|
|
|
# First, check to see if this thread already has a lock.
|
|
|
|
# If so, increment the lock count and just return.
|
|
|
|
self.statuslock.acquire()
|
|
|
|
try:
|
|
|
|
threadid = thread.get_ident()
|
|
|
|
|
|
|
|
if threadid in self.locksheld:
|
|
|
|
self.locksheld[threadid] += 1
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
# This is safe because it is a per-thread structure
|
|
|
|
self.locksheld[threadid] = 1
|
|
|
|
finally:
|
|
|
|
self.statuslock.release()
|
|
|
|
self.lock.acquire()
|
|
|
|
|
|
|
|
def release(self):
|
|
|
|
self.statuslock.acquire()
|
|
|
|
try:
|
|
|
|
threadid = thread.get_ident()
|
|
|
|
if self.locksheld[threadid] > 1:
|
|
|
|
self.locksheld[threadid] -= 1
|
|
|
|
return
|
|
|
|
else:
|
|
|
|
del self.locksheld[threadid]
|
|
|
|
self.lock.release()
|
|
|
|
finally:
|
|
|
|
self.statuslock.release()
|
|
|
|
|
|
|
|
|