diff --git a/CHANGES.md b/CHANGES.md index e92a1e23..0463652b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -24,7 +24,7 @@ * Update SimpleJSON library 3.8.1 (6022794) to 3.10.0 (c52efea) * Update Six compatibility library 1.10.0 (r405) to 1.10.0 (r433) * Update socks from SocksiPy 1.0 to PySocks 1.6.5 (b4323df) -* Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.dev1 (38e493e) +* Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.1 (79b2683) * Update unidecode library 0.04.18 to 0.04.20 (1e18d98) * Update xmltodict library 0.9.2 (eac0031) to 0.10.2 (375d3a6) * Update Bootstrap 3.2.0 to 3.3.7 @@ -87,6 +87,7 @@ * Change restart/shutdown to use updated jQuery * Remove AlphaReign torrent provider * Update cachecontrol library 0.11.5 to 0.11.7 (3b3b776) +* Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.dev1 (38e493e) ### 0.12.26 (2017-08-20 13:05:00 UTC) diff --git a/lib/tornado/__init__.py b/lib/tornado/__init__.py index e856a5fe..f054e402 100644 --- a/lib/tornado/__init__.py +++ b/lib/tornado/__init__.py @@ -16,7 +16,7 @@ """The Tornado web server and tools.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function # version is a human-readable version number. @@ -25,5 +25,5 @@ from __future__ import absolute_import, division, print_function, with_statement # is zero for an official release, positive for a development branch, # or negative for a release candidate or beta (after the base version # number has been incremented) -version = "4.5.dev1" -version_info = (4, 5, 0, -100) +version = "4.5.1" +version_info = (4, 5, 1, 0) diff --git a/lib/tornado/_locale_data.py b/lib/tornado/_locale_data.py index e073afe5..6fa2c297 100644 --- a/lib/tornado/_locale_data.py +++ b/lib/tornado/_locale_data.py @@ -17,7 +17,7 @@ """Data used by the tornado.locale module.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function LOCALE_NAMES = { "af_ZA": {"name_en": u"Afrikaans", "name": u"Afrikaans"}, diff --git a/lib/tornado/auth.py b/lib/tornado/auth.py index 44144061..f02d2898 100644 --- a/lib/tornado/auth.py +++ b/lib/tornado/auth.py @@ -65,7 +65,7 @@ Example usage for Google OAuth: errors are more consistently reported through the ``Future`` interfaces. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import base64 import binascii @@ -954,6 +954,20 @@ class FacebookGraphMixin(OAuth2Mixin): .. testoutput:: :hide: + This method returns a dictionary which may contain the following fields: + + * ``access_token``, a string which may be passed to `facebook_request` + * ``session_expires``, an integer encoded as a string representing + the time until the access token expires in seconds. This field should + be used like ``int(user['session_expires'])``; in a future version of + Tornado it will change from a string to an integer. + * ``id``, ``name``, ``first_name``, ``last_name``, ``locale``, ``picture``, + ``link``, plus any fields named in the ``extra_fields`` argument. These + fields are copied from the Facebook graph API `user object `_ + + .. versionchanged:: 4.5 + The ``session_expires`` field was updated to support changes made to the + Facebook API in March 2017. """ http = self.get_auth_http_client() args = { @@ -978,10 +992,10 @@ class FacebookGraphMixin(OAuth2Mixin): future.set_exception(AuthError('Facebook auth error: %s' % str(response))) return - args = urlparse.parse_qs(escape.native_str(response.body)) + args = escape.json_decode(response.body) session = { - "access_token": args["access_token"][-1], - "expires": args.get("expires") + "access_token": args.get("access_token"), + "expires_in": args.get("expires_in") } self.facebook_request( @@ -1004,7 +1018,12 @@ class FacebookGraphMixin(OAuth2Mixin): for field in fields: fieldmap[field] = user.get(field) - fieldmap.update({"access_token": session["access_token"], "session_expires": session.get("expires")}) + # session_expires is converted to str for compatibility with + # older versions in which the server used url-encoding and + # this code simply returned the string verbatim. + # This should change in Tornado 5.0. + fieldmap.update({"access_token": session["access_token"], + "session_expires": str(session.get("expires_in"))}) future.set_result(fieldmap) @_auth_return_future diff --git a/lib/tornado/autoreload.py b/lib/tornado/autoreload.py index 5e0d00d1..60571efe 100644 --- a/lib/tornado/autoreload.py +++ b/lib/tornado/autoreload.py @@ -45,7 +45,7 @@ incorrectly. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import os import sys @@ -103,10 +103,6 @@ except ImportError: # os.execv is broken on Windows and can't properly parse command line # arguments and executable name if they contain whitespaces. subprocess # fixes that behavior. -# This distinction is also important because when we use execv, we want to -# close the IOLoop and all its file descriptors, to guard against any -# file descriptors that were not set CLOEXEC. When execv is not available, -# we must not close the IOLoop because we want the process to exit cleanly. _has_execv = sys.platform != 'win32' _watched_files = set() @@ -127,8 +123,6 @@ def start(io_loop=None, check_time=500): _io_loops[io_loop] = True if len(_io_loops) > 1: gen_log.warning("tornado.autoreload started more than once in the same process") - if _has_execv: - add_reload_hook(functools.partial(io_loop.close, all_fds=True)) modify_times = {} callback = functools.partial(_reload_on_update, modify_times) scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop) @@ -249,6 +243,7 @@ def _reload(): # unwind, so just exit uncleanly. os._exit(0) + _USAGE = """\ Usage: python -m tornado.autoreload -m module.to.run [args...] diff --git a/lib/tornado/concurrent.py b/lib/tornado/concurrent.py index ec68dc4f..667e6b17 100644 --- a/lib/tornado/concurrent.py +++ b/lib/tornado/concurrent.py @@ -21,7 +21,7 @@ a mostly-compatible `Future` class designed for use from coroutines, as well as some utility functions for interacting with the `concurrent.futures` package. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import functools import platform @@ -234,7 +234,10 @@ class Future(object): if self._result is not None: return self._result if self._exc_info is not None: - raise_exc_info(self._exc_info) + try: + raise_exc_info(self._exc_info) + finally: + self = None self._check_done() return self._result @@ -340,6 +343,7 @@ class Future(object): app_log.error('Future %r exception was never retrieved: %s', self, ''.join(tb).rstrip()) + TracebackFuture = Future if futures is None: @@ -364,6 +368,7 @@ class DummyExecutor(object): def shutdown(self, wait=True): pass + dummy_executor = DummyExecutor() diff --git a/lib/tornado/curl_httpclient.py b/lib/tornado/curl_httpclient.py index bef78419..eef4a17a 100644 --- a/lib/tornado/curl_httpclient.py +++ b/lib/tornado/curl_httpclient.py @@ -16,7 +16,7 @@ """Non-blocking HTTP client implementation using pycurl.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import collections import functools @@ -278,9 +278,9 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): if curl_log.isEnabledFor(logging.DEBUG): curl.setopt(pycurl.VERBOSE, 1) curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug) - if hasattr(pycurl,'PROTOCOLS'): # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12) - curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP|pycurl.PROTO_HTTPS) - curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP|pycurl.PROTO_HTTPS) + if hasattr(pycurl, 'PROTOCOLS'): # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12) + curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS) + curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS) return curl def _curl_setup_request(self, curl, request, buffer, headers): diff --git a/lib/tornado/escape.py b/lib/tornado/escape.py index 7a3b0e03..2ca3fe3f 100644 --- a/lib/tornado/escape.py +++ b/lib/tornado/escape.py @@ -20,7 +20,7 @@ Also includes a few other miscellaneous string manipulation functions that have crept in over time. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import json import re @@ -199,6 +199,7 @@ def utf8(value): ) return value.encode("utf-8") + _TO_UNICODE_TYPES = (unicode_type, type(None)) @@ -216,6 +217,7 @@ def to_unicode(value): ) return value.decode("utf-8") + # to_unicode was previously named _unicode not because it was private, # but to avoid conflicts with the built-in unicode() function/type _unicode = to_unicode @@ -264,6 +266,7 @@ def recursive_unicode(obj): else: return obj + # I originally used the regex from # http://daringfireball.net/2010/07/improved_regex_for_matching_urls # but it gets all exponential on certain patterns (such as too many trailing @@ -391,4 +394,5 @@ def _build_unicode_map(): unicode_map[name] = unichr(value) return unicode_map + _HTML_UNICODE_MAP = _build_unicode_map() diff --git a/lib/tornado/gen.py b/lib/tornado/gen.py index d7df3b52..99f91066 100644 --- a/lib/tornado/gen.py +++ b/lib/tornado/gen.py @@ -74,7 +74,7 @@ See the `convert_yielded` function to extend this mechanism. via ``singledispatch``. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import collections import functools @@ -245,6 +245,7 @@ def coroutine(func, replace_callback=True): """ return _make_coroutine_wrapper(func, replace_callback=True) + # Ties lifetime of runners to their result futures. Github Issue #1769 # Generators, like any object in Python, must be strong referenced # in order to not be cleaned up by the garbage collector. When using @@ -264,6 +265,7 @@ def coroutine(func, replace_callback=True): # Runner alive. _futures_to_runners = weakref.WeakKeyDictionary() + def _make_coroutine_wrapper(func, replace_callback): """The inner workings of ``@gen.coroutine`` and ``@gen.engine``. @@ -315,6 +317,7 @@ def _make_coroutine_wrapper(func, replace_callback): future.set_exc_info(sys.exc_info()) else: _futures_to_runners[future] = Runner(result, future, yielded) + yielded = None try: return future finally: @@ -338,6 +341,8 @@ def _make_coroutine_wrapper(func, replace_callback): def is_coroutine_function(func): """Return whether *func* is a coroutine function, i.e. a function wrapped with `~.gen.coroutine`. + + .. versionadded:: 4.5 """ return getattr(func, '__tornado_coroutine__', False) @@ -715,6 +720,7 @@ def multi(children, quiet_exceptions=()): else: return multi_future(children, quiet_exceptions=quiet_exceptions) + Multi = multi @@ -960,6 +966,9 @@ coroutines that are likely to yield Futures that are ready instantly. Usage: ``yield gen.moment`` .. versionadded:: 4.0 + +.. deprecated:: 4.5 + ``yield None`` is now equivalent to ``yield gen.moment``. """ moment.set_result(None) @@ -990,6 +999,7 @@ class Runner(object): # of the coroutine. self.stack_context_deactivate = None if self.handle_yield(first_yielded): + gen = result_future = first_yielded = None self.run() def register_callback(self, key): @@ -1046,10 +1056,15 @@ class Runner(object): except Exception: self.had_exception = True exc_info = sys.exc_info() + future = None if exc_info is not None: - yielded = self.gen.throw(*exc_info) - exc_info = None + try: + yielded = self.gen.throw(*exc_info) + finally: + # Break up a reference to itself + # for faster GC on CPython. + exc_info = None else: yielded = self.gen.send(value) @@ -1082,6 +1097,7 @@ class Runner(object): return if not self.handle_yield(yielded): return + yielded = None finally: self.running = False @@ -1130,8 +1146,12 @@ class Runner(object): self.future.set_exc_info(sys.exc_info()) if not self.future.done() or self.future is moment: + def inner(f): + # Break a reference cycle to speed GC. + f = None # noqa + self.run() self.io_loop.add_future( - self.future, lambda f: self.run()) + self.future, inner) return False return True @@ -1153,6 +1173,7 @@ class Runner(object): self.stack_context_deactivate() self.stack_context_deactivate = None + Arguments = collections.namedtuple('Arguments', ['args', 'kwargs']) @@ -1172,6 +1193,7 @@ def _argument_adapter(callback): callback(None) return wrapper + # Convert Awaitables into Futures. It is unfortunately possible # to have infinite recursion here if those Awaitables assume that # we're using a different coroutine runner and yield objects @@ -1249,7 +1271,9 @@ def convert_yielded(yielded): .. versionadded:: 4.1 """ # Lists and dicts containing YieldPoints were handled earlier. - if isinstance(yielded, (list, dict)): + if yielded is None: + return moment + elif isinstance(yielded, (list, dict)): return multi(yielded) elif is_future(yielded): return yielded @@ -1258,6 +1282,7 @@ def convert_yielded(yielded): else: raise BadYieldError("yielded unknown object %r" % (yielded,)) + if singledispatch is not None: convert_yielded = singledispatch(convert_yielded) diff --git a/lib/tornado/http1connection.py b/lib/tornado/http1connection.py index 055f1827..3c5ccaf6 100644 --- a/lib/tornado/http1connection.py +++ b/lib/tornado/http1connection.py @@ -19,7 +19,7 @@ .. versionadded:: 4.0 """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import re @@ -257,6 +257,7 @@ class HTTP1Connection(httputil.HTTPConnection): if need_delegate_close: with _ExceptionLoggingContext(app_log): delegate.on_connection_close() + header_future = None self._clear_callbacks() raise gen.Return(True) @@ -489,7 +490,7 @@ class HTTP1Connection(httputil.HTTPConnection): elif ("Content-Length" in headers or headers.get("Transfer-Encoding", "").lower() == "chunked" or getattr(start_line, 'method', None) in ("HEAD", "GET")): - # start_line may be a request or reponse start line; only + # start_line may be a request or response start line; only # the former has a method attribute. return connection_header == "keep-alive" return False diff --git a/lib/tornado/httpclient.py b/lib/tornado/httpclient.py index 2b5d1fba..8436ece4 100644 --- a/lib/tornado/httpclient.py +++ b/lib/tornado/httpclient.py @@ -38,7 +38,7 @@ To select ``curl_httpclient``, call `AsyncHTTPClient.configure` at startup:: AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient") """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import functools import time @@ -423,6 +423,9 @@ class HTTPRequest(object): .. versionadded:: 4.2 The ``ssl_options`` argument. + + .. versionadded:: 4.5 + The ``proxy_auth_mode`` argument. """ # Note that some of these attributes go through property setters # defined below. @@ -670,5 +673,6 @@ def main(): print(native_str(response.body)) client.close() + if __name__ == "__main__": main() diff --git a/lib/tornado/httpserver.py b/lib/tornado/httpserver.py index c7b9c2f8..d757be18 100644 --- a/lib/tornado/httpserver.py +++ b/lib/tornado/httpserver.py @@ -26,7 +26,7 @@ class except to start a server at the beginning of the process to `tornado.httputil.HTTPServerRequest`. The old name remains as an alias. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import socket @@ -62,6 +62,13 @@ class HTTPServer(TCPServer, Configurable, if Tornado is run behind an SSL-decoding proxy that does not set one of the supported ``xheaders``. + By default, when parsing the ``X-Forwarded-For`` header, Tornado will + select the last (i.e., the closest) address on the list of hosts as the + remote host IP address. To select the next server in the chain, a list of + trusted downstream hosts may be passed as the ``trusted_downstream`` + argument. These hosts will be skipped when parsing the ``X-Forwarded-For`` + header. + To make this server serve SSL traffic, send the ``ssl_options`` keyword argument with an `ssl.SSLContext` object. For compatibility with older versions of Python ``ssl_options`` may also be a dictionary of keyword @@ -124,6 +131,9 @@ class HTTPServer(TCPServer, Configurable, .. versionchanged:: 4.2 `HTTPServer` is now a subclass of `tornado.util.Configurable`. + + .. versionchanged:: 4.5 + Added the ``trusted_downstream`` argument. """ def __init__(self, *args, **kwargs): # Ignore args to __init__; real initialization belongs in @@ -138,7 +148,8 @@ class HTTPServer(TCPServer, Configurable, decompress_request=False, chunk_size=None, max_header_size=None, idle_connection_timeout=None, body_timeout=None, - max_body_size=None, max_buffer_size=None): + max_body_size=None, max_buffer_size=None, + trusted_downstream=None): self.request_callback = request_callback self.no_keep_alive = no_keep_alive self.xheaders = xheaders @@ -149,11 +160,13 @@ class HTTPServer(TCPServer, Configurable, max_header_size=max_header_size, header_timeout=idle_connection_timeout or 3600, max_body_size=max_body_size, - body_timeout=body_timeout) + body_timeout=body_timeout, + no_keep_alive=no_keep_alive) TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options, max_buffer_size=max_buffer_size, read_chunk_size=chunk_size) self._connections = set() + self.trusted_downstream = trusted_downstream @classmethod def configurable_base(cls): @@ -172,7 +185,8 @@ class HTTPServer(TCPServer, Configurable, def handle_stream(self, stream, address): context = _HTTPRequestContext(stream, address, - self.protocol) + self.protocol, + self.trusted_downstream) conn = HTTP1ServerConnection( stream, self.conn_params, context) self._connections.add(conn) @@ -219,7 +233,7 @@ class _CallableAdapter(httputil.HTTPMessageDelegate): class _HTTPRequestContext(object): - def __init__(self, stream, address, protocol): + def __init__(self, stream, address, protocol, trusted_downstream=None): self.address = address # Save the socket's address family now so we know how to # interpret self.address even after the stream is closed @@ -243,6 +257,7 @@ class _HTTPRequestContext(object): self.protocol = "http" self._orig_remote_ip = self.remote_ip self._orig_protocol = self.protocol + self.trusted_downstream = set(trusted_downstream or []) def __str__(self): if self.address_family in (socket.AF_INET, socket.AF_INET6): @@ -259,7 +274,10 @@ class _HTTPRequestContext(object): """Rewrite the ``remote_ip`` and ``protocol`` fields.""" # Squid uses X-Forwarded-For, others use X-Real-Ip ip = headers.get("X-Forwarded-For", self.remote_ip) - ip = ip.split(',')[-1].strip() + # Skip trusted downstream hosts in X-Forwarded-For list + for ip in (cand.strip() for cand in reversed(ip.split(','))): + if ip not in self.trusted_downstream: + break ip = headers.get("X-Real-Ip", ip) if netutil.is_valid_ip(ip): self.remote_ip = ip @@ -303,4 +321,5 @@ class _ProxyAdapter(httputil.HTTPMessageDelegate): def _cleanup(self): self.connection.context._unapply_xheaders() + HTTPRequest = httputil.HTTPServerRequest diff --git a/lib/tornado/httputil.py b/lib/tornado/httputil.py index 8ea8a01e..818ea914 100644 --- a/lib/tornado/httputil.py +++ b/lib/tornado/httputil.py @@ -20,7 +20,7 @@ This module also defines the `HTTPServerRequest` class which is exposed via `tornado.web.RequestHandler.request`. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import calendar import collections @@ -38,11 +38,12 @@ from tornado.util import ObjectDict, PY3 if PY3: import http.cookies as Cookie from http.client import responses - from urllib.parse import urlencode + from urllib.parse import urlencode, urlparse, urlunparse, parse_qsl else: import Cookie from httplib import responses from urllib import urlencode + from urlparse import urlparse, urlunparse, parse_qsl # responses is unused in this file, but we re-export it to other files. @@ -98,6 +99,7 @@ class _NormalizedHeaderCache(dict): del self[old_key] return normalized + _normalized_headers = _NormalizedHeaderCache(1000) @@ -601,11 +603,28 @@ def url_concat(url, args): >>> url_concat("http://example.com/foo?a=b", [("c", "d"), ("c", "d2")]) 'http://example.com/foo?a=b&c=d&c=d2' """ - if not args: + if args is None: return url - if url[-1] not in ('?', '&'): - url += '&' if ('?' in url) else '?' - return url + urlencode(args) + parsed_url = urlparse(url) + if isinstance(args, dict): + parsed_query = parse_qsl(parsed_url.query, keep_blank_values=True) + parsed_query.extend(args.items()) + elif isinstance(args, list) or isinstance(args, tuple): + parsed_query = parse_qsl(parsed_url.query, keep_blank_values=True) + parsed_query.extend(args) + else: + err = "'args' parameter should be dict, list or tuple. Not {0}".format( + type(args)) + raise TypeError(err) + final_query = urlencode(parsed_query) + url = urlunparse(( + parsed_url[0], + parsed_url[1], + parsed_url[2], + parsed_url[3], + final_query, + parsed_url[5])) + return url class HTTPFile(ObjectDict): @@ -920,10 +939,12 @@ def split_host_and_port(netloc): port = None return (host, port) + _OctalPatt = re.compile(r"\\[0-3][0-7][0-7]") _QuotePatt = re.compile(r"[\\].") _nulljoin = ''.join + def _unquote_cookie(str): """Handle double quotes and escaping in cookie values. @@ -965,11 +986,11 @@ def _unquote_cookie(str): k = q_match.start(0) if q_match and (not o_match or k < j): # QuotePatt matched res.append(str[i:k]) - res.append(str[k+1]) + res.append(str[k + 1]) i = k + 2 else: # OctalPatt matched res.append(str[i:j]) - res.append(chr(int(str[j+1:j+4], 8))) + res.append(chr(int(str[j + 1:j + 4], 8))) i = j + 4 return _nulljoin(res) diff --git a/lib/tornado/ioloop.py b/lib/tornado/ioloop.py index 1b1a07cd..ad35787f 100644 --- a/lib/tornado/ioloop.py +++ b/lib/tornado/ioloop.py @@ -26,7 +26,7 @@ In addition to I/O events, the `IOLoop` can also schedule time-based events. `IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import collections import datetime @@ -715,7 +715,7 @@ class PollIOLoop(IOLoop): self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: - for fd, handler in self._handlers.values(): + for fd, handler in list(self._handlers.values()): self.close_fd(fd) self._waker.close() self._impl.close() diff --git a/lib/tornado/iostream.py b/lib/tornado/iostream.py index e3e70f04..4bf79a56 100644 --- a/lib/tornado/iostream.py +++ b/lib/tornado/iostream.py @@ -24,7 +24,7 @@ Contents: * `PipeIOStream`: Pipe-based IOStream implementation. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import collections import errno @@ -82,6 +82,8 @@ _ERRNO_INPROGRESS = (errno.EINPROGRESS,) if hasattr(errno, "WSAEINPROGRESS"): _ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,) # type: ignore +_WINDOWS = sys.platform.startswith('win') + class StreamClosedError(IOError): """Exception raised by `IOStream` methods when the stream is closed. @@ -158,11 +160,16 @@ class BaseIOStream(object): self.max_buffer_size // 2) self.max_write_buffer_size = max_write_buffer_size self.error = None - self._read_buffer = collections.deque() - self._write_buffer = collections.deque() + self._read_buffer = bytearray() + self._read_buffer_pos = 0 self._read_buffer_size = 0 + self._write_buffer = bytearray() + self._write_buffer_pos = 0 self._write_buffer_size = 0 self._write_buffer_frozen = False + self._total_write_index = 0 + self._total_write_done_index = 0 + self._pending_writes_while_frozen = [] self._read_delimiter = None self._read_regex = None self._read_max_bytes = None @@ -173,7 +180,7 @@ class BaseIOStream(object): self._read_future = None self._streaming_callback = None self._write_callback = None - self._write_future = None + self._write_futures = collections.deque() self._close_callback = None self._connect_callback = None self._connect_future = None @@ -367,36 +374,37 @@ class BaseIOStream(object): If no ``callback`` is given, this method returns a `.Future` that resolves (with a result of ``None``) when the write has been - completed. If `write` is called again before that `.Future` has - resolved, the previous future will be orphaned and will never resolve. + completed. + + The ``data`` argument may be of type `bytes` or `memoryview`. .. versionchanged:: 4.0 Now returns a `.Future` if no callback is given. + + .. versionchanged:: 4.5 + Added support for `memoryview` arguments. """ - assert isinstance(data, bytes) self._check_closed() - # We use bool(_write_buffer) as a proxy for write_buffer_size>0, - # so never put empty strings in the buffer. if data: if (self.max_write_buffer_size is not None and self._write_buffer_size + len(data) > self.max_write_buffer_size): raise StreamBufferFullError("Reached maximum write buffer size") - # Break up large contiguous strings before inserting them in the - # write buffer, so we don't have to recopy the entire thing - # as we slice off pieces to send to the socket. - WRITE_BUFFER_CHUNK_SIZE = 128 * 1024 - for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE): - self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE]) - self._write_buffer_size += len(data) + if self._write_buffer_frozen: + self._pending_writes_while_frozen.append(data) + else: + self._write_buffer += data + self._write_buffer_size += len(data) + self._total_write_index += len(data) if callback is not None: self._write_callback = stack_context.wrap(callback) future = None else: - future = self._write_future = TracebackFuture() + future = TracebackFuture() future.add_done_callback(lambda f: f.exception()) + self._write_futures.append((self._total_write_index, future)) if not self._connecting: self._handle_write() - if self._write_buffer: + if self._write_buffer_size: self._add_io_state(self.io_loop.WRITE) self._maybe_add_error_listener() return future @@ -445,9 +453,8 @@ class BaseIOStream(object): if self._read_future is not None: futures.append(self._read_future) self._read_future = None - if self._write_future is not None: - futures.append(self._write_future) - self._write_future = None + futures += [future for _, future in self._write_futures] + self._write_futures.clear() if self._connect_future is not None: futures.append(self._connect_future) self._connect_future = None @@ -466,6 +473,7 @@ class BaseIOStream(object): # if the IOStream object is kept alive by a reference cycle. # TODO: Clear the read buffer too; it currently breaks some tests. self._write_buffer = None + self._write_buffer_size = 0 def reading(self): """Returns true if we are currently reading from the stream.""" @@ -473,7 +481,7 @@ class BaseIOStream(object): def writing(self): """Returns true if we are currently writing to the stream.""" - return bool(self._write_buffer) + return self._write_buffer_size > 0 def closed(self): """Returns true if the stream has been closed.""" @@ -650,7 +658,7 @@ class BaseIOStream(object): except Exception as e: if 1 != e.errno: gen_log.warning("error on read: %s" % e) - self.close(exc_info=True) + self.close(exc_info=e) return if pos is not None: self._read_from_buffer(pos) @@ -744,7 +752,7 @@ class BaseIOStream(object): break if chunk is None: return 0 - self._read_buffer.append(chunk) + self._read_buffer += chunk self._read_buffer_size += len(chunk) if self._read_buffer_size > self.max_buffer_size: gen_log.error("Reached maximum read buffer size") @@ -792,30 +800,25 @@ class BaseIOStream(object): # since large merges are relatively expensive and get undone in # _consume(). if self._read_buffer: - while True: - loc = self._read_buffer[0].find(self._read_delimiter) - if loc != -1: - delimiter_len = len(self._read_delimiter) - self._check_max_bytes(self._read_delimiter, - loc + delimiter_len) - return loc + delimiter_len - if len(self._read_buffer) == 1: - break - _double_prefix(self._read_buffer) + loc = self._read_buffer.find(self._read_delimiter, + self._read_buffer_pos) + if loc != -1: + loc -= self._read_buffer_pos + delimiter_len = len(self._read_delimiter) + self._check_max_bytes(self._read_delimiter, + loc + delimiter_len) + return loc + delimiter_len self._check_max_bytes(self._read_delimiter, - len(self._read_buffer[0])) + self._read_buffer_size) elif self._read_regex is not None: if self._read_buffer: - while True: - m = self._read_regex.search(self._read_buffer[0]) - if m is not None: - self._check_max_bytes(self._read_regex, m.end()) - return m.end() - if len(self._read_buffer) == 1: - break - _double_prefix(self._read_buffer) - self._check_max_bytes(self._read_regex, - len(self._read_buffer[0])) + m = self._read_regex.search(self._read_buffer, + self._read_buffer_pos) + if m is not None: + loc = m.end() - self._read_buffer_pos + self._check_max_bytes(self._read_regex, loc) + return loc + self._check_max_bytes(self._read_regex, self._read_buffer_size) return None def _check_max_bytes(self, delimiter, size): @@ -825,35 +828,56 @@ class BaseIOStream(object): "delimiter %r not found within %d bytes" % ( delimiter, self._read_max_bytes)) + def _freeze_write_buffer(self, size): + self._write_buffer_frozen = size + + def _unfreeze_write_buffer(self): + self._write_buffer_frozen = False + self._write_buffer += b''.join(self._pending_writes_while_frozen) + self._write_buffer_size += sum(map(len, self._pending_writes_while_frozen)) + self._pending_writes_while_frozen[:] = [] + + def _got_empty_write(self, size): + """ + Called when a non-blocking write() failed writing anything. + Can be overridden in subclasses. + """ + def _handle_write(self): - while self._write_buffer: + while self._write_buffer_size: + assert self._write_buffer_size >= 0 try: - if not self._write_buffer_frozen: + start = self._write_buffer_pos + if self._write_buffer_frozen: + size = self._write_buffer_frozen + elif _WINDOWS: # On windows, socket.send blows up if given a # write buffer that's too large, instead of just # returning the number of bytes it was able to # process. Therefore we must not call socket.send # with more than 128KB at a time. - _merge_prefix(self._write_buffer, 128 * 1024) - num_bytes = self.write_to_fd(self._write_buffer[0]) + size = 128 * 1024 + else: + size = self._write_buffer_size + num_bytes = self.write_to_fd( + memoryview(self._write_buffer)[start:start + size]) if num_bytes == 0: - # With OpenSSL, if we couldn't write the entire buffer, - # the very same string object must be used on the - # next call to send. Therefore we suppress - # merging the write buffer after an incomplete send. - # A cleaner solution would be to set - # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is - # not yet accessible from python - # (http://bugs.python.org/issue8240) - self._write_buffer_frozen = True + self._got_empty_write(size) break - self._write_buffer_frozen = False - _merge_prefix(self._write_buffer, num_bytes) - self._write_buffer.popleft() + self._write_buffer_pos += num_bytes self._write_buffer_size -= num_bytes + # Amortized O(1) shrink + # (this heuristic is implemented natively in Python 3.4+ + # but is replicated here for Python 2) + if self._write_buffer_pos > self._write_buffer_size: + del self._write_buffer[:self._write_buffer_pos] + self._write_buffer_pos = 0 + if self._write_buffer_frozen: + self._unfreeze_write_buffer() + self._total_write_done_index += num_bytes except (socket.error, IOError, OSError) as e: if e.args[0] in _ERRNO_WOULDBLOCK: - self._write_buffer_frozen = True + self._got_empty_write(size) break else: if not self._is_connreset(e): @@ -864,22 +888,38 @@ class BaseIOStream(object): self.fileno(), e) self.close(exc_info=True) return - if not self._write_buffer: + + while self._write_futures: + index, future = self._write_futures[0] + if index > self._total_write_done_index: + break + self._write_futures.popleft() + future.set_result(None) + + if not self._write_buffer_size: if self._write_callback: callback = self._write_callback self._write_callback = None self._run_callback(callback) - if self._write_future: - future = self._write_future - self._write_future = None - future.set_result(None) def _consume(self, loc): + # Consume loc bytes from the read buffer and return them if loc == 0: return b"" - _merge_prefix(self._read_buffer, loc) + assert loc <= self._read_buffer_size + # Slice the bytearray buffer into bytes, without intermediate copying + b = (memoryview(self._read_buffer) + [self._read_buffer_pos:self._read_buffer_pos + loc] + ).tobytes() + self._read_buffer_pos += loc self._read_buffer_size -= loc - return self._read_buffer.popleft() + # Amortized O(1) shrink + # (this heuristic is implemented natively in Python 3.4+ + # but is replicated here for Python 2) + if self._read_buffer_pos > self._read_buffer_size: + del self._read_buffer[:self._read_buffer_pos] + self._read_buffer_pos = 0 + return b def _check_closed(self): if self.closed(): @@ -1125,7 +1165,7 @@ class IOStream(BaseIOStream): suitably-configured `ssl.SSLContext` to disable. """ if (self._read_callback or self._read_future or - self._write_callback or self._write_future or + self._write_callback or self._write_futures or self._connect_callback or self._connect_future or self._pending_callbacks or self._closed or self._read_buffer or self._write_buffer): @@ -1252,6 +1292,17 @@ class SSLIOStream(IOStream): def writing(self): return self._handshake_writing or super(SSLIOStream, self).writing() + def _got_empty_write(self, size): + # With OpenSSL, if we couldn't write the entire buffer, + # the very same string object must be used on the + # next call to send. Therefore we suppress + # merging the write buffer after an incomplete send. + # A cleaner solution would be to set + # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is + # not yet accessible from python + # (http://bugs.python.org/issue8240) + self._freeze_write_buffer(size) + def _do_ssl_handshake(self): # Based on code from test_ssl.py in the python stdlib try: @@ -1499,53 +1550,6 @@ class PipeIOStream(BaseIOStream): return chunk -def _double_prefix(deque): - """Grow by doubling, but don't split the second chunk just because the - first one is small. - """ - new_len = max(len(deque[0]) * 2, - (len(deque[0]) + len(deque[1]))) - _merge_prefix(deque, new_len) - - -def _merge_prefix(deque, size): - """Replace the first entries in a deque of strings with a single - string of up to size bytes. - - >>> d = collections.deque(['abc', 'de', 'fghi', 'j']) - >>> _merge_prefix(d, 5); print(d) - deque(['abcde', 'fghi', 'j']) - - Strings will be split as necessary to reach the desired size. - >>> _merge_prefix(d, 7); print(d) - deque(['abcdefg', 'hi', 'j']) - - >>> _merge_prefix(d, 3); print(d) - deque(['abc', 'defg', 'hi', 'j']) - - >>> _merge_prefix(d, 100); print(d) - deque(['abcdefghij']) - """ - if len(deque) == 1 and len(deque[0]) <= size: - return - prefix = [] - remaining = size - while deque and remaining > 0: - chunk = deque.popleft() - if len(chunk) > remaining: - deque.appendleft(chunk[remaining:]) - chunk = chunk[:remaining] - prefix.append(chunk) - remaining -= len(chunk) - # This data structure normally just contains byte strings, but - # the unittest gets messy if it doesn't use the default str() type, - # so do the merge based on the type of data that's actually present. - if prefix: - deque.appendleft(type(prefix[0])().join(prefix)) - if not deque: - deque.appendleft(b"") - - def doctests(): import doctest return doctest.DocTestSuite() diff --git a/lib/tornado/locale.py b/lib/tornado/locale.py index c1cb6792..7dba10d6 100644 --- a/lib/tornado/locale.py +++ b/lib/tornado/locale.py @@ -39,7 +39,7 @@ supported by `gettext` and related tools). If neither method is called, the `Locale.translate` method will simply return the original string. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import codecs import csv @@ -187,7 +187,7 @@ def load_gettext_translations(directory, domain): {directory}/{lang}/LC_MESSAGES/{domain}.mo - Three steps are required to have you app translated: + Three steps are required to have your app translated: 1. Generate POT translation file:: diff --git a/lib/tornado/locks.py b/lib/tornado/locks.py index d84a9a87..4f9ecf6d 100644 --- a/lib/tornado/locks.py +++ b/lib/tornado/locks.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import collections diff --git a/lib/tornado/log.py b/lib/tornado/log.py index ac1bb95e..654afc02 100644 --- a/lib/tornado/log.py +++ b/lib/tornado/log.py @@ -28,7 +28,7 @@ These streams may be configured independently using the standard library's `logging` module. For example, you may wish to send ``tornado.access`` logs to a separate file for analysis. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import logging import logging.handlers @@ -37,6 +37,11 @@ import sys from tornado.escape import _unicode from tornado.util import unicode_type, basestring_type +try: + import colorama +except ImportError: + colorama = None + try: import curses # type: ignore except ImportError: @@ -49,15 +54,21 @@ gen_log = logging.getLogger("tornado.general") def _stderr_supports_color(): - color = False - if curses and hasattr(sys.stderr, 'isatty') and sys.stderr.isatty(): - try: - curses.setupterm() - if curses.tigetnum("colors") > 0: - color = True - except Exception: - pass - return color + try: + if hasattr(sys.stderr, 'isatty') and sys.stderr.isatty(): + if curses: + curses.setupterm() + if curses.tigetnum("colors") > 0: + return True + elif colorama: + if sys.stderr is getattr(colorama.initialise, 'wrapped_stderr', + object()): + return True + except Exception: + # Very broad exception handling because it's always better to + # fall back to non-colored logs than to break at startup. + pass + return False def _safe_unicode(s): @@ -79,6 +90,17 @@ class LogFormatter(logging.Formatter): This formatter is enabled automatically by `tornado.options.parse_command_line` or `tornado.options.parse_config_file` (unless ``--logging=none`` is used). + + Color support on Windows versions that do not support ANSI color codes is + enabled by use of the colorama__ library. Applications that wish to use + this must first initialize colorama with a call to ``colorama.init``. + See the colorama documentation for details. + + __ https://pypi.python.org/pypi/colorama + + .. versionchanged:: 4.5 + Added support for ``colorama``. Changed the constructor + signature to be compatible with `logging.config.dictConfig`. """ DEFAULT_FORMAT = '%(color)s[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d]%(end_color)s %(message)s' DEFAULT_DATE_FORMAT = '%y%m%d %H:%M:%S' @@ -89,8 +111,8 @@ class LogFormatter(logging.Formatter): logging.ERROR: 1, # Red } - def __init__(self, color=True, fmt=DEFAULT_FORMAT, - datefmt=DEFAULT_DATE_FORMAT, colors=DEFAULT_COLORS): + def __init__(self, fmt=DEFAULT_FORMAT, datefmt=DEFAULT_DATE_FORMAT, + style='%', color=True, colors=DEFAULT_COLORS): r""" :arg bool color: Enables color support. :arg string fmt: Log message format. @@ -111,21 +133,28 @@ class LogFormatter(logging.Formatter): self._colors = {} if color and _stderr_supports_color(): - # The curses module has some str/bytes confusion in - # python3. Until version 3.2.3, most methods return - # bytes, but only accept strings. In addition, we want to - # output these strings with the logging module, which - # works with unicode strings. The explicit calls to - # unicode() below are harmless in python2 but will do the - # right conversion in python 3. - fg_color = (curses.tigetstr("setaf") or - curses.tigetstr("setf") or "") - if (3, 0) < sys.version_info < (3, 2, 3): - fg_color = unicode_type(fg_color, "ascii") + if curses is not None: + # The curses module has some str/bytes confusion in + # python3. Until version 3.2.3, most methods return + # bytes, but only accept strings. In addition, we want to + # output these strings with the logging module, which + # works with unicode strings. The explicit calls to + # unicode() below are harmless in python2 but will do the + # right conversion in python 3. + fg_color = (curses.tigetstr("setaf") or + curses.tigetstr("setf") or "") + if (3, 0) < sys.version_info < (3, 2, 3): + fg_color = unicode_type(fg_color, "ascii") - for levelno, code in colors.items(): - self._colors[levelno] = unicode_type(curses.tparm(fg_color, code), "ascii") - self._normal = unicode_type(curses.tigetstr("sgr0"), "ascii") + for levelno, code in colors.items(): + self._colors[levelno] = unicode_type(curses.tparm(fg_color, code), "ascii") + self._normal = unicode_type(curses.tigetstr("sgr0"), "ascii") + else: + # If curses is not present (currently we'll only get here for + # colorama on windows), assume hard-coded ANSI color codes. + for levelno, code in colors.items(): + self._colors[levelno] = '\033[2;3%dm' % code + self._normal = '\033[0m' else: self._normal = '' diff --git a/lib/tornado/netutil.py b/lib/tornado/netutil.py index 20b4bdd6..c34c8c8b 100644 --- a/lib/tornado/netutil.py +++ b/lib/tornado/netutil.py @@ -16,7 +16,7 @@ """Miscellaneous network utility code.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import errno import os @@ -200,6 +200,7 @@ def bind_sockets(port, address=None, family=socket.AF_UNSPEC, sockets.append(sock) return sockets + if hasattr(socket, 'AF_UNIX'): def bind_unix_socket(file, mode=0o600, backlog=_DEFAULT_BACKLOG): """Creates a listening unix socket. diff --git a/lib/tornado/options.py b/lib/tornado/options.py index 2fbb32ad..0a72cc65 100644 --- a/lib/tornado/options.py +++ b/lib/tornado/options.py @@ -82,7 +82,7 @@ instances to define isolated sets of options, such as for subcommands. underscores. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import datetime import numbers diff --git a/lib/tornado/platform/asyncio.py b/lib/tornado/platform/asyncio.py index 9556da61..830ee1f3 100644 --- a/lib/tornado/platform/asyncio.py +++ b/lib/tornado/platform/asyncio.py @@ -19,7 +19,7 @@ loops. Windows. Use the `~asyncio.SelectorEventLoop` instead. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import functools import tornado.concurrent @@ -30,7 +30,7 @@ from tornado import stack_context try: # Import the real asyncio module for py33+ first. Older versions of the # trollius backport also use this name. - import asyncio # type: ignore + import asyncio # type: ignore except ImportError as e: # Asyncio itself isn't available; see if trollius is (backport to py26+). try: @@ -217,5 +217,6 @@ def to_asyncio_future(tornado_future): tornado.concurrent.chain_future(tornado_future, af) return af + if hasattr(convert_yielded, 'register'): convert_yielded.register(asyncio.Future, to_tornado_future) # type: ignore diff --git a/lib/tornado/platform/auto.py b/lib/tornado/platform/auto.py index 449b634b..1f4d7001 100644 --- a/lib/tornado/platform/auto.py +++ b/lib/tornado/platform/auto.py @@ -23,7 +23,7 @@ Most code that needs access to this functionality should do e.g.:: from tornado.platform.auto import set_close_exec """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import os diff --git a/lib/tornado/platform/caresresolver.py b/lib/tornado/platform/caresresolver.py index 4205de30..fd6e9d27 100644 --- a/lib/tornado/platform/caresresolver.py +++ b/lib/tornado/platform/caresresolver.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import pycares # type: ignore import socket diff --git a/lib/tornado/platform/common.py b/lib/tornado/platform/common.py index d78ee686..a73f8db7 100644 --- a/lib/tornado/platform/common.py +++ b/lib/tornado/platform/common.py @@ -1,5 +1,5 @@ """Lowest-common-denominator implementations of platform functionality.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import errno import socket @@ -8,6 +8,7 @@ import time from tornado.platform import interface from tornado.util import errno_from_exception + def try_close(f): # Avoid issue #875 (race condition when using the file in another # thread). diff --git a/lib/tornado/platform/epoll.py b/lib/tornado/platform/epoll.py index b08cc628..80bfd8af 100644 --- a/lib/tornado/platform/epoll.py +++ b/lib/tornado/platform/epoll.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. """EPoll-based IOLoop implementation for Linux systems.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import select diff --git a/lib/tornado/platform/interface.py b/lib/tornado/platform/interface.py index cc062391..c0ef2905 100644 --- a/lib/tornado/platform/interface.py +++ b/lib/tornado/platform/interface.py @@ -21,7 +21,7 @@ for other tornado.platform modules. Most code should import the appropriate implementation from `tornado.platform.auto`. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function def set_close_exec(fd): @@ -62,5 +62,6 @@ class Waker(object): """Closes the waker's file descriptor(s).""" raise NotImplementedError() + def monotonic_time(): raise NotImplementedError() diff --git a/lib/tornado/platform/kqueue.py b/lib/tornado/platform/kqueue.py index f8f3e4a6..3a5d4174 100644 --- a/lib/tornado/platform/kqueue.py +++ b/lib/tornado/platform/kqueue.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. """KQueue-based IOLoop implementation for BSD/Mac systems.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import select diff --git a/lib/tornado/platform/posix.py b/lib/tornado/platform/posix.py index 572c0139..9bf1f188 100644 --- a/lib/tornado/platform/posix.py +++ b/lib/tornado/platform/posix.py @@ -16,7 +16,7 @@ """Posix implementations of platform-specific functionality.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import fcntl import os diff --git a/lib/tornado/platform/select.py b/lib/tornado/platform/select.py index db52ef91..a18049f7 100644 --- a/lib/tornado/platform/select.py +++ b/lib/tornado/platform/select.py @@ -17,7 +17,7 @@ Used as a fallback for systems that don't support epoll or kqueue. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import select diff --git a/lib/tornado/platform/twisted.py b/lib/tornado/platform/twisted.py index 92157c7c..0f9787e8 100644 --- a/lib/tornado/platform/twisted.py +++ b/lib/tornado/platform/twisted.py @@ -21,7 +21,7 @@ depending on which library's underlying event loop you want to use. This module has been tested with Twisted versions 11.0.0 and newer. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import datetime import functools @@ -574,6 +574,7 @@ class TwistedResolver(Resolver): ] raise gen.Return(result) + if hasattr(gen.convert_yielded, 'register'): @gen.convert_yielded.register(Deferred) # type: ignore def _(d): diff --git a/lib/tornado/platform/windows.py b/lib/tornado/platform/windows.py index 9a319f27..e94a0cf1 100644 --- a/lib/tornado/platform/windows.py +++ b/lib/tornado/platform/windows.py @@ -2,7 +2,7 @@ # for production use. -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import ctypes # type: ignore import ctypes.wintypes # type: ignore diff --git a/lib/tornado/process.py b/lib/tornado/process.py index 7c876494..fae94f3c 100644 --- a/lib/tornado/process.py +++ b/lib/tornado/process.py @@ -18,7 +18,7 @@ the server into multiple processes and managing subprocesses. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import errno import os @@ -355,6 +355,10 @@ class Subprocess(object): else: assert os.WIFEXITED(status) self.returncode = os.WEXITSTATUS(status) + # We've taken over wait() duty from the subprocess.Popen + # object. If we don't inform it of the process's return code, + # it will log a warning at destruction in python 3.6+. + self.proc.returncode = self.returncode if self._exit_callback: callback = self._exit_callback self._exit_callback = None diff --git a/lib/tornado/queues.py b/lib/tornado/queues.py index b8e9b569..0041a800 100644 --- a/lib/tornado/queues.py +++ b/lib/tornado/queues.py @@ -12,7 +12,17 @@ # License for the specific language governing permissions and limitations # under the License. -from __future__ import absolute_import, division, print_function, with_statement +"""Asynchronous queues for coroutines. + +.. warning:: + + Unlike the standard library's `queue` module, the classes defined here + are *not* thread-safe. To use these queues from another thread, + use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread + before calling any queue methods. +""" + +from __future__ import absolute_import, division, print_function import collections import heapq diff --git a/lib/tornado/routing.py b/lib/tornado/routing.py index 71c63b3d..6762dc05 100644 --- a/lib/tornado/routing.py +++ b/lib/tornado/routing.py @@ -12,16 +12,23 @@ # License for the specific language governing permissions and limitations # under the License. -"""Basic routing implementation. +"""Flexible routing implementation. -Tornado routes HTTP requests to appropriate handlers using `Router` class implementations. +Tornado routes HTTP requests to appropriate handlers using `Router` +class implementations. The `tornado.web.Application` class is a +`Router` implementation and may be used directly, or the classes in +this module may be used for additional flexibility. The `RuleRouter` +class can match on more criteria than `.Application`, or the `Router` +interface can be subclassed for maximum customization. -`Router` interface extends `~.httputil.HTTPServerConnectionDelegate` to provide additional -routing capabilities. This also means that any `Router` implementation can be used directly -as a ``request_callback`` for `~.httpserver.HTTPServer` constructor. +`Router` interface extends `~.httputil.HTTPServerConnectionDelegate` +to provide additional routing capabilities. This also means that any +`Router` implementation can be used directly as a ``request_callback`` +for `~.httpserver.HTTPServer` constructor. -`Router` subclass must implement a ``find_handler`` method to provide a suitable -`~.httputil.HTTPMessageDelegate` instance to handle the request: +`Router` subclass must implement a ``find_handler`` method to provide +a suitable `~.httputil.HTTPMessageDelegate` instance to handle the +request: .. code-block:: python @@ -44,16 +51,18 @@ as a ``request_callback`` for `~.httpserver.HTTPServer` constructor. router = CustomRouter() server = HTTPServer(router) -The main responsibility of `Router` implementation is to provide a mapping from a request -to `~.httputil.HTTPMessageDelegate` instance that will handle this request. In the example above -we can see that routing is possible even without instantiating an `~.web.Application`. +The main responsibility of `Router` implementation is to provide a +mapping from a request to `~.httputil.HTTPMessageDelegate` instance +that will handle this request. In the example above we can see that +routing is possible even without instantiating an `~.web.Application`. -For routing to `~.web.RequestHandler` implementations we need an `~.web.Application` instance. -`~.web.Application.get_handler_delegate` provides a convenient way to create -`~.httputil.HTTPMessageDelegate` for a given request and `~.web.RequestHandler`. +For routing to `~.web.RequestHandler` implementations we need an +`~.web.Application` instance. `~.web.Application.get_handler_delegate` +provides a convenient way to create `~.httputil.HTTPMessageDelegate` +for a given request and `~.web.RequestHandler`. -Here is a simple example of how we can we route to `~.web.RequestHandler` subclasses -by HTTP method: +Here is a simple example of how we can we route to +`~.web.RequestHandler` subclasses by HTTP method: .. code-block:: python @@ -81,16 +90,18 @@ by HTTP method: router = HTTPMethodRouter(Application()) server = HTTPServer(router) -`ReversibleRouter` interface adds the ability to distinguish between the routes and -reverse them to the original urls using route's name and additional arguments. -`~.web.Application` is itself an implementation of `ReversibleRouter` class. +`ReversibleRouter` interface adds the ability to distinguish between +the routes and reverse them to the original urls using route's name +and additional arguments. `~.web.Application` is itself an +implementation of `ReversibleRouter` class. -`RuleRouter` and `ReversibleRuleRouter` are implementations of `Router` and `ReversibleRouter` -interfaces and can be used for creating rule-based routing configurations. +`RuleRouter` and `ReversibleRuleRouter` are implementations of +`Router` and `ReversibleRouter` interfaces and can be used for +creating rule-based routing configurations. -Rules are instances of `Rule` class. They contain a `Matcher`, which provides the logic for -determining whether the rule is a match for a particular request and a target, which can be -one of the following. +Rules are instances of `Rule` class. They contain a `Matcher`, which +provides the logic for determining whether the rule is a match for a +particular request and a target, which can be one of the following. 1) An instance of `~.httputil.HTTPServerConnectionDelegate`: @@ -159,9 +170,12 @@ In the example below `RuleRouter` is used to route between applications: server = HTTPServer(router) For more information on application-level routing see docs for `~.web.Application`. + +.. versionadded:: 4.5 + """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import re from functools import partial diff --git a/lib/tornado/simple_httpclient.py b/lib/tornado/simple_httpclient.py index adcf38b2..8fb70707 100644 --- a/lib/tornado/simple_httpclient.py +++ b/lib/tornado/simple_httpclient.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function from tornado.escape import utf8, _unicode from tornado import gen @@ -499,7 +499,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): def _should_follow_redirect(self): return (self.request.follow_redirects and self.request.max_redirects > 0 and - self.code in (301, 302, 303, 307)) + self.code in (301, 302, 303, 307, 308)) def finish(self): data = b''.join(self.chunks) diff --git a/lib/tornado/stack_context.py b/lib/tornado/stack_context.py index 2c0d9ee7..61ae51f4 100644 --- a/lib/tornado/stack_context.py +++ b/lib/tornado/stack_context.py @@ -67,7 +67,7 @@ Here are a few rules of thumb for when it's necessary: block that references your `StackContext`. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import sys import threading @@ -82,6 +82,8 @@ class StackContextInconsistentError(Exception): class _State(threading.local): def __init__(self): self.contexts = (tuple(), None) + + _state = _State() diff --git a/lib/tornado/tcpclient.py b/lib/tornado/tcpclient.py index 11146860..33074bd5 100644 --- a/lib/tornado/tcpclient.py +++ b/lib/tornado/tcpclient.py @@ -16,7 +16,7 @@ """A non-blocking TCP connection factory. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import functools import socket @@ -155,16 +155,30 @@ class TCPClient(object): @gen.coroutine def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None, - max_buffer_size=None): + max_buffer_size=None, source_ip=None, source_port=None): """Connect to the given host and port. Asynchronously returns an `.IOStream` (or `.SSLIOStream` if ``ssl_options`` is not None). + + Using the ``source_ip`` kwarg, one can specify the source + IP address to use when establishing the connection. + In case the user needs to resolve and + use a specific interface, it has to be handled outside + of Tornado as this depends very much on the platform. + + Similarly, when the user requires a certain source port, it can + be specified using the ``source_port`` arg. + + .. versionchanged:: 4.5 + Added the ``source_ip`` and ``source_port`` arguments. """ addrinfo = yield self.resolver.resolve(host, port, af) connector = _Connector( addrinfo, self.io_loop, - functools.partial(self._create_stream, max_buffer_size)) + functools.partial(self._create_stream, max_buffer_size, + source_ip=source_ip, source_port=source_port) + ) af, addr, stream = yield connector.start() # TODO: For better performance we could cache the (af, addr) # information here and re-use it on subsequent connections to @@ -174,13 +188,32 @@ class TCPClient(object): server_hostname=host) raise gen.Return(stream) - def _create_stream(self, max_buffer_size, af, addr): + def _create_stream(self, max_buffer_size, af, addr, source_ip=None, + source_port=None): # Always connect in plaintext; we'll convert to ssl if necessary # after one connection has completed. + source_port_bind = source_port if isinstance(source_port, int) else 0 + source_ip_bind = source_ip + if source_port_bind and not source_ip: + # User required a specific port, but did not specify + # a certain source IP, will bind to the default loopback. + source_ip_bind = '::1' if af == socket.AF_INET6 else '127.0.0.1' + # Trying to use the same address family as the requested af socket: + # - 127.0.0.1 for IPv4 + # - ::1 for IPv6 + socket_obj = socket.socket(af) + if source_port_bind or source_ip_bind: + # If the user requires binding also to a specific IP/port. + try: + socket_obj.bind((source_ip_bind, source_port_bind)) + except socket.error: + socket_obj.close() + # Fail loudly if unable to use the IP/port. + raise try: - stream = IOStream(socket.socket(af), - io_loop=self.io_loop, - max_buffer_size=max_buffer_size) + stream = IOStream(socket_obj, + io_loop=self.io_loop, + max_buffer_size=max_buffer_size) except socket.error as e: fu = Future() fu.set_exception(e) diff --git a/lib/tornado/tcpserver.py b/lib/tornado/tcpserver.py index ac666698..f47ec89a 100644 --- a/lib/tornado/tcpserver.py +++ b/lib/tornado/tcpserver.py @@ -15,7 +15,7 @@ # under the License. """A non-blocking, single-threaded TCP server.""" -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import errno import os diff --git a/lib/tornado/template.py b/lib/tornado/template.py index 67c61e6b..3b2fa3fe 100644 --- a/lib/tornado/template.py +++ b/lib/tornado/template.py @@ -196,7 +196,7 @@ if you need to include a literal ``{{``, ``{%``, or ``{#`` in the output. `filter_whitespace` for available options. New in Tornado 4.3. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import datetime import linecache diff --git a/lib/tornado/testing.py b/lib/tornado/testing.py index 902bfdfc..74d04b60 100644 --- a/lib/tornado/testing.py +++ b/lib/tornado/testing.py @@ -10,7 +10,7 @@ for the tornado.autoreload module to rerun the tests when code changes. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function try: from tornado import gen @@ -656,7 +656,9 @@ def main(**kwargs): This test runner is essentially equivalent to `unittest.main` from the standard library, but adds support for tornado-style option - parsing and log formatting. + parsing and log formatting. It is *not* necessary to use this + `main` function to run tests using `AsyncTestCase`; these tests + are self-contained and can run with any test runner. The easiest way to run a test is via the command line:: @@ -735,5 +737,6 @@ def main(**kwargs): gen_log.error('FAIL') raise + if __name__ == '__main__': main() diff --git a/lib/tornado/util.py b/lib/tornado/util.py index d0f83d1f..981b94c8 100644 --- a/lib/tornado/util.py +++ b/lib/tornado/util.py @@ -10,7 +10,7 @@ interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`, and `.Resolver`. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import array import atexit @@ -193,7 +193,11 @@ def exec_in(code, glob, loc=None): if PY3: exec(""" def raise_exc_info(exc_info): - raise exc_info[1].with_traceback(exc_info[2]) + try: + raise exc_info[1].with_traceback(exc_info[2]) + finally: + exc_info = None + """) else: exec(""" @@ -232,6 +236,7 @@ def _re_unescape_replacement(match): raise ValueError("cannot unescape '\\\\%s'" % group[0]) return group + _re_unescape_pattern = re.compile(r'\\(.)', re.DOTALL) @@ -450,6 +455,7 @@ def _websocket_mask_python(mask, data): else: return unmasked_arr.tostring() + if (os.environ.get('TORNADO_NO_EXTENSION') or os.environ.get('TORNADO_EXTENSION') == '0'): # These environment variables exist to make it easier to do performance diff --git a/lib/tornado/web.py b/lib/tornado/web.py index 9557a6f3..d79889fa 100644 --- a/lib/tornado/web.py +++ b/lib/tornado/web.py @@ -56,7 +56,7 @@ request. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import base64 import binascii @@ -756,45 +756,21 @@ class RequestHandler(object): if body_part: html_bodies.append(utf8(body_part)) - def is_absolute(path): - return any(path.startswith(x) for x in ["/", "http:", "https:"]) if js_files: # Maintain order of JavaScript files given by modules - paths = [] - unique_paths = set() - for path in js_files: - if not is_absolute(path): - path = self.static_url(path) - if path not in unique_paths: - paths.append(path) - unique_paths.add(path) - js = ''.join('' - for p in paths) + js = self.render_linked_js(js_files) sloc = html.rindex(b'') html = html[:sloc] + utf8(js) + b'\n' + html[sloc:] if js_embed: - js = b'' + js = self.render_embed_js(js_embed) sloc = html.rindex(b'') html = html[:sloc] + js + b'\n' + html[sloc:] if css_files: - paths = [] - unique_paths = set() - for path in css_files: - if not is_absolute(path): - path = self.static_url(path) - if path not in unique_paths: - paths.append(path) - unique_paths.add(path) - css = ''.join('' - for p in paths) + css = self.render_linked_css(css_files) hloc = html.index(b'') html = html[:hloc] + utf8(css) + b'\n' + html[hloc:] if css_embed: - css = b'' + css = self.render_embed_css(css_embed) hloc = html.index(b'') html = html[:hloc] + css + b'\n' + html[hloc:] if html_heads: @@ -805,6 +781,64 @@ class RequestHandler(object): html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:] self.finish(html) + def render_linked_js(self, js_files): + """Default method used to render the final js links for the + rendered webpage. + + Override this method in a sub-classed controller to change the output. + """ + paths = [] + unique_paths = set() + + for path in js_files: + if not is_absolute(path): + path = self.static_url(path) + if path not in unique_paths: + paths.append(path) + unique_paths.add(path) + + return ''.join('' + for p in paths) + + def render_embed_js(self, js_embed): + """Default method used to render the final embedded js for the + rendered webpage. + + Override this method in a sub-classed controller to change the output. + """ + return b'' + + def render_linked_css(self, css_files): + """Default method used to render the final css links for the + rendered webpage. + + Override this method in a sub-classed controller to change the output. + """ + paths = [] + unique_paths = set() + + for path in css_files: + if not is_absolute(path): + path = self.static_url(path) + if path not in unique_paths: + paths.append(path) + unique_paths.add(path) + + return ''.join('' + for p in paths) + + def render_embed_css(self, css_embed): + """Default method used to render the final embedded css for the + rendered webpage. + + Override this method in a sub-classed controller to change the output. + """ + return b'' + def render_string(self, template_name, **kwargs): """Generate the given template with the given arguments. @@ -959,6 +993,9 @@ class RequestHandler(object): self._log() self._finished = True self.on_finish() + self._break_cycles() + + def _break_cycles(self): # Break up a reference cycle between this handler and the # _ui_module closures to allow for faster GC on CPython. self.ui = None @@ -1672,10 +1709,6 @@ def stream_request_body(cls): * The regular HTTP method (``post``, ``put``, etc) will be called after the entire body has been read. - There is a subtle interaction between ``data_received`` and asynchronous - ``prepare``: The first call to ``data_received`` may occur at any point - after the call to ``prepare`` has returned *or yielded*. - See the `file receiver demo `_ for example usage. """ @@ -1834,6 +1867,8 @@ class Application(ReversibleRouter): `StaticFileHandler` can be specified with the ``static_handler_class`` setting. + .. versionchanged:: 4.5 + Integration with the new `tornado.routing` module. """ def __init__(self, handlers=None, default_host=None, transforms=None, **settings): @@ -2218,6 +2253,9 @@ class RedirectHandler(RequestHandler): Use Python's :ref:`format string syntax ` to customize how values are substituted. + + .. versionchanged:: 4.5 + Added support for substitutions into the destination URL. """ def initialize(self, url, permanent=True): self._url = url @@ -3084,6 +3122,7 @@ def create_signed_value(secret, name, value, version=None, clock=None, else: raise ValueError("Unsupported version %d" % version) + # A leading version number in decimal # with no leading zeros, followed by a pipe. _signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$") @@ -3240,3 +3279,7 @@ def _create_signature_v2(secret, s): hash = hmac.new(utf8(secret), digestmod=hashlib.sha256) hash.update(utf8(s)) return utf8(hash.hexdigest()) + + +def is_absolute(path): + return any(path.startswith(x) for x in ["/", "http:", "https:"]) diff --git a/lib/tornado/websocket.py b/lib/tornado/websocket.py index 6e1220b3..69437ee4 100644 --- a/lib/tornado/websocket.py +++ b/lib/tornado/websocket.py @@ -16,7 +16,7 @@ the protocol (known as "draft 76") and are not compatible with this module. Removed support for the draft 76 protocol version. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function # Author: Jacob Kristhammar, 2010 import base64 @@ -30,8 +30,8 @@ import zlib from tornado.concurrent import TracebackFuture from tornado.escape import utf8, native_str, to_unicode -from tornado import httpclient, httputil -from tornado.ioloop import IOLoop +from tornado import gen, httpclient, httputil +from tornado.ioloop import IOLoop, PeriodicCallback from tornado.iostream import StreamClosedError from tornado.log import gen_log, app_log from tornado import simple_httpclient @@ -65,6 +65,10 @@ class WebSocketHandler(tornado.web.RequestHandler): override `open` and `on_close` to handle opened and closed connections. + Custom upgrade response headers can be sent by overriding + `~tornado.web.RequestHandler.set_default_headers` or + `~tornado.web.RequestHandler.prepare`. + See http://dev.w3.org/html5/websockets/ for details on the JavaScript interface. The protocol is specified at http://tools.ietf.org/html/rfc6455. @@ -122,6 +126,17 @@ class WebSocketHandler(tornado.web.RequestHandler): to show the "accept this certificate" dialog but has nowhere to show it. You must first visit a regular HTML page using the same certificate to accept it before the websocket connection will succeed. + + If the application setting ``websocket_ping_interval`` has a non-zero + value, a ping will be sent periodically, and the connection will be + closed if a response is not received before the ``websocket_ping_timeout``. + + Messages larger than the ``websocket_max_message_size`` application setting + (default 10MiB) will not be accepted. + + .. versionchanged:: 4.5 + Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and + ``websocket_max_message_size``. """ def __init__(self, application, request, **kwargs): super(WebSocketHandler, self).__init__(application, request, **kwargs) @@ -176,18 +191,42 @@ class WebSocketHandler(tornado.web.RequestHandler): gen_log.debug(log_msg) return - self.stream = self.request.connection.detach() - self.stream.set_close_callback(self.on_connection_close) - self.ws_connection = self.get_websocket_protocol() if self.ws_connection: self.ws_connection.accept_connection() else: - if not self.stream.closed(): - self.stream.write(tornado.escape.utf8( - "HTTP/1.1 426 Upgrade Required\r\n" - "Sec-WebSocket-Version: 7, 8, 13\r\n\r\n")) - self.stream.close() + self.set_status(426, "Upgrade Required") + self.set_header("Sec-WebSocket-Version", "7, 8, 13") + self.finish() + + stream = None + + @property + def ping_interval(self): + """The interval for websocket keep-alive pings. + + Set websocket_ping_interval = 0 to disable pings. + """ + return self.settings.get('websocket_ping_interval', None) + + @property + def ping_timeout(self): + """If no ping is received in this many seconds, + close the websocket connection (VPNs, etc. can fail to cleanly close ws connections). + Default is max of 3 pings or 30 seconds. + """ + return self.settings.get('websocket_ping_timeout', None) + + @property + def max_message_size(self): + """Maximum allowed message size. + + If the remote peer sends a message larger than this, the connection + will be closed. + + Default is 10MiB. + """ + return self.settings.get('websocket_max_message_size', None) def write_message(self, message, binary=False): """Sends the given message to the client of this Web Socket. @@ -231,11 +270,22 @@ class WebSocketHandler(tornado.web.RequestHandler): If this method returns None (the default), compression will be disabled. If it returns a dict (even an empty one), it will be enabled. The contents of the dict may be used to - control the memory and CPU usage of the compression, - but no such options are currently implemented. + control the following compression options: + + ``compression_level`` specifies the compression level. + + ``mem_level`` specifies the amount of memory used for the internal compression state. + + These parameters are documented in details here: + https://docs.python.org/3.6/library/zlib.html#zlib.compressobj .. versionadded:: 4.1 + + .. versionchanged:: 4.5 + + Added ``compression_level`` and ``mem_level``. """ + # TODO: Add wbits option. return None def open(self, *args, **kwargs): @@ -251,6 +301,10 @@ class WebSocketHandler(tornado.web.RequestHandler): """Handle incoming messages on the WebSocket This method must be overridden. + + .. versionchanged:: 4.5 + + ``on_message`` can be a coroutine. """ raise NotImplementedError @@ -264,6 +318,10 @@ class WebSocketHandler(tornado.web.RequestHandler): """Invoked when the response to a ping frame is received.""" pass + def on_ping(self, data): + """Invoked when the a ping frame is received.""" + pass + def on_close(self): """Invoked when the WebSocket is closed. @@ -319,7 +377,7 @@ class WebSocketHandler(tornado.web.RequestHandler): This is an important security measure; don't disable it without understanding the security implications. In - particular, if your authenticatino is cookie-based, you + particular, if your authentication is cookie-based, you must either restrict the origins allowed by ``check_origin()`` or implement your own XSRF-like protection for websocket connections. See `these @@ -376,6 +434,16 @@ class WebSocketHandler(tornado.web.RequestHandler): if not self._on_close_called: self._on_close_called = True self.on_close() + self._break_cycles() + + def _break_cycles(self): + # WebSocketHandlers call finish() early, but we don't want to + # break up reference cycles (which makes it impossible to call + # self.render_string) until after we've really closed the + # connection (if it was established in the first place, + # indicated by status code 101). + if self.get_status() != 101 or self._on_close_called: + super(WebSocketHandler, self)._break_cycles() def send_error(self, *args, **kwargs): if self.stream is None: @@ -393,18 +461,17 @@ class WebSocketHandler(tornado.web.RequestHandler): return WebSocketProtocol13( self, compression_options=self.get_compression_options()) + def _attach_stream(self): + self.stream = self.request.connection.detach() + self.stream.set_close_callback(self.on_connection_close) + # disable non-WS methods + for method in ["write", "redirect", "set_header", "set_cookie", + "set_status", "flush", "finish"]: + setattr(self, method, _raise_not_supported_for_websockets) -def _wrap_method(method): - def _disallow_for_websocket(self, *args, **kwargs): - if self.stream is None: - method(self, *args, **kwargs) - else: - raise RuntimeError("Method not supported for Web Sockets") - return _disallow_for_websocket -for method in ["write", "redirect", "set_header", "set_cookie", - "set_status", "flush", "finish"]: - setattr(WebSocketHandler, method, - _wrap_method(getattr(WebSocketHandler, method))) + +def _raise_not_supported_for_websockets(*args, **kwargs): + raise RuntimeError("Method not supported for Web Sockets") class WebSocketProtocol(object): @@ -420,14 +487,20 @@ class WebSocketProtocol(object): def _run_callback(self, callback, *args, **kwargs): """Runs the given callback with exception handling. - On error, aborts the websocket connection and returns False. + If the callback is a coroutine, returns its Future. On error, aborts the + websocket connection and returns None. """ try: - callback(*args, **kwargs) + result = callback(*args, **kwargs) except Exception: app_log.error("Uncaught exception in %s", - self.request.path, exc_info=True) + getattr(self.request, 'path', None), exc_info=True) self._abort() + else: + if result is not None: + result = gen.convert_yielded(result) + self.stream.io_loop.add_future(result, lambda f: f.result()) + return result def on_connection_close(self): self._abort() @@ -441,7 +514,7 @@ class WebSocketProtocol(object): class _PerMessageDeflateCompressor(object): - def __init__(self, persistent, max_wbits): + def __init__(self, persistent, max_wbits, compression_options=None): if max_wbits is None: max_wbits = zlib.MAX_WBITS # There is no symbolic constant for the minimum wbits value. @@ -449,14 +522,24 @@ class _PerMessageDeflateCompressor(object): raise ValueError("Invalid max_wbits value %r; allowed range 8-%d", max_wbits, zlib.MAX_WBITS) self._max_wbits = max_wbits + + if compression_options is None or 'compression_level' not in compression_options: + self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL + else: + self._compression_level = compression_options['compression_level'] + + if compression_options is None or 'mem_level' not in compression_options: + self._mem_level = 8 + else: + self._mem_level = compression_options['mem_level'] + if persistent: self._compressor = self._create_compressor() else: self._compressor = None def _create_compressor(self): - return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL, - zlib.DEFLATED, -self._max_wbits) + return zlib.compressobj(self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level) def compress(self, data): compressor = self._compressor or self._create_compressor() @@ -467,7 +550,7 @@ class _PerMessageDeflateCompressor(object): class _PerMessageDeflateDecompressor(object): - def __init__(self, persistent, max_wbits): + def __init__(self, persistent, max_wbits, compression_options=None): if max_wbits is None: max_wbits = zlib.MAX_WBITS if not (8 <= max_wbits <= zlib.MAX_WBITS): @@ -526,6 +609,9 @@ class WebSocketProtocol13(WebSocketProtocol): # the effect of compression, frame overhead, and control frames. self._wire_bytes_in = 0 self._wire_bytes_out = 0 + self.ping_callback = None + self.last_ping = 0 + self.last_pong = 0 def accept_connection(self): try: @@ -562,46 +648,42 @@ class WebSocketProtocol13(WebSocketProtocol): self.request.headers.get("Sec-Websocket-Key")) def _accept_connection(self): - subprotocol_header = '' subprotocols = self.request.headers.get("Sec-WebSocket-Protocol", '') subprotocols = [s.strip() for s in subprotocols.split(',')] if subprotocols: selected = self.handler.select_subprotocol(subprotocols) if selected: assert selected in subprotocols - subprotocol_header = ("Sec-WebSocket-Protocol: %s\r\n" - % selected) + self.handler.set_header("Sec-WebSocket-Protocol", selected) - extension_header = '' extensions = self._parse_extensions_header(self.request.headers) for ext in extensions: if (ext[0] == 'permessage-deflate' and self._compression_options is not None): # TODO: negotiate parameters if compression_options # specifies limits. - self._create_compressors('server', ext[1]) + self._create_compressors('server', ext[1], self._compression_options) if ('client_max_window_bits' in ext[1] and ext[1]['client_max_window_bits'] is None): # Don't echo an offered client_max_window_bits # parameter with no value. del ext[1]['client_max_window_bits'] - extension_header = ('Sec-WebSocket-Extensions: %s\r\n' % - httputil._encode_header( - 'permessage-deflate', ext[1])) + self.handler.set_header("Sec-WebSocket-Extensions", + httputil._encode_header( + 'permessage-deflate', ext[1])) break - if self.stream.closed(): - self._abort() - return - self.stream.write(tornado.escape.utf8( - "HTTP/1.1 101 Switching Protocols\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Accept: %s\r\n" - "%s%s" - "\r\n" % (self._challenge_response(), - subprotocol_header, extension_header))) + self.handler.clear_header("Content-Type") + self.handler.set_status(101) + self.handler.set_header("Upgrade", "websocket") + self.handler.set_header("Connection", "Upgrade") + self.handler.set_header("Sec-WebSocket-Accept", self._challenge_response()) + self.handler.finish() + self.handler._attach_stream() + self.stream = self.handler.stream + + self.start_pinging() self._run_callback(self.handler.open, *self.handler.open_args, **self.handler.open_kwargs) self._receive_frame() @@ -631,7 +713,7 @@ class WebSocketProtocol13(WebSocketProtocol): else: raise ValueError("unsupported extension %r", ext) - def _get_compressor_options(self, side, agreed_parameters): + def _get_compressor_options(self, side, agreed_parameters, compression_options=None): """Converts a websocket agreed_parameters set to keyword arguments for our compressor objects. """ @@ -642,9 +724,10 @@ class WebSocketProtocol13(WebSocketProtocol): options['max_wbits'] = zlib.MAX_WBITS else: options['max_wbits'] = int(wbits_header) + options['compression_options'] = compression_options return options - def _create_compressors(self, side, agreed_parameters): + def _create_compressors(self, side, agreed_parameters, compression_options=None): # TODO: handle invalid parameters gracefully allowed_keys = set(['server_no_context_takeover', 'client_no_context_takeover', @@ -655,9 +738,9 @@ class WebSocketProtocol13(WebSocketProtocol): raise ValueError("unsupported compression parameter %r" % key) other_side = 'client' if (side == 'server') else 'server' self._compressor = _PerMessageDeflateCompressor( - **self._get_compressor_options(side, agreed_parameters)) + **self._get_compressor_options(side, agreed_parameters, compression_options)) self._decompressor = _PerMessageDeflateDecompressor( - **self._get_compressor_options(other_side, agreed_parameters)) + **self._get_compressor_options(other_side, agreed_parameters, compression_options)) def _write_frame(self, fin, opcode, data, flags=0): if fin: @@ -738,8 +821,7 @@ class WebSocketProtocol13(WebSocketProtocol): if self._masked_frame: self.stream.read_bytes(4, self._on_masking_key) else: - self.stream.read_bytes(self._frame_length, - self._on_frame_data) + self._read_frame_data(False) elif payloadlen == 126: self.stream.read_bytes(2, self._on_frame_length_16) elif payloadlen == 127: @@ -747,6 +829,17 @@ class WebSocketProtocol13(WebSocketProtocol): except StreamClosedError: self._abort() + def _read_frame_data(self, masked): + new_len = self._frame_length + if self._fragmented_message_buffer is not None: + new_len += len(self._fragmented_message_buffer) + if new_len > (self.handler.max_message_size or 10 * 1024 * 1024): + self.close(1009, "message too big") + return + self.stream.read_bytes( + self._frame_length, + self._on_masked_frame_data if masked else self._on_frame_data) + def _on_frame_length_16(self, data): self._wire_bytes_in += len(data) self._frame_length = struct.unpack("!H", data)[0] @@ -754,7 +847,7 @@ class WebSocketProtocol13(WebSocketProtocol): if self._masked_frame: self.stream.read_bytes(4, self._on_masking_key) else: - self.stream.read_bytes(self._frame_length, self._on_frame_data) + self._read_frame_data(False) except StreamClosedError: self._abort() @@ -765,7 +858,7 @@ class WebSocketProtocol13(WebSocketProtocol): if self._masked_frame: self.stream.read_bytes(4, self._on_masking_key) else: - self.stream.read_bytes(self._frame_length, self._on_frame_data) + self._read_frame_data(False) except StreamClosedError: self._abort() @@ -773,8 +866,7 @@ class WebSocketProtocol13(WebSocketProtocol): self._wire_bytes_in += len(data) self._frame_mask = data try: - self.stream.read_bytes(self._frame_length, - self._on_masked_frame_data) + self._read_frame_data(True) except StreamClosedError: self._abort() @@ -783,6 +875,8 @@ class WebSocketProtocol13(WebSocketProtocol): self._on_frame_data(_websocket_mask(self._frame_mask, data)) def _on_frame_data(self, data): + handled_future = None + self._wire_bytes_in += len(data) if self._frame_opcode_is_control: # control frames may be interleaved with a series of fragmented @@ -815,12 +909,18 @@ class WebSocketProtocol13(WebSocketProtocol): self._fragmented_message_buffer = data if self._final_frame: - self._handle_message(opcode, data) + handled_future = self._handle_message(opcode, data) if not self.client_terminated: - self._receive_frame() + if handled_future: + # on_message is a coroutine, process more frames once it's done. + handled_future.add_done_callback( + lambda future: self._receive_frame()) + else: + self._receive_frame() def _handle_message(self, opcode, data): + """Execute on_message, returning its Future if it is a coroutine.""" if self.client_terminated: return @@ -835,11 +935,11 @@ class WebSocketProtocol13(WebSocketProtocol): except UnicodeDecodeError: self._abort() return - self._run_callback(self.handler.on_message, decoded) + return self._run_callback(self.handler.on_message, decoded) elif opcode == 0x2: # Binary data self._message_bytes_in += len(data) - self._run_callback(self.handler.on_message, data) + return self._run_callback(self.handler.on_message, data) elif opcode == 0x8: # Close self.client_terminated = True @@ -852,9 +952,11 @@ class WebSocketProtocol13(WebSocketProtocol): elif opcode == 0x9: # Ping self._write_frame(True, 0xA, data) + self._run_callback(self.handler.on_ping, data) elif opcode == 0xA: # Pong - self._run_callback(self.handler.on_pong, data) + self.last_pong = IOLoop.current().time() + return self._run_callback(self.handler.on_pong, data) else: self._abort() @@ -883,6 +985,51 @@ class WebSocketProtocol13(WebSocketProtocol): self._waiting = self.stream.io_loop.add_timeout( self.stream.io_loop.time() + 5, self._abort) + @property + def ping_interval(self): + interval = self.handler.ping_interval + if interval is not None: + return interval + return 0 + + @property + def ping_timeout(self): + timeout = self.handler.ping_timeout + if timeout is not None: + return timeout + return max(3 * self.ping_interval, 30) + + def start_pinging(self): + """Start sending periodic pings to keep the connection alive""" + if self.ping_interval > 0: + self.last_ping = self.last_pong = IOLoop.current().time() + self.ping_callback = PeriodicCallback( + self.periodic_ping, self.ping_interval * 1000) + self.ping_callback.start() + + def periodic_ping(self): + """Send a ping to keep the websocket alive + + Called periodically if the websocket_ping_interval is set and non-zero. + """ + if self.stream.closed() and self.ping_callback is not None: + self.ping_callback.stop() + return + + # Check for timeout on pong. Make sure that we really have + # sent a recent ping in case the machine with both server and + # client has been suspended since the last ping. + now = IOLoop.current().time() + since_last_pong = now - self.last_pong + since_last_ping = now - self.last_ping + if (since_last_ping < 2 * self.ping_interval and + since_last_pong > self.ping_timeout): + self.close() + return + + self.write_ping(b'') + self.last_ping = now + class WebSocketClientConnection(simple_httpclient._HTTPConnection): """WebSocket client connection. @@ -891,7 +1038,8 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): `websocket_connect` function instead. """ def __init__(self, io_loop, request, on_message_callback=None, - compression_options=None): + compression_options=None, ping_interval=None, ping_timeout=None, + max_message_size=None): self.compression_options = compression_options self.connect_future = TracebackFuture() self.protocol = None @@ -900,6 +1048,9 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): self.key = base64.b64encode(os.urandom(16)) self._on_message_callback = on_message_callback self.close_code = self.close_reason = None + self.ping_interval = ping_interval + self.ping_timeout = ping_timeout + self.max_message_size = max_message_size scheme, sep, rest = request.url.partition(':') scheme = {'ws': 'http', 'wss': 'https'}[scheme] @@ -963,6 +1114,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): self.headers = headers self.protocol = self.get_websocket_protocol() self.protocol._process_server_headers(self.key, self.headers) + self.protocol.start_pinging() self.protocol._receive_frame() if self._timeout is not None: @@ -1016,13 +1168,18 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): def on_pong(self, data): pass + def on_ping(self, data): + pass + def get_websocket_protocol(self): return WebSocketProtocol13(self, mask_outgoing=True, compression_options=self.compression_options) def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, - on_message_callback=None, compression_options=None): + on_message_callback=None, compression_options=None, + ping_interval=None, ping_timeout=None, + max_message_size=None): """Client-side websocket support. Takes a url and returns a Future whose result is a @@ -1051,6 +1208,10 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, .. versionchanged:: 4.1 Added ``compression_options`` and ``on_message_callback``. The ``io_loop`` argument is deprecated. + + .. versionchanged:: 4.5 + Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size`` + arguments, which have the same meaning as in `WebSocketHandler`. """ if io_loop is None: io_loop = IOLoop.current() @@ -1066,7 +1227,10 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None, request, httpclient.HTTPRequest._DEFAULTS) conn = WebSocketClientConnection(io_loop, request, on_message_callback=on_message_callback, - compression_options=compression_options) + compression_options=compression_options, + ping_interval=ping_interval, + ping_timeout=ping_timeout, + max_message_size=max_message_size) if callback is not None: io_loop.add_future(conn.connect_future, callback) return conn.connect_future diff --git a/lib/tornado/wsgi.py b/lib/tornado/wsgi.py index e9ead300..68a7615a 100644 --- a/lib/tornado/wsgi.py +++ b/lib/tornado/wsgi.py @@ -29,7 +29,7 @@ provides WSGI support in two ways: and Tornado handlers in a single server. """ -from __future__ import absolute_import, division, print_function, with_statement +from __future__ import absolute_import, division, print_function import sys from io import BytesIO