Included external libraries

I included these libraries here, to avoid problems sharing with other
users.

I will remove them later.
This commit is contained in:
Rodolfo García Peñas (kix) 2020-08-28 23:15:13 +02:00
parent 3711954908
commit fb43b31e7c
2 changed files with 1062 additions and 0 deletions

315
rfc6555.py Normal file
View File

@ -0,0 +1,315 @@
""" Python implementation of the Happy Eyeballs Algorithm described in RFC 6555. """
# Copyright 2017 Seth Michael Larson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import socket
from selectors2 import DefaultSelector, EVENT_WRITE
# time.perf_counter() is defined in Python 3.3
try:
from time import perf_counter
except (ImportError, AttributeError):
from time import time as perf_counter
# This list is due to socket.error and IOError not being a
# subclass of OSError until later versions of Python.
_SOCKET_ERRORS = (socket.error, OSError, IOError)
# Detects whether an IPv6 socket can be allocated.
def _detect_ipv6():
if getattr(socket, 'has_ipv6', False) and hasattr(socket, 'AF_INET6'):
_sock = None
try:
_sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
_sock.bind(('::1', 0))
return True
except _SOCKET_ERRORS:
if _sock:
_sock.close()
return False
_HAS_IPV6 = _detect_ipv6()
# These are error numbers for asynchronous operations which can
# be safely ignored by RFC 6555 as being non-errors.
_ASYNC_ERRNOS = set([errno.EINPROGRESS,
errno.EAGAIN,
errno.EWOULDBLOCK])
if hasattr(errno, 'WSAWOULDBLOCK'):
_ASYNC_ERRNOS.add(errno.WSAWOULDBLOCK)
_DEFAULT_CACHE_DURATION = 60 * 10 # 10 minutes according to the RFC.
# This value that can be used to disable RFC 6555 globally.
RFC6555_ENABLED = _HAS_IPV6
__all__ = ['RFC6555_ENABLED',
'create_connection',
'cache']
__version__ = '0.0.0'
__author__ = 'Seth Michael Larson'
__email__ = 'sethmichaellarson@protonmail.com'
__license__ = 'Apache-2.0'
class _RFC6555CacheManager(object):
def __init__(self):
self.validity_duration = _DEFAULT_CACHE_DURATION
self.enabled = True
self.entries = {}
def add_entry(self, address, family):
if self.enabled:
current_time = perf_counter()
# Don't over-write old entries to reset their expiry.
if address not in self.entries or self.entries[address][1] > current_time:
self.entries[address] = (family, current_time + self.validity_duration)
def get_entry(self, address):
if not self.enabled or address not in self.entries:
return None
family, expiry = self.entries[address]
if perf_counter() > expiry:
del self.entries[address]
return None
return family
cache = _RFC6555CacheManager()
class _RFC6555ConnectionManager(object):
def __init__(self, address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
self.address = address
self.timeout = timeout
self.source_address = source_address
self._error = None
self._selector = DefaultSelector()
self._sockets = []
self._start_time = None
def create_connection(self):
self._start_time = perf_counter()
host, port = self.address
addr_info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
ret = self._connect_with_cached_family(addr_info)
# If it's a list, then these are the remaining values to try.
if isinstance(ret, list):
addr_info = ret
else:
cache.add_entry(self.address, ret.family)
return ret
# If we don't get any results back then just skip to the end.
if not addr_info:
raise socket.error('getaddrinfo returns an empty list')
sock = self._attempt_connect_with_addr_info(addr_info)
if sock:
cache.add_entry(self.address, sock.family)
return sock
elif self._error:
raise self._error
else:
raise socket.timeout()
def _attempt_connect_with_addr_info(self, addr_info):
sock = None
try:
for family, socktype, proto, _, sockaddr in addr_info:
self._create_socket(family, socktype, proto, sockaddr)
sock = self._wait_for_connection(False)
if sock:
break
if sock is None:
sock = self._wait_for_connection(True)
finally:
self._remove_all_sockets()
return sock
def _connect_with_cached_family(self, addr_info):
family = cache.get_entry(self.address)
if family is None:
return addr_info
is_family = []
not_family = []
for value in addr_info:
if value[0] == family:
is_family.append(value)
else:
not_family.append(value)
sock = self._attempt_connect_with_addr_info(is_family)
if sock is not None:
return sock
return not_family
def _create_socket(self, family, socktype, proto, sockaddr):
sock = None
try:
sock = socket.socket(family, socktype, proto)
# If we're using the 'default' socket timeout we have
# to set it to a real value here as this is the earliest
# opportunity to without pre-allocating a socket just for
# this purpose.
if self.timeout is socket._GLOBAL_DEFAULT_TIMEOUT:
self.timeout = sock.gettimeout()
if self.source_address:
sock.bind(self.source_address)
# Make the socket non-blocking so we can use our selector.
sock.settimeout(0.0)
if self._is_acceptable_errno(sock.connect_ex(sockaddr)):
self._selector.register(sock, EVENT_WRITE)
self._sockets.append(sock)
except _SOCKET_ERRORS as e:
self._error = e
if sock is not None:
_RFC6555ConnectionManager._close_socket(sock)
def _wait_for_connection(self, last_wait):
self._remove_all_errored_sockets()
# This is a safe-guard to make sure sock.gettimeout() is called in the
# case that the default socket timeout is used. If there are no
# sockets then we may not have called sock.gettimeout() yet.
if not self._sockets:
return None
# If this is the last time we're waiting for connections
# then we should wait until we should raise a timeout
# error, otherwise we should only wait >0.2 seconds as
# recommended by RFC 6555.
if last_wait:
if self.timeout is None:
select_timeout = None
else:
select_timeout = self._get_remaining_time()
else:
select_timeout = self._get_select_time()
# Wait for any socket to become writable as a sign of being connected.
for key, _ in self._selector.select(select_timeout):
sock = key.fileobj
if not self._is_socket_errored(sock):
# Restore the old proper timeout of the socket.
sock.settimeout(self.timeout)
# Remove it from this list to exempt the socket from cleanup.
self._sockets.remove(sock)
self._selector.unregister(sock)
return sock
return None
def _get_remaining_time(self):
if self.timeout is None:
return None
return max(self.timeout - (perf_counter() - self._start_time), 0.0)
def _get_select_time(self):
if self.timeout is None:
return 0.2
return min(0.2, self._get_remaining_time())
def _remove_all_errored_sockets(self):
socks = []
for sock in self._sockets:
if self._is_socket_errored(sock):
socks.append(sock)
for sock in socks:
self._selector.unregister(sock)
self._sockets.remove(sock)
_RFC6555ConnectionManager._close_socket(sock)
@staticmethod
def _close_socket(sock):
try:
sock.close()
except _SOCKET_ERRORS:
pass
def _is_acceptable_errno(self, errno):
if errno == 0 or errno in _ASYNC_ERRNOS:
return True
self._error = socket.error()
self._error.errno = errno
return False
def _is_socket_errored(self, sock):
errno = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
return not self._is_acceptable_errno(errno)
def _remove_all_sockets(self):
for sock in self._sockets:
self._selector.unregister(sock)
_RFC6555ConnectionManager._close_socket(sock)
self._sockets = []
def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
if RFC6555_ENABLED and _HAS_IPV6:
manager = _RFC6555ConnectionManager(address, timeout, source_address)
return manager.create_connection()
else:
# This code is the same as socket.create_connection() but is
# here to make sure the same code is used across all Python versions as
# the source_address parameter was added to socket.create_connection() in 3.2
# This segment of code is licensed under the Python Software Foundation License
# See LICENSE: https://github.com/python/cpython/blob/3.6/LICENSE
host, port = address
err = None
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
else:
raise socket.error("getaddrinfo returns an empty list")

747
selectors2.py Normal file
View File

@ -0,0 +1,747 @@
""" Back-ported, durable, and portable selectors """
# MIT License
#
# Copyright (c) 2017 Seth Michael Larson
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from collections import namedtuple, Mapping
import errno
import math
import platform
import select
import socket
import sys
import time
try:
monotonic = time.monotonic
except AttributeError:
monotonic = time.time
__author__ = 'Seth Michael Larson'
__email__ = 'sethmichaellarson@protonmail.com'
__version__ = '2.0.2'
__license__ = 'MIT'
__url__ = 'https://www.github.com/SethMichaelLarson/selectors2'
__all__ = ['EVENT_READ',
'EVENT_WRITE',
'SelectorKey',
'DefaultSelector',
'BaseSelector']
EVENT_READ = (1 << 0)
EVENT_WRITE = (1 << 1)
_DEFAULT_SELECTOR = None
_SYSCALL_SENTINEL = object() # Sentinel in case a system call returns None.
_ERROR_TYPES = (OSError, IOError, socket.error)
try:
_INTEGER_TYPES = (int, long)
except NameError:
_INTEGER_TYPES = (int,)
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
class _SelectorMapping(Mapping):
""" Mapping of file objects to selector keys """
def __init__(self, selector):
self._selector = selector
def __len__(self):
return len(self._selector._fd_to_key)
def __getitem__(self, fileobj):
try:
fd = self._selector._fileobj_lookup(fileobj)
return self._selector._fd_to_key[fd]
except KeyError:
raise KeyError("{0!r} is not registered.".format(fileobj))
def __iter__(self):
return iter(self._selector._fd_to_key)
def _fileobj_to_fd(fileobj):
""" Return a file descriptor from a file object. If
given an integer will simply return that integer back. """
if isinstance(fileobj, _INTEGER_TYPES):
fd = fileobj
else:
for _integer_type in _INTEGER_TYPES:
try:
fd = _integer_type(fileobj.fileno())
break
except (AttributeError, TypeError, ValueError):
continue
else:
raise ValueError("Invalid file object: {0!r}".format(fileobj))
if fd < 0:
raise ValueError("Invalid file descriptor: {0}".format(fd))
return fd
class BaseSelector(object):
""" Abstract Selector class
A selector supports registering file objects to be monitored
for specific I/O events.
A file object is a file descriptor or any object with a
`fileno()` method. An arbitrary object can be attached to the
file object which can be used for example to store context info,
a callback, etc.
A selector can use various implementations (select(), poll(), epoll(),
and kqueue()) depending on the platform. The 'DefaultSelector' class uses
the most efficient implementation for the current platform.
"""
def __init__(self):
# Maps file descriptors to keys.
self._fd_to_key = {}
# Read-only mapping returned by get_map()
self._map = _SelectorMapping(self)
def _fileobj_lookup(self, fileobj):
""" Return a file descriptor from a file object.
This wraps _fileobj_to_fd() to do an exhaustive
search in case the object is invalid but we still
have it in our map. Used by unregister() so we can
unregister an object that was previously registered
even if it is closed. It is also used by _SelectorMapping
"""
try:
return _fileobj_to_fd(fileobj)
except ValueError:
# Search through all our mapped keys.
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
return key.fd
# Raise ValueError after all.
raise
def register(self, fileobj, events, data=None):
""" Register a file object for a set of events to monitor. """
if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)):
raise ValueError("Invalid events: {0!r}".format(events))
key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
if key.fd in self._fd_to_key:
raise KeyError("{0!r} (FD {1}) is already registered"
.format(fileobj, key.fd))
self._fd_to_key[key.fd] = key
return key
def unregister(self, fileobj):
""" Unregister a file object from being monitored. """
try:
key = self._fd_to_key.pop(self._fileobj_lookup(fileobj))
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
# Getting the fileno of a closed socket on Windows errors with EBADF.
except socket.error as err:
if err.errno != errno.EBADF:
raise
else:
for key in self._fd_to_key.values():
if key.fileobj is fileobj:
self._fd_to_key.pop(key.fd)
break
else:
raise KeyError("{0!r} is not registered".format(fileobj))
return key
def modify(self, fileobj, events, data=None):
""" Change a registered file object monitored events and data. """
# NOTE: Some subclasses optimize this operation even further.
try:
key = self._fd_to_key[self._fileobj_lookup(fileobj)]
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
if events != key.events:
self.unregister(fileobj)
key = self.register(fileobj, events, data)
elif data != key.data:
# Use a shortcut to update the data.
key = key._replace(data=data)
self._fd_to_key[key.fd] = key
return key
def select(self, timeout=None):
""" Perform the actual selection until some monitored file objects
are ready or the timeout expires. """
raise NotImplementedError()
def close(self):
""" Close the selector. This must be called to ensure that all
underlying resources are freed. """
self._fd_to_key.clear()
self._map = None
def get_key(self, fileobj):
""" Return the key associated with a registered file object. """
mapping = self.get_map()
if mapping is None:
raise RuntimeError("Selector is closed")
try:
return mapping[fileobj]
except KeyError:
raise KeyError("{0!r} is not registered".format(fileobj))
def get_map(self):
""" Return a mapping of file objects to selector keys """
return self._map
def _key_from_fd(self, fd):
""" Return the key associated to a given file descriptor
Return None if it is not found. """
try:
return self._fd_to_key[fd]
except KeyError:
return None
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
# Almost all platforms have select.select()
if hasattr(select, "select"):
class SelectSelector(BaseSelector):
""" Select-based selector. """
def __init__(self):
super(SelectSelector, self).__init__()
self._readers = set()
self._writers = set()
def register(self, fileobj, events, data=None):
key = super(SelectSelector, self).register(fileobj, events, data)
if events & EVENT_READ:
self._readers.add(key.fd)
if events & EVENT_WRITE:
self._writers.add(key.fd)
return key
def unregister(self, fileobj):
key = super(SelectSelector, self).unregister(fileobj)
self._readers.discard(key.fd)
self._writers.discard(key.fd)
return key
def select(self, timeout=None):
# Selecting on empty lists on Windows errors out.
if not len(self._readers) and not len(self._writers):
return []
timeout = None if timeout is None else max(timeout, 0.0)
ready = []
r, w, _ = _syscall_wrapper(self._wrap_select, True, self._readers,
self._writers, timeout=timeout)
r = set(r)
w = set(w)
for fd in r | w:
events = 0
if fd in r:
events |= EVENT_READ
if fd in w:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def _wrap_select(self, r, w, timeout=None):
""" Wrapper for select.select because timeout is a positional arg """
return select.select(r, w, [], timeout)
__all__.append('SelectSelector')
# Jython has a different implementation of .fileno() for socket objects.
if platform.python_implementation() == 'Jython':
class _JythonSelectorMapping(object):
""" This is an implementation of _SelectorMapping that is built
for use specifically with Jython, which does not provide a hashable
value from socket.socket.fileno(). """
def __init__(self, selector):
assert isinstance(selector, JythonSelectSelector)
self._selector = selector
def __len__(self):
return len(self._selector._sockets)
def __getitem__(self, fileobj):
for sock, key in self._selector._sockets:
if sock is fileobj:
return key
else:
raise KeyError("{0!r} is not registered.".format(fileobj))
class JythonSelectSelector(SelectSelector):
""" This is an implementation of SelectSelector that is for Jython
which works around that Jython's socket.socket.fileno() does not
return an integer fd value. All SelectorKey.fd will be equal to -1
and should not be used. This instead uses object id to compare fileobj
and will only use select.select as it's the only selector that allows
directly passing in socket objects rather than registering fds.
See: http://bugs.jython.org/issue1678
https://wiki.python.org/jython/NewSocketModule#socket.fileno.28.29_does_not_return_an_integer
"""
def __init__(self):
super(JythonSelectSelector, self).__init__()
self._sockets = [] # Uses a list of tuples instead of dictionary.
self._map = _JythonSelectorMapping(self)
self._readers = []
self._writers = []
# Jython has a select.cpython_compatible_select function in older versions.
self._select_func = getattr(select, 'cpython_compatible_select', select.select)
def register(self, fileobj, events, data=None):
for sock, _ in self._sockets:
if sock is fileobj:
raise KeyError("{0!r} is already registered"
.format(fileobj, sock))
key = SelectorKey(fileobj, -1, events, data)
self._sockets.append((fileobj, key))
if events & EVENT_READ:
self._readers.append(fileobj)
if events & EVENT_WRITE:
self._writers.append(fileobj)
return key
def unregister(self, fileobj):
for i, (sock, key) in enumerate(self._sockets):
if sock is fileobj:
break
else:
raise KeyError("{0!r} is not registered.".format(fileobj))
if key.events & EVENT_READ:
self._readers.remove(fileobj)
if key.events & EVENT_WRITE:
self._writers.remove(fileobj)
del self._sockets[i]
return key
def _wrap_select(self, r, w, timeout=None):
""" Wrapper for select.select because timeout is a positional arg """
return self._select_func(r, w, [], timeout)
__all__.append('JythonSelectSelector')
SelectSelector = JythonSelectSelector # Override so the wrong selector isn't used.
if hasattr(select, "poll"):
class PollSelector(BaseSelector):
""" Poll-based selector """
def __init__(self):
super(PollSelector, self).__init__()
self._poll = select.poll()
def register(self, fileobj, events, data=None):
key = super(PollSelector, self).register(fileobj, events, data)
event_mask = 0
if events & EVENT_READ:
event_mask |= select.POLLIN
if events & EVENT_WRITE:
event_mask |= select.POLLOUT
self._poll.register(key.fd, event_mask)
return key
def unregister(self, fileobj):
key = super(PollSelector, self).unregister(fileobj)
self._poll.unregister(key.fd)
return key
def _wrap_poll(self, timeout=None):
""" Wrapper function for select.poll.poll() so that
_syscall_wrapper can work with only seconds. """
if timeout is not None:
if timeout <= 0:
timeout = 0
else:
# select.poll.poll() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1000)
result = self._poll.poll(timeout)
return result
def select(self, timeout=None):
ready = []
fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.POLLIN:
events |= EVENT_WRITE
if event_mask & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
__all__.append('PollSelector')
if hasattr(select, "epoll"):
class EpollSelector(BaseSelector):
""" Epoll-based selector """
def __init__(self):
super(EpollSelector, self).__init__()
self._epoll = select.epoll()
def fileno(self):
return self._epoll.fileno()
def register(self, fileobj, events, data=None):
key = super(EpollSelector, self).register(fileobj, events, data)
events_mask = 0
if events & EVENT_READ:
events_mask |= select.EPOLLIN
if events & EVENT_WRITE:
events_mask |= select.EPOLLOUT
_syscall_wrapper(self._epoll.register, False, key.fd, events_mask)
return key
def unregister(self, fileobj):
key = super(EpollSelector, self).unregister(fileobj)
try:
_syscall_wrapper(self._epoll.unregister, False, key.fd)
except _ERROR_TYPES:
# This can occur when the fd was closed since registry.
pass
return key
def select(self, timeout=None):
if timeout is not None:
if timeout <= 0:
timeout = 0.0
else:
# select.epoll.poll() has a resolution of 1 millisecond
# but luckily takes seconds so we don't need a wrapper
# like PollSelector. Just for better rounding.
timeout = math.ceil(timeout * 1000) * 0.001
timeout = float(timeout)
else:
timeout = -1.0 # epoll.poll() must have a float.
# We always want at least 1 to ensure that select can be called
# with no file descriptors registered. Otherwise will fail.
max_events = max(len(self._fd_to_key), 1)
ready = []
fd_events = _syscall_wrapper(self._epoll.poll, True,
timeout=timeout,
maxevents=max_events)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.EPOLLIN:
events |= EVENT_WRITE
if event_mask & ~select.EPOLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._epoll.close()
super(EpollSelector, self).close()
__all__.append('EpollSelector')
if hasattr(select, "devpoll"):
class DevpollSelector(BaseSelector):
"""Solaris /dev/poll selector."""
def __init__(self):
super(DevpollSelector, self).__init__()
self._devpoll = select.devpoll()
def fileno(self):
return self._devpoll.fileno()
def register(self, fileobj, events, data=None):
key = super(DevpollSelector, self).register(fileobj, events, data)
poll_events = 0
if events & EVENT_READ:
poll_events |= select.POLLIN
if events & EVENT_WRITE:
poll_events |= select.POLLOUT
self._devpoll.register(key.fd, poll_events)
return key
def unregister(self, fileobj):
key = super(DevpollSelector, self).unregister(fileobj)
self._devpoll.unregister(key.fd)
return key
def _wrap_poll(self, timeout=None):
""" Wrapper function for select.poll.poll() so that
_syscall_wrapper can work with only seconds. """
if timeout is not None:
if timeout <= 0:
timeout = 0
else:
# select.devpoll.poll() has a resolution of 1 millisecond,
# round away from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1000)
result = self._devpoll.poll(timeout)
return result
def select(self, timeout=None):
ready = []
fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout)
for fd, event_mask in fd_events:
events = 0
if event_mask & ~select.POLLIN:
events |= EVENT_WRITE
if event_mask & ~select.POLLOUT:
events |= EVENT_READ
key = self._key_from_fd(fd)
if key:
ready.append((key, events & key.events))
return ready
def close(self):
self._devpoll.close()
super(DevpollSelector, self).close()
__all__.append('DevpollSelector')
if hasattr(select, "kqueue"):
class KqueueSelector(BaseSelector):
""" Kqueue / Kevent-based selector """
def __init__(self):
super(KqueueSelector, self).__init__()
self._kqueue = select.kqueue()
def fileno(self):
return self._kqueue.fileno()
def register(self, fileobj, events, data=None):
key = super(KqueueSelector, self).register(fileobj, events, data)
if events & EVENT_READ:
kevent = select.kevent(key.fd,
select.KQ_FILTER_READ,
select.KQ_EV_ADD)
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
if events & EVENT_WRITE:
kevent = select.kevent(key.fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_ADD)
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
return key
def unregister(self, fileobj):
key = super(KqueueSelector, self).unregister(fileobj)
if key.events & EVENT_READ:
kevent = select.kevent(key.fd,
select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
try:
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
except _ERROR_TYPES:
pass
if key.events & EVENT_WRITE:
kevent = select.kevent(key.fd,
select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)
try:
_syscall_wrapper(self._wrap_control, False, [kevent], 0, 0)
except _ERROR_TYPES:
pass
return key
def select(self, timeout=None):
if timeout is not None:
timeout = max(timeout, 0)
max_events = len(self._fd_to_key) * 2
ready_fds = {}
kevent_list = _syscall_wrapper(self._wrap_control, True,
None, max_events, timeout=timeout)
for kevent in kevent_list:
fd = kevent.ident
event_mask = kevent.filter
events = 0
if event_mask == select.KQ_FILTER_READ:
events |= EVENT_READ
if event_mask == select.KQ_FILTER_WRITE:
events |= EVENT_WRITE
key = self._key_from_fd(fd)
if key:
if key.fd not in ready_fds:
ready_fds[key.fd] = (key, events & key.events)
else:
old_events = ready_fds[key.fd][1]
ready_fds[key.fd] = (key, (events | old_events) & key.events)
return list(ready_fds.values())
def close(self):
self._kqueue.close()
super(KqueueSelector, self).close()
def _wrap_control(self, changelist, max_events, timeout):
return self._kqueue.control(changelist, max_events, timeout)
__all__.append('KqueueSelector')
def _can_allocate(struct):
""" Checks that select structs can be allocated by the underlying
operating system, not just advertised by the select module. We don't
check select() because we'll be hopeful that most platforms that
don't have it available will not advertise it. (ie: GAE) """
try:
# select.poll() objects won't fail until used.
if struct == 'poll':
p = select.poll()
p.poll(0)
# All others will fail on allocation.
else:
getattr(select, struct)().close()
return True
except (OSError, AttributeError):
return False
# Python 3.5 uses a more direct route to wrap system calls to increase speed.
if sys.version_info >= (3, 5):
def _syscall_wrapper(func, _, *args, **kwargs):
""" This is the short-circuit version of the below logic
because in Python 3.5+ all selectors restart system calls. """
return func(*args, **kwargs)
else:
def _syscall_wrapper(func, recalc_timeout, *args, **kwargs):
""" Wrapper function for syscalls that could fail due to EINTR.
All functions should be retried if there is time left in the timeout
in accordance with PEP 475. """
timeout = kwargs.get("timeout", None)
if timeout is None:
expires = None
recalc_timeout = False
else:
timeout = float(timeout)
if timeout < 0.0: # Timeout less than 0 treated as no timeout.
expires = None
else:
expires = monotonic() + timeout
if recalc_timeout and 'timeout' not in kwargs:
raise ValueError(
'Timeout must be in kwargs to be recalculated')
result = _SYSCALL_SENTINEL
while result is _SYSCALL_SENTINEL:
try:
result = func(*args, **kwargs)
# OSError is thrown by select.select
# IOError is thrown by select.epoll.poll
# select.error is thrown by select.poll.poll
# Aren't we thankful for Python 3.x rework for exceptions?
except (OSError, IOError, select.error) as e:
# select.error wasn't a subclass of OSError in the past.
errcode = None
if hasattr(e, 'errno') and e.errno is not None:
errcode = e.errno
elif hasattr(e, 'args'):
errcode = e.args[0]
# Also test for the Windows equivalent of EINTR.
is_interrupt = (errcode == errno.EINTR or (hasattr(errno, 'WSAEINTR') and
errcode == errno.WSAEINTR))
if is_interrupt:
if expires is not None:
current_time = monotonic()
if current_time > expires:
raise OSError(errno.ETIMEDOUT, 'Connection timed out')
if recalc_timeout:
kwargs["timeout"] = expires - current_time
continue
raise
return result
# Choose the best implementation, roughly:
# kqueue == devpoll == epoll > poll > select
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
def DefaultSelector():
""" This function serves as a first call for DefaultSelector to
detect if the select module is being monkey-patched incorrectly
by eventlet, greenlet, and preserve proper behavior. """
global _DEFAULT_SELECTOR
if _DEFAULT_SELECTOR is None:
if platform.python_implementation() == 'Jython': # Platform-specific: Jython
_DEFAULT_SELECTOR = JythonSelectSelector
elif _can_allocate('kqueue'):
_DEFAULT_SELECTOR = KqueueSelector
elif _can_allocate('devpoll'):
_DEFAULT_SELECTOR = DevpollSelector
elif _can_allocate('epoll'):
_DEFAULT_SELECTOR = EpollSelector
elif _can_allocate('poll'):
_DEFAULT_SELECTOR = PollSelector
elif hasattr(select, 'select'):
_DEFAULT_SELECTOR = SelectSelector
else: # Platform-specific: AppEngine
raise RuntimeError('Platform does not have a selector.')
return _DEFAULT_SELECTOR()