diff --git a/CHANGES.md b/CHANGES.md index a52b32df..e442bb0c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ * Update Requests library 2.15.1 (282b01a) to 2.19.1 (33b41c7) * Update scandir module 1.6 (c3592ee) to 1.9.0 (9ab3d1f) * Update SimpleJSON 3.13.2 (6ffddbe) to 3.16.0 (e2a54f7) +* Update Tornado Web Server 5.0.1 (2b2a220a) to 5.1.1 (cc2cf07) * Update unidecode module 1.0.22 (81f938d) to 1.0.22 (578cdb9) * Add idna library 2.7 (0f50bdc) * Add urllib3 release 1.23 (7c216f4) diff --git a/lib/tornado/__init__.py b/lib/tornado/__init__.py index 9a9d2202..b269cf70 100644 --- a/lib/tornado/__init__.py +++ b/lib/tornado/__init__.py @@ -24,5 +24,5 @@ from __future__ import absolute_import, division, print_function # is zero for an official release, positive for a development branch, # or negative for a release candidate or beta (after the base version # number has been incremented) -version = "5.1.dev1" -version_info = (5, 1, 0, -100) +version = "5.1.1" +version_info = (5, 1, 1, 0) diff --git a/lib/tornado/auth.py b/lib/tornado/auth.py index 0069efcb..b79ad14b 100644 --- a/lib/tornado/auth.py +++ b/lib/tornado/auth.py @@ -37,15 +37,14 @@ Example usage for Google OAuth: class GoogleOAuth2LoginHandler(tornado.web.RequestHandler, tornado.auth.GoogleOAuth2Mixin): - @tornado.gen.coroutine - def get(self): + async def get(self): if self.get_argument('code', False): - user = yield self.get_authenticated_user( + user = await self.get_authenticated_user( redirect_uri='http://your.site.com/auth/google', code=self.get_argument('code')) # Save the user with e.g. set_secure_cookie else: - yield self.authorize_redirect( + await self.authorize_redirect( redirect_uri='http://your.site.com/auth/google', client_id=self.settings['google_oauth']['key'], scope=['profile', 'email'], @@ -75,15 +74,15 @@ import time import uuid import warnings -from tornado.concurrent import (Future, return_future, chain_future, - future_set_exc_info, +from tornado.concurrent import (Future, _non_deprecated_return_future, + future_set_exc_info, chain_future, future_set_result_unless_cancelled) from tornado import gen from tornado import httpclient from tornado import escape from tornado.httputil import url_concat from tornado.log import gen_log -from tornado.stack_context import ExceptionStackContext +from tornado.stack_context import ExceptionStackContext, wrap from tornado.util import unicode_type, ArgReplacer, PY3 if PY3: @@ -128,7 +127,7 @@ def _auth_return_future(f): warnings.warn("callback arguments are deprecated, use the returned Future instead", DeprecationWarning) future.add_done_callback( - functools.partial(_auth_future_to_callback, callback)) + wrap(functools.partial(_auth_future_to_callback, callback))) def handle_exception(typ, value, tb): if future.done(): @@ -136,7 +135,7 @@ def _auth_return_future(f): else: future_set_exc_info(future, (typ, value, tb)) return True - with ExceptionStackContext(handle_exception): + with ExceptionStackContext(handle_exception, delay_warning=True): f(*args, **kwargs) return future return wrapper @@ -149,7 +148,7 @@ class OpenIdMixin(object): * ``_OPENID_ENDPOINT``: the identity provider's URI. """ - @return_future + @_non_deprecated_return_future def authenticate_redirect(self, callback_uri=None, ax_attrs=["name", "email", "language", "username"], callback=None): @@ -203,8 +202,8 @@ class OpenIdMixin(object): if http_client is None: http_client = self.get_auth_http_client() fut = http_client.fetch(url, method="POST", body=urllib_parse.urlencode(args)) - fut.add_done_callback(functools.partial( - self._on_authentication_verified, callback)) + fut.add_done_callback(wrap(functools.partial( + self._on_authentication_verified, callback))) def _openid_args(self, callback_uri, ax_attrs=[], oauth_scope=None): url = urlparse.urljoin(self.request.full_url(), callback_uri) @@ -344,7 +343,7 @@ class OAuthMixin(object): Subclasses must also override the `_oauth_get_user_future` and `_oauth_consumer_token` methods. """ - @return_future + @_non_deprecated_return_future def authorize_redirect(self, callback_uri=None, extra_params=None, http_client=None, callback=None): """Redirects the user to obtain OAuth authorization for this service. @@ -382,18 +381,18 @@ class OAuthMixin(object): fut = http_client.fetch( self._oauth_request_token_url(callback_uri=callback_uri, extra_params=extra_params)) - fut.add_done_callback(functools.partial( + fut.add_done_callback(wrap(functools.partial( self._on_request_token, self._OAUTH_AUTHORIZE_URL, callback_uri, - callback)) + callback))) else: fut = http_client.fetch(self._oauth_request_token_url()) fut.add_done_callback( - functools.partial( + wrap(functools.partial( self._on_request_token, self._OAUTH_AUTHORIZE_URL, callback_uri, - callback)) + callback))) @_auth_return_future def get_authenticated_user(self, callback, http_client=None): @@ -433,7 +432,7 @@ class OAuthMixin(object): if http_client is None: http_client = self.get_auth_http_client() fut = http_client.fetch(self._oauth_access_token_url(token)) - fut.add_done_callback(functools.partial(self._on_access_token, callback)) + fut.add_done_callback(wrap(functools.partial(self._on_access_token, callback))) def _oauth_request_token_url(self, callback_uri=None, extra_params=None): consumer_token = self._oauth_consumer_token() @@ -516,7 +515,7 @@ class OAuthMixin(object): fut = self._oauth_get_user_future(access_token) fut = gen.convert_yielded(fut) fut.add_done_callback( - functools.partial(self._on_oauth_get_user, access_token, future)) + wrap(functools.partial(self._on_oauth_get_user, access_token, future))) def _oauth_consumer_token(self): """Subclasses must override this to return their OAuth consumer keys. @@ -525,7 +524,7 @@ class OAuthMixin(object): """ raise NotImplementedError() - @return_future + @_non_deprecated_return_future def _oauth_get_user_future(self, access_token, callback): """Subclasses must override this to get basic information about the user. @@ -618,7 +617,7 @@ class OAuth2Mixin(object): * ``_OAUTH_AUTHORIZE_URL``: The service's authorization url. * ``_OAUTH_ACCESS_TOKEN_URL``: The service's access token url. """ - @return_future + @_non_deprecated_return_future def authorize_redirect(self, redirect_uri=None, client_id=None, client_secret=None, extra_params=None, callback=None, scope=None, response_type="code"): @@ -683,16 +682,15 @@ class OAuth2Mixin(object): class MainHandler(tornado.web.RequestHandler, tornado.auth.FacebookGraphMixin): @tornado.web.authenticated - @tornado.gen.coroutine - def get(self): - new_entry = yield self.oauth2_request( + async def get(self): + new_entry = await self.oauth2_request( "https://graph.facebook.com/me/feed", post_args={"message": "I am posting from my Tornado application!"}, access_token=self.current_user["access_token"]) if not new_entry: # Call failed; perhaps missing permission? - yield self.authorize_redirect() + await self.authorize_redirect() return self.finish("Posted a message!") @@ -713,7 +711,7 @@ class OAuth2Mixin(object): if all_args: url += "?" + urllib_parse.urlencode(all_args) - callback = functools.partial(self._on_oauth2_request, callback) + callback = wrap(functools.partial(self._on_oauth2_request, callback)) http = self.get_auth_http_client() if post_args is not None: fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args)) @@ -758,13 +756,12 @@ class TwitterMixin(OAuthMixin): class TwitterLoginHandler(tornado.web.RequestHandler, tornado.auth.TwitterMixin): - @tornado.gen.coroutine - def get(self): + async def get(self): if self.get_argument("oauth_token", None): - user = yield self.get_authenticated_user() + user = await self.get_authenticated_user() # Save the user using e.g. set_secure_cookie() else: - yield self.authorize_redirect() + await self.authorize_redirect() .. testoutput:: :hide: @@ -781,7 +778,7 @@ class TwitterMixin(OAuthMixin): _OAUTH_NO_CALLBACKS = False _TWITTER_BASE_URL = "https://api.twitter.com/1.1" - @return_future + @_non_deprecated_return_future def authenticate_redirect(self, callback_uri=None, callback=None): """Just like `~OAuthMixin.authorize_redirect`, but auto-redirects if authorized. @@ -799,10 +796,10 @@ class TwitterMixin(OAuthMixin): Use the returned awaitable object instead. """ http = self.get_auth_http_client() - http.fetch(self._oauth_request_token_url(callback_uri=callback_uri), - functools.partial( - self._on_request_token, self._OAUTH_AUTHENTICATE_URL, - None, callback)) + fut = http.fetch(self._oauth_request_token_url(callback_uri=callback_uri)) + fut.add_done_callback(wrap(functools.partial( + self._on_request_token, self._OAUTH_AUTHENTICATE_URL, + None, callback))) @_auth_return_future def twitter_request(self, path, callback=None, access_token=None, @@ -829,9 +826,8 @@ class TwitterMixin(OAuthMixin): class MainHandler(tornado.web.RequestHandler, tornado.auth.TwitterMixin): @tornado.web.authenticated - @tornado.gen.coroutine - def get(self): - new_entry = yield self.twitter_request( + async def get(self): + new_entry = await self.twitter_request( "/statuses/update", post_args={"status": "Testing Tornado Web Server"}, access_token=self.current_user["access_token"]) @@ -867,7 +863,7 @@ class TwitterMixin(OAuthMixin): if args: url += "?" + urllib_parse.urlencode(args) http = self.get_auth_http_client() - http_callback = functools.partial(self._on_twitter_request, callback, url) + http_callback = wrap(functools.partial(self._on_twitter_request, callback, url)) if post_args is not None: fut = http.fetch(url, method="POST", body=urllib_parse.urlencode(post_args)) else: @@ -942,19 +938,18 @@ class GoogleOAuth2Mixin(OAuth2Mixin): class GoogleOAuth2LoginHandler(tornado.web.RequestHandler, tornado.auth.GoogleOAuth2Mixin): - @tornado.gen.coroutine - def get(self): + async def get(self): if self.get_argument('code', False): - access = yield self.get_authenticated_user( + access = await self.get_authenticated_user( redirect_uri='http://your.site.com/auth/google', code=self.get_argument('code')) - user = yield self.oauth2_request( + user = await self.oauth2_request( "https://www.googleapis.com/oauth2/v1/userinfo", access_token=access["access_token"]) # Save the user and access token with # e.g. set_secure_cookie. else: - yield self.authorize_redirect( + await self.authorize_redirect( redirect_uri='http://your.site.com/auth/google', client_id=self.settings['google_oauth']['key'], scope=['profile', 'email'], @@ -982,7 +977,7 @@ class GoogleOAuth2Mixin(OAuth2Mixin): method="POST", headers={'Content-Type': 'application/x-www-form-urlencoded'}, body=body) - fut.add_done_callback(functools.partial(self._on_access_token, callback)) + fut.add_done_callback(wrap(functools.partial(self._on_access_token, callback))) def _on_access_token(self, future, response_fut): """Callback function for the exchange to the access token.""" @@ -1014,17 +1009,16 @@ class FacebookGraphMixin(OAuth2Mixin): class FacebookGraphLoginHandler(tornado.web.RequestHandler, tornado.auth.FacebookGraphMixin): - @tornado.gen.coroutine - def get(self): + async def get(self): if self.get_argument("code", False): - user = yield self.get_authenticated_user( + user = await self.get_authenticated_user( redirect_uri='/auth/facebookgraph/', client_id=self.settings["facebook_api_key"], client_secret=self.settings["facebook_secret"], code=self.get_argument("code")) # Save the user with e.g. set_secure_cookie else: - yield self.authorize_redirect( + await self.authorize_redirect( redirect_uri='/auth/facebookgraph/', client_id=self.settings["facebook_api_key"], extra_params={"scope": "read_stream,offline_access"}) @@ -1067,8 +1061,8 @@ class FacebookGraphMixin(OAuth2Mixin): fields.update(extra_fields) fut = http.fetch(self._oauth_request_token_url(**args)) - fut.add_done_callback(functools.partial(self._on_access_token, redirect_uri, client_id, - client_secret, callback, fields)) + fut.add_done_callback(wrap(functools.partial(self._on_access_token, redirect_uri, client_id, + client_secret, callback, fields))) @gen.coroutine def _on_access_token(self, redirect_uri, client_id, client_secret, @@ -1134,9 +1128,8 @@ class FacebookGraphMixin(OAuth2Mixin): class MainHandler(tornado.web.RequestHandler, tornado.auth.FacebookGraphMixin): @tornado.web.authenticated - @tornado.gen.coroutine - def get(self): - new_entry = yield self.facebook_request( + async def get(self): + new_entry = await self.facebook_request( "/me/feed", post_args={"message": "I am posting from my Tornado application!"}, access_token=self.current_user["access_token"]) diff --git a/lib/tornado/autoreload.py b/lib/tornado/autoreload.py index 2f911270..7d69474a 100644 --- a/lib/tornado/autoreload.py +++ b/lib/tornado/autoreload.py @@ -107,6 +107,9 @@ _watched_files = set() _reload_hooks = [] _reload_attempted = False _io_loops = weakref.WeakKeyDictionary() # type: ignore +_autoreload_is_main = False +_original_argv = None +_original_spec = None def start(check_time=500): @@ -214,11 +217,15 @@ def _reload(): # __spec__ is not available (Python < 3.4), check instead if # sys.path[0] is an empty string and add the current directory to # $PYTHONPATH. - spec = getattr(sys.modules['__main__'], '__spec__', None) - if spec: - argv = ['-m', spec.name] + sys.argv[1:] + if _autoreload_is_main: + spec = _original_spec + argv = _original_argv else: + spec = getattr(sys.modules['__main__'], '__spec__', None) argv = sys.argv + if spec: + argv = ['-m', spec.name] + argv[1:] + else: path_prefix = '.' + os.pathsep if (sys.path[0] == '' and not os.environ.get("PYTHONPATH", "").startswith(path_prefix)): @@ -226,7 +233,7 @@ def _reload(): os.environ.get("PYTHONPATH", "")) if not _has_execv: subprocess.Popen([sys.executable] + argv) - sys.exit(0) + os._exit(0) else: try: os.execv(sys.executable, [sys.executable] + argv) @@ -269,7 +276,17 @@ def main(): can catch import-time problems like syntax errors that would otherwise prevent the script from reaching its call to `wait`. """ + # Remember that we were launched with autoreload as main. + # The main module can be tricky; set the variables both in our globals + # (which may be __main__) and the real importable version. + import tornado.autoreload + global _autoreload_is_main + global _original_argv, _original_spec + tornado.autoreload._autoreload_is_main = _autoreload_is_main = True original_argv = sys.argv + tornado.autoreload._original_argv = _original_argv = original_argv + original_spec = getattr(sys.modules['__main__'], '__spec__', None) + tornado.autoreload._original_spec = _original_spec = original_spec sys.argv = sys.argv[:] if len(sys.argv) >= 3 and sys.argv[1] == "-m": mode = "module" diff --git a/lib/tornado/concurrent.py b/lib/tornado/concurrent.py index 85076681..f7e6bccc 100644 --- a/lib/tornado/concurrent.py +++ b/lib/tornado/concurrent.py @@ -483,7 +483,7 @@ def return_future(f): If no callback is given, the caller should use the ``Future`` to wait for the function to complete (perhaps by yielding it in a - `.gen.engine` function, or passing it to `.IOLoop.add_future`). + coroutine, or passing it to `.IOLoop.add_future`). Usage: @@ -494,10 +494,8 @@ def return_future(f): # Do stuff (possibly asynchronous) callback(result) - @gen.engine - def caller(callback): - yield future_func(arg1, arg2) - callback() + async def caller(): + await future_func(arg1, arg2) .. @@ -512,9 +510,22 @@ def return_future(f): .. deprecated:: 5.1 - New code should use coroutines directly instead of wrapping - callback-based code with this decorator. + This decorator will be removed in Tornado 6.0. New code should + use coroutines directly instead of wrapping callback-based code + with this decorator. Interactions with non-Tornado + callback-based code should be managed explicitly to avoid + relying on the `.ExceptionStackContext` built into this + decorator. """ + warnings.warn("@return_future is deprecated, use coroutines instead", + DeprecationWarning) + return _non_deprecated_return_future(f, warn=True) + + +def _non_deprecated_return_future(f, warn=False): + # Allow auth.py to use this decorator without triggering + # deprecation warnings. This will go away once auth.py has removed + # its legacy interfaces in 6.0. replacer = ArgReplacer(f, 'callback') @functools.wraps(f) @@ -528,7 +539,15 @@ def return_future(f): future_set_exc_info(future, (typ, value, tb)) return True exc_info = None - with ExceptionStackContext(handle_error): + esc = ExceptionStackContext(handle_error, delay_warning=True) + with esc: + if not warn: + # HACK: In non-deprecated mode (only used in auth.py), + # suppress the warning entirely. Since this is added + # in a 5.1 patch release and already removed in 6.0 + # I'm prioritizing a minimial change instead of a + # clean solution. + esc.delay_warning = False try: result = f(*args, **kwargs) if result is not None: diff --git a/lib/tornado/curl_httpclient.py b/lib/tornado/curl_httpclient.py index 54fc5b36..7f5cb105 100644 --- a/lib/tornado/curl_httpclient.py +++ b/lib/tornado/curl_httpclient.py @@ -80,7 +80,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): self._multi = None def fetch_impl(self, request, callback): - self._requests.append((request, callback)) + self._requests.append((request, callback, self.io_loop.time())) self._process_queue() self._set_timeout(0) @@ -205,13 +205,15 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): while self._free_list and self._requests: started += 1 curl = self._free_list.pop() - (request, callback) = self._requests.popleft() + (request, callback, queue_start_time) = self._requests.popleft() curl.info = { "headers": httputil.HTTPHeaders(), "buffer": BytesIO(), "request": request, "callback": callback, + "queue_start_time": queue_start_time, "curl_start_time": time.time(), + "curl_start_ioloop_time": self.io_loop.current().time(), } try: self._curl_setup_request( @@ -257,7 +259,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): # the various curl timings are documented at # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html time_info = dict( - queue=info["curl_start_time"] - info["request"].start_time, + queue=info["curl_start_ioloop_time"] - info["queue_start_time"], namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME), connect=curl.getinfo(pycurl.CONNECT_TIME), appconnect=curl.getinfo(pycurl.APPCONNECT_TIME), @@ -271,7 +273,8 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): request=info["request"], code=code, headers=info["headers"], buffer=buffer, effective_url=effective_url, error=error, reason=info['headers'].get("X-Http-Reason", None), - request_time=time.time() - info["curl_start_time"], + request_time=self.io_loop.time() - info["curl_start_ioloop_time"], + start_time=info["curl_start_time"], time_info=time_info)) except Exception: self.handle_callback_exception(info["callback"]) @@ -319,17 +322,7 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): self.io_loop.add_callback(request.streaming_callback, chunk) else: write_function = buffer.write - if bytes is str: # py2 - curl.setopt(pycurl.WRITEFUNCTION, write_function) - else: # py3 - # Upstream pycurl doesn't support py3, but ubuntu 12.10 includes - # a fork/port. That version has a bug in which it passes unicode - # strings instead of bytes to the WRITEFUNCTION. This means that - # if you use a WRITEFUNCTION (which tornado always does), you cannot - # download arbitrary binary data. This needs to be fixed in the - # ported pycurl package, but in the meantime this lambda will - # make it work for downloading (utf8) text. - curl.setopt(pycurl.WRITEFUNCTION, lambda s: write_function(utf8(s))) + curl.setopt(pycurl.WRITEFUNCTION, write_function) curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects) curl.setopt(pycurl.MAXREDIRS, request.max_redirects) curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout)) @@ -348,8 +341,8 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): curl.setopt(pycurl.PROXY, request.proxy_host) curl.setopt(pycurl.PROXYPORT, request.proxy_port) if request.proxy_username: - credentials = '%s:%s' % (request.proxy_username, - request.proxy_password) + credentials = httputil.encode_username_password(request.proxy_username, + request.proxy_password) curl.setopt(pycurl.PROXYUSERPWD, credentials) if (request.proxy_auth_mode is None or @@ -441,8 +434,6 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): curl.setopt(pycurl.INFILESIZE, len(request.body or '')) if request.auth_username is not None: - userpwd = "%s:%s" % (request.auth_username, request.auth_password or '') - if request.auth_mode is None or request.auth_mode == "basic": curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC) elif request.auth_mode == "digest": @@ -450,7 +441,9 @@ class CurlAsyncHTTPClient(AsyncHTTPClient): else: raise ValueError("Unsupported auth_mode %s" % request.auth_mode) - curl.setopt(pycurl.USERPWD, native_str(userpwd)) + userpwd = httputil.encode_username_password(request.auth_username, + request.auth_password) + curl.setopt(pycurl.USERPWD, userpwd) curl_log.debug("%s %s (username: %r)", request.method, request.url, request.auth_username) else: diff --git a/lib/tornado/gen.py b/lib/tornado/gen.py index cc59402e..3556374d 100644 --- a/lib/tornado/gen.py +++ b/lib/tornado/gen.py @@ -17,7 +17,7 @@ environment than chaining callbacks. Code using coroutines is technically asynchronous, but it is written as a single generator instead of a collection of separate functions. -For example, the following asynchronous handler: +For example, the following callback-based asynchronous handler: .. testcode:: diff --git a/lib/tornado/http1connection.py b/lib/tornado/http1connection.py index e34c5f3f..4ab92882 100644 --- a/lib/tornado/http1connection.py +++ b/lib/tornado/http1connection.py @@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function import re +import warnings from tornado.concurrent import (Future, future_add_done_callback, future_set_result_unless_cancelled) @@ -277,8 +278,14 @@ class HTTP1Connection(httputil.HTTPConnection): def set_close_callback(self, callback): """Sets a callback that will be run when the connection is closed. - .. deprecated:: 4.0 - Use `.HTTPMessageDelegate.on_connection_close` instead. + Note that this callback is slightly different from + `.HTTPMessageDelegate.on_connection_close`: The + `.HTTPMessageDelegate` method is called when the connection is + closed while recieving a message. This callback is used when + there is not an active delegate (for example, on the server + side this callback is used if the client closes the connection + after sending its request but before receiving all the + response. """ self._close_callback = stack_context.wrap(callback) @@ -395,6 +402,8 @@ class HTTP1Connection(httputil.HTTPConnection): future.exception() else: if callback is not None: + warnings.warn("callback argument is deprecated, use returned Future instead", + DeprecationWarning) self._write_callback = stack_context.wrap(callback) else: future = self._write_future = Future() @@ -402,7 +411,7 @@ class HTTP1Connection(httputil.HTTPConnection): if chunk: data += self._format_chunk(chunk) self._pending_write = self.stream.write(data) - self._pending_write.add_done_callback(self._on_write_complete) + future_add_done_callback(self._pending_write, self._on_write_complete) return future def _format_chunk(self, chunk): @@ -434,6 +443,8 @@ class HTTP1Connection(httputil.HTTPConnection): self._write_future.exception() else: if callback is not None: + warnings.warn("callback argument is deprecated, use returned Future instead", + DeprecationWarning) self._write_callback = stack_context.wrap(callback) else: future = self._write_future = Future() diff --git a/lib/tornado/httpclient.py b/lib/tornado/httpclient.py index f0a2df88..5ed2ee67 100644 --- a/lib/tornado/httpclient.py +++ b/lib/tornado/httpclient.py @@ -125,15 +125,15 @@ class AsyncHTTPClient(Configurable): Example usage:: - def handle_response(response): - if response.error: - print("Error: %s" % response.error) + async def f(): + http_client = AsyncHTTPClient() + try: + response = await http_client.fetch("http://www.google.com") + except Exception as e: + print("Error: %s" % e) else: print(response.body) - http_client = AsyncHTTPClient() - http_client.fetch("http://www.google.com/", handle_response) - The constructor for this class is magic in several respects: It actually creates an instance of an implementation-specific subclass, and instances are reused as a kind of pseudo-singleton @@ -578,17 +578,35 @@ class HTTPResponse(object): * error: Exception object, if any - * request_time: seconds from request start to finish + * request_time: seconds from request start to finish. Includes all network + operations from DNS resolution to receiving the last byte of data. + Does not include time spent in the queue (due to the ``max_clients`` option). + If redirects were followed, only includes the final request. + + * start_time: Time at which the HTTP operation started, based on `time.time` + (not the monotonic clock used by `.IOLoop.time`). May be ``None`` if the request + timed out while in the queue. * time_info: dictionary of diagnostic timing information from the request. Available data are subject to change, but currently uses timings available from http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html, plus ``queue``, which is the delay (if any) introduced by waiting for a slot under `AsyncHTTPClient`'s ``max_clients`` setting. + + .. versionadded:: 5.1 + + Added the ``start_time`` attribute. + + .. versionchanged:: 5.1 + + The ``request_time`` attribute previously included time spent in the queue + for ``simple_httpclient``, but not in ``curl_httpclient``. Now queueing time + is excluded in both implementations. ``request_time`` is now more accurate for + ``curl_httpclient`` because it uses a monotonic clock when available. """ def __init__(self, request, code, headers=None, buffer=None, effective_url=None, error=None, request_time=None, - time_info=None, reason=None): + time_info=None, reason=None, start_time=None): if isinstance(request, _RequestProxy): self.request = request.request else: @@ -615,6 +633,7 @@ class HTTPResponse(object): self.error = None else: self.error = error + self.start_time = start_time self.request_time = request_time self.time_info = time_info or {} diff --git a/lib/tornado/httputil.py b/lib/tornado/httputil.py index 3d2d3359..39614466 100644 --- a/lib/tornado/httputil.py +++ b/lib/tornado/httputil.py @@ -29,10 +29,12 @@ import email.utils import numbers import re import time +import unicodedata +import warnings from tornado.escape import native_str, parse_qs_bytes, utf8 from tornado.log import gen_log -from tornado.util import ObjectDict, PY3 +from tornado.util import ObjectDict, PY3, unicode_type if PY3: import http.cookies as Cookie @@ -380,10 +382,15 @@ class HTTPServerRequest(object): """Returns True if this request supports HTTP/1.1 semantics. .. deprecated:: 4.0 - Applications are less likely to need this information with the - introduction of `.HTTPConnection`. If you still need it, access - the ``version`` attribute directly. + + Applications are less likely to need this information with + the introduction of `.HTTPConnection`. If you still need + it, access the ``version`` attribute directly. This method + will be removed in Tornado 6.0. + """ + warnings.warn("supports_http_1_1() is deprecated, use request.version instead", + DeprecationWarning) return self.version == "HTTP/1.1" @property @@ -412,8 +419,10 @@ class HTTPServerRequest(object): .. deprecated:: 4.0 Use ``request.connection`` and the `.HTTPConnection` methods - to write the response. + to write the response. This method will be removed in Tornado 6.0. """ + warnings.warn("req.write deprecated, use req.connection.write and write_headers instead", + DeprecationWarning) assert isinstance(chunk, bytes) assert self.version.startswith("HTTP/1."), \ "deprecated interface only supported in HTTP/1.x" @@ -424,8 +433,10 @@ class HTTPServerRequest(object): .. deprecated:: 4.0 Use ``request.connection`` and the `.HTTPConnection` methods - to write the response. + to write the response. This method will be removed in Tornado 6.0. """ + warnings.warn("req.finish deprecated, use req.connection.finish instead", + DeprecationWarning) self.connection.finish() self._finish_time = time.time() @@ -581,6 +592,11 @@ class HTTPConnection(object): The ``version`` field of ``start_line`` is ignored. Returns a `.Future` if no callback is given. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. """ raise NotImplementedError() @@ -589,6 +605,11 @@ class HTTPConnection(object): The callback will be run when the write is complete. If no callback is given, returns a Future. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. """ raise NotImplementedError() @@ -762,6 +783,11 @@ def parse_multipart_form_data(boundary, data, arguments, files): The ``boundary`` and ``data`` parameters are both byte strings. The dictionaries given in the arguments and files parameters will be updated with the contents of the body. + + .. versionchanged:: 5.1 + + Now recognizes non-ASCII filenames in RFC 2231/5987 + (``filename*=``) format. """ # The standard allows for the boundary to be quoted in the header, # although it's rare (it happens at least for google app engine @@ -870,7 +896,8 @@ def parse_response_start_line(line): # The original 2.7 version of this code did not correctly support some # combinations of semicolons and double quotes. # It has also been modified to support valueless parameters as seen in -# websocket extension negotiations. +# websocket extension negotiations, and to support non-ascii values in +# RFC 2231/5987 format. def _parseparam(s): @@ -887,25 +914,37 @@ def _parseparam(s): def _parse_header(line): - """Parse a Content-type like header. + r"""Parse a Content-type like header. Return the main content-type and a dictionary of options. + >>> d = "form-data; foo=\"b\\\\a\\\"r\"; file*=utf-8''T%C3%A4st" + >>> ct, d = _parse_header(d) + >>> ct + 'form-data' + >>> d['file'] == r'T\u00e4st'.encode('ascii').decode('unicode_escape') + True + >>> d['foo'] + 'b\\a"r' """ parts = _parseparam(';' + line) key = next(parts) - pdict = {} + # decode_params treats first argument special, but we already stripped key + params = [('Dummy', 'value')] for p in parts: i = p.find('=') if i >= 0: name = p[:i].strip().lower() value = p[i + 1:].strip() - if len(value) >= 2 and value[0] == value[-1] == '"': - value = value[1:-1] - value = value.replace('\\\\', '\\').replace('\\"', '"') - pdict[name] = value - else: - pdict[p] = None + params.append((name, native_str(value))) + params = email.utils.decode_params(params) + params.pop(0) # get rid of the dummy again + pdict = {} + for name, value in params: + value = email.utils.collapse_rfc2231_value(value) + if len(value) >= 2 and value[0] == '"' and value[-1] == '"': + value = value[1:-1] + pdict[name] = value return key, pdict @@ -929,6 +968,20 @@ def _encode_header(key, pdict): return '; '.join(out) +def encode_username_password(username, password): + """Encodes a username/password pair in the format used by HTTP auth. + + The return value is a byte string in the form ``username:password``. + + .. versionadded:: 5.1 + """ + if isinstance(username, unicode_type): + username = unicodedata.normalize('NFC', username) + if isinstance(password, unicode_type): + password = unicodedata.normalize('NFC', password) + return utf8(username) + b":" + utf8(password) + + def doctests(): import doctest return doctest.DocTestSuite() diff --git a/lib/tornado/ioloop.py b/lib/tornado/ioloop.py index 123f2ba5..889153af 100644 --- a/lib/tornado/ioloop.py +++ b/lib/tornado/ioloop.py @@ -101,13 +101,11 @@ class IOLoop(Configurable): import socket import tornado.ioloop - from tornado import gen from tornado.iostream import IOStream - @gen.coroutine - def handle_connection(connection, address): + async def handle_connection(connection, address): stream = IOStream(connection) - message = yield stream.read_until_close() + message = await stream.read_until_close() print("message from client:", message.decode().strip()) def connection_ready(sock, fd, events): @@ -119,7 +117,8 @@ class IOLoop(Configurable): raise return connection.setblocking(0) - handle_connection(connection, address) + io_loop = tornado.ioloop.IOLoop.current() + io_loop.spawn_callback(handle_connection, connection, address) if __name__ == '__main__': sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) @@ -441,7 +440,8 @@ class IOLoop(Configurable): .. deprecated:: 5.0 Not implemented on the `asyncio` event loop. Use the environment - variable ``PYTHONASYNCIODEBUG=1`` instead. + variable ``PYTHONASYNCIODEBUG=1`` instead. This method will be + removed in Tornado 6.0. """ raise NotImplementedError() @@ -455,7 +455,8 @@ class IOLoop(Configurable): .. deprecated:: 5.0 Not implemented on the `asyncio` event loop. Use the environment - variable ``PYTHONASYNCIODEBUG=1`` instead. + variable ``PYTHONASYNCIODEBUG=1`` instead. This method will be + removed in Tornado 6.0. """ self.set_blocking_signal_threshold(seconds, self.log_stack) @@ -463,6 +464,10 @@ class IOLoop(Configurable): """Signal handler to log the stack trace of the current thread. For use with `set_blocking_signal_threshold`. + + .. deprecated:: 5.1 + + This method will be removed in Tornado 6.0. """ gen_log.warning('IOLoop blocked for %f seconds in\n%s', self._blocking_signal_threshold, @@ -498,17 +503,6 @@ class IOLoop(Configurable): If the event loop is not currently running, the next call to `start()` will return immediately. - To use asynchronous methods from otherwise-synchronous code (such as - unit tests), you can start and stop the event loop like this:: - - ioloop = IOLoop() - async_method(ioloop=ioloop, callback=ioloop.stop) - ioloop.start() - - ``ioloop.start()`` will return after ``async_method`` has run - its callback, whether that callback was invoked before or - after ``ioloop.start``. - Note that even after `stop` has been called, the `IOLoop` is not completely stopped until `IOLoop.start` has also returned. Some work that was scheduled before the call to `stop` may still @@ -519,10 +513,10 @@ class IOLoop(Configurable): def run_sync(self, func, timeout=None): """Starts the `IOLoop`, runs the given function, and stops the loop. - The function must return either a yieldable object or - ``None``. If the function returns a yieldable object, the - `IOLoop` will run until the yieldable is resolved (and - `run_sync()` will return the yieldable's result). If it raises + The function must return either an awaitable object or + ``None``. If the function returns an awaitable object, the + `IOLoop` will run until the awaitable is resolved (and + `run_sync()` will return the awaitable's result). If it raises an exception, the `IOLoop` will stop and the exception will be re-raised to the caller. @@ -530,21 +524,21 @@ class IOLoop(Configurable): a maximum duration for the function. If the timeout expires, a `tornado.util.TimeoutError` is raised. - This method is useful in conjunction with `tornado.gen.coroutine` - to allow asynchronous calls in a ``main()`` function:: + This method is useful to allow asynchronous calls in a + ``main()`` function:: - @gen.coroutine - def main(): + async def main(): # do stuff... if __name__ == '__main__': IOLoop.current().run_sync(main) .. versionchanged:: 4.3 - Returning a non-``None``, non-yieldable value is now an error. + Returning a non-``None``, non-awaitable value is now an error. .. versionchanged:: 5.0 If a timeout occurs, the ``func`` coroutine will be cancelled. + """ future_cell = [None] @@ -714,6 +708,10 @@ class IOLoop(Configurable): The callback is invoked with one argument, the `.Future`. + + This method only accepts `.Future` objects and not other + awaitables (unlike most of Tornado where the two are + interchangeable). """ assert is_future(future) callback = stack_context.wrap(callback) @@ -789,6 +787,16 @@ class IOLoop(Configurable): The exception itself is not passed explicitly, but is available in `sys.exc_info`. + + .. versionchanged:: 5.0 + + When the `asyncio` event loop is used (which is now the + default on Python 3), some callback errors will be handled by + `asyncio` instead of this method. + + .. deprecated: 5.1 + + Support for this method will be removed in Tornado 6.0. """ app_log.error("Exception in callback %r", callback, exc_info=True) diff --git a/lib/tornado/iostream.py b/lib/tornado/iostream.py index 67fa1a68..d35d8cea 100644 --- a/lib/tornado/iostream.py +++ b/lib/tornado/iostream.py @@ -33,6 +33,7 @@ import os import socket import sys import re +import warnings from tornado.concurrent import Future from tornado import ioloop @@ -342,6 +343,12 @@ class BaseIOStream(object): .. versionchanged:: 4.0 Added the ``max_bytes`` argument. The ``callback`` argument is now optional and a `.Future` will be returned if it is omitted. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ future = self._set_read_callback(callback) self._read_regex = re.compile(regex) @@ -375,6 +382,11 @@ class BaseIOStream(object): .. versionchanged:: 4.0 Added the ``max_bytes`` argument. The ``callback`` argument is now optional and a `.Future` will be returned if it is omitted. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. """ future = self._set_read_callback(callback) self._read_delimiter = delimiter @@ -408,12 +420,23 @@ class BaseIOStream(object): .. versionchanged:: 4.0 Added the ``partial`` argument. The callback argument is now optional and a `.Future` will be returned if it is omitted. + + .. deprecated:: 5.1 + + The ``callback`` and ``streaming_callback`` arguments are + deprecated and will be removed in Tornado 6.0. Use the + returned `.Future` (and ``partial=True`` for + ``streaming_callback``) instead. + """ future = self._set_read_callback(callback) assert isinstance(num_bytes, numbers.Integral) self._read_bytes = num_bytes self._read_partial = partial - self._streaming_callback = stack_context.wrap(streaming_callback) + if streaming_callback is not None: + warnings.warn("streaming_callback is deprecated, use partial instead", + DeprecationWarning) + self._streaming_callback = stack_context.wrap(streaming_callback) try: self._try_inline_read() except: @@ -434,6 +457,12 @@ class BaseIOStream(object): entirely filled with read data. .. versionadded:: 5.0 + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ future = self._set_read_callback(callback) @@ -485,9 +514,19 @@ class BaseIOStream(object): The callback argument is now optional and a `.Future` will be returned if it is omitted. + .. deprecated:: 5.1 + + The ``callback`` and ``streaming_callback`` arguments are + deprecated and will be removed in Tornado 6.0. Use the + returned `.Future` (and `read_bytes` with ``partial=True`` + for ``streaming_callback``) instead. + """ future = self._set_read_callback(callback) - self._streaming_callback = stack_context.wrap(streaming_callback) + if streaming_callback is not None: + warnings.warn("streaming_callback is deprecated, use read_bytes(partial=True) instead", + DeprecationWarning) + self._streaming_callback = stack_context.wrap(streaming_callback) if self.closed(): if self._streaming_callback is not None: self._run_read_callback(self._read_buffer_size, True) @@ -521,6 +560,12 @@ class BaseIOStream(object): .. versionchanged:: 4.5 Added support for `memoryview` arguments. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ self._check_closed() if data: @@ -530,6 +575,8 @@ class BaseIOStream(object): self._write_buffer.append(data) self._total_write_index += len(data) if callback is not None: + warnings.warn("callback argument is deprecated, use returned Future instead", + DeprecationWarning) self._write_callback = stack_context.wrap(callback) future = None else: @@ -546,9 +593,14 @@ class BaseIOStream(object): def set_close_callback(self, callback): """Call the given callback when the stream is closed. - This is not necessary for applications that use the `.Future` - interface; all outstanding ``Futures`` will resolve with a - `StreamClosedError` when the stream is closed. + This mostly is not necessary for applications that use the + `.Future` interface; all outstanding ``Futures`` will resolve + with a `StreamClosedError` when the stream is closed. However, + it is still useful as a way to signal that the stream has been + closed while no other read or write is in progress. + + Unlike other callback-based interfaces, ``set_close_callback`` + will not be removed in Tornado 6.0. """ self._close_callback = stack_context.wrap(callback) self._maybe_add_error_listener() @@ -808,6 +860,8 @@ class BaseIOStream(object): assert self._read_callback is None, "Already reading" assert self._read_future is None, "Already reading" if callback is not None: + warnings.warn("callbacks are deprecated, use returned Future instead", + DeprecationWarning) self._read_callback = stack_context.wrap(callback) else: self._read_future = Future() @@ -1137,24 +1191,23 @@ class IOStream(BaseIOStream): import tornado.iostream import socket - def send_request(): - stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n") - stream.read_until(b"\r\n\r\n", on_headers) - - def on_headers(data): + async def main(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + stream = tornado.iostream.IOStream(s) + await stream.connect(("friendfeed.com", 80)) + await stream.write(b"GET / HTTP/1.0\r\nHost: friendfeed.com\r\n\r\n") + header_data = await stream.read_until(b"\r\n\r\n") headers = {} - for line in data.split(b"\r\n"): - parts = line.split(b":") - if len(parts) == 2: - headers[parts[0].strip()] = parts[1].strip() - stream.read_bytes(int(headers[b"Content-Length"]), on_body) - - def on_body(data): - print(data) + for line in header_data.split(b"\r\n"): + parts = line.split(b":") + if len(parts) == 2: + headers[parts[0].strip()] = parts[1].strip() + body_data = await stream.read_bytes(int(headers[b"Content-Length"])) + print(body_data) stream.close() - tornado.ioloop.IOLoop.current().stop() if __name__ == '__main__': + tornado.ioloop.IOLoop.current().run_sync(main) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) stream = tornado.iostream.IOStream(s) stream.connect(("friendfeed.com", 80), send_request) @@ -1238,9 +1291,17 @@ class IOStream(BaseIOStream): ``ssl_options=dict(cert_reqs=ssl.CERT_NONE)`` or a suitably-configured `ssl.SSLContext` to the `SSLIOStream` constructor to disable. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ self._connecting = True if callback is not None: + warnings.warn("callback argument is deprecated, use returned Future instead", + DeprecationWarning) self._connect_callback = stack_context.wrap(callback) future = None else: @@ -1350,7 +1411,13 @@ class IOStream(BaseIOStream): return future def _handle_connect(self): - err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + try: + err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) + except socket.error as e: + # Hurd doesn't allow SO_ERROR for loopback sockets because all + # errors for such sockets are reported synchronously. + if errno_from_exception(e) == errno.ENOPROTOOPT: + err = 0 if err != 0: self.error = socket.error(err, os.strerror(err)) # IOLoop implementations may vary: some of them return @@ -1524,9 +1591,13 @@ class SSLIOStream(IOStream): def connect(self, address, callback=None, server_hostname=None): self._server_hostname = server_hostname - # Pass a dummy callback to super.connect(), which is slightly - # more efficient than letting it return a Future we ignore. - super(SSLIOStream, self).connect(address, callback=lambda: None) + # Ignore the result of connect(). If it fails, + # wait_for_handshake will raise an error too. This is + # necessary for the old semantics of the connect callback + # (which takes no arguments). In 6.0 this can be refactored to + # be a regular coroutine. + fut = super(SSLIOStream, self).connect(address) + fut.add_done_callback(lambda f: f.exception()) return self.wait_for_handshake(callback) def _handle_connect(self): @@ -1570,11 +1641,19 @@ class SSLIOStream(IOStream): handshake to complete). It may only be called once per stream. .. versionadded:: 4.2 + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed + in Tornado 6.0. Use the returned `.Future` instead. + """ if (self._ssl_connect_callback is not None or self._ssl_connect_future is not None): raise RuntimeError("Already waiting") if callback is not None: + warnings.warn("callback argument is deprecated, use returned Future instead", + DeprecationWarning) self._ssl_connect_callback = stack_context.wrap(callback) future = None else: diff --git a/lib/tornado/locks.py b/lib/tornado/locks.py index 94adb322..9566a457 100644 --- a/lib/tornado/locks.py +++ b/lib/tornado/locks.py @@ -61,22 +61,19 @@ class Condition(_TimeoutGarbageCollector): condition = Condition() - @gen.coroutine - def waiter(): + async def waiter(): print("I'll wait right here") - yield condition.wait() # Yield a Future. + await condition.wait() print("I'm done waiting") - @gen.coroutine - def notifier(): + async def notifier(): print("About to notify") condition.notify() print("Done notifying") - @gen.coroutine - def runner(): - # Yield two Futures; wait for waiter() and notifier() to finish. - yield [waiter(), notifier()] + async def runner(): + # Wait for waiter() and notifier() in parallel + await gen.multi([waiter(), notifier()]) IOLoop.current().run_sync(runner) @@ -93,12 +90,12 @@ class Condition(_TimeoutGarbageCollector): io_loop = IOLoop.current() # Wait up to 1 second for a notification. - yield condition.wait(timeout=io_loop.time() + 1) + await condition.wait(timeout=io_loop.time() + 1) ...or a `datetime.timedelta` for a timeout relative to the current time:: # Wait up to 1 second. - yield condition.wait(timeout=datetime.timedelta(seconds=1)) + await condition.wait(timeout=datetime.timedelta(seconds=1)) The method returns False if there's no notification before the deadline. @@ -170,22 +167,19 @@ class Event(object): event = Event() - @gen.coroutine - def waiter(): + async def waiter(): print("Waiting for event") - yield event.wait() + await event.wait() print("Not waiting this time") - yield event.wait() + await event.wait() print("Done") - @gen.coroutine - def setter(): + async def setter(): print("About to set the event") event.set() - @gen.coroutine - def runner(): - yield [waiter(), setter()] + async def runner(): + await gen.multi([waiter(), setter()]) IOLoop.current().run_sync(runner) @@ -290,12 +284,11 @@ class Semaphore(_TimeoutGarbageCollector): # Ensure reliable doctest output: resolve Futures one at a time. futures_q = deque([Future() for _ in range(3)]) - @gen.coroutine - def simulator(futures): + async def simulator(futures): for f in futures: # simulate the asynchronous passage of time - yield gen.moment - yield gen.moment + await gen.sleep(0) + await gen.sleep(0) f.set_result(None) IOLoop.current().add_callback(simulator, list(futures_q)) @@ -311,20 +304,18 @@ class Semaphore(_TimeoutGarbageCollector): sem = Semaphore(2) - @gen.coroutine - def worker(worker_id): - yield sem.acquire() + async def worker(worker_id): + await sem.acquire() try: print("Worker %d is working" % worker_id) - yield use_some_resource() + await use_some_resource() finally: print("Worker %d is done" % worker_id) sem.release() - @gen.coroutine - def runner(): + async def runner(): # Join all workers. - yield [worker(i) for i in range(3)] + await gen.multi([worker(i) for i in range(3)]) IOLoop.current().run_sync(runner) @@ -340,7 +331,18 @@ class Semaphore(_TimeoutGarbageCollector): Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until the semaphore has been released once, by worker 0. - `.acquire` is a context manager, so ``worker`` could be written as:: + The semaphore can be used as an async context manager:: + + async def worker(worker_id): + async with sem: + print("Worker %d is working" % worker_id) + await use_some_resource() + + # Now the semaphore has been released. + print("Worker %d is done" % worker_id) + + For compatibility with older versions of Python, `.acquire` is a + context manager, so ``worker`` could also be written as:: @gen.coroutine def worker(worker_id): @@ -351,19 +353,9 @@ class Semaphore(_TimeoutGarbageCollector): # Now the semaphore has been released. print("Worker %d is done" % worker_id) - In Python 3.5, the semaphore itself can be used as an async context - manager:: - - async def worker(worker_id): - async with sem: - print("Worker %d is working" % worker_id) - await use_some_resource() - - # Now the semaphore has been released. - print("Worker %d is done" % worker_id) - .. versionchanged:: 4.3 Added ``async with`` support in Python 3.5. + """ def __init__(self, value=1): super(Semaphore, self).__init__() @@ -464,26 +456,24 @@ class Lock(object): Releasing an unlocked lock raises `RuntimeError`. - `acquire` supports the context manager protocol in all Python versions: + A Lock can be used as an async context manager with the ``async + with`` statement: - >>> from tornado import gen, locks + >>> from tornado import locks >>> lock = locks.Lock() >>> - >>> @gen.coroutine - ... def f(): - ... with (yield lock.acquire()): + >>> async def f(): + ... async with lock: ... # Do something holding the lock. ... pass ... ... # Now the lock is released. - In Python 3.5, `Lock` also supports the async context manager - protocol. Note that in this case there is no `acquire`, because - ``async with`` includes both the ``yield`` and the ``acquire`` - (just as it does with `threading.Lock`): + For compatibility with older versions of Python, the `.acquire` + method asynchronously returns a regular context manager: - >>> async def f2(): # doctest: +SKIP - ... async with lock: + >>> async def f2(): + ... with (yield lock.acquire()): ... # Do something holding the lock. ... pass ... diff --git a/lib/tornado/netutil.py b/lib/tornado/netutil.py index 08c9d886..e63683ad 100644 --- a/lib/tornado/netutil.py +++ b/lib/tornado/netutil.py @@ -138,7 +138,12 @@ def bind_sockets(port, address=None, family=socket.AF_UNSPEC, raise set_close_exec(sock.fileno()) if os.name != 'nt': - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except socket.error as e: + if errno_from_exception(e) != errno.ENOPROTOOPT: + # Hurd doesn't support SO_REUSEADDR. + raise if reuse_port: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) if af == socket.AF_INET6: @@ -180,7 +185,12 @@ if hasattr(socket, 'AF_UNIX'): """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) set_close_exec(sock.fileno()) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + try: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + except socket.error as e: + if errno_from_exception(e) != errno.ENOPROTOOPT: + # Hurd doesn't support SO_REUSEADDR + raise sock.setblocking(0) try: st = os.stat(file) diff --git a/lib/tornado/options.py b/lib/tornado/options.py index a6f77029..0a4b965f 100644 --- a/lib/tornado/options.py +++ b/lib/tornado/options.py @@ -91,7 +91,6 @@ instances to define isolated sets of options, such as for subcommands. options can be defined, set, and read with any mix of the two. Dashes are typical for command-line usage while config files require underscores. - """ from __future__ import absolute_import, division, print_function @@ -326,18 +325,20 @@ class OptionParser(object): the global namespace that matches a defined option will be used to set that option's value. - Options are not parsed from strings as they would be on the - command line; they should be set to the correct type (this - means if you have ``datetime`` or ``timedelta`` options you - will need to import those modules in the config file. + Options may either be the specified type for the option or + strings (in which case they will be parsed the same way as in + `.parse_command_line`) Example (using the options defined in the top-level docs of this module):: port = 80 mysql_host = 'mydb.example.com:3306' + # Both lists and comma-separated strings are allowed for + # multiple=True. memcache_hosts = ['cache1.example.com:11011', 'cache2.example.com:11011'] + memcache_hosts = 'cache1.example.com:11011,cache2.example.com:11011' If ``final`` is ``False``, parse callbacks will not be run. This is useful for applications that wish to combine configurations @@ -358,6 +359,9 @@ class OptionParser(object): The special variable ``__file__`` is available inside config files, specifying the absolute path to the config file itself. + .. versionchanged:: 5.1 + Added the ability to set options via strings in config files. + """ config = {'__file__': os.path.abspath(path)} with open(path, 'rb') as f: @@ -365,7 +369,17 @@ class OptionParser(object): for name in config: normalized = self._normalize_name(name) if normalized in self._options: - self._options[normalized].set(config[name]) + option = self._options[normalized] + if option.multiple: + if not isinstance(config[name], (list, str)): + raise Error("Option %r is required to be a list of %s " + "or a comma-separated string" % + (option.name, option.type.__name__)) + + if type(config[name]) == str and option.type != str: + option.parse(config[name]) + else: + option.set(config[name]) if final: self.run_parse_callbacks() diff --git a/lib/tornado/platform/asyncio.py b/lib/tornado/platform/asyncio.py index b6a490af..e0042e1d 100644 --- a/lib/tornado/platform/asyncio.py +++ b/lib/tornado/platform/asyncio.py @@ -62,8 +62,13 @@ class BaseAsyncIOLoop(IOLoop): self.remove_handler(fd) if all_fds: self.close_fd(fileobj) - self.asyncio_loop.close() + # Remove the mapping before closing the asyncio loop. If this + # happened in the other order, we could race against another + # initialize() call which would see the closed asyncio loop, + # assume it was closed from the asyncio side, and do this + # cleanup for us, leading to a KeyError. del IOLoop._ioloop_for_asyncio[self.asyncio_loop] + self.asyncio_loop.close() def add_handler(self, fd, handler, events): fd, fileobj = self.split_fd(fd) diff --git a/lib/tornado/platform/twisted.py b/lib/tornado/platform/twisted.py index 4ae98be9..b38a755c 100644 --- a/lib/tornado/platform/twisted.py +++ b/lib/tornado/platform/twisted.py @@ -124,6 +124,13 @@ class TornadoReactor(PosixReactorBase): .. versionchanged:: 5.0 The ``io_loop`` argument (deprecated since version 4.1) has been removed. + + .. deprecated:: 5.1 + + This class will be removed in Tornado 6.0. Use + ``twisted.internet.asyncioreactor.AsyncioSelectorReactor`` + instead. + """ def __init__(self): self._io_loop = tornado.ioloop.IOLoop.current() @@ -350,6 +357,10 @@ def install(): .. versionchanged:: 5.0 The ``io_loop`` argument (deprecated since version 4.1) has been removed. + .. deprecated:: 5.1 + + This functio will be removed in Tornado 6.0. Use + ``twisted.internet.asyncioreactor.install`` instead. """ reactor = TornadoReactor() from twisted.internet.main import installReactor # type: ignore @@ -411,6 +422,11 @@ class TwistedIOLoop(tornado.ioloop.IOLoop): See also :meth:`tornado.ioloop.IOLoop.install` for general notes on installing alternative IOLoops. + + .. deprecated:: 5.1 + + The `asyncio` event loop will be the only available implementation in + Tornado 6.0. """ def initialize(self, reactor=None, **kwargs): super(TwistedIOLoop, self).initialize(**kwargs) diff --git a/lib/tornado/queues.py b/lib/tornado/queues.py index 23b8bb9c..7cb96bfc 100644 --- a/lib/tornado/queues.py +++ b/lib/tornado/queues.py @@ -79,28 +79,24 @@ class Queue(object): q = Queue(maxsize=2) - @gen.coroutine - def consumer(): - while True: - item = yield q.get() + async def consumer(): + async for item in q: try: print('Doing work on %s' % item) - yield gen.sleep(0.01) + await gen.sleep(0.01) finally: q.task_done() - @gen.coroutine - def producer(): + async def producer(): for item in range(5): - yield q.put(item) + await q.put(item) print('Put %s' % item) - @gen.coroutine - def main(): + async def main(): # Start consumer without waiting (since it never finishes). IOLoop.current().spawn_callback(consumer) - yield producer() # Wait for producer to put all tasks. - yield q.join() # Wait for consumer to finish all tasks. + await producer() # Wait for producer to put all tasks. + await q.join() # Wait for consumer to finish all tasks. print('Done') IOLoop.current().run_sync(main) @@ -119,11 +115,14 @@ class Queue(object): Doing work on 4 Done - In Python 3.5, `Queue` implements the async iterator protocol, so - ``consumer()`` could be rewritten as:: - async def consumer(): - async for item in q: + In versions of Python without native coroutines (before 3.5), + ``consumer()`` could be written as:: + + @gen.coroutine + def consumer(): + while True: + item = yield q.get() try: print('Doing work on %s' % item) yield gen.sleep(0.01) diff --git a/lib/tornado/simple_httpclient.py b/lib/tornado/simple_httpclient.py index 7696dd18..60b7956f 100644 --- a/lib/tornado/simple_httpclient.py +++ b/lib/tornado/simple_httpclient.py @@ -1,6 +1,6 @@ from __future__ import absolute_import, division, print_function -from tornado.escape import utf8, _unicode +from tornado.escape import _unicode from tornado import gen from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy from tornado import httputil @@ -20,6 +20,7 @@ import functools import re import socket import sys +import time from io import BytesIO @@ -215,6 +216,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): max_header_size, max_body_size): self.io_loop = IOLoop.current() self.start_time = self.io_loop.time() + self.start_wall_time = time.time() self.client = client self.request = request self.release_callback = release_callback @@ -230,7 +232,11 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): # Timeout handle returned by IOLoop.add_timeout self._timeout = None self._sockaddr = None - with stack_context.ExceptionStackContext(self._handle_exception): + IOLoop.current().add_callback(self.run) + + @gen.coroutine + def run(self): + try: self.parsed = urlparse.urlsplit(_unicode(self.request.url)) if self.parsed.scheme not in ("http", "https"): raise ValueError("Unsupported url scheme: %s" % @@ -248,7 +254,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): host = host[1:-1] self.parsed_hostname = host # save final host for _on_connect - if request.allow_ipv6 is False: + if self.request.allow_ipv6 is False: af = socket.AF_INET else: af = socket.AF_UNSPEC @@ -260,10 +266,93 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): self._timeout = self.io_loop.add_timeout( self.start_time + timeout, stack_context.wrap(functools.partial(self._on_timeout, "while connecting"))) - fut = self.tcp_client.connect(host, port, af=af, - ssl_options=ssl_options, - max_buffer_size=self.max_buffer_size) - fut.add_done_callback(stack_context.wrap(self._on_connect)) + stream = yield self.tcp_client.connect( + host, port, af=af, + ssl_options=ssl_options, + max_buffer_size=self.max_buffer_size) + + if self.final_callback is None: + # final_callback is cleared if we've hit our timeout. + stream.close() + return + self.stream = stream + self.stream.set_close_callback(self.on_connection_close) + self._remove_timeout() + if self.final_callback is None: + return + if self.request.request_timeout: + self._timeout = self.io_loop.add_timeout( + self.start_time + self.request.request_timeout, + stack_context.wrap(functools.partial(self._on_timeout, "during request"))) + if (self.request.method not in self._SUPPORTED_METHODS and + not self.request.allow_nonstandard_methods): + raise KeyError("unknown method %s" % self.request.method) + for key in ('network_interface', + 'proxy_host', 'proxy_port', + 'proxy_username', 'proxy_password', + 'proxy_auth_mode'): + if getattr(self.request, key, None): + raise NotImplementedError('%s not supported' % key) + if "Connection" not in self.request.headers: + self.request.headers["Connection"] = "close" + if "Host" not in self.request.headers: + if '@' in self.parsed.netloc: + self.request.headers["Host"] = self.parsed.netloc.rpartition('@')[-1] + else: + self.request.headers["Host"] = self.parsed.netloc + username, password = None, None + if self.parsed.username is not None: + username, password = self.parsed.username, self.parsed.password + elif self.request.auth_username is not None: + username = self.request.auth_username + password = self.request.auth_password or '' + if username is not None: + if self.request.auth_mode not in (None, "basic"): + raise ValueError("unsupported auth_mode %s", + self.request.auth_mode) + self.request.headers["Authorization"] = ( + b"Basic " + base64.b64encode( + httputil.encode_username_password(username, password))) + if self.request.user_agent: + self.request.headers["User-Agent"] = self.request.user_agent + if not self.request.allow_nonstandard_methods: + # Some HTTP methods nearly always have bodies while others + # almost never do. Fail in this case unless the user has + # opted out of sanity checks with allow_nonstandard_methods. + body_expected = self.request.method in ("POST", "PATCH", "PUT") + body_present = (self.request.body is not None or + self.request.body_producer is not None) + if ((body_expected and not body_present) or + (body_present and not body_expected)): + raise ValueError( + 'Body must %sbe None for method %s (unless ' + 'allow_nonstandard_methods is true)' % + ('not ' if body_expected else '', self.request.method)) + if self.request.expect_100_continue: + self.request.headers["Expect"] = "100-continue" + if self.request.body is not None: + # When body_producer is used the caller is responsible for + # setting Content-Length (or else chunked encoding will be used). + self.request.headers["Content-Length"] = str(len( + self.request.body)) + if (self.request.method == "POST" and + "Content-Type" not in self.request.headers): + self.request.headers["Content-Type"] = "application/x-www-form-urlencoded" + if self.request.decompress_response: + self.request.headers["Accept-Encoding"] = "gzip" + req_path = ((self.parsed.path or '/') + + (('?' + self.parsed.query) if self.parsed.query else '')) + self.connection = self._create_connection(stream) + start_line = httputil.RequestStartLine(self.request.method, + req_path, '') + self.connection.write_headers(start_line, self.request.headers) + if self.request.expect_100_continue: + yield self.connection.read_response(self) + else: + yield self._write_body(True) + except Exception: + if not self._handle_exception(*sys.exc_info()): + raise def _get_ssl_options(self, scheme): if scheme == "https": @@ -301,95 +390,14 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): self._timeout = None error_message = "Timeout {0}".format(info) if info else "Timeout" if self.final_callback is not None: - raise HTTPTimeoutError(error_message) + self._handle_exception(HTTPTimeoutError, HTTPTimeoutError(error_message), + None) def _remove_timeout(self): if self._timeout is not None: self.io_loop.remove_timeout(self._timeout) self._timeout = None - def _on_connect(self, stream_fut): - stream = stream_fut.result() - if self.final_callback is None: - # final_callback is cleared if we've hit our timeout. - stream.close() - return - self.stream = stream - self.stream.set_close_callback(self.on_connection_close) - self._remove_timeout() - if self.final_callback is None: - return - if self.request.request_timeout: - self._timeout = self.io_loop.add_timeout( - self.start_time + self.request.request_timeout, - stack_context.wrap(functools.partial(self._on_timeout, "during request"))) - if (self.request.method not in self._SUPPORTED_METHODS and - not self.request.allow_nonstandard_methods): - raise KeyError("unknown method %s" % self.request.method) - for key in ('network_interface', - 'proxy_host', 'proxy_port', - 'proxy_username', 'proxy_password', - 'proxy_auth_mode'): - if getattr(self.request, key, None): - raise NotImplementedError('%s not supported' % key) - if "Connection" not in self.request.headers: - self.request.headers["Connection"] = "close" - if "Host" not in self.request.headers: - if '@' in self.parsed.netloc: - self.request.headers["Host"] = self.parsed.netloc.rpartition('@')[-1] - else: - self.request.headers["Host"] = self.parsed.netloc - username, password = None, None - if self.parsed.username is not None: - username, password = self.parsed.username, self.parsed.password - elif self.request.auth_username is not None: - username = self.request.auth_username - password = self.request.auth_password or '' - if username is not None: - if self.request.auth_mode not in (None, "basic"): - raise ValueError("unsupported auth_mode %s", - self.request.auth_mode) - auth = utf8(username) + b":" + utf8(password) - self.request.headers["Authorization"] = (b"Basic " + - base64.b64encode(auth)) - if self.request.user_agent: - self.request.headers["User-Agent"] = self.request.user_agent - if not self.request.allow_nonstandard_methods: - # Some HTTP methods nearly always have bodies while others - # almost never do. Fail in this case unless the user has - # opted out of sanity checks with allow_nonstandard_methods. - body_expected = self.request.method in ("POST", "PATCH", "PUT") - body_present = (self.request.body is not None or - self.request.body_producer is not None) - if ((body_expected and not body_present) or - (body_present and not body_expected)): - raise ValueError( - 'Body must %sbe None for method %s (unless ' - 'allow_nonstandard_methods is true)' % - ('not ' if body_expected else '', self.request.method)) - if self.request.expect_100_continue: - self.request.headers["Expect"] = "100-continue" - if self.request.body is not None: - # When body_producer is used the caller is responsible for - # setting Content-Length (or else chunked encoding will be used). - self.request.headers["Content-Length"] = str(len( - self.request.body)) - if (self.request.method == "POST" and - "Content-Type" not in self.request.headers): - self.request.headers["Content-Type"] = "application/x-www-form-urlencoded" - if self.request.decompress_response: - self.request.headers["Accept-Encoding"] = "gzip" - req_path = ((self.parsed.path or '/') + - (('?' + self.parsed.query) if self.parsed.query else '')) - self.connection = self._create_connection(stream) - start_line = httputil.RequestStartLine(self.request.method, - req_path, '') - self.connection.write_headers(start_line, self.request.headers) - if self.request.expect_100_continue: - self._read_response() - else: - self._write_body(True) - def _create_connection(self, stream): stream.set_nodelay(True) connection = HTTP1Connection( @@ -402,31 +410,21 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): self._sockaddr) return connection + @gen.coroutine def _write_body(self, start_read): if self.request.body is not None: self.connection.write(self.request.body) elif self.request.body_producer is not None: fut = self.request.body_producer(self.connection.write) if fut is not None: - fut = gen.convert_yielded(fut) - - def on_body_written(fut): - fut.result() - self.connection.finish() - if start_read: - self._read_response() - self.io_loop.add_future(fut, on_body_written) - return + yield fut self.connection.finish() if start_read: - self._read_response() - - def _read_response(self): - # Ensure that any exception raised in read_response ends up in our - # stack context. - self.io_loop.add_future( - self.connection.read_response(self), - lambda f: f.result()) + try: + yield self.connection.read_response(self) + except StreamClosedError: + if not self._handle_exception(*sys.exc_info()): + raise def _release(self): if self.release_callback is not None: @@ -451,6 +449,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): value = value.real_error self._run_callback(HTTPResponse(self.request, 599, error=value, request_time=self.io_loop.time() - self.start_time, + start_time=self.start_wall_time, )) if hasattr(self, "stream"): @@ -543,6 +542,7 @@ class _HTTPConnection(httputil.HTTPMessageDelegate): self.code, reason=getattr(self, 'reason', None), headers=self.headers, request_time=self.io_loop.time() - self.start_time, + start_time=self.start_wall_time, buffer=buffer, effective_url=self.request.url) self._run_callback(response) diff --git a/lib/tornado/stack_context.py b/lib/tornado/stack_context.py index 2f26f384..a1eca4c7 100644 --- a/lib/tornado/stack_context.py +++ b/lib/tornado/stack_context.py @@ -64,12 +64,18 @@ Here are a few rules of thumb for when it's necessary: persist across asynchronous calls, create a new `StackContext` (or `ExceptionStackContext`), and make your asynchronous calls in a ``with`` block that references your `StackContext`. + +.. deprecated:: 5.1 + + The ``stack_context`` package is deprecated and will be removed in + Tornado 6.0. """ from __future__ import absolute_import, division, print_function import sys import threading +import warnings from tornado.util import raise_exc_info @@ -107,6 +113,8 @@ class StackContext(object): and not necessary in most applications. """ def __init__(self, context_factory): + warnings.warn("StackContext is deprecated and will be removed in Tornado 6.0", + DeprecationWarning) self.context_factory = context_factory self.contexts = [] self.active = True @@ -174,8 +182,20 @@ class ExceptionStackContext(object): If the exception handler returns true, the exception will be consumed and will not be propagated to other exception handlers. + + .. versionadded:: 5.1 + + The ``delay_warning`` argument can be used to delay the emission + of DeprecationWarnings until an exception is caught by the + ``ExceptionStackContext``, which facilitates certain transitional + use cases. """ - def __init__(self, exception_handler): + def __init__(self, exception_handler, delay_warning=False): + self.delay_warning = delay_warning + if not self.delay_warning: + warnings.warn( + "StackContext is deprecated and will be removed in Tornado 6.0", + DeprecationWarning) self.exception_handler = exception_handler self.active = True @@ -184,6 +204,10 @@ class ExceptionStackContext(object): def exit(self, type, value, traceback): if type is not None: + if self.delay_warning: + warnings.warn( + "StackContext is deprecated and will be removed in Tornado 6.0", + DeprecationWarning) return self.exception_handler(type, value, traceback) def __enter__(self): diff --git a/lib/tornado/tcpserver.py b/lib/tornado/tcpserver.py index 1adc4895..4f5d6f03 100644 --- a/lib/tornado/tcpserver.py +++ b/lib/tornado/tcpserver.py @@ -46,12 +46,11 @@ class TCPServer(object): from tornado import gen class EchoServer(TCPServer): - @gen.coroutine - def handle_stream(self, stream, address): + async def handle_stream(self, stream, address): while True: try: - data = yield stream.read_until(b"\n") - yield stream.write(data) + data = await stream.read_until(b"\n") + await stream.write(data) except StreamClosedError: break diff --git a/lib/tornado/testing.py b/lib/tornado/testing.py index 04ea3816..d6e5e947 100644 --- a/lib/tornado/testing.py +++ b/lib/tornado/testing.py @@ -144,15 +144,16 @@ class AsyncTestCase(unittest.TestCase): asynchronous code. The unittest framework is synchronous, so the test must be - complete by the time the test method returns. This means that - asynchronous code cannot be used in quite the same way as usual. - To write test functions that use the same ``yield``-based patterns - used with the `tornado.gen` module, decorate your test methods - with `tornado.testing.gen_test` instead of - `tornado.gen.coroutine`. This class also provides the `stop()` - and `wait()` methods for a more manual style of testing. The test - method itself must call ``self.wait()``, and asynchronous - callbacks should call ``self.stop()`` to signal completion. + complete by the time the test method returns. This means that + asynchronous code cannot be used in quite the same way as usual + and must be adapted to fit. To write your tests with coroutines, + decorate your test methods with `tornado.testing.gen_test` instead + of `tornado.gen.coroutine`. + + This class also provides the (deprecated) `stop()` and `wait()` + methods for a more manual style of testing. The test method itself + must call ``self.wait()``, and asynchronous callbacks should call + ``self.stop()`` to signal completion. By default, a new `.IOLoop` is constructed for each test and is available as ``self.io_loop``. If the code being tested requires a @@ -183,22 +184,6 @@ class AsyncTestCase(unittest.TestCase): response = self.wait() # Test contents of response self.assertIn("FriendFeed", response.body) - - # This test uses an explicit callback-based style. - class MyTestCase3(AsyncTestCase): - def test_http_fetch(self): - client = AsyncHTTPClient() - client.fetch("http://www.tornadoweb.org/", self.handle_fetch) - self.wait() - - def handle_fetch(self, response): - # Test contents of response (failures and exceptions here - # will cause self.wait() to throw an exception and end the - # test). - # Exceptions thrown here are magically propagated to - # self.wait() in test_http_fetch() via stack_context. - self.assertIn("FriendFeed", response.body) - self.stop() """ def __init__(self, methodName='runTest'): super(AsyncTestCase, self).__init__(methodName) @@ -265,7 +250,7 @@ class AsyncTestCase(unittest.TestCase): raise_exc_info(failure) def run(self, result=None): - with ExceptionStackContext(self._handle_exception): + with ExceptionStackContext(self._handle_exception, delay_warning=True): super(AsyncTestCase, self).run(result) # As a last resort, if an exception escaped super.run() and wasn't # re-raised in tearDown, raise it here. This will cause the @@ -279,6 +264,10 @@ class AsyncTestCase(unittest.TestCase): Keyword arguments or a single positional argument passed to `stop()` are saved and will be returned by `wait()`. + + .. deprecated:: 5.1 + + `stop` and `wait` are deprecated; use ``@gen_test`` instead. """ assert _arg is None or not kwargs self.__stop_args = kwargs or _arg @@ -300,6 +289,10 @@ class AsyncTestCase(unittest.TestCase): .. versionchanged:: 3.1 Added the ``ASYNC_TEST_TIMEOUT`` environment variable. + + .. deprecated:: 5.1 + + `stop` and `wait` are deprecated; use ``@gen_test`` instead. """ if timeout is None: timeout = get_async_test_timeout() diff --git a/lib/tornado/web.py b/lib/tornado/web.py index 4f427729..6760b0b9 100644 --- a/lib/tornado/web.py +++ b/lib/tornado/web.py @@ -78,6 +78,7 @@ import time import tornado import traceback import types +import warnings from inspect import isclass from io import BytesIO @@ -542,6 +543,10 @@ class RequestHandler(object): Newly-set cookies are not immediately visible via `get_cookie`; they are not present until the next request. + expires may be a numeric timestamp as returned by `time.time`, + a time tuple as returned by `time.gmtime`, or a + `datetime.datetime` object. + Additional keyword arguments are set on the cookies.Morsel directly. See https://docs.python.org/3/library/http.cookies.html#http.cookies.Morsel @@ -744,7 +749,18 @@ class RequestHandler(object): self._write_buffer.append(chunk) def render(self, template_name, **kwargs): - """Renders the template with the given arguments as the response.""" + """Renders the template with the given arguments as the response. + + ``render()`` calls ``finish()``, so no other output methods can be called + after it. + + Returns a `.Future` with the same semantics as the one returned by `finish`. + Awaiting this `.Future` is optional. + + .. versionchanged:: 5.1 + + Now returns a `.Future` instead of ``None``. + """ if self._finished: raise RuntimeError("Cannot render() after finish()") html = self.render_string(template_name, **kwargs) @@ -805,7 +821,7 @@ class RequestHandler(object): if html_bodies: hloc = html.index(b'') html = html[:hloc] + b''.join(html_bodies) + b'\n' + html[hloc:] - self.finish(html) + return self.finish(html) def render_linked_js(self, js_files): """Default method used to render the final js links for the @@ -945,6 +961,11 @@ class RequestHandler(object): .. versionchanged:: 4.0 Now returns a `.Future` if no callback is given. + + .. deprecated:: 5.1 + + The ``callback`` argument is deprecated and will be removed in + Tornado 6.0. """ chunk = b"".join(self._write_buffer) self._write_buffer = [] @@ -983,7 +1004,20 @@ class RequestHandler(object): return future def finish(self, chunk=None): - """Finishes this response, ending the HTTP request.""" + """Finishes this response, ending the HTTP request. + + Passing a ``chunk`` to ``finish()`` is equivalent to passing that + chunk to ``write()`` and then calling ``finish()`` with no arguments. + + Returns a `.Future` which may optionally be awaited to track the sending + of the response to the client. This `.Future` resolves when all the response + data has been sent, and raises an error if the connection is closed before all + data can be sent. + + .. versionchanged:: 5.1 + + Now returns a `.Future` instead of ``None``. + """ if self._finished: raise RuntimeError("finish() called twice") @@ -1015,12 +1049,27 @@ class RequestHandler(object): # are keepalive connections) self.request.connection.set_close_callback(None) - self.flush(include_footers=True) - self.request.finish() + future = self.flush(include_footers=True) + self.request.connection.finish() self._log() self._finished = True self.on_finish() self._break_cycles() + return future + + def detach(self): + """Take control of the underlying stream. + + Returns the underlying `.IOStream` object and stops all + further HTTP processing. Intended for implementing protocols + like websockets that tunnel over an HTTP handshake. + + This method is only supported when HTTP/1.1 is used. + + .. versionadded:: 5.1 + """ + self._finished = True + return self.request.connection.detach() def _break_cycles(self): # Break up a reference cycle between this handler and the @@ -1688,7 +1737,14 @@ def asynchronous(method): .. versionchanged:: 4.3 Returning anything but ``None`` or a yieldable object from a method decorated with ``@asynchronous`` is an error. Such return values were previously ignored silently. + + .. deprecated:: 5.1 + + This decorator is deprecated and will be removed in Tornado 6.0. + Use coroutines instead. """ + warnings.warn("@asynchronous is deprecated, use coroutines instead", + DeprecationWarning) # Delay the IOLoop import because it's not available on app engine. from tornado.ioloop import IOLoop @@ -1696,7 +1752,7 @@ def asynchronous(method): def wrapper(self, *args, **kwargs): self._auto_finish = False with stack_context.ExceptionStackContext( - self._stack_context_handle_exception): + self._stack_context_handle_exception, delay_warning=True): result = method(self, *args, **kwargs) if result is not None: result = gen.convert_yielded(result) diff --git a/lib/tornado/websocket.py b/lib/tornado/websocket.py index 738a9ccb..0b994fc1 100644 --- a/lib/tornado/websocket.py +++ b/lib/tornado/websocket.py @@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function import base64 import hashlib import os +import sys import struct import tornado.escape import tornado.web @@ -31,7 +32,7 @@ from tornado.escape import utf8, native_str, to_unicode from tornado import gen, httpclient, httputil from tornado.ioloop import IOLoop, PeriodicCallback from tornado.iostream import StreamClosedError -from tornado.log import gen_log, app_log +from tornado.log import gen_log from tornado import simple_httpclient from tornado.queues import Queue from tornado.tcpclient import TCPClient @@ -43,6 +44,8 @@ if PY3: else: from urlparse import urlparse # py3 +_default_max_message_size = 10 * 1024 * 1024 + class WebSocketError(Exception): pass @@ -56,6 +59,10 @@ class WebSocketClosedError(WebSocketError): pass +class _DecompressTooLargeError(Exception): + pass + + class WebSocketHandler(tornado.web.RequestHandler): """Subclass this class to create a basic WebSocket handler. @@ -145,7 +152,6 @@ class WebSocketHandler(tornado.web.RequestHandler): self.stream = None self._on_close_called = False - @tornado.web.asynchronous def get(self, *args, **kwargs): self.open_args = args self.open_kwargs = kwargs @@ -225,7 +231,7 @@ class WebSocketHandler(tornado.web.RequestHandler): Default is 10MiB. """ - return self.settings.get('websocket_max_message_size', None) + return self.settings.get('websocket_max_message_size', _default_max_message_size) def write_message(self, message, binary=False): """Sends the given message to the client of this Web Socket. @@ -256,18 +262,38 @@ class WebSocketHandler(tornado.web.RequestHandler): return self.ws_connection.write_message(message, binary=binary) def select_subprotocol(self, subprotocols): - """Invoked when a new WebSocket requests specific subprotocols. + """Override to implement subprotocol negotiation. ``subprotocols`` is a list of strings identifying the subprotocols proposed by the client. This method may be overridden to return one of those strings to select it, or - ``None`` to not select a subprotocol. Failure to select a - subprotocol does not automatically abort the connection, - although clients may close the connection if none of their - proposed subprotocols was selected. + ``None`` to not select a subprotocol. + + Failure to select a subprotocol does not automatically abort + the connection, although clients may close the connection if + none of their proposed subprotocols was selected. + + The list may be empty, in which case this method must return + None. This method is always called exactly once even if no + subprotocols were proposed so that the handler can be advised + of this fact. + + .. versionchanged:: 5.1 + + Previously, this method was called with a list containing + an empty string instead of an empty list if no subprotocols + were proposed by the client. """ return None + @property + def selected_subprotocol(self): + """The subprotocol returned by `select_subprotocol`. + + .. versionadded:: 5.1 + """ + return self.ws_connection.selected_subprotocol + def get_compression_options(self): """Override to return compression options for the connection. @@ -298,6 +324,13 @@ class WebSocketHandler(tornado.web.RequestHandler): The arguments to `open` are extracted from the `tornado.web.URLSpec` regular expression, just like the arguments to `tornado.web.RequestHandler.get`. + + `open` may be a coroutine. `on_message` will not be called until + `open` has returned. + + .. versionchanged:: 5.1 + + ``open`` may be a coroutine. """ pass @@ -481,7 +514,7 @@ class WebSocketHandler(tornado.web.RequestHandler): self, compression_options=self.get_compression_options()) def _attach_stream(self): - self.stream = self.request.connection.detach() + self.stream = self.detach() self.stream.set_close_callback(self.on_connection_close) # disable non-WS methods for method in ["write", "redirect", "set_header", "set_cookie", @@ -512,8 +545,7 @@ class WebSocketProtocol(object): try: result = callback(*args, **kwargs) except Exception: - app_log.error("Uncaught exception in %s", - getattr(self.request, 'path', None), exc_info=True) + self.handler.log_exception(*sys.exc_info()) self._abort() else: if result is not None: @@ -570,7 +602,8 @@ class _PerMessageDeflateCompressor(object): class _PerMessageDeflateDecompressor(object): - def __init__(self, persistent, max_wbits, compression_options=None): + def __init__(self, persistent, max_wbits, max_message_size, compression_options=None): + self._max_message_size = max_message_size if max_wbits is None: max_wbits = zlib.MAX_WBITS if not (8 <= max_wbits <= zlib.MAX_WBITS): @@ -587,7 +620,10 @@ class _PerMessageDeflateDecompressor(object): def decompress(self, data): decompressor = self._decompressor or self._create_decompressor() - return decompressor.decompress(data + b'\x00\x00\xff\xff') + result = decompressor.decompress(data + b'\x00\x00\xff\xff', self._max_message_size) + if decompressor.unconsumed_tail: + raise _DecompressTooLargeError() + return result class WebSocketProtocol13(WebSocketProtocol): @@ -675,13 +711,17 @@ class WebSocketProtocol13(WebSocketProtocol): return WebSocketProtocol13.compute_accept_value( self.request.headers.get("Sec-Websocket-Key")) + @gen.coroutine def _accept_connection(self): - subprotocols = [s.strip() for s in self.request.headers.get_list("Sec-WebSocket-Protocol")] - if subprotocols: - selected = self.handler.select_subprotocol(subprotocols) - if selected: - assert selected in subprotocols - self.handler.set_header("Sec-WebSocket-Protocol", selected) + subprotocol_header = self.request.headers.get("Sec-WebSocket-Protocol") + if subprotocol_header: + subprotocols = [s.strip() for s in subprotocol_header.split(',')] + else: + subprotocols = [] + self.selected_subprotocol = self.handler.select_subprotocol(subprotocols) + if self.selected_subprotocol: + assert self.selected_subprotocol in subprotocols + self.handler.set_header("Sec-WebSocket-Protocol", self.selected_subprotocol) extensions = self._parse_extensions_header(self.request.headers) for ext in extensions: @@ -711,9 +751,11 @@ class WebSocketProtocol13(WebSocketProtocol): self.stream = self.handler.stream self.start_pinging() - self._run_callback(self.handler.open, *self.handler.open_args, - **self.handler.open_kwargs) - self._receive_frame() + open_result = self._run_callback(self.handler.open, *self.handler.open_args, + **self.handler.open_kwargs) + if open_result is not None: + yield open_result + yield self._receive_frame_loop() def _parse_extensions_header(self, headers): extensions = headers.get("Sec-WebSocket-Extensions", '') @@ -740,6 +782,8 @@ class WebSocketProtocol13(WebSocketProtocol): else: raise ValueError("unsupported extension %r", ext) + self.selected_subprotocol = headers.get('Sec-WebSocket-Protocol', None) + def _get_compressor_options(self, side, agreed_parameters, compression_options=None): """Converts a websocket agreed_parameters set to keyword arguments for our compressor objects. @@ -767,6 +811,7 @@ class WebSocketProtocol13(WebSocketProtocol): self._compressor = _PerMessageDeflateCompressor( **self._get_compressor_options(side, agreed_parameters, compression_options)) self._decompressor = _PerMessageDeflateDecompressor( + max_message_size=self.handler.max_message_size, **self._get_compressor_options(other_side, agreed_parameters, compression_options)) def _write_frame(self, fin, opcode, data, flags=0): @@ -836,111 +881,84 @@ class WebSocketProtocol13(WebSocketProtocol): assert isinstance(data, bytes) self._write_frame(True, 0x9, data) - def _receive_frame(self): + @gen.coroutine + def _receive_frame_loop(self): try: - self.stream.read_bytes(2, self._on_frame_start) + while not self.client_terminated: + yield self._receive_frame() except StreamClosedError: self._abort() - def _on_frame_start(self, data): - self._wire_bytes_in += len(data) - header, payloadlen = struct.unpack("BB", data) - self._final_frame = header & self.FIN + def _read_bytes(self, n): + self._wire_bytes_in += n + return self.stream.read_bytes(n) + + @gen.coroutine + def _receive_frame(self): + # Read the frame header. + data = yield self._read_bytes(2) + header, mask_payloadlen = struct.unpack("BB", data) + is_final_frame = header & self.FIN reserved_bits = header & self.RSV_MASK - self._frame_opcode = header & self.OPCODE_MASK - self._frame_opcode_is_control = self._frame_opcode & 0x8 - if self._decompressor is not None and self._frame_opcode != 0: + opcode = header & self.OPCODE_MASK + opcode_is_control = opcode & 0x8 + if self._decompressor is not None and opcode != 0: + # Compression flag is present in the first frame's header, + # but we can't decompress until we have all the frames of + # the message. self._frame_compressed = bool(reserved_bits & self.RSV1) reserved_bits &= ~self.RSV1 if reserved_bits: # client is using as-yet-undefined extensions; abort self._abort() return - self._masked_frame = bool(payloadlen & 0x80) - payloadlen = payloadlen & 0x7f - if self._frame_opcode_is_control and payloadlen >= 126: + is_masked = bool(mask_payloadlen & 0x80) + payloadlen = mask_payloadlen & 0x7f + + # Parse and validate the length. + if opcode_is_control and payloadlen >= 126: # control frames must have payload < 126 self._abort() return - try: - if payloadlen < 126: - self._frame_length = payloadlen - if self._masked_frame: - self.stream.read_bytes(4, self._on_masking_key) - else: - self._read_frame_data(False) - elif payloadlen == 126: - self.stream.read_bytes(2, self._on_frame_length_16) - elif payloadlen == 127: - self.stream.read_bytes(8, self._on_frame_length_64) - except StreamClosedError: - self._abort() - - def _read_frame_data(self, masked): - new_len = self._frame_length + if payloadlen < 126: + self._frame_length = payloadlen + elif payloadlen == 126: + data = yield self._read_bytes(2) + payloadlen = struct.unpack("!H", data)[0] + elif payloadlen == 127: + data = yield self._read_bytes(8) + payloadlen = struct.unpack("!Q", data)[0] + new_len = payloadlen if self._fragmented_message_buffer is not None: new_len += len(self._fragmented_message_buffer) - if new_len > (self.handler.max_message_size or 10 * 1024 * 1024): + if new_len > self.handler.max_message_size: self.close(1009, "message too big") + self._abort() return - self.stream.read_bytes( - self._frame_length, - self._on_masked_frame_data if masked else self._on_frame_data) - def _on_frame_length_16(self, data): - self._wire_bytes_in += len(data) - self._frame_length = struct.unpack("!H", data)[0] - try: - if self._masked_frame: - self.stream.read_bytes(4, self._on_masking_key) - else: - self._read_frame_data(False) - except StreamClosedError: - self._abort() + # Read the payload, unmasking if necessary. + if is_masked: + self._frame_mask = yield self._read_bytes(4) + data = yield self._read_bytes(payloadlen) + if is_masked: + data = _websocket_mask(self._frame_mask, data) - def _on_frame_length_64(self, data): - self._wire_bytes_in += len(data) - self._frame_length = struct.unpack("!Q", data)[0] - try: - if self._masked_frame: - self.stream.read_bytes(4, self._on_masking_key) - else: - self._read_frame_data(False) - except StreamClosedError: - self._abort() - - def _on_masking_key(self, data): - self._wire_bytes_in += len(data) - self._frame_mask = data - try: - self._read_frame_data(True) - except StreamClosedError: - self._abort() - - def _on_masked_frame_data(self, data): - # Don't touch _wire_bytes_in; we'll do it in _on_frame_data. - self._on_frame_data(_websocket_mask(self._frame_mask, data)) - - def _on_frame_data(self, data): - handled_future = None - - self._wire_bytes_in += len(data) - if self._frame_opcode_is_control: + # Decide what to do with this frame. + if opcode_is_control: # control frames may be interleaved with a series of fragmented # data frames, so control frames must not interact with # self._fragmented_* - if not self._final_frame: + if not is_final_frame: # control frames must not be fragmented self._abort() return - opcode = self._frame_opcode - elif self._frame_opcode == 0: # continuation frame + elif opcode == 0: # continuation frame if self._fragmented_message_buffer is None: # nothing to continue self._abort() return self._fragmented_message_buffer += data - if self._final_frame: + if is_final_frame: opcode = self._fragmented_message_opcode data = self._fragmented_message_buffer self._fragmented_message_buffer = None @@ -949,22 +967,14 @@ class WebSocketProtocol13(WebSocketProtocol): # can't start new message until the old one is finished self._abort() return - if self._final_frame: - opcode = self._frame_opcode - else: - self._fragmented_message_opcode = self._frame_opcode + if not is_final_frame: + self._fragmented_message_opcode = opcode self._fragmented_message_buffer = data - if self._final_frame: + if is_final_frame: handled_future = self._handle_message(opcode, data) - - if not self.client_terminated: - if handled_future: - # on_message is a coroutine, process more frames once it's done. - handled_future.add_done_callback( - lambda future: self._receive_frame()) - else: - self._receive_frame() + if handled_future is not None: + yield handled_future def _handle_message(self, opcode, data): """Execute on_message, returning its Future if it is a coroutine.""" @@ -972,7 +982,12 @@ class WebSocketProtocol13(WebSocketProtocol): return if self._frame_compressed: - data = self._decompressor.decompress(data) + try: + data = self._decompressor.decompress(data) + except _DecompressTooLargeError: + self.close(1009, "message too big after decompression") + self._abort() + return if opcode == 0x1: # UTF-8 data @@ -1092,7 +1107,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): """ def __init__(self, request, on_message_callback=None, compression_options=None, ping_interval=None, ping_timeout=None, - max_message_size=None): + max_message_size=None, subprotocols=[]): self.compression_options = compression_options self.connect_future = Future() self.protocol = None @@ -1113,6 +1128,8 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): 'Sec-WebSocket-Key': self.key, 'Sec-WebSocket-Version': '13', }) + if subprotocols is not None: + request.headers['Sec-WebSocket-Protocol'] = ','.join(subprotocols) if self.compression_options is not None: # Always offer to let the server set our max_wbits (and even though # we don't offer it, we will accept a client_no_context_takeover @@ -1167,7 +1184,7 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): self.protocol = self.get_websocket_protocol() self.protocol._process_server_headers(self.key, self.headers) self.protocol.start_pinging() - self.protocol._receive_frame() + IOLoop.current().add_callback(self.protocol._receive_frame_loop) if self._timeout is not None: self.io_loop.remove_timeout(self._timeout) @@ -1247,11 +1264,19 @@ class WebSocketClientConnection(simple_httpclient._HTTPConnection): return WebSocketProtocol13(self, mask_outgoing=True, compression_options=self.compression_options) + @property + def selected_subprotocol(self): + """The subprotocol selected by the server. + + .. versionadded:: 5.1 + """ + return self.protocol.selected_subprotocol + def websocket_connect(url, callback=None, connect_timeout=None, on_message_callback=None, compression_options=None, ping_interval=None, ping_timeout=None, - max_message_size=None): + max_message_size=_default_max_message_size, subprotocols=None): """Client-side websocket support. Takes a url and returns a Future whose result is a @@ -1274,6 +1299,11 @@ def websocket_connect(url, callback=None, connect_timeout=None, ``websocket_connect``. In both styles, a message of ``None`` indicates that the connection has been closed. + ``subprotocols`` may be a list of strings specifying proposed + subprotocols. The selected protocol may be found on the + ``selected_subprotocol`` attribute of the connection object + when the connection is complete. + .. versionchanged:: 3.2 Also accepts ``HTTPRequest`` objects in place of urls. @@ -1286,6 +1316,9 @@ def websocket_connect(url, callback=None, connect_timeout=None, .. versionchanged:: 5.0 The ``io_loop`` argument (deprecated since version 4.1) has been removed. + + .. versionchanged:: 5.1 + Added the ``subprotocols`` argument. """ if isinstance(url, httpclient.HTTPRequest): assert connect_timeout is None @@ -1302,7 +1335,8 @@ def websocket_connect(url, callback=None, connect_timeout=None, compression_options=compression_options, ping_interval=ping_interval, ping_timeout=ping_timeout, - max_message_size=max_message_size) + max_message_size=max_message_size, + subprotocols=subprotocols) if callback is not None: IOLoop.current().add_future(conn.connect_future, callback) return conn.connect_future diff --git a/lib/tornado/wsgi.py b/lib/tornado/wsgi.py index 22be7a89..e1230da0 100644 --- a/lib/tornado/wsgi.py +++ b/lib/tornado/wsgi.py @@ -33,6 +33,7 @@ from __future__ import absolute_import, division, print_function import sys from io import BytesIO import tornado +import warnings from tornado.concurrent import Future from tornado import escape @@ -76,6 +77,7 @@ class WSGIApplication(web.Application): .. deprecated:: 4.0 Use a regular `.Application` and wrap it in `WSGIAdapter` instead. + This class will be removed in Tornado 6.0. """ def __call__(self, environ, start_response): return WSGIAdapter(self)(environ, start_response) @@ -83,8 +85,10 @@ class WSGIApplication(web.Application): # WSGI has no facilities for flow control, so just return an already-done # Future when the interface requires it. -_dummy_future = Future() -_dummy_future.set_result(None) +def _dummy_future(): + f = Future() + f.set_result(None) + return f class _WSGIConnection(httputil.HTTPConnection): @@ -116,7 +120,7 @@ class _WSGIConnection(httputil.HTTPConnection): self.write(chunk, callback) elif callback is not None: callback() - return _dummy_future + return _dummy_future() def write(self, chunk, callback=None): if self._expected_content_remaining is not None: @@ -128,7 +132,7 @@ class _WSGIConnection(httputil.HTTPConnection): self._write_buffer.append(chunk) if callback is not None: callback() - return _dummy_future + return _dummy_future() def finish(self): if (self._expected_content_remaining is not None and @@ -179,9 +183,25 @@ class WSGIAdapter(object): that it is not possible to use `.AsyncHTTPClient`, or the `tornado.auth` or `tornado.websocket` modules. + In multithreaded WSGI servers on Python 3, it may be necessary to + permit `asyncio` to create event loops on any thread. Run the + following at startup (typically import time for WSGI + applications):: + + import asyncio + from tornado.platform.asyncio import AnyThreadEventLoopPolicy + asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) + .. versionadded:: 4.0 + + .. deprecated:: 5.1 + + This class is deprecated and will be removed in Tornado 6.0. + Use Tornado's `.HTTPServer` instead of a WSGI container. """ def __init__(self, application): + warnings.warn("WSGIAdapter is deprecated, use Tornado's HTTPServer instead", + DeprecationWarning) if isinstance(application, WSGIApplication): self.application = lambda request: web.Application.__call__( application, request)