Implement SSL certificate checking

Previously, we did not check at all the authenticy and validity of
the SSL server we connected to. This is bad as it allows
man-in-the-middle attacks etc. This patch remedies the situation
somewhat.

If we specify a sslcacertfile= setting in the Repository section,
validate the server cert (on python>=2.6 or abort with python<=2.5).

As before, no certificate check is performed without that option.
In the future, the hostname check should be made optional and also
a mutt-lick "accept this certificate forever" thing should be
implemented.

Signed-off-by: Sebastian Spaeth <Sebastian@SSpaeth.de>
Signed-off-by: Nicolas Sebrecht <nicolas.s-dev@laposte.net>
This commit is contained in:
Sebastian 2010-12-16 12:43:47 +00:00 committed by Nicolas Sebrecht
parent 219eb8c47f
commit 4f57b94e23
4 changed files with 134 additions and 57 deletions

View File

@ -278,6 +278,12 @@ ssl = yes
# SSL Client key (optional) # SSL Client key (optional)
# sslclientkey = /path/to/file.key # sslclientkey = /path/to/file.key
# SSL CA Cert(s) to verify the server cert against (optional).
# No SSL verification is done without this option, if it is
# specified, the CA Cert(s) need to verify the Server cert AND
# match the hostname (* wildcard allowed on the left hand side)
# sslcacertcertfile = /path/to/cacertfile.crt
# Specify the port. If not specified, use a default port. # Specify the port. If not specified, use a default port.
# remoteport = 993 # remoteport = 993

View File

