Merge pull request #1 from thekix/master

Remove six, remove files rfc6555.py and selectors2.py and allow download mail from IMAP and Gmail
This commit is contained in:
Rodolfo García Peñas (kix) 2020-09-03 21:48:29 +02:00 committed by GitHub
commit 7625012179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 123 additions and 1233 deletions

View File

@ -106,9 +106,8 @@ Bugs, issues and contributions can be requested to both the mailing list or the
## Requirements & dependencies
* Python v3+
* six (required)
* rfc6555 (required)
* imaplib2 >= 2.57 (optional)
* imaplib2 >= 3.5
* gssapi (optional), for Kerberos authentication
* portalocker (optional), if you need to run offlineimap in Cygwin for Windows

View File

@ -18,7 +18,6 @@ import os
import re
from sys import exc_info
from configparser import SafeConfigParser, Error
import six
from offlineimap.localeval import LocalEval
@ -71,10 +70,8 @@ class CustomConfigParser(SafeConfigParser):
val = self.get(section, option).strip()
return re.split(separator_re, val)
except re.error as e:
six.reraise(Error,
Error("Bad split regexp '%s': %s" %
(separator_re, e)),
exc_info()[2])
raise Error("Bad split regexp '%s': %s" %
(separator_re, e), exc_info()[2])
def getdefaultlist(self, section, option, default, separator_re):
"""Same as getlist, but returns the value of `default`

View File

@ -20,7 +20,6 @@ import os
import time
from sys import exc_info
import traceback
import six
from offlineimap import mbnames, CustomConfig, OfflineImapError
from offlineimap import globals
@ -243,14 +242,19 @@ class SyncableAccount(Account):
fcntl.lockf(self._lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except NameError:
pass # fnctl not available, disable file locking... :(
except IOError:
raise OfflineImapError(
"Could not lock account %s. Is another "
"instance using this account?" % self,
OfflineImapError.ERROR.REPO,
exc_info()[2])
except IOError:
self._lockfd.close()
six.reraise(OfflineImapError,
OfflineImapError(
"Could not lock account %s. Is another "
"instance using this account?" % self,
OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError(
"Could not lock account %s. Is another "
"instance using this account?" % self,
OfflineImapError.ERROR.REPO,
exc_info()[2])
def _unlock(self):
"""Unlock the account, deleting the lock file"""
@ -614,12 +618,11 @@ def syncfolder(account, remotefolder, quick):
localstart = localfolder.getstartdate()
remotestart = remotefolder.getstartdate()
if (maxage is not None) + (localstart is not None) + (remotestart is not None) > 1:
six.reraise(OfflineImapError,
OfflineImapError("You can set at most one of the "
"following: maxage, startdate (for the local "
"folder), startdate (for the remote folder)",
OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError("You can set at most one of the "
"following: maxage, startdate (for the local "
"folder), startdate (for the remote folder)",
OfflineImapError.ERROR.REPO,
exc_info()[2])
if (maxage is not None or localstart or remotestart) and quick:
# IMAP quickchanged isn't compatible with options that
# involve restricting the messagelist, since the "quick"

View File

@ -19,8 +19,6 @@
import re
from sys import exc_info
import six
from offlineimap import imaputil, imaplibutil, OfflineImapError
import offlineimap.accounts
from .IMAP import IMAPFolder
@ -71,10 +69,10 @@ class GmailFolder(IMAPFolder):
data = self._fetch_from_imap(str(uid), self.retrycount)
# data looks now e.g.
# [('320 (X-GM-LABELS (...) UID 17061 BODY[] {2565}','msgbody....')]
# we only asked for one message, and that msg is in data[0].
# msbody is in [0][1].
body = data[0][1].replace("\r\n", "\n")
# ['320 (X-GM-LABELS (...) UID 17061 BODY[] {2565}','msgbody....']
# we only asked for one message, and that msg is in data[1].
# msbody is in [1].
body = data[1].replace("\r\n", "\n")
# Embed the labels into the message headers
if self.synclabels:
@ -134,14 +132,13 @@ class GmailFolder(IMAPFolder):
res_type, response = imapobj.fetch("'%s'" % msgsToFetch,
'(FLAGS X-GM-LABELS UID)')
if res_type != 'OK':
six.reraise(OfflineImapError,
OfflineImapError(
"FETCHING UIDs in folder [%s]%s failed. " %
(self.getrepository(), self) +
"Server responded '[%s] %s'" %
(res_type, response),
OfflineImapError.ERROR.FOLDER),
exc_info()[2])
raise OfflineImapError(
"FETCHING UIDs in folder [%s]%s failed. " %
(self.getrepository(), self) +
"Server responded '[%s] %s'" %
(res_type, response),
OfflineImapError.ERROR.FOLDER,
exc_info()[2])
finally:
self.imapserver.releaseconnection(imapobj)

View File

@ -18,8 +18,6 @@
import os
from sys import exc_info
import six
import offlineimap.accounts
from offlineimap import OfflineImapError
from offlineimap import imaputil
@ -181,11 +179,10 @@ class GmailMaildirFolder(MaildirFolder):
try:
os.rename(tmppath, filepath)
except OSError as e:
six.reraise(OfflineImapError,
OfflineImapError("Can't rename file '%s' to '%s': %s" %
(tmppath, filepath, e[1]),
OfflineImapError.ERROR.FOLDER),
exc_info()[2])
raise OfflineImapError("Can't rename file '%s' to '%s': %s" %
(tmppath, filepath, e[1]),
OfflineImapError.ERROR.FOLDER,
exc_info()[2])
# If utime_from_header=true, we don't want to change the mtime.
if self._utime_from_header and mtime:

View File

@ -20,8 +20,6 @@ import binascii
import re
import time
from sys import exc_info
import six
from offlineimap import imaputil, imaplibutil, emailutil, OfflineImapError
from offlineimap import globals
from imaplib2 import MonthNames
@ -115,11 +113,10 @@ class IMAPFolder(BaseFolder):
def getmaxage(self):
if self.config.getdefault("Account %s" %
self.accountname, "maxage", None):
six.reraise(OfflineImapError,
OfflineImapError(
"maxage is not supported on IMAP-IMAP sync",
OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError(
"maxage is not supported on IMAP-IMAP sync",
OfflineImapError.ERROR.REPO,
exc_info()[2])
# Interface from BaseFolder
def getinstancelimitnamespace(self):
@ -391,6 +388,10 @@ class IMAPFolder(BaseFolder):
try:
matchinguids = imapobj.uid('search', 'HEADER',
headername, headervalue)[1][0]
# Returned value is type bytes
matchinguids = matchinguids.decode('utf-8')
except imapobj.error as err:
# IMAP server doesn't implement search or had a problem.
self.ui.debug('imap', "__savemessage_searchforheader: got IMAP "
@ -456,11 +457,7 @@ class IMAPFolder(BaseFolder):
# Folder was empty - start from 1.
start = 1
# Imaplib quotes all parameters of a string type. That must not happen
# with the range X:*. So we use bytearray to stop imaplib from getting
# in our way.
result = imapobj.uid('FETCH', bytearray('%d:*' % start), 'rfc822.header')
result = imapobj.uid('FETCH', '%d:*' % start, 'rfc822.header')
if result[0] != 'OK':
raise OfflineImapError('Error fetching mail headers: %s' %
'. '.join(result[1]), OfflineImapError.ERROR.MESSAGE)
@ -477,6 +474,9 @@ class IMAPFolder(BaseFolder):
# ('185 (RFC822.HEADER {1789}', '... mail headers ...'), ' UID 2444)'
for item in result:
if found is None and type(item) == tuple:
# Decode the value
item = [x.decode('utf-8') for x in item]
# Walk just tuples.
if re.search("(?:^|\\r|\\n)%s:\s*%s(?:\\r|\\n)" % (headername, headervalue),
item[1], flags=re.IGNORECASE):
@ -687,13 +687,14 @@ class IMAPFolder(BaseFolder):
self.imapserver.releaseconnection(imapobj, True)
imapobj = self.imapserver.acquireconnection()
if not retry_left:
six.reraise(OfflineImapError,
OfflineImapError("Saving msg (%s) in folder '%s', "
"repository '%s' failed (abort). Server responded: %s\n"
"Message content was: %s" %
(msg_id, self, self.getrepository(), str(e), dbg_output),
OfflineImapError.ERROR.MESSAGE),
exc_info()[2])
raise OfflineImapError(
"Saving msg (%s) in folder '%s', "
"repository '%s' failed (abort). Server responded: %s\n"
"Message content was: %s" %
(msg_id, self, self.getrepository(), str(e), dbg_output),
OfflineImapError.ERROR.MESSAGE,
exc_info()[2])
# XXX: is this still needed?
self.ui.error(e, exc_info()[2])
except imapobj.error as e: # APPEND failed
@ -702,12 +703,13 @@ class IMAPFolder(BaseFolder):
# drop conn, it might be bad.
self.imapserver.releaseconnection(imapobj, True)
imapobj = None
six.reraise(OfflineImapError,
OfflineImapError("Saving msg (%s) folder '%s', repo '%s'"
"failed (error). Server responded: %s\nMessage content was: "
"%s" % (msg_id, self, self.getrepository(), str(e), dbg_output),
OfflineImapError.ERROR.MESSAGE),
exc_info()[2])
raise OfflineImapError(
"Saving msg (%s) folder '%s', repo '%s'"
"failed (error). Server responded: %s\nMessage content was: "
"%s" % (msg_id, self, self.getrepository(), str(e), dbg_output),
OfflineImapError.ERROR.MESSAGE,
exc_info()[2])
# Checkpoint. Let it write out stuff, etc. Eg searches for
# just uploaded messages won't work if we don't do this.
(typ, dat) = imapobj.check()

View File

@ -18,8 +18,6 @@
from sys import exc_info
import os
import threading
import six
from .Base import BaseFolder
@ -71,7 +69,7 @@ class LocalStatusFolder(BaseFolder):
errstr = ("Corrupt line '%s' in cache file '%s'" %
(line, self.filename))
self.ui.warn(errstr)
six.reraise(ValueError, ValueError(errstr), exc_info()[2])
raise ValueError(errstr, exc_info()[2])
self.messagelist[uid] = self.msglist_item_initializer(uid)
self.messagelist[uid]['flags'] = flags
@ -94,7 +92,7 @@ class LocalStatusFolder(BaseFolder):
errstr = "Corrupt line '%s' in cache file '%s'" % \
(line, self.filename)
self.ui.warn(errstr)
six.reraise(ValueError, ValueError(errstr), exc_info()[2])
raise ValueError(errstr, exc_info()[2])
self.messagelist[uid] = self.msglist_item_initializer(uid)
self.messagelist[uid]['flags'] = flags
self.messagelist[uid]['mtime'] = mtime

View File

@ -19,9 +19,6 @@ import os
import sqlite3 as sqlite
from sys import exc_info
from threading import Lock
import six
from .Base import BaseFolder
@ -117,13 +114,11 @@ class LocalStatusSQLiteFolder(BaseFolder):
self._databaseFileLock.registerNewUser()
except sqlite.OperationalError as e:
# Operation had failed.
six.reraise(UserWarning,
UserWarning(
"cannot open database file '%s': %s.\nYou might"
" want to check the rights to that file and if "
"it cleanly opens with the 'sqlite<3>' command" %
(self.filename, e)),
exc_info()[2])
raise UserWarning(
"cannot open database file '%s': %s.\nYou might"
" want to check the rights to that file and if "
"it cleanly opens with the 'sqlite<3>' command" %
(self.filename, e), exc_info()[2])
# Test if db version is current enough and if db is readable.
try:
@ -351,10 +346,9 @@ class LocalStatusSQLiteFolder(BaseFolder):
self.__sql_write('INSERT INTO status (id,flags,mtime,labels) VALUES (?,?,?,?)',
(uid, flags, mtime, labels))
except Exception as e:
six.reraise(UserWarning,
UserWarning("%s while inserting UID %s" %
(str(e), str(uid))),
exc_info()[2])
raise UserWarning("%s while inserting UID %s" %
(str(e), str(uid)),
exc_info()[2])
return uid
# Interface from BaseFolder

View File

@ -21,13 +21,7 @@ import re
import os
from sys import exc_info
from threading import Lock
import six
try:
from hashlib import md5
except ImportError:
from md5 import md5
from hashlib import md5
from offlineimap import OfflineImapError, emailutil
from .Base import BaseFolder
@ -319,11 +313,10 @@ class MaildirFolder(BaseFolder):
time.sleep(0.23)
continue
severity = OfflineImapError.ERROR.MESSAGE
six.reraise(OfflineImapError,
OfflineImapError(
"Unique filename %s already exists." %
filename, severity),
exc_info()[2])
raise OfflineImapError(
"Unique filename %s already exists." %
filename, severity,
exc_info()[2])
else:
raise
@ -442,12 +435,11 @@ class MaildirFolder(BaseFolder):
os.rename(os.path.join(self.getfullname(), oldfilename),
os.path.join(self.getfullname(), newfilename))
except OSError as e:
six.reraise(OfflineImapError,
OfflineImapError(
"Can't rename file '%s' to '%s': %s" %
(oldfilename, newfilename, e[1]),
OfflineImapError.ERROR.FOLDER),
exc_info()[2])
raise OfflineImapError(
"Can't rename file '%s' to '%s': %s" %
(oldfilename, newfilename, e[1]),
OfflineImapError.ERROR.FOLDER,
exc_info()[2])
self.messagelist[uid]['flags'] = flags
self.messagelist[uid]['filename'] = newfilename
@ -529,12 +521,12 @@ class MaildirFolder(BaseFolder):
try:
os.rename(filename, newfilename)
except OSError as e:
six.reraise(OfflineImapError,
OfflineImapError(
"Can't rename file '%s' to '%s': %s" %
(filename, newfilename, e[1]),
OfflineImapError.ERROR.FOLDER),
exc_info()[2])
raise OfflineImapError(
"Can't rename file '%s' to '%s': %s" %
(filename, newfilename, e[1]),
OfflineImapError.ERROR.FOLDER,
exc_info()[2])
elif match.group(1) != self._foldermd5:
self.ui.warn(("Inconsistent FMD5 for file `%s':"
" Neither `%s' nor `%s' found")

View File

@ -20,7 +20,6 @@ import shutil
from os import fsync, unlink
from sys import exc_info
from threading import Lock
import six
try:
import portalocker
@ -89,11 +88,11 @@ class MappedIMAPFolder(IMAPFolder):
try:
line = line.strip()
except ValueError:
six.reraise(Exception,
Exception(
"Corrupt line '%s' in UID mapping file '%s'" %
(line, mapfilename)),
exc_info()[2])
raise Exception(
"Corrupt line '%s' in UID mapping file '%s'" %
(line, mapfilename),
exc_info()[2])
(str1, str2) = line.split(':')
loc = int(str1)
rem = int(str2)
@ -129,14 +128,13 @@ class MappedIMAPFolder(IMAPFolder):
try:
return [mapping[x] for x in items]
except KeyError as e:
six.reraise(OfflineImapError,
OfflineImapError(
"Could not find UID for msg '{0}' (f:'{1}'."
" This is usually a bad thing and should be "
"reported on the mailing list.".format(
e.args[0], self),
OfflineImapError.ERROR.MESSAGE),
exc_info()[2])
raise OfflineImapError(
"Could not find UID for msg '{0}' (f:'{1}'."
" This is usually a bad thing and should be "
"reported on the mailing list.".format(
e.args[0], self),
OfflineImapError.ERROR.MESSAGE,
exc_info()[2])
# Interface from BaseFolder
def cachemessagelist(self, min_date=None, min_uid=None):

View File

@ -24,9 +24,7 @@ import errno
import zlib
from sys import exc_info
from hashlib import sha512, sha384, sha256, sha224, sha1
import six
import rfc6555
from offlineimap import OfflineImapError
from offlineimap.ui import getglobalui
from imaplib2 import IMAP4, IMAP4_SSL, InternalDate
@ -59,9 +57,8 @@ class UsefulIMAPMixIn:
errstr = "Server '%s' closed connection, error on SELECT '%s'. Ser" \
"ver said: %s" % (self.host, mailbox, e.args[0])
severity = OfflineImapError.ERROR.FOLDER_RETRY
six.reraise(OfflineImapError,
OfflineImapError(errstr, severity),
exc_info()[2])
raise OfflineImapError(errstr, severity, exc_info()[2])
if result[0] != 'OK':
# in case of error, bail out with OfflineImapError
errstr = "Error SELECTing mailbox '%s', server reply:\n%s" % \

View File

@ -27,16 +27,12 @@ from socket import gaierror
from sys import exc_info
from ssl import SSLError, cert_time_to_seconds
from threading import Lock, BoundedSemaphore, Thread, Event, currentThread
import six
import offlineimap.accounts
from offlineimap import imaplibutil, imaputil, threadutil, OfflineImapError
from offlineimap.ui import getglobalui
try:
import gssapi
have_gss = True
except ImportError:
have_gss = False
@ -253,7 +249,8 @@ class IMAPServer():
msg = "%s (configuration is: %s)" % (e, str(params))
except Exception as eparams:
msg = "%s [cannot display configuration: %s]" % (e, eparams)
six.reraise(type(e), type(e)(msg), exc_info()[2])
raise type(e)(msg, exc_info()[2])
finally:
socket.socket = original_socket
@ -633,9 +630,7 @@ class IMAPServer():
"'%s'. Make sure you have configured the ser" \
"ver name correctly and that you are online." % \
(self.hostname, self.repos)
six.reraise(OfflineImapError,
OfflineImapError(reason, severity),
exc_info()[2])
raise OfflineImapError(reason, severity, exc_info()[2])
elif isinstance(e, SSLError) and e.errno == errno.EPERM:
# SSL unknown protocol error
@ -649,9 +644,7 @@ class IMAPServer():
reason = "Unknown SSL protocol connecting to host '%s' for " \
"repository '%s'. OpenSSL responded:\n%s" \
% (self.hostname, self.repos, e)
six.reraise(OfflineImapError,
OfflineImapError(reason, severity),
exc_info()[2])
raise OfflineImapError(reason, severity, exc_info()[2])
elif isinstance(e, socket.error) and e.args[0] == errno.ECONNREFUSED:
# "Connection refused", can be a non-existing port, or an unauthorized
@ -660,18 +653,16 @@ class IMAPServer():
"refused. Make sure you have the right host and port " \
"configured and that you are actually able to access the " \
"network." % (self.hostname, self.port, self.repos)
six.reraise(OfflineImapError,
OfflineImapError(reason, severity),
exc_info()[2])
raise OfflineImapError(reason, severity, exc_info()[2])
# Could not acquire connection to the remote;
# socket.error(last_error) raised
if str(e)[:24] == "can't open socket; error":
six.reraise(OfflineImapError,
OfflineImapError(
"Could not connect to remote server '%s' "
"for repository '%s'. Remote does not answer." % (self.hostname, self.repos),
OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError(
"Could not connect to remote server '%s' "
"for repository '%s'. Remote does not answer." % (self.hostname, self.repos),
OfflineImapError.ERROR.REPO,
exc_info()[2])
else:
# re-raise all other errors
raise

View File

@ -21,9 +21,6 @@ import netrc
import errno
from sys import exc_info
from threading import Event
import six
from offlineimap import folder, imaputil, imapserver, OfflineImapError
from offlineimap.repository.Base import BaseRepository
from offlineimap.threadutil import ExitNotifyThread
@ -127,12 +124,11 @@ class IMAPRepository(BaseRepository):
try:
host = self.localeval.eval(host)
except Exception as e:
six.reraise(OfflineImapError,
OfflineImapError(
"remotehosteval option for repository "
"'%s' failed:\n%s" % (self, e),
OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError(
"remotehosteval option for repository "
"'%s' failed:\n%s" % (self, e),
OfflineImapError.ERROR.REPO,
exc_info()[2])
if host:
self._host = host
return self._host
@ -549,7 +545,7 @@ class IMAPRepository(BaseRepository):
if foldername == '':
return
if self.getreference():
if self.getreference() != '""':
foldername = self.getreference() + self.getsep() + foldername
if not foldername: # Create top level folder as folder separator.
foldername = self.getsep()

View File

@ -15,13 +15,7 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
from sys import exc_info
import six
try:
from configparser import NoSectionError
except ImportError: # python2
from configparser import NoSectionError
from configparser import NoSectionError
from offlineimap.repository.IMAP import IMAPRepository, MappedIMAPRepository
from offlineimap.repository.Gmail import GmailRepository
from offlineimap.repository.Maildir import MaildirRepository
@ -68,18 +62,16 @@ class Repository:
except NoSectionError:
errstr = ("Could not find section '%s' in configuration. Required "
"for account '%s'." % ('Repository %s' % name, account))
six.reraise(OfflineImapError,
OfflineImapError(errstr, OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO,
exc_info()[2])
try:
repo = typemap[repostype]
except KeyError:
errstr = "'%s' repository not supported for '%s' repositories." % \
(repostype, reqtype)
six.reraise(OfflineImapError,
OfflineImapError(errstr, OfflineImapError.ERROR.REPO),
exc_info()[2])
raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO,
exc_info()[2])
return repo(name, account)

View File

@ -1,5 +1,4 @@
# Requirements
six
gssapi[kerberos]
portalocker[cygwin]
rfc6555

View File

@ -1,315 +0,0 @@
""" Python implementation of the Happy Eyeballs Algorithm described in RFC 6555. """
# Copyright 2017 Seth Michael Larson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import socket
from selectors2 import DefaultSelector, EVENT_WRITE
# time.perf_counter() is defined in Python 3.3
try:
from time import perf_counter
except (ImportError, AttributeError):
from time import time as perf_counter
# This list is due to socket.error and IOError not being a
# subclass of OSError until later versions of Python.
_SOCKET_ERRORS = (socket.error, OSError, IOError)
# Detects whether an IPv6 socket can be allocated.
def _detect_ipv6():
if getattr(socket, 'has_ipv6', False) and hasattr(socket, 'AF_INET6'):
_sock = None
try:
_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
_sock.bind(('::1', 0))
return True
except _SOCKET_ERRORS:
if _sock:
_sock.close()
return False
_HAS_IPV6 = _detect_ipv6()
# These are error numbers for asynchronous operations which can
# be safely ignored by RFC 6555 as being non-errors.
_ASYNC_ERRNOS = set([errno.EINPROGRESS,
errno.EAGAIN,
errno.EWOULDBLOCK])
if hasattr(errno, 'WSAWOULDBLOCK'):
_ASYNC_ERRNOS.add(errno.WSAWOULDBLOCK)
_DEFAULT_CACHE_DURATION = 60 * 10 # 10 minutes according to the RFC.
# This value that can be used to disable RFC 6555 globally.
RFC6555_ENABLED = _HAS_IPV6
__all__ = ['RFC6555_ENABLED',
'create_connection',
'cache']
__version__ = '0.0.0'
__author__ = 'Seth Michael Larson'
__email__ = 'sethmichaellarson@protonmail.com'
__license__ = 'Apache-2.0'
class _RFC6555CacheManager(object):
def __init__(self):
self.validity_duration = _DEFAULT_CACHE_DURATION
self.enabled = True
self.entries = {}
def add_entry(self, address, family):
if self.enabled:
current_time = perf_counter()
# Don't over-write old entries to reset their expiry.
if address not in self.entries or self.entries[address][1] > current_time:
self.entries[address] = (family, current_time + self.validity_duration)
def get_entry(self, address):
if not self.enabled or address not in self.entries:
return None
family, expiry = self.entries[address]
if perf_counter() > expiry:
del self.entries[address]
return None
return family
cache = _RFC6555CacheManager()
class _RFC6555ConnectionManager(object):
def __init__(self, address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
self.address = address
self.timeout = timeout
self.source_address = source_address
self._error = None
self._selector = DefaultSelector()
self._sockets = []
self._start_time = None
def create_connection(self):
self._start_time = perf_counter()
host, port = self.address
addr_info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
ret = self._connect_with_cached_family(addr_info)
# If it's a list, then these are the remaining values to try.
if isinstance(ret, list):
addr_info = ret
else:
cache.add_entry(self.address, ret.family)
return ret
# If we don't get any results back then just skip to the end.
if not addr_info:
raise socket.error('getaddrinfo returns an empty list')
sock = self._attempt_connect_with_addr_info(addr_info)
if sock:
cache.add_entry(self.address, sock.family)
return sock
elif self._error:
raise self._error
else:
raise socket.timeout()
def _attempt_connect_with_addr_info(self, addr_info):
sock = None
try:
for family, socktype, proto, _, sockaddr in addr_info:
self._create_socket(family, socktype, proto, sockaddr)
sock = self._wait_for_connection(False)
if sock:
break
if sock is None:
sock = self._wait_for_connection(True)
finally:
self._remove_all_sockets()
return sock
def _connect_with_cached_family(self, addr_info):
family = cache.get_entry(self.address)
if family is None:
return addr_info
is_family = []
not_family = []
for value in addr_info:
if value[0] == family:
is_family.append(value)
else:
not_family.append(value)
sock = self._attempt_connect_with_addr_info(is_family)
if sock is not None:
return sock
return not_family
def _create_socket(self, family, socktype, proto, sockaddr):
sock = None
try:
sock = socket.socket(family, socktype, proto)
# If we're using the 'default' socket timeout we have
# to set it to a real value here as this is the earliest
# opportunity to without pre-allocating a socket just for
# this purpose.
if self.timeout is socket._GLOBAL_DEFAULT_TIMEOUT:
self.timeout = sock.gettimeout()
if self.source_address:
sock.bind(self.source_address)
# Make the socket non-blocking so we can use our selector.
sock.settimeout(0.0)
if self._is_acceptable_errno(sock.connect_ex(sockaddr)):
self._selector.register(sock, EVENT_WRITE)
self._sockets.append(sock)
except _SOCKET_ERRORS as e:
self._error = e
if sock is not None:
_RFC6555ConnectionManager._close_socket(sock)
def _wait_for_connection(self, last_wait):
self._remove_all_errored_sockets()
# This is a safe-guard to make sure sock.gettimeout() is called in the
# case that the default socket timeout is used. If there are no
# sockets then we may not have called sock.gettimeout() yet.
if not self._sockets:
return None
# If this is the last time we're waiting for connections
# then we should wait until we should raise a timeout
# error, otherwise we should only wait >0.2 seconds as
# recommended by RFC 6555.
if last_wait:
if self.timeout is None:
select_timeout = None
else:
select_timeout = self._get_remaining_time()
else:
select_timeout = self._get_select_time()
# Wait for any socket to become writable as a sign of being connected.
for key, _ in self._selector.select(select_timeout):
sock = key.fileobj
if not self._is_socket_errored(sock):
# Restore the old proper timeout of the socket.
sock.settimeout(self.timeout)
# Remove it from this list to exempt the socket from cleanup.
self._sockets.remove(sock)
self._selector.unregister(sock)
return sock
return None
def _get_remaining_time(self):
if self.timeout is None:
return None
return max(self.timeout - (perf_counter() - self._start_time), 0.0)
def _get_select_time(self):
if self.timeout is None:
return 0.2
return min(0.2, self._get_remaining_time())
def _remove_all_errored_sockets(self):
socks = []
for sock in self._sockets:
if self._is_socket_errored(sock):
socks.append(sock)
for sock in socks:
self._selector.unregister(sock)
self._sockets.remove(sock)
_RFC6555ConnectionManager._close_socket(sock)
@staticmethod
def _close_socket(sock):
try:
sock.close()
except _SOCKET_ERRORS:
pass
def _is_acceptable_errno(self, errno):
if errno == 0 or errno in _ASYNC_ERRNOS:
return True
self._error = socket.error()
self._error.errno = errno
return False
def _is_socket_errored(self, sock):
errno = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
return not self._is_acceptable_errno(errno)
def _remove_all_sockets(self):
for sock in self._sockets:
self._selector.unregister(sock)
_RFC6555ConnectionManager._close_socket(sock)
self._sockets = []
def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
if RFC6555_ENABLED and _HAS_IPV6:
manager = _RFC6555ConnectionManager(address, timeout, source_address)
return manager.create_connection()
else:
# This code is the same as socket.create_connection() but is
# here to make sure the same code is used across all Python versions as
# the source_address parameter was added to socket.create_connection() in 3.2
# This segment of code is licensed under the Python Software Foundation License
# See LICENSE: https://github.com/python/cpython/blob/3.6/LICENSE
host, port = address
err = None
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
raise socket.error("getaddrinfo returns an empty list")

View File

@ -1,747 +0,0 @@
""" Back-ported, durable, and portable selectors """
# MIT License
#
# Copyright (c) 2017 Seth Michael Larson
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from collections import namedtuple, Mapping
import errno
import math
import platform
import select
import socket
import sys
import time
try:
monotonic = time.monotonic
except AttributeError:
monotonic = time.time
__author__ = 'Seth Michael Larson'
__email__ = 'sethmichaellarson@protonmail.com'
__version__ = '2.0.2'
__license__ = 'MIT'
__url__ = 'https://www.github.com/SethMichaelLarson/selectors2'
__all__ = ['EVENT_READ',
'EVENT_WRITE',
'SelectorKey',
'DefaultSelector',
'BaseSelector']
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)
_DEFAULT_SELECTOR = None
_SYSCALL_SENTINEL = object() # Sentinel in case a system call returns None.
_ERROR_TYPES = (OSError, IOError, socket.error)
try:
_INTEGER_TYPES = (int, long)
except NameError:
_INTEGER_TYPES = (int,)
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
class _SelectorMapping(Mapping):
""" Mapping of file objects to selector keys """
def __init__(self, selector):
self._selector = selector
def __len__(self):
return len(self._selector._fd_to_key)
def __getitem__(self, fileobj):
try:
fd = self._selector._fileobj_lookup(fileobj)
return self._selector._fd_to_key[fd]
except KeyError:
raise KeyError("{0!r} is not registered.".format(fileobj))
def __iter__(self):
return iter(self._selector._fd_to_key)
def _fileobj_to_fd(fileobj):
""" Return a file descriptor from a file object. If
given an integer will simply return that integer back. """
if isinstance(fileobj, _INTEGER_TYPES):
fd = fileobj
else:
for _integer_type in _INTEGER_TYPES:
try:
fd = _integer_type(fileobj.fileno())
break
except (AttributeError, TypeError, ValueError):
continue
else:
raise ValueError("Invalid file object: {0!r}".format(fileobj))
if fd < 0:
raise ValueError("Invalid file descriptor: {0}".format(fd))
return fd
class BaseSelector(object):
""" Abstract Selector class
A selector supports registering file objects to be monitored
for specific I/O events.
A file object is a file descriptor or any object with a
`fileno()` method. An arbitrary object can be attached to the
file object which can be used for example to store context info,
a callback, etc.
A selector can use various implementations (select(), poll(), epoll(),
and kqueue()) depending on the platform. The 'DefaultSelector' class uses
the most efficient implementation for the current platform.
"""
def __init__(self):
# Maps file descriptors to keys.
self._fd_to_key = {}
# Read-only mapping returned by get_map()
self._map = _SelectorMapping(self)
def _fileobj_lookup(self, fileobj):
""" Return a file descriptor from a file object.
This wraps _fileobj_to_fd() to do an exhaustive
search in case the object is invalid but we still
have it in our map. Used by unregister() so we can
unregister an object that was previously registered
even if it is closed. It is also used by _SelectorMapping
"""
try:
return _fileobj_to_fd(fileobj)
except ValueError:
# Search through all our mapped keys.
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
return key.fd
# Raise ValueError after all.
raise
def register(self, fileobj, events, data=None):
""" Register a file object for a set of events to monitor. """
if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
raise ValueError("Invalid events: {0!r}".format(events))
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
if key.fd in self._fd_to_key:
raise KeyError("{0!r} (FD {1}) is already registered"
.format(fileobj, key.fd))
self._fd_to_key[key.fd] = key
return key
def unregister(self, fileobj):
""" Unregister a file object from being monitored. """
try:
key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
# Getting the fileno of a closed socket on Windows errors with EBADF.
except socket.error as err:
if err.errno != errno.EBADF:
raise
else:
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
self._fd_to_key.pop(key.fd)
break
else:
raise KeyError("{0!r} is not registered".format(fileobj))
return key
def modify(self, fileobj, events, data=None):
""" Change a registered file object monitored events and data. """
# NOTE: Some subclasses optimize this operation even further.
try:
key = self._fd_to_key[self._fileobj_lookup(fileobj)]
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
if events != key.events:
self.unregister(fileobj)
key = self.register(fileobj, events, data)
elif data != key.data:
# Use a shortcut to update the data.
key = key._replace(data=data)
self._fd_to_key[key.fd] = key
return key
def select(self, timeout=None):
""" Perform the actual selection until some monitored file objects
are ready or the timeout expires. """
raise NotImplementedError()
def close(self):
""" Close the selector. This must be called to ensure that all
underlying resources are freed. """
self._fd_to_key.clear()
self._map = None
def get_key(self, fileobj):
""" Return the key associated with a registered file object. """
mapping = self.get_map()
if mapping is None:
raise RuntimeError("Selector is closed")
try:
return mapping[fileobj]
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
def get_map(self):
""" Return a mapping of file objects to selector keys """
return self._map
def _key_from_fd(self, fd):
""" Return the key associated to a given file descriptor
Return None if it is not found. """
try:
return self._fd_to_key[fd]
except KeyError:
return None
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
# Almost all platforms have select.select()
if hasattr(select, "select"):
class SelectSelector(BaseSelector):
""" Select-based selector. """
def __init__(self):
super(SelectSelector, self).__init__()
self._readers = set()
self._writers = set()
def register(self, fileobj, events, data=None):
key = super(SelectSelector, self).register(fileobj, events, data)
if events & EVENT_READ:
self._readers.add(key.fd)
if events & EVENT_WRITE:
self._writers.add(key.fd)
return key
def unregister(self, fileobj):
key = super(SelectSelector, self).unregister(fileobj)
self._readers.discard(key.fd)
self._writers.discard(key.fd)
return key
def select(self, timeout=None):
# Selecting on empty lists on Windows errors out.
if not len(self._readers) and not len(self._writers):
return []
timeout = None if timeout is None else max(timeout, 0.0)
ready = []
r, w, _ = _syscall_wrapper(self._wrap_select, True, self._readers,
self._writers, timeout=timeout)
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ
if fd in w:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def _wrap_select(self, r, w, timeout=None):
""" Wrapper for select.select because timeout is a positional arg """
return select.select(r, w, [], timeout)
__all__.append('SelectSelector')
# Jython has a different implementation of .fileno() for socket objects.
if platform.python_implementation() == 'Jython':
class _JythonSelectorMapping(object):
""" This is an implementation of _SelectorMapping that is built
for use specifically with Jython, which does not provide a hashable
value from socket.socket.fileno(). """
def __init__(self, selector):
assert isinstance(selector, JythonSelectSelector)
self._selector = selector
def __len__(self):
return len(self._selector._sockets)
def __getitem__(self, fileobj):
for sock, key in self._selector._sockets:
if sock is fileobj:
return key
else:
raise KeyError("{0!r} is not registered.".format(fileobj))
class JythonSelectSelector(SelectSelector):
""" This is an implementation of SelectSelector that is for Jython
which works around that Jython's socket.socket.fileno() does not
return an integer fd value. All SelectorKey.fd will be equal to -1
and should not be used. This instead uses object id to compare fileobj
and will only use select.select as it's the only selector that allows
directly passing in socket objects rather than registering fds.
See: http://bugs.jython.org/issue1678
https://wiki.python.org/jython/NewSocketModule#socket.fileno.28.29_does_not_return_an_integer
"""
def __init__(self):
super(JythonSelectSelector, self).__init__()
self._sockets = [] # Uses a list of tuples instead of dictionary.
self._map = _JythonSelectorMapping(self)
self._readers = []
self._writers = []
# Jython has a select.cpython_compatible_select function in older versions.
self._select_func = getattr(select, 'cpython_compatible_select', select.select)
def register(self, fileobj, events, data=None):
for sock, _ in self._sockets:
if sock is fileobj:
raise KeyError("{0!r} is already registered"
.format(fileobj, sock))
key = SelectorKey(fileobj, -1, events, data)
self._sockets.append((fileobj, key))
if events & EVENT_READ:
self._readers.append(fileobj)
if events & EVENT_WRITE:
self._writers.append(fileobj)
return key
def unregister(self, fileobj):
for i, (sock, key) in enumerate(self._sockets):
if sock is fileobj:
break
else:
raise KeyError("{0!r} is not registered.".format(fileobj))
if key.events & EVENT_READ:
self._readers.remove(fileobj)
if key.events & EVENT_WRITE:
self._writers.remove(fileobj)
del self._sockets[i]
return key
def _wrap_select(self, r, w, timeout=None):
""" Wrapper for select.select because timeout is a positional arg """
return self._select_func(r, w, [], timeout)
__all__.append('JythonSelectSelector')
SelectSelector = JythonSelectSelector # Override so the wrong selector isn't used.
if hasattr(select, "poll"):
class PollSelector(BaseSelector):
""" Poll-based selector """
def __init__(self):
super(PollSelector, self).__init__()
self._poll = select.poll()
def register(self, fileobj, events, data=None):
key = super(PollSelector, self).register(fileobj, events, data)
event_mask = 0
if events & EVENT_READ:
event_mask |= select.POLLIN
if events & EVENT_WRITE:
event_mask |= select.POLLOUT
self._poll.register(key.fd, event_mask)
return key
def unregister(self, fileobj):
key = super(PollSelector, self).unregister(fileobj)
self._poll.unregister(key.fd)
return key
def _wrap_poll(self, timeout=None):
""" Wrapper function for select.poll.poll() so that
_syscall_wrapper can work with only seconds. """
if timeout is not None:
if timeout <= 0:
timeout = 0
else:
# select.poll.poll() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1000)
result = self._poll.poll(timeout)
return result
def select(self, timeout=None):
ready = []
fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.POLLIN:
events |= EVENT_WRITE
if event_mask & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
__all__.append('PollSelector')
if hasattr(select, "epoll"):
class EpollSelector(BaseSelector):
""" Epoll-based selector """
def __init__(self):
super(EpollSelector, self).__init__()
self._epoll = select.epoll()
def fileno(self):
return self._epoll.fileno()
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def unregister(self, fileobj):
key = super(EpollSelector, self).unregister(fileobj)
try:
_syscall_wrapper(self._epoll.unregister, False, key.fd)
except _ERROR_TYPES:
# This can occur when the fd was closed since registry.
pass
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1000) * 0.001
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._epoll.close()
super(EpollSelector, self).close()
__all__.append('EpollSelector')
if hasattr(select, "devpoll"):
class DevpollSelector(BaseSelector):
"""Solaris /dev/poll selector."""
def __init__(self):
super(DevpollSelector, self).__init__()
self._devpoll = select.devpoll()
def fileno(self):
return self._devpoll.fileno()
def register(self, fileobj, events, data=None):
key = super(DevpollSelector, self).register(fileobj, events, data)
poll_events = 0
if events & EVENT_READ:
poll_events |= select.POLLIN
if events & EVENT_WRITE:
poll_events |= select.POLLOUT
self._devpoll.register(key.fd, poll_events)
return key
def unregister(self, fileobj):
key = super(DevpollSelector, self).unregister(fileobj)
self._devpoll.unregister(key.fd)
return key
def _wrap_poll(self, timeout=None):
""" Wrapper function for select.poll.poll() so that
_syscall_wrapper can work with only seconds. """
if timeout is not None:
if timeout <= 0:
timeout = 0
else:
# select.devpoll.poll() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1000)
result = self._devpoll.poll(timeout)
return result
def select(self, timeout=None):
ready = []
fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.POLLIN:
events |= EVENT_WRITE
if event_mask & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._devpoll.close()
super(DevpollSelector, self).close()
__all__.append('DevpollSelector')
if hasattr(select, "kqueue"):
class KqueueSelector(BaseSelector):
""" Kqueue / Kevent-based selector """
def __init__(self):
super(KqueueSelector, self).__init__()
self._kqueue = select.kqueue()
def fileno(self):
return self._kqueue.fileno()
def register(self, fileobj, events, data=None):
key = super(KqueueSelector, self).register(fileobj, events, data)
if events & EVENT_READ:
kevent = select.kevent(key.fd,
select.KQ_FILTER_READ,
select.KQ_EV_ADD)
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
if events & EVENT_WRITE:
kevent = select.kevent(key.fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_ADD)
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
return key
def unregister(self, fileobj):
key = super(KqueueSelector, self).unregister(fileobj)
if key.events & EVENT_READ:
kevent = select.kevent(key.fd,
select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
try:
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
except _ERROR_TYPES:
pass
if key.events & EVENT_WRITE:
kevent = select.kevent(key.fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)
try:
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
except _ERROR_TYPES:
pass
return key
def select(self, timeout=None):
if timeout is not None:
timeout = max(timeout, 0)
max_events = len(self._fd_to_key) * 2
ready_fds = {}
kevent_list = _syscall_wrapper(self._wrap_control, True,
None, max_events, timeout=timeout)
for kevent in kevent_list:
fd = kevent.ident
event_mask = kevent.filter
events = 0
if event_mask == select.KQ_FILTER_READ:
events |= EVENT_READ
if event_mask == select.KQ_FILTER_WRITE:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
if key.fd not in ready_fds:
ready_fds[key.fd] = (key, events & key.events)
else:
old_events = ready_fds[key.fd][1]
ready_fds[key.fd] = (key, (events | old_events) & key.events)
return list(ready_fds.values())
def close(self):
self._kqueue.close()
super(KqueueSelector, self).close()
def _wrap_control(self, changelist, max_events, timeout):
return self._kqueue.control(changelist, max_events, timeout)
__all__.append('KqueueSelector')
def _can_allocate(struct):
""" Checks that select structs can be allocated by the underlying
operating system, not just advertised by the select module. We don't
check select() because we'll be hopeful that most platforms that
don't have it available will not advertise it. (ie: GAE) """
try:
# select.poll() objects won't fail until used.
if struct == 'poll':
p = select.poll()
p.poll(0)
# All others will fail on allocation.
else:
getattr(select, struct)().close()
return True
except (OSError, AttributeError):
return False
# Python 3.5 uses a more direct route to wrap system calls to increase speed.
if sys.version_info >= (3, 5):
def _syscall_wrapper(func, _, *args, **kwargs):
""" This is the short-circuit version of the below logic
because in Python 3.5+ all selectors restart system calls. """
return func(*args, **kwargs)
else:
def _syscall_wrapper(func, recalc_timeout, *args, **kwargs):
""" Wrapper function for syscalls that could fail due to EINTR.
All functions should be retried if there is time left in the timeout
in accordance with PEP 475. """
timeout = kwargs.get("timeout", None)
if timeout is None:
expires = None
recalc_timeout = False
else:
timeout = float(timeout)
if timeout < 0.0: # Timeout less than 0 treated as no timeout.
expires = None
else:
expires = monotonic() + timeout
if recalc_timeout and 'timeout' not in kwargs:
raise ValueError(
'Timeout must be in kwargs to be recalculated')
result = _SYSCALL_SENTINEL
while result is _SYSCALL_SENTINEL:
try:
result = func(*args, **kwargs)
# OSError is thrown by select.select
# IOError is thrown by select.epoll.poll
# select.error is thrown by select.poll.poll
# Aren't we thankful for Python 3.x rework for exceptions?
except (OSError, IOError, select.error) as e:
# select.error wasn't a subclass of OSError in the past.
errcode = None
if hasattr(e, 'errno') and e.errno is not None:
errcode = e.errno
elif hasattr(e, 'args'):
errcode = e.args[0]
# Also test for the Windows equivalent of EINTR.
is_interrupt = (errcode == errno.EINTR or (hasattr(errno, 'WSAEINTR') and
errcode == errno.WSAEINTR))
if is_interrupt:
if expires is not None:
current_time = monotonic()
if current_time > expires:
raise OSError(errno.ETIMEDOUT, 'Connection timed out')
if recalc_timeout:
kwargs["timeout"] = expires - current_time
continue
raise
return result
# Choose the best implementation, roughly:
# kqueue == devpoll == epoll > poll > select
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
""" This function serves as a first call for DefaultSelector to
detect if the select module is being monkey-patched incorrectly
by eventlet, greenlet, and preserve proper behavior. """
global _DEFAULT_SELECTOR
if _DEFAULT_SELECTOR is None:
if platform.python_implementation() == 'Jython': # Platform-specific: Jython
_DEFAULT_SELECTOR = JythonSelectSelector
elif _can_allocate('kqueue'):
_DEFAULT_SELECTOR = KqueueSelector
elif _can_allocate('devpoll'):
_DEFAULT_SELECTOR = DevpollSelector
elif _can_allocate('epoll'):
_DEFAULT_SELECTOR = EpollSelector
elif _can_allocate('poll'):
_DEFAULT_SELECTOR = PollSelector
elif hasattr(select, 'select'):
_DEFAULT_SELECTOR = SelectSelector
else: # Platform-specific: AppEngine
raise RuntimeError('Platform does not have a selector.')
return _DEFAULT_SELECTOR()