diff --git a/README.md b/README.md index 0a90851..7c08a8d 100644 --- a/README.md +++ b/README.md @@ -106,9 +106,8 @@ Bugs, issues and contributions can be requested to both the mailing list or the ## Requirements & dependencies * Python v3+ -* six (required) * rfc6555 (required) -* imaplib2 >= 2.57 (optional) +* imaplib2 >= 3.5 * gssapi (optional), for Kerberos authentication * portalocker (optional), if you need to run offlineimap in Cygwin for Windows diff --git a/offlineimap/CustomConfig.py b/offlineimap/CustomConfig.py index f4af64a..59da078 100644 --- a/offlineimap/CustomConfig.py +++ b/offlineimap/CustomConfig.py @@ -18,7 +18,6 @@ import os import re from sys import exc_info from configparser import SafeConfigParser, Error -import six from offlineimap.localeval import LocalEval @@ -71,10 +70,8 @@ class CustomConfigParser(SafeConfigParser): val = self.get(section, option).strip() return re.split(separator_re, val) except re.error as e: - six.reraise(Error, - Error("Bad split regexp '%s': %s" % - (separator_re, e)), - exc_info()[2]) + raise Error("Bad split regexp '%s': %s" % + (separator_re, e), exc_info()[2]) def getdefaultlist(self, section, option, default, separator_re): """Same as getlist, but returns the value of `default` diff --git a/offlineimap/accounts.py b/offlineimap/accounts.py index 6a7d867..00342e1 100644 --- a/offlineimap/accounts.py +++ b/offlineimap/accounts.py @@ -20,7 +20,6 @@ import os import time from sys import exc_info import traceback -import six from offlineimap import mbnames, CustomConfig, OfflineImapError from offlineimap import globals @@ -243,14 +242,19 @@ class SyncableAccount(Account): fcntl.lockf(self._lockfd, fcntl.LOCK_EX | fcntl.LOCK_NB) except NameError: pass # fnctl not available, disable file locking... :( + except IOError: + raise OfflineImapError( + "Could not lock account %s. Is another " + "instance using this account?" % self, + OfflineImapError.ERROR.REPO, + exc_info()[2]) except IOError: self._lockfd.close() - six.reraise(OfflineImapError, - OfflineImapError( - "Could not lock account %s. Is another " - "instance using this account?" % self, - OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError( + "Could not lock account %s. Is another " + "instance using this account?" % self, + OfflineImapError.ERROR.REPO, + exc_info()[2]) def _unlock(self): """Unlock the account, deleting the lock file""" @@ -614,12 +618,11 @@ def syncfolder(account, remotefolder, quick): localstart = localfolder.getstartdate() remotestart = remotefolder.getstartdate() if (maxage is not None) + (localstart is not None) + (remotestart is not None) > 1: - six.reraise(OfflineImapError, - OfflineImapError("You can set at most one of the " - "following: maxage, startdate (for the local " - "folder), startdate (for the remote folder)", - OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError("You can set at most one of the " + "following: maxage, startdate (for the local " + "folder), startdate (for the remote folder)", + OfflineImapError.ERROR.REPO, + exc_info()[2]) if (maxage is not None or localstart or remotestart) and quick: # IMAP quickchanged isn't compatible with options that # involve restricting the messagelist, since the "quick" diff --git a/offlineimap/folder/Gmail.py b/offlineimap/folder/Gmail.py index 308f68d..750ff46 100644 --- a/offlineimap/folder/Gmail.py +++ b/offlineimap/folder/Gmail.py @@ -19,8 +19,6 @@ import re from sys import exc_info -import six - from offlineimap import imaputil, imaplibutil, OfflineImapError import offlineimap.accounts from .IMAP import IMAPFolder @@ -71,10 +69,10 @@ class GmailFolder(IMAPFolder): data = self._fetch_from_imap(str(uid), self.retrycount) # data looks now e.g. - # [('320 (X-GM-LABELS (...) UID 17061 BODY[] {2565}','msgbody....')] - # we only asked for one message, and that msg is in data[0]. - # msbody is in [0][1]. - body = data[0][1].replace("\r\n", "\n") + # ['320 (X-GM-LABELS (...) UID 17061 BODY[] {2565}','msgbody....'] + # we only asked for one message, and that msg is in data[1]. + # msbody is in [1]. + body = data[1].replace("\r\n", "\n") # Embed the labels into the message headers if self.synclabels: @@ -134,14 +132,13 @@ class GmailFolder(IMAPFolder): res_type, response = imapobj.fetch("'%s'" % msgsToFetch, '(FLAGS X-GM-LABELS UID)') if res_type != 'OK': - six.reraise(OfflineImapError, - OfflineImapError( - "FETCHING UIDs in folder [%s]%s failed. " % - (self.getrepository(), self) + - "Server responded '[%s] %s'" % - (res_type, response), - OfflineImapError.ERROR.FOLDER), - exc_info()[2]) + raise OfflineImapError( + "FETCHING UIDs in folder [%s]%s failed. " % + (self.getrepository(), self) + + "Server responded '[%s] %s'" % + (res_type, response), + OfflineImapError.ERROR.FOLDER, + exc_info()[2]) finally: self.imapserver.releaseconnection(imapobj) diff --git a/offlineimap/folder/GmailMaildir.py b/offlineimap/folder/GmailMaildir.py index 395b7a3..4114dd4 100644 --- a/offlineimap/folder/GmailMaildir.py +++ b/offlineimap/folder/GmailMaildir.py @@ -18,8 +18,6 @@ import os from sys import exc_info -import six - import offlineimap.accounts from offlineimap import OfflineImapError from offlineimap import imaputil @@ -181,11 +179,10 @@ class GmailMaildirFolder(MaildirFolder): try: os.rename(tmppath, filepath) except OSError as e: - six.reraise(OfflineImapError, - OfflineImapError("Can't rename file '%s' to '%s': %s" % - (tmppath, filepath, e[1]), - OfflineImapError.ERROR.FOLDER), - exc_info()[2]) + raise OfflineImapError("Can't rename file '%s' to '%s': %s" % + (tmppath, filepath, e[1]), + OfflineImapError.ERROR.FOLDER, + exc_info()[2]) # If utime_from_header=true, we don't want to change the mtime. if self._utime_from_header and mtime: diff --git a/offlineimap/folder/IMAP.py b/offlineimap/folder/IMAP.py index b4d279c..53004a8 100644 --- a/offlineimap/folder/IMAP.py +++ b/offlineimap/folder/IMAP.py @@ -20,8 +20,6 @@ import binascii import re import time from sys import exc_info -import six - from offlineimap import imaputil, imaplibutil, emailutil, OfflineImapError from offlineimap import globals from imaplib2 import MonthNames @@ -115,11 +113,10 @@ class IMAPFolder(BaseFolder): def getmaxage(self): if self.config.getdefault("Account %s" % self.accountname, "maxage", None): - six.reraise(OfflineImapError, - OfflineImapError( - "maxage is not supported on IMAP-IMAP sync", - OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError( + "maxage is not supported on IMAP-IMAP sync", + OfflineImapError.ERROR.REPO, + exc_info()[2]) # Interface from BaseFolder def getinstancelimitnamespace(self): @@ -391,6 +388,10 @@ class IMAPFolder(BaseFolder): try: matchinguids = imapobj.uid('search', 'HEADER', headername, headervalue)[1][0] + + # Returned value is type bytes + matchinguids = matchinguids.decode('utf-8') + except imapobj.error as err: # IMAP server doesn't implement search or had a problem. self.ui.debug('imap', "__savemessage_searchforheader: got IMAP " @@ -456,11 +457,7 @@ class IMAPFolder(BaseFolder): # Folder was empty - start from 1. start = 1 - # Imaplib quotes all parameters of a string type. That must not happen - # with the range X:*. So we use bytearray to stop imaplib from getting - # in our way. - - result = imapobj.uid('FETCH', bytearray('%d:*' % start), 'rfc822.header') + result = imapobj.uid('FETCH', '%d:*' % start, 'rfc822.header') if result[0] != 'OK': raise OfflineImapError('Error fetching mail headers: %s' % '. '.join(result[1]), OfflineImapError.ERROR.MESSAGE) @@ -477,6 +474,9 @@ class IMAPFolder(BaseFolder): # ('185 (RFC822.HEADER {1789}', '... mail headers ...'), ' UID 2444)' for item in result: if found is None and type(item) == tuple: + # Decode the value + item = [x.decode('utf-8') for x in item] + # Walk just tuples. if re.search("(?:^|\\r|\\n)%s:\s*%s(?:\\r|\\n)" % (headername, headervalue), item[1], flags=re.IGNORECASE): @@ -687,13 +687,14 @@ class IMAPFolder(BaseFolder): self.imapserver.releaseconnection(imapobj, True) imapobj = self.imapserver.acquireconnection() if not retry_left: - six.reraise(OfflineImapError, - OfflineImapError("Saving msg (%s) in folder '%s', " - "repository '%s' failed (abort). Server responded: %s\n" - "Message content was: %s" % - (msg_id, self, self.getrepository(), str(e), dbg_output), - OfflineImapError.ERROR.MESSAGE), - exc_info()[2]) + raise OfflineImapError( + "Saving msg (%s) in folder '%s', " + "repository '%s' failed (abort). Server responded: %s\n" + "Message content was: %s" % + (msg_id, self, self.getrepository(), str(e), dbg_output), + OfflineImapError.ERROR.MESSAGE, + exc_info()[2]) + # XXX: is this still needed? self.ui.error(e, exc_info()[2]) except imapobj.error as e: # APPEND failed @@ -702,12 +703,13 @@ class IMAPFolder(BaseFolder): # drop conn, it might be bad. self.imapserver.releaseconnection(imapobj, True) imapobj = None - six.reraise(OfflineImapError, - OfflineImapError("Saving msg (%s) folder '%s', repo '%s'" - "failed (error). Server responded: %s\nMessage content was: " - "%s" % (msg_id, self, self.getrepository(), str(e), dbg_output), - OfflineImapError.ERROR.MESSAGE), - exc_info()[2]) + raise OfflineImapError( + "Saving msg (%s) folder '%s', repo '%s'" + "failed (error). Server responded: %s\nMessage content was: " + "%s" % (msg_id, self, self.getrepository(), str(e), dbg_output), + OfflineImapError.ERROR.MESSAGE, + exc_info()[2]) + # Checkpoint. Let it write out stuff, etc. Eg searches for # just uploaded messages won't work if we don't do this. (typ, dat) = imapobj.check() diff --git a/offlineimap/folder/LocalStatus.py b/offlineimap/folder/LocalStatus.py index 43e6073..d1e2eb8 100644 --- a/offlineimap/folder/LocalStatus.py +++ b/offlineimap/folder/LocalStatus.py @@ -18,8 +18,6 @@ from sys import exc_info import os import threading -import six - from .Base import BaseFolder @@ -71,7 +69,7 @@ class LocalStatusFolder(BaseFolder): errstr = ("Corrupt line '%s' in cache file '%s'" % (line, self.filename)) self.ui.warn(errstr) - six.reraise(ValueError, ValueError(errstr), exc_info()[2]) + raise ValueError(errstr, exc_info()[2]) self.messagelist[uid] = self.msglist_item_initializer(uid) self.messagelist[uid]['flags'] = flags @@ -94,7 +92,7 @@ class LocalStatusFolder(BaseFolder): errstr = "Corrupt line '%s' in cache file '%s'" % \ (line, self.filename) self.ui.warn(errstr) - six.reraise(ValueError, ValueError(errstr), exc_info()[2]) + raise ValueError(errstr, exc_info()[2]) self.messagelist[uid] = self.msglist_item_initializer(uid) self.messagelist[uid]['flags'] = flags self.messagelist[uid]['mtime'] = mtime diff --git a/offlineimap/folder/LocalStatusSQLite.py b/offlineimap/folder/LocalStatusSQLite.py index 3950bb9..5292463 100644 --- a/offlineimap/folder/LocalStatusSQLite.py +++ b/offlineimap/folder/LocalStatusSQLite.py @@ -19,9 +19,6 @@ import os import sqlite3 as sqlite from sys import exc_info from threading import Lock - -import six - from .Base import BaseFolder @@ -117,13 +114,11 @@ class LocalStatusSQLiteFolder(BaseFolder): self._databaseFileLock.registerNewUser() except sqlite.OperationalError as e: # Operation had failed. - six.reraise(UserWarning, - UserWarning( - "cannot open database file '%s': %s.\nYou might" - " want to check the rights to that file and if " - "it cleanly opens with the 'sqlite<3>' command" % - (self.filename, e)), - exc_info()[2]) + raise UserWarning( + "cannot open database file '%s': %s.\nYou might" + " want to check the rights to that file and if " + "it cleanly opens with the 'sqlite<3>' command" % + (self.filename, e), exc_info()[2]) # Test if db version is current enough and if db is readable. try: @@ -351,10 +346,9 @@ class LocalStatusSQLiteFolder(BaseFolder): self.__sql_write('INSERT INTO status (id,flags,mtime,labels) VALUES (?,?,?,?)', (uid, flags, mtime, labels)) except Exception as e: - six.reraise(UserWarning, - UserWarning("%s while inserting UID %s" % - (str(e), str(uid))), - exc_info()[2]) + raise UserWarning("%s while inserting UID %s" % + (str(e), str(uid)), + exc_info()[2]) return uid # Interface from BaseFolder diff --git a/offlineimap/folder/Maildir.py b/offlineimap/folder/Maildir.py index 9783069..f275436 100644 --- a/offlineimap/folder/Maildir.py +++ b/offlineimap/folder/Maildir.py @@ -21,13 +21,7 @@ import re import os from sys import exc_info from threading import Lock -import six - -try: - from hashlib import md5 -except ImportError: - from md5 import md5 - +from hashlib import md5 from offlineimap import OfflineImapError, emailutil from .Base import BaseFolder @@ -319,11 +313,10 @@ class MaildirFolder(BaseFolder): time.sleep(0.23) continue severity = OfflineImapError.ERROR.MESSAGE - six.reraise(OfflineImapError, - OfflineImapError( - "Unique filename %s already exists." % - filename, severity), - exc_info()[2]) + raise OfflineImapError( + "Unique filename %s already exists." % + filename, severity, + exc_info()[2]) else: raise @@ -442,12 +435,11 @@ class MaildirFolder(BaseFolder): os.rename(os.path.join(self.getfullname(), oldfilename), os.path.join(self.getfullname(), newfilename)) except OSError as e: - six.reraise(OfflineImapError, - OfflineImapError( - "Can't rename file '%s' to '%s': %s" % - (oldfilename, newfilename, e[1]), - OfflineImapError.ERROR.FOLDER), - exc_info()[2]) + raise OfflineImapError( + "Can't rename file '%s' to '%s': %s" % + (oldfilename, newfilename, e[1]), + OfflineImapError.ERROR.FOLDER, + exc_info()[2]) self.messagelist[uid]['flags'] = flags self.messagelist[uid]['filename'] = newfilename @@ -529,12 +521,12 @@ class MaildirFolder(BaseFolder): try: os.rename(filename, newfilename) except OSError as e: - six.reraise(OfflineImapError, - OfflineImapError( - "Can't rename file '%s' to '%s': %s" % - (filename, newfilename, e[1]), - OfflineImapError.ERROR.FOLDER), - exc_info()[2]) + raise OfflineImapError( + "Can't rename file '%s' to '%s': %s" % + (filename, newfilename, e[1]), + OfflineImapError.ERROR.FOLDER, + exc_info()[2]) + elif match.group(1) != self._foldermd5: self.ui.warn(("Inconsistent FMD5 for file `%s':" " Neither `%s' nor `%s' found") diff --git a/offlineimap/folder/UIDMaps.py b/offlineimap/folder/UIDMaps.py index 9d709ed..9014805 100644 --- a/offlineimap/folder/UIDMaps.py +++ b/offlineimap/folder/UIDMaps.py @@ -20,7 +20,6 @@ import shutil from os import fsync, unlink from sys import exc_info from threading import Lock -import six try: import portalocker @@ -89,11 +88,11 @@ class MappedIMAPFolder(IMAPFolder): try: line = line.strip() except ValueError: - six.reraise(Exception, - Exception( - "Corrupt line '%s' in UID mapping file '%s'" % - (line, mapfilename)), - exc_info()[2]) + raise Exception( + "Corrupt line '%s' in UID mapping file '%s'" % + (line, mapfilename), + exc_info()[2]) + (str1, str2) = line.split(':') loc = int(str1) rem = int(str2) @@ -129,14 +128,13 @@ class MappedIMAPFolder(IMAPFolder): try: return [mapping[x] for x in items] except KeyError as e: - six.reraise(OfflineImapError, - OfflineImapError( - "Could not find UID for msg '{0}' (f:'{1}'." - " This is usually a bad thing and should be " - "reported on the mailing list.".format( - e.args[0], self), - OfflineImapError.ERROR.MESSAGE), - exc_info()[2]) + raise OfflineImapError( + "Could not find UID for msg '{0}' (f:'{1}'." + " This is usually a bad thing and should be " + "reported on the mailing list.".format( + e.args[0], self), + OfflineImapError.ERROR.MESSAGE, + exc_info()[2]) # Interface from BaseFolder def cachemessagelist(self, min_date=None, min_uid=None): diff --git a/offlineimap/imaplibutil.py b/offlineimap/imaplibutil.py index bdc18a3..338f3f5 100644 --- a/offlineimap/imaplibutil.py +++ b/offlineimap/imaplibutil.py @@ -24,9 +24,7 @@ import errno import zlib from sys import exc_info from hashlib import sha512, sha384, sha256, sha224, sha1 -import six import rfc6555 - from offlineimap import OfflineImapError from offlineimap.ui import getglobalui from imaplib2 import IMAP4, IMAP4_SSL, InternalDate @@ -59,9 +57,8 @@ class UsefulIMAPMixIn: errstr = "Server '%s' closed connection, error on SELECT '%s'. Ser" \ "ver said: %s" % (self.host, mailbox, e.args[0]) severity = OfflineImapError.ERROR.FOLDER_RETRY - six.reraise(OfflineImapError, - OfflineImapError(errstr, severity), - exc_info()[2]) + raise OfflineImapError(errstr, severity, exc_info()[2]) + if result[0] != 'OK': # in case of error, bail out with OfflineImapError errstr = "Error SELECTing mailbox '%s', server reply:\n%s" % \ diff --git a/offlineimap/imapserver.py b/offlineimap/imapserver.py index c5238f0..cbd816d 100644 --- a/offlineimap/imapserver.py +++ b/offlineimap/imapserver.py @@ -27,16 +27,12 @@ from socket import gaierror from sys import exc_info from ssl import SSLError, cert_time_to_seconds from threading import Lock, BoundedSemaphore, Thread, Event, currentThread - -import six - import offlineimap.accounts from offlineimap import imaplibutil, imaputil, threadutil, OfflineImapError from offlineimap.ui import getglobalui try: import gssapi - have_gss = True except ImportError: have_gss = False @@ -253,7 +249,8 @@ class IMAPServer(): msg = "%s (configuration is: %s)" % (e, str(params)) except Exception as eparams: msg = "%s [cannot display configuration: %s]" % (e, eparams) - six.reraise(type(e), type(e)(msg), exc_info()[2]) + + raise type(e)(msg, exc_info()[2]) finally: socket.socket = original_socket @@ -633,9 +630,7 @@ class IMAPServer(): "'%s'. Make sure you have configured the ser" \ "ver name correctly and that you are online." % \ (self.hostname, self.repos) - six.reraise(OfflineImapError, - OfflineImapError(reason, severity), - exc_info()[2]) + raise OfflineImapError(reason, severity, exc_info()[2]) elif isinstance(e, SSLError) and e.errno == errno.EPERM: # SSL unknown protocol error @@ -649,9 +644,7 @@ class IMAPServer(): reason = "Unknown SSL protocol connecting to host '%s' for " \ "repository '%s'. OpenSSL responded:\n%s" \ % (self.hostname, self.repos, e) - six.reraise(OfflineImapError, - OfflineImapError(reason, severity), - exc_info()[2]) + raise OfflineImapError(reason, severity, exc_info()[2]) elif isinstance(e, socket.error) and e.args[0] == errno.ECONNREFUSED: # "Connection refused", can be a non-existing port, or an unauthorized @@ -660,18 +653,16 @@ class IMAPServer(): "refused. Make sure you have the right host and port " \ "configured and that you are actually able to access the " \ "network." % (self.hostname, self.port, self.repos) - six.reraise(OfflineImapError, - OfflineImapError(reason, severity), - exc_info()[2]) + raise OfflineImapError(reason, severity, exc_info()[2]) + # Could not acquire connection to the remote; # socket.error(last_error) raised if str(e)[:24] == "can't open socket; error": - six.reraise(OfflineImapError, - OfflineImapError( - "Could not connect to remote server '%s' " - "for repository '%s'. Remote does not answer." % (self.hostname, self.repos), - OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError( + "Could not connect to remote server '%s' " + "for repository '%s'. Remote does not answer." % (self.hostname, self.repos), + OfflineImapError.ERROR.REPO, + exc_info()[2]) else: # re-raise all other errors raise diff --git a/offlineimap/repository/IMAP.py b/offlineimap/repository/IMAP.py index a97e2c9..60888f9 100644 --- a/offlineimap/repository/IMAP.py +++ b/offlineimap/repository/IMAP.py @@ -21,9 +21,6 @@ import netrc import errno from sys import exc_info from threading import Event - -import six - from offlineimap import folder, imaputil, imapserver, OfflineImapError from offlineimap.repository.Base import BaseRepository from offlineimap.threadutil import ExitNotifyThread @@ -127,12 +124,11 @@ class IMAPRepository(BaseRepository): try: host = self.localeval.eval(host) except Exception as e: - six.reraise(OfflineImapError, - OfflineImapError( - "remotehosteval option for repository " - "'%s' failed:\n%s" % (self, e), - OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError( + "remotehosteval option for repository " + "'%s' failed:\n%s" % (self, e), + OfflineImapError.ERROR.REPO, + exc_info()[2]) if host: self._host = host return self._host @@ -549,7 +545,7 @@ class IMAPRepository(BaseRepository): if foldername == '': return - if self.getreference(): + if self.getreference() != '""': foldername = self.getreference() + self.getsep() + foldername if not foldername: # Create top level folder as folder separator. foldername = self.getsep() diff --git a/offlineimap/repository/__init__.py b/offlineimap/repository/__init__.py index 6a202eb..ad2edf2 100644 --- a/offlineimap/repository/__init__.py +++ b/offlineimap/repository/__init__.py @@ -15,13 +15,7 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA from sys import exc_info -import six - -try: - from configparser import NoSectionError -except ImportError: # python2 - from configparser import NoSectionError - +from configparser import NoSectionError from offlineimap.repository.IMAP import IMAPRepository, MappedIMAPRepository from offlineimap.repository.Gmail import GmailRepository from offlineimap.repository.Maildir import MaildirRepository @@ -68,18 +62,16 @@ class Repository: except NoSectionError: errstr = ("Could not find section '%s' in configuration. Required " "for account '%s'." % ('Repository %s' % name, account)) - six.reraise(OfflineImapError, - OfflineImapError(errstr, OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO, + exc_info()[2]) try: repo = typemap[repostype] except KeyError: errstr = "'%s' repository not supported for '%s' repositories." % \ (repostype, reqtype) - six.reraise(OfflineImapError, - OfflineImapError(errstr, OfflineImapError.ERROR.REPO), - exc_info()[2]) + raise OfflineImapError(errstr, OfflineImapError.ERROR.REPO, + exc_info()[2]) return repo(name, account) diff --git a/requirements.txt b/requirements.txt index 1e2a2ae..9d575fb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,4 @@ # Requirements -six gssapi[kerberos] portalocker[cygwin] rfc6555 diff --git a/rfc6555.py b/rfc6555.py deleted file mode 100644 index 56436b7..0000000 --- a/rfc6555.py +++ /dev/null @@ -1,315 +0,0 @@ -""" Python implementation of the Happy Eyeballs Algorithm described in RFC 6555. """ - -# Copyright 2017 Seth Michael Larson -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import errno -import socket -from selectors2 import DefaultSelector, EVENT_WRITE - -# time.perf_counter() is defined in Python 3.3 -try: - from time import perf_counter -except (ImportError, AttributeError): - from time import time as perf_counter - - -# This list is due to socket.error and IOError not being a -# subclass of OSError until later versions of Python. -_SOCKET_ERRORS = (socket.error, OSError, IOError) - - -# Detects whether an IPv6 socket can be allocated. -def _detect_ipv6(): - if getattr(socket, 'has_ipv6', False) and hasattr(socket, 'AF_INET6'): - _sock = None - try: - _sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - _sock.bind(('::1', 0)) - return True - except _SOCKET_ERRORS: - if _sock: - _sock.close() - return False - - -_HAS_IPV6 = _detect_ipv6() - -# These are error numbers for asynchronous operations which can -# be safely ignored by RFC 6555 as being non-errors. -_ASYNC_ERRNOS = set([errno.EINPROGRESS, - errno.EAGAIN, - errno.EWOULDBLOCK]) -if hasattr(errno, 'WSAWOULDBLOCK'): - _ASYNC_ERRNOS.add(errno.WSAWOULDBLOCK) - -_DEFAULT_CACHE_DURATION = 60 * 10 # 10 minutes according to the RFC. - -# This value that can be used to disable RFC 6555 globally. -RFC6555_ENABLED = _HAS_IPV6 - -__all__ = ['RFC6555_ENABLED', - 'create_connection', - 'cache'] - -__version__ = '0.0.0' -__author__ = 'Seth Michael Larson' -__email__ = 'sethmichaellarson@protonmail.com' -__license__ = 'Apache-2.0' - - -class _RFC6555CacheManager(object): - def __init__(self): - self.validity_duration = _DEFAULT_CACHE_DURATION - self.enabled = True - self.entries = {} - - def add_entry(self, address, family): - if self.enabled: - current_time = perf_counter() - - # Don't over-write old entries to reset their expiry. - if address not in self.entries or self.entries[address][1] > current_time: - self.entries[address] = (family, current_time + self.validity_duration) - - def get_entry(self, address): - if not self.enabled or address not in self.entries: - return None - - family, expiry = self.entries[address] - if perf_counter() > expiry: - del self.entries[address] - return None - - return family - - -cache = _RFC6555CacheManager() - - -class _RFC6555ConnectionManager(object): - def __init__(self, address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): - self.address = address - self.timeout = timeout - self.source_address = source_address - - self._error = None - self._selector = DefaultSelector() - self._sockets = [] - self._start_time = None - - def create_connection(self): - self._start_time = perf_counter() - - host, port = self.address - addr_info = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM) - ret = self._connect_with_cached_family(addr_info) - - # If it's a list, then these are the remaining values to try. - if isinstance(ret, list): - addr_info = ret - else: - cache.add_entry(self.address, ret.family) - return ret - - # If we don't get any results back then just skip to the end. - if not addr_info: - raise socket.error('getaddrinfo returns an empty list') - - sock = self._attempt_connect_with_addr_info(addr_info) - - if sock: - cache.add_entry(self.address, sock.family) - return sock - elif self._error: - raise self._error - else: - raise socket.timeout() - - def _attempt_connect_with_addr_info(self, addr_info): - sock = None - try: - for family, socktype, proto, _, sockaddr in addr_info: - self._create_socket(family, socktype, proto, sockaddr) - sock = self._wait_for_connection(False) - if sock: - break - if sock is None: - sock = self._wait_for_connection(True) - finally: - self._remove_all_sockets() - return sock - - def _connect_with_cached_family(self, addr_info): - family = cache.get_entry(self.address) - if family is None: - return addr_info - - is_family = [] - not_family = [] - - for value in addr_info: - if value[0] == family: - is_family.append(value) - else: - not_family.append(value) - - sock = self._attempt_connect_with_addr_info(is_family) - if sock is not None: - return sock - - return not_family - - def _create_socket(self, family, socktype, proto, sockaddr): - sock = None - try: - sock = socket.socket(family, socktype, proto) - - # If we're using the 'default' socket timeout we have - # to set it to a real value here as this is the earliest - # opportunity to without pre-allocating a socket just for - # this purpose. - if self.timeout is socket._GLOBAL_DEFAULT_TIMEOUT: - self.timeout = sock.gettimeout() - - if self.source_address: - sock.bind(self.source_address) - - # Make the socket non-blocking so we can use our selector. - sock.settimeout(0.0) - - if self._is_acceptable_errno(sock.connect_ex(sockaddr)): - self._selector.register(sock, EVENT_WRITE) - self._sockets.append(sock) - - except _SOCKET_ERRORS as e: - self._error = e - if sock is not None: - _RFC6555ConnectionManager._close_socket(sock) - - def _wait_for_connection(self, last_wait): - self._remove_all_errored_sockets() - - # This is a safe-guard to make sure sock.gettimeout() is called in the - # case that the default socket timeout is used. If there are no - # sockets then we may not have called sock.gettimeout() yet. - if not self._sockets: - return None - - # If this is the last time we're waiting for connections - # then we should wait until we should raise a timeout - # error, otherwise we should only wait >0.2 seconds as - # recommended by RFC 6555. - if last_wait: - if self.timeout is None: - select_timeout = None - else: - select_timeout = self._get_remaining_time() - else: - select_timeout = self._get_select_time() - - # Wait for any socket to become writable as a sign of being connected. - for key, _ in self._selector.select(select_timeout): - sock = key.fileobj - - if not self._is_socket_errored(sock): - - # Restore the old proper timeout of the socket. - sock.settimeout(self.timeout) - - # Remove it from this list to exempt the socket from cleanup. - self._sockets.remove(sock) - self._selector.unregister(sock) - return sock - - return None - - def _get_remaining_time(self): - if self.timeout is None: - return None - return max(self.timeout - (perf_counter() - self._start_time), 0.0) - - def _get_select_time(self): - if self.timeout is None: - return 0.2 - return min(0.2, self._get_remaining_time()) - - def _remove_all_errored_sockets(self): - socks = [] - for sock in self._sockets: - if self._is_socket_errored(sock): - socks.append(sock) - for sock in socks: - self._selector.unregister(sock) - self._sockets.remove(sock) - _RFC6555ConnectionManager._close_socket(sock) - - @staticmethod - def _close_socket(sock): - try: - sock.close() - except _SOCKET_ERRORS: - pass - - def _is_acceptable_errno(self, errno): - if errno == 0 or errno in _ASYNC_ERRNOS: - return True - self._error = socket.error() - self._error.errno = errno - return False - - def _is_socket_errored(self, sock): - errno = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) - return not self._is_acceptable_errno(errno) - - def _remove_all_sockets(self): - for sock in self._sockets: - self._selector.unregister(sock) - _RFC6555ConnectionManager._close_socket(sock) - self._sockets = [] - - -def create_connection(address, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, source_address=None): - if RFC6555_ENABLED and _HAS_IPV6: - manager = _RFC6555ConnectionManager(address, timeout, source_address) - return manager.create_connection() - else: - # This code is the same as socket.create_connection() but is - # here to make sure the same code is used across all Python versions as - # the source_address parameter was added to socket.create_connection() in 3.2 - # This segment of code is licensed under the Python Software Foundation License - # See LICENSE: https://github.com/python/cpython/blob/3.6/LICENSE - host, port = address - err = None - for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): - af, socktype, proto, canonname, sa = res - sock = None - try: - sock = socket.socket(af, socktype, proto) - if timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: - sock.settimeout(timeout) - if source_address: - sock.bind(source_address) - sock.connect(sa) - return sock - - except socket.error as _: - err = _ - if sock is not None: - sock.close() - - if err is not None: - raise err - else: - raise socket.error("getaddrinfo returns an empty list") diff --git a/selectors2.py b/selectors2.py deleted file mode 100644 index 1625a30..0000000 --- a/selectors2.py +++ /dev/null @@ -1,747 +0,0 @@ -""" Back-ported, durable, and portable selectors """ - -# MIT License -# -# Copyright (c) 2017 Seth Michael Larson -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -from collections import namedtuple, Mapping -import errno -import math -import platform -import select -import socket -import sys -import time - -try: - monotonic = time.monotonic -except AttributeError: - monotonic = time.time - -__author__ = 'Seth Michael Larson' -__email__ = 'sethmichaellarson@protonmail.com' -__version__ = '2.0.2' -__license__ = 'MIT' -__url__ = 'https://www.github.com/SethMichaelLarson/selectors2' - -__all__ = ['EVENT_READ', - 'EVENT_WRITE', - 'SelectorKey', - 'DefaultSelector', - 'BaseSelector'] - -EVENT_READ = (1 << 0) -EVENT_WRITE = (1 << 1) -_DEFAULT_SELECTOR = None -_SYSCALL_SENTINEL = object() # Sentinel in case a system call returns None. -_ERROR_TYPES = (OSError, IOError, socket.error) - -try: - _INTEGER_TYPES = (int, long) -except NameError: - _INTEGER_TYPES = (int,) - - -SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) - - -class _SelectorMapping(Mapping): - """ Mapping of file objects to selector keys """ - - def __init__(self, selector): - self._selector = selector - - def __len__(self): - return len(self._selector._fd_to_key) - - def __getitem__(self, fileobj): - try: - fd = self._selector._fileobj_lookup(fileobj) - return self._selector._fd_to_key[fd] - except KeyError: - raise KeyError("{0!r} is not registered.".format(fileobj)) - - def __iter__(self): - return iter(self._selector._fd_to_key) - - -def _fileobj_to_fd(fileobj): - """ Return a file descriptor from a file object. If - given an integer will simply return that integer back. """ - if isinstance(fileobj, _INTEGER_TYPES): - fd = fileobj - else: - for _integer_type in _INTEGER_TYPES: - try: - fd = _integer_type(fileobj.fileno()) - break - except (AttributeError, TypeError, ValueError): - continue - else: - raise ValueError("Invalid file object: {0!r}".format(fileobj)) - if fd < 0: - raise ValueError("Invalid file descriptor: {0}".format(fd)) - return fd - - -class BaseSelector(object): - """ Abstract Selector class - - A selector supports registering file objects to be monitored - for specific I/O events. - - A file object is a file descriptor or any object with a - `fileno()` method. An arbitrary object can be attached to the - file object which can be used for example to store context info, - a callback, etc. - - A selector can use various implementations (select(), poll(), epoll(), - and kqueue()) depending on the platform. The 'DefaultSelector' class uses - the most efficient implementation for the current platform. - """ - def __init__(self): - # Maps file descriptors to keys. - self._fd_to_key = {} - - # Read-only mapping returned by get_map() - self._map = _SelectorMapping(self) - - def _fileobj_lookup(self, fileobj): - """ Return a file descriptor from a file object. - This wraps _fileobj_to_fd() to do an exhaustive - search in case the object is invalid but we still - have it in our map. Used by unregister() so we can - unregister an object that was previously registered - even if it is closed. It is also used by _SelectorMapping - """ - try: - return _fileobj_to_fd(fileobj) - except ValueError: - - # Search through all our mapped keys. - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - return key.fd - - # Raise ValueError after all. - raise - - def register(self, fileobj, events, data=None): - """ Register a file object for a set of events to monitor. """ - if (not events) or (events & ~(EVENT_READ | EVENT_WRITE)): - raise ValueError("Invalid events: {0!r}".format(events)) - - key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data) - - if key.fd in self._fd_to_key: - raise KeyError("{0!r} (FD {1}) is already registered" - .format(fileobj, key.fd)) - - self._fd_to_key[key.fd] = key - return key - - def unregister(self, fileobj): - """ Unregister a file object from being monitored. """ - try: - key = self._fd_to_key.pop(self._fileobj_lookup(fileobj)) - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - # Getting the fileno of a closed socket on Windows errors with EBADF. - except socket.error as err: - if err.errno != errno.EBADF: - raise - else: - for key in self._fd_to_key.values(): - if key.fileobj is fileobj: - self._fd_to_key.pop(key.fd) - break - else: - raise KeyError("{0!r} is not registered".format(fileobj)) - return key - - def modify(self, fileobj, events, data=None): - """ Change a registered file object monitored events and data. """ - # NOTE: Some subclasses optimize this operation even further. - try: - key = self._fd_to_key[self._fileobj_lookup(fileobj)] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - if events != key.events: - self.unregister(fileobj) - key = self.register(fileobj, events, data) - - elif data != key.data: - # Use a shortcut to update the data. - key = key._replace(data=data) - self._fd_to_key[key.fd] = key - - return key - - def select(self, timeout=None): - """ Perform the actual selection until some monitored file objects - are ready or the timeout expires. """ - raise NotImplementedError() - - def close(self): - """ Close the selector. This must be called to ensure that all - underlying resources are freed. """ - self._fd_to_key.clear() - self._map = None - - def get_key(self, fileobj): - """ Return the key associated with a registered file object. """ - mapping = self.get_map() - if mapping is None: - raise RuntimeError("Selector is closed") - try: - return mapping[fileobj] - except KeyError: - raise KeyError("{0!r} is not registered".format(fileobj)) - - def get_map(self): - """ Return a mapping of file objects to selector keys """ - return self._map - - def _key_from_fd(self, fd): - """ Return the key associated to a given file descriptor - Return None if it is not found. """ - try: - return self._fd_to_key[fd] - except KeyError: - return None - - def __enter__(self): - return self - - def __exit__(self, *_): - self.close() - - -# Almost all platforms have select.select() -if hasattr(select, "select"): - class SelectSelector(BaseSelector): - """ Select-based selector. """ - def __init__(self): - super(SelectSelector, self).__init__() - self._readers = set() - self._writers = set() - - def register(self, fileobj, events, data=None): - key = super(SelectSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - self._readers.add(key.fd) - if events & EVENT_WRITE: - self._writers.add(key.fd) - return key - - def unregister(self, fileobj): - key = super(SelectSelector, self).unregister(fileobj) - self._readers.discard(key.fd) - self._writers.discard(key.fd) - return key - - def select(self, timeout=None): - # Selecting on empty lists on Windows errors out. - if not len(self._readers) and not len(self._writers): - return [] - - timeout = None if timeout is None else max(timeout, 0.0) - ready = [] - r, w, _ = _syscall_wrapper(self._wrap_select, True, self._readers, - self._writers, timeout=timeout) - r = set(r) - w = set(w) - for fd in r | w: - events = 0 - if fd in r: - events |= EVENT_READ - if fd in w: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def _wrap_select(self, r, w, timeout=None): - """ Wrapper for select.select because timeout is a positional arg """ - return select.select(r, w, [], timeout) - - __all__.append('SelectSelector') - - # Jython has a different implementation of .fileno() for socket objects. - if platform.python_implementation() == 'Jython': - class _JythonSelectorMapping(object): - """ This is an implementation of _SelectorMapping that is built - for use specifically with Jython, which does not provide a hashable - value from socket.socket.fileno(). """ - - def __init__(self, selector): - assert isinstance(selector, JythonSelectSelector) - self._selector = selector - - def __len__(self): - return len(self._selector._sockets) - - def __getitem__(self, fileobj): - for sock, key in self._selector._sockets: - if sock is fileobj: - return key - else: - raise KeyError("{0!r} is not registered.".format(fileobj)) - - class JythonSelectSelector(SelectSelector): - """ This is an implementation of SelectSelector that is for Jython - which works around that Jython's socket.socket.fileno() does not - return an integer fd value. All SelectorKey.fd will be equal to -1 - and should not be used. This instead uses object id to compare fileobj - and will only use select.select as it's the only selector that allows - directly passing in socket objects rather than registering fds. - See: http://bugs.jython.org/issue1678 - https://wiki.python.org/jython/NewSocketModule#socket.fileno.28.29_does_not_return_an_integer - """ - - def __init__(self): - super(JythonSelectSelector, self).__init__() - - self._sockets = [] # Uses a list of tuples instead of dictionary. - self._map = _JythonSelectorMapping(self) - self._readers = [] - self._writers = [] - - # Jython has a select.cpython_compatible_select function in older versions. - self._select_func = getattr(select, 'cpython_compatible_select', select.select) - - def register(self, fileobj, events, data=None): - for sock, _ in self._sockets: - if sock is fileobj: - raise KeyError("{0!r} is already registered" - .format(fileobj, sock)) - - key = SelectorKey(fileobj, -1, events, data) - self._sockets.append((fileobj, key)) - - if events & EVENT_READ: - self._readers.append(fileobj) - if events & EVENT_WRITE: - self._writers.append(fileobj) - return key - - def unregister(self, fileobj): - for i, (sock, key) in enumerate(self._sockets): - if sock is fileobj: - break - else: - raise KeyError("{0!r} is not registered.".format(fileobj)) - - if key.events & EVENT_READ: - self._readers.remove(fileobj) - if key.events & EVENT_WRITE: - self._writers.remove(fileobj) - - del self._sockets[i] - return key - - def _wrap_select(self, r, w, timeout=None): - """ Wrapper for select.select because timeout is a positional arg """ - return self._select_func(r, w, [], timeout) - - __all__.append('JythonSelectSelector') - SelectSelector = JythonSelectSelector # Override so the wrong selector isn't used. - - -if hasattr(select, "poll"): - class PollSelector(BaseSelector): - """ Poll-based selector """ - def __init__(self): - super(PollSelector, self).__init__() - self._poll = select.poll() - - def register(self, fileobj, events, data=None): - key = super(PollSelector, self).register(fileobj, events, data) - event_mask = 0 - if events & EVENT_READ: - event_mask |= select.POLLIN - if events & EVENT_WRITE: - event_mask |= select.POLLOUT - self._poll.register(key.fd, event_mask) - return key - - def unregister(self, fileobj): - key = super(PollSelector, self).unregister(fileobj) - self._poll.unregister(key.fd) - return key - - def _wrap_poll(self, timeout=None): - """ Wrapper function for select.poll.poll() so that - _syscall_wrapper can work with only seconds. """ - if timeout is not None: - if timeout <= 0: - timeout = 0 - else: - # select.poll.poll() has a resolution of 1 millisecond, - # round away from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1000) - - result = self._poll.poll(timeout) - return result - - def select(self, timeout=None): - ready = [] - fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout) - for fd, event_mask in fd_events: - events = 0 - if event_mask & ~select.POLLIN: - events |= EVENT_WRITE - if event_mask & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - - return ready - - __all__.append('PollSelector') - -if hasattr(select, "epoll"): - class EpollSelector(BaseSelector): - """ Epoll-based selector """ - def __init__(self): - super(EpollSelector, self).__init__() - self._epoll = select.epoll() - - def fileno(self): - return self._epoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(EpollSelector, self).register(fileobj, events, data) - events_mask = 0 - if events & EVENT_READ: - events_mask |= select.EPOLLIN - if events & EVENT_WRITE: - events_mask |= select.EPOLLOUT - _syscall_wrapper(self._epoll.register, False, key.fd, events_mask) - return key - - def unregister(self, fileobj): - key = super(EpollSelector, self).unregister(fileobj) - try: - _syscall_wrapper(self._epoll.unregister, False, key.fd) - except _ERROR_TYPES: - # This can occur when the fd was closed since registry. - pass - return key - - def select(self, timeout=None): - if timeout is not None: - if timeout <= 0: - timeout = 0.0 - else: - # select.epoll.poll() has a resolution of 1 millisecond - # but luckily takes seconds so we don't need a wrapper - # like PollSelector. Just for better rounding. - timeout = math.ceil(timeout * 1000) * 0.001 - timeout = float(timeout) - else: - timeout = -1.0 # epoll.poll() must have a float. - - # We always want at least 1 to ensure that select can be called - # with no file descriptors registered. Otherwise will fail. - max_events = max(len(self._fd_to_key), 1) - - ready = [] - fd_events = _syscall_wrapper(self._epoll.poll, True, - timeout=timeout, - maxevents=max_events) - for fd, event_mask in fd_events: - events = 0 - if event_mask & ~select.EPOLLIN: - events |= EVENT_WRITE - if event_mask & ~select.EPOLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - return ready - - def close(self): - self._epoll.close() - super(EpollSelector, self).close() - - __all__.append('EpollSelector') - - -if hasattr(select, "devpoll"): - class DevpollSelector(BaseSelector): - """Solaris /dev/poll selector.""" - - def __init__(self): - super(DevpollSelector, self).__init__() - self._devpoll = select.devpoll() - - def fileno(self): - return self._devpoll.fileno() - - def register(self, fileobj, events, data=None): - key = super(DevpollSelector, self).register(fileobj, events, data) - poll_events = 0 - if events & EVENT_READ: - poll_events |= select.POLLIN - if events & EVENT_WRITE: - poll_events |= select.POLLOUT - self._devpoll.register(key.fd, poll_events) - return key - - def unregister(self, fileobj): - key = super(DevpollSelector, self).unregister(fileobj) - self._devpoll.unregister(key.fd) - return key - - def _wrap_poll(self, timeout=None): - """ Wrapper function for select.poll.poll() so that - _syscall_wrapper can work with only seconds. """ - if timeout is not None: - if timeout <= 0: - timeout = 0 - else: - # select.devpoll.poll() has a resolution of 1 millisecond, - # round away from zero to wait *at least* timeout seconds. - timeout = math.ceil(timeout * 1000) - - result = self._devpoll.poll(timeout) - return result - - def select(self, timeout=None): - ready = [] - fd_events = _syscall_wrapper(self._wrap_poll, True, timeout=timeout) - for fd, event_mask in fd_events: - events = 0 - if event_mask & ~select.POLLIN: - events |= EVENT_WRITE - if event_mask & ~select.POLLOUT: - events |= EVENT_READ - - key = self._key_from_fd(fd) - if key: - ready.append((key, events & key.events)) - - return ready - - def close(self): - self._devpoll.close() - super(DevpollSelector, self).close() - - __all__.append('DevpollSelector') - - -if hasattr(select, "kqueue"): - class KqueueSelector(BaseSelector): - """ Kqueue / Kevent-based selector """ - def __init__(self): - super(KqueueSelector, self).__init__() - self._kqueue = select.kqueue() - - def fileno(self): - return self._kqueue.fileno() - - def register(self, fileobj, events, data=None): - key = super(KqueueSelector, self).register(fileobj, events, data) - if events & EVENT_READ: - kevent = select.kevent(key.fd, - select.KQ_FILTER_READ, - select.KQ_EV_ADD) - - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - - if events & EVENT_WRITE: - kevent = select.kevent(key.fd, - select.KQ_FILTER_WRITE, - select.KQ_EV_ADD) - - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - - return key - - def unregister(self, fileobj): - key = super(KqueueSelector, self).unregister(fileobj) - if key.events & EVENT_READ: - kevent = select.kevent(key.fd, - select.KQ_FILTER_READ, - select.KQ_EV_DELETE) - try: - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - except _ERROR_TYPES: - pass - if key.events & EVENT_WRITE: - kevent = select.kevent(key.fd, - select.KQ_FILTER_WRITE, - select.KQ_EV_DELETE) - try: - _syscall_wrapper(self._wrap_control, False, [kevent], 0, 0) - except _ERROR_TYPES: - pass - - return key - - def select(self, timeout=None): - if timeout is not None: - timeout = max(timeout, 0) - - max_events = len(self._fd_to_key) * 2 - ready_fds = {} - - kevent_list = _syscall_wrapper(self._wrap_control, True, - None, max_events, timeout=timeout) - - for kevent in kevent_list: - fd = kevent.ident - event_mask = kevent.filter - events = 0 - if event_mask == select.KQ_FILTER_READ: - events |= EVENT_READ - if event_mask == select.KQ_FILTER_WRITE: - events |= EVENT_WRITE - - key = self._key_from_fd(fd) - if key: - if key.fd not in ready_fds: - ready_fds[key.fd] = (key, events & key.events) - else: - old_events = ready_fds[key.fd][1] - ready_fds[key.fd] = (key, (events | old_events) & key.events) - - return list(ready_fds.values()) - - def close(self): - self._kqueue.close() - super(KqueueSelector, self).close() - - def _wrap_control(self, changelist, max_events, timeout): - return self._kqueue.control(changelist, max_events, timeout) - - __all__.append('KqueueSelector') - - -def _can_allocate(struct): - """ Checks that select structs can be allocated by the underlying - operating system, not just advertised by the select module. We don't - check select() because we'll be hopeful that most platforms that - don't have it available will not advertise it. (ie: GAE) """ - try: - # select.poll() objects won't fail until used. - if struct == 'poll': - p = select.poll() - p.poll(0) - - # All others will fail on allocation. - else: - getattr(select, struct)().close() - return True - except (OSError, AttributeError): - return False - - -# Python 3.5 uses a more direct route to wrap system calls to increase speed. -if sys.version_info >= (3, 5): - def _syscall_wrapper(func, _, *args, **kwargs): - """ This is the short-circuit version of the below logic - because in Python 3.5+ all selectors restart system calls. """ - return func(*args, **kwargs) -else: - def _syscall_wrapper(func, recalc_timeout, *args, **kwargs): - """ Wrapper function for syscalls that could fail due to EINTR. - All functions should be retried if there is time left in the timeout - in accordance with PEP 475. """ - timeout = kwargs.get("timeout", None) - if timeout is None: - expires = None - recalc_timeout = False - else: - timeout = float(timeout) - if timeout < 0.0: # Timeout less than 0 treated as no timeout. - expires = None - else: - expires = monotonic() + timeout - - if recalc_timeout and 'timeout' not in kwargs: - raise ValueError( - 'Timeout must be in kwargs to be recalculated') - - result = _SYSCALL_SENTINEL - while result is _SYSCALL_SENTINEL: - try: - result = func(*args, **kwargs) - # OSError is thrown by select.select - # IOError is thrown by select.epoll.poll - # select.error is thrown by select.poll.poll - # Aren't we thankful for Python 3.x rework for exceptions? - except (OSError, IOError, select.error) as e: - # select.error wasn't a subclass of OSError in the past. - errcode = None - if hasattr(e, 'errno') and e.errno is not None: - errcode = e.errno - elif hasattr(e, 'args'): - errcode = e.args[0] - - # Also test for the Windows equivalent of EINTR. - is_interrupt = (errcode == errno.EINTR or (hasattr(errno, 'WSAEINTR') and - errcode == errno.WSAEINTR)) - - if is_interrupt: - if expires is not None: - current_time = monotonic() - if current_time > expires: - raise OSError(errno.ETIMEDOUT, 'Connection timed out') - if recalc_timeout: - kwargs["timeout"] = expires - current_time - continue - raise - return result - - -# Choose the best implementation, roughly: -# kqueue == devpoll == epoll > poll > select -# select() also can't accept a FD > FD_SETSIZE (usually around 1024) -def DefaultSelector(): - """ This function serves as a first call for DefaultSelector to - detect if the select module is being monkey-patched incorrectly - by eventlet, greenlet, and preserve proper behavior. """ - global _DEFAULT_SELECTOR - if _DEFAULT_SELECTOR is None: - if platform.python_implementation() == 'Jython': # Platform-specific: Jython - _DEFAULT_SELECTOR = JythonSelectSelector - elif _can_allocate('kqueue'): - _DEFAULT_SELECTOR = KqueueSelector - elif _can_allocate('devpoll'): - _DEFAULT_SELECTOR = DevpollSelector - elif _can_allocate('epoll'): - _DEFAULT_SELECTOR = EpollSelector - elif _can_allocate('poll'): - _DEFAULT_SELECTOR = PollSelector - elif hasattr(select, 'select'): - _DEFAULT_SELECTOR = SelectSelector - else: # Platform-specific: AppEngine - raise RuntimeError('Platform does not have a selector.') - return _DEFAULT_SELECTOR()