From 898ae18f7724ad9c823bda73eb10c555013a35aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodolfo=20Garc=C3=ADa=20Pe=C3=B1as=20=28kix=29?= Date: Wed, 2 Sep 2020 09:58:35 +0200 Subject: [PATCH] Remove rfc6555.py and selectors2.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I included these files to make some tests, but now we can remove them and use the system files. You can install these files using pip install rfc6555.py selectors2.py is a dependency of rfc6555 package. Signed-off-by: Rodolfo García Peñas (kix) --- rfc6555.py | 315 --------------------- selectors2.py | 747 -------------------------------------------------- 2 files changed, 1062 deletions(-) delete mode 100644 rfc6555.py delete mode 100644 selectors2.py diff --git a/rfc6555.py b/rfc6555.py deleted file mode 100644 index 56436b7..0000000 --- a/rfc6555.py +++ /dev/null @@ -1,315 +0,0 @@ -""" Python implementation of the Happy Eyeballs Algorithm described in RFC 6555. """ - -# Copyright 2017 Seth Michael Larson -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import errno -import socket -from selectors2 import DefaultSelector, EVENT_WRITE - -# time.perf_counter() is defined in Python 3.3 -try: - from time import perf_counter -except (ImportError, AttributeError): - from time import time as perf_counter - - -# This list is due to socket.error and IOError not being a -# subclass of OSError until later versions of Python. -_SOCKET_ERRORS = (socket.error, OSError, IOError) - - -# Detects whether an IPv6 socket can be allocated. -def _detect_ipv6(): - if getattr(socket, 'has_ipv6', False) and hasattr(socket, 'AF_INET6'): - _sock = None - try: - _sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - _sock.bind(('::1', 0)) - return True - except _SOCKET_ERRORS: - if _sock: - _sock.close() - return False - - -_HAS_IPV6 = _detect_ipv6() - -# These are error numbers for asynchronous operations which can -# be safely ignored by RFC 6555 as being non-errors. -_ASYNC_ERRNOS = set([errno.EINPROGRESS, - errno.EAGAIN, - errno.EWOULDBLOCK]) -if hasattr(errno, 'WSAWOULDBLOCK'): - _ASYNC_ERRNOS.add(errno.WSAWOULDBLOCK) - -_DEFAULT_CACHE_DURATION = 60 * 10 # 10 minutes according to the RFC. - -# This value that can be used to disable RFC 6555 globally. -RFC6555_ENABLED = _HAS_IPV6 - -__all__ = ['RFC6555_ENABLED', - 'create_connection', - 'cache'] - -__version__ = '0.0.0' -__author__ = 'Seth Michael Larson' -__email__ = 'sethmichaellarson@protonmail.com' -__license__ = 'Apache-2.0' - - -class _RFC6555CacheManager(object): - def __init__(self): - self.validity_duration = _DEFAULT_CACHE_DURATION - self.enabled = True - self.entries = {} - - def add_entry(self, address, family): - if self.enabled: - current_time = perf_counter() - - # Don't over-write old entries to reset their expiry. - if address not in self.entries or self.entries[address][1] > current_time: - self.entries[address] = (family, current_time + self.validity_duration) - - def get_entry(self, address): - if not self.enabled or address not in self.entries: - return None - - family, expiry = self.entries[address] - if perf_counter() > expiry: - del self.entries[address] - return None - - return family - - -cache = _RFC6555CacheManager() - - -class _RFC6555ConnectionManager(object): - def __init__(self, address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): - self.address = address - self.timeout = timeout - self.source_address = source_address - - self._error = None - self._selector = DefaultSelector() - self._sockets = [] - self._start_time = None - - def create_connection(self): - self._start_time = perf_counter() - - host, port = self.address - addr_info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM) - ret = self._connect_with_cached_family(addr_info) - - # If it's a list, then these are the remaining values to try. - if isinstance(ret, list): - addr_info = ret - else: - cache.add_entry(self.address, ret.family) - return ret - - # If we don't get any results back then just skip to the end. - if not addr_info: - raise socket.error('getaddrinfo returns an empty list') - - sock = self._attempt_connect_with_addr_info(addr_info) - - if sock: - cache.add_entry(self.address, sock.family) - return sock - elif self._error: - raise self._error - else: - raise socket.timeout() - - def _attempt_connect_with_addr_info(self, addr_info): - sock = None - try: - for family, socktype, proto, _, sockaddr in addr_info: - self._create_socket(family, socktype, proto, sockaddr) - sock = self._wait_for_connection(False) - if sock: - break - if sock is None: - sock = self._wait_for_connection(True) - finally: - self._remove_all_sockets() - return sock - - def _connect_with_cached_family(self, addr_info): - family = cache.get_entry(self.address) - if family is None: - return addr_info - - is_family = [] - not_family = [] - - for value in addr_info: - if value[0] == family: - is_family.append(value) - else: - not_family.append(value) - - sock = self._attempt_connect_with_addr_info(is_family) - if sock is not None: - return sock - - return not_family - - def _create_socket(self, family, socktype, proto, sockaddr): - sock = None - try: - sock = socket.socket(family, socktype, proto) - - # If we're using the 'default' socket timeout we have - # to set it to a real value here as this is the earliest - # opportunity to without pre-allocating a socket just for - # this purpose. - if self.timeout is socket._GLOBAL_DEFAULT_TIMEOUT: - self.timeout = sock.gettimeout() - - if self.source_address: - sock.bind(self.source_address) - - # Make the socket non-blocking so we can use our selector. - sock.settimeout(0.0) - - if self._is_acceptable_errno(sock.connect_ex(sockaddr)): - self._selector.register(sock, EVENT_WRITE) - self._sockets.append(sock) - - except _SOCKET_ERRORS as e: - self._error = e - if sock is not None: - _RFC6555ConnectionManager._close_socket(sock) - - def _wait_for_connection(self, last_wait): - self._remove_all_errored_sockets() - - # This is a safe-guard to make sure sock.gettimeout() is called in the - # case that the default socket timeout is used. If there are no - # sockets then we may not have called sock.gettimeout() yet. - if not self._sockets: - return None - - # If this is the last time we're waiting for connections - # then we should wait until we should raise a timeout - # error, otherwise we should only wait >0.2 seconds as - # recommended by RFC 6555. - if last_wait: - if self.timeout is None: - select_timeout = None - else: - select_timeout = self._get_remaining_time() - else: - select_timeout = self._get_select_time() - - # Wait for any socket to become writable as a sign of being connected. - for key, _ in self._selector.select(select_timeout): - sock = key.fileobj - - if not self._is_socket_errored(sock): - - # Restore the old proper timeout of the socket. - sock.settimeout(self.timeout) - - # Remove it from this list to exempt the socket from cleanup. - self._sockets.remove(sock) - self._selector.unregister(sock) - return sock - - return None - - def _get_remaining_time(self): - if self.timeout is None: - return None - return max(self.timeout - (perf_counter() - self._start_time), 0.0) - - def _get_select_time(self): - if self.timeout is None: - return 0.2 - return min(0.2, self._get_remaining_time()) - - def _remove_all_errored_sockets(self): - socks = [] - for sock in self._sockets: - if self._is_socket_errored(sock): - socks.append(sock) - for sock in socks: - self._selector.unregister(sock) - self._sockets.remove(sock) - _RFC6555ConnectionManager._close_socket(sock) - - @staticmethod - def _close_socket(sock): - try: - sock.close() - except _SOCKET_ERRORS: - pass - - def _is_acceptable_errno(self, errno): - if errno == 0 or errno in _ASYNC_ERRNOS: - return True - self._error = socket.error() - self._error.errno = errno - return False - - def _is_socket_errored(self, sock): - errno = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - return not self._is_acceptable_errno(errno) - - def _remove_all_sockets(self): - for sock in self._sockets: - self._selector.unregister(sock) - _RFC6555ConnectionManager._close_socket(sock) - self._sockets = [] - - -def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): - if RFC6555_ENABLED and _HAS_IPV6: - manager = _RFC6555ConnectionManager(address, timeout, source_address) - return manager.create_connection() - else: - # This code is the same as socket.create_connection() but is - # here to make sure the same code is used across all Python versions as - # the source_address parameter was added to socket.create_connection() in 3.2 - # This segment of code is licensed under the Python Software Foundation License - # See LICENSE: https://github.com/python/cpython/blob/3.6/LICENSE - host, port = address - err = None - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - sock = None - try: - sock = socket.socket(af, socktype, proto) - if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: - sock.settimeout(timeout) - if source_address: - sock.bind(source_address) - sock.connect(sa) - return sock - - except socket.error as _: - err = _ - if sock is not None: - sock.close() - - if err is not None: - raise err - else: - raise socket.error("getaddrinfo returns an empty list") diff --git a/selectors2.py b/selectors2.py deleted file mode 100644 index 1625a30..0000000 --- a/selectors2.py +++ /dev/null @@ -1,747 +0,0 @@ -""" Back-ported, durable, and portable selectors """ - -# MIT License -# -# Copyright (c) 2017 Seth Michael Larson -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -from collections import namedtuple, Mapping -import errno -import math -import platform -import select -import socket -import sys -import time - -try: - monotonic = time.monotonic -except AttributeError: - monotonic = time.time - -__author__ = 'Seth Michael Larson' -__email__ = 'sethmichaellarson@protonmail.com' -__version__ = '2.0.2' -__license__ = 'MIT' -__url__ = 'https://www.github.com/SethMichaelLarson/selectors2' - -__all__ = ['EVENT_READ', - 'EVENT_WRITE', - 'SelectorKey', - 'DefaultSelector', - 'BaseSelector'] - -EVENT_READ = (1 << 0) -EVENT_WRITE = (1 << 1) -_DEFAULT_SELECTOR = None -_SYSCALL_SENTINEL = object() # Sentinel in case a system call returns None. -_ERROR_TYPES = (OSError, IOError, socket.error) - -try: - _INTEGER_TYPES = (int, long) -except NameError: - _INTEGER_TYPES = (int,) - - -SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) - - -class _SelectorMapping(Mapping): - """ Mapping of file objects to selector keys """ - - def __init__(self, selector): - self._selector = selector - - def __len__(self): - return len(self._selector._fd_to_key) - - def __getitem__(self, fileobj): - try: - fd = self._selector._fileobj_lookup(fileobj) - return self._selector._fd_to_key[fd] - except KeyError: - raise KeyError("{0!r} is not registered.".format(fileobj)) - - def __iter__(self): - return iter(self._selector._fd_to_key) - - -def _fileobj_to_fd(fileobj): - """ Return a file descriptor from a file object. If - given an integer will simply return that integer back. """ - if isinstance(fileobj, _INTEGER_TYPES): - fd = fileobj - else: - for _integer_type in _INTEGER_TYPES: - try: - fd = _integer_type(fileobj.fileno()) - break - except (AttributeError, TypeError, ValueError): - continue - else: - raise ValueError("Invalid file object: {0!r}".format(fileobj)) - if fd < 0: - raise ValueError("Invalid file descriptor: {0}".format(fd)) - return fd - - -class BaseSelector(object): - """ Abstract Selector class - - A selector supports registering file objects to be monitored - for specific I/O events. - - A file object is a file descriptor or any object with a - `fileno()` method. An arbitrary object can be attached to the - file object which can be used for example to store context info, - a callback, etc. - - A selector can use various implementations (select(), poll(), epoll(), - and kqueue()) depending on the platform. The 'DefaultSelector' class uses - the most efficient implementation for the current platform. - """ - def __init__(self): - # Maps file descriptors to keys. - self._fd_to_key = {} - - # Read-only mapping returned by get_map() - self._map = _SelectorMapping(self) - - def _fileobj_lookup(self, fileobj): - """ Return a file descriptor from a file object. - This wraps _fileobj_to_fd() to do an exhaustive - search in case the object is invalid but we still - have it in our map. Used by unregister() so we can - unregister an object that was previously registered - even if it is closed. It is also used by _SelectorMapping - """ - try: - return _fileobj_to_fd(fileobj) - except ValueError: - - # Search through all our mapped keys. - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - return key.fd - - # Raise ValueError after all. - raise - - def register(self, fileobj, events, data=None): - """ Register a file object for a set of events to monitor. """ - if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): - raise ValueError("Invalid events: {0!r}".format(events)) - - key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) - - if key.fd in self._fd_to_key: - raise KeyError("{0!r} (FD {1}) is already registered" - .format(fileobj, key.fd)) - - self._fd_to_key[key.fd] = key - return key - - def unregister(self, fileobj): - """ Unregister a file object from being monitored. """ - try: - key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - # Getting the fileno of a closed socket on Windows errors with EBADF. - except socket.error as err: - if err.errno != errno.EBADF: - raise - else: - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - self._fd_to_key.pop(key.fd) - break - else: - raise KeyError("{0!r} is not registered".format(fileobj)) - return key - - def modify(self, fileobj, events, data=None): - """ Change a registered file object monitored events and data. """ - # NOTE: Some subclasses optimize this operation even further. - try: - key = self._fd_to_key[self._fileobj_lookup(fileobj)] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - if events != key.events: - self.unregister(fileobj) - key = self.register(fileobj, events, data) - - elif data != key.data: - # Use a shortcut to update the data. - key = key._replace(data=data) - self._fd_to_key[key.fd] = key - - return key - - def select(self, timeout=None): - """ Perform the actual selection until some monitored file objects - are ready or the timeout expires. """ - raise NotImplementedError() - - def close(self): - """ Close the selector. This must be called to ensure that all - underlying resources are freed. """ - self._fd_to_key.clear() - self._map = None - - def get_key(self, fileobj): - """ Return the key associated with a registered file object. """ - mapping = self.get_map() - if mapping is None: - raise RuntimeError("Selector is closed") - try: - return mapping[fileobj] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - def get_map(self): - """ Return a mapping of file objects to selector keys """ - return self._map - - def _key_from_fd(self, fd): - """ Return the key associated to a given file descriptor - Return None if it is not found. """ - try: - return self._fd_to_key[fd] - except KeyError: - return None - - def __enter__(self): - return self - - def __exit__(self, *_): - self.close() - - -# Almost all platforms have select.select() -if hasattr(select, "select"): - class SelectSelector(BaseSelector): - """ Select-based selector. """ - def __init__(self): - super(SelectSelector, self).__init__() - self._readers = set() - self._writers = set() - - def register(self, fileobj, events, data=None): - key = super(SelectSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - self._readers.add(key.fd) - if events & EVENT_WRITE: - self._writers.add(key.fd) - return key - - def unregister(self, fileobj): - key = super(SelectSelector, self).unregister(fileobj) - self._readers.discard(key.fd) - self._writers.discard(key.fd) - return key - - def select(self, timeout=None): - # Selecting on empty lists on Windows errors out. - if not len(self._readers) and not len(self._writers): - return [] - - timeout = None if timeout is None else max(timeout, 0.0) - ready = [] - r, w, _ = _syscall_wrapper(self._wrap_select, True, self._readers, - self._writers, timeout=timeout) - r = set(r) - w = set(w) - for fd in r | w: - events = 0 - if fd in r: - events |= EVENT_READ - if fd in w: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def _wrap_select(self, r, w, timeout=None): - """ Wrapper for select.select because timeout is a positional arg """ - return select.select(r, w, [], timeout) - - __all__.append('SelectSelector') - - # Jython has a different implementation of .fileno() for socket objects. - if platform.python_implementation() == 'Jython': - class _JythonSelectorMapping(object): - """ This is an implementation of _SelectorMapping that is built - for use specifically with Jython, which does not provide a hashable - value from socket.socket.fileno(). """ - - def __init__(self, selector): - assert isinstance(selector, JythonSelectSelector) - self._selector = selector - - def __len__(self): - return len(self._selector._sockets) - - def __getitem__(self, fileobj): - for sock, key in self._selector._sockets: - if sock is fileobj: - return key - else: - raise KeyError("{0!r} is not registered.".format(fileobj)) - - class JythonSelectSelector(SelectSelector): - """ This is an implementation of SelectSelector that is for Jython - which works around that Jython's socket.socket.fileno() does not - return an integer fd value. All SelectorKey.fd will be equal to -1 - and should not be used. This instead uses object id to compare fileobj - and will only use select.select as it's the only selector that allows - directly passing in socket objects rather than registering fds. - See: http://bugs.jython.org/issue1678 - https://wiki.python.org/jython/NewSocketModule#socket.fileno.28.29_does_not_return_an_integer - """ - - def __init__(self): - super(JythonSelectSelector, self).__init__() - - self._sockets = [] # Uses a list of tuples instead of dictionary. - self._map = _JythonSelectorMapping(self) - self._readers = [] - self._writers = [] - - # Jython has a select.cpython_compatible_select function in older versions. - self._select_func = getattr(select, 'cpython_compatible_select', select.select) - - def register(self, fileobj, events, data=None): - for sock, _ in self._sockets: - if sock is fileobj: - raise KeyError("{0!r} is already registered" - .format(fileobj, sock)) - - key = SelectorKey(fileobj, -1, events, data) - self._sockets.append((fileobj, key)) - - if events & EVENT_READ: - self._readers.append(fileobj) - if events & EVENT_WRITE: - self._writers.append(fileobj) - return key - - def unregister(self, fileobj): - for i, (sock, key) in enumerate(self._sockets): - if sock is fileobj: - break - else: - raise KeyError("{0!r} is not registered.".format(fileobj)) - - if key.events & EVENT_READ: - self._readers.remove(fileobj) - if key.events & EVENT_WRITE: - self._writers.remove(fileobj) - - del self._sockets[i] - return key - - def _wrap_select(self, r, w, timeout=None): - """ Wrapper for select.select because timeout is a positional arg """ - return self._select_func(r, w, [], timeout) - - __all__.append('JythonSelectSelector') - SelectSelector = JythonSelectSelector # Override so the wrong selector isn't used. - - -if hasattr(select, "poll"): - class PollSelector(BaseSelector): - """ Poll-based selector """ - def __init__(self): - super(PollSelector, self).__init__() - self._poll = select.poll() - - def register(self, fileobj, events, data=None): - key = super(PollSelector, self).register(fileobj, events, data) - event_mask = 0 - if events & EVENT_READ: - event_mask |= select.POLLIN - if events & EVENT_WRITE: - event_mask |= select.POLLOUT - self._poll.register(key.fd, event_mask) - return key - - def unregister(self, fileobj): - key = super(PollSelector, self).unregister(fileobj) - self._poll.unregister(key.fd) - return key - - def _wrap_poll(self, timeout=None): - """ Wrapper function for select.poll.poll() so that - _syscall_wrapper can work with only seconds. """ - if timeout is not None: - if timeout <= 0: - timeout = 0 - else: - # select.poll.poll() has a resolution of 1 millisecond, - # round away from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1000) - - result = self._poll.poll(timeout) - return result - - def select(self, timeout=None): - ready = [] - fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout) - for fd, event_mask in fd_events: - events = 0 - if event_mask & ~select.POLLIN: - events |= EVENT_WRITE - if event_mask & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - - return ready - - __all__.append('PollSelector') - -if hasattr(select, "epoll"): - class EpollSelector(BaseSelector): - """ Epoll-based selector """ - def __init__(self): - super(EpollSelector, self).__init__() - self._epoll = select.epoll() - - def fileno(self): - return self._epoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(EpollSelector, self).register(fileobj, events, data) - events_mask = 0 - if events & EVENT_READ: - events_mask |= select.EPOLLIN - if events & EVENT_WRITE: - events_mask |= select.EPOLLOUT - _syscall_wrapper(self._epoll.register, False, key.fd, events_mask) - return key - - def unregister(self, fileobj): - key = super(EpollSelector, self).unregister(fileobj) - try: - _syscall_wrapper(self._epoll.unregister, False, key.fd) - except _ERROR_TYPES: - # This can occur when the fd was closed since registry. - pass - return key - - def select(self, timeout=None): - if timeout is not None: - if timeout <= 0: - timeout = 0.0 - else: - # select.epoll.poll() has a resolution of 1 millisecond - # but luckily takes seconds so we don't need a wrapper - # like PollSelector. Just for better rounding. - timeout = math.ceil(timeout * 1000) * 0.001 - timeout = float(timeout) - else: - timeout = -1.0 # epoll.poll() must have a float. - - # We always want at least 1 to ensure that select can be called - # with no file descriptors registered. Otherwise will fail. - max_events = max(len(self._fd_to_key), 1) - - ready = [] - fd_events = _syscall_wrapper(self._epoll.poll, True, - timeout=timeout, - maxevents=max_events) - for fd, event_mask in fd_events: - events = 0 - if event_mask & ~select.EPOLLIN: - events |= EVENT_WRITE - if event_mask & ~select.EPOLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._epoll.close() - super(EpollSelector, self).close() - - __all__.append('EpollSelector') - - -if hasattr(select, "devpoll"): - class DevpollSelector(BaseSelector): - """Solaris /dev/poll selector.""" - - def __init__(self): - super(DevpollSelector, self).__init__() - self._devpoll = select.devpoll() - - def fileno(self): - return self._devpoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(DevpollSelector, self).register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._devpoll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super(DevpollSelector, self).unregister(fileobj) - self._devpoll.unregister(key.fd) - return key - - def _wrap_poll(self, timeout=None): - """ Wrapper function for select.poll.poll() so that - _syscall_wrapper can work with only seconds. """ - if timeout is not None: - if timeout <= 0: - timeout = 0 - else: - # select.devpoll.poll() has a resolution of 1 millisecond, - # round away from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1000) - - result = self._devpoll.poll(timeout) - return result - - def select(self, timeout=None): - ready = [] - fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout) - for fd, event_mask in fd_events: - events = 0 - if event_mask & ~select.POLLIN: - events |= EVENT_WRITE - if event_mask & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - - return ready - - def close(self): - self._devpoll.close() - super(DevpollSelector, self).close() - - __all__.append('DevpollSelector') - - -if hasattr(select, "kqueue"): - class KqueueSelector(BaseSelector): - """ Kqueue / Kevent-based selector """ - def __init__(self): - super(KqueueSelector, self).__init__() - self._kqueue = select.kqueue() - - def fileno(self): - return self._kqueue.fileno() - - def register(self, fileobj, events, data=None): - key = super(KqueueSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - kevent = select.kevent(key.fd, - select.KQ_FILTER_READ, - select.KQ_EV_ADD) - - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - - if events & EVENT_WRITE: - kevent = select.kevent(key.fd, - select.KQ_FILTER_WRITE, - select.KQ_EV_ADD) - - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - - return key - - def unregister(self, fileobj): - key = super(KqueueSelector, self).unregister(fileobj) - if key.events & EVENT_READ: - kevent = select.kevent(key.fd, - select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - try: - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - except _ERROR_TYPES: - pass - if key.events & EVENT_WRITE: - kevent = select.kevent(key.fd, - select.KQ_FILTER_WRITE, - select.KQ_EV_DELETE) - try: - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - except _ERROR_TYPES: - pass - - return key - - def select(self, timeout=None): - if timeout is not None: - timeout = max(timeout, 0) - - max_events = len(self._fd_to_key) * 2 - ready_fds = {} - - kevent_list = _syscall_wrapper(self._wrap_control, True, - None, max_events, timeout=timeout) - - for kevent in kevent_list: - fd = kevent.ident - event_mask = kevent.filter - events = 0 - if event_mask == select.KQ_FILTER_READ: - events |= EVENT_READ - if event_mask == select.KQ_FILTER_WRITE: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - if key.fd not in ready_fds: - ready_fds[key.fd] = (key, events & key.events) - else: - old_events = ready_fds[key.fd][1] - ready_fds[key.fd] = (key, (events | old_events) & key.events) - - return list(ready_fds.values()) - - def close(self): - self._kqueue.close() - super(KqueueSelector, self).close() - - def _wrap_control(self, changelist, max_events, timeout): - return self._kqueue.control(changelist, max_events, timeout) - - __all__.append('KqueueSelector') - - -def _can_allocate(struct): - """ Checks that select structs can be allocated by the underlying - operating system, not just advertised by the select module. We don't - check select() because we'll be hopeful that most platforms that - don't have it available will not advertise it. (ie: GAE) """ - try: - # select.poll() objects won't fail until used. - if struct == 'poll': - p = select.poll() - p.poll(0) - - # All others will fail on allocation. - else: - getattr(select, struct)().close() - return True - except (OSError, AttributeError): - return False - - -# Python 3.5 uses a more direct route to wrap system calls to increase speed. -if sys.version_info >= (3, 5): - def _syscall_wrapper(func, _, *args, **kwargs): - """ This is the short-circuit version of the below logic - because in Python 3.5+ all selectors restart system calls. """ - return func(*args, **kwargs) -else: - def _syscall_wrapper(func, recalc_timeout, *args, **kwargs): - """ Wrapper function for syscalls that could fail due to EINTR. - All functions should be retried if there is time left in the timeout - in accordance with PEP 475. """ - timeout = kwargs.get("timeout", None) - if timeout is None: - expires = None - recalc_timeout = False - else: - timeout = float(timeout) - if timeout < 0.0: # Timeout less than 0 treated as no timeout. - expires = None - else: - expires = monotonic() + timeout - - if recalc_timeout and 'timeout' not in kwargs: - raise ValueError( - 'Timeout must be in kwargs to be recalculated') - - result = _SYSCALL_SENTINEL - while result is _SYSCALL_SENTINEL: - try: - result = func(*args, **kwargs) - # OSError is thrown by select.select - # IOError is thrown by select.epoll.poll - # select.error is thrown by select.poll.poll - # Aren't we thankful for Python 3.x rework for exceptions? - except (OSError, IOError, select.error) as e: - # select.error wasn't a subclass of OSError in the past. - errcode = None - if hasattr(e, 'errno') and e.errno is not None: - errcode = e.errno - elif hasattr(e, 'args'): - errcode = e.args[0] - - # Also test for the Windows equivalent of EINTR. - is_interrupt = (errcode == errno.EINTR or (hasattr(errno, 'WSAEINTR') and - errcode == errno.WSAEINTR)) - - if is_interrupt: - if expires is not None: - current_time = monotonic() - if current_time > expires: - raise OSError(errno.ETIMEDOUT, 'Connection timed out') - if recalc_timeout: - kwargs["timeout"] = expires - current_time - continue - raise - return result - - -# Choose the best implementation, roughly: -# kqueue == devpoll == epoll > poll > select -# select() also can't accept a FD > FD_SETSIZE (usually around 1024) -def DefaultSelector(): - """ This function serves as a first call for DefaultSelector to - detect if the select module is being monkey-patched incorrectly - by eventlet, greenlet, and preserve proper behavior. """ - global _DEFAULT_SELECTOR - if _DEFAULT_SELECTOR is None: - if platform.python_implementation() == 'Jython': # Platform-specific: Jython - _DEFAULT_SELECTOR = JythonSelectSelector - elif _can_allocate('kqueue'): - _DEFAULT_SELECTOR = KqueueSelector - elif _can_allocate('devpoll'): - _DEFAULT_SELECTOR = DevpollSelector - elif _can_allocate('epoll'): - _DEFAULT_SELECTOR = EpollSelector - elif _can_allocate('poll'): - _DEFAULT_SELECTOR = PollSelector - elif hasattr(select, 'select'): - _DEFAULT_SELECTOR = SelectSelector - else: # Platform-specific: AppEngine - raise RuntimeError('Platform does not have a selector.') - return _DEFAULT_SELECTOR()