Update Tornado Web Server 5.0.1 (2b2a220a) → 5.1.1 (cc2cf07).

This commit is contained in:
JackDandy 2018-09-23 20:55:04 +01:00
parent a1e7b4aa66
commit f13de010bf
25 changed files with 859 additions and 506 deletions

View file

@ -10,6 +10,7 @@
* Update Requests library 2.15.1 (282b01a) to 2.19.1 (33b41c7) * Update Requests library 2.15.1 (282b01a) to 2.19.1 (33b41c7)
* Update scandir module 1.6 (c3592ee) to 1.9.0 (9ab3d1f) * Update scandir module 1.6 (c3592ee) to 1.9.0 (9ab3d1f)
* Update SimpleJSON 3.13.2 (6ffddbe) to 3.16.0 (e2a54f7) * Update SimpleJSON 3.13.2 (6ffddbe) to 3.16.0 (e2a54f7)
* Update Tornado Web Server 5.0.1 (2b2a220a) to 5.1.1 (cc2cf07)
* Update unidecode module 1.0.22 (81f938d) to 1.0.22 (578cdb9) * Update unidecode module 1.0.22 (81f938d) to 1.0.22 (578cdb9)
* Add idna library 2.7 (0f50bdc) * Add idna library 2.7 (0f50bdc)
* Add urllib3 release 1.23 (7c216f4) * Add urllib3 release 1.23 (7c216f4)

View file

@ -24,5 +24,5 @@ from __future__ import absolute_import, division, print_function
# is zero for an official release, positive for a development branch, # is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version # or negative for a release candidate or beta (after the base version
# number has been incremented) # number has been incremented)
version = "5.1.dev1" version = "5.1.1"
version_info = (5, 1, 0, -100) version_info = (5, 1, 1, 0)

View file

@ -37,15 +37,14 @@ Example usage for Google OAuth:
class GoogleOAuth2LoginHandler(tornado.web.RequestHandler, class GoogleOAuth2LoginHandler(tornado.web.RequestHandler,
tornado.auth.GoogleOAuth2Mixin): tornado.auth.GoogleOAuth2Mixin):
@tornado.gen.coroutine async def get(self):
def get(self):
if self.get_argument('code', False): if self.get_argument('code', False):
user = yield self.get_authenticated_user( user = await self.get_authenticated_user(
redirect_uri='http://your.site.com/auth/google', redirect_uri='http://your.site.com/auth/google',
code=self.get_argument('code')) code=self.get_argument('code'))
# Save the user with e.g. set_secure_cookie # Save the user with e.g. set_secure_cookie
else: else:
yield self.authorize_redirect( await self.authorize_redirect(
redirect_uri='http://your.site.com/auth/google', redirect_uri='http://your.site.com/auth/google',
client_id=self.settings['google_oauth']['key'], client_id=self.settings['google_oauth']['key'],
scope=['profile', 'email'], scope=['profile', 'email'],
@ -75,15 +74,15 @@ import time
import uuid import uuid
import warnings import warnings
from tornado.concurrent import (Future, return_future, chain_future, from tornado.concurrent import (Future, _non_deprecated_return_future,
future_set_exc_info, future_set_exc_info, chain_future,
future_set_result_unless_cancelled) future_set_result_unless_cancelled)
from tornado import gen from tornado import gen
from tornado import httpclient from tornado import httpclient
from tornado import escape from tornado import escape
from tornado.httputil import url_concat from tornado.httputil import url_concat
from tornado.log import gen_log from tornado.log import gen_log
from tornado.stack_context import ExceptionStackContext from tornado.stack_context import ExceptionStackContext, wrap
from tornado.util import unicode_type, ArgReplacer, PY3 from tornado.util import unicode_type, ArgReplacer, PY3
if PY3: if PY3:
@ -128,7 +127,7 @@ def _auth_return_future(f):
warnings.warn("callback arguments are deprecated, use the returned Future instead", warnings.warn("callback arguments are deprecated, use the returned Future instead",
DeprecationWarning) DeprecationWarning)
future.add_done_callback( future.add_done_callback(
functools.partial(_auth_future_to_callback, callback)) wrap(functools.partial(_auth_future_to_callback, callback)))
def handle_exception(typ, value, tb): def handle_exception(typ, value, tb):
if future.done(): if future.done():
@ -136,7 +135,7 @@ def _auth_return_future(f):
else: else:
future_set_exc_info(future, (typ, value, tb)) future_set_exc_info(future, (typ, value, tb))
return True return True
with ExceptionStackContext(handle_exception): with ExceptionStackContext(handle_exception, delay_warning=True):
f(*args, **kwargs) f(*args, **kwargs)
return future return future
return wrapper return wrapper
@ -149,7 +148,7 @@ class OpenIdMixin(object):
* ``_OPENID_ENDPOINT``: the identity provider's URI. * ``_OPENID_ENDPOINT``: the identity provider's URI.
""" """
@return_future @_non_deprecated_return_future
def authenticate_redirect(self, callback_uri=None, def authenticate_redirect(self, callback_uri=None,
ax_attrs=["name", "email", "language", "username"], ax_attrs=["name", "email", "language", "username"],
callback=None): callback=None):
@ -203,8 +202,8 @@ class OpenIdMixin(object):
if http_client is None: if http_client is None:
http_client = self.get_auth_http_client() http_client = self.get_auth_http_client()
fut = http_client.fetch(url, method="POST", body=urllib_parse.urlencode(args)) fut = http_client.fetch(url, method="POST", body=urllib_parse.urlencode(args))
fut.add_done_callback(functools.partial( fut.add_done_callback(wrap(functools.partial(
self._on_authentication_verified, callback)) self._on_authentication_verified, callback)))
def _openid_args(self, callback_uri, ax_attrs=[], oauth_scope=None): def _openid_args(self, callback_uri, ax_attrs=[], oauth_scope=None):
url = urlparse.urljoin(self.request.full_url(), callback_uri) url = urlparse.urljoin(self.request.full_url(), callback_uri)
@ -344,7 +343,7 @@ class OAuthMixin(object):
Subclasses must also override the `_oauth_get_user_future` and Subclasses must also override the `_oauth_get_user_future` and
`_oauth_consumer_token` methods. `_oauth_consumer_token` methods.
""" """
@return_future @_non_deprecated_return_future
def authorize_redirect(self, callback_uri=None, extra_params=None, def authorize_redirect(self, callback_uri=None, extra_params=None,
http_client=None, callback=None): http_client=None, callback=None):
"""Redirects the user to obtain OAuth authorization for this service. """Redirects the user to obtain OAuth authorization for this service.
@ -382,18 +381,18 @@ class OAuthMixin(object):
fut = http_client.fetch( fut = http_client.fetch(
self._oauth_request_token_url(callback_uri=callback_uri, self._oauth_request_token_url(callback_uri=callback_uri,
extra_params=extra_params)) extra_params=extra_params))
fut.add_done_callback(functools.partial( fut.add_done_callback(wrap(functools.partial(
self._on_request_token, self._on_request_token,
self._OAUTH_AUTHORIZE_URL, self._OAUTH_AUTHORIZE_URL,
callback_uri, callback_uri,
callback)) callback)))
else: else:
fut = http_client.fetch(self._oauth_request_token_url()) fut = http_client.fetch(self._oauth_request_token_url())
fut.add_done_callback( fut.add_done_callback(
functools.partial( wrap(functools.partial(
self._on_request_token, self._OAUTH_AUTHORIZE_URL, self._on_request_token, self._OAUTH_AUTHORIZE_URL,
callback_uri, callback_uri,
callback)) callback)))
@_auth_return_future @_auth_return_future
def get_authenticated_user(self, callback, http_client=None): def get_authenticated_user(self, callback, http_client=None):
@ -433,7 +432,7 @@ class OAuthMixin(object):
if http_client is None: if http_client is None:
http_client = self.get_auth_http_client() http_client = self.get_auth_http_client()
fut = http_client.fetch(self._oauth_access_token_url(token)) fut = http_client.fetch(self._oauth_access_token_url(token))
fut.add_done_callback(functools.partial(self._on_access_token, callback)) fut.add_done_callback(wrap(functools.partial(self._on_access_token, callback)))
def _oauth_request_token_url(self, callback_uri=None, extra_params=None): def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token() consumer_token = self._oauth_consumer_token()
@ -516,7 +515,7 @@ class OAuthMixin(object):
fut = self._oauth_get_user_future(access_token) fut = self._oauth_get_user_future(access_token)
fut = gen.convert_yielded(fut) fut = gen.convert_yielded(fut)
fut.add_done_callback( fut.add_done_callback(
functools.partial(self._on_oauth_get_user, access_token, future)) wrap(functools.partial(self._on_oauth_get_user, access_token, future)))
def _oauth_consumer_token(self): def _oauth_consumer_token(self):
"""Subclasses must override this to return their OAuth consumer keys. """Subclasses must override this to return their OAuth consumer keys.
@ -525,7 +524,7 @@ class OAuthMixin(object):
""" """
raise NotImplementedError() raise NotImplementedError()
@return_future @_non_deprecated_return_future
def _oauth_get_user_future(self, access_token, callback): def _oauth_get_user_future(self, access_token, callback):
"""Subclasses must override this to get basic information about the """Subclasses must override this to get basic information about the
user. user.
@ -618,7 +617,7 @@ class OAuth2Mixin(object):
* ``_OAUTH_AUTHORIZE_URL``: The service's authorization url. * ``_OAUTH_AUTHORIZE_URL``: The service's authorization url.
* ``_OAUTH_ACCESS_TOKEN_URL``: The service's access token url. * ``_OAUTH_ACCESS_TOKEN_URL``: The service's access token url.
""" """
@return_future @_non_deprecated_return_future
def authorize_redirect(self, redirect_uri=None, client_id=None, def authorize_redirect(self, redirect_uri=None, client_id=None,
client_secret=None, extra_params=None, client_secret=None, extra_params=None,
callback=None, scope=None, response_type="code"): callback=None, scope=None, response_type="code"):
@ -683,16 +682,15 @@ class OAuth2Mixin(object):
class MainHandler(tornado.web.RequestHandler, class MainHandler(tornado.web.RequestHandler,
tornado.auth.FacebookGraphMixin): tornado.auth.FacebookGraphMixin):
@tornado.web.authenticated @tornado.web.authenticated
@tornado.gen.coroutine async def get(self):
def get(self): new_entry = await self.oauth2_request(
new_entry = yield self.oauth2_request(
"https://graph.facebook.com/me/feed", "https://graph.facebook.com/me/feed",
post_args={"message": "I am posting from my Tornado application!"}, post_args={"message": "I am posting from my Tornado application!"},
access_token=self.current_user["access_token"]) access_token=self.current_user["access_token"])
if not new_entry: if not new_entry:
# Call failed; perhaps missing permission? # Call failed; perhaps missing permission?
yield self.authorize_redirect() await self.authorize_redirect()
return return
self.finish("Posted a message!") self.finish("Posted a message!")
@ -713,7 +711,7 @@ class OAuth2Mixin(object):
if all_args: if all_args:
url += "?" + urllib_parse.urlencode(all_args) url += "?" + urllib_parse.urlencode(all_args)
callback = functools.partial(self._on_oauth2_request, callback) callback = wrap(functools.partial(self._on_oauth2_request, callback))
http = self.get_auth_http_client() http = self.get_auth_http_client()
if post_args is not None: if post_args is not None:
fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args)) fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args))
@ -758,13 +756,12 @@ class TwitterMixin(OAuthMixin):
class TwitterLoginHandler(tornado.web.RequestHandler, class TwitterLoginHandler(tornado.web.RequestHandler,
tornado.auth.TwitterMixin): tornado.auth.TwitterMixin):
@tornado.gen.coroutine async def get(self):
def get(self):
if self.get_argument("oauth_token", None): if self.get_argument("oauth_token", None):
user = yield self.get_authenticated_user() user = await self.get_authenticated_user()
# Save the user using e.g. set_secure_cookie() # Save the user using e.g. set_secure_cookie()
else: else:
yield self.authorize_redirect() await self.authorize_redirect()
.. testoutput:: .. testoutput::
:hide: :hide:
@ -781,7 +778,7 @@ class TwitterMixin(OAuthMixin):
_OAUTH_NO_CALLBACKS = False _OAUTH_NO_CALLBACKS = False
_TWITTER_BASE_URL = "https://api.twitter.com/1.1" _TWITTER_BASE_URL = "https://api.twitter.com/1.1"
@return_future @_non_deprecated_return_future
def authenticate_redirect(self, callback_uri=None, callback=None): def authenticate_redirect(self, callback_uri=None, callback=None):
"""Just like `~OAuthMixin.authorize_redirect`, but """Just like `~OAuthMixin.authorize_redirect`, but
auto-redirects if authorized. auto-redirects if authorized.
@ -799,10 +796,10 @@ class TwitterMixin(OAuthMixin):
Use the returned awaitable object instead. Use the returned awaitable object instead.
""" """
http = self.get_auth_http_client() http = self.get_auth_http_client()
http.fetch(self._oauth_request_token_url(callback_uri=callback_uri), fut = http.fetch(self._oauth_request_token_url(callback_uri=callback_uri))
functools.partial( fut.add_done_callback(wrap(functools.partial(
self._on_request_token, self._OAUTH_AUTHENTICATE_URL, self._on_request_token, self._OAUTH_AUTHENTICATE_URL,
None, callback)) None, callback)))
@_auth_return_future @_auth_return_future
def twitter_request(self, path, callback=None, access_token=None, def twitter_request(self, path, callback=None, access_token=None,
@ -829,9 +826,8 @@ class TwitterMixin(OAuthMixin):
class MainHandler(tornado.web.RequestHandler, class MainHandler(tornado.web.RequestHandler,
tornado.auth.TwitterMixin): tornado.auth.TwitterMixin):
@tornado.web.authenticated @tornado.web.authenticated
@tornado.gen.coroutine async def get(self):
def get(self): new_entry = await self.twitter_request(
new_entry = yield self.twitter_request(
"/statuses/update", "/statuses/update",
post_args={"status": "Testing Tornado Web Server"}, post_args={"status": "Testing Tornado Web Server"},
access_token=self.current_user["access_token"]) access_token=self.current_user["access_token"])
@ -867,7 +863,7 @@ class TwitterMixin(OAuthMixin):
if args: if args:
url += "?" + urllib_parse.urlencode(args) url += "?" + urllib_parse.urlencode(args)
http = self.get_auth_http_client() http = self.get_auth_http_client()
http_callback = functools.partial(self._on_twitter_request, callback, url) http_callback = wrap(functools.partial(self._on_twitter_request, callback, url))
if post_args is not None: if post_args is not None:
fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args)) fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args))
else: else:
@ -942,19 +938,18 @@ class GoogleOAuth2Mixin(OAuth2Mixin):
class GoogleOAuth2LoginHandler(tornado.web.RequestHandler, class GoogleOAuth2LoginHandler(tornado.web.RequestHandler,
tornado.auth.GoogleOAuth2Mixin): tornado.auth.GoogleOAuth2Mixin):
@tornado.gen.coroutine async def get(self):
def get(self):
if self.get_argument('code', False): if self.get_argument('code', False):
access = yield self.get_authenticated_user( access = await self.get_authenticated_user(
redirect_uri='http://your.site.com/auth/google', redirect_uri='http://your.site.com/auth/google',
code=self.get_argument('code')) code=self.get_argument('code'))
user = yield self.oauth2_request( user = await self.oauth2_request(
"https://www.googleapis.com/oauth2/v1/userinfo", "https://www.googleapis.com/oauth2/v1/userinfo",
access_token=access["access_token"]) access_token=access["access_token"])
# Save the user and access token with # Save the user and access token with
# e.g. set_secure_cookie. # e.g. set_secure_cookie.
else: else:
yield self.authorize_redirect( await self.authorize_redirect(
redirect_uri='http://your.site.com/auth/google', redirect_uri='http://your.site.com/auth/google',
client_id=self.settings['google_oauth']['key'], client_id=self.settings['google_oauth']['key'],
scope=['profile', 'email'], scope=['profile', 'email'],
@ -982,7 +977,7 @@ class GoogleOAuth2Mixin(OAuth2Mixin):
method="POST", method="POST",
headers={'Content-Type': 'application/x-www-form-urlencoded'}, headers={'Content-Type': 'application/x-www-form-urlencoded'},
body=body) body=body)
fut.add_done_callback(functools.partial(self._on_access_token, callback)) fut.add_done_callback(wrap(functools.partial(self._on_access_token, callback)))
def _on_access_token(self, future, response_fut): def _on_access_token(self, future, response_fut):
"""Callback function for the exchange to the access token.""" """Callback function for the exchange to the access token."""
@ -1014,17 +1009,16 @@ class FacebookGraphMixin(OAuth2Mixin):
class FacebookGraphLoginHandler(tornado.web.RequestHandler, class FacebookGraphLoginHandler(tornado.web.RequestHandler,
tornado.auth.FacebookGraphMixin): tornado.auth.FacebookGraphMixin):
@tornado.gen.coroutine async def get(self):
def get(self):
if self.get_argument("code", False): if self.get_argument("code", False):
user = yield self.get_authenticated_user( user = await self.get_authenticated_user(
redirect_uri='/auth/facebookgraph/', redirect_uri='/auth/facebookgraph/',
client_id=self.settings["facebook_api_key"], client_id=self.settings["facebook_api_key"],
client_secret=self.settings["facebook_secret"], client_secret=self.settings["facebook_secret"],
code=self.get_argument("code")) code=self.get_argument("code"))
# Save the user with e.g. set_secure_cookie # Save the user with e.g. set_secure_cookie
else: else:
yield self.authorize_redirect( await self.authorize_redirect(
redirect_uri='/auth/facebookgraph/', redirect_uri='/auth/facebookgraph/',
client_id=self.settings["facebook_api_key"], client_id=self.settings["facebook_api_key"],
extra_params={"scope": "read_stream,offline_access"}) extra_params={"scope": "read_stream,offline_access"})
@ -1067,8 +1061,8 @@ class FacebookGraphMixin(OAuth2Mixin):
fields.update(extra_fields) fields.update(extra_fields)
fut = http.fetch(self._oauth_request_token_url(**args)) fut = http.fetch(self._oauth_request_token_url(**args))
fut.add_done_callback(functools.partial(self._on_access_token, redirect_uri, client_id, fut.add_done_callback(wrap(functools.partial(self._on_access_token, redirect_uri, client_id,
client_secret, callback, fields)) client_secret, callback, fields)))
@gen.coroutine @gen.coroutine
def _on_access_token(self, redirect_uri, client_id, client_secret, def _on_access_token(self, redirect_uri, client_id, client_secret,
@ -1134,9 +1128,8 @@ class FacebookGraphMixin(OAuth2Mixin):
class MainHandler(tornado.web.RequestHandler, class MainHandler(tornado.web.RequestHandler,
tornado.auth.FacebookGraphMixin): tornado.auth.FacebookGraphMixin):
@tornado.web.authenticated @tornado.web.authenticated
@tornado.gen.coroutine async def get(self):
def get(self): new_entry = await self.facebook_request(
new_entry = yield self.facebook_request(
"/me/feed", "/me/feed",
post_args={"message": "I am posting from my Tornado application!"}, post_args={"message": "I am posting from my Tornado application!"},
access_token=self.current_user["access_token"]) access_token=self.current_user["access_token"])

View file

@ -107,6 +107,9 @@ _watched_files = set()
_reload_hooks = [] _reload_hooks = []
_reload_attempted = False _reload_attempted = False
_io_loops = weakref.WeakKeyDictionary() # type: ignore _io_loops = weakref.WeakKeyDictionary() # type: ignore
_autoreload_is_main = False
_original_argv = None
_original_spec = None
def start(check_time=500): def start(check_time=500):
@ -214,11 +217,15 @@ def _reload():
# __spec__ is not available (Python < 3.4), check instead if # __spec__ is not available (Python < 3.4), check instead if
# sys.path[0] is an empty string and add the current directory to # sys.path[0] is an empty string and add the current directory to
# $PYTHONPATH. # $PYTHONPATH.
spec = getattr(sys.modules['__main__'], '__spec__', None) if _autoreload_is_main:
if spec: spec = _original_spec
argv = ['-m', spec.name] + sys.argv[1:] argv = _original_argv
else: else:
spec = getattr(sys.modules['__main__'], '__spec__', None)
argv = sys.argv argv = sys.argv
if spec:
argv = ['-m', spec.name] + argv[1:]
else:
path_prefix = '.' + os.pathsep path_prefix = '.' + os.pathsep
if (sys.path[0] == '' and if (sys.path[0] == '' and
not os.environ.get("PYTHONPATH", "").startswith(path_prefix)): not os.environ.get("PYTHONPATH", "").startswith(path_prefix)):
@ -226,7 +233,7 @@ def _reload():
os.environ.get("PYTHONPATH", "")) os.environ.get("PYTHONPATH", ""))
if not _has_execv: if not _has_execv:
subprocess.Popen([sys.executable] + argv) subprocess.Popen([sys.executable] + argv)
sys.exit(0) os._exit(0)
else: else:
try: try:
os.execv(sys.executable, [sys.executable] + argv) os.execv(sys.executable, [sys.executable] + argv)
@ -269,7 +276,17 @@ def main():
can catch import-time problems like syntax errors that would otherwise can catch import-time problems like syntax errors that would otherwise
prevent the script from reaching its call to `wait`. prevent the script from reaching its call to `wait`.
""" """
# Remember that we were launched with autoreload as main.
# The main module can be tricky; set the variables both in our globals
# (which may be __main__) and the real importable version.
import tornado.autoreload
global _autoreload_is_main
global _original_argv, _original_spec
tornado.autoreload._autoreload_is_main = _autoreload_is_main = True
original_argv = sys.argv original_argv = sys.argv
tornado.autoreload._original_argv = _original_argv = original_argv
original_spec = getattr(sys.modules['__main__'], '__spec__', None)
tornado.autoreload._original_spec = _original_spec = original_spec
sys.argv = sys.argv[:] sys.argv = sys.argv[:]
if len(sys.argv) >= 3 and sys.argv[1] == "-m": if len(sys.argv) >= 3 and sys.argv[1] == "-m":
mode = "module" mode = "module"

