1844cefd17
It looks like I accidentally recorded the wrong version of Curses.py -- originally this code was there, but I moved it over to UIBase so it would cover the TTY UI also.
590 lines
18 KiB
Python
590 lines
18 KiB
Python
# Curses-based interfaces
|
|
# Copyright (C) 2003 John Goerzen
|
|
# <jgoerzen@complete.org>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
from Blinkenlights import BlinkenBase
|
|
from UIBase import UIBase
|
|
from threading import *
|
|
import thread, time, sys, os, signal, time
|
|
from offlineimap import version, threadutil
|
|
from offlineimap.threadutil import MultiLock
|
|
|
|
import curses, curses.panel, curses.textpad, curses.wrapper
|
|
|
|
# Hotkey characters handed out to account rows, in order: the account at
# position N in the hotkey list is bound to the Nth character of this string.
# Lowercase 'q' does not appear in the sequence; the keypress handler in this
# file treats 'q' as the request to quit.
acctkeys = '1234567890abcdefghijklmnoprstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-=;/.,'
|
|
|
|
class CursesUtil:
    """Owns the curses screen plus the locks used to serialize access to it.

    Attributes set by start():
      stdscr     -- the window returned by curses.initscr(); removed again
                    by stop(), which is how isactive() tells the two states
                    apart.
      has_color  -- true if colour support could be initialised.
      oldcursor  -- previous cursor visibility, or None if it could not be
                    changed.
      height, width -- screen size as reported by stdscr.getmaxyx().

    Colour pairs are allocated lazily by getpair() and remembered in
    self.pairs (keyed by "fg/bg") under self.pairlock.
    """

    def __init__(self):
        self.pairlock = Lock()
        # NOTE(review): callers in this file nest lock() calls on the same
        # thread, so MultiLock is expected to be re-entrant -- confirm
        # against offlineimap.threadutil.
        self.iolock = MultiLock()
        self.start()

    def initpairs(self):
        """Reset the pair cache to just white-on-black, mapped to pair 0."""
        self.pairlock.acquire()
        try:
            self.pairs = {self._getpairindex(curses.COLOR_WHITE,
                                             curses.COLOR_BLACK): 0}
            self.nextpair = 1
        finally:
            self.pairlock.release()

    def lock(self):
        """Acquire the screen I/O lock."""
        self.iolock.acquire()

    def unlock(self):
        """Release the screen I/O lock."""
        self.iolock.release()

    def locked(self, target, *args, **kwargs):
        """Perform an operation with full locking.

        Calls target(*args, **kwargs) while holding the I/O lock and returns
        whatever target returns.  The lock is released even if target raises.
        """
        self.lock()
        try:
            return target(*args, **kwargs)
        finally:
            self.unlock()

    def refresh(self):
        """Update all panels and flush pending output, under the I/O lock."""
        def lockedstuff():
            curses.panel.update_panels()
            curses.doupdate()
        self.locked(lockedstuff)

    def isactive(self):
        """Return true while curses is started (i.e. self.stdscr exists)."""
        return hasattr(self, 'stdscr')

    def _getpairindex(self, fg, bg):
        """Return the cache key ("fg/bg") for a foreground/background pair."""
        return '%d/%d' % (fg, bg)

    def getpair(self, fg, bg):
        """Return the curses attribute for the colour pair (fg, bg).

        Returns 0 when colour is unavailable.  Otherwise the pair is
        allocated on first use and served from the cache afterwards.
        """
        if not self.has_color:
            return 0
        pindex = self._getpairindex(fg, bg)
        self.pairlock.acquire()
        try:
            if pindex not in self.pairs:
                # Initialise first, record second: if init_pair() raises,
                # the cache must not keep a number for a pair that was
                # never set up.
                curses.init_pair(self.nextpair, fg, bg)
                self.pairs[pindex] = self.nextpair
                self.nextpair += 1
            return curses.color_pair(self.pairs[pindex])
        finally:
            self.pairlock.release()

    def start(self):
        """Initialise curses, hide the cursor if possible, clear the screen."""
        self.stdscr = curses.initscr()
        curses.noecho()
        curses.cbreak()
        self.stdscr.keypad(1)
        # Colour is best-effort: any failure simply disables it.  Catching
        # Exception (not a bare except) lets interpreter-exit requests
        # propagate.
        try:
            curses.start_color()
            self.has_color = curses.has_colors()
        except Exception:
            self.has_color = 0

        # Hiding the cursor is best-effort too; oldcursor stays None when
        # the call fails so that stop() knows not to restore it.
        self.oldcursor = None
        try:
            self.oldcursor = curses.curs_set(0)
        except Exception:
            pass

        self.stdscr.clear()
        self.stdscr.refresh()
        (self.height, self.width) = self.stdscr.getmaxyx()
        self.initpairs()

    def stop(self):
        """Restore the terminal and forget stdscr; no-op if not started."""
        if not hasattr(self, 'stdscr'):
            return
        if self.oldcursor is not None:
            curses.curs_set(self.oldcursor)
        self.stdscr.refresh()
        self.stdscr.keypad(0)
        curses.nocbreak()
        curses.echo()
        curses.endwin()
        del self.stdscr

    def reset(self):
        """Stop and restart curses (re-reads the screen size)."""
        self.stop()
        self.start()
