Using standard imaplib2.py

This is the standard imaplib2 file.
This commit is contained in:
Rodolfo García Peñas (kix) 2020-08-28 11:48:16 +02:00
parent 75ce6e71f3
commit 476c485a52

View File

@ -57,26 +57,18 @@ __author__ = "Piers Lauder <piers@janeelix.com>"
__URL__ = "http://imaplib2.sourceforge.net"
__license__ = "Python License"
import binascii, calendar, errno, os, random, re, select, socket, sys, time, threading, zlib
import binascii, calendar, errno, os, queue, random, re, select, socket, sys, time, threading, zlib
if bytes != str:
# Python 3, but NB assumes strings in all I/O
# for backwards compatibility with python 2 usage.
import queue
string_types = str
else:
import queue as queue
string_types = str
select_module = select
# Globals
CRLF = '\r\n'
CRLF = b'\r\n'
IMAP4_PORT = 143
IMAP4_SSL_PORT = 993
IDLE_TIMEOUT_RESPONSE = '* IDLE TIMEOUT\r\n'
IDLE_TIMEOUT_RESPONSE = b'* IDLE TIMEOUT\r\n'
IDLE_TIMEOUT = 60*29 # Don't stay in IDLE state longer
READ_POLL_TIMEOUT = 30 # Without this timeout interrupted network connections can hang reader
READ_SIZE = 32768 # Consume all available in socket
@ -148,15 +140,15 @@ UID_direct = ('SEARCH', 'SORT', 'THREAD')
def Int2AP(num):
"""string = Int2AP(num)
Return 'num' converted to a string using characters from the set 'A'..'P'
Return 'num' converted to bytes using characters from the set 'A'..'P'
"""
val, a2p = [], 'ABCDEFGHIJKLMNOP'
val = b''; AP = b'ABCDEFGHIJKLMNOP'
num = int(abs(num))
while num:
num, mod = divmod(num, 16)
val.insert(0, a2p[mod])
return ''.join(val)
val = AP[mod:mod+1] + val
return val
@ -173,7 +165,7 @@ class Request(object):
else:
self.callback_arg = (self, cb_arg) # Self reference required in callback arg
self.tag = '%s%s' % (parent.tagpre, parent.tagnum)
self.tag = parent.tagpre + bytes(str(parent.tagnum), 'ASCII')
parent.tagnum += 1
self.ready = threading.Event()
@ -262,20 +254,10 @@ class IMAP4(object):
that state-changing commands will both block until previous commands
have completed, and block subsequent commands until they have finished.
All (non-callback) arguments to commands are converted to strings,
All (non-callback) string arguments to commands are converted to bytes,
except for AUTHENTICATE, and the last argument to APPEND which is
passed as an IMAP4 literal. If necessary (the string contains any
non-printing characters or white-space and isn't enclosed with
either parentheses or double or single quotes) each string is
quoted. However, the 'password' argument to the LOGIN command is
always quoted. If you want to avoid having an argument string
quoted (eg: the 'flags' argument to STORE) then enclose the string
in parentheses (eg: "(\Deleted)"). If you are using "sequence sets"
containing the wildcard character '*', then enclose the argument
in single quotes: the quotes will be removed and the resulting
string passed unquoted. Note also that you can pass in an argument
with a type that doesn't evaluate to 'string_types' (eg: 'bytearray')
and it will be converted to a string without quoting.
passed as an IMAP4 literal. NB: the 'password' argument to the LOGIN
command is always quoted.
There is one instance variable, 'state', that is useful for tracking
whether the client needs to login to the server. If it has the
@ -302,11 +284,10 @@ class IMAP4(object):
_literal = br'.*{(?P<size>\d+)}$'
_untagged_status = br'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?'
continuation_cre = re.compile(r'\+( (?P<data>.*))?')
mapCRLF_cre = re.compile(r'\r\n|\r|\n')
mustquote_cre = re.compile(r"[^!#$&'+,./0-9:;<=>?@A-Z\[^_`a-z|}~-]")
response_code_cre = re.compile(r'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]')
untagged_response_cre = re.compile(r'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
continuation_cre = re.compile(br'\+( (?P<data>.*))?')
mapCRLF_cre = re.compile(br'\r\n|\r|\n')
response_code_cre = re.compile(br'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]')
untagged_response_cre = re.compile(br'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
def __init__(self, host=None, port=None, debug=None, debug_file=None, identifier=None, timeout=None, debug_buf_lvl=None):
@ -334,9 +315,9 @@ class IMAP4(object):
self.tagnum = 0
self.tagpre = Int2AP(random.randint(4096, 65535))
self.tagre = re.compile(r'(?P<tag>'
self.tagre = re.compile(br'(?P<tag>'
+ self.tagpre
+ r'\d+) (?P<type>[A-Z]+) ?(?P<data>.*)')
+ br'\d+) (?P<type>[A-Z]+) (?P<data>.*)', re.ASCII)
self._mode_ascii()
@ -401,10 +382,7 @@ class IMAP4(object):
else:
raise self.error('unrecognised server welcome message: %s' % repr(self.welcome))
typ, dat = self.capability()
if dat == [None]:
raise self.error('no CAPABILITY response from server')
self.capabilities = tuple(dat[-1].upper().split())
self._get_capabilities()
if __debug__: self._log(1, 'CAPABILITY: %r' % (self.capabilities,))
for version in AllowedVersions:
@ -426,26 +404,28 @@ class IMAP4(object):
raise AttributeError("Unknown IMAP4 command: '%s'" % attr)
def __enter__(self):
return self
def __exit__(self, *args):
try:
self.logout()
except OSError:
pass
def _mode_ascii(self):
self.utf8_enabled = False
self._encoding = 'ascii'
if bytes != str:
self.literal_cre = re.compile(self._literal, re.ASCII)
self.untagged_status_cre = re.compile(self._untagged_status, re.ASCII)
else:
self.literal_cre = re.compile(self._literal)
self.untagged_status_cre = re.compile(self._untagged_status)
self.literal_cre = re.compile(self._literal, re.ASCII)
self.untagged_status_cre = re.compile(self._untagged_status, re.ASCII)
def _mode_utf8(self):
self.utf8_enabled = True
self._encoding = 'utf-8'
if bytes != str:
self.literal_cre = re.compile(self._literal)
self.untagged_status_cre = re.compile(self._untagged_status)
else:
self.literal_cre = re.compile(self._literal, re.UNICODE)
self.untagged_status_cre = re.compile(self._untagged_status, re.UNICODE)
self.literal_cre = re.compile(self._literal)
self.untagged_status_cre = re.compile(self._untagged_status)
@ -469,34 +449,7 @@ class IMAP4(object):
"""open_socket()
Open socket choosing first address family available."""
msg = (-1, 'could not open socket')
for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error as m:
msg = m
continue
try:
for i in (0, 1):
try:
s.connect(sa)
break
except socket.error as m:
msg = m
if len(msg.args) < 2 or msg.args[0] != errno.EINTR:
raise
else:
raise socket.error(msg)
except socket.error as m:
msg = m
s.close()
continue
break
else:
raise socket.error(msg)
return s
return socket.create_connection((self.host, self.port))
def ssl_wrap_socket(self):
@ -599,9 +552,6 @@ class IMAP4(object):
data = self.compressor.compress(data)
data += self.compressor.flush(zlib.Z_SYNC_FLUSH)
if bytes != str:
data = bytes(data, 'ASCII')
self.sock.sendall(data)
@ -706,6 +656,8 @@ class IMAP4(object):
date_time = Time2Internaldate(date_time)
else:
date_time = None
if isinstance(message, str):
message = bytes(message, 'ASCII')
literal = self.mapCRLF_cre.sub(CRLF, message)
if self.utf8_enabled:
literal = b'UTF8 (' + literal + b')'
@ -728,10 +680,11 @@ class IMAP4(object):
data = authobject(response)
It will be called to process server continuation responses.
It should return data that will be encoded and sent to server.
It should return None if the client abort response '*' should
be sent instead."""
It will be called to process server continuation responses,
the 'response' argument will be a 'bytes'. It should return
bytes that will be encoded and sent to server. It should
return None if the client abort response '*' should be sent
instead."""
self.literal = _Authenticator(authobject).process
try:
@ -970,7 +923,7 @@ class IMAP4(object):
def _CRAM_MD5_AUTH(self, challenge):
"""Authobject to use with CRAM-MD5 authentication."""
import hmac
pwd = (self.password.encode('ASCII') if isinstance(self.password, str)
pwd = (self.password.encode('utf-8') if isinstance(self.password, str)
else self.password)
return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest()
@ -1206,10 +1159,7 @@ class IMAP4(object):
self.rdth.setDaemon(True)
self.rdth.start()
typ, dat = self.capability()
if dat == [None]:
raise self.error('no CAPABILITY response from server')
self.capabilities = tuple(dat[-1].upper().split())
self._get_capabilities()
self._tls_established = True
@ -1305,7 +1255,7 @@ class IMAP4(object):
# Append new 'dat' to end of last untagged response if same 'typ',
# else append new response.
if dat is None: dat = ''
if dat is None: dat = b''
self.commands_lock.acquire()
@ -1324,38 +1274,19 @@ class IMAP4(object):
self.commands_lock.release()
if __debug__: self._log(5, 'untagged_responses[%s] %s += ["%.80s"]' % (typ, len(urd)-1, dat))
if __debug__: self._log(5, 'untagged_responses[%s] %s += ["%.80r"]' % (typ, len(urd)-1, dat))
def _check_bye(self):
bye = self._get_untagged_response('BYE', leave=True)
if bye:
if str != bytes:
raise self.abort(bye[-1].decode('ASCII', 'replace'))
else:
raise self.abort(bye[-1])
def _checkquote(self, arg):
# Must quote command args if "atom-specials" present,
# and not already quoted. NB: single quotes are removed.
if not isinstance(arg, string_types):
return arg
if len(arg) >= 2 and (arg[0],arg[-1]) in (('(',')'),('"','"')):
return arg
if len(arg) >= 2 and (arg[0],arg[-1]) in (("'","'"),):
return arg[1:-1]
if arg and self.mustquote_cre.search(arg) is None:
return arg
return self._quote(arg)
raise self.abort(bye[-1].decode('ASCII', 'replace'))
def _choose_nonull_or_dflt(self, dflt, *args):
if isinstance(dflt, string_types):
dflttyp = string_types # Allow any string type
if isinstance(dflt, str):
dflttyp = str # Allow any string type
else:
dflttyp = type(dflt)
for arg in args:
@ -1400,8 +1331,8 @@ class IMAP4(object):
if self.state not in Commands[name][CMD_VAL_STATES]:
self.literal = None
raise self.error('command %s illegal in state %s, only allowed in states %s'
% (name, self.state, ', '.join(Commands[name][CMD_VAL_STATES])))
raise self.error('command %s illegal in state %s'
% (name, self.state))
self._check_bye()
@ -1423,23 +1354,26 @@ class IMAP4(object):
rqb = self._request_push(name=name, **kw)
data = '%s %s' % (rqb.tag, name)
name = bytes(name, self._encoding)
data = rqb.tag + b' ' + name
for arg in args:
if arg is None: continue
data = '%s %s' % (data, self._checkquote(arg))
if isinstance(arg, str):
arg = bytes(arg, self._encoding)
data = data + b' ' + arg
literal = self.literal
if literal is not None:
self.literal = None
if isinstance(literal, string_types):
literator = None
data = '%s {%s}' % (data, len(literal))
else:
if type(literal) is type(self._command):
literator = literal
else:
literator = None
data = data + bytes(' {%s}' % len(literal), self._encoding)
if __debug__: self._log(4, 'data=%s' % data)
if __debug__: self._log(4, 'data=%r' % data)
rqb.data = '%s%s' % (data, CRLF)
rqb.data = data + CRLF
if literal is None:
self.ouq.put(rqb)
@ -1454,7 +1388,7 @@ class IMAP4(object):
# Wait for continuation response
ok, data = crqb.get_response('command: %s => %%s' % name)
if __debug__: self._log(4, 'continuation => %s, %s' % (ok, data))
if __debug__: self._log(4, 'continuation => %s, %r' % (ok, data))
# NO/BAD response?
@ -1477,7 +1411,7 @@ class IMAP4(object):
crqb = self._request_push(name=name, tag='continuation')
if __debug__: self._log(4, 'write literal size %s' % len(literal))
crqb.data = '%s%s' % (literal, CRLF)
crqb.data = literal + CRLF
self.ouq.put(crqb)
if literator is None:
@ -1514,10 +1448,7 @@ class IMAP4(object):
return
bye = self._get_untagged_response('BYE', leave=True)
if bye:
if str != bytes:
rqb.abort(self.abort, bye[-1].decode('ASCII', 'replace'))
else:
rqb.abort(self.abort, bye[-1])
rqb.abort(self.abort, bye[-1].decode('ASCII', 'replace'))
return
typ, dat = response
if typ == 'BAD':
@ -1554,11 +1485,20 @@ class IMAP4(object):
self.idle_rqb = None
self.idle_timeout = None
self.idle_lock.release()
irqb.data = 'DONE%s' % CRLF
irqb.data = bytes('DONE', 'ASCII') + CRLF
self.ouq.put(irqb)
if __debug__: self._log(2, 'server IDLE finished')
def _get_capabilities(self):
typ, dat = self.capability()
if dat == [None]:
raise self.error('no CAPABILITY response from server')
dat = str(dat[-1], "ASCII")
dat = dat.upper()
self.capabilities = tuple(dat.split())
def _get_untagged_response(self, name, leave=False):
self.commands_lock.acquire()
@ -1568,7 +1508,7 @@ class IMAP4(object):
if not leave:
del self.untagged_responses[i]
self.commands_lock.release()
if __debug__: self._log(5, '_get_untagged_response(%s) => %.80s' % (name, dat))
if __debug__: self._log(5, '_get_untagged_response(%s) => %.80r' % (name, dat))
return dat
self.commands_lock.release()
@ -1600,12 +1540,12 @@ class IMAP4(object):
if self._accumulated_data:
typ, dat = self._literal_expected
self._append_untagged(typ, (dat, ''.join(self._accumulated_data)))
self._append_untagged(typ, (dat, b''.join(self._accumulated_data)))
self._accumulated_data = []
# Protocol mandates all lines terminated by CRLF
resp = resp[:-2]
if __debug__: self._log(5, '_put_response(%s)' % resp)
if __debug__: self._log(5, '_put_response(%r)' % resp)
if 'continuation' in self.tagged_commands:
continuation_expected = True
@ -1629,12 +1569,12 @@ class IMAP4(object):
# Command completion response?
if self._match(self.tagre, resp):
tag = self.mo.group('tag')
typ = self.mo.group('type')
typ = str(self.mo.group('type'), 'ASCII')
dat = self.mo.group('data')
if typ in ('OK', 'NO', 'BAD') and self._match(self.response_code_cre, dat):
self._append_untagged(self.mo.group('type'), self.mo.group('data'))
self._append_untagged(str(self.mo.group('type'), 'ASCII'), self.mo.group('data'))
if not tag in self.tagged_commands:
if __debug__: self._log(1, 'unexpected tagged response: %s' % resp)
if __debug__: self._log(1, 'unexpected tagged response: %r' % resp)
else:
self._request_pop(tag, (typ, [dat]))
else:
@ -1651,18 +1591,18 @@ class IMAP4(object):
if self._match(self.continuation_cre, resp):
if not continuation_expected:
if __debug__: self._log(1, "unexpected continuation response: '%s'" % resp)
if __debug__: self._log(1, "unexpected continuation response: '%r'" % resp)
return
self._request_pop('continuation', (True, self.mo.group('data')))
return
if __debug__: self._log(1, "unexpected response: '%s'" % resp)
if __debug__: self._log(1, "unexpected response: '%r'" % resp)
return
typ = self.mo.group('type')
typ = str(self.mo.group('type'), 'ASCII')
dat = self.mo.group('data')
if dat is None: dat = '' # Null untagged response
if dat2: dat = dat + ' ' + dat2
if dat is None: dat = b'' # Null untagged response
if dat2: dat = dat + b' ' + dat2
# Is there a literal to come?
@ -1675,7 +1615,7 @@ class IMAP4(object):
self._append_untagged(typ, dat)
if typ in ('OK', 'NO', 'BAD') and self._match(self.response_code_cre, dat):
self._append_untagged(self.mo.group('type'), self.mo.group('data'))
self._append_untagged(str(self.mo.group('type'), 'ASCII'), self.mo.group('data'))
if typ != 'OK': # NO, BYE, IDLE
self._end_idle()
@ -1690,7 +1630,7 @@ class IMAP4(object):
if typ in ('NO', 'BAD', 'BYE'):
if typ == 'BYE':
self.Terminate = True
if __debug__: self._log(1, '%s response: %s' % (typ, dat))
if __debug__: self._log(1, '%s response: %r' % (typ, dat))
def _quote(self, arg):
@ -1715,7 +1655,7 @@ class IMAP4(object):
need_event = False
self.commands_lock.release()
if __debug__: self._log(4, '_request_pop(%s, %s) [%d] = %s' % (name, data, len(self.tagged_commands), rqb.tag))
if __debug__: self._log(4, '_request_pop(%s, %r) [%d] = %s' % (name, data, len(self.tagged_commands), rqb.tag))
rqb.deliver(data)
if need_event:
@ -1756,7 +1696,7 @@ class IMAP4(object):
if not dat:
break
data += dat
if __debug__: self._log(4, '_untagged_response(%s, ?, %s) => %.80s' % (typ, name, data))
if __debug__: self._log(4, '_untagged_response(%s, ?, %s) => %.80r' % (typ, name, data))
return typ, data
@ -1823,7 +1763,7 @@ class IMAP4(object):
if __debug__: self._log(1, 'inq None - terminating')
break
if not isinstance(line, string_types):
if not isinstance(line, bytes):
typ, val = line
break
@ -1873,10 +1813,7 @@ class IMAP4(object):
}
return ' '.join([PollErrors[s] for s in list(PollErrors.keys()) if (s & state)])
if bytes != str:
line_part = b''
else:
line_part = ''
line_part = b''
poll = select.poll()
@ -1913,23 +1850,14 @@ class IMAP4(object):
rxzero = 0
while True:
if bytes != str:
stop = data.find(b'\n', start)
if stop < 0:
line_part += data[start:]
break
stop += 1
line_part, start, line = \
b'', stop, (line_part + data[start:stop]).decode(errors='ignore')
else:
stop = data.find('\n', start)
if stop < 0:
line_part += data[start:]
break
stop += 1
line_part, start, line = \
'', stop, line_part + data[start:stop]
if __debug__: self._log(4, '< %s' % line)
stop = data.find(b'\n', start)
if stop < 0:
line_part += data[start:]
break
stop += 1
line_part, start, line = \
b'', stop, line_part + data[start:stop]
if __debug__: self._log(4, '< %r' % line)
self.inq.put(line)
if self.TerminateReader:
terminate = True
@ -1960,10 +1888,7 @@ class IMAP4(object):
if __debug__: self._log(1, 'starting using select')
if bytes != str:
line_part = b''
else:
line_part = ''
line_part = b''
rxzero = 0
terminate = False
@ -1992,23 +1917,14 @@ class IMAP4(object):
rxzero = 0
while True:
if bytes != str:
stop = data.find(b'\n', start)
if stop < 0:
line_part += data[start:]
break
stop += 1
line_part, start, line = \
b'', stop, (line_part + data[start:stop]).decode(errors='ignore')
else:
stop = data.find('\n', start)
if stop < 0:
line_part += data[start:]
break
stop += 1
line_part, start, line = \
'', stop, line_part + data[start:stop]
if __debug__: self._log(4, '< %s' % line)
stop = data.find(b'\n', start)
if stop < 0:
line_part += data[start:]
break
stop += 1
line_part, start, line = \
b'', stop, (line_part + data[start:stop]).decode(errors='ignore')
if __debug__: self._log(4, '< %r' % line)
self.inq.put(line)
if self.TerminateReader:
terminate = True
@ -2040,7 +1956,7 @@ class IMAP4(object):
try:
self.send(rqb.data)
if __debug__: self._log(4, '> %s' % rqb.data)
if __debug__: self._log(4, '> %r' % rqb.data)
except:
reason = 'socket error: %s - %s' % sys.exc_info()[:2]
if __debug__:
@ -2086,7 +2002,7 @@ class IMAP4(object):
return
t = '\n\t\t'
l = ['%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or '') for x in l]
l = ['%s: "%s"' % (x[0], x[1][0] and b'" "'.join(x[1]) or '') for x in l]
self.debug_lock.acquire()
self._mesg('untagged responses dump:%s%s' % (t, t.join(l)))
self.debug_lock.release()
@ -2223,9 +2139,6 @@ class IMAP4_SSL(IMAP4):
data = self.compressor.compress(data)
data += self.compressor.flush(zlib.Z_SYNC_FLUSH)
if bytes != str:
data = bytes(data, 'utf8')
if hasattr(self.sock, "sendall"):
self.sock.sendall(data)
else:
@ -2310,9 +2223,6 @@ class IMAP4_stream(IMAP4):
data = self.compressor.compress(data)
data += self.compressor.flush(zlib.Z_SYNC_FLUSH)
if bytes != str:
data = bytes(data, 'utf8')
self.writefile.write(data)
self.writefile.flush()
@ -2322,6 +2232,7 @@ class IMAP4_stream(IMAP4):
self.readfile.close()
self.writefile.close()
self._P.wait()
class _Authenticator(object):
@ -2335,7 +2246,7 @@ class _Authenticator(object):
def process(self, data, rqb):
ret = self.mech(self.decode(data))
if ret is None:
return '*' # Abort conversation
return b'*' # Abort conversation
return self.encode(ret)
def encode(self, inp):
@ -2347,17 +2258,16 @@ class _Authenticator(object):
# so when it gets to the end of the 8-bit input
# there's no partial 6-bit output.
#
if bytes != str:
oup = b''
else:
oup = ''
oup = b''
if isinstance(inp, str):
inp = inp.encode('utf-8')
while inp:
if len(inp) > 48:
t = inp[:48]
inp = inp[48:]
else:
t = inp
inp = ''
inp = b''
e = binascii.b2a_base64(t)
if e:
oup = oup + e[:-1]
@ -2365,7 +2275,7 @@ class _Authenticator(object):
def decode(self, inp):
if not inp:
return ''
return b''
return binascii.a2b_base64(inp)
@ -2394,19 +2304,23 @@ class _IdleCont(object):
MonthNames = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
Mon2num = dict(list(zip((x for x in MonthNames[1:]), list(range(1, 13)))))
Mon2num = {s.encode():n+1 for n, s in enumerate(MonthNames[1:])}
InternalDate = re.compile(r'.*INTERNALDATE "'
r'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
r'"')
InternalDate = re.compile(br'.*INTERNALDATE "'
br'(?P<day>[ 0123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
br' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
br' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
br'"')
def Internaldate2Time(resp):
"""time_tuple = Internaldate2Time(resp)
Convert IMAP4 INTERNALDATE to UT."""
Parse an IMAP4 INTERNALDATE string.
Return corresponding local time. The return value is a
time.struct_time instance or None if the string has wrong format."""
mo = InternalDate.match(resp)
if not mo:
@ -2426,23 +2340,11 @@ def Internaldate2Time(resp):
# INTERNALDATE timezone must be subtracted to get UT
zone = (zoneh*60 + zonem)*60
if zonen == '-':
if zonen == b'-':
zone = -zone
tt = (year, mon, day, hour, min, sec, -1, -1, -1)
utc = time.mktime(tt)
# Following is necessary because the time module has no 'mkgmtime'.
# 'mktime' assumes arg in local timezone, so adds timezone/altzone.
lt = time.localtime(utc)
if time.daylight and lt[-1]:
zone = zone + time.altzone
else:
zone = zone + time.timezone
return time.localtime(utc - zone)
return time.localtime(calendar.timegm(tt) - zone)
Internaldate2tuple = Internaldate2Time # (Backward compatible)
@ -2451,28 +2353,48 @@ Internaldate2tuple = Internaldate2Time # (Backward compatible)
def Time2Internaldate(date_time):
"""'"DD-Mmm-YYYY HH:MM:SS +HHMM"' = Time2Internaldate(date_time)
Convert 'date_time' to IMAP4 INTERNALDATE representation."""
Convert 'date_time' to IMAP4 INTERNALDATE representation.
The date_time argument can be a number (int or float) representing
seconds since epoch (as returned by time.time()), a 9-tuple
representing local time, an instance of time.struct_time (as
returned by time.localtime()), an aware datetime instance or a
double-quoted string. In the last case, it is assumed to already
be in the correct format."""
from datetime import datetime, timezone, timedelta
if isinstance(date_time, (int, float)):
tt = time.localtime(date_time)
elif isinstance(date_time, (tuple, time.struct_time)):
tt = date_time
elif isinstance(date_time, tuple):
try:
gmtoff = date_time.tm_gmtoff
except AttributeError:
if time.daylight:
dst = date_time[8]
if dst == -1:
dst = time.localtime(time.mktime(date_time))[8]
gmtoff = -(time.timezone, time.altzone)[dst]
else:
gmtoff = -time.timezone
delta = timedelta(seconds=gmtoff)
dt = datetime(*date_time[:6], tzinfo=timezone(delta))
elif isinstance(date_time, datetime):
if date_time.tzinfo is None:
raise ValueError("date_time must be aware")
dt = date_time
elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'):
return date_time # Assume in correct format
else:
raise ValueError("date_time not of a known type")
if time.daylight and tt[-1]:
zone = -time.altzone
else:
zone = -time.timezone
return ('"%2d-%s-%04d %02d:%02d:%02d %+03d%02d"' %
((tt[2], MonthNames[tt[1]], tt[0]) + tt[3:6] +
divmod(zone//60, 60)))
fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(MonthNames[dt.month])
return dt.strftime(fmt)
FLAGS_cre = re.compile(r'.*FLAGS \((?P<flags>[^\)]*)\)')
FLAGS_cre = re.compile(br'.*FLAGS \((?P<flags>[^\)]*)\)')
def ParseFlags(resp):
@ -2538,15 +2460,15 @@ if __name__ == '__main__':
test_seq1 = [
('list', ('""', '""')),
('list', ('""', '%')),
('list', ('""', '"%"')),
('create', ('imaplib2_test0',)),
('rename', ('imaplib2_test0', 'imaplib2_test1')),
('CREATE', ('imaplib2_test2',)),
('append', ('imaplib2_test2', None, None, test_mesg)),
('list', ('', 'imaplib2_test%')),
('list', ('""', '"imaplib2_test%"')),
('select', ('imaplib2_test2',)),
('search', (None, 'SUBJECT', 'IMAP4 test')),
('fetch', ("'1:*'", '(FLAGS INTERNALDATE RFC822)')),
('search', (None, 'SUBJECT', '"IMAP4 test"')),
('fetch', ('1:*', '(FLAGS INTERNALDATE RFC822)')),
('store', ('1', 'FLAGS', '(\Deleted)')),
('namespace', ()),
('expunge', ()),
@ -2561,10 +2483,10 @@ if __name__ == '__main__':
('append', (None, None, None, test_mesg)),
('examine', ()),
('select', ()),
('fetch', ("'1:*'", '(FLAGS UID)')),
('fetch', ('1:*', '(FLAGS UID)')),
('examine', ()),
('select', ()),
('uid', ('SEARCH', 'SUBJECT', 'IMAP4 test')),
('uid', ('SEARCH', 'SUBJECT', '"IMAP4 test"')),
('uid', ('SEARCH', 'ALL')),
('uid', ('THREAD', 'references', 'UTF-8', '(SEEN)')),
('recent', ()),
@ -2631,8 +2553,8 @@ if __name__ == '__main__':
for cmd,args in test_seq1:
run(cmd, args)
for ml in run('list', ('', 'imaplib2_test%'), cb=False):
mo = re.match(r'.*"([^"]+)"$', ml)
for ml in run('list', ('""', '"imaplib2_test%"'), cb=False):
mo = re.match(br'.*"([^"]+)"$', ml)
if mo: path = mo.group(1)
else: path = ml.split()[-1]
run('delete', (path,))