Merge branch 'ss/remove-siglistener-class' into next
Conflicts: offlineimap/accounts.py
This commit is contained in:
commit
40900dcfb6
@ -262,6 +262,21 @@ MachineUI generates output in a machine-parsable format. It is designed
|
||||
for other programs that will interface to OfflineIMAP.
|
||||
|
||||
|
||||
Signals
|
||||
=======
|
||||
|
||||
OfflineImap listens to the unix signals SIGUSR1 and SIGUSR2.
|
||||
|
||||
If sent a SIGUSR1 it will abort any current (or next future) sleep of all
|
||||
accounts that are configured to "autorefresh". In effect, this will trigger a
|
||||
full sync of all accounts to be performed as soon as possible.
|
||||
|
||||
If sent a SIGUSR2, it will stop "autorefresh mode" for all accounts. That is,
|
||||
accounts will abort any current sleep and will exit after a currently running
|
||||
synchronization has finished. This signal can be used to gracefully exit out of
|
||||
a running offlineimap "daemon".
|
||||
|
||||
|
||||
KNOWN BUGS
|
||||
==========
|
||||
|
||||
|
@ -20,76 +20,11 @@ from offlineimap.repository import Repository
|
||||
from offlineimap.ui import getglobalui
|
||||
from offlineimap.threadutil import InstanceLimitedThread
|
||||
from subprocess import Popen, PIPE
|
||||
from threading import Lock
|
||||
from threading import Event
|
||||
import os
|
||||
from Queue import Queue
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
class SigListener(Queue):
|
||||
def __init__(self):
|
||||
self.folderlock = Lock()
|
||||
self.folders = None
|
||||
Queue.__init__(self, 20)
|
||||
def put_nowait(self, sig):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
if sig == 1:
|
||||
if self.folders is None or not self.autorefreshes:
|
||||
# folders haven't yet been added, or this account is once-only; drop signal
|
||||
return
|
||||
elif self.folders:
|
||||
for foldernr in range(len(self.folders)):
|
||||
# requeue folder
|
||||
self.folders[foldernr][1] = True
|
||||
self.quick = False
|
||||
return
|
||||
# else folders have already been cleared, put signal...
|
||||
finally:
|
||||
self.folderlock.release()
|
||||
Queue.put_nowait(self, sig)
|
||||
def addfolders(self, remotefolders, autorefreshes, quick):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
self.folders = []
|
||||
self.quick = quick
|
||||
self.autorefreshes = autorefreshes
|
||||
for folder in remotefolders:
|
||||
# new folders are queued
|
||||
self.folders.append([folder, True])
|
||||
finally:
|
||||
self.folderlock.release()
|
||||
def clearfolders(self):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
for folder, queued in self.folders:
|
||||
if queued:
|
||||
# some folders still in queue
|
||||
return False
|
||||
self.folders[:] = []
|
||||
return True
|
||||
finally:
|
||||
self.folderlock.release()
|
||||
def queuedfolders(self):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
dirty = True
|
||||
while dirty:
|
||||
dirty = False
|
||||
for foldernr, (folder, queued) in enumerate(self.folders):
|
||||
if queued:
|
||||
# mark folder as no longer queued
|
||||
self.folders[foldernr][1] = False
|
||||
dirty = True
|
||||
quick = self.quick
|
||||
self.folderlock.release()
|
||||
yield (folder, quick)
|
||||
self.folderlock.acquire()
|
||||
except:
|
||||
self.folderlock.release()
|
||||
raise
|
||||
self.folderlock.release()
|
||||
|
||||
def getaccountlist(customconfig):
|
||||
return customconfig.getsectionlist('Account')
|
||||
|
||||
@ -110,6 +45,8 @@ class Account(CustomConfig.ConfigHelperMixin):
|
||||
Most of the time you will actually want to use the derived
|
||||
:class:`accounts.SyncableAccount` which contains all functions used
|
||||
for syncing an account."""
|
||||
#signal gets set when we should stop looping
|
||||
abort_signal = Event()
|
||||
|
||||
def __init__(self, config, name):
|
||||
"""
|
||||
@ -144,13 +81,49 @@ class Account(CustomConfig.ConfigHelperMixin):
|
||||
def getsection(self):
|
||||
return 'Account ' + self.getname()
|
||||
|
||||
def sleeper(self, siglistener):
|
||||
"""Sleep handler. Returns same value as UIBase.sleep:
|
||||
0 if timeout expired, 1 if there was a request to cancel the timer,
|
||||
and 2 if there is a request to abort the program.
|
||||
@classmethod
|
||||
def set_abort_event(cls, config, signum):
|
||||
"""Set skip sleep/abort event for all accounts
|
||||
|
||||
Also, returns 100 if configured to not sleep at all."""
|
||||
|
||||
If we want to skip a current (or the next) sleep, or if we want
|
||||
to abort an autorefresh loop, the main thread can use
|
||||
set_abort_event() to send the corresponding signal. Signum = 1
|
||||
implies that we want all accounts to abort or skip the current
|
||||
or next sleep phase. Signum = 2 will end the autorefresh loop,
|
||||
ie all accounts will return after they finished a sync.
|
||||
|
||||
This is a class method, it will send the signal to all accounts.
|
||||
"""
|
||||
if signum == 1:
|
||||
# resync signal, set config option for all accounts
|
||||
for acctsection in getaccountlist(config):
|
||||
config.set('Account ' + acctsection, "skipsleep", '1')
|
||||
elif signum == 2:
|
||||
# don't autorefresh anymore
|
||||
cls.abort_signal.set()
|
||||
|
||||
def get_abort_event(self):
|
||||
"""Checks if an abort signal had been sent
|
||||
|
||||
If the 'skipsleep' config option for this account had been set,
|
||||
with `set_abort_event(config, 1)` it will get cleared in this
|
||||
function. Ie, we will only skip one sleep and not all.
|
||||
|
||||
:returns: True, if the main thread had called
|
||||
:meth:`set_abort_event` earlier, otherwise 'False'.
|
||||
"""
|
||||
skipsleep = self.getconfboolean("skipsleep", 0)
|
||||
if skipsleep:
|
||||
self.config.set(self.getsection(), "skipsleep", '0')
|
||||
return skipsleep or Account.abort_signal.is_set()
|
||||
|
||||
def sleeper(self):
|
||||
"""Sleep if the account is set to autorefresh
|
||||
|
||||
:returns: 0:timeout expired, 1: canceled the timer,
|
||||
2:request to abort the program,
|
||||
100: if configured to not sleep at all.
|
||||
"""
|
||||
if not self.refreshperiod:
|
||||
return 100
|
||||
|
||||
@ -165,22 +138,18 @@ class Account(CustomConfig.ConfigHelperMixin):
|
||||
item.startkeepalive()
|
||||
|
||||
refreshperiod = int(self.refreshperiod * 60)
|
||||
# try:
|
||||
# sleepresult = siglistener.get_nowait()
|
||||
# # retrieved signal before sleep started
|
||||
# if sleepresult == 1:
|
||||
# # catching signal 1 here means folders were cleared before signal was posted
|
||||
# pass
|
||||
# except Empty:
|
||||
# sleepresult = self.ui.sleep(refreshperiod, siglistener)
|
||||
sleepresult = self.ui.sleep(refreshperiod, siglistener)
|
||||
if sleepresult == 1:
|
||||
self.quicknum = 0
|
||||
sleepresult = self.ui.sleep(refreshperiod, self)
|
||||
|
||||
# Cancel keepalive
|
||||
for item in kaobjs:
|
||||
item.stopkeepalive()
|
||||
return sleepresult
|
||||
|
||||
if sleepresult:
|
||||
if Account.abort_signal.is_set():
|
||||
return 2
|
||||
self.quicknum = 0
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
class SyncableAccount(Account):
|
||||
@ -190,14 +159,13 @@ class SyncableAccount(Account):
|
||||
functions :meth:`syncrunner`, :meth:`sync`, :meth:`syncfolders`,
|
||||
used for syncing."""
|
||||
|
||||
def syncrunner(self, siglistener):
|
||||
def syncrunner(self):
|
||||
self.ui.registerthread(self.name)
|
||||
self.ui.acct(self.name)
|
||||
accountmetadata = self.getaccountmeta()
|
||||
if not os.path.exists(accountmetadata):
|
||||
os.mkdir(accountmetadata, 0700)
|
||||
|
||||
# get all three repositories
|
||||
self.remoterepos = Repository(self, 'remote')
|
||||
self.localrepos = Repository(self, 'local')
|
||||
self.statusrepos = Repository(self, 'status')
|
||||
@ -207,14 +175,14 @@ class SyncableAccount(Account):
|
||||
while looping:
|
||||
try:
|
||||
try:
|
||||
self.sync(siglistener)
|
||||
self.sync()
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
except OfflineImapError, e:
|
||||
self.ui.warn(e.reason)
|
||||
#stop looping and bubble up Exception if needed
|
||||
# Stop looping and bubble up Exception if needed.
|
||||
if e.severity >= OfflineImapError.ERROR.REPO:
|
||||
if looping:
|
||||
if looping:
|
||||
looping -= 1
|
||||
if e.severity >= OfflineImapError.ERROR.CRITICAL:
|
||||
raise
|
||||
@ -226,17 +194,21 @@ class SyncableAccount(Account):
|
||||
if self.refreshperiod:
|
||||
looping = 3
|
||||
finally:
|
||||
if self.sleeper(siglistener) >= 2:
|
||||
if looping and self.sleeper() >= 2:
|
||||
looping = 0
|
||||
self.ui.acctdone(self.name)
|
||||
|
||||
|
||||
def getaccountmeta(self):
|
||||
return os.path.join(self.metadatadir, 'Account-' + self.name)
|
||||
|
||||
def sync(self, siglistener):
|
||||
# We don't need an account lock because syncitall() goes through
|
||||
# each account once, then waits for all to finish.
|
||||
def sync(self):
|
||||
"""Synchronize the account once, then return
|
||||
|
||||
Assumes that `self.remoterepos`, `self.localrepos`, and
|
||||
`self.statusrepos` has already been populated, so it should only
|
||||
be called from the :meth:`syncrunner` function.
|
||||
"""
|
||||
folderthreads = []
|
||||
|
||||
hook = self.getconf('presynchook', '')
|
||||
self.callhook(hook)
|
||||
@ -263,23 +235,20 @@ class SyncableAccount(Account):
|
||||
self.ui.syncfolders(remoterepos, localrepos)
|
||||
remoterepos.syncfoldersto(localrepos, [statusrepos])
|
||||
|
||||
siglistener.addfolders(remoterepos.getfolders(), bool(self.refreshperiod), quick)
|
||||
|
||||
while True:
|
||||
folderthreads = []
|
||||
for remotefolder, quick in siglistener.queuedfolders():
|
||||
thread = InstanceLimitedThread(\
|
||||
instancename = 'FOLDER_' + self.remoterepos.getname(),
|
||||
target = syncfolder,
|
||||
name = "Folder sync [%s]" % self,
|
||||
args = (self.name, remoterepos, remotefolder, localrepos,
|
||||
statusrepos, quick))
|
||||
thread.setDaemon(1)
|
||||
thread.start()
|
||||
folderthreads.append(thread)
|
||||
threadutil.threadsreset(folderthreads)
|
||||
if siglistener.clearfolders():
|
||||
break
|
||||
# iterate through all folders on the remote repo and sync
|
||||
for remotefolder in remoterepos.getfolders():
|
||||
thread = InstanceLimitedThread(\
|
||||
instancename = 'FOLDER_' + self.remoterepos.getname(),
|
||||
target = syncfolder,
|
||||
name = "Folder sync [%s]" % self,
|
||||
args = (self.name, remoterepos, remotefolder, localrepos,
|
||||
statusrepos, quick))
|
||||
thread.setDaemon(1)
|
||||
thread.start()
|
||||
folderthreads.append(thread)
|
||||
# wait for all threads to finish
|
||||
for thr in folderthreads:
|
||||
thr.join()
|
||||
mbnames.write()
|
||||
localrepos.forgetfolders()
|
||||
remoterepos.forgetfolders()
|
||||
|
@ -254,7 +254,7 @@ class OfflineImap:
|
||||
config.set(section, "folderincludes", folderincludes)
|
||||
|
||||
self.lock(config, ui)
|
||||
|
||||
self.config = config
|
||||
|
||||
def sigterm_handler(signum, frame):
|
||||
# die immediately
|
||||
@ -315,21 +315,14 @@ class OfflineImap:
|
||||
threadutil.initInstanceLimit(instancename,
|
||||
config.getdefaultint('Repository ' + reposname,
|
||||
'maxconnections', 2))
|
||||
siglisteners = []
|
||||
def sig_handler(signum, frame):
|
||||
if signum == signal.SIGUSR1:
|
||||
# tell each account to do a full sync asap
|
||||
signum = (1,)
|
||||
elif signum == signal.SIGHUP:
|
||||
# tell each account to die asap
|
||||
signum = (2,)
|
||||
elif signum == signal.SIGUSR2:
|
||||
# tell each account to do a full sync asap, then die
|
||||
signum = (1, 2)
|
||||
# one listener per account thread (up to maxsyncaccounts)
|
||||
for listener in siglisteners:
|
||||
for sig in signum:
|
||||
listener.put_nowait(sig)
|
||||
def sig_handler(sig, frame):
|
||||
if sig == signal.SIGUSR1 or sig == signal.SIGHUP:
|
||||
# tell each account to stop sleeping
|
||||
accounts.Account.set_abort_event(self.config, 1)
|
||||
elif sig == signal.SIGUSR2:
|
||||
# tell each account to stop looping
|
||||
accounts.Account.set_abort_event(self.config, 2)
|
||||
|
||||
signal.signal(signal.SIGHUP,sig_handler)
|
||||
signal.signal(signal.SIGUSR1,sig_handler)
|
||||
signal.signal(signal.SIGUSR2,sig_handler)
|
||||
@ -340,14 +333,13 @@ class OfflineImap:
|
||||
|
||||
if options.singlethreading:
|
||||
#singlethreaded
|
||||
self.sync_singlethreaded(syncaccounts, config, siglisteners)
|
||||
self.sync_singlethreaded(syncaccounts, config)
|
||||
else:
|
||||
# multithreaded
|
||||
t = threadutil.ExitNotifyThread(target=syncmaster.syncitall,
|
||||
name='Sync Runner',
|
||||
kwargs = {'accounts': syncaccounts,
|
||||
'config': config,
|
||||
'siglisteners': siglisteners})
|
||||
'config': config})
|
||||
t.setDaemon(1)
|
||||
t.start()
|
||||
threadutil.exitnotifymonitorloop(threadutil.threadexited)
|
||||
@ -360,16 +352,13 @@ class OfflineImap:
|
||||
except:
|
||||
ui.mainException()
|
||||
|
||||
def sync_singlethreaded(self, accs, config, siglisteners):
|
||||
def sync_singlethreaded(self, accs, config):
|
||||
"""Executed if we do not want a separate syncmaster thread
|
||||
|
||||
:param accs: A list of accounts that should be synced
|
||||
:param config: The CustomConfig object
|
||||
:param siglisteners: The signal listeners list, defined in run()
|
||||
"""
|
||||
for accountname in accs:
|
||||
account = offlineimap.accounts.SyncableAccount(config, accountname)
|
||||
siglistener = offlineimap.accounts.SigListener()
|
||||
siglisteners.append(siglistener)
|
||||
threading.currentThread().name = "Account sync %s" % accountname
|
||||
account.syncrunner(siglistener=siglistener)
|
||||
account.syncrunner()
|
||||
|
@ -17,26 +17,22 @@
|
||||
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
|
||||
from offlineimap.threadutil import threadlist, InstanceLimitedThread
|
||||
from offlineimap.accounts import SyncableAccount, SigListener
|
||||
from offlineimap.accounts import SyncableAccount
|
||||
from threading import currentThread
|
||||
|
||||
def syncaccount(threads, config, accountname, siglisteners):
|
||||
def syncaccount(threads, config, accountname):
|
||||
account = SyncableAccount(config, accountname)
|
||||
siglistener = SigListener()
|
||||
thread = InstanceLimitedThread(instancename = 'ACCOUNTLIMIT',
|
||||
target = account.syncrunner,
|
||||
name = "Account sync %s" % accountname,
|
||||
kwargs = {'siglistener': siglistener} )
|
||||
# the Sync Runner thread is the only one that will mutate siglisteners
|
||||
siglisteners.append(siglistener)
|
||||
name = "Account sync %s" % accountname)
|
||||
thread.setDaemon(1)
|
||||
thread.start()
|
||||
threads.add(thread)
|
||||
|
||||
def syncitall(accounts, config, siglisteners):
|
||||
def syncitall(accounts, config):
|
||||
currentThread().setExitMessage('SYNC_WITH_TIMER_TERMINATE')
|
||||
threads = threadlist()
|
||||
for accountname in accounts:
|
||||
syncaccount(threads, config, accountname, siglisteners)
|
||||
syncaccount(threads, config, accountname)
|
||||
# Wait for the threads to finish.
|
||||
threads.reset()
|
||||
|
@ -45,10 +45,6 @@ def semaphorereset(semaphore, originalstate):
|
||||
def semaphorewait(semaphore):
|
||||
semaphore.acquire()
|
||||
semaphore.release()
|
||||
|
||||
def threadsreset(threadlist):
|
||||
for thr in threadlist:
|
||||
thr.join()
|
||||
|
||||
class threadlist:
|
||||
def __init__(self):
|
||||
|
@ -132,10 +132,10 @@ class BlinkenBase:
|
||||
s.gettf().setcolor('white')
|
||||
s.__class__.__bases__[-1].callhook(s, msg)
|
||||
|
||||
def sleep(s, sleepsecs, siglistener):
|
||||
def sleep(s, sleepsecs, account):
|
||||
s.gettf().setcolor('red')
|
||||
s.getaccountframe().startsleep(sleepsecs)
|
||||
return UIBase.sleep(s, sleepsecs, siglistener)
|
||||
return UIBase.sleep(s, sleepsecs, account)
|
||||
|
||||
def sleeping(s, sleepsecs, remainingsecs):
|
||||
if remainingsecs and s.gettf().getcolor() == 'black':
|
||||
|
@ -557,10 +557,10 @@ class Blinkenlights(BlinkenBase, UIBase):
|
||||
s.c.stop()
|
||||
UIBase.mainException(s)
|
||||
|
||||
def sleep(s, sleepsecs, siglistener):
|
||||
def sleep(s, sleepsecs, account):
|
||||
s.gettf().setcolor('red')
|
||||
s._msg("Next sync in %d:%02d" % (sleepsecs / 60, sleepsecs % 60))
|
||||
return BlinkenBase.sleep(s, sleepsecs, siglistener)
|
||||
return BlinkenBase.sleep(s, sleepsecs, account)
|
||||
|
||||
if __name__ == '__main__':
|
||||
x = Blinkenlights(None)
|
||||
|
@ -342,24 +342,22 @@ class UIBase:
|
||||
|
||||
################################################## Other
|
||||
|
||||
def sleep(s, sleepsecs, siglistener):
|
||||
def sleep(s, sleepsecs, account):
|
||||
"""This function does not actually output anything, but handles
|
||||
the overall sleep, dealing with updates as necessary. It will,
|
||||
however, call sleeping() which DOES output something.
|
||||
|
||||
Returns 0 if timeout expired, 1 if there is a request to cancel
|
||||
the timer, and 2 if there is a request to abort the program."""
|
||||
|
||||
abortsleep = 0
|
||||
:returns: 0/False if timeout expired, 1/2/True if there is a
|
||||
request to cancel the timer.
|
||||
"""
|
||||
abortsleep = False
|
||||
while sleepsecs > 0 and not abortsleep:
|
||||
try:
|
||||
abortsleep = siglistener.get_nowait()
|
||||
# retrieved signal while sleeping: 1 means immediately resynch, 2 means immediately die
|
||||
except Empty:
|
||||
# no signal
|
||||
if account.get_abort_event():
|
||||
abortsleep = True
|
||||
else:
|
||||
abortsleep = s.sleeping(10, sleepsecs)
|
||||
sleepsecs -= 10
|
||||
s.sleeping(0, 0) # Done sleeping.
|
||||
sleepsecs -= 10
|
||||
s.sleeping(0, 0) # Done sleeping.
|
||||
return abortsleep
|
||||
|
||||
def sleeping(s, sleepsecs, remainingsecs):
|
||||
|
Loading…
x
Reference in New Issue
Block a user