diff --git a/offlineimap/bundled_imaplib2.py b/offlineimap/bundled_imaplib2.py index 2657d2f..77c3eb0 100644 --- a/offlineimap/bundled_imaplib2.py +++ b/offlineimap/bundled_imaplib2.py @@ -57,26 +57,18 @@ __author__ = "Piers Lauder " __URL__ = "http://imaplib2.sourceforge.net" __license__ = "Python License" -import binascii, calendar, errno, os, random, re, select, socket, sys, time, threading, zlib +import binascii, calendar, errno, os, queue, random, re, select, socket, sys, time, threading, zlib -if bytes != str: - # Python 3, but NB assumes strings in all I/O - # for backwards compatibility with python 2 usage. - import queue - string_types = str -else: - import queue as queue - string_types = str select_module = select # Globals -CRLF = '\r\n' +CRLF = b'\r\n' IMAP4_PORT = 143 IMAP4_SSL_PORT = 993 -IDLE_TIMEOUT_RESPONSE = '* IDLE TIMEOUT\r\n' +IDLE_TIMEOUT_RESPONSE = b'* IDLE TIMEOUT\r\n' IDLE_TIMEOUT = 60*29 # Don't stay in IDLE state longer READ_POLL_TIMEOUT = 30 # Without this timeout interrupted network connections can hang reader READ_SIZE = 32768 # Consume all available in socket @@ -148,15 +140,15 @@ UID_direct = ('SEARCH', 'SORT', 'THREAD') def Int2AP(num): """string = Int2AP(num) - Return 'num' converted to a string using characters from the set 'A'..'P' + Return 'num' converted to bytes using characters from the set 'A'..'P' """ - val, a2p = [], 'ABCDEFGHIJKLMNOP' + val = b''; AP = b'ABCDEFGHIJKLMNOP' num = int(abs(num)) while num: num, mod = divmod(num, 16) - val.insert(0, a2p[mod]) - return ''.join(val) + val = AP[mod:mod+1] + val + return val @@ -173,7 +165,7 @@ class Request(object): else: self.callback_arg = (self, cb_arg) # Self reference required in callback arg - self.tag = '%s%s' % (parent.tagpre, parent.tagnum) + self.tag = parent.tagpre + bytes(str(parent.tagnum), 'ASCII') parent.tagnum += 1 self.ready = threading.Event() @@ -262,20 +254,10 @@ class IMAP4(object): that state-changing commands will both block until previous commands have completed, and block subsequent commands until they have finished. - All (non-callback) arguments to commands are converted to strings, + All (non-callback) string arguments to commands are converted to bytes, except for AUTHENTICATE, and the last argument to APPEND which is - passed as an IMAP4 literal. If necessary (the string contains any - non-printing characters or white-space and isn't enclosed with - either parentheses or double or single quotes) each string is - quoted. However, the 'password' argument to the LOGIN command is - always quoted. If you want to avoid having an argument string - quoted (eg: the 'flags' argument to STORE) then enclose the string - in parentheses (eg: "(\Deleted)"). If you are using "sequence sets" - containing the wildcard character '*', then enclose the argument - in single quotes: the quotes will be removed and the resulting - string passed unquoted. Note also that you can pass in an argument - with a type that doesn't evaluate to 'string_types' (eg: 'bytearray') - and it will be converted to a string without quoting. + passed as an IMAP4 literal. NB: the 'password' argument to the LOGIN + command is always quoted. There is one instance variable, 'state', that is useful for tracking whether the client needs to login to the server. If it has the @@ -302,11 +284,10 @@ class IMAP4(object): _literal = br'.*{(?P\d+)}$' _untagged_status = br'\* (?P\d+) (?P[A-Z-]+)( (?P.*))?' - continuation_cre = re.compile(r'\+( (?P.*))?') - mapCRLF_cre = re.compile(r'\r\n|\r|\n') - mustquote_cre = re.compile(r"[^!#$&'+,./0-9:;<=>?@A-Z\[^_`a-z|}~-]") - response_code_cre = re.compile(r'\[(?P[A-Z-]+)( (?P[^\]]*))?\]') - untagged_response_cre = re.compile(r'\* (?P[A-Z-]+)( (?P.*))?') + continuation_cre = re.compile(br'\+( (?P.*))?') + mapCRLF_cre = re.compile(br'\r\n|\r|\n') + response_code_cre = re.compile(br'\[(?P[A-Z-]+)( (?P[^\]]*))?\]') + untagged_response_cre = re.compile(br'\* (?P[A-Z-]+)( (?P.*))?') def __init__(self, host=None, port=None, debug=None, debug_file=None, identifier=None, timeout=None, debug_buf_lvl=None): @@ -334,9 +315,9 @@ class IMAP4(object): self.tagnum = 0 self.tagpre = Int2AP(random.randint(4096, 65535)) - self.tagre = re.compile(r'(?P' + self.tagre = re.compile(br'(?P' + self.tagpre - + r'\d+) (?P[A-Z]+) ?(?P.*)') + + br'\d+) (?P[A-Z]+) (?P.*)', re.ASCII) self._mode_ascii() @@ -401,10 +382,7 @@ class IMAP4(object): else: raise self.error('unrecognised server welcome message: %s' % repr(self.welcome)) - typ, dat = self.capability() - if dat == [None]: - raise self.error('no CAPABILITY response from server') - self.capabilities = tuple(dat[-1].upper().split()) + self._get_capabilities() if __debug__: self._log(1, 'CAPABILITY: %r' % (self.capabilities,)) for version in AllowedVersions: @@ -426,26 +404,28 @@ class IMAP4(object): raise AttributeError("Unknown IMAP4 command: '%s'" % attr) + def __enter__(self): + return self + + def __exit__(self, *args): + try: + self.logout() + except OSError: + pass + + def _mode_ascii(self): self.utf8_enabled = False self._encoding = 'ascii' - if bytes != str: - self.literal_cre = re.compile(self._literal, re.ASCII) - self.untagged_status_cre = re.compile(self._untagged_status, re.ASCII) - else: - self.literal_cre = re.compile(self._literal) - self.untagged_status_cre = re.compile(self._untagged_status) + self.literal_cre = re.compile(self._literal, re.ASCII) + self.untagged_status_cre = re.compile(self._untagged_status, re.ASCII) def _mode_utf8(self): self.utf8_enabled = True self._encoding = 'utf-8' - if bytes != str: - self.literal_cre = re.compile(self._literal) - self.untagged_status_cre = re.compile(self._untagged_status) - else: - self.literal_cre = re.compile(self._literal, re.UNICODE) - self.untagged_status_cre = re.compile(self._untagged_status, re.UNICODE) + self.literal_cre = re.compile(self._literal) + self.untagged_status_cre = re.compile(self._untagged_status) @@ -469,34 +449,7 @@ class IMAP4(object): """open_socket() Open socket choosing first address family available.""" - msg = (-1, 'could not open socket') - for res in socket.getaddrinfo(self.host, self.port, socket.AF_UNSPEC, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - try: - s = socket.socket(af, socktype, proto) - except socket.error as m: - msg = m - continue - try: - for i in (0, 1): - try: - s.connect(sa) - break - except socket.error as m: - msg = m - if len(msg.args) < 2 or msg.args[0] != errno.EINTR: - raise - else: - raise socket.error(msg) - except socket.error as m: - msg = m - s.close() - continue - break - else: - raise socket.error(msg) - - return s + return socket.create_connection((self.host, self.port)) def ssl_wrap_socket(self): @@ -599,9 +552,6 @@ class IMAP4(object): data = self.compressor.compress(data) data += self.compressor.flush(zlib.Z_SYNC_FLUSH) - if bytes != str: - data = bytes(data, 'ASCII') - self.sock.sendall(data) @@ -706,6 +656,8 @@ class IMAP4(object): date_time = Time2Internaldate(date_time) else: date_time = None + if isinstance(message, str): + message = bytes(message, 'ASCII') literal = self.mapCRLF_cre.sub(CRLF, message) if self.utf8_enabled: literal = b'UTF8 (' + literal + b')' @@ -728,10 +680,11 @@ class IMAP4(object): data = authobject(response) - It will be called to process server continuation responses. - It should return data that will be encoded and sent to server. - It should return None if the client abort response '*' should - be sent instead.""" + It will be called to process server continuation responses, + the 'response' argument will be a 'bytes'. It should return + bytes that will be encoded and sent to server. It should + return None if the client abort response '*' should be sent + instead.""" self.literal = _Authenticator(authobject).process try: @@ -970,7 +923,7 @@ class IMAP4(object): def _CRAM_MD5_AUTH(self, challenge): """Authobject to use with CRAM-MD5 authentication.""" import hmac - pwd = (self.password.encode('ASCII') if isinstance(self.password, str) + pwd = (self.password.encode('utf-8') if isinstance(self.password, str) else self.password) return self.user + " " + hmac.HMAC(pwd, challenge, 'md5').hexdigest() @@ -1206,10 +1159,7 @@ class IMAP4(object): self.rdth.setDaemon(True) self.rdth.start() - typ, dat = self.capability() - if dat == [None]: - raise self.error('no CAPABILITY response from server') - self.capabilities = tuple(dat[-1].upper().split()) + self._get_capabilities() self._tls_established = True @@ -1305,7 +1255,7 @@ class IMAP4(object): # Append new 'dat' to end of last untagged response if same 'typ', # else append new response. - if dat is None: dat = '' + if dat is None: dat = b'' self.commands_lock.acquire() @@ -1324,38 +1274,19 @@ class IMAP4(object): self.commands_lock.release() - if __debug__: self._log(5, 'untagged_responses[%s] %s += ["%.80s"]' % (typ, len(urd)-1, dat)) + if __debug__: self._log(5, 'untagged_responses[%s] %s += ["%.80r"]' % (typ, len(urd)-1, dat)) def _check_bye(self): bye = self._get_untagged_response('BYE', leave=True) if bye: - if str != bytes: - raise self.abort(bye[-1].decode('ASCII', 'replace')) - else: - raise self.abort(bye[-1]) - - - def _checkquote(self, arg): - - # Must quote command args if "atom-specials" present, - # and not already quoted. NB: single quotes are removed. - - if not isinstance(arg, string_types): - return arg - if len(arg) >= 2 and (arg[0],arg[-1]) in (('(',')'),('"','"')): - return arg - if len(arg) >= 2 and (arg[0],arg[-1]) in (("'","'"),): - return arg[1:-1] - if arg and self.mustquote_cre.search(arg) is None: - return arg - return self._quote(arg) + raise self.abort(bye[-1].decode('ASCII', 'replace')) def _choose_nonull_or_dflt(self, dflt, *args): - if isinstance(dflt, string_types): - dflttyp = string_types # Allow any string type + if isinstance(dflt, str): + dflttyp = str # Allow any string type else: dflttyp = type(dflt) for arg in args: @@ -1400,8 +1331,8 @@ class IMAP4(object): if self.state not in Commands[name][CMD_VAL_STATES]: self.literal = None - raise self.error('command %s illegal in state %s, only allowed in states %s' - % (name, self.state, ', '.join(Commands[name][CMD_VAL_STATES]))) + raise self.error('command %s illegal in state %s' + % (name, self.state)) self._check_bye() @@ -1423,23 +1354,26 @@ class IMAP4(object): rqb = self._request_push(name=name, **kw) - data = '%s %s' % (rqb.tag, name) + name = bytes(name, self._encoding) + data = rqb.tag + b' ' + name for arg in args: if arg is None: continue - data = '%s %s' % (data, self._checkquote(arg)) + if isinstance(arg, str): + arg = bytes(arg, self._encoding) + data = data + b' ' + arg literal = self.literal if literal is not None: self.literal = None - if isinstance(literal, string_types): - literator = None - data = '%s {%s}' % (data, len(literal)) - else: + if type(literal) is type(self._command): literator = literal + else: + literator = None + data = data + bytes(' {%s}' % len(literal), self._encoding) - if __debug__: self._log(4, 'data=%s' % data) + if __debug__: self._log(4, 'data=%r' % data) - rqb.data = '%s%s' % (data, CRLF) + rqb.data = data + CRLF if literal is None: self.ouq.put(rqb) @@ -1454,7 +1388,7 @@ class IMAP4(object): # Wait for continuation response ok, data = crqb.get_response('command: %s => %%s' % name) - if __debug__: self._log(4, 'continuation => %s, %s' % (ok, data)) + if __debug__: self._log(4, 'continuation => %s, %r' % (ok, data)) # NO/BAD response? @@ -1477,7 +1411,7 @@ class IMAP4(object): crqb = self._request_push(name=name, tag='continuation') if __debug__: self._log(4, 'write literal size %s' % len(literal)) - crqb.data = '%s%s' % (literal, CRLF) + crqb.data = literal + CRLF self.ouq.put(crqb) if literator is None: @@ -1514,10 +1448,7 @@ class IMAP4(object): return bye = self._get_untagged_response('BYE', leave=True) if bye: - if str != bytes: - rqb.abort(self.abort, bye[-1].decode('ASCII', 'replace')) - else: - rqb.abort(self.abort, bye[-1]) + rqb.abort(self.abort, bye[-1].decode('ASCII', 'replace')) return typ, dat = response if typ == 'BAD': @@ -1554,11 +1485,20 @@ class IMAP4(object): self.idle_rqb = None self.idle_timeout = None self.idle_lock.release() - irqb.data = 'DONE%s' % CRLF + irqb.data = bytes('DONE', 'ASCII') + CRLF self.ouq.put(irqb) if __debug__: self._log(2, 'server IDLE finished') + def _get_capabilities(self): + typ, dat = self.capability() + if dat == [None]: + raise self.error('no CAPABILITY response from server') + dat = str(dat[-1], "ASCII") + dat = dat.upper() + self.capabilities = tuple(dat.split()) + + def _get_untagged_response(self, name, leave=False): self.commands_lock.acquire() @@ -1568,7 +1508,7 @@ class IMAP4(object): if not leave: del self.untagged_responses[i] self.commands_lock.release() - if __debug__: self._log(5, '_get_untagged_response(%s) => %.80s' % (name, dat)) + if __debug__: self._log(5, '_get_untagged_response(%s) => %.80r' % (name, dat)) return dat self.commands_lock.release() @@ -1600,12 +1540,12 @@ class IMAP4(object): if self._accumulated_data: typ, dat = self._literal_expected - self._append_untagged(typ, (dat, ''.join(self._accumulated_data))) + self._append_untagged(typ, (dat, b''.join(self._accumulated_data))) self._accumulated_data = [] # Protocol mandates all lines terminated by CRLF resp = resp[:-2] - if __debug__: self._log(5, '_put_response(%s)' % resp) + if __debug__: self._log(5, '_put_response(%r)' % resp) if 'continuation' in self.tagged_commands: continuation_expected = True @@ -1629,12 +1569,12 @@ class IMAP4(object): # Command completion response? if self._match(self.tagre, resp): tag = self.mo.group('tag') - typ = self.mo.group('type') + typ = str(self.mo.group('type'), 'ASCII') dat = self.mo.group('data') if typ in ('OK', 'NO', 'BAD') and self._match(self.response_code_cre, dat): - self._append_untagged(self.mo.group('type'), self.mo.group('data')) + self._append_untagged(str(self.mo.group('type'), 'ASCII'), self.mo.group('data')) if not tag in self.tagged_commands: - if __debug__: self._log(1, 'unexpected tagged response: %s' % resp) + if __debug__: self._log(1, 'unexpected tagged response: %r' % resp) else: self._request_pop(tag, (typ, [dat])) else: @@ -1651,18 +1591,18 @@ class IMAP4(object): if self._match(self.continuation_cre, resp): if not continuation_expected: - if __debug__: self._log(1, "unexpected continuation response: '%s'" % resp) + if __debug__: self._log(1, "unexpected continuation response: '%r'" % resp) return self._request_pop('continuation', (True, self.mo.group('data'))) return - if __debug__: self._log(1, "unexpected response: '%s'" % resp) + if __debug__: self._log(1, "unexpected response: '%r'" % resp) return - typ = self.mo.group('type') + typ = str(self.mo.group('type'), 'ASCII') dat = self.mo.group('data') - if dat is None: dat = '' # Null untagged response - if dat2: dat = dat + ' ' + dat2 + if dat is None: dat = b'' # Null untagged response + if dat2: dat = dat + b' ' + dat2 # Is there a literal to come? @@ -1675,7 +1615,7 @@ class IMAP4(object): self._append_untagged(typ, dat) if typ in ('OK', 'NO', 'BAD') and self._match(self.response_code_cre, dat): - self._append_untagged(self.mo.group('type'), self.mo.group('data')) + self._append_untagged(str(self.mo.group('type'), 'ASCII'), self.mo.group('data')) if typ != 'OK': # NO, BYE, IDLE self._end_idle() @@ -1690,7 +1630,7 @@ class IMAP4(object): if typ in ('NO', 'BAD', 'BYE'): if typ == 'BYE': self.Terminate = True - if __debug__: self._log(1, '%s response: %s' % (typ, dat)) + if __debug__: self._log(1, '%s response: %r' % (typ, dat)) def _quote(self, arg): @@ -1715,7 +1655,7 @@ class IMAP4(object): need_event = False self.commands_lock.release() - if __debug__: self._log(4, '_request_pop(%s, %s) [%d] = %s' % (name, data, len(self.tagged_commands), rqb.tag)) + if __debug__: self._log(4, '_request_pop(%s, %r) [%d] = %s' % (name, data, len(self.tagged_commands), rqb.tag)) rqb.deliver(data) if need_event: @@ -1756,7 +1696,7 @@ class IMAP4(object): if not dat: break data += dat - if __debug__: self._log(4, '_untagged_response(%s, ?, %s) => %.80s' % (typ, name, data)) + if __debug__: self._log(4, '_untagged_response(%s, ?, %s) => %.80r' % (typ, name, data)) return typ, data @@ -1823,7 +1763,7 @@ class IMAP4(object): if __debug__: self._log(1, 'inq None - terminating') break - if not isinstance(line, string_types): + if not isinstance(line, bytes): typ, val = line break @@ -1873,10 +1813,7 @@ class IMAP4(object): } return ' '.join([PollErrors[s] for s in list(PollErrors.keys()) if (s & state)]) - if bytes != str: - line_part = b'' - else: - line_part = '' + line_part = b'' poll = select.poll() @@ -1913,23 +1850,14 @@ class IMAP4(object): rxzero = 0 while True: - if bytes != str: - stop = data.find(b'\n', start) - if stop < 0: - line_part += data[start:] - break - stop += 1 - line_part, start, line = \ - b'', stop, (line_part + data[start:stop]).decode(errors='ignore') - else: - stop = data.find('\n', start) - if stop < 0: - line_part += data[start:] - break - stop += 1 - line_part, start, line = \ - '', stop, line_part + data[start:stop] - if __debug__: self._log(4, '< %s' % line) + stop = data.find(b'\n', start) + if stop < 0: + line_part += data[start:] + break + stop += 1 + line_part, start, line = \ + b'', stop, line_part + data[start:stop] + if __debug__: self._log(4, '< %r' % line) self.inq.put(line) if self.TerminateReader: terminate = True @@ -1960,10 +1888,7 @@ class IMAP4(object): if __debug__: self._log(1, 'starting using select') - if bytes != str: - line_part = b'' - else: - line_part = '' + line_part = b'' rxzero = 0 terminate = False @@ -1992,23 +1917,14 @@ class IMAP4(object): rxzero = 0 while True: - if bytes != str: - stop = data.find(b'\n', start) - if stop < 0: - line_part += data[start:] - break - stop += 1 - line_part, start, line = \ - b'', stop, (line_part + data[start:stop]).decode(errors='ignore') - else: - stop = data.find('\n', start) - if stop < 0: - line_part += data[start:] - break - stop += 1 - line_part, start, line = \ - '', stop, line_part + data[start:stop] - if __debug__: self._log(4, '< %s' % line) + stop = data.find(b'\n', start) + if stop < 0: + line_part += data[start:] + break + stop += 1 + line_part, start, line = \ + b'', stop, (line_part + data[start:stop]).decode(errors='ignore') + if __debug__: self._log(4, '< %r' % line) self.inq.put(line) if self.TerminateReader: terminate = True @@ -2040,7 +1956,7 @@ class IMAP4(object): try: self.send(rqb.data) - if __debug__: self._log(4, '> %s' % rqb.data) + if __debug__: self._log(4, '> %r' % rqb.data) except: reason = 'socket error: %s - %s' % sys.exc_info()[:2] if __debug__: @@ -2086,7 +2002,7 @@ class IMAP4(object): return t = '\n\t\t' - l = ['%s: "%s"' % (x[0], x[1][0] and '" "'.join(x[1]) or '') for x in l] + l = ['%s: "%s"' % (x[0], x[1][0] and b'" "'.join(x[1]) or '') for x in l] self.debug_lock.acquire() self._mesg('untagged responses dump:%s%s' % (t, t.join(l))) self.debug_lock.release() @@ -2223,9 +2139,6 @@ class IMAP4_SSL(IMAP4): data = self.compressor.compress(data) data += self.compressor.flush(zlib.Z_SYNC_FLUSH) - if bytes != str: - data = bytes(data, 'utf8') - if hasattr(self.sock, "sendall"): self.sock.sendall(data) else: @@ -2310,9 +2223,6 @@ class IMAP4_stream(IMAP4): data = self.compressor.compress(data) data += self.compressor.flush(zlib.Z_SYNC_FLUSH) - if bytes != str: - data = bytes(data, 'utf8') - self.writefile.write(data) self.writefile.flush() @@ -2322,6 +2232,7 @@ class IMAP4_stream(IMAP4): self.readfile.close() self.writefile.close() + self._P.wait() class _Authenticator(object): @@ -2335,7 +2246,7 @@ class _Authenticator(object): def process(self, data, rqb): ret = self.mech(self.decode(data)) if ret is None: - return '*' # Abort conversation + return b'*' # Abort conversation return self.encode(ret) def encode(self, inp): @@ -2347,17 +2258,16 @@ class _Authenticator(object): # so when it gets to the end of the 8-bit input # there's no partial 6-bit output. # - if bytes != str: - oup = b'' - else: - oup = '' + oup = b'' + if isinstance(inp, str): + inp = inp.encode('utf-8') while inp: if len(inp) > 48: t = inp[:48] inp = inp[48:] else: t = inp - inp = '' + inp = b'' e = binascii.b2a_base64(t) if e: oup = oup + e[:-1] @@ -2365,7 +2275,7 @@ class _Authenticator(object): def decode(self, inp): if not inp: - return '' + return b'' return binascii.a2b_base64(inp) @@ -2394,19 +2304,23 @@ class _IdleCont(object): MonthNames = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] -Mon2num = dict(list(zip((x for x in MonthNames[1:]), list(range(1, 13))))) +Mon2num = {s.encode():n+1 for n, s in enumerate(MonthNames[1:])} -InternalDate = re.compile(r'.*INTERNALDATE "' - r'(?P[ 0123][0-9])-(?P[A-Z][a-z][a-z])-(?P[0-9][0-9][0-9][0-9])' - r' (?P[0-9][0-9]):(?P[0-9][0-9]):(?P[0-9][0-9])' - r' (?P[-+])(?P[0-9][0-9])(?P[0-9][0-9])' - r'"') +InternalDate = re.compile(br'.*INTERNALDATE "' + br'(?P[ 0123][0-9])-(?P[A-Z][a-z][a-z])-(?P[0-9][0-9][0-9][0-9])' + br' (?P[0-9][0-9]):(?P[0-9][0-9]):(?P[0-9][0-9])' + br' (?P[-+])(?P[0-9][0-9])(?P[0-9][0-9])' + br'"') def Internaldate2Time(resp): """time_tuple = Internaldate2Time(resp) - Convert IMAP4 INTERNALDATE to UT.""" + + Parse an IMAP4 INTERNALDATE string. + + Return corresponding local time. The return value is a + time.struct_time instance or None if the string has wrong format.""" mo = InternalDate.match(resp) if not mo: @@ -2426,23 +2340,11 @@ def Internaldate2Time(resp): # INTERNALDATE timezone must be subtracted to get UT zone = (zoneh*60 + zonem)*60 - if zonen == '-': + if zonen == b'-': zone = -zone tt = (year, mon, day, hour, min, sec, -1, -1, -1) - - utc = time.mktime(tt) - - # Following is necessary because the time module has no 'mkgmtime'. - # 'mktime' assumes arg in local timezone, so adds timezone/altzone. - - lt = time.localtime(utc) - if time.daylight and lt[-1]: - zone = zone + time.altzone - else: - zone = zone + time.timezone - - return time.localtime(utc - zone) + return time.localtime(calendar.timegm(tt) - zone) Internaldate2tuple = Internaldate2Time # (Backward compatible) @@ -2451,28 +2353,48 @@ Internaldate2tuple = Internaldate2Time # (Backward compatible) def Time2Internaldate(date_time): """'"DD-Mmm-YYYY HH:MM:SS +HHMM"' = Time2Internaldate(date_time) - Convert 'date_time' to IMAP4 INTERNALDATE representation.""" + + Convert 'date_time' to IMAP4 INTERNALDATE representation. + + The date_time argument can be a number (int or float) representing + seconds since epoch (as returned by time.time()), a 9-tuple + representing local time, an instance of time.struct_time (as + returned by time.localtime()), an aware datetime instance or a + double-quoted string. In the last case, it is assumed to already + be in the correct format.""" + + from datetime import datetime, timezone, timedelta if isinstance(date_time, (int, float)): tt = time.localtime(date_time) - elif isinstance(date_time, (tuple, time.struct_time)): - tt = date_time + elif isinstance(date_time, tuple): + try: + gmtoff = date_time.tm_gmtoff + except AttributeError: + if time.daylight: + dst = date_time[8] + if dst == -1: + dst = time.localtime(time.mktime(date_time))[8] + gmtoff = -(time.timezone, time.altzone)[dst] + else: + gmtoff = -time.timezone + delta = timedelta(seconds=gmtoff) + dt = datetime(*date_time[:6], tzinfo=timezone(delta)) + elif isinstance(date_time, datetime): + if date_time.tzinfo is None: + raise ValueError("date_time must be aware") + dt = date_time elif isinstance(date_time, str) and (date_time[0],date_time[-1]) == ('"','"'): return date_time # Assume in correct format else: raise ValueError("date_time not of a known type") - if time.daylight and tt[-1]: - zone = -time.altzone - else: - zone = -time.timezone - return ('"%2d-%s-%04d %02d:%02d:%02d %+03d%02d"' % - ((tt[2], MonthNames[tt[1]], tt[0]) + tt[3:6] + - divmod(zone//60, 60))) + fmt = '"%d-{}-%Y %H:%M:%S %z"'.format(MonthNames[dt.month]) + return dt.strftime(fmt) -FLAGS_cre = re.compile(r'.*FLAGS \((?P[^\)]*)\)') +FLAGS_cre = re.compile(br'.*FLAGS \((?P[^\)]*)\)') def ParseFlags(resp): @@ -2538,15 +2460,15 @@ if __name__ == '__main__': test_seq1 = [ ('list', ('""', '""')), - ('list', ('""', '%')), + ('list', ('""', '"%"')), ('create', ('imaplib2_test0',)), ('rename', ('imaplib2_test0', 'imaplib2_test1')), ('CREATE', ('imaplib2_test2',)), ('append', ('imaplib2_test2', None, None, test_mesg)), - ('list', ('', 'imaplib2_test%')), + ('list', ('""', '"imaplib2_test%"')), ('select', ('imaplib2_test2',)), - ('search', (None, 'SUBJECT', 'IMAP4 test')), - ('fetch', ("'1:*'", '(FLAGS INTERNALDATE RFC822)')), + ('search', (None, 'SUBJECT', '"IMAP4 test"')), + ('fetch', ('1:*', '(FLAGS INTERNALDATE RFC822)')), ('store', ('1', 'FLAGS', '(\Deleted)')), ('namespace', ()), ('expunge', ()), @@ -2561,10 +2483,10 @@ if __name__ == '__main__': ('append', (None, None, None, test_mesg)), ('examine', ()), ('select', ()), - ('fetch', ("'1:*'", '(FLAGS UID)')), + ('fetch', ('1:*', '(FLAGS UID)')), ('examine', ()), ('select', ()), - ('uid', ('SEARCH', 'SUBJECT', 'IMAP4 test')), + ('uid', ('SEARCH', 'SUBJECT', '"IMAP4 test"')), ('uid', ('SEARCH', 'ALL')), ('uid', ('THREAD', 'references', 'UTF-8', '(SEEN)')), ('recent', ()), @@ -2631,8 +2553,8 @@ if __name__ == '__main__': for cmd,args in test_seq1: run(cmd, args) - for ml in run('list', ('', 'imaplib2_test%'), cb=False): - mo = re.match(r'.*"([^"]+)"$', ml) + for ml in run('list', ('""', '"imaplib2_test%"'), cb=False): + mo = re.match(br'.*"([^"]+)"$', ml) if mo: path = mo.group(1) else: path = ml.split()[-1] run('delete', (path,))