"""Bridges between the `asyncio` module and Tornado IOLoop. .. versionadded:: 3.2 This module integrates Tornado with the ``asyncio`` module introduced in Python 3.4. This makes it possible to combine the two libraries on the same event loop. .. deprecated:: 5.0 While the code in this module is still used, it is now enabled automatically when `asyncio` is available, so applications should no longer need to refer to this module directly. .. note:: Tornado requires the `~asyncio.AbstractEventLoop.add_reader` family of methods, so it is not compatible with the `~asyncio.ProactorEventLoop` on Windows. Use the `~asyncio.SelectorEventLoop` instead. """ from __future__ import absolute_import, division, print_function import functools from tornado.gen import convert_yielded from tornado.ioloop import IOLoop from tornado import stack_context import asyncio class BaseAsyncIOLoop(IOLoop): def initialize(self, asyncio_loop, **kwargs): self.asyncio_loop = asyncio_loop # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler) self.handlers = {} # Set of fds listening for reads/writes self.readers = set() self.writers = set() self.closing = False # If an asyncio loop was closed through an asyncio interface # instead of IOLoop.close(), we'd never hear about it and may # have left a dangling reference in our map. In case an # application (or, more likely, a test suite) creates and # destroys a lot of event loops in this way, check here to # ensure that we don't have a lot of dead loops building up in # the map. # # TODO(bdarnell): consider making self.asyncio_loop a weakref # for AsyncIOMainLoop and make _ioloop_for_asyncio a # WeakKeyDictionary. for loop in list(IOLoop._ioloop_for_asyncio): if loop.is_closed(): del IOLoop._ioloop_for_asyncio[loop] IOLoop._ioloop_for_asyncio[asyncio_loop] = self super(BaseAsyncIOLoop, self).initialize(**kwargs) def close(self, all_fds=False): self.closing = True for fd in list(self.handlers): fileobj, handler_func = self.handlers[fd] self.remove_handler(fd) if all_fds: self.close_fd(fileobj) self.asyncio_loop.close() del IOLoop._ioloop_for_asyncio[self.asyncio_loop] def add_handler(self, fd, handler, events): fd, fileobj = self.split_fd(fd) if fd in self.handlers: raise ValueError("fd %s added twice" % fd) self.handlers[fd] = (fileobj, stack_context.wrap(handler)) if events & IOLoop.READ: self.asyncio_loop.add_reader( fd, self._handle_events, fd, IOLoop.READ) self.readers.add(fd) if events & IOLoop.WRITE: self.asyncio_loop.add_writer( fd, self._handle_events, fd, IOLoop.WRITE) self.writers.add(fd) def update_handler(self, fd, events): fd, fileobj = self.split_fd(fd) if events & IOLoop.READ: if fd not in self.readers: self.asyncio_loop.add_reader( fd, self._handle_events, fd, IOLoop.READ) self.readers.add(fd) else: if fd in self.readers: self.asyncio_loop.remove_reader(fd) self.readers.remove(fd) if events & IOLoop.WRITE: if fd not in self.writers: self.asyncio_loop.add_writer( fd, self._handle_events, fd, IOLoop.WRITE) self.writers.add(fd) else: if fd in self.writers: self.asyncio_loop.remove_writer(fd) self.writers.remove(fd) def remove_handler(self, fd): fd, fileobj = self.split_fd(fd) if fd not in self.handlers: return if fd in self.readers: self.asyncio_loop.remove_reader(fd) self.readers.remove(fd) if fd in self.writers: self.asyncio_loop.remove_writer(fd) self.writers.remove(fd) del self.handlers[fd] def _handle_events(self, fd, events): fileobj, handler_func = self.handlers[fd] handler_func(fileobj, events) def start(self): try: old_loop = asyncio.get_event_loop() except (RuntimeError, AssertionError): old_loop = None try: self._setup_logging() asyncio.set_event_loop(self.asyncio_loop) self.asyncio_loop.run_forever() finally: asyncio.set_event_loop(old_loop) def stop(self): self.asyncio_loop.stop() def call_at(self, when, callback, *args, **kwargs): # asyncio.call_at supports *args but not **kwargs, so bind them here. # We do not synchronize self.time and asyncio_loop.time, so # convert from absolute to relative. return self.asyncio_loop.call_later( max(0, when - self.time()), self._run_callback, functools.partial(stack_context.wrap(callback), *args, **kwargs)) def remove_timeout(self, timeout): timeout.cancel() def add_callback(self, callback, *args, **kwargs): try: self.asyncio_loop.call_soon_threadsafe( self._run_callback, functools.partial(stack_context.wrap(callback), *args, **kwargs)) except RuntimeError: # "Event loop is closed". Swallow the exception for # consistency with PollIOLoop (and logical consistency # with the fact that we can't guarantee that an # add_callback that completes without error will # eventually execute). pass add_callback_from_signal = add_callback def run_in_executor(self, executor, func, *args): return self.asyncio_loop.run_in_executor(executor, func, *args) def set_default_executor(self, executor): return self.asyncio_loop.set_default_executor(executor) class AsyncIOMainLoop(BaseAsyncIOLoop): """``AsyncIOMainLoop`` creates an `.IOLoop` that corresponds to the current ``asyncio`` event loop (i.e. the one returned by ``asyncio.get_event_loop()``). .. deprecated:: 5.0 Now used automatically when appropriate; it is no longer necessary to refer to this class directly. .. versionchanged:: 5.0 Closing an `AsyncIOMainLoop` now closes the underlying asyncio loop. """ def initialize(self, **kwargs): super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs) def make_current(self): # AsyncIOMainLoop already refers to the current asyncio loop so # nothing to do here. pass class AsyncIOLoop(BaseAsyncIOLoop): """``AsyncIOLoop`` is an `.IOLoop` that runs on an ``asyncio`` event loop. This class follows the usual Tornado semantics for creating new ``IOLoops``; these loops are not necessarily related to the ``asyncio`` default event loop. Each ``AsyncIOLoop`` creates a new ``asyncio.EventLoop``; this object can be accessed with the ``asyncio_loop`` attribute. .. versionchanged:: 5.0 When an ``AsyncIOLoop`` becomes the current `.IOLoop`, it also sets the current `asyncio` event loop. .. deprecated:: 5.0 Now used automatically when appropriate; it is no longer necessary to refer to this class directly. """ def initialize(self, **kwargs): self.is_current = False loop = asyncio.new_event_loop() try: super(AsyncIOLoop, self).initialize(loop, **kwargs) except Exception: # If initialize() does not succeed (taking ownership of the loop), # we have to close it. loop.close() raise def close(self, all_fds=False): if self.is_current: self.clear_current() super(AsyncIOLoop, self).close(all_fds=all_fds) def make_current(self): if not self.is_current: try: self.old_asyncio = asyncio.get_event_loop() except (RuntimeError, AssertionError): self.old_asyncio = None self.is_current = True asyncio.set_event_loop(self.asyncio_loop) def _clear_current_hook(self): if self.is_current: asyncio.set_event_loop(self.old_asyncio) self.is_current = False def to_tornado_future(asyncio_future): """Convert an `asyncio.Future` to a `tornado.concurrent.Future`. .. versionadded:: 4.1 .. deprecated:: 5.0 Tornado ``Futures`` have been merged with `asyncio.Future`, so this method is now a no-op. """ return asyncio_future def to_asyncio_future(tornado_future): """Convert a Tornado yieldable object to an `asyncio.Future`. .. versionadded:: 4.1 .. versionchanged:: 4.3 Now accepts any yieldable object, not just `tornado.concurrent.Future`. .. deprecated:: 5.0 Tornado ``Futures`` have been merged with `asyncio.Future`, so this method is now equivalent to `tornado.gen.convert_yielded`. """ return convert_yielded(tornado_future) class AnyThreadEventLoopPolicy(asyncio.DefaultEventLoopPolicy): """Event loop policy that allows loop creation on any thread. The default `asyncio` event loop policy only automatically creates event loops in the main threads. Other threads must create event loops explicitly or `asyncio.get_event_loop` (and therefore `.IOLoop.current`) will fail. Installing this policy allows event loops to be created automatically on any thread, matching the behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2). Usage:: asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy()) .. versionadded:: 5.0 """ def get_event_loop(self): try: return super().get_event_loop() except (RuntimeError, AssertionError): # This was an AssertionError in python 3.4.2 (which ships with debian jessie) # and changed to a RuntimeError in 3.4.3. # "There is no current event loop in thread %r" loop = self.new_event_loop() self.set_event_loop(loop) return loop