Remove weird SigListener class
The SigListener class was used to queue folders that we need to sync and to receive "resync" and "abort" signals. It was undocumented and weird and we had to pass "siglisteners" through the whole program. Simply do away with it, and make 2 functions in the Account() class: set_abort_event and get_abort_event which can be used to set and check for such signals. This way we do not need to pass siglisteners all over the place. Tested Blinkenlights and TTYUI uis to make sure that SIGUSR1 and SIGUSR2 actually still work. Document those signals in MANUAL.rst. They were completly undocumented. This simplifies the code and interdependencies by passing less stuff around. Removes an undocumented and weirdly named class. Signed-off-by: Sebastian Spaeth <Sebastian@SSpaeth.de> Signed-off-by: Nicolas Sebrecht <nicolas.s-dev@laposte.net>
This commit is contained in:

committed by
Nicolas Sebrecht

parent
ac27c93c83
commit
89619838b0
@@ -20,76 +20,11 @@ from offlineimap.repository import Repository
|
||||
from offlineimap.ui import getglobalui
|
||||
from offlineimap.threadutil import InstanceLimitedThread
|
||||
from subprocess import Popen, PIPE
|
||||
from threading import Lock
|
||||
from threading import Event
|
||||
import os
|
||||
from Queue import Queue
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
class SigListener(Queue):
|
||||
def __init__(self):
|
||||
self.folderlock = Lock()
|
||||
self.folders = None
|
||||
Queue.__init__(self, 20)
|
||||
def put_nowait(self, sig):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
if sig == 1:
|
||||
if self.folders is None or not self.autorefreshes:
|
||||
# folders haven't yet been added, or this account is once-only; drop signal
|
||||
return
|
||||
elif self.folders:
|
||||
for foldernr in range(len(self.folders)):
|
||||
# requeue folder
|
||||
self.folders[foldernr][1] = True
|
||||
self.quick = False
|
||||
return
|
||||
# else folders have already been cleared, put signal...
|
||||
finally:
|
||||
self.folderlock.release()
|
||||
Queue.put_nowait(self, sig)
|
||||
def addfolders(self, remotefolders, autorefreshes, quick):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
self.folders = []
|
||||
self.quick = quick
|
||||
self.autorefreshes = autorefreshes
|
||||
for folder in remotefolders:
|
||||
# new folders are queued
|
||||
self.folders.append([folder, True])
|
||||
finally:
|
||||
self.folderlock.release()
|
||||
def clearfolders(self):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
for folder, queued in self.folders:
|
||||
if queued:
|
||||
# some folders still in queue
|
||||
return False
|
||||
self.folders[:] = []
|
||||
return True
|
||||
finally:
|
||||
self.folderlock.release()
|
||||
def queuedfolders(self):
|
||||
self.folderlock.acquire()
|
||||
try:
|
||||
dirty = True
|
||||
while dirty:
|
||||
dirty = False
|
||||
for foldernr, (folder, queued) in enumerate(self.folders):
|
||||
if queued:
|
||||
# mark folder as no longer queued
|
||||
self.folders[foldernr][1] = False
|
||||
dirty = True
|
||||
quick = self.quick
|
||||
self.folderlock.release()
|
||||
yield (folder, quick)
|
||||
self.folderlock.acquire()
|
||||
except:
|
||||
self.folderlock.release()
|
||||
raise
|
||||
self.folderlock.release()
|
||||
|
||||
def getaccountlist(customconfig):
|
||||
return customconfig.getsectionlist('Account')
|
||||
|
||||
@@ -110,6 +45,8 @@ class Account(CustomConfig.ConfigHelperMixin):
|
||||
Most of the time you will actually want to use the derived
|
||||
:class:`accounts.SyncableAccount` which contains all functions used
|
||||
for syncing an account."""
|
||||
#signal gets set when we should stop looping
|
||||
abort_signal = Event()
|
||||
|
||||
def __init__(self, config, name):
|
||||
"""
|
||||
@@ -144,13 +81,49 @@ class Account(CustomConfig.ConfigHelperMixin):
|
||||
def getsection(self):
|
||||
return 'Account ' + self.getname()
|
||||
|
||||
def sleeper(self, siglistener):
|
||||
"""Sleep handler. Returns same value as UIBase.sleep:
|
||||
0 if timeout expired, 1 if there was a request to cancel the timer,
|
||||
and 2 if there is a request to abort the program.
|
||||
@classmethod
|
||||
def set_abort_event(cls, config, signum):
|
||||
"""Set skip sleep/abort event for all accounts
|
||||
|
||||
Also, returns 100 if configured to not sleep at all."""
|
||||
|
||||
If we want to skip a current (or the next) sleep, or if we want
|
||||
to abort an autorefresh loop, the main thread can use
|
||||
set_abort_event() to send the corresponding signal. Signum = 1
|
||||
implies that we want all accounts to abort or skip the current
|
||||
or next sleep phase. Signum = 2 will end the autorefresh loop,
|
||||
ie all accounts will return after they finished a sync.
|
||||
|
||||
This is a class method, it will send the signal to all accounts.
|
||||
"""
|
||||
if signum == 1:
|
||||
# resync signal, set config option for all accounts
|
||||
for acctsection in getaccountlist(config):
|
||||
config.set('Account ' + acctsection, "skipsleep", '1')
|
||||
elif signum == 2:
|
||||
# don't autorefresh anymore
|
||||
cls.abort_signal.set()
|
||||
|
||||
def get_abort_event(self):
|
||||
"""Checks if an abort signal had been sent
|
||||
|
||||
If the 'skipsleep' config option for this account had been set,
|
||||
with `set_abort_event(config, 1)` it will get cleared in this
|
||||
function. Ie, we will only skip one sleep and not all.
|
||||
|
||||
:returns: True, if the main thread had called
|
||||
:meth:`set_abort_event` earlier, otherwise 'False'.
|
||||
"""
|
||||
skipsleep = self.getconfboolean("skipsleep", 0)
|
||||
if skipsleep:
|
||||
self.config.set(self.getsection(), "skipsleep", '0')
|
||||
return skipsleep or Account.abort_signal.is_set()
|
||||
|
||||
def sleeper(self):
|
||||
"""Sleep if the account is set to autorefresh
|
||||
|
||||
:returns: 0:timeout expired, 1: canceled the timer,
|
||||
2:request to abort the program,
|
||||
100: if configured to not sleep at all.
|
||||
"""
|
||||
if not self.refreshperiod:
|
||||
return 100
|
||||
|
||||
@@ -165,22 +138,18 @@ class Account(CustomConfig.ConfigHelperMixin):
|
||||
item.startkeepalive()
|
||||
|
||||
refreshperiod = int(self.refreshperiod * 60)
|
||||
# try:
|
||||
# sleepresult = siglistener.get_nowait()
|
||||
# # retrieved signal before sleep started
|
||||
# if sleepresult == 1:
|
||||
# # catching signal 1 here means folders were cleared before signal was posted
|
||||
# pass
|
||||
# except Empty:
|
||||
# sleepresult = self.ui.sleep(refreshperiod, siglistener)
|
||||
sleepresult = self.ui.sleep(refreshperiod, siglistener)
|
||||
if sleepresult == 1:
|
||||
self.quicknum = 0
|
||||
sleepresult = self.ui.sleep(refreshperiod, self)
|
||||
|
||||
# Cancel keepalive
|
||||
for item in kaobjs:
|
||||
item.stopkeepalive()
|
||||
return sleepresult
|
||||
|
||||
if sleepresult:
|
||||
if Account.abort_signal.is_set():
|
||||
return 2
|
||||
self.quicknum = 0
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
class SyncableAccount(Account):
|
||||
@@ -190,14 +159,13 @@ class SyncableAccount(Account):
|
||||
functions :meth:`syncrunner`, :meth:`sync`, :meth:`syncfolders`,
|
||||
used for syncing."""
|
||||
|
||||
def syncrunner(self, siglistener):
|
||||
def syncrunner(self):
|
||||
self.ui.registerthread(self.name)
|
||||
self.ui.acct(self.name)
|
||||
accountmetadata = self.getaccountmeta()
|
||||
if not os.path.exists(accountmetadata):
|
||||
os.mkdir(accountmetadata, 0700)
|
||||
|
||||
# get all three repositories
|
||||
self.remoterepos = Repository(self, 'remote')
|
||||
self.localrepos = Repository(self, 'local')
|
||||
self.statusrepos = Repository(self, 'status')
|
||||
@@ -207,36 +175,40 @@ class SyncableAccount(Account):
|
||||
while looping:
|
||||
try:
|
||||
try:
|
||||
self.sync(siglistener)
|
||||
self.sync()
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
raise
|
||||
except OfflineImapError, e:
|
||||
self.ui.warn(e.reason)
|
||||
#stop looping and bubble up Exception if needed
|
||||
# Stop looping and bubble up Exception if needed.
|
||||
if e.severity >= OfflineImapError.ERROR.REPO:
|
||||
if looping:
|
||||
if looping:
|
||||
looping -= 1
|
||||
if e.severity >= OfflineImapError.ERROR.CRITICAL:
|
||||
raise
|
||||
except:
|
||||
self.ui.warn("Error occured attempting to sync "\
|
||||
"account '%s':\n"% (self, traceback.format_exc()))
|
||||
self.ui.warn("Error occured attempting to sync account "
|
||||
"'%s':\n" % (self, traceback.format_exc()))
|
||||
else:
|
||||
# after success sync, reset the looping counter to 3
|
||||
if self.refreshperiod:
|
||||
looping = 3
|
||||
finally:
|
||||
if self.sleeper(siglistener) >= 2:
|
||||
if looping and self.sleeper() >= 2:
|
||||
looping = 0
|
||||
self.ui.acctdone(self.name)
|
||||
|
||||
|
||||
def getaccountmeta(self):
|
||||
return os.path.join(self.metadatadir, 'Account-' + self.name)
|
||||
|
||||
def sync(self, siglistener):
|
||||
# We don't need an account lock because syncitall() goes through
|
||||
# each account once, then waits for all to finish.
|
||||
def sync(self):
|
||||
"""Synchronize the account once, then return
|
||||
|
||||
Assumes that `self.remoterepos`, `self.localrepos`, and
|
||||
`self.statusrepos` has already been populated, so it should only
|
||||
be called from the :meth:`syncrunner` function.
|
||||
"""
|
||||
folderthreads = []
|
||||
|
||||
hook = self.getconf('presynchook', '')
|
||||
self.callhook(hook)
|
||||
@@ -263,23 +235,20 @@ class SyncableAccount(Account):
|
||||
self.ui.syncfolders(remoterepos, localrepos)
|
||||
remoterepos.syncfoldersto(localrepos, [statusrepos])
|
||||
|
||||
siglistener.addfolders(remoterepos.getfolders(), bool(self.refreshperiod), quick)
|
||||
|
||||
while True:
|
||||
folderthreads = []
|
||||
for remotefolder, quick in siglistener.queuedfolders():
|
||||
thread = InstanceLimitedThread(\
|
||||
instancename = 'FOLDER_' + self.remoterepos.getname(),
|
||||
target = syncfolder,
|
||||
name = "Folder sync [%s]" % self,
|
||||
args = (self.name, remoterepos, remotefolder, localrepos,
|
||||
statusrepos, quick))
|
||||
thread.setDaemon(1)
|
||||
thread.start()
|
||||
folderthreads.append(thread)
|
||||
threadutil.threadsreset(folderthreads)
|
||||
if siglistener.clearfolders():
|
||||
break
|
||||
# iterate through all folders on the remote repo and sync
|
||||
for remotefolder in remoterepos.getfolders():
|
||||
thread = InstanceLimitedThread(\
|
||||
instancename = 'FOLDER_' + self.remoterepos.getname(),
|
||||
target = syncfolder,
|
||||
name = "Folder sync [%s]" % self,
|
||||
args = (self.name, remoterepos, remotefolder, localrepos,
|
||||
statusrepos, quick))
|
||||
thread.setDaemon(1)
|
||||
thread.start()
|
||||
folderthreads.append(thread)
|
||||
# wait for all threads to finish
|
||||
for thr in folderthreads:
|
||||
thr.join()
|
||||
mbnames.write()
|
||||
localrepos.forgetfolders()
|
||||
remoterepos.forgetfolders()
|
||||
|
Reference in New Issue
Block a user