Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.1 (79b2683)

This commit is contained in:
JackDandy 2017-08-07 17:00:36 +01:00
parent 884c5b91b4
commit ecdcb83e44
44 changed files with 764 additions and 353 deletions

View file

@ -24,7 +24,7 @@
* Update SimpleJSON library 3.8.1 (6022794) to 3.10.0 (c52efea)
* Update Six compatibility library 1.10.0 (r405) to 1.10.0 (r433)
* Update socks from SocksiPy 1.0 to PySocks 1.6.5 (b4323df)
* Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.dev1 (38e493e)
* Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.1 (79b2683)
* Update unidecode library 0.04.18 to 0.04.20 (1e18d98)
* Update xmltodict library 0.9.2 (eac0031) to 0.10.2 (375d3a6)
* Update Bootstrap 3.2.0 to 3.3.7
@ -87,6 +87,7 @@
* Change restart/shutdown to use updated jQuery
* Remove AlphaReign torrent provider
* Update cachecontrol library 0.11.5 to 0.11.7 (3b3b776)
* Update Tornado Web Server 4.5.dev1 (92f29b8) to 4.5.dev1 (38e493e)
### 0.12.26 (2017-08-20 13:05:00 UTC)

View file

@ -16,7 +16,7 @@
"""The Tornado web server and tools."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
# version is a human-readable version number.
@ -25,5 +25,5 @@ from __future__ import absolute_import, division, print_function, with_statement
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
version = "4.5.dev1"
version_info = (4, 5, 0, -100)
version = "4.5.1"
version_info = (4, 5, 1, 0)

View file

@ -17,7 +17,7 @@
"""Data used by the tornado.locale module."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
LOCALE_NAMES = {
"af_ZA": {"name_en": u"Afrikaans", "name": u"Afrikaans"},

View file

@ -65,7 +65,7 @@ Example usage for Google OAuth:
errors are more consistently reported through the ``Future`` interfaces.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import base64
import binascii
@ -954,6 +954,20 @@ class FacebookGraphMixin(OAuth2Mixin):
.. testoutput::
:hide:
This method returns a dictionary which may contain the following fields:
* ``access_token``, a string which may be passed to `facebook_request`
* ``session_expires``, an integer encoded as a string representing
the time until the access token expires in seconds. This field should
be used like ``int(user['session_expires'])``; in a future version of
Tornado it will change from a string to an integer.
* ``id``, ``name``, ``first_name``, ``last_name``, ``locale``, ``picture``,
``link``, plus any fields named in the ``extra_fields`` argument. These
fields are copied from the Facebook graph API `user object <https://developers.facebook.com/docs/graph-api/reference/user>`_
.. versionchanged:: 4.5
The ``session_expires`` field was updated to support changes made to the
Facebook API in March 2017.
"""
http = self.get_auth_http_client()
args = {
@ -978,10 +992,10 @@ class FacebookGraphMixin(OAuth2Mixin):
future.set_exception(AuthError('Facebook auth error: %s' % str(response)))
return
args = urlparse.parse_qs(escape.native_str(response.body))
args = escape.json_decode(response.body)
session = {
"access_token": args["access_token"][-1],
"expires": args.get("expires")
"access_token": args.get("access_token"),
"expires_in": args.get("expires_in")
}
self.facebook_request(
@ -1004,7 +1018,12 @@ class FacebookGraphMixin(OAuth2Mixin):
for field in fields:
fieldmap[field] = user.get(field)
fieldmap.update({"access_token": session["access_token"], "session_expires": session.get("expires")})
# session_expires is converted to str for compatibility with
# older versions in which the server used url-encoding and
# this code simply returned the string verbatim.
# This should change in Tornado 5.0.
fieldmap.update({"access_token": session["access_token"],
"session_expires": str(session.get("expires_in"))})
future.set_result(fieldmap)
@_auth_return_future

View file

@ -45,7 +45,7 @@ incorrectly.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import os
import sys
@ -103,10 +103,6 @@ except ImportError:
# os.execv is broken on Windows and can't properly parse command line
# arguments and executable name if they contain whitespaces. subprocess
# fixes that behavior.
# This distinction is also important because when we use execv, we want to
# close the IOLoop and all its file descriptors, to guard against any
# file descriptors that were not set CLOEXEC. When execv is not available,
# we must not close the IOLoop because we want the process to exit cleanly.
_has_execv = sys.platform != 'win32'
_watched_files = set()
@ -127,8 +123,6 @@ def start(io_loop=None, check_time=500):
_io_loops[io_loop] = True
if len(_io_loops) > 1:
gen_log.warning("tornado.autoreload started more than once in the same process")
if _has_execv:
add_reload_hook(functools.partial(io_loop.close, all_fds=True))
modify_times = {}
callback = functools.partial(_reload_on_update, modify_times)
scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
@ -249,6 +243,7 @@ def _reload():
# unwind, so just exit uncleanly.
os._exit(0)
_USAGE = """\
Usage:
python -m tornado.autoreload -m module.to.run [args...]

View file

@ -21,7 +21,7 @@ a mostly-compatible `Future` class designed for use from coroutines,
as well as some utility functions for interacting with the
`concurrent.futures` package.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import functools
import platform
@ -234,7 +234,10 @@ class Future(object):
if self._result is not None:
return self._result
if self._exc_info is not None:
try:
raise_exc_info(self._exc_info)
finally:
self = None
self._check_done()
return self._result
@ -340,6 +343,7 @@ class Future(object):
app_log.error('Future %r exception was never retrieved: %s',
self, ''.join(tb).rstrip())
TracebackFuture = Future
if futures is None:
@ -364,6 +368,7 @@ class DummyExecutor(object):
def shutdown(self, wait=True):
pass
dummy_executor = DummyExecutor()

View file

@ -16,7 +16,7 @@
"""Non-blocking HTTP client implementation using pycurl."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import collections
import functools
@ -278,9 +278,9 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
if curl_log.isEnabledFor(logging.DEBUG):
curl.setopt(pycurl.VERBOSE, 1)
curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug)
if hasattr(pycurl,'PROTOCOLS'): # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12)
curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP|pycurl.PROTO_HTTPS)
curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP|pycurl.PROTO_HTTPS)
if hasattr(pycurl, 'PROTOCOLS'): # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12)
curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
return curl
def _curl_setup_request(self, curl, request, buffer, headers):

View file

@ -20,7 +20,7 @@ Also includes a few other miscellaneous string manipulation functions that
have crept in over time.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import json
import re
@ -199,6 +199,7 @@ def utf8(value):
)
return value.encode("utf-8")
_TO_UNICODE_TYPES = (unicode_type, type(None))
@ -216,6 +217,7 @@ def to_unicode(value):
)
return value.decode("utf-8")
# to_unicode was previously named _unicode not because it was private,
# but to avoid conflicts with the built-in unicode() function/type
_unicode = to_unicode
@ -264,6 +266,7 @@ def recursive_unicode(obj):
else:
return obj
# I originally used the regex from
# http://daringfireball.net/2010/07/improved_regex_for_matching_urls
# but it gets all exponential on certain patterns (such as too many trailing
@ -391,4 +394,5 @@ def _build_unicode_map():
unicode_map[name] = unichr(value)
return unicode_map
_HTML_UNICODE_MAP = _build_unicode_map()

View file

@ -74,7 +74,7 @@ See the `convert_yielded` function to extend this mechanism.
via ``singledispatch``.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import collections
import functools
@ -245,6 +245,7 @@ def coroutine(func, replace_callback=True):
"""
return _make_coroutine_wrapper(func, replace_callback=True)
# Ties lifetime of runners to their result futures. Github Issue #1769
# Generators, like any object in Python, must be strong referenced
# in order to not be cleaned up by the garbage collector. When using
@ -264,6 +265,7 @@ def coroutine(func, replace_callback=True):
# Runner alive.
_futures_to_runners = weakref.WeakKeyDictionary()
def _make_coroutine_wrapper(func, replace_callback):
"""The inner workings of ``@gen.coroutine`` and ``@gen.engine``.
@ -315,6 +317,7 @@ def _make_coroutine_wrapper(func, replace_callback):
future.set_exc_info(sys.exc_info())
else:
_futures_to_runners[future] = Runner(result, future, yielded)
yielded = None
try:
return future
finally:
@ -338,6 +341,8 @@ def _make_coroutine_wrapper(func, replace_callback):
def is_coroutine_function(func):
"""Return whether *func* is a coroutine function, i.e. a function
wrapped with `~.gen.coroutine`.
.. versionadded:: 4.5
"""
return getattr(func, '__tornado_coroutine__', False)
@ -715,6 +720,7 @@ def multi(children, quiet_exceptions=()):
else:
return multi_future(children, quiet_exceptions=quiet_exceptions)
Multi = multi
@ -960,6 +966,9 @@ coroutines that are likely to yield Futures that are ready instantly.
Usage: ``yield gen.moment``
.. versionadded:: 4.0
.. deprecated:: 4.5
``yield None`` is now equivalent to ``yield gen.moment``.
"""
moment.set_result(None)
@ -990,6 +999,7 @@ class Runner(object):
# of the coroutine.
self.stack_context_deactivate = None
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def register_callback(self, key):
@ -1046,9 +1056,14 @@ class Runner(object):
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
if exc_info is not None:
try:
yielded = self.gen.throw(*exc_info)
finally:
# Break up a reference to itself
# for faster GC on CPython.
exc_info = None
else:
yielded = self.gen.send(value)
@ -1082,6 +1097,7 @@ class Runner(object):
return
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False
@ -1130,8 +1146,12 @@ class Runner(object):
self.future.set_exc_info(sys.exc_info())
if not self.future.done() or self.future is moment:
def inner(f):
# Break a reference cycle to speed GC.
f = None # noqa
self.run()
self.io_loop.add_future(
self.future, lambda f: self.run())
self.future, inner)
return False
return True
@ -1153,6 +1173,7 @@ class Runner(object):
self.stack_context_deactivate()
self.stack_context_deactivate = None
Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])
@ -1172,6 +1193,7 @@ def _argument_adapter(callback):
callback(None)
return wrapper
# Convert Awaitables into Futures. It is unfortunately possible
# to have infinite recursion here if those Awaitables assume that
# we're using a different coroutine runner and yield objects
@ -1249,7 +1271,9 @@ def convert_yielded(yielded):
.. versionadded:: 4.1
"""
# Lists and dicts containing YieldPoints were handled earlier.
if isinstance(yielded, (list, dict)):
if yielded is None:
return moment
elif isinstance(yielded, (list, dict)):
return multi(yielded)
elif is_future(yielded):
return yielded
@ -1258,6 +1282,7 @@ def convert_yielded(yielded):
else:
raise BadYieldError("yielded unknown object %r" % (yielded,))
if singledispatch is not None:
convert_yielded = singledispatch(convert_yielded)

View file

@ -19,7 +19,7 @@
.. versionadded:: 4.0
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import re
@ -257,6 +257,7 @@ class HTTP1Connection(httputil.HTTPConnection):
if need_delegate_close:
with _ExceptionLoggingContext(app_log):
delegate.on_connection_close()
header_future = None
self._clear_callbacks()
raise gen.Return(True)
@ -489,7 +490,7 @@ class HTTP1Connection(httputil.HTTPConnection):
elif ("Content-Length" in headers or
headers.get("Transfer-Encoding", "").lower() == "chunked" or
getattr(start_line, 'method', None) in ("HEAD", "GET")):
# start_line may be a request or reponse start line; only
# start_line may be a request or response start line; only
# the former has a method attribute.
return connection_header == "keep-alive"
return False

View file

@ -38,7 +38,7 @@ To select ``curl_httpclient``, call `AsyncHTTPClient.configure` at startup::
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import functools
import time
@ -423,6 +423,9 @@ class HTTPRequest(object):
.. versionadded:: 4.2
The ``ssl_options`` argument.
.. versionadded:: 4.5
The ``proxy_auth_mode`` argument.
"""
# Note that some of these attributes go through property setters
# defined below.
@ -670,5 +673,6 @@ def main():
print(native_str(response.body))
client.close()
if __name__ == "__main__":
main()

