Merge branch 'feature/UpdateTornado' into develop

This commit is contained in:
JackDandy 2018-09-23 22:40:56 +01:00
commit 9a36b6dd33
25 changed files with 859 additions and 506 deletions

View file

@ -10,6 +10,7 @@
* Update Requests library 2.15.1 (282b01a) to 2.19.1 (33b41c7)
* Update scandir module 1.6 (c3592ee) to 1.9.0 (9ab3d1f)
* Update SimpleJSON 3.13.2 (6ffddbe) to 3.16.0 (e2a54f7)
* Update Tornado Web Server 5.0.1 (2b2a220a) to 5.1.1 (cc2cf07)
* Update unidecode module 1.0.22 (81f938d) to 1.0.22 (578cdb9)
* Add idna library 2.7 (0f50bdc)
* Add urllib3 release 1.23 (7c216f4)

View file

@ -24,5 +24,5 @@ from __future__ import absolute_import, division, print_function
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
version = "5.1.dev1"
version_info = (5, 1, 0, -100)
version = "5.1.1"
version_info = (5, 1, 1, 0)

View file

@ -37,15 +37,14 @@ Example usage for Google OAuth:
class GoogleOAuth2LoginHandler(tornado.web.RequestHandler,
tornado.auth.GoogleOAuth2Mixin):
@tornado.gen.coroutine
def get(self):
async def get(self):
if self.get_argument('code', False):
user = yield self.get_authenticated_user(
user = await self.get_authenticated_user(
redirect_uri='http://your.site.com/auth/google',
code=self.get_argument('code'))
# Save the user with e.g. set_secure_cookie
else:
yield self.authorize_redirect(
await self.authorize_redirect(
redirect_uri='http://your.site.com/auth/google',
client_id=self.settings['google_oauth']['key'],
scope=['profile', 'email'],
@ -75,15 +74,15 @@ import time
import uuid
import warnings
from tornado.concurrent import (Future, return_future, chain_future,
future_set_exc_info,
from tornado.concurrent import (Future, _non_deprecated_return_future,
future_set_exc_info, chain_future,
future_set_result_unless_cancelled)
from tornado import gen
from tornado import httpclient
from tornado import escape
from tornado.httputil import url_concat
from tornado.log import gen_log
from tornado.stack_context import ExceptionStackContext
from tornado.stack_context import ExceptionStackContext, wrap
from tornado.util import unicode_type, ArgReplacer, PY3
if PY3:
@ -128,7 +127,7 @@ def _auth_return_future(f):
warnings.warn("callback arguments are deprecated, use the returned Future instead",
DeprecationWarning)
future.add_done_callback(
functools.partial(_auth_future_to_callback, callback))
wrap(functools.partial(_auth_future_to_callback, callback)))
def handle_exception(typ, value, tb):
if future.done():
@ -136,7 +135,7 @@ def _auth_return_future(f):
else:
future_set_exc_info(future, (typ, value, tb))
return True
with ExceptionStackContext(handle_exception):
with ExceptionStackContext(handle_exception, delay_warning=True):
f(*args, **kwargs)
return future
return wrapper
@ -149,7 +148,7 @@ class OpenIdMixin(object):
* ``_OPENID_ENDPOINT``: the identity provider's URI.
"""
@return_future
@_non_deprecated_return_future
def authenticate_redirect(self, callback_uri=None,
ax_attrs=["name", "email", "language", "username"],
callback=None):
@ -203,8 +202,8 @@ class OpenIdMixin(object):
if http_client is None:
http_client = self.get_auth_http_client()
fut = http_client.fetch(url, method="POST", body=urllib_parse.urlencode(args))
fut.add_done_callback(functools.partial(
self._on_authentication_verified, callback))
fut.add_done_callback(wrap(functools.partial(
self._on_authentication_verified, callback)))
def _openid_args(self, callback_uri, ax_attrs=[], oauth_scope=None):
url = urlparse.urljoin(self.request.full_url(), callback_uri)
@ -344,7 +343,7 @@ class OAuthMixin(object):
Subclasses must also override the `_oauth_get_user_future` and
`_oauth_consumer_token` methods.
"""
@return_future
@_non_deprecated_return_future
def authorize_redirect(self, callback_uri=None, extra_params=None,
http_client=None, callback=None):
"""Redirects the user to obtain OAuth authorization for this service.
@ -382,18 +381,18 @@ class OAuthMixin(object):
fut = http_client.fetch(
self._oauth_request_token_url(callback_uri=callback_uri,
extra_params=extra_params))
fut.add_done_callback(functools.partial(
fut.add_done_callback(wrap(functools.partial(
self._on_request_token,
self._OAUTH_AUTHORIZE_URL,
callback_uri,
callback))
callback)))
else:
fut = http_client.fetch(self._oauth_request_token_url())
fut.add_done_callback(
functools.partial(
wrap(functools.partial(
self._on_request_token, self._OAUTH_AUTHORIZE_URL,
callback_uri,
callback))
callback)))
@_auth_return_future
def get_authenticated_user(self, callback, http_client=None):
@ -433,7 +432,7 @@ class OAuthMixin(object):
if http_client is None:
http_client = self.get_auth_http_client()
fut = http_client.fetch(self._oauth_access_token_url(token))
fut.add_done_callback(functools.partial(self._on_access_token, callback))
fut.add_done_callback(wrap(functools.partial(self._on_access_token, callback)))
def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token()
@ -516,7 +515,7 @@ class OAuthMixin(object):
fut = self._oauth_get_user_future(access_token)
fut = gen.convert_yielded(fut)
fut.add_done_callback(
functools.partial(self._on_oauth_get_user, access_token, future))
wrap(functools.partial(self._on_oauth_get_user, access_token, future)))
def _oauth_consumer_token(self):
"""Subclasses must override this to return their OAuth consumer keys.
@ -525,7 +524,7 @@ class OAuthMixin(object):
"""
raise NotImplementedError()
@return_future
@_non_deprecated_return_future
def _oauth_get_user_future(self, access_token, callback):
"""Subclasses must override this to get basic information about the
user.
@ -618,7 +617,7 @@ class OAuth2Mixin(object):
* ``_OAUTH_AUTHORIZE_URL``: The service's authorization url.
* ``_OAUTH_ACCESS_TOKEN_URL``: The service's access token url.
"""
@return_future
@_non_deprecated_return_future
def authorize_redirect(self, redirect_uri=None, client_id=None,
client_secret=None, extra_params=None,
callback=None, scope=None, response_type="code"):
@ -683,16 +682,15 @@ class OAuth2Mixin(object):
class MainHandler(tornado.web.RequestHandler,
tornado.auth.FacebookGraphMixin):
@tornado.web.authenticated
@tornado.gen.coroutine
def get(self):
new_entry = yield self.oauth2_request(
async def get(self):
new_entry = await self.oauth2_request(
"https://graph.facebook.com/me/feed",
post_args={"message": "I am posting from my Tornado application!"},
access_token=self.current_user["access_token"])
if not new_entry:
# Call failed; perhaps missing permission?
yield self.authorize_redirect()
await self.authorize_redirect()
return
self.finish("Posted a message!")
@ -713,7 +711,7 @@ class OAuth2Mixin(object):
if all_args:
url += "?" + urllib_parse.urlencode(all_args)
callback = functools.partial(self._on_oauth2_request, callback)
callback = wrap(functools.partial(self._on_oauth2_request, callback))
http = self.get_auth_http_client()
if post_args is not None:
fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args))
@ -758,13 +756,12 @@ class TwitterMixin(OAuthMixin):
class TwitterLoginHandler(tornado.web.RequestHandler,
tornado.auth.TwitterMixin):
@tornado.gen.coroutine
def get(self):
async def get(self):
if self.get_argument("oauth_token", None):
user = yield self.get_authenticated_user()
user = await self.get_authenticated_user()
# Save the user using e.g. set_secure_cookie()
else:
yield self.authorize_redirect()
await self.authorize_redirect()
.. testoutput::
:hide:
@ -781,7 +778,7 @@ class TwitterMixin(OAuthMixin):
_OAUTH_NO_CALLBACKS = False
_TWITTER_BASE_URL = "https://api.twitter.com/1.1"
@return_future
@_non_deprecated_return_future
def authenticate_redirect(self, callback_uri=None, callback=None):
"""Just like `~OAuthMixin.authorize_redirect`, but
auto-redirects if authorized.
@ -799,10 +796,10 @@ class TwitterMixin(OAuthMixin):
Use the returned awaitable object instead.
"""
http = self.get_auth_http_client()
http.fetch(self._oauth_request_token_url(callback_uri=callback_uri),
functools.partial(
fut = http.fetch(self._oauth_request_token_url(callback_uri=callback_uri))
fut.add_done_callback(wrap(functools.partial(
self._on_request_token, self._OAUTH_AUTHENTICATE_URL,
None, callback))
None, callback)))
@_auth_return_future
def twitter_request(self, path, callback=None, access_token=None,
@ -829,9 +826,8 @@ class TwitterMixin(OAuthMixin):
class MainHandler(tornado.web.RequestHandler,
tornado.auth.TwitterMixin):
@tornado.web.authenticated
@tornado.gen.coroutine
def get(self):
new_entry = yield self.twitter_request(
async def get(self):
new_entry = await self.twitter_request(
"/statuses/update",
post_args={"status": "Testing Tornado Web Server"},
access_token=self.current_user["access_token"])
@ -867,7 +863,7 @@ class TwitterMixin(OAuthMixin):
if args:
url += "?" + urllib_parse.urlencode(args)
http = self.get_auth_http_client()
http_callback = functools.partial(self._on_twitter_request, callback, url)
http_callback = wrap(functools.partial(self._on_twitter_request, callback, url))
if post_args is not None:
fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args))
else:
@ -942,19 +938,18 @@ class GoogleOAuth2Mixin(OAuth2Mixin):
class GoogleOAuth2LoginHandler(tornado.web.RequestHandler,
tornado.auth.GoogleOAuth2Mixin):
@tornado.gen.coroutine
def get(self):
async def get(self):
if self.get_argument('code', False):
access = yield self.get_authenticated_user(
access = await self.get_authenticated_user(
redirect_uri='http://your.site.com/auth/google',
code=self.get_argument('code'))
user = yield self.oauth2_request(
user = await self.oauth2_request(
"https://www.googleapis.com/oauth2/v1/userinfo",
access_token=access["access_token"])
# Save the user and access token with
# e.g. set_secure_cookie.
else:
yield self.authorize_redirect(
await self.authorize_redirect(
redirect_uri='http://your.site.com/auth/google',
client_id=self.settings['google_oauth']['key'],
scope=['profile', 'email'],
@ -982,7 +977,7 @@ class GoogleOAuth2Mixin(OAuth2Mixin):
method="POST",
headers={'Content-Type': 'application/x-www-form-urlencoded'},
body=body)
fut.add_done_callback(functools.partial(self._on_access_token, callback))
fut.add_done_callback(wrap(functools.partial(self._on_access_token, callback)))
def _on_access_token(self, future, response_fut):
"""Callback function for the exchange to the access token."""
@ -1014,17 +1009,16 @@ class FacebookGraphMixin(OAuth2Mixin):
class FacebookGraphLoginHandler(tornado.web.RequestHandler,
tornado.auth.FacebookGraphMixin):
@tornado.gen.coroutine
def get(self):
async def get(self):
if self.get_argument("code", False):
user = yield self.get_authenticated_user(
user = await self.get_authenticated_user(
redirect_uri='/auth/facebookgraph/',
client_id=self.settings["facebook_api_key"],
client_secret=self.settings["facebook_secret"],
code=self.get_argument("code"))
# Save the user with e.g. set_secure_cookie
else:
yield self.authorize_redirect(
await self.authorize_redirect(
redirect_uri='/auth/facebookgraph/',
client_id=self.settings["facebook_api_key"],
extra_params={"scope": "read_stream,offline_access"})
@ -1067,8 +1061,8 @@ class FacebookGraphMixin(OAuth2Mixin):
fields.update(extra_fields)
fut = http.fetch(self._oauth_request_token_url(**args))
fut.add_done_callback(functools.partial(self._on_access_token, redirect_uri, client_id,
client_secret, callback, fields))
fut.add_done_callback(wrap(functools.partial(self._on_access_token, redirect_uri, client_id,
client_secret, callback, fields)))
@gen.coroutine
def _on_access_token(self, redirect_uri, client_id, client_secret,
@ -1134,9 +1128,8 @@ class FacebookGraphMixin(OAuth2Mixin):
class MainHandler(tornado.web.RequestHandler,
tornado.auth.FacebookGraphMixin):
@tornado.web.authenticated
@tornado.gen.coroutine
def get(self):
new_entry = yield self.facebook_request(
async def get(self):
new_entry = await self.facebook_request(
"/me/feed",
post_args={"message": "I am posting from my Tornado application!"},
access_token=self.current_user["access_token"])

View file

@ -107,6 +107,9 @@ _watched_files = set()
_reload_hooks = []
_reload_attempted = False
_io_loops = weakref.WeakKeyDictionary() # type: ignore
_autoreload_is_main = False
_original_argv = None
_original_spec = None
def start(check_time=500):
@ -214,11 +217,15 @@ def _reload():
# __spec__ is not available (Python < 3.4), check instead if
# sys.path[0] is an empty string and add the current directory to
# $PYTHONPATH.
spec = getattr(sys.modules['__main__'], '__spec__', None)
if spec:
argv = ['-m', spec.name] + sys.argv[1:]
if _autoreload_is_main:
spec = _original_spec
argv = _original_argv
else:
spec = getattr(sys.modules['__main__'], '__spec__', None)
argv = sys.argv
if spec:
argv = ['-m', spec.name] + argv[1:]
else:
path_prefix = '.' + os.pathsep
if (sys.path[0] == '' and
not os.environ.get("PYTHONPATH", "").startswith(path_prefix)):
@ -226,7 +233,7 @@ def _reload():
os.environ.get("PYTHONPATH", ""))
if not _has_execv:
subprocess.Popen([sys.executable] + argv)
sys.exit(0)
os._exit(0)
else:
try:
os.execv(sys.executable, [sys.executable] + argv)
@ -269,7 +276,17 @@ def main():
can catch import-time problems like syntax errors that would otherwise
prevent the script from reaching its call to `wait`.
"""
# Remember that we were launched with autoreload as main.
# The main module can be tricky; set the variables both in our globals
# (which may be __main__) and the real importable version.
import tornado.autoreload
global _autoreload_is_main
global _original_argv, _original_spec
tornado.autoreload._autoreload_is_main = _autoreload_is_main = True
original_argv = sys.argv
tornado.autoreload._original_argv = _original_argv = original_argv
original_spec = getattr(sys.modules['__main__'], '__spec__', None)
tornado.autoreload._original_spec = _original_spec = original_spec
sys.argv = sys.argv[:]
if len(sys.argv) >= 3 and sys.argv[1] == "-m":
mode = "module"

View file

@ -483,7 +483,7 @@ def return_future(f):
If no callback is given, the caller should use the ``Future`` to
wait for the function to complete (perhaps by yielding it in a
`.gen.engine` function, or passing it to `.IOLoop.add_future`).
coroutine, or passing it to `.IOLoop.add_future`).
Usage:
@ -494,10 +494,8 @@ def return_future(f):
# Do stuff (possibly asynchronous)
callback(result)
@gen.engine
def caller(callback):
yield future_func(arg1, arg2)
callback()
async def caller():
await future_func(arg1, arg2)
..
@ -512,9 +510,22 @@ def return_future(f):
.. deprecated:: 5.1
New code should use coroutines directly instead of wrapping
callback-based code with this decorator.
This decorator will be removed in Tornado 6.0. New code should
use coroutines directly instead of wrapping callback-based code
with this decorator. Interactions with non-Tornado
callback-based code should be managed explicitly to avoid
relying on the `.ExceptionStackContext` built into this
decorator.
"""
warnings.warn("@return_future is deprecated, use coroutines instead",
DeprecationWarning)
return _non_deprecated_return_future(f, warn=True)
def _non_deprecated_return_future(f, warn=False):
# Allow auth.py to use this decorator without triggering
# deprecation warnings. This will go away once auth.py has removed
# its legacy interfaces in 6.0.
replacer = ArgReplacer(f, 'callback')
@functools.wraps(f)
@ -528,7 +539,15 @@ def return_future(f):
future_set_exc_info(future, (typ, value, tb))
return True
exc_info = None
with ExceptionStackContext(handle_error):
esc = ExceptionStackContext(handle_error, delay_warning=True)
with esc:
if not warn:
# HACK: In non-deprecated mode (only used in auth.py),
# suppress the warning entirely. Since this is added
# in a 5.1 patch release and already removed in 6.0
# I'm prioritizing a minimial change instead of a
# clean solution.
esc.delay_warning = False
try:
result = f(*args, **kwargs)
if result is not None:

View file

@ -80,7 +80,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
self._multi = None
def fetch_impl(self, request, callback):
self._requests.append((request, callback))
self._requests.append((request, callback, self.io_loop.time()))
self._process_queue()
self._set_timeout(0)
@ -205,13 +205,15 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
while self._free_list and self._requests:
started += 1
curl = self._free_list.pop()
(request, callback) = self._requests.popleft()
(request, callback, queue_start_time) = self._requests.popleft()
curl.info = {
"headers": httputil.HTTPHeaders(),
"buffer": BytesIO(),
"request": request,
"callback": callback,
"queue_start_time": queue_start_time,
"curl_start_time": time.time(),
"curl_start_ioloop_time": self.io_loop.current().time(),
}
try:
self._curl_setup_request(
@ -257,7 +259,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
# the various curl timings are documented at
# http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
time_info = dict(
queue=info["curl_start_time"] - info["request"].start_time,
queue=info["curl_start_ioloop_time"] - info["queue_start_time"],
namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
connect=curl.getinfo(pycurl.CONNECT_TIME),
appconnect=curl.getinfo(pycurl.APPCONNECT_TIME),
@ -271,7 +273,8 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
request=info["request"], code=code, headers=info["headers"],
buffer=buffer, effective_url=effective_url, error=error,
reason=info['headers'].get("X-Http-Reason", None),
request_time=time.time() - info["curl_start_time"],
request_time=self.io_loop.time() - info["curl_start_ioloop_time"],
start_time=info["curl_start_time"],
time_info=time_info))
except Exception:
self.handle_callback_exception(info["callback"])
@ -319,17 +322,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
self.io_loop.add_callback(request.streaming_callback, chunk)
else:
write_function = buffer.write
if bytes is str: # py2
curl.setopt(pycurl.WRITEFUNCTION, write_function)
else: # py3
# Upstream pycurl doesn't support py3, but ubuntu 12.10 includes
# a fork/port. That version has a bug in which it passes unicode
# strings instead of bytes to the WRITEFUNCTION. This means that
# if you use a WRITEFUNCTION (which tornado always does), you cannot
# download arbitrary binary data. This needs to be fixed in the
# ported pycurl package, but in the meantime this lambda will
# make it work for downloading (utf8) text.
curl.setopt(pycurl.WRITEFUNCTION, lambda s: write_function(utf8(s)))
curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
@ -348,7 +341,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
curl.setopt(pycurl.PROXY, request.proxy_host)
curl.setopt(pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username:
credentials = '%s:%s' % (request.proxy_username,
credentials = httputil.encode_username_password(request.proxy_username,
request.proxy_password)
curl.setopt(pycurl.PROXYUSERPWD, credentials)
@ -441,8 +434,6 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
curl.setopt(pycurl.INFILESIZE, len(request.body or ''))
if request.auth_username is not None:
userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')
if request.auth_mode is None or request.auth_mode == "basic":
curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
elif request.auth_mode == "digest":
@ -450,7 +441,9 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
else:
raise ValueError("Unsupported auth_mode %s" % request.auth_mode)
curl.setopt(pycurl.USERPWD, native_str(userpwd))
userpwd = httputil.encode_username_password(request.auth_username,
request.auth_password)
curl.setopt(pycurl.USERPWD, userpwd)
curl_log.debug("%s %s (username: %r)", request.method, request.url,
request.auth_username)
else:

View file

@ -17,7 +17,7 @@ environment than chaining callbacks. Code using coroutines is
technically asynchronous, but it is written as a single generator
instead of a collection of separate functions.
For example, the following asynchronous handler:
For example, the following callback-based asynchronous handler:
.. testcode::

View file

@ -21,6 +21,7 @@
from __future__ import absolute_import, division, print_function
import re
import warnings
from tornado.concurrent import (Future, future_add_done_callback,
future_set_result_unless_cancelled)
@ -277,8 +278,14 @@ class HTTP1Connection(httputil.HTTPConnection):
def set_close_callback(self, callback):
"""Sets a callback that will be run when the connection is closed.
.. deprecated:: 4.0
Use `.HTTPMessageDelegate.on_connection_close` instead.
Note that this callback is slightly different from
`.HTTPMessageDelegate.on_connection_close`: The
`.HTTPMessageDelegate` method is called when the connection is
closed while recieving a message. This callback is used when
there is not an active delegate (for example, on the server
side this callback is used if the client closes the connection
after sending its request but before receiving all the
response.
"""
self._close_callback = stack_context.wrap(callback)
@ -395,6 +402,8 @@ class HTTP1Connection(httputil.HTTPConnection):
future.exception()
else:
if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._write_callback = stack_context.wrap(callback)
else:
future = self._write_future = Future()
@ -402,7 +411,7 @@ class HTTP1Connection(httputil.HTTPConnection):
if chunk:
data += self._format_chunk(chunk)
self._pending_write = self.stream.write(data)
self._pending_write.add_done_callback(self._on_write_complete)
future_add_done_callback(self._pending_write, self._on_write_complete)
return future
def _format_chunk(self, chunk):
@ -434,6 +443,8 @@ class HTTP1Connection(httputil.HTTPConnection):
self._write_future.exception()
else:
if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._write_callback = stack_context.wrap(callback)
else:
future = self._write_future = Future()

View file

@ -125,15 +125,15 @@ class AsyncHTTPClient(Configurable):
Example usage::
def handle_response(response):
if response.error:
print("Error: %s" % response.error)
async def f():
http_client = AsyncHTTPClient()
try:
response = await http_client.fetch("http://www.google.com")
except Exception as e:
print("Error: %s" % e)
else:
print(response.body)
http_client = AsyncHTTPClient()
http_client.fetch("http://www.google.com/", handle_response)
The constructor for this class is magic in several respects: It
actually creates an instance of an implementation-specific
subclass, and instances are reused as a kind of pseudo-singleton
@ -578,17 +578,35 @@ class HTTPResponse(object):
* error: Exception object, if any
* request_time: seconds from request start to finish
* request_time: seconds from request start to finish. Includes all network
operations from DNS resolution to receiving the last byte of data.
Does not include time spent in the queue (due to the ``max_clients`` option).
If redirects were followed, only includes the final request.
* start_time: Time at which the HTTP operation started, based on `time.time`
(not the monotonic clock used by `.IOLoop.time`). May be ``None`` if the request
timed out while in the queue.
* time_info: dictionary of diagnostic timing information from the request.
Available data are subject to change, but currently uses timings
available from http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html,
plus ``queue``, which is the delay (if any) introduced by waiting for
a slot under `AsyncHTTPClient`'s ``max_clients`` setting.
.. versionadded:: 5.1
Added the ``start_time`` attribute.
.. versionchanged:: 5.1
The ``request_time`` attribute previously included time spent in the queue
for ``simple_httpclient``, but not in ``curl_httpclient``. Now queueing time
is excluded in both implementations. ``request_time`` is now more accurate for
``curl_httpclient`` because it uses a monotonic clock when available.
"""
def __init__(self, request, code, headers=None, buffer=None,
effective_url=None, error=None, request_time=None,
time_info=None, reason=None):
time_info=None, reason=None, start_time=None):
if isinstance(request, _RequestProxy):
self.request = request.request
else:
@ -615,6 +633,7 @@ class HTTPResponse(object):
self.error = None
else:
self.error = error
self.start_time = start_time
self.request_time = request_time
self.time_info = time_info or {}

View file

@ -29,10 +29,12 @@ import email.utils
import numbers
import re
import time
import unicodedata
import warnings
from tornado.escape import native_str, parse_qs_bytes, utf8
from tornado.log import gen_log
from tornado.util import ObjectDict, PY3
from tornado.util import ObjectDict, PY3, unicode_type
if PY3:
import http.cookies as Cookie
@ -380,10 +382,15 @@ class HTTPServerRequest(object):
"""Returns True if this request supports HTTP/1.1 semantics.
.. deprecated:: 4.0
Applications are less likely to need this information with the
introduction of `.HTTPConnection`. If you still need it, access
the ``version`` attribute directly.
Applications are less likely to need this information with
the introduction of `.HTTPConnection`. If you still need
it, access the ``version`` attribute directly. This method
will be removed in Tornado 6.0.
"""
warnings.warn("supports_http_1_1() is deprecated, use request.version instead",
DeprecationWarning)
return self.version == "HTTP/1.1"
@property
@ -412,8 +419,10 @@ class HTTPServerRequest(object):
.. deprecated:: 4.0
Use ``request.connection`` and the `.HTTPConnection` methods
to write the response.
to write the response. This method will be removed in Tornado 6.0.
"""
warnings.warn("req.write deprecated, use req.connection.write and write_headers instead",
DeprecationWarning)
assert isinstance(chunk, bytes)
assert self.version.startswith("HTTP/1."), \
"deprecated interface only supported in HTTP/1.x"
@ -424,8 +433,10 @@ class HTTPServerRequest(object):
.. deprecated:: 4.0
Use ``request.connection`` and the `.HTTPConnection` methods
to write the response.
to write the response. This method will be removed in Tornado 6.0.
"""
warnings.warn("req.finish deprecated, use req.connection.finish instead",
DeprecationWarning)
self.connection.finish()
self._finish_time = time.time()
@ -581,6 +592,11 @@ class HTTPConnection(object):
The ``version`` field of ``start_line`` is ignored.
Returns a `.Future` if no callback is given.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0.
"""
raise NotImplementedError()
@ -589,6 +605,11 @@ class HTTPConnection(object):
The callback will be run when the write is complete. If no callback
is given, returns a Future.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0.
"""
raise NotImplementedError()
@ -762,6 +783,11 @@ def parse_multipart_form_data(boundary, data, arguments, files):
The ``boundary`` and ``data`` parameters are both byte strings.
The dictionaries given in the arguments and files parameters
will be updated with the contents of the body.
.. versionchanged:: 5.1
Now recognizes non-ASCII filenames in RFC 2231/5987
(``filename*=``) format.
"""
# The standard allows for the boundary to be quoted in the header,
# although it's rare (it happens at least for google app engine
@ -870,7 +896,8 @@ def parse_response_start_line(line):
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in
# websocket extension negotiations.
# websocket extension negotiations, and to support non-ascii values in
# RFC 2231/5987 format.
def _parseparam(s):
@ -887,25 +914,37 @@ def _parseparam(s):
def _parse_header(line):
"""Parse a Content-type like header.
r"""Parse a Content-type like header.
Return the main content-type and a dictionary of options.
>>> d = "form-data; foo=\"b\\\\a\\\"r\"; file*=utf-8''T%C3%A4st"
>>> ct, d = _parse_header(d)
>>> ct
'form-data'
>>> d['file'] == r'T\u00e4st'.encode('ascii').decode('unicode_escape')
True
>>> d['foo']
'b\\a"r'
"""
parts = _parseparam(';' + line)
key = next(parts)
pdict = {}
# decode_params treats first argument special, but we already stripped key
params = [('Dummy', 'value')]
for p in parts:
i = p.find('=')
if i >= 0:
name = p[:i].strip().lower()
value = p[i + 1:].strip()
if len(value) >= 2 and value[0] == value[-1] == '"':
params.append((name, native_str(value)))
params = email.utils.decode_params(params)
params.pop(0) # get rid of the dummy again
pdict = {}
for name, value in params:
value = email.utils.collapse_rfc2231_value(value)
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
value = value[1:-1]
value = value.replace('\\\\', '\\').replace('\\"', '"')
pdict[name] = value
else:
pdict[p] = None
return key, pdict
@ -929,6 +968,20 @@ def _encode_header(key, pdict):
return '; '.join(out)
def encode_username_password(username, password):
"""Encodes a username/password pair in the format used by HTTP auth.
The return value is a byte string in the form ``username:password``.
.. versionadded:: 5.1
"""
if isinstance(username, unicode_type):
username = unicodedata.normalize('NFC', username)
if isinstance(password, unicode_type):
password = unicodedata.normalize('NFC', password)
return utf8(username) + b":" + utf8(password)
def doctests():
import doctest
return doctest.DocTestSuite()

View file

@ -101,13 +101,11 @@ class IOLoop(Configurable):
import socket
import tornado.ioloop
from tornado import gen
from tornado.iostream import IOStream
@gen.coroutine
def handle_connection(connection, address):
async def handle_connection(connection, address):
stream = IOStream(connection)
message = yield stream.read_until_close()
message = await stream.read_until_close()
print("message from client:", message.decode().strip())
def connection_ready(sock, fd, events):
@ -119,7 +117,8 @@ class IOLoop(Configurable):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(handle_connection, connection, address)
if __name__ == '__main__':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
@ -441,7 +440,8 @@ class IOLoop(Configurable):
.. deprecated:: 5.0
Not implemented on the `asyncio` event loop. Use the environment
variable ``PYTHONASYNCIODEBUG=1`` instead.
variable ``PYTHONASYNCIODEBUG=1`` instead. This method will be
removed in Tornado 6.0.
"""
raise NotImplementedError()
@ -455,7 +455,8 @@ class IOLoop(Configurable):
.. deprecated:: 5.0
Not implemented on the `asyncio` event loop. Use the environment
variable ``PYTHONASYNCIODEBUG=1`` instead.
variable ``PYTHONASYNCIODEBUG=1`` instead. This method will be
removed in Tornado 6.0.
"""
self.set_blocking_signal_threshold(seconds, self.log_stack)
@ -463,6 +464,10 @@ class IOLoop(Configurable):
"""Signal handler to log the stack trace of the current thread.
For use with `set_blocking_signal_threshold`.
.. deprecated:: 5.1
This method will be removed in Tornado 6.0.
"""
gen_log.warning('IOLoop blocked for %f seconds in\n%s',
self._blocking_signal_threshold,
@ -498,17 +503,6 @@ class IOLoop(Configurable):
If the event loop is not currently running, the next call to `start()`
will return immediately.
To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this::
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
``ioloop.start()`` will return after ``async_method`` has run
its callback, whether that callback was invoked before or
after ``ioloop.start``.
Note that even after `stop` has been called, the `IOLoop` is not
completely stopped until `IOLoop.start` has also returned.
Some work that was scheduled before the call to `stop` may still
@ -519,10 +513,10 @@ class IOLoop(Configurable):
def run_sync(self, func, timeout=None):
"""Starts the `IOLoop`, runs the given function, and stops the loop.
The function must return either a yieldable object or
``None``. If the function returns a yieldable object, the
`IOLoop` will run until the yieldable is resolved (and
`run_sync()` will return the yieldable's result). If it raises
The function must return either an awaitable object or
``None``. If the function returns an awaitable object, the
`IOLoop` will run until the awaitable is resolved (and
`run_sync()` will return the awaitable's result). If it raises
an exception, the `IOLoop` will stop and the exception will be
re-raised to the caller.
@ -530,21 +524,21 @@ class IOLoop(Configurable):
a maximum duration for the function. If the timeout expires,
a `tornado.util.TimeoutError` is raised.
This method is useful in conjunction with `tornado.gen.coroutine`
to allow asynchronous calls in a ``main()`` function::
This method is useful to allow asynchronous calls in a
``main()`` function::
@gen.coroutine
def main():
async def main():
# do stuff...
if __name__ == '__main__':
IOLoop.current().run_sync(main)
.. versionchanged:: 4.3
Returning a non-``None``, non-yieldable value is now an error.
Returning a non-``None``, non-awaitable value is now an error.
.. versionchanged:: 5.0
If a timeout occurs, the ``func`` coroutine will be cancelled.
"""
future_cell = [None]
@ -714,6 +708,10 @@ class IOLoop(Configurable):
The callback is invoked with one argument, the
`.Future`.
This method only accepts `.Future` objects and not other
awaitables (unlike most of Tornado where the two are
interchangeable).
"""
assert is_future(future)
callback = stack_context.wrap(callback)
@ -789,6 +787,16 @@ class IOLoop(Configurable):
The exception itself is not passed explicitly, but is available
in `sys.exc_info`.
.. versionchanged:: 5.0
When the `asyncio` event loop is used (which is now the
default on Python 3), some callback errors will be handled by
`asyncio` instead of this method.
.. deprecated: 5.1
Support for this method will be removed in Tornado 6.0.
"""
app_log.error("Exception in callback %r", callback, exc_info=True)

View file

@ -33,6 +33,7 @@ import os
import socket
import sys
import re
import warnings
from tornado.concurrent import Future
from tornado import ioloop
@ -342,6 +343,12 @@ class BaseIOStream(object):
.. versionchanged:: 4.0
Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
"""
future = self._set_read_callback(callback)
self._read_regex = re.compile(regex)
@ -375,6 +382,11 @@ class BaseIOStream(object):
.. versionchanged:: 4.0
Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
"""
future = self._set_read_callback(callback)
self._read_delimiter = delimiter
@ -408,11 +420,22 @@ class BaseIOStream(object):
.. versionchanged:: 4.0
Added the ``partial`` argument. The callback argument is now
optional and a `.Future` will be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` and ``streaming_callback`` arguments are
deprecated and will be removed in Tornado 6.0. Use the
returned `.Future` (and ``partial=True`` for
``streaming_callback``) instead.
"""
future = self._set_read_callback(callback)
assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes
self._read_partial = partial
if streaming_callback is not None:
warnings.warn("streaming_callback is deprecated, use partial instead",
DeprecationWarning)
self._streaming_callback = stack_context.wrap(streaming_callback)
try:
self._try_inline_read()
@ -434,6 +457,12 @@ class BaseIOStream(object):
entirely filled with read data.
.. versionadded:: 5.0
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
"""
future = self._set_read_callback(callback)
@ -485,8 +514,18 @@ class BaseIOStream(object):
The callback argument is now optional and a `.Future` will
be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` and ``streaming_callback`` arguments are
deprecated and will be removed in Tornado 6.0. Use the
returned `.Future` (and `read_bytes` with ``partial=True``
for ``streaming_callback``) instead.
"""
future = self._set_read_callback(callback)
if streaming_callback is not None:
warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead",
DeprecationWarning)
self._streaming_callback = stack_context.wrap(streaming_callback)
if self.closed():
if self._streaming_callback is not None:
@ -521,6 +560,12 @@ class BaseIOStream(object):
.. versionchanged:: 4.5
Added support for `memoryview` arguments.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
"""
self._check_closed()
if data:
@ -530,6 +575,8 @@ class BaseIOStream(object):
self._write_buffer.append(data)
self._total_write_index += len(data)
if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._write_callback = stack_context.wrap(callback)
future = None
else:
@ -546,9 +593,14 @@ class BaseIOStream(object):
def set_close_callback(self, callback):
"""Call the given callback when the stream is closed.
This is not necessary for applications that use the `.Future`
interface; all outstanding ``Futures`` will resolve with a
`StreamClosedError` when the stream is closed.
This mostly is not necessary for applications that use the
`.Future` interface; all outstanding ``Futures`` will resolve
with a `StreamClosedError` when the stream is closed. However,
it is still useful as a way to signal that the stream has been
closed while no other read or write is in progress.
Unlike other callback-based interfaces, ``set_close_callback``
will not be removed in Tornado 6.0.
"""
self._close_callback = stack_context.wrap(callback)
self._maybe_add_error_listener()
@ -808,6 +860,8 @@ class BaseIOStream(object):
assert self._read_callback is None, "Already reading"
assert self._read_future is None, "Already reading"
if callback is not None:
warnings.warn("callbacks are deprecated, use returned Future instead",
DeprecationWarning)
self._read_callback = stack_context.wrap(callback)
else:
self._read_future = Future()
@ -1137,24 +1191,23 @@ class IOStream(BaseIOStream):
import tornado.iostream
import socket
def send_request():
stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
stream.read_until(b"\r\n\r\n", on_headers)
def on_headers(data):
async def main():
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
await stream.connect(("friendfeed.com", 80))
await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
header_data = await stream.read_until(b"\r\n\r\n")
headers = {}
for line in data.split(b"\r\n"):
for line in header_data.split(b"\r\n"):
parts = line.split(b":")
if len(parts) == 2:
headers[parts[0].strip()] = parts[1].strip()
stream.read_bytes(int(headers[b"Content-Length"]), on_body)
def on_body(data):
print(data)
body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
print(body_data)
stream.close()
tornado.ioloop.IOLoop.current().stop()
if __name__ == '__main__':
tornado.ioloop.IOLoop.current().run_sync(main)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(("friendfeed.com", 80), send_request)
@ -1238,9 +1291,17 @@ class IOStream(BaseIOStream):
``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
suitably-configured `ssl.SSLContext` to the
`SSLIOStream` constructor to disable.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
"""
self._connecting = True
if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._connect_callback = stack_context.wrap(callback)
future = None
else:
@ -1350,7 +1411,13 @@ class IOStream(BaseIOStream):
return future
def _handle_connect(self):
try:
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
except socket.error as e:
# Hurd doesn't allow SO_ERROR for loopback sockets because all
# errors for such sockets are reported synchronously.
if errno_from_exception(e) == errno.ENOPROTOOPT:
err = 0
if err != 0:
self.error = socket.error(err, os.strerror(err))
# IOLoop implementations may vary: some of them return
@ -1524,9 +1591,13 @@ class SSLIOStream(IOStream):
def connect(self, address, callback=None, server_hostname=None):
self._server_hostname = server_hostname
# Pass a dummy callback to super.connect(), which is slightly
# more efficient than letting it return a Future we ignore.
super(SSLIOStream, self).connect(address, callback=lambda: None)
# Ignore the result of connect(). If it fails,
# wait_for_handshake will raise an error too. This is
# necessary for the old semantics of the connect callback
# (which takes no arguments). In 6.0 this can be refactored to
# be a regular coroutine.
fut = super(SSLIOStream, self).connect(address)
fut.add_done_callback(lambda f: f.exception())
return self.wait_for_handshake(callback)
def _handle_connect(self):
@ -1570,11 +1641,19 @@ class SSLIOStream(IOStream):
handshake to complete). It may only be called once per stream.
.. versionadded:: 4.2
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
"""
if (self._ssl_connect_callback is not None or
self._ssl_connect_future is not None):
raise RuntimeError("Already waiting")
if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._ssl_connect_callback = stack_context.wrap(callback)
future = None
else:

View file

@ -61,22 +61,19 @@ class Condition(_TimeoutGarbageCollector):
condition = Condition()
@gen.coroutine
def waiter():
async def waiter():
print("I'll wait right here")
yield condition.wait() # Yield a Future.
await condition.wait()
print("I'm done waiting")
@gen.coroutine
def notifier():
async def notifier():
print("About to notify")
condition.notify()
print("Done notifying")
@gen.coroutine
def runner():
# Yield two Futures; wait for waiter() and notifier() to finish.
yield [waiter(), notifier()]
async def runner():
# Wait for waiter() and notifier() in parallel
await gen.multi([waiter(), notifier()])
IOLoop.current().run_sync(runner)
@ -93,12 +90,12 @@ class Condition(_TimeoutGarbageCollector):
io_loop = IOLoop.current()
# Wait up to 1 second for a notification.
yield condition.wait(timeout=io_loop.time() + 1)
await condition.wait(timeout=io_loop.time() + 1)
...or a `datetime.timedelta` for a timeout relative to the current time::
# Wait up to 1 second.
yield condition.wait(timeout=datetime.timedelta(seconds=1))
await condition.wait(timeout=datetime.timedelta(seconds=1))
The method returns False if there's no notification before the deadline.
@ -170,22 +167,19 @@ class Event(object):
event = Event()
@gen.coroutine
def waiter():
async def waiter():
print("Waiting for event")
yield event.wait()
await event.wait()
print("Not waiting this time")
yield event.wait()
await event.wait()
print("Done")
@gen.coroutine
def setter():
async def setter():
print("About to set the event")
event.set()
@gen.coroutine
def runner():
yield [waiter(), setter()]
async def runner():
await gen.multi([waiter(), setter()])
IOLoop.current().run_sync(runner)
@ -290,12 +284,11 @@ class Semaphore(_TimeoutGarbageCollector):
# Ensure reliable doctest output: resolve Futures one at a time.
futures_q = deque([Future() for _ in range(3)])
@gen.coroutine
def simulator(futures):
async def simulator(futures):
for f in futures:
# simulate the asynchronous passage of time
yield gen.moment
yield gen.moment
await gen.sleep(0)
await gen.sleep(0)
f.set_result(None)
IOLoop.current().add_callback(simulator, list(futures_q))
@ -311,20 +304,18 @@ class Semaphore(_TimeoutGarbageCollector):
sem = Semaphore(2)
@gen.coroutine
def worker(worker_id):
yield sem.acquire()
async def worker(worker_id):
await sem.acquire()
try:
print("Worker %d is working" % worker_id)
yield use_some_resource()
await use_some_resource()
finally:
print("Worker %d is done" % worker_id)
sem.release()
@gen.coroutine
def runner():
async def runner():
# Join all workers.
yield [worker(i) for i in range(3)]
await gen.multi([worker(i) for i in range(3)])
IOLoop.current().run_sync(runner)
@ -340,7 +331,18 @@ class Semaphore(_TimeoutGarbageCollector):
Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
the semaphore has been released once, by worker 0.
`.acquire` is a context manager, so ``worker`` could be written as::
The semaphore can be used as an async context manager::
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
For compatibility with older versions of Python, `.acquire` is a
context manager, so ``worker`` could also be written as::
@gen.coroutine
def worker(worker_id):
@ -351,19 +353,9 @@ class Semaphore(_TimeoutGarbageCollector):
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
In Python 3.5, the semaphore itself can be used as an async context
manager::
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
.. versionchanged:: 4.3
Added ``async with`` support in Python 3.5.
"""
def __init__(self, value=1):
super(Semaphore, self).__init__()
@ -464,26 +456,24 @@ class Lock(object):
Releasing an unlocked lock raises `RuntimeError`.
`acquire` supports the context manager protocol in all Python versions:
A Lock can be used as an async context manager with the ``async
with`` statement:
>>> from tornado import gen, locks
>>> from tornado import locks
>>> lock = locks.Lock()
>>>
>>> @gen.coroutine
... def f():
... with (yield lock.acquire()):
>>> async def f():
... async with lock:
... # Do something holding the lock.
... pass
...
... # Now the lock is released.
In Python 3.5, `Lock` also supports the async context manager
protocol. Note that in this case there is no `acquire`, because
``async with`` includes both the ``yield`` and the ``acquire``
(just as it does with `threading.Lock`):
For compatibility with older versions of Python, the `.acquire`
method asynchronously returns a regular context manager:
>>> async def f2(): # doctest: +SKIP
... async with lock:
>>> async def f2():
... with (yield lock.acquire()):
... # Do something holding the lock.
... pass
...

View file

@ -138,7 +138,12 @@ def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
raise
set_close_exec(sock.fileno())
if os.name != 'nt':
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error as e:
if errno_from_exception(e) != errno.ENOPROTOOPT:
# Hurd doesn't support SO_REUSEADDR.
raise
if reuse_port:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if af == socket.AF_INET6:
@ -180,7 +185,12 @@ if hasattr(socket, 'AF_UNIX'):
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
set_close_exec(sock.fileno())
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error as e:
if errno_from_exception(e) != errno.ENOPROTOOPT:
# Hurd doesn't support SO_REUSEADDR
raise
sock.setblocking(0)
try:
st = os.stat(file)

View file

@ -91,7 +91,6 @@ instances to define isolated sets of options, such as for subcommands.
options can be defined, set, and read with any mix of the two.
Dashes are typical for command-line usage while config files require
underscores.
"""
from __future__ import absolute_import, division, print_function
@ -326,18 +325,20 @@ class OptionParser(object):
the global namespace that matches a defined option will be
used to set that option's value.
Options are not parsed from strings as they would be on the
command line; they should be set to the correct type (this
means if you have ``datetime`` or ``timedelta`` options you
will need to import those modules in the config file.
Options may either be the specified type for the option or
strings (in which case they will be parsed the same way as in
`.parse_command_line`)
Example (using the options defined in the top-level docs of
this module)::
port = 80
mysql_host = 'mydb.example.com:3306'
# Both lists and comma-separated strings are allowed for
# multiple=True.
memcache_hosts = ['cache1.example.com:11011',
'cache2.example.com:11011']
memcache_hosts = 'cache1.example.com:11011,cache2.example.com:11011'
If ``final`` is ``False``, parse callbacks will not be run.
This is useful for applications that wish to combine configurations
@ -358,6 +359,9 @@ class OptionParser(object):
The special variable ``__file__`` is available inside config
files, specifying the absolute path to the config file itself.
.. versionchanged:: 5.1
Added the ability to set options via strings in config files.
"""
config = {'__file__': os.path.abspath(path)}
with open(path, 'rb') as f:
@ -365,7 +369,17 @@ class OptionParser(object):
for name in config:
normalized = self._normalize_name(name)
if normalized in self._options:
self._options[normalized].set(config[name])
option = self._options[normalized]
if option.multiple:
if not isinstance(config[name], (list, str)):
raise Error("Option %r is required to be a list of %s "
"or a comma-separated string" %
(option.name, option.type.__name__))
if type(config[name]) == str and option.type != str:
option.parse(config[name])
else:
option.set(config[name])
if final:
self.run_parse_callbacks()

View file

@ -62,8 +62,13 @@ class BaseAsyncIOLoop(IOLoop):
self.remove_handler(fd)
if all_fds:
self.close_fd(fileobj)
self.asyncio_loop.close()
# Remove the mapping before closing the asyncio loop. If this
# happened in the other order, we could race against another
# initialize() call which would see the closed asyncio loop,
# assume it was closed from the asyncio side, and do this
# cleanup for us, leading to a KeyError.
del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
self.asyncio_loop.close()
def add_handler(self, fd, handler, events):
fd, fileobj = self.split_fd(fd)

View file

@ -124,6 +124,13 @@ class TornadoReactor(PosixReactorBase):
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. deprecated:: 5.1
This class will be removed in Tornado 6.0. Use
``twisted.internet.asyncioreactor.AsyncioSelectorReactor``
instead.
"""
def __init__(self):
self._io_loop = tornado.ioloop.IOLoop.current()
@ -350,6 +357,10 @@ def install():
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. deprecated:: 5.1
This functio will be removed in Tornado 6.0. Use
``twisted.internet.asyncioreactor.install`` instead.
"""
reactor = TornadoReactor()
from twisted.internet.main import installReactor # type: ignore
@ -411,6 +422,11 @@ class TwistedIOLoop(tornado.ioloop.IOLoop):
See also :meth:`tornado.ioloop.IOLoop.install` for general notes on
installing alternative IOLoops.
.. deprecated:: 5.1
The `asyncio` event loop will be the only available implementation in
Tornado 6.0.
"""
def initialize(self, reactor=None, **kwargs):
super(TwistedIOLoop, self).initialize(**kwargs)

View file

@ -79,28 +79,24 @@ class Queue(object):
q = Queue(maxsize=2)
@gen.coroutine
def consumer():
while True:
item = yield q.get()
async def consumer():
async for item in q:
try:
print('Doing work on %s' % item)
yield gen.sleep(0.01)
await gen.sleep(0.01)
finally:
q.task_done()
@gen.coroutine
def producer():
async def producer():
for item in range(5):
yield q.put(item)
await q.put(item)
print('Put %s' % item)
@gen.coroutine
def main():
async def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks.
await producer() # Wait for producer to put all tasks.
await q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)
@ -119,11 +115,14 @@ class Queue(object):
Doing work on 4
Done
In Python 3.5, `Queue` implements the async iterator protocol, so
``consumer()`` could be rewritten as::
async def consumer():
async for item in q:
In versions of Python without native coroutines (before 3.5),
``consumer()`` could be written as::
@gen.coroutine
def consumer():
while True:
item = yield q.get()
try:
print('Doing work on %s' % item)
yield gen.sleep(0.01)

View file

@ -1,6 +1,6 @@
from __future__ import absolute_import, division, print_function
from tornado.escape import utf8, _unicode
from tornado.escape import _unicode
from tornado import gen
from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
from tornado import httputil
@ -20,6 +20,7 @@ import functools
import re
import socket
import sys
import time
from io import BytesIO
@ -215,6 +216,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
max_header_size, max_body_size):
self.io_loop = IOLoop.current()
self.start_time = self.io_loop.time()
self.start_wall_time = time.time()
self.client = client
self.request = request
self.release_callback = release_callback
@ -230,7 +232,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
# Timeout handle returned by IOLoop.add_timeout
self._timeout = None
self._sockaddr = None
with stack_context.ExceptionStackContext(self._handle_exception):
IOLoop.current().add_callback(self.run)
@gen.coroutine
def run(self):
try:
self.parsed = urlparse.urlsplit(_unicode(self.request.url))
if self.parsed.scheme not in ("http", "https"):
raise ValueError("Unsupported url scheme: %s" %
@ -248,7 +254,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
host = host[1:-1]
self.parsed_hostname = host # save final host for _on_connect
if request.allow_ipv6 is False:
if self.request.allow_ipv6 is False:
af = socket.AF_INET
else:
af = socket.AF_UNSPEC
@ -260,56 +266,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
self._timeout = self.io_loop.add_timeout(
self.start_time + timeout,
stack_context.wrap(functools.partial(self._on_timeout, "while connecting")))
fut = self.tcp_client.connect(host, port, af=af,
stream = yield self.tcp_client.connect(
host, port, af=af,
ssl_options=ssl_options,
max_buffer_size=self.max_buffer_size)
fut.add_done_callback(stack_context.wrap(self._on_connect))
def _get_ssl_options(self, scheme):
if scheme == "https":
if self.request.ssl_options is not None:
return self.request.ssl_options
# If we are using the defaults, don't construct a
# new SSLContext.
if (self.request.validate_cert and
self.request.ca_certs is None and
self.request.client_cert is None and
self.request.client_key is None):
return _client_ssl_defaults
ssl_ctx = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
cafile=self.request.ca_certs)
if not self.request.validate_cert:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
if self.request.client_cert is not None:
ssl_ctx.load_cert_chain(self.request.client_cert,
self.request.client_key)
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# See netutil.ssl_options_to_context
ssl_ctx.options |= ssl.OP_NO_COMPRESSION
return ssl_ctx
return None
def _on_timeout(self, info=None):
"""Timeout callback of _HTTPConnection instance.
Raise a `HTTPTimeoutError` when a timeout occurs.
:info string key: More detailed timeout information.
"""
self._timeout = None
error_message = "Timeout {0}".format(info) if info else "Timeout"
if self.final_callback is not None:
raise HTTPTimeoutError(error_message)
def _remove_timeout(self):
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
def _on_connect(self, stream_fut):
stream = stream_fut.result()
if self.final_callback is None:
# final_callback is cleared if we've hit our timeout.
stream.close()
@ -349,9 +310,9 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
if self.request.auth_mode not in (None, "basic"):
raise ValueError("unsupported auth_mode %s",
self.request.auth_mode)
auth = utf8(username) + b":" + utf8(password)
self.request.headers["Authorization"] = (b"Basic " +
base64.b64encode(auth))
self.request.headers["Authorization"] = (
b"Basic " + base64.b64encode(
httputil.encode_username_password(username, password)))
if self.request.user_agent:
self.request.headers["User-Agent"] = self.request.user_agent
if not self.request.allow_nonstandard_methods:
@ -386,9 +347,56 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
req_path, '')
self.connection.write_headers(start_line, self.request.headers)
if self.request.expect_100_continue:
self._read_response()
yield self.connection.read_response(self)
else:
self._write_body(True)
yield self._write_body(True)
except Exception:
if not self._handle_exception(*sys.exc_info()):
raise
def _get_ssl_options(self, scheme):
if scheme == "https":
if self.request.ssl_options is not None:
return self.request.ssl_options
# If we are using the defaults, don't construct a
# new SSLContext.
if (self.request.validate_cert and
self.request.ca_certs is None and
self.request.client_cert is None and
self.request.client_key is None):
return _client_ssl_defaults
ssl_ctx = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
cafile=self.request.ca_certs)
if not self.request.validate_cert:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
if self.request.client_cert is not None:
ssl_ctx.load_cert_chain(self.request.client_cert,
self.request.client_key)
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# See netutil.ssl_options_to_context
ssl_ctx.options |= ssl.OP_NO_COMPRESSION
return ssl_ctx
return None
def _on_timeout(self, info=None):
"""Timeout callback of _HTTPConnection instance.
Raise a `HTTPTimeoutError` when a timeout occurs.
:info string key: More detailed timeout information.
"""
self._timeout = None
error_message = "Timeout {0}".format(info) if info else "Timeout"
if self.final_callback is not None:
self._handle_exception(HTTPTimeoutError, HTTPTimeoutError(error_message),
None)
def _remove_timeout(self):
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
def _create_connection(self, stream):
stream.set_nodelay(True)
@ -402,31 +410,21 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
self._sockaddr)
return connection
@gen.coroutine
def _write_body(self, start_read):
if self.request.body is not None:
self.connection.write(self.request.body)
elif self.request.body_producer is not None:
fut = self.request.body_producer(self.connection.write)
if fut is not None:
fut = gen.convert_yielded(fut)
def on_body_written(fut):
fut.result()
yield fut
self.connection.finish()
if start_read:
self._read_response()
self.io_loop.add_future(fut, on_body_written)
return
self.connection.finish()
if start_read:
self._read_response()
def _read_response(self):
# Ensure that any exception raised in read_response ends up in our
# stack context.
self.io_loop.add_future(
self.connection.read_response(self),
lambda f: f.result())
try:
yield self.connection.read_response(self)
except StreamClosedError:
if not self._handle_exception(*sys.exc_info()):
raise
def _release(self):
if self.release_callback is not None:
@ -451,6 +449,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
value = value.real_error
self._run_callback(HTTPResponse(self.request, 599, error=value,
request_time=self.io_loop.time() - self.start_time,
start_time=self.start_wall_time,
))
if hasattr(self, "stream"):
@ -543,6 +542,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
self.code, reason=getattr(self, 'reason', None),
headers=self.headers,
request_time=self.io_loop.time() - self.start_time,
start_time=self.start_wall_time,
buffer=buffer,
effective_url=self.request.url)
self._run_callback(response)