|
|
|
|
class CursesAccountFrame:
    """Status row for one account: a lead string followed by thread cells.

    s.window, s.key and s.location are only set once setwindow() has been
    called; drawleadstr() and getnewthreadframe() rely on them.
    """

    def __init__(s, master, accountname):
        """master is the object providing lock()/unlock()/locked()."""
        s.c = master
        s.children = []
        s.accountname = accountname
        # Initialised here so that sleeping() can be called before the
        # first startsleep() without raising AttributeError.
        s.sleeping_abort = 0

    def drawleadstr(s, secs = None):
        """Draw the row's lead string at column 0 and record its length.

        With secs omitted the row shows "[active]"; otherwise it shows the
        remaining time as minutes:seconds.  s.location is set to the length
        of the string drawn.
        """
        if secs is None:
            acctstr = '%s: [active] %13.13s: ' % (s.key, s.accountname)
        else:
            acctstr = '%s: [%3d:%02d] %13.13s: ' % (s.key,
                                                    secs // 60, secs % 60,
                                                    s.accountname)
        s.c.locked(s.window.addstr, 0, 0, acctstr)
        s.location = len(acctstr)

    def setwindow(s, window, key):
        """Attach the row to a window and hotkey, then re-place the children.

        Each existing child is moved to row 0 of the new window at
        successive columns after the lead string.
        """
        s.window = window
        s.key = key
        s.drawleadstr()
        for child in s.children:
            child.update(window, 0, s.location)
            s.location += 1

    def getnewthreadframe(s):
        """Create a thread cell at the next free column and return it."""
        tf = CursesThreadFrame(s.c, s.window, 0, s.location)
        s.location += 1
        s.children.append(tf)
        return tf

    def startsleep(s, sleepsecs):
        """Clear the abort flag at the start of a sleep period."""
        s.sleeping_abort = 0

    def sleeping(s, sleepsecs, remainingsecs):
        """Redraw the lead string for one sleep step.

        While remainingsecs is non-zero the countdown is drawn and the
        caller's thread sleeps for sleepsecs; once it reaches zero the row
        goes back to "[active]" with no sleep.  Returns the abort flag
        (set by syncnow()).
        """
        s.c.lock()
        try:
            if remainingsecs:
                s.drawleadstr(remainingsecs)
            else:
                s.drawleadstr()
            s.window.refresh()
        finally:
            s.c.unlock()
        # Sleep outside the lock so other threads can keep drawing.
        if remainingsecs:
            time.sleep(sleepsecs)
        return s.sleeping_abort

    def syncnow(s):
        """Request that the current sleep be cut short."""
        s.sleeping_abort = 1