View file

@ -26,7 +26,7 @@ class except to start a server at the beginning of the process
to `tornado.httputil.HTTPServerRequest`. The old name remains as an alias.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import socket
@ -62,6 +62,13 @@ class HTTPServer(TCPServer, Configurable,
if Tornado is run behind an SSL-decoding proxy that does not set one of
the supported ``xheaders``.
By default, when parsing the ``X-Forwarded-For`` header, Tornado will
select the last (i.e., the closest) address on the list of hosts as the
remote host IP address. To select the next server in the chain, a list of
trusted downstream hosts may be passed as the ``trusted_downstream``
argument. These hosts will be skipped when parsing the ``X-Forwarded-For``
header.
To make this server serve SSL traffic, send the ``ssl_options`` keyword
argument with an `ssl.SSLContext` object. For compatibility with older
versions of Python ``ssl_options`` may also be a dictionary of keyword
@ -124,6 +131,9 @@ class HTTPServer(TCPServer, Configurable,
.. versionchanged:: 4.2
`HTTPServer` is now a subclass of `tornado.util.Configurable`.
.. versionchanged:: 4.5
Added the ``trusted_downstream`` argument.
"""
def __init__(self, *args, **kwargs):
# Ignore args to __init__; real initialization belongs in
@ -138,7 +148,8 @@ class HTTPServer(TCPServer, Configurable,
decompress_request=False,
chunk_size=None, max_header_size=None,
idle_connection_timeout=None, body_timeout=None,
max_body_size=None, max_buffer_size=None):
max_body_size=None, max_buffer_size=None,
trusted_downstream=None):
self.request_callback = request_callback
self.no_keep_alive = no_keep_alive
self.xheaders = xheaders
@ -149,11 +160,13 @@ class HTTPServer(TCPServer, Configurable,
max_header_size=max_header_size,
header_timeout=idle_connection_timeout or 3600,
max_body_size=max_body_size,
body_timeout=body_timeout)
body_timeout=body_timeout,
no_keep_alive=no_keep_alive)
TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
max_buffer_size=max_buffer_size,
read_chunk_size=chunk_size)
self._connections = set()
self.trusted_downstream = trusted_downstream
@classmethod
def configurable_base(cls):
@ -172,7 +185,8 @@ class HTTPServer(TCPServer, Configurable,
def handle_stream(self, stream, address):
context = _HTTPRequestContext(stream, address,
self.protocol)
self.protocol,
self.trusted_downstream)
conn = HTTP1ServerConnection(
stream, self.conn_params, context)
self._connections.add(conn)
@ -219,7 +233,7 @@ class _CallableAdapter(httputil.HTTPMessageDelegate):
class _HTTPRequestContext(object):
def __init__(self, stream, address, protocol):
def __init__(self, stream, address, protocol, trusted_downstream=None):
self.address = address
# Save the socket's address family now so we know how to
# interpret self.address even after the stream is closed
@ -243,6 +257,7 @@ class _HTTPRequestContext(object):
self.protocol = "http"
self._orig_remote_ip = self.remote_ip
self._orig_protocol = self.protocol
self.trusted_downstream = set(trusted_downstream or [])
def __str__(self):
if self.address_family in (socket.AF_INET, socket.AF_INET6):
@ -259,7 +274,10 @@ class _HTTPRequestContext(object):
"""Rewrite the ``remote_ip`` and ``protocol`` fields."""
# Squid uses X-Forwarded-For, others use X-Real-Ip
ip = headers.get("X-Forwarded-For", self.remote_ip)
ip = ip.split(',')[-1].strip()
# Skip trusted downstream hosts in X-Forwarded-For list
for ip in (cand.strip() for cand in reversed(ip.split(','))):
if ip not in self.trusted_downstream:
break
ip = headers.get("X-Real-Ip", ip)
if netutil.is_valid_ip(ip):
self.remote_ip = ip
@ -303,4 +321,5 @@ class _ProxyAdapter(httputil.HTTPMessageDelegate):
def _cleanup(self):
self.connection.context._unapply_xheaders()
HTTPRequest = httputil.HTTPServerRequest

View file

@ -20,7 +20,7 @@ This module also defines the `HTTPServerRequest` class which is exposed
via `tornado.web.RequestHandler.request`.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import calendar
import collections
@ -38,11 +38,12 @@ from tornado.util import ObjectDict, PY3
if PY3:
import http.cookies as Cookie
from http.client import responses
from urllib.parse import urlencode
from urllib.parse import urlencode, urlparse, urlunparse, parse_qsl
else:
import Cookie
from httplib import responses
from urllib import urlencode
from urlparse import urlparse, urlunparse, parse_qsl
# responses is unused in this file, but we re-export it to other files.
@ -98,6 +99,7 @@ class _NormalizedHeaderCache(dict):
del self[old_key]
return normalized
_normalized_headers = _NormalizedHeaderCache(1000)
@ -601,11 +603,28 @@ def url_concat(url, args):
>>> url_concat("http://example.com/foo?a=b", [("c", "d"), ("c", "d2")])
'http://example.com/foo?a=b&c=d&c=d2'
"""
if not args:
if args is None:
return url
parsed_url = urlparse(url)
if isinstance(args, dict):
parsed_query = parse_qsl(parsed_url.query, keep_blank_values=True)
parsed_query.extend(args.items())
elif isinstance(args, list) or isinstance(args, tuple):
parsed_query = parse_qsl(parsed_url.query, keep_blank_values=True)
parsed_query.extend(args)
else:
err = "'args' parameter should be dict, list or tuple. Not {0}".format(
type(args))
raise TypeError(err)
final_query = urlencode(parsed_query)
url = urlunparse((
parsed_url[0],
parsed_url[1],
parsed_url[2],
parsed_url[3],
final_query,
parsed_url[5]))
return url
if url[-1] not in ('?', '&'):
url += '&' if ('?' in url) else '?'
return url + urlencode(args)
class HTTPFile(ObjectDict):
@ -920,10 +939,12 @@ def split_host_and_port(netloc):
port = None
return (host, port)
_OctalPatt = re.compile(r"\\[0-3][0-7][0-7]")
_QuotePatt = re.compile(r"[\\].")
_nulljoin = ''.join
def _unquote_cookie(str):
"""Handle double quotes and escaping in cookie values.
@ -965,11 +986,11 @@ def _unquote_cookie(str):
k = q_match.start(0)
if q_match and (not o_match or k < j): # QuotePatt matched
res.append(str[i:k])
res.append(str[k+1])
res.append(str[k + 1])
i = k + 2
else: # OctalPatt matched
res.append(str[i:j])
res.append(chr(int(str[j+1:j+4], 8)))
res.append(chr(int(str[j + 1:j + 4], 8)))
i = j + 4
return _nulljoin(res)

View file

@ -26,7 +26,7 @@ In addition to I/O events, the `IOLoop` can also schedule time-based events.
`IOLoop.add_timeout` is a non-blocking alternative to `time.sleep`.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import collections
import datetime
@ -715,7 +715,7 @@ class PollIOLoop(IOLoop):
self._closing = True
self.remove_handler(self._waker.fileno())
if all_fds:
for fd, handler in self._handlers.values():
for fd, handler in list(self._handlers.values()):
self.close_fd(fd)
self._waker.close()
self._impl.close()

View file

@ -24,7 +24,7 @@ Contents:
* `PipeIOStream`: Pipe-based IOStream implementation.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import collections
import errno
@ -82,6 +82,8 @@ _ERRNO_INPROGRESS = (errno.EINPROGRESS,)
if hasattr(errno, "WSAEINPROGRESS"):
_ERRNO_INPROGRESS += (errno.WSAEINPROGRESS,) # type: ignore
_WINDOWS = sys.platform.startswith('win')
class StreamClosedError(IOError):
"""Exception raised by `IOStream` methods when the stream is closed.
@ -158,11 +160,16 @@ class BaseIOStream(object):
self.max_buffer_size // 2)
self.max_write_buffer_size = max_write_buffer_size
self.error = None
self._read_buffer = collections.deque()
self._write_buffer = collections.deque()
self._read_buffer = bytearray()
self._read_buffer_pos = 0
self._read_buffer_size = 0
self._write_buffer = bytearray()
self._write_buffer_pos = 0
self._write_buffer_size = 0
self._write_buffer_frozen = False
self._total_write_index = 0
self._total_write_done_index = 0
self._pending_writes_while_frozen = []
self._read_delimiter = None
self._read_regex = None
self._read_max_bytes = None
@ -173,7 +180,7 @@ class BaseIOStream(object):
self._read_future = None
self._streaming_callback = None
self._write_callback = None
self._write_future = None
self._write_futures = collections.deque()
self._close_callback = None
self._connect_callback = None
self._connect_future = None
@ -367,36 +374,37 @@ class BaseIOStream(object):
If no ``callback`` is given, this method returns a `.Future` that
resolves (with a result of ``None``) when the write has been
completed. If `write` is called again before that `.Future` has
resolved, the previous future will be orphaned and will never resolve.
completed.
The ``data`` argument may be of type `bytes` or `memoryview`.
.. versionchanged:: 4.0
Now returns a `.Future` if no callback is given.
.. versionchanged:: 4.5
Added support for `memoryview` arguments.
"""
assert isinstance(data, bytes)
self._check_closed()
# We use bool(_write_buffer) as a proxy for write_buffer_size>0,
# so never put empty strings in the buffer.
if data:
if (self.max_write_buffer_size is not None and
self._write_buffer_size + len(data) > self.max_write_buffer_size):
raise StreamBufferFullError("Reached maximum write buffer size")
# Break up large contiguous strings before inserting them in the
# write buffer, so we don't have to recopy the entire thing
# as we slice off pieces to send to the socket.
WRITE_BUFFER_CHUNK_SIZE = 128 * 1024
for i in range(0, len(data), WRITE_BUFFER_CHUNK_SIZE):
self._write_buffer.append(data[i:i + WRITE_BUFFER_CHUNK_SIZE])
if self._write_buffer_frozen:
self._pending_writes_while_frozen.append(data)
else:
self._write_buffer += data
self._write_buffer_size += len(data)
self._total_write_index += len(data)
if callback is not None:
self._write_callback = stack_context.wrap(callback)
future = None
else:
future = self._write_future = TracebackFuture()
future = TracebackFuture()
future.add_done_callback(lambda f: f.exception())
self._write_futures.append((self._total_write_index, future))
if not self._connecting:
self._handle_write()
if self._write_buffer:
if self._write_buffer_size:
self._add_io_state(self.io_loop.WRITE)
self._maybe_add_error_listener()
return future
@ -445,9 +453,8 @@ class BaseIOStream(object):
if self._read_future is not None:
futures.append(self._read_future)
self._read_future = None
if self._write_future is not None:
futures.append(self._write_future)
self._write_future = None
futures += [future for _, future in self._write_futures]
self._write_futures.clear()
if self._connect_future is not None:
futures.append(self._connect_future)
self._connect_future = None
@ -466,6 +473,7 @@ class BaseIOStream(object):
# if the IOStream object is kept alive by a reference cycle.
# TODO: Clear the read buffer too; it currently breaks some tests.
self._write_buffer = None
self._write_buffer_size = 0
def reading(self):
"""Returns true if we are currently reading from the stream."""
@ -473,7 +481,7 @@ class BaseIOStream(object):
def writing(self):
"""Returns true if we are currently writing to the stream."""
return bool(self._write_buffer)
return self._write_buffer_size > 0
def closed(self):
"""Returns true if the stream has been closed."""
@ -650,7 +658,7 @@ class BaseIOStream(object):
except Exception as e:
if 1 != e.errno:
gen_log.warning("error on read: %s" % e)
self.close(exc_info=True)
self.close(exc_info=e)
return
if pos is not None:
self._read_from_buffer(pos)
@ -744,7 +752,7 @@ class BaseIOStream(object):
break
if chunk is None:
return 0
self._read_buffer.append(chunk)
self._read_buffer += chunk
self._read_buffer_size += len(chunk)
if self._read_buffer_size > self.max_buffer_size:
gen_log.error("Reached maximum read buffer size")
@ -792,30 +800,25 @@ class BaseIOStream(object):
# since large merges are relatively expensive and get undone in
# _consume().
if self._read_buffer:
while True:
loc = self._read_buffer[0].find(self._read_delimiter)
loc = self._read_buffer.find(self._read_delimiter,
self._read_buffer_pos)
if loc != -1:
loc -= self._read_buffer_pos
delimiter_len = len(self._read_delimiter)
self._check_max_bytes(self._read_delimiter,
loc + delimiter_len)
return loc + delimiter_len
if len(self._read_buffer) == 1:
break
_double_prefix(self._read_buffer)
self._check_max_bytes(self._read_delimiter,
len(self._read_buffer[0]))
self._read_buffer_size)
elif self._read_regex is not None:
if self._read_buffer:
while True:
m = self._read_regex.search(self._read_buffer[0])
m = self._read_regex.search(self._read_buffer,
self._read_buffer_pos)
if m is not None:
self._check_max_bytes(self._read_regex, m.end())
return m.end()
if len(self._read_buffer) == 1:
break
_double_prefix(self._read_buffer)
self._check_max_bytes(self._read_regex,
len(self._read_buffer[0]))
loc = m.end() - self._read_buffer_pos
self._check_max_bytes(self._read_regex, loc)
return loc
self._check_max_bytes(self._read_regex, self._read_buffer_size)
return None
def _check_max_bytes(self, delimiter, size):
@ -825,35 +828,56 @@ class BaseIOStream(object):
"delimiter %r not found within %d bytes" % (
delimiter, self._read_max_bytes))
def _freeze_write_buffer(self, size):
self._write_buffer_frozen = size
def _unfreeze_write_buffer(self):
self._write_buffer_frozen = False
self._write_buffer += b''.join(self._pending_writes_while_frozen)
self._write_buffer_size += sum(map(len, self._pending_writes_while_frozen))
self._pending_writes_while_frozen[:] = []
def _got_empty_write(self, size):
"""
Called when a non-blocking write() failed writing anything.
Can be overridden in subclasses.
"""
def _handle_write(self):
while self._write_buffer:
while self._write_buffer_size:
assert self._write_buffer_size >= 0
try:
if not self._write_buffer_frozen:
start = self._write_buffer_pos
if self._write_buffer_frozen:
size = self._write_buffer_frozen
elif _WINDOWS:
# On windows, socket.send blows up if given a
# write buffer that's too large, instead of just
# returning the number of bytes it was able to
# process. Therefore we must not call socket.send
# with more than 128KB at a time.
_merge_prefix(self._write_buffer, 128 * 1024)
num_bytes = self.write_to_fd(self._write_buffer[0])
size = 128 * 1024
else:
size = self._write_buffer_size
num_bytes = self.write_to_fd(
memoryview(self._write_buffer)[start:start + size])
if num_bytes == 0:
# With OpenSSL, if we couldn't write the entire buffer,
# the very same string object must be used on the
# next call to send. Therefore we suppress
# merging the write buffer after an incomplete send.
# A cleaner solution would be to set
# SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
# not yet accessible from python
# (http://bugs.python.org/issue8240)
self._write_buffer_frozen = True
self._got_empty_write(size)
break
self._write_buffer_frozen = False
_merge_prefix(self._write_buffer, num_bytes)
self._write_buffer.popleft()
self._write_buffer_pos += num_bytes
self._write_buffer_size -= num_bytes
# Amortized O(1) shrink
# (this heuristic is implemented natively in Python 3.4+
# but is replicated here for Python 2)
if self._write_buffer_pos > self._write_buffer_size:
del self._write_buffer[:self._write_buffer_pos]
self._write_buffer_pos = 0
if self._write_buffer_frozen:
self._unfreeze_write_buffer()
self._total_write_done_index += num_bytes
except (socket.error, IOError, OSError) as e:
if e.args[0] in _ERRNO_WOULDBLOCK:
self._write_buffer_frozen = True
self._got_empty_write(size)
break
else:
if not self._is_connreset(e):
@ -864,22 +888,38 @@ class BaseIOStream(object):
self.fileno(), e)
self.close(exc_info=True)
return
if not self._write_buffer:
while self._write_futures:
index, future = self._write_futures[0]
if index > self._total_write_done_index:
break
self._write_futures.popleft()
future.set_result(None)
if not self._write_buffer_size:
if self._write_callback:
callback = self._write_callback
self._write_callback = None
self._run_callback(callback)
if self._write_future:
future = self._write_future
self._write_future = None
future.set_result(None)
def _consume(self, loc):
# Consume loc bytes from the read buffer and return them
if loc == 0:
return b""
_merge_prefix(self._read_buffer, loc)
assert loc <= self._read_buffer_size
# Slice the bytearray buffer into bytes, without intermediate copying
b = (memoryview(self._read_buffer)
[self._read_buffer_pos:self._read_buffer_pos + loc]
).tobytes()
self._read_buffer_pos += loc
self._read_buffer_size -= loc
return self._read_buffer.popleft()
# Amortized O(1) shrink
# (this heuristic is implemented natively in Python 3.4+
# but is replicated here for Python 2)
if self._read_buffer_pos > self._read_buffer_size:
del self._read_buffer[:self._read_buffer_pos]
self._read_buffer_pos = 0
return b
def _check_closed(self):
if self.closed():
@ -1125,7 +1165,7 @@ class IOStream(BaseIOStream):
suitably-configured `ssl.SSLContext` to disable.
"""
if (self._read_callback or self._read_future or
self._write_callback or self._write_future or
self._write_callback or self._write_futures or
self._connect_callback or self._connect_future or
self._pending_callbacks or self._closed or
self._read_buffer or self._write_buffer):
@ -1252,6 +1292,17 @@ class SSLIOStream(IOStream):
def writing(self):
return self._handshake_writing or super(SSLIOStream, self).writing()
def _got_empty_write(self, size):
# With OpenSSL, if we couldn't write the entire buffer,
# the very same string object must be used on the
# next call to send. Therefore we suppress
# merging the write buffer after an incomplete send.
# A cleaner solution would be to set
# SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
# not yet accessible from python
# (http://bugs.python.org/issue8240)
self._freeze_write_buffer(size)
def _do_ssl_handshake(self):
# Based on code from test_ssl.py in the python stdlib
try:
@ -1499,53 +1550,6 @@ class PipeIOStream(BaseIOStream):
return chunk
def _double_prefix(deque):
"""Grow by doubling, but don't split the second chunk just because the
first one is small.
"""
new_len = max(len(deque[0]) * 2,
(len(deque[0]) + len(deque[1])))
_merge_prefix(deque, new_len)
def _merge_prefix(deque, size):
"""Replace the first entries in a deque of strings with a single
string of up to size bytes.
>>> d = collections.deque(['abc', 'de', 'fghi', 'j'])
>>> _merge_prefix(d, 5); print(d)
deque(['abcde', 'fghi', 'j'])
Strings will be split as necessary to reach the desired size.
>>> _merge_prefix(d, 7); print(d)
deque(['abcdefg', 'hi', 'j'])
>>> _merge_prefix(d, 3); print(d)
deque(['abc', 'defg', 'hi', 'j'])
>>> _merge_prefix(d, 100); print(d)
deque(['abcdefghij'])
"""
if len(deque) == 1 and len(deque[0]) <= size:
return
prefix = []
remaining = size
while deque and remaining > 0:
chunk = deque.popleft()
if len(chunk) > remaining:
deque.appendleft(chunk[remaining:])
chunk = chunk[:remaining]
prefix.append(chunk)
remaining -= len(chunk)
# This data structure normally just contains byte strings, but
# the unittest gets messy if it doesn't use the default str() type,
# so do the merge based on the type of data that's actually present.
if prefix:
deque.appendleft(type(prefix[0])().join(prefix))
if not deque:
deque.appendleft(b"")
def doctests():
import doctest
return doctest.DocTestSuite()

View file

@ -39,7 +39,7 @@ supported by `gettext` and related tools). If neither method is called,
the `Locale.translate` method will simply return the original string.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import codecs
import csv
@ -187,7 +187,7 @@ def load_gettext_translations(directory, domain):
{directory}/{lang}/LC_MESSAGES/{domain}.mo
Three steps are required to have you app translated:
Three steps are required to have your app translated:
1. Generate POT translation file::

View file

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import collections

View file

@ -28,7 +28,7 @@ These streams may be configured independently using the standard library's
`logging` module. For example, you may wish to send ``tornado.access`` logs
to a separate file for analysis.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import logging
import logging.handlers
@ -37,6 +37,11 @@ import sys
from tornado.escape import _unicode
from tornado.util import unicode_type, basestring_type
try:
import colorama
except ImportError:
colorama = None
try:
import curses # type: ignore
except ImportError:
@ -49,15 +54,21 @@ gen_log = logging.getLogger("tornado.general")
def _stderr_supports_color():
color = False
if curses and hasattr(sys.stderr, 'isatty') and sys.stderr.isatty():
try:
if hasattr(sys.stderr, 'isatty') and sys.stderr.isatty():
if curses:
curses.setupterm()
if curses.tigetnum("colors") > 0:
color = True
return True
elif colorama:
if sys.stderr is getattr(colorama.initialise, 'wrapped_stderr',
object()):
return True
except Exception:
# Very broad exception handling because it's always better to
# fall back to non-colored logs than to break at startup.
pass
return color
return False
def _safe_unicode(s):
@ -79,6 +90,17 @@ class LogFormatter(logging.Formatter):
This formatter is enabled automatically by
`tornado.options.parse_command_line` or `tornado.options.parse_config_file`
(unless ``--logging=none`` is used).
Color support on Windows versions that do not support ANSI color codes is
enabled by use of the colorama__ library. Applications that wish to use
this must first initialize colorama with a call to ``colorama.init``.
See the colorama documentation for details.
__ https://pypi.python.org/pypi/colorama
.. versionchanged:: 4.5
Added support for ``colorama``. Changed the constructor
signature to be compatible with `logging.config.dictConfig`.
"""
DEFAULT_FORMAT = '%(color)s[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d]%(end_color)s %(message)s'
DEFAULT_DATE_FORMAT = '%y%m%d %H:%M:%S'
@ -89,8 +111,8 @@ class LogFormatter(logging.Formatter):
logging.ERROR: 1, # Red
}
def __init__(self, color=True, fmt=DEFAULT_FORMAT,
datefmt=DEFAULT_DATE_FORMAT, colors=DEFAULT_COLORS):
def __init__(self, fmt=DEFAULT_FORMAT, datefmt=DEFAULT_DATE_FORMAT,
style='%', color=True, colors=DEFAULT_COLORS):
r"""
:arg bool color: Enables color support.
:arg string fmt: Log message format.
@ -111,6 +133,7 @@ class LogFormatter(logging.Formatter):
self._colors = {}
if color and _stderr_supports_color():
if curses is not None:
# The curses module has some str/bytes confusion in
# python3. Until version 3.2.3, most methods return
# bytes, but only accept strings. In addition, we want to
@ -126,6 +149,12 @@ class LogFormatter(logging.Formatter):
for levelno, code in colors.items():
self._colors[levelno] = unicode_type(curses.tparm(fg_color, code), "ascii")
self._normal = unicode_type(curses.tigetstr("sgr0"), "ascii")
else:
# If curses is not present (currently we'll only get here for
# colorama on windows), assume hard-coded ANSI color codes.
for levelno, code in colors.items():
self._colors[levelno] = '\033[2;3%dm' % code
self._normal = '\033[0m'
else:
self._normal = ''

View file

@ -16,7 +16,7 @@
"""Miscellaneous network utility code."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import errno
import os
@ -200,6 +200,7 @@ def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
sockets.append(sock)
return sockets
if hasattr(socket, 'AF_UNIX'):
def bind_unix_socket(file, mode=0o600, backlog=_DEFAULT_BACKLOG):
"""Creates a listening unix socket.

View file

@ -82,7 +82,7 @@ instances to define isolated sets of options, such as for subcommands.
underscores.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import datetime
import numbers

View file

@ -19,7 +19,7 @@ loops.
Windows. Use the `~asyncio.SelectorEventLoop` instead.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import functools
import tornado.concurrent
@ -217,5 +217,6 @@ def to_asyncio_future(tornado_future):
tornado.concurrent.chain_future(tornado_future, af)
return af
if hasattr(convert_yielded, 'register'):
convert_yielded.register(asyncio.Future, to_tornado_future) # type: ignore

View file

@ -23,7 +23,7 @@ Most code that needs access to this functionality should do e.g.::
from tornado.platform.auto import set_close_exec
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import os

View file

@ -1,4 +1,4 @@
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import pycares # type: ignore
import socket

View file

@ -1,5 +1,5 @@
"""Lowest-common-denominator implementations of platform functionality."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import errno
import socket
@ -8,6 +8,7 @@ import time
from tornado.platform import interface
from tornado.util import errno_from_exception
def try_close(f):
# Avoid issue #875 (race condition when using the file in another
# thread).

View file

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""EPoll-based IOLoop implementation for Linux systems."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import select

View file

@ -21,7 +21,7 @@ for other tornado.platform modules. Most code should import the appropriate
implementation from `tornado.platform.auto`.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
def set_close_exec(fd):
@ -62,5 +62,6 @@ class Waker(object):
"""Closes the waker's file descriptor(s)."""
raise NotImplementedError()
def monotonic_time():
raise NotImplementedError()

View file

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""KQueue-based IOLoop implementation for BSD/Mac systems."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import select

View file

@ -16,7 +16,7 @@
"""Posix implementations of platform-specific functionality."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import fcntl
import os

View file

@ -17,7 +17,7 @@
Used as a fallback for systems that don't support epoll or kqueue.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import select

View file

@ -21,7 +21,7 @@ depending on which library's underlying event loop you want to use.
This module has been tested with Twisted versions 11.0.0 and newer.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import datetime
import functools
@ -574,6 +574,7 @@ class TwistedResolver(Resolver):
]
raise gen.Return(result)
if hasattr(gen.convert_yielded, 'register'):
@gen.convert_yielded.register(Deferred) # type: ignore
def _(d):

View file

@ -2,7 +2,7 @@
# for production use.
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import ctypes # type: ignore
import ctypes.wintypes # type: ignore

View file

@ -18,7 +18,7 @@
the server into multiple processes and managing subprocesses.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import errno
import os
@ -355,6 +355,10 @@ class Subprocess(object):
else:
assert os.WIFEXITED(status)
self.returncode = os.WEXITSTATUS(status)
# We've taken over wait() duty from the subprocess.Popen
# object. If we don't inform it of the process's return code,
# it will log a warning at destruction in python 3.6+.
self.proc.returncode = self.returncode
if self._exit_callback:
callback = self._exit_callback
self._exit_callback = None

View file

@ -12,7 +12,17 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function, with_statement
"""Asynchronous queues for coroutines.
.. warning::
Unlike the standard library's `queue` module, the classes defined here
are *not* thread-safe. To use these queues from another thread,
use `.IOLoop.add_callback` to transfer control to the `.IOLoop` thread
before calling any queue methods.
"""
from __future__ import absolute_import, division, print_function
import collections
import heapq

View file

@ -12,16 +12,23 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Basic routing implementation.
"""Flexible routing implementation.
Tornado routes HTTP requests to appropriate handlers using `Router` class implementations.
Tornado routes HTTP requests to appropriate handlers using `Router`
class implementations. The `tornado.web.Application` class is a
`Router` implementation and may be used directly, or the classes in
this module may be used for additional flexibility. The `RuleRouter`
class can match on more criteria than `.Application`, or the `Router`
interface can be subclassed for maximum customization.
`Router` interface extends `~.httputil.HTTPServerConnectionDelegate` to provide additional
routing capabilities. This also means that any `Router` implementation can be used directly
as a ``request_callback`` for `~.httpserver.HTTPServer` constructor.
`Router` interface extends `~.httputil.HTTPServerConnectionDelegate`
to provide additional routing capabilities. This also means that any
`Router` implementation can be used directly as a ``request_callback``
for `~.httpserver.HTTPServer` constructor.
`Router` subclass must implement a ``find_handler`` method to provide a suitable
`~.httputil.HTTPMessageDelegate` instance to handle the request:
`Router` subclass must implement a ``find_handler`` method to provide
a suitable `~.httputil.HTTPMessageDelegate` instance to handle the
request:
.. code-block:: python
@ -44,16 +51,18 @@ as a ``request_callback`` for `~.httpserver.HTTPServer` constructor.
router = CustomRouter()
server = HTTPServer(router)
The main responsibility of `Router` implementation is to provide a mapping from a request
to `~.httputil.HTTPMessageDelegate` instance that will handle this request. In the example above
we can see that routing is possible even without instantiating an `~.web.Application`.
The main responsibility of `Router` implementation is to provide a
mapping from a request to `~.httputil.HTTPMessageDelegate` instance
that will handle this request. In the example above we can see that
routing is possible even without instantiating an `~.web.Application`.
For routing to `~.web.RequestHandler` implementations we need an `~.web.Application` instance.
`~.web.Application.get_handler_delegate` provides a convenient way to create
`~.httputil.HTTPMessageDelegate` for a given request and `~.web.RequestHandler`.
For routing to `~.web.RequestHandler` implementations we need an
`~.web.Application` instance. `~.web.Application.get_handler_delegate`
provides a convenient way to create `~.httputil.HTTPMessageDelegate`
for a given request and `~.web.RequestHandler`.
Here is a simple example of how we can we route to `~.web.RequestHandler` subclasses
by HTTP method:
Here is a simple example of how we can we route to
`~.web.RequestHandler` subclasses by HTTP method:
.. code-block:: python
@ -81,16 +90,18 @@ by HTTP method:
router = HTTPMethodRouter(Application())
server = HTTPServer(router)
`ReversibleRouter` interface adds the ability to distinguish between the routes and
reverse them to the original urls using route's name and additional arguments.
`~.web.Application` is itself an implementation of `ReversibleRouter` class.
`ReversibleRouter` interface adds the ability to distinguish between
the routes and reverse them to the original urls using route's name
and additional arguments. `~.web.Application` is itself an
implementation of `ReversibleRouter` class.
`RuleRouter` and `ReversibleRuleRouter` are implementations of `Router` and `ReversibleRouter`
interfaces and can be used for creating rule-based routing configurations.
`RuleRouter` and `ReversibleRuleRouter` are implementations of
`Router` and `ReversibleRouter` interfaces and can be used for
creating rule-based routing configurations.
Rules are instances of `Rule` class. They contain a `Matcher`, which provides the logic for
determining whether the rule is a match for a particular request and a target, which can be
one of the following.
Rules are instances of `Rule` class. They contain a `Matcher`, which
provides the logic for determining whether the rule is a match for a
particular request and a target, which can be one of the following.
1) An instance of `~.httputil.HTTPServerConnectionDelegate`:
@ -159,9 +170,12 @@ In the example below `RuleRouter` is used to route between applications:
server = HTTPServer(router)
For more information on application-level routing see docs for `~.web.Application`.
.. versionadded:: 4.5
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import re
from functools import partial

