docker-offlineimap/offlineimap/ui/Curses.py
Daniel Burrows 1844cefd17 Remove a redundant (and mostly harmless) output of the error string from the Curses UI.
It looks like I accidentally recorded the wrong version of Curses.py --
originally this code was there, but I moved it over to UIBase so it would
cover the TTY UI also.
2006-12-01 12:27:12 +01:00

590 lines
18 KiB
Python

# Curses-based interfaces
# Copyright (C) 2003 John Goerzen
# <jgoerzen@complete.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from Blinkenlights import BlinkenBase
from UIBase import UIBase
from threading import *
import thread, time, sys, os, signal, time
from offlineimap import version, threadutil
from offlineimap.threadutil import MultiLock
import curses, curses.panel, curses.textpad, curses.wrapper
acctkeys = '1234567890abcdefghijklmnoprstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-=;/.,'
class CursesUtil:
def __init__(self):
self.pairlock = Lock()
self.iolock = MultiLock()
self.start()
def initpairs(self):
self.pairlock.acquire()
try:
self.pairs = {self._getpairindex(curses.COLOR_WHITE,
curses.COLOR_BLACK): 0}
self.nextpair = 1
finally:
self.pairlock.release()
def lock(self):
self.iolock.acquire()
def unlock(self):
self.iolock.release()
def locked(self, target, *args, **kwargs):
"""Perform an operation with full locking."""
self.lock()
try:
apply(target, args, kwargs)
finally:
self.unlock()
def refresh(self):
def lockedstuff():
curses.panel.update_panels()
curses.doupdate()
self.locked(lockedstuff)
def isactive(self):
return hasattr(self, 'stdscr')
def _getpairindex(self, fg, bg):
return '%d/%d' % (fg,bg)
def getpair(self, fg, bg):
if not self.has_color:
return 0
pindex = self._getpairindex(fg, bg)
self.pairlock.acquire()
try:
if self.pairs.has_key(pindex):
return curses.color_pair(self.pairs[pindex])
else:
self.pairs[pindex] = self.nextpair
curses.init_pair(self.nextpair, fg, bg)
self.nextpair += 1
return curses.color_pair(self.nextpair - 1)
finally:
self.pairlock.release()
def start(self):
self.stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
self.stdscr.keypad(1)
try:
curses.start_color()
self.has_color = curses.has_colors()
except:
self.has_color = 0
self.oldcursor = None
try:
self.oldcursor = curses.curs_set(0)
except:
pass
self.stdscr.clear()
self.stdscr.refresh()
(self.height, self.width) = self.stdscr.getmaxyx()
self.initpairs()
def stop(self):
if not hasattr(self, 'stdscr'):
return
#self.stdscr.addstr(self.height - 1, 0, "\n",
# self.getpair(curses.COLOR_WHITE,
# curses.COLOR_BLACK))
if self.oldcursor != None:
curses.curs_set(self.oldcursor)
self.stdscr.refresh()
self.stdscr.keypad(0)
curses.nocbreak()
curses.echo()
curses.endwin()
del self.stdscr
def reset(self):
self.stop()
self.start()
class CursesAccountFrame:
def __init__(s, master, accountname):
s.c = master
s.children = []
s.accountname = accountname
def drawleadstr(s, secs = None):
if secs == None:
acctstr = '%s: [active] %13.13s: ' % (s.key, s.accountname)
else:
acctstr = '%s: [%3d:%02d] %13.13s: ' % (s.key,
secs / 60, secs % 60,
s.accountname)
s.c.locked(s.window.addstr, 0, 0, acctstr)
s.location = len(acctstr)
def setwindow(s, window, key):
s.window = window
s.key = key
s.drawleadstr()
for child in s.children:
child.update(window, 0, s.location)
s.location += 1
def getnewthreadframe(s):
tf = CursesThreadFrame(s.c, s.window, 0, s.location)
s.location += 1
s.children.append(tf)
return tf
def startsleep(s, sleepsecs):
s.sleeping_abort = 0
def sleeping(s, sleepsecs, remainingsecs):
if remainingsecs:
s.c.lock()
try:
s.drawleadstr(remainingsecs)
s.window.refresh()
finally:
s.c.unlock()
time.sleep(sleepsecs)
else:
s.c.lock()
try:
s.drawleadstr()
s.window.refresh()
finally:
s.c.unlock()
return s.sleeping_abort
def syncnow(s):
s.sleeping_abort = 1
class CursesThreadFrame:
def __init__(s, master, window, y, x):
"""master should be a CursesUtil object."""
s.c = master
s.window = window
s.x = x
s.y = y
s.colors = []
bg = curses.COLOR_BLACK
s.colormap = {'black': s.c.getpair(curses.COLOR_BLACK, bg),
'gray': s.c.getpair(curses.COLOR_WHITE, bg),
'white': curses.A_BOLD | s.c.getpair(curses.COLOR_WHITE, bg),
'blue': s.c.getpair(curses.COLOR_BLUE, bg),
'red': s.c.getpair(curses.COLOR_RED, bg),
'purple': s.c.getpair(curses.COLOR_MAGENTA, bg),
'cyan': s.c.getpair(curses.COLOR_CYAN, bg),
'green': s.c.getpair(curses.COLOR_GREEN, bg),
'orange': s.c.getpair(curses.COLOR_YELLOW, bg),
'yellow': curses.A_BOLD | s.c.getpair(curses.COLOR_YELLOW, bg),
'pink': curses.A_BOLD | s.c.getpair(curses.COLOR_RED, bg)}
#s.setcolor('gray')
s.setcolor('black')
def setcolor(self, color):
self.color = self.colormap[color]
self.colorname = color
self.display()
def display(self):
def lockedstuff():
if self.getcolor() == 'black':
self.window.addstr(self.y, self.x, ' ', self.color)
else:
self.window.addstr(self.y, self.x, '.', self.color)
self.c.stdscr.move(self.c.height - 1, self.c.width - 1)
self.window.refresh()
self.c.locked(lockedstuff)
def getcolor(self):
return self.colorname
def getcolorpair(self):
return self.color
def update(self, window, y, x):
self.window = window
self.y = y
self.x = x
self.display()
def setthread(self, newthread):
self.setcolor('black')
#if newthread:
# self.setcolor('gray')
#else:
# self.setcolor('black')
class InputHandler:
def __init__(s, util):
s.c = util
s.bgchar = None
s.inputlock = Lock()
s.lockheld = 0
s.statuslock = Lock()
s.startup = Event()
s.startthread()
def startthread(s):
s.thread = threadutil.ExitNotifyThread(target = s.bgreaderloop,
name = "InputHandler loop")
s.thread.setDaemon(1)
s.thread.start()
def bgreaderloop(s):
while 1:
s.statuslock.acquire()
if s.lockheld or s.bgchar == None:
s.statuslock.release()
s.startup.wait()
else:
s.statuslock.release()
ch = s.c.stdscr.getch()
s.statuslock.acquire()
try:
if s.lockheld or s.bgchar == None:
curses.ungetch(ch)
else:
s.bgchar(ch)
finally:
s.statuslock.release()
def set_bgchar(s, callback):
"""Sets a "background" character handler. If a key is pressed
while not doing anything else, it will be passed to this handler.
callback is a function taking a single arg -- the char pressed.
If callback is None, clears the request."""
s.statuslock.acquire()
oldhandler = s.bgchar
newhandler = callback
s.bgchar = callback
if oldhandler and not newhandler:
pass
if newhandler and not oldhandler:
s.startup.set()
s.statuslock.release()
def input_acquire(s):
"""Call this method when you want exclusive input control.
Make sure to call input_release afterwards!
"""
s.inputlock.acquire()
s.statuslock.acquire()
s.lockheld = 1
s.statuslock.release()
def input_release(s):
"""Call this method when you are done getting input."""
s.statuslock.acquire()
s.lockheld = 0
s.statuslock.release()
s.inputlock.release()
s.startup.set()
class Blinkenlights(BlinkenBase, UIBase):
def init_banner(s):
s.af = {}
s.aflock = Lock()
s.c = CursesUtil()
s.text = []
BlinkenBase.init_banner(s)
s.setupwindows()
s.inputhandler = InputHandler(s.c)
s.gettf().setcolor('red')
s._msg(version.banner)
s.inputhandler.set_bgchar(s.keypress)
signal.signal(signal.SIGWINCH, s.resizehandler)
s.resizelock = Lock()
s.resizecount = 0
def resizehandler(s, signum, frame):
s.resizeterm()
def resizeterm(s, dosleep = 1):
if not s.resizelock.acquire(0):
s.resizecount += 1
return
signal.signal(signal.SIGWINCH, signal.SIG_IGN)
s.aflock.acquire()
s.c.lock()
s.resizecount += 1
while s.resizecount:
s.c.reset()
s.setupwindows()
s.resizecount -= 1
s.c.unlock()
s.aflock.release()
s.resizelock.release()
signal.signal(signal.SIGWINCH, s.resizehandler)
if dosleep:
time.sleep(1)
s.resizeterm(0)
def isusable(s):
# Not a terminal? Can't use curses.
if not sys.stdout.isatty() and sys.stdin.isatty():
return 0
# No TERM specified? Can't use curses.
try:
if not len(os.environ['TERM']):
return 0
except: return 0
# ncurses doesn't want to start? Can't use curses.
# This test is nasty because initscr() actually EXITS on error.
# grr.
pid = os.fork()
if pid:
# parent
return not os.WEXITSTATUS(os.waitpid(pid, 0)[1])
else:
# child
curses.initscr()
curses.endwin()
# If we didn't die by here, indicate success.
sys.exit(0)
def keypress(s, key):
if key > 255:
return
if chr(key) == 'q':
# Request to quit.
s.terminate()
try:
index = acctkeys.index(chr(key))
except ValueError:
# Key not a valid one: exit.
return
if index >= len(s.hotkeys):
# Not in our list of valid hotkeys.
return
# Trying to end sleep somewhere.
s.getaccountframe(s.hotkeys[index]).syncnow()
def getpass(s, accountname, config, errmsg = None):
s.inputhandler.input_acquire()
# See comment on _msg for info on why both locks are obtained.
s.tflock.acquire()
s.c.lock()
try:
s.gettf().setcolor('white')
s._addline(" *** Input Required", s.gettf().getcolorpair())
s._addline(" *** Please enter password for account %s: " % accountname,
s.gettf().getcolorpair())
s.logwindow.refresh()
password = s.logwindow.getstr()
finally:
s.tflock.release()
s.c.unlock()
s.inputhandler.input_release()
return password
def setupwindows(s):
s.c.lock()
try:
s.bannerwindow = curses.newwin(1, s.c.width, 0, 0)
s.setupwindow_drawbanner()
s.logheight = s.c.height - 1 - len(s.af.keys())
s.logwindow = curses.newwin(s.logheight, s.c.width, 1, 0)
s.logwindow.idlok(1)
s.logwindow.scrollok(1)
s.logwindow.move(s.logheight - 1, 0)
s.setupwindow_drawlog()
accounts = s.af.keys()
accounts.sort()
accounts.reverse()
pos = s.c.height - 1
index = 0
s.hotkeys = []
for account in accounts:
accountwindow = curses.newwin(1, s.c.width, pos, 0)
s.af[account].setwindow(accountwindow, acctkeys[index])
s.hotkeys.append(account)
index += 1
pos -= 1
curses.doupdate()
finally:
s.c.unlock()
def setupwindow_drawbanner(s):
if s.c.has_color:
color = s.c.getpair(curses.COLOR_WHITE, curses.COLOR_BLUE) | \
curses.A_BOLD
else:
color = curses.A_REVERSE
s.bannerwindow.bkgd(' ', color) # Fill background with that color
s.bannerwindow.addstr("%s %s" % (version.productname,
version.versionstr))
s.bannerwindow.addstr(0, s.bannerwindow.getmaxyx()[1] - len(version.copyright) - 1,
version.copyright)
s.bannerwindow.noutrefresh()
def setupwindow_drawlog(s):
if s.c.has_color:
color = s.c.getpair(curses.COLOR_WHITE, curses.COLOR_BLACK)
else:
color = curses.A_NORMAL
s.logwindow.bkgd(' ', color)
for line, color in s.text:
s.logwindow.addstr("\n" + line, color)
s.logwindow.noutrefresh()
def getaccountframe(s, accountname = None):
if accountname == None:
accountname = s.getthreadaccount()
s.aflock.acquire()
try:
if accountname in s.af:
return s.af[accountname]
# New one.
s.af[accountname] = CursesAccountFrame(s.c, accountname)
s.c.lock()
try:
s.c.reset()
s.setupwindows()
finally:
s.c.unlock()
finally:
s.aflock.release()
return s.af[accountname]
def _display(s, msg, color = None):
if "\n" in msg:
for thisline in msg.split("\n"):
s._msg(thisline)
return
# We must acquire both locks. Otherwise, deadlock can result.
# This can happen if one thread calls _msg (locking curses, then
# tf) and another tries to set the color (locking tf, then curses)
#
# By locking both up-front here, in this order, we prevent deadlock.
s.tflock.acquire()
s.c.lock()
try:
if not s.c.isactive():
# For dumping out exceptions and stuff.
print msg
return
if color:
s.gettf().setcolor(color)
s._addline(msg, s.gettf().getcolorpair())
s.logwindow.refresh()
finally:
s.c.unlock()
s.tflock.release()
def _addline(s, msg, color):
s.c.lock()
try:
s.logwindow.addstr("\n" + msg, color)
s.text.append((msg, color))
while len(s.text) > s.logheight:
s.text = s.text[1:]
finally:
s.c.unlock()
def terminate(s, exitstatus = 0, errortitle = None, errormsg = None):
s.c.stop()
UIBase.terminate(s, exitstatus = exitstatus, errortitle = errortitle, errormsg = errormsg)
def threadException(s, thread):
s.c.stop()
UIBase.threadException(s, thread)
def mainException(s):
s.c.stop()
UIBase.mainException(s)
def sleep(s, sleepsecs):
s.gettf().setcolor('red')
s._msg("Next sync in %d:%02d" % (sleepsecs / 60, sleepsecs % 60))
BlinkenBase.sleep(s, sleepsecs)
if __name__ == '__main__':
x = Blinkenlights(None)
x.init_banner()
import time
time.sleep(5)
x.c.stop()
fgs = {'black': curses.COLOR_BLACK, 'red': curses.COLOR_RED,
'green': curses.COLOR_GREEN, 'yellow': curses.COLOR_YELLOW,
'blue': curses.COLOR_BLUE, 'magenta': curses.COLOR_MAGENTA,
'cyan': curses.COLOR_CYAN, 'white': curses.COLOR_WHITE}
x = CursesUtil()
win1 = curses.newwin(x.height, x.width / 4 - 1, 0, 0)
win1.addstr("Black/normal\n")
for name, fg in fgs.items():
win1.addstr("%s\n" % name, x.getpair(fg, curses.COLOR_BLACK))
win2 = curses.newwin(x.height, x.width / 4 - 1, 0, win1.getmaxyx()[1])
win2.addstr("Blue/normal\n")
for name, fg in fgs.items():
win2.addstr("%s\n" % name, x.getpair(fg, curses.COLOR_BLUE))
win3 = curses.newwin(x.height, x.width / 4 - 1, 0, win1.getmaxyx()[1] +
win2.getmaxyx()[1])
win3.addstr("Black/bright\n")
for name, fg in fgs.items():
win3.addstr("%s\n" % name, x.getpair(fg, curses.COLOR_BLACK) | \
curses.A_BOLD)
win4 = curses.newwin(x.height, x.width / 4 - 1, 0, win1.getmaxyx()[1] * 3)
win4.addstr("Blue/bright\n")
for name, fg in fgs.items():
win4.addstr("%s\n" % name, x.getpair(fg, curses.COLOR_BLUE) | \
curses.A_BOLD)
win1.refresh()
win2.refresh()
win3.refresh()
win4.refresh()
x.stdscr.refresh()
import time
time.sleep(5)
x.stop()
print x.has_color
print x.height
print x.width