View file

@ -483,7 +483,7 @@ def return_future(f):
If no callback is given, the caller should use the ``Future`` to If no callback is given, the caller should use the ``Future`` to
wait for the function to complete (perhaps by yielding it in a wait for the function to complete (perhaps by yielding it in a
`.gen.engine` function, or passing it to `.IOLoop.add_future`). coroutine, or passing it to `.IOLoop.add_future`).
Usage: Usage:
@ -494,10 +494,8 @@ def return_future(f):
# Do stuff (possibly asynchronous) # Do stuff (possibly asynchronous)
callback(result) callback(result)
@gen.engine async def caller():
def caller(callback): await future_func(arg1, arg2)
yield future_func(arg1, arg2)
callback()
.. ..
@ -512,9 +510,22 @@ def return_future(f):
.. deprecated:: 5.1 .. deprecated:: 5.1
New code should use coroutines directly instead of wrapping This decorator will be removed in Tornado 6.0. New code should
callback-based code with this decorator. use coroutines directly instead of wrapping callback-based code
with this decorator. Interactions with non-Tornado
callback-based code should be managed explicitly to avoid
relying on the `.ExceptionStackContext` built into this
decorator.
""" """
warnings.warn("@return_future is deprecated, use coroutines instead",
DeprecationWarning)
return _non_deprecated_return_future(f, warn=True)
def _non_deprecated_return_future(f, warn=False):
# Allow auth.py to use this decorator without triggering
# deprecation warnings. This will go away once auth.py has removed
# its legacy interfaces in 6.0.
replacer = ArgReplacer(f, 'callback') replacer = ArgReplacer(f, 'callback')
@functools.wraps(f) @functools.wraps(f)
@ -528,7 +539,15 @@ def return_future(f):
future_set_exc_info(future, (typ, value, tb)) future_set_exc_info(future, (typ, value, tb))
return True return True
exc_info = None exc_info = None
with ExceptionStackContext(handle_error): esc = ExceptionStackContext(handle_error, delay_warning=True)
with esc:
if not warn:
# HACK: In non-deprecated mode (only used in auth.py),
# suppress the warning entirely. Since this is added
# in a 5.1 patch release and already removed in 6.0
# I'm prioritizing a minimial change instead of a
# clean solution.
esc.delay_warning = False
try: try:
result = f(*args, **kwargs) result = f(*args, **kwargs)
if result is not None: if result is not None:

View file