View file

@ -1,5 +1,5 @@
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
from tornado.escape import utf8, _unicode
from tornado import gen
@ -499,7 +499,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
def _should_follow_redirect(self):
return (self.request.follow_redirects and
self.request.max_redirects > 0 and
self.code in (301, 302, 303, 307))
self.code in (301, 302, 303, 307, 308))
def finish(self):
data = b''.join(self.chunks)

View file

@ -67,7 +67,7 @@ Here are a few rules of thumb for when it's necessary:
block that references your `StackContext`.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import sys
import threading
@ -82,6 +82,8 @@ class StackContextInconsistentError(Exception):
class _State(threading.local):
def __init__(self):
self.contexts = (tuple(), None)
_state = _State()

View file

@ -16,7 +16,7 @@
"""A non-blocking TCP connection factory.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import functools
import socket
@ -155,16 +155,30 @@ class TCPClient(object):
@gen.coroutine
def connect(self, host, port, af=socket.AF_UNSPEC, ssl_options=None,
max_buffer_size=None):
max_buffer_size=None, source_ip=None, source_port=None):
"""Connect to the given host and port.
Asynchronously returns an `.IOStream` (or `.SSLIOStream` if
``ssl_options`` is not None).
Using the ``source_ip`` kwarg, one can specify the source
IP address to use when establishing the connection.
In case the user needs to resolve and
use a specific interface, it has to be handled outside
of Tornado as this depends very much on the platform.
Similarly, when the user requires a certain source port, it can
be specified using the ``source_port`` arg.
.. versionchanged:: 4.5
Added the ``source_ip`` and ``source_port`` arguments.
"""
addrinfo = yield self.resolver.resolve(host, port, af)
connector = _Connector(
addrinfo, self.io_loop,
functools.partial(self._create_stream, max_buffer_size))
functools.partial(self._create_stream, max_buffer_size,
source_ip=source_ip, source_port=source_port)
)
af, addr, stream = yield connector.start()
# TODO: For better performance we could cache the (af, addr)
# information here and re-use it on subsequent connections to
@ -174,11 +188,30 @@ class TCPClient(object):
server_hostname=host)
raise gen.Return(stream)
def _create_stream(self, max_buffer_size, af, addr):
def _create_stream(self, max_buffer_size, af, addr, source_ip=None,
source_port=None):
# Always connect in plaintext; we'll convert to ssl if necessary
# after one connection has completed.
source_port_bind = source_port if isinstance(source_port, int) else 0
source_ip_bind = source_ip
if source_port_bind and not source_ip:
# User required a specific port, but did not specify
# a certain source IP, will bind to the default loopback.
source_ip_bind = '::1' if af == socket.AF_INET6 else '127.0.0.1'
# Trying to use the same address family as the requested af socket:
# - 127.0.0.1 for IPv4
# - ::1 for IPv6
socket_obj = socket.socket(af)
if source_port_bind or source_ip_bind:
# If the user requires binding also to a specific IP/port.
try:
stream = IOStream(socket.socket(af),
socket_obj.bind((source_ip_bind, source_port_bind))
except socket.error:
socket_obj.close()
# Fail loudly if unable to use the IP/port.
raise
try:
stream = IOStream(socket_obj,
io_loop=self.io_loop,
max_buffer_size=max_buffer_size)
except socket.error as e:

View file

@ -15,7 +15,7 @@
# under the License.
"""A non-blocking, single-threaded TCP server."""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import errno
import os

View file

@ -196,7 +196,7 @@ if you need to include a literal ``{{``, ``{%``, or ``{#`` in the output.
`filter_whitespace` for available options. New in Tornado 4.3.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import datetime
import linecache

View file

@ -10,7 +10,7 @@
for the tornado.autoreload module to rerun the tests when code changes.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
try:
from tornado import gen
@ -656,7 +656,9 @@ def main(**kwargs):
This test runner is essentially equivalent to `unittest.main` from
the standard library, but adds support for tornado-style option
parsing and log formatting.
parsing and log formatting. It is *not* necessary to use this
`main` function to run tests using `AsyncTestCase`; these tests
are self-contained and can run with any test runner.
The easiest way to run a test is via the command line::
@ -735,5 +737,6 @@ def main(**kwargs):
gen_log.error('FAIL')
raise
if __name__ == '__main__':
main()

View file

@ -10,7 +10,7 @@ interface of its subclasses, including `.AsyncHTTPClient`, `.IOLoop`,
and `.Resolver`.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import array
import atexit
@ -193,7 +193,11 @@ def exec_in(code, glob, loc=None):
if PY3:
exec("""
def raise_exc_info(exc_info):
try:
raise exc_info[1].with_traceback(exc_info[2])
finally:
exc_info = None
""")
else:
exec("""
@ -232,6 +236,7 @@ def _re_unescape_replacement(match):
raise ValueError("cannot unescape '\\\\%s'" % group[0])
return group
_re_unescape_pattern = re.compile(r'\\(.)', re.DOTALL)
@ -450,6 +455,7 @@ def _websocket_mask_python(mask, data):
else:
return unmasked_arr.tostring()
if (os.environ.get('TORNADO_NO_EXTENSION') or
os.environ.get('TORNADO_EXTENSION') == '0'):
# These environment variables exist to make it easier to do performance