View file

@ -64,12 +64,18 @@ Here are a few rules of thumb for when it's necessary:
persist across asynchronous calls, create a new `StackContext` (or
`ExceptionStackContext`), and make your asynchronous calls in a ``with``
block that references your `StackContext`.
.. deprecated:: 5.1
The ``stack_context`` package is deprecated and will be removed in
Tornado 6.0.
"""
from __future__ import absolute_import, division, print_function
import sys
import threading
import warnings
from tornado.util import raise_exc_info
@ -107,6 +113,8 @@ class StackContext(object):
and not necessary in most applications.
"""
def __init__(self, context_factory):
warnings.warn("StackContext is deprecated and will be removed in Tornado 6.0",
DeprecationWarning)
self.context_factory = context_factory
self.contexts = []
self.active = True
@ -174,8 +182,20 @@ class ExceptionStackContext(object):
If the exception handler returns true, the exception will be
consumed and will not be propagated to other exception handlers.
.. versionadded:: 5.1
The ``delay_warning`` argument can be used to delay the emission
of DeprecationWarnings until an exception is caught by the
``ExceptionStackContext``, which facilitates certain transitional
use cases.
"""
def __init__(self, exception_handler):
def __init__(self, exception_handler, delay_warning=False):
self.delay_warning = delay_warning
if not self.delay_warning:
warnings.warn(
"StackContext is deprecated and will be removed in Tornado 6.0",
DeprecationWarning)
self.exception_handler = exception_handler
self.active = True
@ -184,6 +204,10 @@ class ExceptionStackContext(object):
def exit(self, type, value, traceback):
if type is not None:
if self.delay_warning:
warnings.warn(
"StackContext is deprecated and will be removed in Tornado 6.0",
DeprecationWarning)
return self.exception_handler(type, value, traceback)
def __enter__(self):

View file

@ -46,12 +46,11 @@ class TCPServer(object):
from tornado import gen
class EchoServer(TCPServer):
@gen.coroutine
def handle_stream(self, stream, address):
async def handle_stream(self, stream, address):
while True:
try:
data = yield stream.read_until(b"\n")
yield stream.write(data)
data = await stream.read_until(b"\n")
await stream.write(data)
except StreamClosedError:
break

View file

@ -145,14 +145,15 @@ class AsyncTestCase(unittest.TestCase):
The unittest framework is synchronous, so the test must be
complete by the time the test method returns. This means that
asynchronous code cannot be used in quite the same way as usual.
To write test functions that use the same ``yield``-based patterns
used with the `tornado.gen` module, decorate your test methods
with `tornado.testing.gen_test` instead of
`tornado.gen.coroutine`. This class also provides the `stop()`
and `wait()` methods for a more manual style of testing. The test
method itself must call ``self.wait()``, and asynchronous
callbacks should call ``self.stop()`` to signal completion.
asynchronous code cannot be used in quite the same way as usual
and must be adapted to fit. To write your tests with coroutines,
decorate your test methods with `tornado.testing.gen_test` instead
of `tornado.gen.coroutine`.
This class also provides the (deprecated) `stop()` and `wait()`
methods for a more manual style of testing. The test method itself
must call ``self.wait()``, and asynchronous callbacks should call
``self.stop()`` to signal completion.
By default, a new `.IOLoop` is constructed for each test and is available
as ``self.io_loop``. If the code being tested requires a
@ -183,22 +184,6 @@ class AsyncTestCase(unittest.TestCase):
response = self.wait()
# Test contents of response
self.assertIn("FriendFeed", response.body)
# This test uses an explicit callback-based style.
class MyTestCase3(AsyncTestCase):
def test_http_fetch(self):
client = AsyncHTTPClient()
client.fetch("http://www.tornadoweb.org/", self.handle_fetch)
self.wait()
def handle_fetch(self, response):
# Test contents of response (failures and exceptions here
# will cause self.wait() to throw an exception and end the
# test).
# Exceptions thrown here are magically propagated to
# self.wait() in test_http_fetch() via stack_context.
self.assertIn("FriendFeed", response.body)
self.stop()
"""
def __init__(self, methodName='runTest'):
super(AsyncTestCase, self).__init__(methodName)
@ -265,7 +250,7 @@ class AsyncTestCase(unittest.TestCase):
raise_exc_info(failure)
def run(self, result=None):
with ExceptionStackContext(self._handle_exception):
with ExceptionStackContext(self._handle_exception, delay_warning=True):
super(AsyncTestCase, self).run(result)
# As a last resort, if an exception escaped super.run() and wasn't
# re-raised in tearDown, raise it here. This will cause the
@ -279,6 +264,10 @@ class AsyncTestCase(unittest.TestCase):
Keyword arguments or a single positional argument passed to `stop()` are
saved and will be returned by `wait()`.
.. deprecated:: 5.1
`stop` and `wait` are deprecated; use ``@gen_test`` instead.
"""
assert _arg is None or not kwargs
self.__stop_args = kwargs or _arg
@ -300,6 +289,10 @@ class AsyncTestCase(unittest.TestCase):
.. versionchanged:: 3.1
Added the ``ASYNC_TEST_TIMEOUT`` environment variable.
.. deprecated:: 5.1
`stop` and `wait` are deprecated; use ``@gen_test`` instead.
"""
if timeout is None:
timeout = get_async_test_timeout()