@ -80,7 +80,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
self._multi = None self._multi = None
def fetch_impl(self, request, callback): def fetch_impl(self, request, callback):
self._requests.append((request, callback)) self._requests.append((request, callback, self.io_loop.time()))
self._process_queue() self._process_queue()
self._set_timeout(0) self._set_timeout(0)
@ -205,13 +205,15 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
while self._free_list and self._requests: while self._free_list and self._requests:
started += 1 started += 1
curl = self._free_list.pop() curl = self._free_list.pop()
(request, callback) = self._requests.popleft() (request, callback, queue_start_time) = self._requests.popleft()
curl.info = { curl.info = {
"headers": httputil.HTTPHeaders(), "headers": httputil.HTTPHeaders(),
"buffer": BytesIO(), "buffer": BytesIO(),
"request": request, "request": request,
"callback": callback, "callback": callback,
"queue_start_time": queue_start_time,
"curl_start_time": time.time(), "curl_start_time": time.time(),
"curl_start_ioloop_time": self.io_loop.current().time(),
} }
try: try:
self._curl_setup_request( self._curl_setup_request(
@ -257,7 +259,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
# the various curl timings are documented at # the various curl timings are documented at
# http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
time_info = dict( time_info = dict(
queue=info["curl_start_time"] - info["request"].start_time, queue=info["curl_start_ioloop_time"] - info["queue_start_time"],
namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME), namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
connect=curl.getinfo(pycurl.CONNECT_TIME), connect=curl.getinfo(pycurl.CONNECT_TIME),
appconnect=curl.getinfo(pycurl.APPCONNECT_TIME), appconnect=curl.getinfo(pycurl.APPCONNECT_TIME),
@ -271,7 +273,8 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
request=info["request"], code=code, headers=info["headers"], request=info["request"], code=code, headers=info["headers"],
buffer=buffer, effective_url=effective_url, error=error, buffer=buffer, effective_url=effective_url, error=error,
reason=info['headers'].get("X-Http-Reason", None), reason=info['headers'].get("X-Http-Reason", None),
request_time=time.time() - info["curl_start_time"], request_time=self.io_loop.time() - info["curl_start_ioloop_time"],
start_time=info["curl_start_time"],
time_info=time_info)) time_info=time_info))
except Exception: except Exception:
self.handle_callback_exception(info["callback"]) self.handle_callback_exception(info["callback"])
@ -319,17 +322,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
self.io_loop.add_callback(request.streaming_callback, chunk) self.io_loop.add_callback(request.streaming_callback, chunk)
else: else:
write_function = buffer.write write_function = buffer.write
if bytes is str: # py2
curl.setopt(pycurl.WRITEFUNCTION, write_function) curl.setopt(pycurl.WRITEFUNCTION, write_function)
else: # py3
# Upstream pycurl doesn't support py3, but ubuntu 12.10 includes
# a fork/port. That version has a bug in which it passes unicode
# strings instead of bytes to the WRITEFUNCTION. This means that
# if you use a WRITEFUNCTION (which tornado always does), you cannot
# download arbitrary binary data. This needs to be fixed in the
# ported pycurl package, but in the meantime this lambda will
# make it work for downloading (utf8) text.
curl.setopt(pycurl.WRITEFUNCTION, lambda s: write_function(utf8(s)))
curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects) curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
curl.setopt(pycurl.MAXREDIRS, request.max_redirects) curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout)) curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
@ -348,7 +341,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
curl.setopt(pycurl.PROXY, request.proxy_host) curl.setopt(pycurl.PROXY, request.proxy_host)
curl.setopt(pycurl.PROXYPORT, request.proxy_port) curl.setopt(pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username: if request.proxy_username:
credentials = '%s:%s' % (request.proxy_username, credentials = httputil.encode_username_password(request.proxy_username,
request.proxy_password) request.proxy_password)
curl.setopt(pycurl.PROXYUSERPWD, credentials) curl.setopt(pycurl.PROXYUSERPWD, credentials)
@ -441,8 +434,6 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
curl.setopt(pycurl.INFILESIZE, len(request.body or '')) curl.setopt(pycurl.INFILESIZE, len(request.body or ''))
if request.auth_username is not None: if request.auth_username is not None:
userpwd = "%s:%s" % (request.auth_username, request.auth_password or '')
if request.auth_mode is None or request.auth_mode == "basic": if request.auth_mode is None or request.auth_mode == "basic":
curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
elif request.auth_mode == "digest": elif request.auth_mode == "digest":
@ -450,7 +441,9 @@ class CurlAsyncHTTPClient(AsyncHTTPClient):
else: else:
raise ValueError("Unsupported auth_mode %s" % request.auth_mode) raise ValueError("Unsupported auth_mode %s" % request.auth_mode)
curl.setopt(pycurl.USERPWD, native_str(userpwd)) userpwd = httputil.encode_username_password(request.auth_username,
request.auth_password)
curl.setopt(pycurl.USERPWD, userpwd)
curl_log.debug("%s %s (username: %r)", request.method, request.url, curl_log.debug("%s %s (username: %r)", request.method, request.url,
request.auth_username) request.auth_username)
else: else:

View file

@ -17,7 +17,7 @@ environment than chaining callbacks. Code using coroutines is
technically asynchronous, but it is written as a single generator technically asynchronous, but it is written as a single generator
instead of a collection of separate functions. instead of a collection of separate functions.
For example, the following asynchronous handler: For example, the following callback-based asynchronous handler:
.. testcode:: .. testcode::

View file

@ -21,6 +21,7 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
import re import re
import warnings
from tornado.concurrent import (Future, future_add_done_callback, from tornado.concurrent import (Future, future_add_done_callback,
future_set_result_unless_cancelled) future_set_result_unless_cancelled)
@ -277,8 +278,14 @@ class HTTP1Connection(httputil.HTTPConnection):
def set_close_callback(self, callback): def set_close_callback(self, callback):
"""Sets a callback that will be run when the connection is closed. """Sets a callback that will be run when the connection is closed.
.. deprecated:: 4.0 Note that this callback is slightly different from
Use `.HTTPMessageDelegate.on_connection_close` instead. `.HTTPMessageDelegate.on_connection_close`: The
`.HTTPMessageDelegate` method is called when the connection is
closed while recieving a message. This callback is used when
there is not an active delegate (for example, on the server
side this callback is used if the client closes the connection
after sending its request but before receiving all the
response.
""" """
self._close_callback = stack_context.wrap(callback) self._close_callback = stack_context.wrap(callback)
@ -395,6 +402,8 @@ class HTTP1Connection(httputil.HTTPConnection):
future.exception() future.exception()
else: else:
if callback is not None: if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._write_callback = stack_context.wrap(callback) self._write_callback = stack_context.wrap(callback)
else: else:
future = self._write_future = Future() future = self._write_future = Future()
@ -402,7 +411,7 @@ class HTTP1Connection(httputil.HTTPConnection):
if chunk: if chunk:
data += self._format_chunk(chunk) data += self._format_chunk(chunk)
self._pending_write = self.stream.write(data) self._pending_write = self.stream.write(data)
self._pending_write.add_done_callback(self._on_write_complete) future_add_done_callback(self._pending_write, self._on_write_complete)
return future return future
def _format_chunk(self, chunk): def _format_chunk(self, chunk):
@ -434,6 +443,8 @@ class HTTP1Connection(httputil.HTTPConnection):
self._write_future.exception() self._write_future.exception()
else: else:
if callback is not None: if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._write_callback = stack_context.wrap(callback) self._write_callback = stack_context.wrap(callback)
else: else:
future = self._write_future = Future() future = self._write_future = Future()

View file

@ -125,15 +125,15 @@ class AsyncHTTPClient(Configurable):
Example usage:: Example usage::
def handle_response(response): async def f():
if response.error: http_client = AsyncHTTPClient()
print("Error: %s" % response.error) try:
response = await http_client.fetch("http://www.google.com")
except Exception as e:
print("Error: %s" % e)
else: else:
print(response.body) print(response.body)
http_client = AsyncHTTPClient()
http_client.fetch("http://www.google.com/", handle_response)
The constructor for this class is magic in several respects: It The constructor for this class is magic in several respects: It
actually creates an instance of an implementation-specific actually creates an instance of an implementation-specific
subclass, and instances are reused as a kind of pseudo-singleton subclass, and instances are reused as a kind of pseudo-singleton
@ -578,17 +578,35 @@ class HTTPResponse(object):
* error: Exception object, if any * error: Exception object, if any
* request_time: seconds from request start to finish * request_time: seconds from request start to finish. Includes all network
operations from DNS resolution to receiving the last byte of data.
Does not include time spent in the queue (due to the ``max_clients`` option).
If redirects were followed, only includes the final request.
* start_time: Time at which the HTTP operation started, based on `time.time`
(not the monotonic clock used by `.IOLoop.time`). May be ``None`` if the request
timed out while in the queue.
* time_info: dictionary of diagnostic timing information from the request. * time_info: dictionary of diagnostic timing information from the request.
Available data are subject to change, but currently uses timings Available data are subject to change, but currently uses timings
available from http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html, available from http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html,
plus ``queue``, which is the delay (if any) introduced by waiting for plus ``queue``, which is the delay (if any) introduced by waiting for
a slot under `AsyncHTTPClient`'s ``max_clients`` setting. a slot under `AsyncHTTPClient`'s ``max_clients`` setting.
.. versionadded:: 5.1
Added the ``start_time`` attribute.
.. versionchanged:: 5.1
The ``request_time`` attribute previously included time spent in the queue
for ``simple_httpclient``, but not in ``curl_httpclient``. Now queueing time
is excluded in both implementations. ``request_time`` is now more accurate for
``curl_httpclient`` because it uses a monotonic clock when available.
""" """
def __init__(self, request, code, headers=None, buffer=None, def __init__(self, request, code, headers=None, buffer=None,
effective_url=None, error=None, request_time=None, effective_url=None, error=None, request_time=None,
time_info=None, reason=None): time_info=None, reason=None, start_time=None):
if isinstance(request, _RequestProxy): if isinstance(request, _RequestProxy):
self.request = request.request self.request = request.request
else: else:
@ -615,6 +633,7 @@ class HTTPResponse(object):
self.error = None self.error = None
else: else:
self.error = error self.error = error
self.start_time = start_time
self.request_time = request_time self.request_time = request_time
self.time_info = time_info or {} self.time_info = time_info or {}

View file

@ -29,10 +29,12 @@ import email.utils
import numbers import numbers
import re import re
import time import time
import unicodedata
import warnings
from tornado.escape import native_str, parse_qs_bytes, utf8 from tornado.escape import native_str, parse_qs_bytes, utf8
from tornado.log import gen_log from tornado.log import gen_log
from tornado.util import ObjectDict, PY3 from tornado.util import ObjectDict, PY3, unicode_type
if PY3: if PY3:
import http.cookies as Cookie import http.cookies as Cookie
@ -380,10 +382,15 @@ class HTTPServerRequest(object):
"""Returns True if this request supports HTTP/1.1 semantics. """Returns True if this request supports HTTP/1.1 semantics.
.. deprecated:: 4.0 .. deprecated:: 4.0
Applications are less likely to need this information with the
introduction of `.HTTPConnection`. If you still need it, access Applications are less likely to need this information with
the ``version`` attribute directly. the introduction of `.HTTPConnection`. If you still need
it, access the ``version`` attribute directly. This method
will be removed in Tornado 6.0.
""" """
warnings.warn("supports_http_1_1() is deprecated, use request.version instead",
DeprecationWarning)
return self.version == "HTTP/1.1" return self.version == "HTTP/1.1"
@property @property
@ -412,8 +419,10 @@ class HTTPServerRequest(object):
.. deprecated:: 4.0 .. deprecated:: 4.0
Use ``request.connection`` and the `.HTTPConnection` methods Use ``request.connection`` and the `.HTTPConnection` methods
to write the response. to write the response. This method will be removed in Tornado 6.0.
""" """
warnings.warn("req.write deprecated, use req.connection.write and write_headers instead",
DeprecationWarning)
assert isinstance(chunk, bytes) assert isinstance(chunk, bytes)
assert self.version.startswith("HTTP/1."), \ assert self.version.startswith("HTTP/1."), \
"deprecated interface only supported in HTTP/1.x" "deprecated interface only supported in HTTP/1.x"
@ -424,8 +433,10 @@ class HTTPServerRequest(object):
.. deprecated:: 4.0 .. deprecated:: 4.0
Use ``request.connection`` and the `.HTTPConnection` methods Use ``request.connection`` and the `.HTTPConnection` methods
to write the response. to write the response. This method will be removed in Tornado 6.0.
""" """
warnings.warn("req.finish deprecated, use req.connection.finish instead",
DeprecationWarning)
self.connection.finish() self.connection.finish()
self._finish_time = time.time() self._finish_time = time.time()
@ -581,6 +592,11 @@ class HTTPConnection(object):
The ``version`` field of ``start_line`` is ignored. The ``version`` field of ``start_line`` is ignored.
Returns a `.Future` if no callback is given. Returns a `.Future` if no callback is given.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -589,6 +605,11 @@ class HTTPConnection(object):
The callback will be run when the write is complete. If no callback The callback will be run when the write is complete. If no callback
is given, returns a Future. is given, returns a Future.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -762,6 +783,11 @@ def parse_multipart_form_data(boundary, data, arguments, files):
The ``boundary`` and ``data`` parameters are both byte strings. The ``boundary`` and ``data`` parameters are both byte strings.
The dictionaries given in the arguments and files parameters The dictionaries given in the arguments and files parameters
will be updated with the contents of the body. will be updated with the contents of the body.
.. versionchanged:: 5.1
Now recognizes non-ASCII filenames in RFC 2231/5987
(``filename*=``) format.
""" """
# The standard allows for the boundary to be quoted in the header, # The standard allows for the boundary to be quoted in the header,
# although it's rare (it happens at least for google app engine # although it's rare (it happens at least for google app engine
@ -870,7 +896,8 @@ def parse_response_start_line(line):
# The original 2.7 version of this code did not correctly support some # The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes. # combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in # It has also been modified to support valueless parameters as seen in
# websocket extension negotiations. # websocket extension negotiations, and to support non-ascii values in
# RFC 2231/5987 format.
def _parseparam(s): def _parseparam(s):
@ -887,25 +914,37 @@ def _parseparam(s):
def _parse_header(line): def _parse_header(line):
"""Parse a Content-type like header. r"""Parse a Content-type like header.
Return the main content-type and a dictionary of options. Return the main content-type and a dictionary of options.
>>> d = "form-data; foo=\"b\\\\a\\\"r\"; file*=utf-8''T%C3%A4st"
>>> ct, d = _parse_header(d)
>>> ct
'form-data'
>>> d['file'] == r'T\u00e4st'.encode('ascii').decode('unicode_escape')
True
>>> d['foo']
'b\\a"r'
""" """
parts = _parseparam(';' + line) parts = _parseparam(';' + line)
key = next(parts) key = next(parts)
pdict = {} # decode_params treats first argument special, but we already stripped key
params = [('Dummy', 'value')]
for p in parts: for p in parts:
i = p.find('=') i = p.find('=')
if i >= 0: if i >= 0:
name = p[:i].strip().lower() name = p[:i].strip().lower()
value = p[i + 1:].strip() value = p[i + 1:].strip()
if len(value) >= 2 and value[0] == value[-1] == '"': params.append((name, native_str(value)))
params = email.utils.decode_params(params)
params.pop(0) # get rid of the dummy again
pdict = {}
for name, value in params:
value = email.utils.collapse_rfc2231_value(value)
if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
value = value[1:-1] value = value[1:-1]
value = value.replace('\\\\', '\\').replace('\\"', '"')
pdict[name] = value pdict[name] = value
else:
pdict[p] = None
return key, pdict return key, pdict
@ -929,6 +968,20 @@ def _encode_header(key, pdict):
return '; '.join(out) return '; '.join(out)
def encode_username_password(username, password):
"""Encodes a username/password pair in the format used by HTTP auth.
The return value is a byte string in the form ``username:password``.
.. versionadded:: 5.1
"""
if isinstance(username, unicode_type):
username = unicodedata.normalize('NFC', username)
if isinstance(password, unicode_type):
password = unicodedata.normalize('NFC', password)
return utf8(username) + b":" + utf8(password)
def doctests(): def doctests():
import doctest import doctest
return doctest.DocTestSuite() return doctest.DocTestSuite()

View file

@ -101,13 +101,11 @@ class IOLoop(Configurable):
import socket import socket
import tornado.ioloop import tornado.ioloop
from tornado import gen
from tornado.iostream import IOStream from tornado.iostream import IOStream
@gen.coroutine async def handle_connection(connection, address):
def handle_connection(connection, address):
stream = IOStream(connection) stream = IOStream(connection)
message = yield stream.read_until_close() message = await stream.read_until_close()
print("message from client:", message.decode().strip()) print("message from client:", message.decode().strip())
def connection_ready(sock, fd, events): def connection_ready(sock, fd, events):
@ -119,7 +117,8 @@ class IOLoop(Configurable):
raise raise
return return
connection.setblocking(0) connection.setblocking(0)
handle_connection(connection, address) io_loop = tornado.ioloop.IOLoop.current()
io_loop.spawn_callback(handle_connection, connection, address)
if __name__ == '__main__': if __name__ == '__main__':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
@ -441,7 +440,8 @@ class IOLoop(Configurable):
.. deprecated:: 5.0 .. deprecated:: 5.0
Not implemented on the `asyncio` event loop. Use the environment Not implemented on the `asyncio` event loop. Use the environment
variable ``PYTHONASYNCIODEBUG=1`` instead. variable ``PYTHONASYNCIODEBUG=1`` instead. This method will be
removed in Tornado 6.0.
""" """
raise NotImplementedError() raise NotImplementedError()
@ -455,7 +455,8 @@ class IOLoop(Configurable):
.. deprecated:: 5.0 .. deprecated:: 5.0
Not implemented on the `asyncio` event loop. Use the environment Not implemented on the `asyncio` event loop. Use the environment
variable ``PYTHONASYNCIODEBUG=1`` instead. variable ``PYTHONASYNCIODEBUG=1`` instead. This method will be
removed in Tornado 6.0.
""" """
self.set_blocking_signal_threshold(seconds, self.log_stack) self.set_blocking_signal_threshold(seconds, self.log_stack)
@ -463,6 +464,10 @@ class IOLoop(Configurable):
"""Signal handler to log the stack trace of the current thread. """Signal handler to log the stack trace of the current thread.
For use with `set_blocking_signal_threshold`. For use with `set_blocking_signal_threshold`.
.. deprecated:: 5.1
This method will be removed in Tornado 6.0.
""" """
gen_log.warning('IOLoop blocked for %f seconds in\n%s', gen_log.warning('IOLoop blocked for %f seconds in\n%s',
self._blocking_signal_threshold, self._blocking_signal_threshold,
@ -498,17 +503,6 @@ class IOLoop(Configurable):
If the event loop is not currently running, the next call to `start()` If the event loop is not currently running, the next call to `start()`
will return immediately. will return immediately.
To use asynchronous methods from otherwise-synchronous code (such as
unit tests), you can start and stop the event loop like this::
ioloop = IOLoop()
async_method(ioloop=ioloop, callback=ioloop.stop)
ioloop.start()
``ioloop.start()`` will return after ``async_method`` has run
its callback, whether that callback was invoked before or
after ``ioloop.start``.
Note that even after `stop` has been called, the `IOLoop` is not Note that even after `stop` has been called, the `IOLoop` is not
completely stopped until `IOLoop.start` has also returned. completely stopped until `IOLoop.start` has also returned.
Some work that was scheduled before the call to `stop` may still Some work that was scheduled before the call to `stop` may still
@ -519,10 +513,10 @@ class IOLoop(Configurable):
def run_sync(self, func, timeout=None): def run_sync(self, func, timeout=None):
"""Starts the `IOLoop`, runs the given function, and stops the loop. """Starts the `IOLoop`, runs the given function, and stops the loop.
The function must return either a yieldable object or The function must return either an awaitable object or
``None``. If the function returns a yieldable object, the ``None``. If the function returns an awaitable object, the
`IOLoop` will run until the yieldable is resolved (and `IOLoop` will run until the awaitable is resolved (and
`run_sync()` will return the yieldable's result). If it raises `run_sync()` will return the awaitable's result). If it raises
an exception, the `IOLoop` will stop and the exception will be an exception, the `IOLoop` will stop and the exception will be
re-raised to the caller. re-raised to the caller.
@ -530,21 +524,21 @@ class IOLoop(Configurable):
a maximum duration for the function. If the timeout expires, a maximum duration for the function. If the timeout expires,
a `tornado.util.TimeoutError` is raised. a `tornado.util.TimeoutError` is raised.
This method is useful in conjunction with `tornado.gen.coroutine` This method is useful to allow asynchronous calls in a
to allow asynchronous calls in a ``main()`` function:: ``main()`` function::
@gen.coroutine async def main():
def main():
# do stuff... # do stuff...
if __name__ == '__main__': if __name__ == '__main__':
IOLoop.current().run_sync(main) IOLoop.current().run_sync(main)
.. versionchanged:: 4.3 .. versionchanged:: 4.3
Returning a non-``None``, non-yieldable value is now an error. Returning a non-``None``, non-awaitable value is now an error.
.. versionchanged:: 5.0 .. versionchanged:: 5.0
If a timeout occurs, the ``func`` coroutine will be cancelled. If a timeout occurs, the ``func`` coroutine will be cancelled.
""" """
future_cell = [None] future_cell = [None]
@ -714,6 +708,10 @@ class IOLoop(Configurable):
The callback is invoked with one argument, the The callback is invoked with one argument, the
`.Future`. `.Future`.
This method only accepts `.Future` objects and not other
awaitables (unlike most of Tornado where the two are
interchangeable).
""" """
assert is_future(future) assert is_future(future)
callback = stack_context.wrap(callback) callback = stack_context.wrap(callback)
@ -789,6 +787,16 @@ class IOLoop(Configurable):
The exception itself is not passed explicitly, but is available The exception itself is not passed explicitly, but is available
in `sys.exc_info`. in `sys.exc_info`.
.. versionchanged:: 5.0
When the `asyncio` event loop is used (which is now the
default on Python 3), some callback errors will be handled by
`asyncio` instead of this method.
.. deprecated: 5.1
Support for this method will be removed in Tornado 6.0.
""" """
app_log.error("Exception in callback %r", callback, exc_info=True) app_log.error("Exception in callback %r", callback, exc_info=True)

View file

@ -33,6 +33,7 @@ import os
import socket import socket
import sys import sys
import re import re
import warnings
from tornado.concurrent import Future from tornado.concurrent import Future
from tornado import ioloop from tornado import ioloop
@ -342,6 +343,12 @@ class BaseIOStream(object):
.. versionchanged:: 4.0 .. versionchanged:: 4.0
Added the ``max_bytes`` argument. The ``callback`` argument is Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted. now optional and a `.Future` will be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
""" """
future = self._set_read_callback(callback) future = self._set_read_callback(callback)
self._read_regex = re.compile(regex) self._read_regex = re.compile(regex)
@ -375,6 +382,11 @@ class BaseIOStream(object):
.. versionchanged:: 4.0 .. versionchanged:: 4.0
Added the ``max_bytes`` argument. The ``callback`` argument is Added the ``max_bytes`` argument. The ``callback`` argument is
now optional and a `.Future` will be returned if it is omitted. now optional and a `.Future` will be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
""" """
future = self._set_read_callback(callback) future = self._set_read_callback(callback)
self._read_delimiter = delimiter self._read_delimiter = delimiter
@ -408,11 +420,22 @@ class BaseIOStream(object):
.. versionchanged:: 4.0 .. versionchanged:: 4.0
Added the ``partial`` argument. The callback argument is now Added the ``partial`` argument. The callback argument is now
optional and a `.Future` will be returned if it is omitted. optional and a `.Future` will be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` and ``streaming_callback`` arguments are
deprecated and will be removed in Tornado 6.0. Use the
returned `.Future` (and ``partial=True`` for
``streaming_callback``) instead.
""" """
future = self._set_read_callback(callback) future = self._set_read_callback(callback)
assert isinstance(num_bytes, numbers.Integral) assert isinstance(num_bytes, numbers.Integral)
self._read_bytes = num_bytes self._read_bytes = num_bytes
self._read_partial = partial self._read_partial = partial
if streaming_callback is not None:
warnings.warn("streaming_callback is deprecated, use partial instead",
DeprecationWarning)
self._streaming_callback = stack_context.wrap(streaming_callback) self._streaming_callback = stack_context.wrap(streaming_callback)
try: try:
self._try_inline_read() self._try_inline_read()
@ -434,6 +457,12 @@ class BaseIOStream(object):
entirely filled with read data. entirely filled with read data.
.. versionadded:: 5.0 .. versionadded:: 5.0
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
""" """
future = self._set_read_callback(callback) future = self._set_read_callback(callback)
@ -485,8 +514,18 @@ class BaseIOStream(object):
The callback argument is now optional and a `.Future` will The callback argument is now optional and a `.Future` will
be returned if it is omitted. be returned if it is omitted.
.. deprecated:: 5.1
The ``callback`` and ``streaming_callback`` arguments are
deprecated and will be removed in Tornado 6.0. Use the
returned `.Future` (and `read_bytes` with ``partial=True``
for ``streaming_callback``) instead.
""" """
future = self._set_read_callback(callback) future = self._set_read_callback(callback)
if streaming_callback is not None:
warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead",
DeprecationWarning)
self._streaming_callback = stack_context.wrap(streaming_callback) self._streaming_callback = stack_context.wrap(streaming_callback)
if self.closed(): if self.closed():
if self._streaming_callback is not None: if self._streaming_callback is not None:
@ -521,6 +560,12 @@ class BaseIOStream(object):
.. versionchanged:: 4.5 .. versionchanged:: 4.5
Added support for `memoryview` arguments. Added support for `memoryview` arguments.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
""" """
self._check_closed() self._check_closed()
if data: if data:
@ -530,6 +575,8 @@ class BaseIOStream(object):
self._write_buffer.append(data) self._write_buffer.append(data)
self._total_write_index += len(data) self._total_write_index += len(data)
if callback is not None: if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._write_callback = stack_context.wrap(callback) self._write_callback = stack_context.wrap(callback)
future = None future = None
else: else:
@ -546,9 +593,14 @@ class BaseIOStream(object):
def set_close_callback(self, callback): def set_close_callback(self, callback):
"""Call the given callback when the stream is closed. """Call the given callback when the stream is closed.
This is not necessary for applications that use the `.Future` This mostly is not necessary for applications that use the
interface; all outstanding ``Futures`` will resolve with a `.Future` interface; all outstanding ``Futures`` will resolve
`StreamClosedError` when the stream is closed. with a `StreamClosedError` when the stream is closed. However,
it is still useful as a way to signal that the stream has been
closed while no other read or write is in progress.
Unlike other callback-based interfaces, ``set_close_callback``
will not be removed in Tornado 6.0.
""" """
self._close_callback = stack_context.wrap(callback) self._close_callback = stack_context.wrap(callback)
self._maybe_add_error_listener() self._maybe_add_error_listener()
@ -808,6 +860,8 @@ class BaseIOStream(object):
assert self._read_callback is None, "Already reading" assert self._read_callback is None, "Already reading"
assert self._read_future is None, "Already reading" assert self._read_future is None, "Already reading"
if callback is not None: if callback is not None:
warnings.warn("callbacks are deprecated, use returned Future instead",
DeprecationWarning)
self._read_callback = stack_context.wrap(callback) self._read_callback = stack_context.wrap(callback)
else: else:
self._read_future = Future() self._read_future = Future()
@ -1137,24 +1191,23 @@ class IOStream(BaseIOStream):
import tornado.iostream import tornado.iostream
import socket import socket
def send_request(): async def main():
stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n") s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream.read_until(b"\r\n\r\n", on_headers) stream = tornado.iostream.IOStream(s)
await stream.connect(("friendfeed.com", 80))
def on_headers(data): await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n")
header_data = await stream.read_until(b"\r\n\r\n")
headers = {} headers = {}
for line in data.split(b"\r\n"): for line in header_data.split(b"\r\n"):
parts = line.split(b":") parts = line.split(b":")
if len(parts) == 2: if len(parts) == 2:
headers[parts[0].strip()] = parts[1].strip() headers[parts[0].strip()] = parts[1].strip()
stream.read_bytes(int(headers[b"Content-Length"]), on_body) body_data = await stream.read_bytes(int(headers[b"Content-Length"]))
print(body_data)
def on_body(data):
print(data)
stream.close() stream.close()
tornado.ioloop.IOLoop.current().stop()
if __name__ == '__main__': if __name__ == '__main__':
tornado.ioloop.IOLoop.current().run_sync(main)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s) stream = tornado.iostream.IOStream(s)
stream.connect(("friendfeed.com", 80), send_request) stream.connect(("friendfeed.com", 80), send_request)
@ -1238,9 +1291,17 @@ class IOStream(BaseIOStream):
``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a
suitably-configured `ssl.SSLContext` to the suitably-configured `ssl.SSLContext` to the
`SSLIOStream` constructor to disable. `SSLIOStream` constructor to disable.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
""" """
self._connecting = True self._connecting = True
if callback is not None: if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._connect_callback = stack_context.wrap(callback) self._connect_callback = stack_context.wrap(callback)
future = None future = None
else: else:
@ -1350,7 +1411,13 @@ class IOStream(BaseIOStream):
return future return future
def _handle_connect(self): def _handle_connect(self):
try:
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
except socket.error as e:
# Hurd doesn't allow SO_ERROR for loopback sockets because all
# errors for such sockets are reported synchronously.
if errno_from_exception(e) == errno.ENOPROTOOPT:
err = 0
if err != 0: if err != 0:
self.error = socket.error(err, os.strerror(err)) self.error = socket.error(err, os.strerror(err))
# IOLoop implementations may vary: some of them return # IOLoop implementations may vary: some of them return
@ -1524,9 +1591,13 @@ class SSLIOStream(IOStream):
def connect(self, address, callback=None, server_hostname=None): def connect(self, address, callback=None, server_hostname=None):
self._server_hostname = server_hostname self._server_hostname = server_hostname
# Pass a dummy callback to super.connect(), which is slightly # Ignore the result of connect(). If it fails,
# more efficient than letting it return a Future we ignore. # wait_for_handshake will raise an error too. This is
super(SSLIOStream, self).connect(address, callback=lambda: None) # necessary for the old semantics of the connect callback
# (which takes no arguments). In 6.0 this can be refactored to
# be a regular coroutine.
fut = super(SSLIOStream, self).connect(address)
fut.add_done_callback(lambda f: f.exception())
return self.wait_for_handshake(callback) return self.wait_for_handshake(callback)
def _handle_connect(self): def _handle_connect(self):
@ -1570,11 +1641,19 @@ class SSLIOStream(IOStream):
handshake to complete). It may only be called once per stream. handshake to complete). It may only be called once per stream.
.. versionadded:: 4.2 .. versionadded:: 4.2
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed
in Tornado 6.0. Use the returned `.Future` instead.
""" """
if (self._ssl_connect_callback is not None or if (self._ssl_connect_callback is not None or
self._ssl_connect_future is not None): self._ssl_connect_future is not None):
raise RuntimeError("Already waiting") raise RuntimeError("Already waiting")
if callback is not None: if callback is not None:
warnings.warn("callback argument is deprecated, use returned Future instead",
DeprecationWarning)
self._ssl_connect_callback = stack_context.wrap(callback) self._ssl_connect_callback = stack_context.wrap(callback)
future = None future = None
else: else:

View file

@ -61,22 +61,19 @@ class Condition(_TimeoutGarbageCollector):
condition = Condition() condition = Condition()
@gen.coroutine async def waiter():
def waiter():
print("I'll wait right here") print("I'll wait right here")
yield condition.wait() # Yield a Future. await condition.wait()
print("I'm done waiting") print("I'm done waiting")
@gen.coroutine async def notifier():
def notifier():
print("About to notify") print("About to notify")
condition.notify() condition.notify()
print("Done notifying") print("Done notifying")
@gen.coroutine async def runner():
def runner(): # Wait for waiter() and notifier() in parallel
# Yield two Futures; wait for waiter() and notifier() to finish. await gen.multi([waiter(), notifier()])
yield [waiter(), notifier()]
IOLoop.current().run_sync(runner) IOLoop.current().run_sync(runner)
@ -93,12 +90,12 @@ class Condition(_TimeoutGarbageCollector):
io_loop = IOLoop.current() io_loop = IOLoop.current()
# Wait up to 1 second for a notification. # Wait up to 1 second for a notification.
yield condition.wait(timeout=io_loop.time() + 1) await condition.wait(timeout=io_loop.time() + 1)
...or a `datetime.timedelta` for a timeout relative to the current time:: ...or a `datetime.timedelta` for a timeout relative to the current time::
# Wait up to 1 second. # Wait up to 1 second.
yield condition.wait(timeout=datetime.timedelta(seconds=1)) await condition.wait(timeout=datetime.timedelta(seconds=1))
The method returns False if there's no notification before the deadline. The method returns False if there's no notification before the deadline.
@ -170,22 +167,19 @@ class Event(object):
event = Event() event = Event()
@gen.coroutine async def waiter():
def waiter():
print("Waiting for event") print("Waiting for event")
yield event.wait() await event.wait()
print("Not waiting this time") print("Not waiting this time")
yield event.wait() await event.wait()
print("Done") print("Done")
@gen.coroutine async def setter():
def setter():
print("About to set the event") print("About to set the event")
event.set() event.set()
@gen.coroutine async def runner():
def runner(): await gen.multi([waiter(), setter()])
yield [waiter(), setter()]
IOLoop.current().run_sync(runner) IOLoop.current().run_sync(runner)
@ -290,12 +284,11 @@ class Semaphore(_TimeoutGarbageCollector):
# Ensure reliable doctest output: resolve Futures one at a time. # Ensure reliable doctest output: resolve Futures one at a time.
futures_q = deque([Future() for _ in range(3)]) futures_q = deque([Future() for _ in range(3)])
@gen.coroutine async def simulator(futures):
def simulator(futures):
for f in futures: for f in futures:
# simulate the asynchronous passage of time # simulate the asynchronous passage of time
yield gen.moment await gen.sleep(0)
yield gen.moment await gen.sleep(0)
f.set_result(None) f.set_result(None)
IOLoop.current().add_callback(simulator, list(futures_q)) IOLoop.current().add_callback(simulator, list(futures_q))
@ -311,20 +304,18 @@ class Semaphore(_TimeoutGarbageCollector):
sem = Semaphore(2) sem = Semaphore(2)
@gen.coroutine async def worker(worker_id):
def worker(worker_id): await sem.acquire()
yield sem.acquire()
try: try:
print("Worker %d is working" % worker_id) print("Worker %d is working" % worker_id)
yield use_some_resource() await use_some_resource()
finally: finally:
print("Worker %d is done" % worker_id) print("Worker %d is done" % worker_id)
sem.release() sem.release()
@gen.coroutine async def runner():
def runner():
# Join all workers. # Join all workers.
yield [worker(i) for i in range(3)] await gen.multi([worker(i) for i in range(3)])
IOLoop.current().run_sync(runner) IOLoop.current().run_sync(runner)
@ -340,7 +331,18 @@ class Semaphore(_TimeoutGarbageCollector):
Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
the semaphore has been released once, by worker 0. the semaphore has been released once, by worker 0.
`.acquire` is a context manager, so ``worker`` could be written as:: The semaphore can be used as an async context manager::
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
For compatibility with older versions of Python, `.acquire` is a
context manager, so ``worker`` could also be written as::
@gen.coroutine @gen.coroutine
def worker(worker_id): def worker(worker_id):
@ -351,19 +353,9 @@ class Semaphore(_TimeoutGarbageCollector):
# Now the semaphore has been released. # Now the semaphore has been released.
print("Worker %d is done" % worker_id) print("Worker %d is done" % worker_id)
In Python 3.5, the semaphore itself can be used as an async context
manager::
async def worker(worker_id):
async with sem:
print("Worker %d is working" % worker_id)
await use_some_resource()
# Now the semaphore has been released.
print("Worker %d is done" % worker_id)
.. versionchanged:: 4.3 .. versionchanged:: 4.3
Added ``async with`` support in Python 3.5. Added ``async with`` support in Python 3.5.
""" """
def __init__(self, value=1): def __init__(self, value=1):
super(Semaphore, self).__init__() super(Semaphore, self).__init__()
@ -464,26 +456,24 @@ class Lock(object):
Releasing an unlocked lock raises `RuntimeError`. Releasing an unlocked lock raises `RuntimeError`.
`acquire` supports the context manager protocol in all Python versions: A Lock can be used as an async context manager with the ``async
with`` statement:
>>> from tornado import gen, locks >>> from tornado import locks
>>> lock = locks.Lock() >>> lock = locks.Lock()
>>> >>>
>>> @gen.coroutine >>> async def f():
... def f(): ... async with lock:
... with (yield lock.acquire()):
... # Do something holding the lock. ... # Do something holding the lock.
... pass ... pass
... ...
... # Now the lock is released. ... # Now the lock is released.
In Python 3.5, `Lock` also supports the async context manager For compatibility with older versions of Python, the `.acquire`
protocol. Note that in this case there is no `acquire`, because method asynchronously returns a regular context manager:
``async with`` includes both the ``yield`` and the ``acquire``
(just as it does with `threading.Lock`):
>>> async def f2(): # doctest: +SKIP >>> async def f2():
... async with lock: ... with (yield lock.acquire()):
... # Do something holding the lock. ... # Do something holding the lock.
... pass ... pass
... ...

View file

@ -138,7 +138,12 @@ def bind_sockets(port, address=None, family=socket.AF_UNSPEC,
raise raise
set_close_exec(sock.fileno()) set_close_exec(sock.fileno())
if os.name != 'nt': if os.name != 'nt':
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error as e:
if errno_from_exception(e) != errno.ENOPROTOOPT:
# Hurd doesn't support SO_REUSEADDR.
raise
if reuse_port: if reuse_port:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if af == socket.AF_INET6: if af == socket.AF_INET6:
@ -180,7 +185,12 @@ if hasattr(socket, 'AF_UNIX'):
""" """
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
set_close_exec(sock.fileno()) set_close_exec(sock.fileno())
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except socket.error as e:
if errno_from_exception(e) != errno.ENOPROTOOPT:
# Hurd doesn't support SO_REUSEADDR
raise
sock.setblocking(0) sock.setblocking(0)
try: try:
st = os.stat(file) st = os.stat(file)

View file

@ -91,7 +91,6 @@ instances to define isolated sets of options, such as for subcommands.
options can be defined, set, and read with any mix of the two. options can be defined, set, and read with any mix of the two.
Dashes are typical for command-line usage while config files require Dashes are typical for command-line usage while config files require
underscores. underscores.
""" """
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
@ -326,18 +325,20 @@ class OptionParser(object):
the global namespace that matches a defined option will be the global namespace that matches a defined option will be
used to set that option's value. used to set that option's value.
Options are not parsed from strings as they would be on the Options may either be the specified type for the option or
command line; they should be set to the correct type (this strings (in which case they will be parsed the same way as in
means if you have ``datetime`` or ``timedelta`` options you `.parse_command_line`)
will need to import those modules in the config file.
Example (using the options defined in the top-level docs of Example (using the options defined in the top-level docs of
this module):: this module)::
port = 80 port = 80
mysql_host = 'mydb.example.com:3306' mysql_host = 'mydb.example.com:3306'
# Both lists and comma-separated strings are allowed for
# multiple=True.
memcache_hosts = ['cache1.example.com:11011', memcache_hosts = ['cache1.example.com:11011',
'cache2.example.com:11011'] 'cache2.example.com:11011']
memcache_hosts = 'cache1.example.com:11011,cache2.example.com:11011'
If ``final`` is ``False``, parse callbacks will not be run. If ``final`` is ``False``, parse callbacks will not be run.
This is useful for applications that wish to combine configurations This is useful for applications that wish to combine configurations
@ -358,6 +359,9 @@ class OptionParser(object):
The special variable ``__file__`` is available inside config The special variable ``__file__`` is available inside config
files, specifying the absolute path to the config file itself. files, specifying the absolute path to the config file itself.
.. versionchanged:: 5.1
Added the ability to set options via strings in config files.
""" """
config = {'__file__': os.path.abspath(path)} config = {'__file__': os.path.abspath(path)}
with open(path, 'rb') as f: with open(path, 'rb') as f:
@ -365,7 +369,17 @@ class OptionParser(object):
for name in config: for name in config:
normalized = self._normalize_name(name) normalized = self._normalize_name(name)
if normalized in self._options: if normalized in self._options:
self._options[normalized].set(config[name]) option = self._options[normalized]
if option.multiple:
if not isinstance(config[name], (list, str)):
raise Error("Option %r is required to be a list of %s "
"or a comma-separated string" %
(option.name, option.type.__name__))
if type(config[name]) == str and option.type != str:
option.parse(config[name])
else:
option.set(config[name])
if final: if final:
self.run_parse_callbacks() self.run_parse_callbacks()

View file

@ -62,8 +62,13 @@ class BaseAsyncIOLoop(IOLoop):
self.remove_handler(fd) self.remove_handler(fd)
if all_fds: if all_fds:
self.close_fd(fileobj) self.close_fd(fileobj)
self.asyncio_loop.close() # Remove the mapping before closing the asyncio loop. If this
# happened in the other order, we could race against another
# initialize() call which would see the closed asyncio loop,
# assume it was closed from the asyncio side, and do this
# cleanup for us, leading to a KeyError.
del IOLoop._ioloop_for_asyncio[self.asyncio_loop] del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
self.asyncio_loop.close()
def add_handler(self, fd, handler, events): def add_handler(self, fd, handler, events):
fd, fileobj = self.split_fd(fd) fd, fileobj = self.split_fd(fd)

View file

@ -124,6 +124,13 @@ class TornadoReactor(PosixReactorBase):
.. versionchanged:: 5.0 .. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. deprecated:: 5.1
This class will be removed in Tornado 6.0. Use
``twisted.internet.asyncioreactor.AsyncioSelectorReactor``
instead.
""" """
def __init__(self): def __init__(self):
self._io_loop = tornado.ioloop.IOLoop.current() self._io_loop = tornado.ioloop.IOLoop.current()
@ -350,6 +357,10 @@ def install():
.. versionchanged:: 5.0 .. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. deprecated:: 5.1
This functio will be removed in Tornado 6.0. Use
``twisted.internet.asyncioreactor.install`` instead.
""" """
reactor = TornadoReactor() reactor = TornadoReactor()
from twisted.internet.main import installReactor # type: ignore from twisted.internet.main import installReactor # type: ignore
@ -411,6 +422,11 @@ class TwistedIOLoop(tornado.ioloop.IOLoop):
See also :meth:`tornado.ioloop.IOLoop.install` for general notes on See also :meth:`tornado.ioloop.IOLoop.install` for general notes on
installing alternative IOLoops. installing alternative IOLoops.
.. deprecated:: 5.1
The `asyncio` event loop will be the only available implementation in
Tornado 6.0.
""" """
def initialize(self, reactor=None, **kwargs): def initialize(self, reactor=None, **kwargs):
super(TwistedIOLoop, self).initialize(**kwargs) super(TwistedIOLoop, self).initialize(**kwargs)

View file

@ -79,28 +79,24 @@ class Queue(object):
q = Queue(maxsize=2) q = Queue(maxsize=2)
@gen.coroutine async def consumer():
def consumer(): async for item in q:
while True:
item = yield q.get()
try: try:
print('Doing work on %s' % item) print('Doing work on %s' % item)
yield gen.sleep(0.01) await gen.sleep(0.01)
finally: finally:
q.task_done() q.task_done()
@gen.coroutine async def producer():
def producer():
for item in range(5): for item in range(5):
yield q.put(item) await q.put(item)
print('Put %s' % item) print('Put %s' % item)
@gen.coroutine async def main():
def main():
# Start consumer without waiting (since it never finishes). # Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer) IOLoop.current().spawn_callback(consumer)
yield producer() # Wait for producer to put all tasks. await producer() # Wait for producer to put all tasks.
yield q.join() # Wait for consumer to finish all tasks. await q.join() # Wait for consumer to finish all tasks.
print('Done') print('Done')
IOLoop.current().run_sync(main) IOLoop.current().run_sync(main)
@ -119,11 +115,14 @@ class Queue(object):
Doing work on 4 Doing work on 4
Done Done
In Python 3.5, `Queue` implements the async iterator protocol, so
``consumer()`` could be rewritten as::
async def consumer(): In versions of Python without native coroutines (before 3.5),
async for item in q: ``consumer()`` could be written as::
@gen.coroutine
def consumer():
while True:
item = yield q.get()
try: try:
print('Doing work on %s' % item) print('Doing work on %s' % item)
yield gen.sleep(0.01) yield gen.sleep(0.01)

View file

@ -1,6 +1,6 @@
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
from tornado.escape import utf8, _unicode from tornado.escape import _unicode
from tornado import gen from tornado import gen
from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
from tornado import httputil from tornado import httputil
@ -20,6 +20,7 @@ import functools
import re import re
import socket import socket
import sys import sys
import time
from io import BytesIO from io import BytesIO
@ -215,6 +216,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
max_header_size, max_body_size): max_header_size, max_body_size):
self.io_loop = IOLoop.current() self.io_loop = IOLoop.current()
self.start_time = self.io_loop.time() self.start_time = self.io_loop.time()
self.start_wall_time = time.time()
self.client = client self.client = client
self.request = request self.request = request
self.release_callback = release_callback self.release_callback = release_callback
@ -230,7 +232,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
# Timeout handle returned by IOLoop.add_timeout # Timeout handle returned by IOLoop.add_timeout
self._timeout = None self._timeout = None
self._sockaddr = None self._sockaddr = None
with stack_context.ExceptionStackContext(self._handle_exception): IOLoop.current().add_callback(self.run)
@gen.coroutine
def run(self):
try:
self.parsed = urlparse.urlsplit(_unicode(self.request.url)) self.parsed = urlparse.urlsplit(_unicode(self.request.url))
if self.parsed.scheme not in ("http", "https"): if self.parsed.scheme not in ("http", "https"):
raise ValueError("Unsupported url scheme: %s" % raise ValueError("Unsupported url scheme: %s" %
@ -248,7 +254,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
host = host[1:-1] host = host[1:-1]
self.parsed_hostname = host # save final host for _on_connect self.parsed_hostname = host # save final host for _on_connect
if request.allow_ipv6 is False: if self.request.allow_ipv6 is False:
af = socket.AF_INET af = socket.AF_INET
else: else:
af = socket.AF_UNSPEC af = socket.AF_UNSPEC
@ -260,56 +266,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
self._timeout = self.io_loop.add_timeout( self._timeout = self.io_loop.add_timeout(
self.start_time + timeout, self.start_time + timeout,
stack_context.wrap(functools.partial(self._on_timeout, "while connecting"))) stack_context.wrap(functools.partial(self._on_timeout, "while connecting")))
fut = self.tcp_client.connect(host, port, af=af, stream = yield self.tcp_client.connect(
host, port, af=af,
ssl_options=ssl_options, ssl_options=ssl_options,
max_buffer_size=self.max_buffer_size) max_buffer_size=self.max_buffer_size)
fut.add_done_callback(stack_context.wrap(self._on_connect))
def _get_ssl_options(self, scheme):
if scheme == "https":
if self.request.ssl_options is not None:
return self.request.ssl_options
# If we are using the defaults, don't construct a
# new SSLContext.
if (self.request.validate_cert and
self.request.ca_certs is None and
self.request.client_cert is None and
self.request.client_key is None):
return _client_ssl_defaults
ssl_ctx = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
cafile=self.request.ca_certs)
if not self.request.validate_cert:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
if self.request.client_cert is not None:
ssl_ctx.load_cert_chain(self.request.client_cert,
self.request.client_key)
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# See netutil.ssl_options_to_context
ssl_ctx.options |= ssl.OP_NO_COMPRESSION
return ssl_ctx
return None
def _on_timeout(self, info=None):
"""Timeout callback of _HTTPConnection instance.
Raise a `HTTPTimeoutError` when a timeout occurs.
:info string key: More detailed timeout information.
"""
self._timeout = None
error_message = "Timeout {0}".format(info) if info else "Timeout"
if self.final_callback is not None:
raise HTTPTimeoutError(error_message)
def _remove_timeout(self):
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
def _on_connect(self, stream_fut):
stream = stream_fut.result()
if self.final_callback is None: if self.final_callback is None:
# final_callback is cleared if we've hit our timeout. # final_callback is cleared if we've hit our timeout.
stream.close() stream.close()
@ -349,9 +310,9 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
if self.request.auth_mode not in (None, "basic"): if self.request.auth_mode not in (None, "basic"):
raise ValueError("unsupported auth_mode %s", raise ValueError("unsupported auth_mode %s",
self.request.auth_mode) self.request.auth_mode)
auth = utf8(username) + b":" + utf8(password) self.request.headers["Authorization"] = (
self.request.headers["Authorization"] = (b"Basic " + b"Basic " + base64.b64encode(
base64.b64encode(auth)) httputil.encode_username_password(username, password)))
if self.request.user_agent: if self.request.user_agent:
self.request.headers["User-Agent"] = self.request.user_agent self.request.headers["User-Agent"] = self.request.user_agent
if not self.request.allow_nonstandard_methods: if not self.request.allow_nonstandard_methods:
@ -386,9 +347,56 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
req_path, '') req_path, '')
self.connection.write_headers(start_line, self.request.headers) self.connection.write_headers(start_line, self.request.headers)
if self.request.expect_100_continue: if self.request.expect_100_continue:
self._read_response() yield self.connection.read_response(self)
else: else:
self._write_body(True) yield self._write_body(True)
except Exception:
if not self._handle_exception(*sys.exc_info()):
raise
def _get_ssl_options(self, scheme):
if scheme == "https":
if self.request.ssl_options is not None:
return self.request.ssl_options
# If we are using the defaults, don't construct a
# new SSLContext.
if (self.request.validate_cert and
self.request.ca_certs is None and
self.request.client_cert is None and
self.request.client_key is None):
return _client_ssl_defaults
ssl_ctx = ssl.create_default_context(
ssl.Purpose.SERVER_AUTH,
cafile=self.request.ca_certs)
if not self.request.validate_cert:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
if self.request.client_cert is not None:
ssl_ctx.load_cert_chain(self.request.client_cert,
self.request.client_key)
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# See netutil.ssl_options_to_context
ssl_ctx.options |= ssl.OP_NO_COMPRESSION
return ssl_ctx
return None
def _on_timeout(self, info=None):
"""Timeout callback of _HTTPConnection instance.
Raise a `HTTPTimeoutError` when a timeout occurs.
:info string key: More detailed timeout information.
"""
self._timeout = None
error_message = "Timeout {0}".format(info) if info else "Timeout"
if self.final_callback is not None:
self._handle_exception(HTTPTimeoutError, HTTPTimeoutError(error_message),
None)
def _remove_timeout(self):
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
def _create_connection(self, stream): def _create_connection(self, stream):
stream.set_nodelay(True) stream.set_nodelay(True)
@ -402,31 +410,21 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
self._sockaddr) self._sockaddr)
return connection return connection
@gen.coroutine
def _write_body(self, start_read): def _write_body(self, start_read):
if self.request.body is not None: if self.request.body is not None:
self.connection.write(self.request.body) self.connection.write(self.request.body)
elif self.request.body_producer is not None: elif self.request.body_producer is not None:
fut = self.request.body_producer(self.connection.write) fut = self.request.body_producer(self.connection.write)
if fut is not None: if fut is not None:
fut = gen.convert_yielded(fut) yield fut
def on_body_written(fut):
fut.result()
self.connection.finish() self.connection.finish()
if start_read: if start_read:
self._read_response() try:
self.io_loop.add_future(fut, on_body_written) yield self.connection.read_response(self)
return except StreamClosedError:
self.connection.finish() if not self._handle_exception(*sys.exc_info()):
if start_read: raise
self._read_response()
def _read_response(self):
# Ensure that any exception raised in read_response ends up in our
# stack context.
self.io_loop.add_future(
self.connection.read_response(self),
lambda f: f.result())
def _release(self): def _release(self):
if self.release_callback is not None: if self.release_callback is not None:
@ -451,6 +449,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
value = value.real_error value = value.real_error
self._run_callback(HTTPResponse(self.request, 599, error=value, self._run_callback(HTTPResponse(self.request, 599, error=value,
request_time=self.io_loop.time() - self.start_time, request_time=self.io_loop.time() - self.start_time,
start_time=self.start_wall_time,
)) ))
if hasattr(self, "stream"): if hasattr(self, "stream"):
@ -543,6 +542,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate):
self.code, reason=getattr(self, 'reason', None), self.code, reason=getattr(self, 'reason', None),
headers=self.headers, headers=self.headers,
request_time=self.io_loop.time() - self.start_time, request_time=self.io_loop.time() - self.start_time,
start_time=self.start_wall_time,
buffer=buffer, buffer=buffer,
effective_url=self.request.url) effective_url=self.request.url)
self._run_callback(response) self._run_callback(response)

View file

@ -64,12 +64,18 @@ Here are a few rules of thumb for when it's necessary:
persist across asynchronous calls, create a new `StackContext` (or persist across asynchronous calls, create a new `StackContext` (or
`ExceptionStackContext`), and make your asynchronous calls in a ``with`` `ExceptionStackContext`), and make your asynchronous calls in a ``with``
block that references your `StackContext`. block that references your `StackContext`.
.. deprecated:: 5.1
The ``stack_context`` package is deprecated and will be removed in
Tornado 6.0.
""" """
from __future__ import absolute_import, division, print_function from __future__ import absolute_import, division, print_function
import sys import sys
import threading import threading
import warnings
from tornado.util import raise_exc_info from tornado.util import raise_exc_info
@ -107,6 +113,8 @@ class StackContext(object):
and not necessary in most applications. and not necessary in most applications.
""" """
def __init__(self, context_factory): def __init__(self, context_factory):
warnings.warn("StackContext is deprecated and will be removed in Tornado 6.0",
DeprecationWarning)
self.context_factory = context_factory self.context_factory = context_factory
self.contexts = [] self.contexts = []
self.active = True self.active = True
@ -174,8 +182,20 @@ class ExceptionStackContext(object):
If the exception handler returns true, the exception will be If the exception handler returns true, the exception will be
consumed and will not be propagated to other exception handlers. consumed and will not be propagated to other exception handlers.
.. versionadded:: 5.1
The ``delay_warning`` argument can be used to delay the emission
of DeprecationWarnings until an exception is caught by the
``ExceptionStackContext``, which facilitates certain transitional
use cases.
""" """
def __init__(self, exception_handler): def __init__(self, exception_handler, delay_warning=False):
self.delay_warning = delay_warning
if not self.delay_warning:
warnings.warn(
"StackContext is deprecated and will be removed in Tornado 6.0",
DeprecationWarning)
self.exception_handler = exception_handler self.exception_handler = exception_handler
self.active = True self.active = True
@ -184,6 +204,10 @@ class ExceptionStackContext(object):
def exit(self, type, value, traceback): def exit(self, type, value, traceback):
if type is not None: if type is not None:
if self.delay_warning:
warnings.warn(
"StackContext is deprecated and will be removed in Tornado 6.0",
DeprecationWarning)
return self.exception_handler(type, value, traceback) return self.exception_handler(type, value, traceback)
def __enter__(self): def __enter__(self):

View file

@ -46,12 +46,11 @@ class TCPServer(object):
from tornado import gen from tornado import gen
class EchoServer(TCPServer): class EchoServer(TCPServer):
@gen.coroutine async def handle_stream(self, stream, address):
def handle_stream(self, stream, address):
while True: while True:
try: try:
data = yield stream.read_until(b"\n") data = await stream.read_until(b"\n")
yield stream.write(data) await stream.write(data)
except StreamClosedError: except StreamClosedError:
break break

View file

@ -145,14 +145,15 @@ class AsyncTestCase(unittest.TestCase):
The unittest framework is synchronous, so the test must be The unittest framework is synchronous, so the test must be
complete by the time the test method returns. This means that complete by the time the test method returns. This means that
asynchronous code cannot be used in quite the same way as usual. asynchronous code cannot be used in quite the same way as usual
To write test functions that use the same ``yield``-based patterns and must be adapted to fit. To write your tests with coroutines,
used with the `tornado.gen` module, decorate your test methods decorate your test methods with `tornado.testing.gen_test` instead
with `tornado.testing.gen_test` instead of of `tornado.gen.coroutine`.
`tornado.gen.coroutine`. This class also provides the `stop()`
and `wait()` methods for a more manual style of testing. The test This class also provides the (deprecated) `stop()` and `wait()`
method itself must call ``self.wait()``, and asynchronous methods for a more manual style of testing. The test method itself
callbacks should call ``self.stop()`` to signal completion. must call ``self.wait()``, and asynchronous callbacks should call
``self.stop()`` to signal completion.
By default, a new `.IOLoop` is constructed for each test and is available By default, a new `.IOLoop` is constructed for each test and is available
as ``self.io_loop``. If the code being tested requires a as ``self.io_loop``. If the code being tested requires a
@ -183,22 +184,6 @@ class AsyncTestCase(unittest.TestCase):
response = self.wait() response = self.wait()
# Test contents of response # Test contents of response
self.assertIn("FriendFeed", response.body) self.assertIn("FriendFeed", response.body)
# This test uses an explicit callback-based style.
class MyTestCase3(AsyncTestCase):
def test_http_fetch(self):
client = AsyncHTTPClient()
client.fetch("http://www.tornadoweb.org/", self.handle_fetch)
self.wait()
def handle_fetch(self, response):
# Test contents of response (failures and exceptions here
# will cause self.wait() to throw an exception and end the
# test).
# Exceptions thrown here are magically propagated to
# self.wait() in test_http_fetch() via stack_context.
self.assertIn("FriendFeed", response.body)
self.stop()
""" """
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
super(AsyncTestCase, self).__init__(methodName) super(AsyncTestCase, self).__init__(methodName)
@ -265,7 +250,7 @@ class AsyncTestCase(unittest.TestCase):
raise_exc_info(failure) raise_exc_info(failure)
def run(self, result=None): def run(self, result=None):
with ExceptionStackContext(self._handle_exception): with ExceptionStackContext(self._handle_exception, delay_warning=True):
super(AsyncTestCase, self).run(result) super(AsyncTestCase, self).run(result)
# As a last resort, if an exception escaped super.run() and wasn't # As a last resort, if an exception escaped super.run() and wasn't
# re-raised in tearDown, raise it here. This will cause the # re-raised in tearDown, raise it here. This will cause the
@ -279,6 +264,10 @@ class AsyncTestCase(unittest.TestCase):
Keyword arguments or a single positional argument passed to `stop()` are Keyword arguments or a single positional argument passed to `stop()` are
saved and will be returned by `wait()`. saved and will be returned by `wait()`.
.. deprecated:: 5.1
`stop` and `wait` are deprecated; use ``@gen_test`` instead.
""" """
assert _arg is None or not kwargs assert _arg is None or not kwargs
self.__stop_args = kwargs or _arg self.__stop_args = kwargs or _arg
@ -300,6 +289,10 @@ class AsyncTestCase(unittest.TestCase):
.. versionchanged:: 3.1 .. versionchanged:: 3.1
Added the ``ASYNC_TEST_TIMEOUT`` environment variable. Added the ``ASYNC_TEST_TIMEOUT`` environment variable.
.. deprecated:: 5.1
`stop` and `wait` are deprecated; use ``@gen_test`` instead.
""" """
if timeout is None: if timeout is None:
timeout = get_async_test_timeout() timeout = get_async_test_timeout()

View file

@ -78,6 +78,7 @@ import time
import tornado import tornado
import traceback import traceback
import types import types
import warnings
from inspect import isclass from inspect import isclass
from io import BytesIO from io import BytesIO
@ -542,6 +543,10 @@ class RequestHandler(object):
Newly-set cookies are not immediately visible via `get_cookie`; Newly-set cookies are not immediately visible via `get_cookie`;
they are not present until the next request. they are not present until the next request.
expires may be a numeric timestamp as returned by `time.time`,
a time tuple as returned by `time.gmtime`, or a
`datetime.datetime` object.
Additional keyword arguments are set on the cookies.Morsel Additional keyword arguments are set on the cookies.Morsel
directly. directly.
See https://docs.python.org/3/library/http.cookies.html#http.cookies.Morsel See https://docs.python.org/3/library/http.cookies.html#http.cookies.Morsel
@ -744,7 +749,18 @@ class RequestHandler(object):
self._write_buffer.append(chunk) self._write_buffer.append(chunk)
def render(self, template_name, **kwargs): def render(self, template_name, **kwargs):
"""Renders the template with the given arguments as the response.""" """Renders the template with the given arguments as the response.
``render()`` calls ``finish()``, so no other output methods can be called
after it.
Returns a `.Future` with the same semantics as the one returned by `finish`.
Awaiting this `.Future` is optional.
.. versionchanged:: 5.1
Now returns a `.Future` instead of ``None``.
"""
if self._finished: if self._finished:
raise RuntimeError("Cannot render() after finish()") raise RuntimeError("Cannot render() after finish()")
html = self.render_string(template_name, **kwargs) html = self.render_string(template_name, **kwargs)
@ -805,7 +821,7 @@ class RequestHandler(object):
if html_bodies: if html_bodies:
hloc = html.index(b'</body>') hloc = html.index(b'</body>')
html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:] html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:]
self.finish(html) return self.finish(html)
def render_linked_js(self, js_files): def render_linked_js(self, js_files):
"""Default method used to render the final js links for the """Default method used to render the final js links for the
@ -945,6 +961,11 @@ class RequestHandler(object):
.. versionchanged:: 4.0 .. versionchanged:: 4.0
Now returns a `.Future` if no callback is given. Now returns a `.Future` if no callback is given.
.. deprecated:: 5.1
The ``callback`` argument is deprecated and will be removed in
Tornado 6.0.
""" """
chunk = b"".join(self._write_buffer) chunk = b"".join(self._write_buffer)
self._write_buffer = [] self._write_buffer = []
@ -983,7 +1004,20 @@ class RequestHandler(object):
return future return future
def finish(self, chunk=None): def finish(self, chunk=None):
"""Finishes this response, ending the HTTP request.""" """Finishes this response, ending the HTTP request.
Passing a ``chunk`` to ``finish()`` is equivalent to passing that
chunk to ``write()`` and then calling ``finish()`` with no arguments.
Returns a `.Future` which may optionally be awaited to track the sending
of the response to the client. This `.Future` resolves when all the response
data has been sent, and raises an error if the connection is closed before all
data can be sent.
.. versionchanged:: 5.1
Now returns a `.Future` instead of ``None``.
"""
if self._finished: if self._finished:
raise RuntimeError("finish() called twice") raise RuntimeError("finish() called twice")
@ -1015,12 +1049,27 @@ class RequestHandler(object):
# are keepalive connections) # are keepalive connections)
self.request.connection.set_close_callback(None) self.request.connection.set_close_callback(None)
self.flush(include_footers=True) future = self.flush(include_footers=True)
self.request.finish() self.request.connection.finish()
self._log() self._log()
self._finished = True self._finished = True
self.on_finish() self.on_finish()
self._break_cycles() self._break_cycles()
return future
def detach(self):
"""Take control of the underlying stream.
Returns the underlying `.IOStream` object and stops all
further HTTP processing. Intended for implementing protocols
like websockets that tunnel over an HTTP handshake.
This method is only supported when HTTP/1.1 is used.
.. versionadded:: 5.1
"""
self._finished = True
return self.request.connection.detach()
def _break_cycles(self): def _break_cycles(self):
# Break up a reference cycle between this handler and the # Break up a reference cycle between this handler and the
@ -1688,7 +1737,14 @@ def asynchronous(method):
.. versionchanged:: 4.3 Returning anything but ``None`` or a .. versionchanged:: 4.3 Returning anything but ``None`` or a
yieldable object from a method decorated with ``@asynchronous`` yieldable object from a method decorated with ``@asynchronous``
is an error. Such return values were previously ignored silently. is an error. Such return values were previously ignored silently.
.. deprecated:: 5.1
This decorator is deprecated and will be removed in Tornado 6.0.
Use coroutines instead.
""" """
warnings.warn("@asynchronous is deprecated, use coroutines instead",
DeprecationWarning)
# Delay the IOLoop import because it's not available on app engine. # Delay the IOLoop import because it's not available on app engine.
from tornado.ioloop import IOLoop from tornado.ioloop import IOLoop
@ -1696,7 +1752,7 @@ def asynchronous(method):
def wrapper(self, *args, **kwargs): def wrapper(self, *args, **kwargs):
self._auto_finish = False self._auto_finish = False
with stack_context.ExceptionStackContext( with stack_context.ExceptionStackContext(
self._stack_context_handle_exception): self._stack_context_handle_exception, delay_warning=True):
result = method(self, *args, **kwargs) result = method(self, *args, **kwargs)
if result is not None: if result is not None:
result = gen.convert_yielded(result) result = gen.convert_yielded(result)

View file

@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function
import base64 import base64
import hashlib import hashlib
import os import os
import sys
import struct import struct
import tornado.escape import tornado.escape
import tornado.web import tornado.web
@ -31,7 +32,7 @@ from tornado.escape import utf8, native_str, to_unicode
from tornado import gen, httpclient, httputil from tornado import gen, httpclient, httputil
from tornado.ioloop import IOLoop, PeriodicCallback from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.iostream import StreamClosedError from tornado.iostream import StreamClosedError
from tornado.log import gen_log, app_log from tornado.log import gen_log
from tornado import simple_httpclient from tornado import simple_httpclient
from tornado.queues import Queue from tornado.queues import Queue
from tornado.tcpclient import TCPClient from tornado.tcpclient import TCPClient
@ -43,6 +44,8 @@ if PY3:
else: else:
from urlparse import urlparse # py3 from urlparse import urlparse # py3
_default_max_message_size = 10 * 1024 * 1024
class WebSocketError(Exception): class WebSocketError(Exception):
pass pass
@ -56,6 +59,10 @@ class WebSocketClosedError(WebSocketError):
pass pass
class _DecompressTooLargeError(Exception):
pass
class WebSocketHandler(tornado.web.RequestHandler): class WebSocketHandler(tornado.web.RequestHandler):
"""Subclass this class to create a basic WebSocket handler. """Subclass this class to create a basic WebSocket handler.
@ -145,7 +152,6 @@ class WebSocketHandler(tornado.web.RequestHandler):
self.stream = None self.stream = None
self._on_close_called = False self._on_close_called = False
@tornado.web.asynchronous
def get(self, *args, **kwargs): def get(self, *args, **kwargs):
self.open_args = args self.open_args = args
self.open_kwargs = kwargs self.open_kwargs = kwargs
@ -225,7 +231,7 @@ class WebSocketHandler(tornado.web.RequestHandler):
Default is 10MiB. Default is 10MiB.
""" """
return self.settings.get('websocket_max_message_size', None) return self.settings.get('websocket_max_message_size', _default_max_message_size)
def write_message(self, message, binary=False): def write_message(self, message, binary=False):
"""Sends the given message to the client of this Web Socket. """Sends the given message to the client of this Web Socket.
@ -256,18 +262,38 @@ class WebSocketHandler(tornado.web.RequestHandler):
return self.ws_connection.write_message(message, binary=binary) return self.ws_connection.write_message(message, binary=binary)
def select_subprotocol(self, subprotocols): def select_subprotocol(self, subprotocols):
"""Invoked when a new WebSocket requests specific subprotocols. """Override to implement subprotocol negotiation.
``subprotocols`` is a list of strings identifying the ``subprotocols`` is a list of strings identifying the
subprotocols proposed by the client. This method may be subprotocols proposed by the client. This method may be
overridden to return one of those strings to select it, or overridden to return one of those strings to select it, or
``None`` to not select a subprotocol. Failure to select a ``None`` to not select a subprotocol.
subprotocol does not automatically abort the connection,
although clients may close the connection if none of their Failure to select a subprotocol does not automatically abort
proposed subprotocols was selected. the connection, although clients may close the connection if
none of their proposed subprotocols was selected.
The list may be empty, in which case this method must return
None. This method is always called exactly once even if no
subprotocols were proposed so that the handler can be advised
of this fact.
.. versionchanged:: 5.1
Previously, this method was called with a list containing
an empty string instead of an empty list if no subprotocols
were proposed by the client.
""" """
return None return None
@property
def selected_subprotocol(self):
"""The subprotocol returned by `select_subprotocol`.
.. versionadded:: 5.1
"""
return self.ws_connection.selected_subprotocol
def get_compression_options(self): def get_compression_options(self):
"""Override to return compression options for the connection. """Override to return compression options for the connection.
@ -298,6 +324,13 @@ class WebSocketHandler(tornado.web.RequestHandler):
The arguments to `open` are extracted from the `tornado.web.URLSpec` The arguments to `open` are extracted from the `tornado.web.URLSpec`
regular expression, just like the arguments to regular expression, just like the arguments to
`tornado.web.RequestHandler.get`. `tornado.web.RequestHandler.get`.
`open` may be a coroutine. `on_message` will not be called until
`open` has returned.
.. versionchanged:: 5.1
``open`` may be a coroutine.
""" """
pass pass
@ -481,7 +514,7 @@ class WebSocketHandler(tornado.web.RequestHandler):
self, compression_options=self.get_compression_options()) self, compression_options=self.get_compression_options())
def _attach_stream(self): def _attach_stream(self):
self.stream = self.request.connection.detach() self.stream = self.detach()
self.stream.set_close_callback(self.on_connection_close) self.stream.set_close_callback(self.on_connection_close)
# disable non-WS methods # disable non-WS methods
for method in ["write", "redirect", "set_header", "set_cookie", for method in ["write", "redirect", "set_header", "set_cookie",
@ -512,8 +545,7 @@ class WebSocketProtocol(object):
try: try:
result = callback(*args, **kwargs) result = callback(*args, **kwargs)
except Exception: except Exception:
app_log.error("Uncaught exception in %s", self.handler.log_exception(*sys.exc_info())
getattr(self.request, 'path', None), exc_info=True)
self._abort() self._abort()
else: else:
if result is not None: if result is not None:
@ -570,7 +602,8 @@ class _PerMessageDeflateCompressor(object):
class _PerMessageDeflateDecompressor(object): class _PerMessageDeflateDecompressor(object):
def __init__(self, persistent, max_wbits, compression_options=None): def __init__(self, persistent, max_wbits, max_message_size, compression_options=None):
self._max_message_size = max_message_size
if max_wbits is None: if max_wbits is None:
max_wbits = zlib.MAX_WBITS max_wbits = zlib.MAX_WBITS
if not (8 <= max_wbits <= zlib.MAX_WBITS): if not (8 <= max_wbits <= zlib.MAX_WBITS):
@ -587,7 +620,10 @@ class _PerMessageDeflateDecompressor(object):
def decompress(self, data): def decompress(self, data):
decompressor = self._decompressor or self._create_decompressor() decompressor = self._decompressor or self._create_decompressor()
return decompressor.decompress(data + b'\x00\x00\xff\xff') result = decompressor.decompress(data + b'\x00\x00\xff\xff', self._max_message_size)
if decompressor.unconsumed_tail:
raise _DecompressTooLargeError()
return result
class WebSocketProtocol13(WebSocketProtocol): class WebSocketProtocol13(WebSocketProtocol):
@ -675,13 +711,17 @@ class WebSocketProtocol13(WebSocketProtocol):
return WebSocketProtocol13.compute_accept_value( return WebSocketProtocol13.compute_accept_value(
self.request.headers.get("Sec-Websocket-Key")) self.request.headers.get("Sec-Websocket-Key"))
@gen.coroutine
def _accept_connection(self): def _accept_connection(self):
subprotocols = [s.strip() for s in self.request.headers.get_list("Sec-WebSocket-Protocol")] subprotocol_header = self.request.headers.get("Sec-WebSocket-Protocol")
if subprotocols: if subprotocol_header:
selected = self.handler.select_subprotocol(subprotocols) subprotocols = [s.strip() for s in subprotocol_header.split(',')]
if selected: else:
assert selected in subprotocols subprotocols = []
self.handler.set_header("Sec-WebSocket-Protocol", selected) self.selected_subprotocol = self.handler.select_subprotocol(subprotocols)
if self.selected_subprotocol:
assert self.selected_subprotocol in subprotocols
self.handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol)
extensions = self._parse_extensions_header(self.request.headers) extensions = self._parse_extensions_header(self.request.headers)
for ext in extensions: for ext in extensions:
@ -711,9 +751,11 @@ class WebSocketProtocol13(WebSocketProtocol):
self.stream = self.handler.stream self.stream = self.handler.stream
self.start_pinging() self.start_pinging()
self._run_callback(self.handler.open, *self.handler.open_args, open_result = self._run_callback(self.handler.open, *self.handler.open_args,
**self.handler.open_kwargs) **self.handler.open_kwargs)
self._receive_frame() if open_result is not None:
yield open_result
yield self._receive_frame_loop()
def _parse_extensions_header(self, headers): def _parse_extensions_header(self, headers):
extensions = headers.get("Sec-WebSocket-Extensions", '') extensions = headers.get("Sec-WebSocket-Extensions", '')
@ -740,6 +782,8 @@ class WebSocketProtocol13(WebSocketProtocol):
else: else:
raise ValueError("unsupported extension %r", ext) raise ValueError("unsupported extension %r", ext)
self.selected_subprotocol = headers.get('Sec-WebSocket-Protocol', None)
def _get_compressor_options(self, side, agreed_parameters, compression_options=None): def _get_compressor_options(self, side, agreed_parameters, compression_options=None):
"""Converts a websocket agreed_parameters set to keyword arguments """Converts a websocket agreed_parameters set to keyword arguments
for our compressor objects. for our compressor objects.
@ -767,6 +811,7 @@ class WebSocketProtocol13(WebSocketProtocol):
self._compressor = _PerMessageDeflateCompressor( self._compressor = _PerMessageDeflateCompressor(
**self._get_compressor_options(side, agreed_parameters, compression_options)) **self._get_compressor_options(side, agreed_parameters, compression_options))
self._decompressor = _PerMessageDeflateDecompressor( self._decompressor = _PerMessageDeflateDecompressor(
max_message_size=self.handler.max_message_size,
**self._get_compressor_options(other_side, agreed_parameters, compression_options)) **self._get_compressor_options(other_side, agreed_parameters, compression_options))
def _write_frame(self, fin, opcode, data, flags=0): def _write_frame(self, fin, opcode, data, flags=0):
@ -836,111 +881,84 @@ class WebSocketProtocol13(WebSocketProtocol):
assert isinstance(data, bytes) assert isinstance(data, bytes)
self._write_frame(True, 0x9, data) self._write_frame(True, 0x9, data)
def _receive_frame(self): @gen.coroutine
def _receive_frame_loop(self):
try: try:
self.stream.read_bytes(2, self._on_frame_start) while not self.client_terminated:
yield self._receive_frame()
except StreamClosedError: except StreamClosedError:
self._abort() self._abort()
def _on_frame_start(self, data): def _read_bytes(self, n):
self._wire_bytes_in += len(data) self._wire_bytes_in += n
header, payloadlen = struct.unpack("BB", data) return self.stream.read_bytes(n)
self._final_frame = header & self.FIN
@gen.coroutine
def _receive_frame(self):
# Read the frame header.
data = yield self._read_bytes(2)
header, mask_payloadlen = struct.unpack("BB", data)
is_final_frame = header & self.FIN
reserved_bits = header & self.RSV_MASK reserved_bits = header & self.RSV_MASK
self._frame_opcode = header & self.OPCODE_MASK opcode = header & self.OPCODE_MASK
self._frame_opcode_is_control = self._frame_opcode & 0x8 opcode_is_control = opcode & 0x8
if self._decompressor is not None and self._frame_opcode != 0: if self._decompressor is not None and opcode != 0:
# Compression flag is present in the first frame's header,
# but we can't decompress until we have all the frames of
# the message.
self._frame_compressed = bool(reserved_bits & self.RSV1) self._frame_compressed = bool(reserved_bits & self.RSV1)
reserved_bits &= ~self.RSV1 reserved_bits &= ~self.RSV1
if reserved_bits: if reserved_bits:
# client is using as-yet-undefined extensions; abort # client is using as-yet-undefined extensions; abort
self._abort() self._abort()
return return
self._masked_frame = bool(payloadlen & 0x80) is_masked = bool(mask_payloadlen & 0x80)
payloadlen = payloadlen & 0x7f payloadlen = mask_payloadlen & 0x7f
if self._frame_opcode_is_control and payloadlen >= 126:
# Parse and validate the length.
if opcode_is_control and payloadlen >= 126:
# control frames must have payload < 126 # control frames must have payload < 126
self._abort() self._abort()
return return
try:
if payloadlen < 126: if payloadlen < 126:
self._frame_length = payloadlen self._frame_length = payloadlen
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self._read_frame_data(False)
elif payloadlen == 126: elif payloadlen == 126:
self.stream.read_bytes(2, self._on_frame_length_16) data = yield self._read_bytes(2)
payloadlen = struct.unpack("!H", data)[0]
elif payloadlen == 127: elif payloadlen == 127:
self.stream.read_bytes(8, self._on_frame_length_64) data = yield self._read_bytes(8)
except StreamClosedError: payloadlen = struct.unpack("!Q", data)[0]
self._abort() new_len = payloadlen
def _read_frame_data(self, masked):
new_len = self._frame_length
if self._fragmented_message_buffer is not None: if self._fragmented_message_buffer is not None:
new_len += len(self._fragmented_message_buffer) new_len += len(self._fragmented_message_buffer)
if new_len > (self.handler.max_message_size or 10 * 1024 * 1024): if new_len > self.handler.max_message_size:
self.close(1009, "message too big") self.close(1009, "message too big")
self._abort()
return return
self.stream.read_bytes(
self._frame_length,
self._on_masked_frame_data if masked else self._on_frame_data)
def _on_frame_length_16(self, data): # Read the payload, unmasking if necessary.
self._wire_bytes_in += len(data) if is_masked:
self._frame_length = struct.unpack("!H", data)[0] self._frame_mask = yield self._read_bytes(4)
try: data = yield self._read_bytes(payloadlen)
if self._masked_frame: if is_masked:
self.stream.read_bytes(4, self._on_masking_key) data = _websocket_mask(self._frame_mask, data)
else:
self._read_frame_data(False)
except StreamClosedError:
self._abort()
def _on_frame_length_64(self, data): # Decide what to do with this frame.
self._wire_bytes_in += len(data) if opcode_is_control:
self._frame_length = struct.unpack("!Q", data)[0]
try:
if self._masked_frame:
self.stream.read_bytes(4, self._on_masking_key)
else:
self._read_frame_data(False)
except StreamClosedError:
self._abort()
def _on_masking_key(self, data):
self._wire_bytes_in += len(data)
self._frame_mask = data
try:
self._read_frame_data(True)
except StreamClosedError:
self._abort()
def _on_masked_frame_data(self, data):
# Don't touch _wire_bytes_in; we'll do it in _on_frame_data.
self._on_frame_data(_websocket_mask(self._frame_mask, data))
def _on_frame_data(self, data):
handled_future = None
self._wire_bytes_in += len(data)
if self._frame_opcode_is_control:
# control frames may be interleaved with a series of fragmented # control frames may be interleaved with a series of fragmented
# data frames, so control frames must not interact with # data frames, so control frames must not interact with
# self._fragmented_* # self._fragmented_*
if not self._final_frame: if not is_final_frame:
# control frames must not be fragmented # control frames must not be fragmented
self._abort() self._abort()
return return
opcode = self._frame_opcode elif opcode == 0: # continuation frame
elif self._frame_opcode == 0: # continuation frame
if self._fragmented_message_buffer is None: if self._fragmented_message_buffer is None:
# nothing to continue # nothing to continue
self._abort() self._abort()
return return
self._fragmented_message_buffer += data self._fragmented_message_buffer += data
if self._final_frame: if is_final_frame:
opcode = self._fragmented_message_opcode opcode = self._fragmented_message_opcode
data = self._fragmented_message_buffer data = self._fragmented_message_buffer
self._fragmented_message_buffer = None self._fragmented_message_buffer = None
@ -949,22 +967,14 @@ class WebSocketProtocol13(WebSocketProtocol):
# can't start new message until the old one is finished # can't start new message until the old one is finished
self._abort() self._abort()
return return
if self._final_frame: if not is_final_frame:
opcode = self._frame_opcode self._fragmented_message_opcode = opcode
else:
self._fragmented_message_opcode = self._frame_opcode
self._fragmented_message_buffer = data self._fragmented_message_buffer = data
if self._final_frame: if is_final_frame:
handled_future = self._handle_message(opcode, data) handled_future = self._handle_message(opcode, data)
if handled_future is not None:
if not self.client_terminated: yield handled_future
if handled_future:
# on_message is a coroutine, process more frames once it's done.
handled_future.add_done_callback(
lambda future: self._receive_frame())
else:
self._receive_frame()
def _handle_message(self, opcode, data): def _handle_message(self, opcode, data):
"""Execute on_message, returning its Future if it is a coroutine.""" """Execute on_message, returning its Future if it is a coroutine."""
@ -972,7 +982,12 @@ class WebSocketProtocol13(WebSocketProtocol):
return return
if self._frame_compressed: if self._frame_compressed:
try:
data = self._decompressor.decompress(data) data = self._decompressor.decompress(data)
except _DecompressTooLargeError:
self.close(1009, "message too big after decompression")
self._abort()
return
if opcode == 0x1: if opcode == 0x1:
# UTF-8 data # UTF-8 data
@ -1092,7 +1107,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
""" """
def __init__(self, request, on_message_callback=None, def __init__(self, request, on_message_callback=None,
compression_options=None, ping_interval=None, ping_timeout=None, compression_options=None, ping_interval=None, ping_timeout=None,
max_message_size=None): max_message_size=None, subprotocols=[]):
self.compression_options = compression_options self.compression_options = compression_options
self.connect_future = Future() self.connect_future = Future()
self.protocol = None self.protocol = None
@ -1113,6 +1128,8 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
'Sec-WebSocket-Key': self.key, 'Sec-WebSocket-Key': self.key,
'Sec-WebSocket-Version': '13', 'Sec-WebSocket-Version': '13',
}) })
if subprotocols is not None:
request.headers['Sec-WebSocket-Protocol'] = ','.join(subprotocols)
if self.compression_options is not None: if self.compression_options is not None:
# Always offer to let the server set our max_wbits (and even though # Always offer to let the server set our max_wbits (and even though
# we don't offer it, we will accept a client_no_context_takeover # we don't offer it, we will accept a client_no_context_takeover
@ -1167,7 +1184,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
self.protocol = self.get_websocket_protocol() self.protocol = self.get_websocket_protocol()
self.protocol._process_server_headers(self.key, self.headers) self.protocol._process_server_headers(self.key, self.headers)
self.protocol.start_pinging() self.protocol.start_pinging()
self.protocol._receive_frame() IOLoop.current().add_callback(self.protocol._receive_frame_loop)
if self._timeout is not None: if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout) self.io_loop.remove_timeout(self._timeout)
@ -1247,11 +1264,19 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection):
return WebSocketProtocol13(self, mask_outgoing=True, return WebSocketProtocol13(self, mask_outgoing=True,
compression_options=self.compression_options) compression_options=self.compression_options)
@property
def selected_subprotocol(self):
"""The subprotocol selected by the server.
.. versionadded:: 5.1
"""
return self.protocol.selected_subprotocol
def websocket_connect(url, callback=None, connect_timeout=None, def websocket_connect(url, callback=None, connect_timeout=None,
on_message_callback=None, compression_options=None, on_message_callback=None, compression_options=None,
ping_interval=None, ping_timeout=None, ping_interval=None, ping_timeout=None,
max_message_size=None): max_message_size=_default_max_message_size, subprotocols=None):
"""Client-side websocket support. """Client-side websocket support.
Takes a url and returns a Future whose result is a Takes a url and returns a Future whose result is a
@ -1274,6 +1299,11 @@ def websocket_connect(url, callback=None, connect_timeout=None,
``websocket_connect``. In both styles, a message of ``None`` ``websocket_connect``. In both styles, a message of ``None``
indicates that the connection has been closed. indicates that the connection has been closed.
``subprotocols`` may be a list of strings specifying proposed
subprotocols. The selected protocol may be found on the
``selected_subprotocol`` attribute of the connection object
when the connection is complete.
.. versionchanged:: 3.2 .. versionchanged:: 3.2
Also accepts ``HTTPRequest`` objects in place of urls. Also accepts ``HTTPRequest`` objects in place of urls.
@ -1286,6 +1316,9 @@ def websocket_connect(url, callback=None, connect_timeout=None,
.. versionchanged:: 5.0 .. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. versionchanged:: 5.1
Added the ``subprotocols`` argument.
""" """
if isinstance(url, httpclient.HTTPRequest): if isinstance(url, httpclient.HTTPRequest):
assert connect_timeout is None assert connect_timeout is None
@ -1302,7 +1335,8 @@ def websocket_connect(url, callback=None, connect_timeout=None,
compression_options=compression_options, compression_options=compression_options,
ping_interval=ping_interval, ping_interval=ping_interval,
ping_timeout=ping_timeout, ping_timeout=ping_timeout,
max_message_size=max_message_size) max_message_size=max_message_size,
subprotocols=subprotocols)
if callback is not None: if callback is not None:
IOLoop.current().add_future(conn.connect_future, callback) IOLoop.current().add_future(conn.connect_future, callback)
return conn.connect_future return conn.connect_future

View file

@ -33,6 +33,7 @@ from __future__ import absolute_import, division, print_function
import sys import sys
from io import BytesIO from io import BytesIO
import tornado import tornado
import warnings
from tornado.concurrent import Future from tornado.concurrent import Future
from tornado import escape from tornado import escape
@ -76,6 +77,7 @@ class WSGIApplication(web.Application):
.. deprecated:: 4.0 .. deprecated:: 4.0
Use a regular `.Application` and wrap it in `WSGIAdapter` instead. Use a regular `.Application` and wrap it in `WSGIAdapter` instead.
This class will be removed in Tornado 6.0.
""" """
def __call__(self, environ, start_response): def __call__(self, environ, start_response):
return WSGIAdapter(self)(environ, start_response) return WSGIAdapter(self)(environ, start_response)
@ -83,8 +85,10 @@ class WSGIApplication(web.Application):
# WSGI has no facilities for flow control, so just return an already-done # WSGI has no facilities for flow control, so just return an already-done
# Future when the interface requires it. # Future when the interface requires it.
_dummy_future = Future() def _dummy_future():
_dummy_future.set_result(None) f = Future()
f.set_result(None)
return f
class _WSGIConnection(httputil.HTTPConnection): class _WSGIConnection(httputil.HTTPConnection):
@ -116,7 +120,7 @@ class _WSGIConnection(httputil.HTTPConnection):
self.write(chunk, callback) self.write(chunk, callback)
elif callback is not None: elif callback is not None:
callback() callback()
return _dummy_future return _dummy_future()
def write(self, chunk, callback=None): def write(self, chunk, callback=None):
if self._expected_content_remaining is not None: if self._expected_content_remaining is not None:
@ -128,7 +132,7 @@ class _WSGIConnection(httputil.HTTPConnection):
self._write_buffer.append(chunk) self._write_buffer.append(chunk)
if callback is not None: if callback is not None:
callback() callback()
return _dummy_future return _dummy_future()
def finish(self): def finish(self):
if (self._expected_content_remaining is not None and if (self._expected_content_remaining is not None and
@ -179,9 +183,25 @@ class WSGIAdapter(object):
that it is not possible to use `.AsyncHTTPClient`, or the that it is not possible to use `.AsyncHTTPClient`, or the
`tornado.auth` or `tornado.websocket` modules. `tornado.auth` or `tornado.websocket` modules.
In multithreaded WSGI servers on Python 3, it may be necessary to
permit `asyncio` to create event loops on any thread. Run the
following at startup (typically import time for WSGI
applications)::
import asyncio
from tornado.platform.asyncio import AnyThreadEventLoopPolicy
asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
.. versionadded:: 4.0 .. versionadded:: 4.0
.. deprecated:: 5.1
This class is deprecated and will be removed in Tornado 6.0.
Use Tornado's `.HTTPServer` instead of a WSGI container.
""" """
def __init__(self, application): def __init__(self, application):
warnings.warn("WSGIAdapter is deprecated, use Tornado's HTTPServer instead",
DeprecationWarning)
if isinstance(application, WSGIApplication): if isinstance(application, WSGIApplication):
self.application = lambda request: web.Application.__call__( self.application = lambda request: web.Application.__call__(
application, request) application, request)