View file

@ -56,7 +56,7 @@ request.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import base64
import binascii
@ -756,45 +756,21 @@ class RequestHandler(object):
if body_part:
html_bodies.append(utf8(body_part))
def is_absolute(path):
return any(path.startswith(x) for x in ["/", "http:", "https:"])
if js_files:
# Maintain order of JavaScript files given by modules
paths = []
unique_paths = set()
for path in js_files:
if not is_absolute(path):
path = self.static_url(path)
if path not in unique_paths:
paths.append(path)
unique_paths.add(path)
js = ''.join('<script src="' + escape.xhtml_escape(p) +
'" type="text/javascript"></script>'
for p in paths)
js = self.render_linked_js(js_files)
sloc = html.rindex(b'</body>')
html = html[:sloc] + utf8(js) + b'\n' + html[sloc:]
if js_embed:
js = b'<script type="text/javascript">\n//<![CDATA[\n' + \
b'\n'.join(js_embed) + b'\n//]]>\n</script>'
js = self.render_embed_js(js_embed)
sloc = html.rindex(b'</body>')
html = html[:sloc] + js + b'\n' + html[sloc:]
if css_files:
paths = []
unique_paths = set()
for path in css_files:
if not is_absolute(path):
path = self.static_url(path)
if path not in unique_paths:
paths.append(path)
unique_paths.add(path)
css = ''.join('<link href="' + escape.xhtml_escape(p) + '" '
'type="text/css" rel="stylesheet"/>'
for p in paths)
css = self.render_linked_css(css_files)
hloc = html.index(b'</head>')
html = html[:hloc] + utf8(css) + b'\n' + html[hloc:]
if css_embed:
css = b'<style type="text/css">\n' + b'\n'.join(css_embed) + \
b'\n</style>'
css = self.render_embed_css(css_embed)
hloc = html.index(b'</head>')
html = html[:hloc] + css + b'\n' + html[hloc:]
if html_heads:
@ -805,6 +781,64 @@ class RequestHandler(object):
html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:]
self.finish(html)
def render_linked_js(self, js_files):
"""Default method used to render the final js links for the
rendered webpage.
Override this method in a sub-classed controller to change the output.
"""
paths = []
unique_paths = set()
for path in js_files:
if not is_absolute(path):
path = self.static_url(path)
if path not in unique_paths:
paths.append(path)
unique_paths.add(path)
return ''.join('<script src="' + escape.xhtml_escape(p) +
'" type="text/javascript"></script>'
for p in paths)
def render_embed_js(self, js_embed):
"""Default method used to render the final embedded js for the
rendered webpage.
Override this method in a sub-classed controller to change the output.
"""
return b'<script type="text/javascript">\n//<![CDATA[\n' + \
b'\n'.join(js_embed) + b'\n//]]>\n</script>'
def render_linked_css(self, css_files):
"""Default method used to render the final css links for the
rendered webpage.
Override this method in a sub-classed controller to change the output.
"""
paths = []
unique_paths = set()
for path in css_files:
if not is_absolute(path):
path = self.static_url(path)
if path not in unique_paths:
paths.append(path)
unique_paths.add(path)
return ''.join('<link href="' + escape.xhtml_escape(p) + '" '
'type="text/css" rel="stylesheet"/>'
for p in paths)
def render_embed_css(self, css_embed):
"""Default method used to render the final embedded css for the
rendered webpage.
Override this method in a sub-classed controller to change the output.
"""
return b'<style type="text/css">\n' + b'\n'.join(css_embed) + \
b'\n</style>'
def render_string(self, template_name, **kwargs):
"""Generate the given template with the given arguments.
@ -959,6 +993,9 @@ class RequestHandler(object):
self._log()
self._finished = True
self.on_finish()
self._break_cycles()
def _break_cycles(self):
# Break up a reference cycle between this handler and the
# _ui_module closures to allow for faster GC on CPython.
self.ui = None
@ -1672,10 +1709,6 @@ def stream_request_body(cls):
* The regular HTTP method (``post``, ``put``, etc) will be called after
the entire body has been read.
There is a subtle interaction between ``data_received`` and asynchronous
``prepare``: The first call to ``data_received`` may occur at any point
after the call to ``prepare`` has returned *or yielded*.
See the `file receiver demo <https://github.com/tornadoweb/tornado/tree/master/demos/file_upload/>`_
for example usage.
"""
@ -1834,6 +1867,8 @@ class Application(ReversibleRouter):
`StaticFileHandler` can be specified with the
``static_handler_class`` setting.
.. versionchanged:: 4.5
Integration with the new `tornado.routing` module.
"""
def __init__(self, handlers=None, default_host=None, transforms=None,
**settings):
@ -2218,6 +2253,9 @@ class RedirectHandler(RequestHandler):
Use Python's :ref:`format string syntax <formatstrings>` to customize how
values are substituted.
.. versionchanged:: 4.5
Added support for substitutions into the destination URL.
"""
def initialize(self, url, permanent=True):
self._url = url
@ -3084,6 +3122,7 @@ def create_signed_value(secret, name, value, version=None, clock=None,
else:
raise ValueError("Unsupported version %d" % version)
# A leading version number in decimal
# with no leading zeros, followed by a pipe.
_signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$")
@ -3240,3 +3279,7 @@ def _create_signature_v2(secret, s):
hash = hmac.new(utf8(secret), digestmod=hashlib.sha256)
hash.update(utf8(s))
return utf8(hash.hexdigest())
def is_absolute(path):
return any(path.startswith(x) for x in ["/", "http:", "https:"])

View file

@ -16,7 +16,7 @@ the protocol (known as "draft 76") and are not compatible with this module.
Removed support for the draft 76 protocol version.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
# Author: Jacob Kristhammar, 2010
import base64
@ -30,8 +30,8 @@ import zlib
from tornado.concurrent import TracebackFuture
from tornado.escape import utf8, native_str, to_unicode
from tornado import httpclient, httputil
from tornado.ioloop import IOLoop
from tornado import gen, httpclient, httputil
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.iostream import StreamClosedError
from tornado.log import gen_log, app_log
from tornado import simple_httpclient
@ -65,6 +65,10 @@ class WebSocketHandler(tornado.web.RequestHandler):
override `open` and `on_close` to handle opened and closed
connections.
Custom upgrade response headers can be sent by overriding
`~tornado.web.RequestHandler.set_default_headers` or
`~tornado.web.RequestHandler.prepare`.
See http://dev.w3.org/html5/websockets/ for details on the
JavaScript interface. The protocol is specified at
http://tools.ietf.org/html/rfc6455.
@ -122,6 +126,17 @@ class WebSocketHandler(tornado.web.RequestHandler):
to show the "accept this certificate" dialog but has nowhere to show it.
You must first visit a regular HTML page using the same certificate
to accept it before the websocket connection will succeed.
If the application setting ``websocket_ping_interval`` has a non-zero
value, a ping will be sent periodically, and the connection will be
closed if a response is not received before the ``websocket_ping_timeout``.
Messages larger than the ``websocket_max_message_size`` application setting
(default 10MiB) will not be accepted.
.. versionchanged:: 4.5
Added ``websocket_ping_interval``, ``websocket_ping_timeout``, and
``websocket_max_message_size``.
"""
def __init__(self, application, request, **kwargs):
super(WebSocketHandler, self).__init__(application, request, **kwargs)
@ -176,18 +191,42 @@ class WebSocketHandler(tornado.web.RequestHandler):
gen_log.debug(log_msg)
return
self.stream = self.request.connection.detach()
self.stream.set_close_callback(self.on_connection_close)
self.ws_connection = self.get_websocket_protocol()
if self.ws_connection:
self.ws_connection.accept_connection()
else:
if not self.stream.closed():
self.stream.write(tornado.escape.utf8(
"HTTP/1.1 426 Upgrade Required\r\n"
"Sec-WebSocket-Version: 7, 8, 13\r\n\r\n"))
self.stream.close()
self.set_status(426, "Upgrade Required")
self.set_header("Sec-WebSocket-Version", "7, 8, 13")
self.finish()
stream = None
@property
def ping_interval(self):
"""The interval for websocket keep-alive pings.
Set websocket_ping_interval = 0 to disable pings.
"""
return self.settings.get('websocket_ping_interval', None)
@property
def ping_timeout(self):
"""If no ping is received in this many seconds,
close the websocket connection (VPNs, etc. can fail to cleanly close ws connections).
Default is max of 3 pings or 30 seconds.
"""
return self.settings.get('websocket_ping_timeout', None)
@property
def max_message_size(self):
"""Maximum allowed message size.
If the remote peer sends a message larger than this, the connection
will be closed.
Default is 10MiB.
"""
return self.settings.get('websocket_max_message_size', None)
def write_message(self, message, binary=False):
"""Sends the given message to the client of this Web Socket.
@ -231,11 +270,22 @@ class WebSocketHandler(tornado.web.RequestHandler):
If this method returns None (the default), compression will
be disabled. If it returns a dict (even an empty one), it
will be enabled. The contents of the dict may be used to
control the memory and CPU usage of the compression,
but no such options are currently implemented.
control the following compression options:
``compression_level`` specifies the compression level.
``mem_level`` specifies the amount of memory used for the internal compression state.
These parameters are documented in details here:
https://docs.python.org/3.6/library/zlib.html#zlib.compressobj
.. versionadded:: 4.1
.. versionchanged:: 4.5
Added ``compression_level`` and ``mem_level``.
"""
# TODO: Add wbits option.
return None
def open(self, *args, **kwargs):
@ -251,6 +301,10 @@ class WebSocketHandler(tornado.web.RequestHandler):
"""Handle incoming messages on the WebSocket
This method must be overridden.
.. versionchanged:: 4.5
``on_message`` can be a coroutine.
"""
raise NotImplementedError
@ -264,6 +318,10 @@ class WebSocketHandler(tornado.web.RequestHandler):
"""Invoked when the response to a ping frame is received."""
pass
def on_ping(self, data):
"""Invoked when the a ping frame is received."""
pass
def on_close(self):
"""Invoked when the WebSocket is closed.
@ -319,7 +377,7 @@ class WebSocketHandler(tornado.web.RequestHandler):
This is an important security measure; don't disable it
without understanding the security implications. In
particular, if your authenticatino is cookie-based, you
particular, if your authentication is cookie-based, you
must either restrict the origins allowed by
``check_origin()`` or implement your own XSRF-like
protection for websocket connections. See `these
@ -376,6 +434,16 @@ class WebSocketHandler(tornado.web.RequestHandler):
if not self._on_close_called:
self._on_close_called = True
self.on_close()
self._break_cycles()
def _break_cycles(self):
# WebSocketHandlers call finish() early, but we don't want to
# break up reference cycles (which makes it impossible to call
# self.render_string) until after we've really closed the
# connection (if it was established in the first place,
# indicated by status code 101).
if self.get_status() != 101 or self._on_close_called:
super(WebSocketHandler, self)._break_cycles()
def send_error(self, *args, **kwargs):
if self.stream is None:
@ -393,18 +461,17 @@ class WebSocketHandler(tornado.web.RequestHandler):
return WebSocketProtocol13(
self, compression_options=self.get_compression_options())
def _wrap_method(method):
def _disallow_for_websocket(self, *args, **kwargs):
if self.stream is None:
method(self, *args, **kwargs)
else:
raise RuntimeError("Method not supported for Web Sockets")
return _disallow_for_websocket
for method in ["write", "redirect", "set_header", "set_cookie",
def _attach_stream(self):
self.stream = self.request.connection.detach()
self.stream.set_close_callback(self.on_connection_close)
# disable non-WS methods
for method in ["write", "redirect", "set_header", "set_cookie",
"set_status", "flush", "finish"]:
setattr(WebSocketHandler, method,
_wrap_method(getattr(WebSocketHandler, method)))
setattr(self, method, _raise_not_supported_for_websockets)
def _raise_not_supported_for_websockets(*args, **kwargs):
raise RuntimeError("Method not supported for Web Sockets")
class WebSocketProtocol(object):
@ -420,14 +487,20 @@ class WebSocketProtocol(object):
def _run_callback(self, callback, *args, **kwargs):
"""Runs the given callback with exception handling.
On error, aborts the websocket connection and returns False.
If the callback is a coroutine, returns its Future. On error, aborts the
websocket connection and returns None.
"""
try:
callback(*args, **kwargs)
result = callback(*args, **kwargs)
except Exception:
app_log.error("Uncaught exception in %s",
self.request.path, exc_info=True)
getattr(self.request, 'path', None), exc_info=True)
self._abort()
else:
if result is not None:
result = gen.convert_yielded(result)
self.stream.io_loop.add_future(result, lambda f: f.result())
return result
def on_connection_close(self):
self._abort()
@ -441,7 +514,7 @@ class WebSocketProtocol(object):
class _PerMessageDeflateCompressor(object):
def __init__(self, persistent, max_wbits):
def __init__(self, persistent, max_wbits, compression_options=None):
if max_wbits is None:
max_wbits = zlib.MAX_WBITS
# There is no symbolic constant for the minimum wbits value.
@ -449,14 +522,24 @@ class _PerMessageDeflateCompressor(object):
raise ValueError("Invalid max_wbits value %r; allowed range 8-%d",
max_wbits, zlib.MAX_WBITS)
self._max_wbits = max_wbits
if compression_options is None or 'compression_level' not in compression_options:
self._compression_level = tornado.web.GZipContentEncoding.GZIP_LEVEL
else:
self._compression_level = compression_options['compression_level']
if compression_options is None or 'mem_level' not in compression_options:
self._mem_level = 8
else:
self._mem_level = compression_options['mem_level']
if persistent:
self._compressor = self._create_compressor()
else:
self._compressor = None
def _create_compressor(self):
return zlib.compressobj(tornado.web.GZipContentEncoding.GZIP_LEVEL,
zlib.DEFLATED, -self._max_wbits)
return zlib.compressobj(self._compression_level, zlib.DEFLATED, -self._max_wbits, self._mem_level)
def compress(self, data):
compressor = self._compressor or self._create_compressor()
@ -467,7 +550,7 @@ class _PerMessageDeflateCompressor(object):
class _PerMessageDeflateDecompressor(object):
def __init__(self, persistent, max_wbits):
def __init__(self, persistent, max_wbits, compression_options=None):
if max_wbits is None:
max_wbits = zlib.MAX_WBITS
if not (8 <= max_wbits <= zlib.MAX_WBITS):
@ -526,6 +609,9 @@ class WebSocketProtocol13(WebSocketProtocol):
# the effect of compression, frame overhead, and control frames.
self._wire_bytes_in = 0
self._wire_bytes_out = 0
self.ping_callback = None
self.last_ping = 0
self.last_pong = 0
def accept_connection(self):
try:
@ -562,46 +648,42 @@ class WebSocketProtocol13(WebSocketProtocol):
self.request.headers.get("Sec-Websocket-Key"))
def _accept_connection(self):
subprotocol_header = ''
subprotocols = self.request.headers.get("Sec-WebSocket-Protocol", '')
subprotocols = [s.strip() for s in subprotocols.split(',')]
if subprotocols:
selected = self.handler.select_subprotocol(subprotocols)
if selected:
assert selected in subprotocols
subprotocol_header = ("Sec-WebSocket-Protocol: %s\r\n"
% selected)
self.handler.set_header("Sec-WebSocket-Protocol", selected)
extension_header = ''
extensions = self._parse_extensions_header(self.request.headers)
for ext in extensions:
if (ext[0] == 'permessage-deflate' and
self._compression_options is not None):
# TODO: negotiate parameters if compression_options
# specifies limits.
self._create_compressors('server', ext[1])
self._create_compressors('server', ext[1], self._compression_options)
if ('client_max_window_bits' in ext[1] and
ext[1]['client_max_window_bits'] is None):
# Don't echo an offered client_max_window_bits
# parameter with no value.
del ext[1]['client_max_window_bits']
extension_header = ('Sec-WebSocket-Extensions: %s\r\n' %
self.handler.set_header("Sec-WebSocket-Extensions",
httputil._encode_header(
'permessage-deflate', ext[1]))
break
if self.stream.closed():
self._abort()
return
self.stream.write(tornado.escape.utf8(
"HTTP/1.1 101 Switching Protocols\r\n"
"Upgrade: websocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Accept: %s\r\n"
"%s%s"
"\r\n" % (self._challenge_response(),
subprotocol_header, extension_header)))
self.handler.clear_header("Content-Type")
self.handler.set_status(101)
self.handler.set_header("Upgrade", "websocket")
self.handler.set_header("Connection", "Upgrade")
self.handler.set_header("Sec-WebSocket-Accept", self._challenge_response())
self.handler.finish()
self.handler._attach_stream()
self.stream = self.handler.stream
self.start_pinging()
self._run_callback(self.handler.open, *self.handler.open_args,
**self.handler.open_kwargs)
self._receive_frame()
@ -631,7 +713,7 @@ class WebSocketProtocol13(WebSocketProtocol):
else:
raise ValueError("unsupported extension %r", ext)
def _get_compressor_options(self, side, agreed_parameters):
def _get_compressor_options(self, side, agreed_parameters, compression_options=None):
"""Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects.
"""
@ -642,9 +724,10 @@ class WebSocketProtocol13(WebSocketProtocol):
options['max_wbits'] = zlib.MAX_WBITS
else:
options['max_wbits'] = int(wbits_header)
options['compression_options'] = compression_options
return options
def _create_compressors(self, side, agreed_parameters):
def _create_compressors(self, side, agreed_parameters, compression_options=None):
# TODO: handle invalid parameters gracefully
allowed_keys = set(['server_no_context_takeover',
'client_no_context_takeover',
@ -655,9 +738,9 @@ class WebSocketProtocol13(WebSocketProtocol):
raise ValueError("unsupported compression parameter %r" % key)
other_side = 'client' if (side == 'server') else 'server'
self._compressor = _PerMessageDeflateCompressor(
**self._get_compressor_options(side, agreed_parameters))
**self._get_compressor_options(side, agreed_parameters, compression_options))
self._decompressor = _PerMessageDeflateDecompressor(
**self._get_compressor_options(other_side, agreed_parameters))
**self._get_compressor_options(other_side, agreed_parameters, compression_options))
def _write_frame(self, fin, opcode, data, flags=0):
if fin:
@ -738,8 +821,7 @@ class WebSocketProtocol13(WebSocketProtocol):
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self.stream.read_bytes(self._frame_length,
self._on_frame_data)
self._read_frame_data(False)
elif payloadlen == 126:
self.stream.read_bytes(2, self._on_frame_length_16)
elif payloadlen == 127:
@ -747,6 +829,17 @@ class WebSocketProtocol13(WebSocketProtocol):
except StreamClosedError:
self._abort()
def _read_frame_data(self, masked):
new_len = self._frame_length
if self._fragmented_message_buffer is not None:
new_len += len(self._fragmented_message_buffer)
if new_len > (self.handler.max_message_size or 10 * 1024 * 1024):
self.close(1009, "message too big")
return
self.stream.read_bytes(
self._frame_length,
self._on_masked_frame_data if masked else self._on_frame_data)
def _on_frame_length_16(self, data):
self._wire_bytes_in += len(data)
self._frame_length = struct.unpack("!H", data)[0]
@ -754,7 +847,7 @@ class WebSocketProtocol13(WebSocketProtocol):
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self.stream.read_bytes(self._frame_length, self._on_frame_data)
self._read_frame_data(False)
except StreamClosedError:
self._abort()
@ -765,7 +858,7 @@ class WebSocketProtocol13(WebSocketProtocol):
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self.stream.read_bytes(self._frame_length, self._on_frame_data)
self._read_frame_data(False)
except StreamClosedError:
self._abort()
@ -773,8 +866,7 @@ class WebSocketProtocol13(WebSocketProtocol):
self._wire_bytes_in += len(data)
self._frame_mask = data
try:
self.stream.read_bytes(self._frame_length,
self._on_masked_frame_data)
self._read_frame_data(True)
except StreamClosedError:
self._abort()
@ -783,6 +875,8 @@ class WebSocketProtocol13(WebSocketProtocol):
self._on_frame_data(_websocket_mask(self._frame_mask, data))
def _on_frame_data(self, data):
handled_future = None
self._wire_bytes_in += len(data)
if self._frame_opcode_is_control:
# control frames may be interleaved with a series of fragmented
@ -815,12 +909,18 @@ class WebSocketProtocol13(WebSocketProtocol):
self._fragmented_message_buffer = data
if self._final_frame:
self._handle_message(opcode, data)
handled_future = self._handle_message(opcode, data)
if not self.client_terminated:
if handled_future:
# on_message is a coroutine, process more frames once it's done.
handled_future.add_done_callback(
lambda future: self._receive_frame())
else:
self._receive_frame()
def _handle_message(self, opcode, data):
"""Execute on_message, returning its Future if it is a coroutine."""
if self.client_terminated:
return
@ -835,11 +935,11 @@ class WebSocketProtocol13(WebSocketProtocol):
except UnicodeDecodeError:
self._abort()
return
self._run_callback(self.handler.on_message, decoded)
return self._run_callback(self.handler.on_message, decoded)
elif opcode == 0x2:
# Binary data
self._message_bytes_in += len(data)
self._run_callback(self.handler.on_message, data)
return self._run_callback(self.handler.on_message, data)
elif opcode == 0x8:
# Close
self.client_terminated = True
@ -852,9 +952,11 @@ class WebSocketProtocol13(WebSocketProtocol):
elif opcode == 0x9:
# Ping
self._write_frame(True, 0xA, data)
self._run_callback(self.handler.on_ping, data)
elif opcode == 0xA:
# Pong
self._run_callback(self.handler.on_pong, data)
self.last_pong = IOLoop.current().time()
return self._run_callback(self.handler.on_pong, data)
else:
self._abort()
@ -883,6 +985,51 @@ class WebSocketProtocol13(WebSocketProtocol):
self._waiting = self.stream.io_loop.add_timeout(
self.stream.io_loop.time() + 5, self._abort)
@property
def ping_interval(self):
interval = self.handler.ping_interval
if interval is not None:
return interval
return 0
@property
def ping_timeout(self):
timeout = self.handler.ping_timeout
if timeout is not None:
return timeout
return max(3 * self.ping_interval, 30)
def start_pinging(self):
"""Start sending periodic pings to keep the connection alive"""
if self.ping_interval > 0:
self.last_ping = self.last_pong = IOLoop.current().time()
self.ping_callback = PeriodicCallback(
self.periodic_ping, self.ping_interval * 1000)
self.ping_callback.start()
def periodic_ping(self):
"""Send a ping to keep the websocket alive
Called periodically if the websocket_ping_interval is set and non-zero.
"""
if self.stream.closed() and self.ping_callback is not None:
self.ping_callback.stop()
return
# Check for timeout on pong. Make sure that we really have
# sent a recent ping in case the machine with both server and
# client has been suspended since the last ping.
now = IOLoop.current().time()
since_last_pong = now - self.last_pong
since_last_ping = now - self.last_ping
if (since_last_ping < 2 * self.ping_interval and
since_last_pong > self.ping_timeout):
self.close()
return
self.write_ping(b'')
self.last_ping = now
class WebSocketClientConnection(simple_httpclient._HTTPConnection):
"""WebSocket client connection.
@ -891,7 +1038,8 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
`websocket_connect` function instead.
"""
def __init__(self, io_loop, request, on_message_callback=None,
compression_options=None):
compression_options=None, ping_interval=None, ping_timeout=None,
max_message_size=None):
self.compression_options = compression_options
self.connect_future = TracebackFuture()
self.protocol = None
@ -900,6 +1048,9 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
self.key = base64.b64encode(os.urandom(16))
self._on_message_callback = on_message_callback
self.close_code = self.close_reason = None
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.max_message_size = max_message_size
scheme, sep, rest = request.url.partition(':')
scheme = {'ws': 'http', 'wss': 'https'}[scheme]
@ -963,6 +1114,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
self.headers = headers
self.protocol = self.get_websocket_protocol()
self.protocol._process_server_headers(self.key, self.headers)
self.protocol.start_pinging()
self.protocol._receive_frame()
if self._timeout is not None:
@ -1016,13 +1168,18 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
def on_pong(self, data):
pass
def on_ping(self, data):
pass
def get_websocket_protocol(self):
return WebSocketProtocol13(self, mask_outgoing=True,
compression_options=self.compression_options)
def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None,
on_message_callback=None, compression_options=None):
on_message_callback=None, compression_options=None,
ping_interval=None, ping_timeout=None,
max_message_size=None):
"""Client-side websocket support.
Takes a url and returns a Future whose result is a
@ -1051,6 +1208,10 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None,
.. versionchanged:: 4.1
Added ``compression_options`` and ``on_message_callback``.
The ``io_loop`` argument is deprecated.
.. versionchanged:: 4.5
Added the ``ping_interval``, ``ping_timeout``, and ``max_message_size``
arguments, which have the same meaning as in `WebSocketHandler`.
"""
if io_loop is None:
io_loop = IOLoop.current()
@ -1066,7 +1227,10 @@ def websocket_connect(url, io_loop=None, callback=None, connect_timeout=None,
request, httpclient.HTTPRequest._DEFAULTS)
conn = WebSocketClientConnection(io_loop, request,
on_message_callback=on_message_callback,
compression_options=compression_options)
compression_options=compression_options,
ping_interval=ping_interval,
ping_timeout=ping_timeout,
max_message_size=max_message_size)
if callback is not None:
io_loop.add_future(conn.connect_future, callback)
return conn.connect_future

View file

@ -29,7 +29,7 @@ provides WSGI support in two ways:
and Tornado handlers in a single server.
"""
from __future__ import absolute_import, division, print_function, with_statement
from __future__ import absolute_import, division, print_function
import sys
from io import BytesIO