View file

@ -78,6 +78,7 @@ import time
import tornado
import traceback
import types
import warnings
from inspect import isclass
from io import BytesIO
@ -542,6 +543,10 @@ class RequestHandler(object):
Newly-set cookies are not immediately visible via `get_cookie`;
they are not present until the next request.
expires may be a numeric timestamp as returned by `time.time`,
a time tuple as returned by `time.gmtime`, or a
`datetime.datetime` object.
Additional keyword arguments are set on the cookies.Morsel
directly.
See https://docs.python.org/3/library/http.cookies.html#http.cookies.Morsel
@ -744,7 +749,18 @@ class RequestHandler(object):
self._write_buffer.append(chunk)
def render(self, template_name, **kwargs):
"""Renders the template with the given arguments as the response."""
"""Renders the template with the given arguments as the response.
``render()`` calls ``finish()``, so no other output methods can be called
after it.
Returns a `.Future` with the same semantics as the one returned by `finish`.
Awaiting this `.Future` is optional.
.. versionchanged:: 5.1
Now returns a `.Future` instead of ``None``.
"""
if self._finished:
raise RuntimeError("Cannot render() after finish()")
html = self.render_string(template_name, **kwargs)
@ -805,7 +821,7 @@ class RequestHandler(object):
if html_bodies:
hloc = html.index(b'</body>')
html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:]
self.finish(html)
return self.finish(html)
def render_linked_js(self, js_files):
"""Default method used to render the final js links for the
@ -945,6 +961,11 @@ class RequestHandler(object):
.. versionchanged:: 4.0
Now returns a `.Future` if no callback is given.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed in
Tornado 6.0.
"""
chunk = b"".join(self._write_buffer)
self._write_buffer = []
@ -983,7 +1004,20 @@ class RequestHandler(object):
return future
def finish(self, chunk=None):
"""Finishes this response, ending the HTTP request."""
"""Finishes this response, ending the HTTP request.
Passing a ``chunk`` to ``finish()`` is equivalent to passing that
chunk to ``write()`` and then calling ``finish()`` with no arguments.
Returns a `.Future` which may optionally be awaited to track the sending
of the response to the client. This `.Future` resolves when all the response
data has been sent, and raises an error if the connection is closed before all
data can be sent.
.. versionchanged:: 5.1
Now returns a `.Future` instead of ``None``.
"""
if self._finished:
raise RuntimeError("finish() called twice")
@ -1015,12 +1049,27 @@ class RequestHandler(object):
# are keepalive connections)
self.request.connection.set_close_callback(None)
self.flush(include_footers=True)
self.request.finish()
future = self.flush(include_footers=True)
self.request.connection.finish()
self._log()
self._finished = True
self.on_finish()
self._break_cycles()
return future
def detach(self):
"""Take control of the underlying stream.
Returns the underlying `.IOStream` object and stops all
further HTTP processing. Intended for implementing protocols
like websockets that tunnel over an HTTP handshake.
This method is only supported when HTTP/1.1 is used.
.. versionadded:: 5.1
"""
self._finished = True
return self.request.connection.detach()
def _break_cycles(self):
# Break up a reference cycle between this handler and the
@ -1688,7 +1737,14 @@ def asynchronous(method):
.. versionchanged:: 4.3 Returning anything but ``None`` or a
yieldable object from a method decorated with ``@asynchronous``
is an error. Such return values were previously ignored silently.
.. deprecated:: 5.1
This decorator is deprecated and will be removed in Tornado 6.0.
Use coroutines instead.
"""
warnings.warn("@asynchronous is deprecated, use coroutines instead",
DeprecationWarning)
# Delay the IOLoop import because it's not available on app engine.
from tornado.ioloop import IOLoop
@ -1696,7 +1752,7 @@ def asynchronous(method):
def wrapper(self, *args, **kwargs):
self._auto_finish = False
with stack_context.ExceptionStackContext(
self._stack_context_handle_exception):
self._stack_context_handle_exception, delay_warning=True):
result = method(self, *args, **kwargs)
if result is not None:
result = gen.convert_yielded(result)

View file

@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function
import base64
import hashlib
import os
import sys
import struct
import tornado.escape
import tornado.web
@ -31,7 +32,7 @@ from tornado.escape import utf8, native_str, to_unicode
from tornado import gen, httpclient, httputil
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.iostream import StreamClosedError
from tornado.log import gen_log, app_log
from tornado.log import gen_log
from tornado import simple_httpclient
from tornado.queues import Queue
from tornado.tcpclient import TCPClient
@ -43,6 +44,8 @@ if PY3:
else:
from urlparse import urlparse # py3
_default_max_message_size = 10 * 1024 * 1024
class WebSocketError(Exception):
pass
@ -56,6 +59,10 @@ class WebSocketClosedError(WebSocketError):
pass
class _DecompressTooLargeError(Exception):
pass
class WebSocketHandler(tornado.web.RequestHandler):
"""Subclass this class to create a basic WebSocket handler.
@ -145,7 +152,6 @@ class WebSocketHandler(tornado.web.RequestHandler):
self.stream = None
self._on_close_called = False
@tornado.web.asynchronous
def get(self, *args, **kwargs):
self.open_args = args
self.open_kwargs = kwargs
@ -225,7 +231,7 @@ class WebSocketHandler(tornado.web.RequestHandler):
Default is 10MiB.
"""
return self.settings.get('websocket_max_message_size', None)
return self.settings.get('websocket_max_message_size', _default_max_message_size)
def write_message(self, message, binary=False):
"""Sends the given message to the client of this Web Socket.
@ -256,18 +262,38 @@ class WebSocketHandler(tornado.web.RequestHandler):
return self.ws_connection.write_message(message, binary=binary)
def select_subprotocol(self, subprotocols):
"""Invoked when a new WebSocket requests specific subprotocols.
"""Override to implement subprotocol negotiation.
``subprotocols`` is a list of strings identifying the
subprotocols proposed by the client. This method may be
overridden to return one of those strings to select it, or
``None`` to not select a subprotocol. Failure to select a
subprotocol does not automatically abort the connection,
although clients may close the connection if none of their
proposed subprotocols was selected.
``None`` to not select a subprotocol.
Failure to select a subprotocol does not automatically abort
the connection, although clients may close the connection if
none of their proposed subprotocols was selected.
The list may be empty, in which case this method must return
None. This method is always called exactly once even if no
subprotocols were proposed so that the handler can be advised
of this fact.
.. versionchanged:: 5.1
Previously, this method was called with a list containing
an empty string instead of an empty list if no subprotocols
were proposed by the client.
"""
return None
@property
def selected_subprotocol(self):
"""The subprotocol returned by `select_subprotocol`.
.. versionadded:: 5.1
"""
return self.ws_connection.selected_subprotocol
def get_compression_options(self):
"""Override to return compression options for the connection.
@ -298,6 +324,13 @@ class WebSocketHandler(tornado.web.RequestHandler):
The arguments to `open` are extracted from the `tornado.web.URLSpec`
regular expression, just like the arguments to
`tornado.web.RequestHandler.get`.
`open` may be a coroutine. `on_message` will not be called until
`open` has returned.
.. versionchanged:: 5.1
``open`` may be a coroutine.
"""
pass
@ -481,7 +514,7 @@ class WebSocketHandler(tornado.web.RequestHandler):
self, compression_options=self.get_compression_options())
def _attach_stream(self):
self.stream = self.request.connection.detach()
self.stream = self.detach()
self.stream.set_close_callback(self.on_connection_close)
# disable non-WS methods
for method in ["write", "redirect", "set_header", "set_cookie",
@ -512,8 +545,7 @@ class WebSocketProtocol(object):
try:
result = callback(*args, **kwargs)
except Exception:
app_log.error("Uncaught exception in %s",
getattr(self.request, 'path', None), exc_info=True)
self.handler.log_exception(*sys.exc_info())
self._abort()
else:
if result is not None:
@ -570,7 +602,8 @@ class _PerMessageDeflateCompressor(object):
class _PerMessageDeflateDecompressor(object):
def __init__(self, persistent, max_wbits, compression_options=None):
def __init__(self, persistent, max_wbits, max_message_size, compression_options=None):
self._max_message_size = max_message_size
if max_wbits is None:
max_wbits = zlib.MAX_WBITS
if not (8 <= max_wbits <= zlib.MAX_WBITS):
@ -587,7 +620,10 @@ class _PerMessageDeflateDecompressor(object):
def decompress(self, data):
decompressor = self._decompressor or self._create_decompressor()
return decompressor.decompress(data + b'\x00\x00\xff\xff')
result = decompressor.decompress(data + b'\x00\x00\xff\xff', self._max_message_size)
if decompressor.unconsumed_tail:
raise _DecompressTooLargeError()
return result
class WebSocketProtocol13(WebSocketProtocol):
@ -675,13 +711,17 @@ class WebSocketProtocol13(WebSocketProtocol):
return WebSocketProtocol13.compute_accept_value(
self.request.headers.get("Sec-Websocket-Key"))
@gen.coroutine
def _accept_connection(self):
subprotocols = [s.strip() for s in self.request.headers.get_list("Sec-WebSocket-Protocol")]
if subprotocols:
selected = self.handler.select_subprotocol(subprotocols)
if selected:
assert selected in subprotocols
self.handler.set_header("Sec-WebSocket-Protocol", selected)
subprotocol_header = self.request.headers.get("Sec-WebSocket-Protocol")
if subprotocol_header:
subprotocols = [s.strip() for s in subprotocol_header.split(',')]
else:
subprotocols = []
self.selected_subprotocol = self.handler.select_subprotocol(subprotocols)
if self.selected_subprotocol:
assert self.selected_subprotocol in subprotocols
self.handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol)
extensions = self._parse_extensions_header(self.request.headers)
for ext in extensions:
@ -711,9 +751,11 @@ class WebSocketProtocol13(WebSocketProtocol):
self.stream = self.handler.stream
self.start_pinging()
self._run_callback(self.handler.open, *self.handler.open_args,
open_result = self._run_callback(self.handler.open, *self.handler.open_args,
**self.handler.open_kwargs)
self._receive_frame()
if open_result is not None:
yield open_result
yield self._receive_frame_loop()
def _parse_extensions_header(self, headers):
extensions = headers.get("Sec-WebSocket-Extensions", '')
@ -740,6 +782,8 @@ class WebSocketProtocol13(WebSocketProtocol):
else:
raise ValueError("unsupported extension %r", ext)
self.selected_subprotocol = headers.get('Sec-WebSocket-Protocol', None)
def _get_compressor_options(self, side, agreed_parameters, compression_options=None):
"""Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects.
@ -767,6 +811,7 @@ class WebSocketProtocol13(WebSocketProtocol):
self._compressor = _PerMessageDeflateCompressor(
**self._get_compressor_options(side, agreed_parameters, compression_options))
self._decompressor = _PerMessageDeflateDecompressor(
max_message_size=self.handler.max_message_size,
**self._get_compressor_options(other_side, agreed_parameters, compression_options))
def _write_frame(self, fin, opcode, data, flags=0):
@ -836,111 +881,84 @@ class WebSocketProtocol13(WebSocketProtocol):
assert isinstance(data, bytes)
self._write_frame(True, 0x9, data)
def _receive_frame(self):
@gen.coroutine
def _receive_frame_loop(self):
try:
self.stream.read_bytes(2, self._on_frame_start)
while not self.client_terminated:
yield self._receive_frame()
except StreamClosedError:
self._abort()
def _on_frame_start(self, data):
self._wire_bytes_in += len(data)
header, payloadlen = struct.unpack("BB", data)
self._final_frame = header & self.FIN
def _read_bytes(self, n):
self._wire_bytes_in += n
return self.stream.read_bytes(n)
@gen.coroutine
def _receive_frame(self):
# Read the frame header.
data = yield self._read_bytes(2)
header, mask_payloadlen = struct.unpack("BB", data)
is_final_frame = header & self.FIN
reserved_bits = header & self.RSV_MASK
self._frame_opcode = header & self.OPCODE_MASK
self._frame_opcode_is_control = self._frame_opcode & 0x8
if self._decompressor is not None and self._frame_opcode != 0:
opcode = header & self.OPCODE_MASK
opcode_is_control = opcode & 0x8
if self._decompressor is not None and opcode != 0:
# Compression flag is present in the first frame's header,
# but we can't decompress until we have all the frames of
# the message.
self._frame_compressed = bool(reserved_bits & self.RSV1)
reserved_bits &= ~self.RSV1
if reserved_bits:
# client is using as-yet-undefined extensions; abort
self._abort()
return
self._masked_frame = bool(payloadlen & 0x80)
payloadlen = payloadlen & 0x7f
if self._frame_opcode_is_control and payloadlen >= 126:
is_masked = bool(mask_payloadlen & 0x80)
payloadlen = mask_payloadlen & 0x7f
# Parse and validate the length.
if opcode_is_control and payloadlen >= 126:
# control frames must have payload < 126
self._abort()
return
try:
if payloadlen < 126:
self._frame_length = payloadlen
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self._read_frame_data(False)
elif payloadlen == 126:
self.stream.read_bytes(2, self._on_frame_length_16)
data = yield self._read_bytes(2)
payloadlen = struct.unpack("!H", data)[0]
elif payloadlen == 127:
self.stream.read_bytes(8, self._on_frame_length_64)
except StreamClosedError:
self._abort()
def _read_frame_data(self, masked):
new_len = self._frame_length
data = yield self._read_bytes(8)
payloadlen = struct.unpack("!Q", data)[0]
new_len = payloadlen
if self._fragmented_message_buffer is not None:
new_len += len(self._fragmented_message_buffer)
if new_len > (self.handler.max_message_size or 10 * 1024 * 1024):
if new_len > self.handler.max_message_size:
self.close(1009, "message too big")
self._abort()
return
self.stream.read_bytes(
self._frame_length,
self._on_masked_frame_data if masked else self._on_frame_data)
def _on_frame_length_16(self, data):
self._wire_bytes_in += len(data)
self._frame_length = struct.unpack("!H", data)[0]
try:
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self._read_frame_data(False)
except StreamClosedError:
self._abort()
# Read the payload, unmasking if necessary.
if is_masked:
self._frame_mask = yield self._read_bytes(4)
data = yield self._read_bytes(payloadlen)
if is_masked:
data = _websocket_mask(self._frame_mask, data)
def _on_frame_length_64(self, data):
self._wire_bytes_in += len(data)
self._frame_length = struct.unpack("!Q", data)[0]
try:
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self._read_frame_data(False)
except StreamClosedError:
self._abort()
def _on_masking_key(self, data):
self._wire_bytes_in += len(data)
self._frame_mask = data
try:
self._read_frame_data(True)
except StreamClosedError:
self._abort()
def _on_masked_frame_data(self, data):
# Don't touch _wire_bytes_in; we'll do it in _on_frame_data.
self._on_frame_data(_websocket_mask(self._frame_mask, data))
def _on_frame_data(self, data):
handled_future = None
self._wire_bytes_in += len(data)
if self._frame_opcode_is_control:
# Decide what to do with this frame.
if opcode_is_control:
# control frames may be interleaved with a series of fragmented
# data frames, so control frames must not interact with
# self._fragmented_*
if not self._final_frame:
if not is_final_frame:
# control frames must not be fragmented
self._abort()
return
opcode = self._frame_opcode
elif self._frame_opcode == 0: # continuation frame
elif opcode == 0: # continuation frame
if self._fragmented_message_buffer is None:
# nothing to continue
self._abort()
return
self._fragmented_message_buffer += data
if self._final_frame:
if is_final_frame:
opcode = self._fragmented_message_opcode
data = self._fragmented_message_buffer
self._fragmented_message_buffer = None
@ -949,22 +967,14 @@ class WebSocketProtocol13(WebSocketProtocol):
# can't start new message until the old one is finished
self._abort()
return
if self._final_frame:
opcode = self._frame_opcode
else:
self._fragmented_message_opcode = self._frame_opcode
if not is_final_frame:
self._fragmented_message_opcode = opcode
self._fragmented_message_buffer = data
if self._final_frame:
if is_final_frame:
handled_future = self._handle_message(opcode, data)
if not self.client_terminated:
if handled_future:
# on_message is a coroutine, process more frames once it's done.
handled_future.add_done_callback(
lambda future: self._receive_frame())
else:
self._receive_frame()
if handled_future is not None:
yield handled_future
def _handle_message(self, opcode, data):
"""Execute on_message, returning its Future if it is a coroutine."""
@ -972,7 +982,12 @@ class WebSocketProtocol13(WebSocketProtocol):
return
if self._frame_compressed:
try:
data = self._decompressor.decompress(data)
except _DecompressTooLargeError:
self.close(1009, "message too big after decompression")
self._abort()
return
if opcode == 0x1:
# UTF-8 data
@ -1092,7 +1107,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
"""
def __init__(self, request, on_message_callback=None,
compression_options=None, ping_interval=None, ping_timeout=None,
max_message_size=None):
max_message_size=None, subprotocols=[]):
self.compression_options = compression_options
self.connect_future = Future()
self.protocol = None
@ -1113,6 +1128,8 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
'Sec-WebSocket-Key': self.key,
'Sec-WebSocket-Version': '13',
})
if subprotocols is not None:
request.headers['Sec-WebSocket-Protocol'] = ','.join(subprotocols)
if self.compression_options is not None:
# Always offer to let the server set our max_wbits (and even though
# we don't offer it, we will accept a client_no_context_takeover
@ -1167,7 +1184,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
self.protocol = self.get_websocket_protocol()
self.protocol._process_server_headers(self.key, self.headers)
self.protocol.start_pinging()
self.protocol._receive_frame()
IOLoop.current().add_callback(self.protocol._receive_frame_loop)
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
@ -1247,11 +1264,19 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
return WebSocketProtocol13(self, mask_outgoing=True,
compression_options=self.compression_options)
@property
def selected_subprotocol(self):
"""The subprotocol selected by the server.
.. versionadded:: 5.1
"""
return self.protocol.selected_subprotocol
def websocket_connect(url, callback=None, connect_timeout=None,
on_message_callback=None, compression_options=None,
ping_interval=None, ping_timeout=None,
max_message_size=None):
max_message_size=_default_max_message_size, subprotocols=None):
"""Client-side websocket support.
Takes a url and returns a Future whose result is a
@ -1274,6 +1299,11 @@ def websocket_connect(url, callback=None, connect_timeout=None,
``websocket_connect``. In both styles, a message of ``None``
indicates that the connection has been closed.
``subprotocols`` may be a list of strings specifying proposed
subprotocols. The selected protocol may be found on the
``selected_subprotocol`` attribute of the connection object
when the connection is complete.
.. versionchanged:: 3.2
Also accepts ``HTTPRequest`` objects in place of urls.
@ -1286,6 +1316,9 @@ def websocket_connect(url, callback=None, connect_timeout=None,
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. versionchanged:: 5.1
Added the ``subprotocols`` argument.
"""
if isinstance(url, httpclient.HTTPRequest):
assert connect_timeout is None
@ -1302,7 +1335,8 @@ def websocket_connect(url, callback=None, connect_timeout=None,
compression_options=compression_options,
ping_interval=ping_interval,
ping_timeout=ping_timeout,
max_message_size=max_message_size)
max_message_size=max_message_size,
subprotocols=subprotocols)
if callback is not None:
IOLoop.current().add_future(conn.connect_future, callback)
return conn.connect_future

View file

@ -33,6 +33,7 @@ from __future__ import absolute_import, division, print_function
import sys
from io import BytesIO
import tornado
import warnings
from tornado.concurrent import Future
from tornado import escape
@ -76,6 +77,7 @@ class WSGIApplication(web.Application):
.. deprecated:: 4.0
Use a regular `.Application` and wrap it in `WSGIAdapter` instead.
This class will be removed in Tornado 6.0.
"""
def __call__(self, environ, start_response):
return WSGIAdapter(self)(environ, start_response)
@ -83,8 +85,10 @@ class WSGIApplication(web.Application):
# WSGI has no facilities for flow control, so just return an already-done
# Future when the interface requires it.
_dummy_future = Future()
_dummy_future.set_result(None)
def _dummy_future():
f = Future()
f.set_result(None)
return f
class _WSGIConnection(httputil.HTTPConnection):
@ -116,7 +120,7 @@ class _WSGIConnection(httputil.HTTPConnection):
self.write(chunk, callback)
elif callback is not None:
callback()
return _dummy_future
return _dummy_future()
def write(self, chunk, callback=None):
if self._expected_content_remaining is not None:
@ -128,7 +132,7 @@ class _WSGIConnection(httputil.HTTPConnection):
self._write_buffer.append(chunk)
if callback is not None:
callback()
return _dummy_future
return _dummy_future()
def finish(self):
if (self._expected_content_remaining is not None and
@ -179,9 +183,25 @@ class WSGIAdapter(object):
that it is not possible to use `.AsyncHTTPClient`, or the
`tornado.auth` or `tornado.websocket` modules.
In multithreaded WSGI servers on Python 3, it may be necessary to
permit `asyncio` to create event loops on any thread. Run the
following at startup (typically import time for WSGI
applications)::
import asyncio
from tornado.platform.asyncio import AnyThreadEventLoopPolicy
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
.. versionadded:: 4.0
.. deprecated:: 5.1
This class is deprecated and will be removed in Tornado 6.0.
Use Tornado's `.HTTPServer` instead of a WSGI container.
"""
def __init__(self, application):
warnings.warn("WSGIAdapter is deprecated, use Tornado's HTTPServer instead",
DeprecationWarning)
if isinstance(application, WSGIApplication):
self.application = lambda request: web.Application.__call__(
application, request)