|
|
|
|
class CursesThreadFrame:
    """A single one-character cell whose colour reflects a thread's state."""

    def __init__(s, master, window, y, x):
        """master should be a CursesUtil object.

        window/y/x say where the cell is drawn.  The cell starts out
        'black' and is drawn immediately.
        """
        s.c = master
        s.window = window
        s.x = x
        s.y = y
        s.colors = []

        bg = curses.COLOR_BLACK
        # (name, foreground, bold?) -- kept in this order because the pairs
        # are requested from the master one after another as listed.
        palette = (
            ('black',  curses.COLOR_BLACK,   0),
            ('gray',   curses.COLOR_WHITE,   0),
            ('white',  curses.COLOR_WHITE,   1),
            ('blue',   curses.COLOR_BLUE,    0),
            ('red',    curses.COLOR_RED,     0),
            ('purple', curses.COLOR_MAGENTA, 0),
            ('cyan',   curses.COLOR_CYAN,    0),
            ('green',  curses.COLOR_GREEN,   0),
            ('orange', curses.COLOR_YELLOW,  0),
            ('yellow', curses.COLOR_YELLOW,  1),
            ('pink',   curses.COLOR_RED,     1),
        )
        s.colormap = {}
        for label, fg, bold in palette:
            pair = s.c.getpair(fg, bg)
            if bold:
                pair = curses.A_BOLD | pair
            s.colormap[label] = pair

        s.setcolor('black')

    def setcolor(self, color):
        """Switch the cell to the named colour and redraw it."""
        self.color = self.colormap[color]
        self.colorname = color
        self.display()

    def display(self):
        """Draw the cell: a blank when 'black', otherwise a dot.

        Done under the master's lock; afterwards the screen cursor is moved
        to the bottom-right corner and the window is refreshed.
        """
        def paint():
            if self.getcolor() == 'black':
                glyph = ' '
            else:
                glyph = '.'
            self.window.addstr(self.y, self.x, glyph, self.color)
            lastrow = self.c.height - 1
            lastcol = self.c.width - 1
            self.c.stdscr.move(lastrow, lastcol)
            self.window.refresh()
        self.c.locked(paint)

    def getcolor(self):
        """Return the current colour name."""
        return self.colorname

    def getcolorpair(self):
        """Return the curses attribute value for the current colour."""
        return self.color

    def update(self, window, y, x):
        """Move the cell to a new window/position and redraw it there."""
        self.window = window
        self.y = y
        self.x = x
        self.display()

    def setthread(self, newthread):
        """Reset the cell to 'black'; newthread itself is not consulted."""
        self.setcolor('black')
|
|
|
|
class InputHandler:
    """Reads keys in a background thread and hands them to a callback.

    State, all guarded by s.statuslock:
      bgchar   -- the background key callback, or None.
      lockheld -- true while some caller owns the keyboard via
                  input_acquire().
    s.startup is the Event the reader thread parks on while it has nothing
    to do; set_bgchar() and input_release() set it to wake the reader.
    """

    def __init__(s, util):
        """util provides the stdscr the reader thread calls getch() on."""
        s.c = util
        s.bgchar = None
        s.inputlock = Lock()
        s.lockheld = 0
        s.statuslock = Lock()
        s.startup = Event()
        s.startthread()

    def startthread(s):
        """Start the daemon thread that runs bgreaderloop()."""
        s.thread = threadutil.ExitNotifyThread(target = s.bgreaderloop,
                                               name = "InputHandler loop")
        s.thread.setDaemon(1)
        s.thread.start()

    def bgreaderloop(s):
        """Reader thread body: wait until idle input is wanted, then read.

        Never returns.  A key read while the situation changed (input was
        acquired, or the handler was removed, during getch()) is pushed
        back with curses.ungetch() instead of being delivered.
        """
        while 1:
            s.statuslock.acquire()
            if s.lockheld or s.bgchar is None:
                # Re-arm the event before parking.  Without the clear() the
                # event stays set after its first use, wait() returns at
                # once, and this loop spins.  Clearing while statuslock is
                # still held cannot lose a wake-up: every transition out of
                # the idle state happens under statuslock and is followed
                # by startup.set().
                s.startup.clear()
                s.statuslock.release()
                s.startup.wait()
            else:
                s.statuslock.release()
                ch = s.c.stdscr.getch()
                s.statuslock.acquire()
                try:
                    if s.lockheld or s.bgchar is None:
                        curses.ungetch(ch)
                    else:
                        s.bgchar(ch)
                finally:
                    s.statuslock.release()

    def set_bgchar(s, callback):
        """Sets a "background" character handler.  If a key is pressed
        while not doing anything else, it will be passed to this handler.

        callback is a function taking a single arg -- the char pressed.

        If callback is None, clears the request."""
        s.statuslock.acquire()
        try:
            oldhandler = s.bgchar
            s.bgchar = callback
            # Going from "no handler" to "handler": wake the reader thread.
            if callback and not oldhandler:
                s.startup.set()
        finally:
            s.statuslock.release()

    def input_acquire(s):
        """Call this method when you want exclusive input control.
        Make sure to call input_release afterwards!
        """
        s.inputlock.acquire()
        s.statuslock.acquire()
        try:
            s.lockheld = 1
        finally:
            s.statuslock.release()

    def input_release(s):
        """Call this method when you are done getting input."""
        s.statuslock.acquire()
        try:
            s.lockheld = 0
        finally:
            s.statuslock.release()
        s.inputlock.release()
        # Let the reader thread resume background reads.
        s.startup.set()