@ -1,7 +1,6 @@
# imaplib utilities # imaplib utilities
# Copyright (C) 2002-2007 John Goerzen # Copyright (C) 2002-2007 John Goerzen <jgoerzen@complete.org>
# <jgoerzen@complete.org> # 2010 Sebastian Spaeth <Sebastian@SSpaeth.de>
#
# This program is free software; you can redistribute it and/or modify # This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by # it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or # the Free Software Foundation; either version 2 of the License, or
@ -25,9 +24,9 @@ from imaplib import IMAP4_PORT, IMAP4_SSL_PORT, InternalDate, Mon2num
try: try:
import ssl import ssl
ssl_wrap = ssl.wrap_socket
except ImportError: except ImportError:
ssl_wrap = socket.ssl #fails on python <2.6
pass
class IMAP4_Tunnel(IMAP4): class IMAP4_Tunnel(IMAP4):
"""IMAP4 client class over a tunnel """IMAP4 client class over a tunnel
@ -62,45 +61,7 @@ class IMAP4_Tunnel(IMAP4):
self.infd.close() self.infd.close()
self.outfd.close() self.outfd.close()
self.process.wait() self.process.wait()
class sslwrapper:
def __init__(self, sslsock):
self.sslsock = sslsock
self.readbuf = ''
def write(self, s):
return self.sslsock.write(s)
def _read(self, n):
return self.sslsock.read(n)
def read(self, n):
if len(self.readbuf):
# Return the stuff in readbuf, even if less than n.
# It might contain the rest of the line, and if we try to
# read more, might block waiting for data that is not
# coming to arrive.
bytesfrombuf = min(n, len(self.readbuf))
retval = self.readbuf[:bytesfrombuf]
self.readbuf = self.readbuf[bytesfrombuf:]
return retval
retval = self._read(n)
if len(retval) > n:
self.readbuf = retval[n:]
return retval[:n]
return retval
def readline(self):
retval = ''
while 1:
linebuf = self.read(1024)
nlindex = linebuf.find("\n")
if nlindex != -1:
retval += linebuf[:nlindex + 1]
self.readbuf = linebuf[nlindex + 1:] + self.readbuf
return retval
else:
retval += linebuf
def new_mesg(self, s, secs=None): def new_mesg(self, s, secs=None):
if secs is None: if secs is None:
@ -109,14 +70,27 @@ def new_mesg(self, s, secs=None):
UIBase.getglobalui().debug('imap', ' %s.%02d %s' % (tm, (secs*100)%100, s)) UIBase.getglobalui().debug('imap', ' %s.%02d %s' % (tm, (secs*100)%100, s))
class WrappedIMAP4_SSL(IMAP4_SSL): class WrappedIMAP4_SSL(IMAP4_SSL):
"""Provides an improved version of the standard IMAP4_SSL
It provides a better readline() implementation as impaplib's
readline() is extremly inefficient. It can also connect to IPv6
addresses."""
def __init__(self, *args, **kwargs):
self._readbuf = ''
self._cacertfile = kwargs.get('cacertfile', None)
if kwargs.has_key('cacertfile'):
del kwargs['cacertfile']
IMAP4_SSL.__init__(self, *args, **kwargs)
def open(self, host = '', port = IMAP4_SSL_PORT): def open(self, host = '', port = IMAP4_SSL_PORT):
"""Setup connection to remote server on "host:port". """Do whatever IMAP4_SSL would do in open, but call sslwrap
(default: localhost:standard IMAP4 SSL port). with cert verification"""
This connection will be used by the routines: #IMAP4_SSL.open(self, host, port) uses the below 2 lines:
read, readline, send, shutdown.
"""
self.host = host self.host = host
self.port = port self.port = port
#rather than just self.sock = socket.create_connection((host, port))
#we use the below part to be able to connect to ipv6 addresses too
#This connects to the first ip found ipv4/ipv6 #This connects to the first ip found ipv4/ipv6
#Added by Adriaan Peeters <apeeters@lashout.net> based on a socket #Added by Adriaan Peeters <apeeters@lashout.net> based on a socket
#example from the python documentation: #example from the python documentation:
@ -136,11 +110,102 @@ class WrappedIMAP4_SSL(IMAP4_SSL):
if last_error != 0: if last_error != 0:
# FIXME # FIXME
raise socket.error(last_error) raise socket.error(last_error)
self.sslobj = ssl_wrap(self.sock, self.keyfile, self.certfile)
self.sslobj = sslwrapper(self.sslobj) #connected to socket, now wrap it in SSL
try:
if self._cacertfile:
requirecert = ssl.CERT_REQUIRED
else:
requirecert = ssl.CERT_NONE
self.sslobj = ssl.wrap_socket(self.sock, self.keyfile,
self.certfile,
ca_certs = self._cacertfile,
cert_reqs = requirecert)
except NameError:
#Python 2.4/2.5 don't have the ssl module, we need to
#socket.ssl() here but that doesn't allow cert
#verification!!!
if self._cacertfile:
#user configured a CA certificate, but python 2.4/5 doesn't
#allow us to easily check it. So bail out here.
raise Exception("SSL CA Certificates cannot be checked with python <=2.6. Abort")
self.sslobj = socket.ssl(self.sock, self.keyfile,
self.certfile)
else:
#ssl.wrap_socket worked and cert is verified, now check
#that hostnames also match.
error = self._verifycert(self.sslobj.getpeercert(), host)
if error:
raise ssl.SSLError("SSL Certificate host name mismatch: %s" % error)
#TODO: Done for now. We should implement a mutt-like behavior
#that offers the users to accept a certificate (presenting a
#fingerprint of it) (get via self.sslobj.getpeercert()), and
#save that, and compare on future connects, rather than having
#to trust what the CA certs say.
def _verifycert(self, cert, hostname):
'''Verify that cert (in socket.getpeercert() format) matches hostname.
CRLs and subjectAltName are not handled.
Returns error message if any problems are found and None on success.
'''
if not cert:
return ('no certificate received')
dnsname = hostname.lower()
for s in cert.get('subject', []):
key, value = s[0]
if key == 'commonName':
certname = value.lower()
if (certname == dnsname or
'.' in dnsname and certname == '*.' + dnsname.split('.', 1)[1]):
return None
return ('certificate is for %s') % certname
return ('no commonName found in certificate')
def _read_upto (self, n):
"""Read up to n bytes, emptying existing _readbuffer first"""
bytesfrombuf = min(n, len(self._readbuf))
if bytesfrombuf:
# Return the stuff in readbuf, even if less than n.
# It might contain the rest of the line, and if we try to
# read more, might block waiting for data that is not
# coming to arrive.
retval = self._readbuf[:bytesfrombuf]
self._readbuf = self._readbuf[bytesfrombuf:]
return retval
return self.sslobj.read(min(n, 16384))
def read(self, n):
"""Read exactly n bytes
As done in IMAP4_SSL.read() API. If read returns less than n
bytes, things break left and right."""
chunks = []
read = 0
while read < n:
data = self._read_upto (n-read)
read += len(data)
chunks.append(data)
return ''.join(chunks)
def readline(self): def readline(self):
return self.sslobj.readline() """Get the next line. This implementation is more efficient
than IMAP4_SSL.readline() which reads one char at a time and
reassembles the string by appending those chars. Uggh."""
retval = ''
while 1:
linebuf = self._read_upto(1024)
nlindex = linebuf.find("\n")
if nlindex != -1:
retval += linebuf[:nlindex + 1]
self._readbuf = linebuf[nlindex + 1:] + self._readbuf
return retval
else:
retval += linebuf
class WrappedIMAP4(IMAP4): class WrappedIMAP4(IMAP4):
@ -149,9 +214,8 @@ class WrappedIMAP4(IMAP4):
def open(self, host = '', port = IMAP4_PORT): def open(self, host = '', port = IMAP4_PORT):
"""Setup connection to remote server on "host:port" """Setup connection to remote server on "host:port"
(default: localhost:standard IMAP4 port). (default: localhost:standard IMAP4 port).
This connection will be used by the routines:
read, readline, send, shutdown.
""" """
#self.host and self.port are needed by the parent IMAP4 class
self.host = host self.host = host
self.port = port self.port = port
res = socket.getaddrinfo(host, port, socket.AF_UNSPEC, res = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
@ -174,7 +238,6 @@ class WrappedIMAP4(IMAP4):
raise socket.error(last_error) raise socket.error(last_error)
self.file = self.sock.makefile('rb') self.file = self.sock.makefile('rb')
mustquote = re.compile(r"[^\w!#$%&'+,.:;<=>?^`|~-]") mustquote = re.compile(r"[^\w!#$%&'+,.:;<=>?^`|~-]")
def Internaldate2epoch(resp): def Internaldate2epoch(resp):

View File

@ -100,7 +100,8 @@ class IMAPServer:
def __init__(self, config, reposname, def __init__(self, config, reposname,
username = None, password = None, hostname = None, username = None, password = None, hostname = None,
port = None, ssl = 1, maxconnections = 1, tunnel = None, port = None, ssl = 1, maxconnections = 1, tunnel = None,
reference = '""', sslclientcert = None, sslclientkey = None): reference = '""', sslclientcert = None, sslclientkey = None,
sslcacertfile= None):
self.reposname = reposname self.reposname = reposname
self.config = config self.config = config
self.username = username self.username = username
@ -113,6 +114,7 @@ class IMAPServer:
self.usessl = ssl self.usessl = ssl
self.sslclientcert = sslclientcert self.sslclientcert = sslclientcert
self.sslclientkey = sslclientkey self.sslclientkey = sslclientkey
self.sslcacertfile = sslcacertfile
self.delim = None self.delim = None
self.root = None self.root = None
if port == None: if port == None:
@ -253,7 +255,8 @@ class IMAPServer:
elif self.usessl: elif self.usessl:
UIBase.getglobalui().connecting(self.hostname, self.port) UIBase.getglobalui().connecting(self.hostname, self.port)
imapobj = UsefulIMAP4_SSL(self.hostname, self.port, imapobj = UsefulIMAP4_SSL(self.hostname, self.port,
self.sslclientkey, self.sslclientcert) self.sslclientkey, self.sslclientcert,
cacertfile = self.sslcacertfile)
else: else:
UIBase.getglobalui().connecting(self.hostname, self.port) UIBase.getglobalui().connecting(self.hostname, self.port)
imapobj = UsefulIMAP4(self.hostname, self.port) imapobj = UsefulIMAP4(self.hostname, self.port)
@ -414,6 +417,7 @@ class ConfigedIMAPServer(IMAPServer):
ssl = self.repos.getssl() ssl = self.repos.getssl()
sslclientcert = self.repos.getsslclientcert() sslclientcert = self.repos.getsslclientcert()
sslclientkey = self.repos.getsslclientkey() sslclientkey = self.repos.getsslclientkey()
sslcacertfile = self.repos.getsslcacertfile()
reference = self.repos.getreference() reference = self.repos.getreference()
server = None server = None
password = None password = None
@ -435,4 +439,5 @@ class ConfigedIMAPServer(IMAPServer):
self.repos.getmaxconnections(), self.repos.getmaxconnections(),
reference = reference, reference = reference,
sslclientcert = sslclientcert, sslclientcert = sslclientcert,
sslclientkey = sslclientkey) sslclientkey = sslclientkey,
sslcacertfile = sslcacertfile)

View File

@ -139,6 +139,9 @@ class IMAPRepository(BaseRepository):
def getsslclientkey(self): def getsslclientkey(self):
return self.getconf('sslclientkey', None) return self.getconf('sslclientkey', None)
def getsslcacertfile(self):
return self.getconf('sslcacertfile', None)
def getpreauthtunnel(self): def getpreauthtunnel(self):
return self.getconf('preauthtunnel', None) return self.getconf('preauthtunnel', None)