|
|
|
|
class Blinkenlights(BlinkenBase, UIBase):
    """Curses front end: banner line, scrolling log, one row per account.

    NOTE(review): s.tflock, s.gettf(), s.getthreadaccount() and s._msg()
    are used here but not defined in this class; presumably they come from
    BlinkenBase/UIBase -- confirm there.
    """

    def init_banner(s):
        """Start curses, build the windows and install input/resize hooks."""
        s.af = {}
        s.aflock = Lock()
        s.c = CursesUtil()
        s.text = []
        BlinkenBase.init_banner(s)
        s.setupwindows()
        s.inputhandler = InputHandler(s.c)
        s.gettf().setcolor('red')
        s._msg(version.banner)
        s.inputhandler.set_bgchar(s.keypress)
        # The resize state must exist before the handler is installed;
        # otherwise a SIGWINCH arriving in between would hit resizeterm()
        # with no resizelock attribute.
        s.resizelock = Lock()
        s.resizecount = 0
        signal.signal(signal.SIGWINCH, s.resizehandler)

    def resizehandler(s, signum, frame):
        """SIGWINCH handler: rebuild the screen for the new size."""
        s.resizeterm()

    def resizeterm(s, dosleep = 1):
        """Restart curses and redraw all windows after a terminal resize.

        If a resize is already in progress, just bump resizecount so that
        the running pass loops once more.  With dosleep true, a second pass
        is made one second later.
        """
        if not s.resizelock.acquire(0):
            s.resizecount += 1
            return
        try:
            signal.signal(signal.SIGWINCH, signal.SIG_IGN)
            s.aflock.acquire()
            try:
                s.c.lock()
                try:
                    s.resizecount += 1
                    while s.resizecount:
                        s.c.reset()
                        s.setupwindows()
                        s.resizecount -= 1
                finally:
                    s.c.unlock()
            finally:
                s.aflock.release()
        finally:
            # Always give the lock back and re-arm the handler, even when
            # the redraw failed, so later resizes are not silently ignored.
            s.resizelock.release()
            signal.signal(signal.SIGWINCH, s.resizehandler)
        if dosleep:
            time.sleep(1)
            s.resizeterm(0)

    def isusable(s):
        """Return true if curses can be used on this terminal.

        Requires stdin and stdout to both be ttys, TERM to be non-empty,
        and a forked child to get through initscr()/endwin() and exit
        normally with status 0.
        """
        # Not a terminal?  Can't use curses.
        if not (sys.stdout.isatty() and sys.stdin.isatty()):
            return 0

        # No TERM specified?  Can't use curses.
        if not os.environ.get('TERM'):
            return 0

        # ncurses doesn't want to start?  Can't use curses.
        # This test is nasty because initscr() actually EXITS on error.
        # grr.
        pid = os.fork()
        if pid:
            # parent: only a normal exit with status 0 counts as success
            # (a child killed by a signal also has WEXITSTATUS == 0).
            status = os.waitpid(pid, 0)[1]
            return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
        else:
            # child: must never return into the caller's code, so leave
            # through os._exit() on every path, including Python-level
            # exceptions raised by curses.
            try:
                curses.initscr()
                curses.endwin()
            except Exception:
                os._exit(1)
            # If we didn't die by here, indicate success.
            os._exit(0)

    def keypress(s, key):
        """Background key handler: 'q' quits, account hotkeys end a sleep."""
        # Ignore anything chr() cannot represent as a single byte.
        if key < 0 or key > 255:
            return

        char = chr(key)
        if char == 'q':
            # Request to quit.
            s.terminate()

        try:
            index = acctkeys.index(char)
        except ValueError:
            # Key not a valid one: exit.
            return

        if index >= len(s.hotkeys):
            # Not in our list of valid hotkeys.
            return

        # Trying to end sleep somewhere.
        s.getaccountframe(s.hotkeys[index]).syncnow()

    def getpass(s, accountname, config, errmsg = None):
        """Prompt for the account's password in the log window; return it.

        config and errmsg are accepted but not used here.
        """
        s.inputhandler.input_acquire()
        try:
            # See comment in _display for info on why both locks are
            # obtained.
            s.tflock.acquire()
            s.c.lock()
            try:
                s.gettf().setcolor('white')
                s._addline(" *** Input Required", s.gettf().getcolorpair())
                s._addline(" *** Please enter password for account %s: " % accountname,
                           s.gettf().getcolorpair())
                s.logwindow.refresh()
                password = s.logwindow.getstr()
            finally:
                s.tflock.release()
                s.c.unlock()
        finally:
            # Hand the keyboard back to the background reader even if the
            # prompt failed.
            s.inputhandler.input_release()
        return password

    def setupwindows(s):
        """(Re)create the banner, log and per-account windows.

        Accounts are laid out from the bottom row upwards in reverse sorted
        order; s.hotkeys records the account for each hotkey index.
        """
        s.c.lock()
        try:
            s.bannerwindow = curses.newwin(1, s.c.width, 0, 0)
            s.setupwindow_drawbanner()
            # One row for the banner, one per account, the rest is log.
            s.logheight = s.c.height - 1 - len(s.af)
            s.logwindow = curses.newwin(s.logheight, s.c.width, 1, 0)
            s.logwindow.idlok(1)
            s.logwindow.scrollok(1)
            s.logwindow.move(s.logheight - 1, 0)
            s.setupwindow_drawlog()
            accounts = list(s.af.keys())
            accounts.sort()
            accounts.reverse()

            pos = s.c.height - 1
            index = 0
            s.hotkeys = []
            for account in accounts:
                accountwindow = curses.newwin(1, s.c.width, pos, 0)
                s.af[account].setwindow(accountwindow, acctkeys[index])
                s.hotkeys.append(account)
                index += 1
                pos -= 1

            curses.doupdate()
        finally:
            s.c.unlock()

    def setupwindow_drawbanner(s):
        """Paint the banner: product/version at left, copyright at right."""
        if s.c.has_color:
            color = s.c.getpair(curses.COLOR_WHITE, curses.COLOR_BLUE) | \
                    curses.A_BOLD
        else:
            color = curses.A_REVERSE
        s.bannerwindow.bkgd(' ', color) # Fill background with that color
        s.bannerwindow.addstr("%s %s" % (version.productname,
                                         version.versionstr))
        s.bannerwindow.addstr(0, s.bannerwindow.getmaxyx()[1] - len(version.copyright) - 1,
                              version.copyright)

        s.bannerwindow.noutrefresh()

    def setupwindow_drawlog(s):
        """Set the log background and replay the remembered log lines."""
        if s.c.has_color:
            color = s.c.getpair(curses.COLOR_WHITE, curses.COLOR_BLACK)
        else:
            color = curses.A_NORMAL
        s.logwindow.bkgd(' ', color)
        for line, linecolor in s.text:
            s.logwindow.addstr("\n" + line, linecolor)
        s.logwindow.noutrefresh()

    def getaccountframe(s, accountname = None):
        """Return the CursesAccountFrame for an account, creating it if new.

        With accountname omitted, s.getthreadaccount() supplies it.
        Creating a frame restarts curses and rebuilds every window so the
        new row gets screen space.
        """
        if accountname is None:
            accountname = s.getthreadaccount()
        s.aflock.acquire()
        try:
            if accountname in s.af:
                return s.af[accountname]

            # New one.
            s.af[accountname] = CursesAccountFrame(s.c, accountname)
            s.c.lock()
            try:
                s.c.reset()
                s.setupwindows()
            finally:
                s.c.unlock()
        finally:
            s.aflock.release()
        return s.af[accountname]

    def _display(s, msg, color = None):
        """Append msg to the log window, optionally switching colour first.

        A multi-line msg is split and each line is sent through s._msg().
        When curses is not active the message is printed instead.
        """
        if "\n" in msg:
            for thisline in msg.split("\n"):
                s._msg(thisline)
            return

        # We must acquire both locks.  Otherwise, deadlock can result.
        # This can happen if one thread calls _msg (locking curses, then
        # tf) and another tries to set the color (locking tf, then curses)
        #
        # By locking both up-front here, in this order, we prevent deadlock.

        s.tflock.acquire()
        s.c.lock()
        try:
            if not s.c.isactive():
                # For dumping out exceptions and stuff.
                print(msg)
                return
            if color:
                s.gettf().setcolor(color)
            s._addline(msg, s.gettf().getcolorpair())
            s.logwindow.refresh()
        finally:
            s.c.unlock()
            s.tflock.release()

    def _addline(s, msg, color):
        """Write one line to the log window and remember it for redraws.

        s.text is trimmed to at most s.logheight entries, oldest first.
        """
        s.c.lock()
        try:
            s.logwindow.addstr("\n" + msg, color)
            s.text.append((msg, color))
            # Drop the surplus in one step; this also terminates when
            # logheight is zero or negative (tiny terminal).
            excess = len(s.text) - s.logheight
            if excess > 0:
                del s.text[:excess]
        finally:
            s.c.unlock()

    def terminate(s, exitstatus = 0, errortitle = None, errormsg = None):
        """Shut curses down, then defer to UIBase.terminate()."""
        s.c.stop()
        UIBase.terminate(s, exitstatus = exitstatus, errortitle = errortitle, errormsg = errormsg)

    def threadException(s, thread):
        """Shut curses down, then defer to UIBase.threadException()."""
        s.c.stop()
        UIBase.threadException(s, thread)

    def mainException(s):
        """Shut curses down, then defer to UIBase.mainException()."""
        s.c.stop()
        UIBase.mainException(s)

    def sleep(s, sleepsecs):
        """Announce the next sync time, then defer to BlinkenBase.sleep()."""
        s.gettf().setcolor('red')
        s._msg("Next sync in %d:%02d" % (sleepsecs // 60, sleepsecs % 60))
        BlinkenBase.sleep(s, sleepsecs)
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test, part 1: bring the full UI up for five seconds.
    import time

    x = Blinkenlights(None)
    x.init_banner()
    time.sleep(5)
    x.c.stop()

    # Part 2: a colour chart in four columns -- every foreground colour on
    # black and on blue, in normal and in bold.
    fgs = {'black': curses.COLOR_BLACK, 'red': curses.COLOR_RED,
           'green': curses.COLOR_GREEN, 'yellow': curses.COLOR_YELLOW,
           'blue': curses.COLOR_BLUE, 'magenta': curses.COLOR_MAGENTA,
           'cyan': curses.COLOR_CYAN, 'white': curses.COLOR_WHITE}

    x = CursesUtil()

    def paintcolumn(left, title, bg, extra):
        """Create one chart column at x-offset left and fill it in."""
        win = curses.newwin(x.height, x.width // 4 - 1, 0, left)
        win.addstr(title)
        for name, fg in fgs.items():
            win.addstr("%s\n" % name, x.getpair(fg, bg) | extra)
        return win

    win1 = paintcolumn(0, "Black/normal\n",
                       curses.COLOR_BLACK, curses.A_NORMAL)
    win2 = paintcolumn(win1.getmaxyx()[1], "Blue/normal\n",
                       curses.COLOR_BLUE, curses.A_NORMAL)
    win3 = paintcolumn(win1.getmaxyx()[1] + win2.getmaxyx()[1],
                       "Black/bright\n",
                       curses.COLOR_BLACK, curses.A_BOLD)
    win4 = paintcolumn(win1.getmaxyx()[1] * 3, "Blue/bright\n",
                       curses.COLOR_BLUE, curses.A_BOLD)

    for win in (win1, win2, win3, win4):
        win.refresh()
    x.stdscr.refresh()
    time.sleep(5)
    x.stop()

    # Report what curses detected, once the terminal is back to normal.
    print(x.has_color)
    print(x.height)
    print(x.width)
|
|
|