SickGear/tornado/platform/asyncio.py

153 lines
5.2 KiB
Python
Raw Normal View History

"""Bridges between the `asyncio` module and Tornado IOLoop.
This is a work in progress and interfaces are subject to change.
To test:
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOMainLoop
(the tests log a few warnings with AsyncIOMainLoop because they leave some
unfinished callbacks on the event loop that fail when it resumes)
"""
from __future__ import absolute_import, division, print_function, with_statement
import datetime
import functools
import os
from tornado.ioloop import IOLoop
from tornado import stack_context
try:
# Import the real asyncio module for py33+ first. Older versions of the
# trollius backport also use this name.
import asyncio
except ImportError as e:
# Asyncio itself isn't available; see if trollius is (backport to py26+).
try:
import trollius as asyncio
except ImportError:
# Re-raise the original asyncio error, not the trollius one.
raise e
class BaseAsyncIOLoop(IOLoop):
    """Tornado ``IOLoop`` whose work is delegated to an asyncio event loop.

    File-descriptor handlers, timeouts and callbacks registered through the
    Tornado-style methods below are forwarded to ``self.asyncio_loop``.
    Subclasses choose which asyncio loop is wrapped by calling `initialize`.
    """

    def initialize(self, asyncio_loop, close_loop=False):
        """Attach to ``asyncio_loop`` and reset the handler bookkeeping.

        :arg asyncio_loop: the asyncio (or trollius) event loop to drive.
        :arg close_loop: if true, `close` also closes ``asyncio_loop``.
        """
        self.asyncio_loop = asyncio_loop
        self.close_loop = close_loop
        # Queue make_current on the loop rather than calling it right away.
        # NOTE(review): make_current is not defined in this class; presumably
        # inherited from IOLoop.
        self.asyncio_loop.call_soon(self.make_current)
        # Maps fd to handler function (as in IOLoop.add_handler)
        self.handlers = {}
        # Set of fds listening for reads/writes
        self.readers = set()
        self.writers = set()
        self.closing = False

    def close(self, all_fds=False):
        """Unregister every handler and optionally close fds and the loop.

        :arg all_fds: if true, each registered fd is passed to ``os.close``
            after its handler is removed.
        """
        self.closing = True
        # Iterate over a snapshot: remove_handler mutates self.handlers.
        for fd in list(self.handlers):
            self.remove_handler(fd)
            if all_fds:
                try:
                    os.close(fd)
                except OSError:
                    # Best effort: a failure to close one fd must not stop
                    # the rest of the shutdown.
                    pass
        if self.close_loop:
            self.asyncio_loop.close()

    def add_handler(self, fd, handler, events):
        """Register ``handler`` to be called for ``events`` on ``fd``.

        :arg fd: file descriptor to watch.
        :arg handler: callable invoked as ``handler(fd, events)``.
        :arg events: bitmask tested against ``IOLoop.READ`` and
            ``IOLoop.WRITE``.
        :raises ValueError: if ``fd`` already has a handler.
        """
        if fd in self.handlers:
            raise ValueError("fd %d added twice" % fd)
        self.handlers[fd] = stack_context.wrap(handler)
        if events & IOLoop.READ:
            self.asyncio_loop.add_reader(
                fd, self._handle_events, fd, IOLoop.READ)
            self.readers.add(fd)
        if events & IOLoop.WRITE:
            self.asyncio_loop.add_writer(
                fd, self._handle_events, fd, IOLoop.WRITE)
            self.writers.add(fd)

    def update_handler(self, fd, events):
        """Change which events ``fd`` is watched for.

        Readers/writers are added or removed on the asyncio loop only when
        the requested state differs from what ``self.readers`` and
        ``self.writers`` currently record.
        """
        if events & IOLoop.READ:
            if fd not in self.readers:
                self.asyncio_loop.add_reader(
                    fd, self._handle_events, fd, IOLoop.READ)
                self.readers.add(fd)
        else:
            if fd in self.readers:
                self.asyncio_loop.remove_reader(fd)
                self.readers.remove(fd)
        if events & IOLoop.WRITE:
            if fd not in self.writers:
                self.asyncio_loop.add_writer(
                    fd, self._handle_events, fd, IOLoop.WRITE)
                self.writers.add(fd)
        else:
            if fd in self.writers:
                self.asyncio_loop.remove_writer(fd)
                self.writers.remove(fd)

    def remove_handler(self, fd):
        """Stop watching ``fd``; a no-op if ``fd`` has no handler."""
        if fd not in self.handlers:
            return
        if fd in self.readers:
            self.asyncio_loop.remove_reader(fd)
            self.readers.remove(fd)
        if fd in self.writers:
            self.asyncio_loop.remove_writer(fd)
            self.writers.remove(fd)
        del self.handlers[fd]

    def _handle_events(self, fd, events):
        """Dispatch an asyncio reader/writer notification to fd's handler."""
        self.handlers[fd](fd, events)

    def start(self):
        """Run the wrapped asyncio loop until `stop` is called."""
        # NOTE(review): _setup_logging is not defined in this class;
        # presumably inherited from IOLoop.
        self._setup_logging()
        self.asyncio_loop.run_forever()

    def stop(self):
        """Ask the wrapped asyncio loop to stop."""
        self.asyncio_loop.stop()

    def _run_callback(self, callback, *args, **kwargs):
        """Invoke ``callback`` and route any ``Exception`` it raises to
        ``handle_callback_exception`` instead of letting it propagate.
        """
        try:
            callback(*args, **kwargs)
        except Exception:
            self.handle_callback_exception(callback)

    def add_timeout(self, deadline, callback):
        """Schedule ``callback`` to run at or after ``deadline``.

        :arg deadline: an ``int``/``float`` compared against ``self.time()``
            (a deadline already in the past yields a zero delay), or a
            ``datetime.timedelta`` giving the delay from now.
        :arg callback: callable invoked with no arguments.
        :returns: the handle produced by ``asyncio_loop.call_later``; pass
            it to `remove_timeout` to cancel.
        :raises TypeError: if ``deadline`` is neither of the supported types.
        """
        if isinstance(deadline, (int, float)):
            delay = max(deadline - self.time(), 0)
        elif isinstance(deadline, datetime.timedelta):
            delay = deadline.total_seconds()
        else:
            # Interpolate the value into the message. Wrapping it in a
            # one-element tuple keeps the formatting correct even when
            # ``deadline`` is itself a tuple.
            raise TypeError("Unsupported deadline %r" % (deadline,))
        return self.asyncio_loop.call_later(delay, self._run_callback,
                                            stack_context.wrap(callback))

    def remove_timeout(self, timeout):
        """Cancel a timeout handle previously returned by `add_timeout`."""
        timeout.cancel()

    def add_callback(self, callback, *args, **kwargs):
        """Schedule ``callback(*args, **kwargs)`` on the asyncio loop.

        Scheduling goes through ``call_soon_threadsafe``.

        :raises RuntimeError: if `close` has already been called.
        """
        if self.closing:
            raise RuntimeError("IOLoop is closing")
        if kwargs:
            # call_soon_threadsafe forwards positional arguments only, so
            # keyword arguments are bound up front with functools.partial.
            self.asyncio_loop.call_soon_threadsafe(functools.partial(
                self._run_callback, stack_context.wrap(callback),
                *args, **kwargs))
        else:
            self.asyncio_loop.call_soon_threadsafe(
                self._run_callback, stack_context.wrap(callback), *args)

    # Callbacks added from a signal handler take the same path as add_callback.
    add_callback_from_signal = add_callback
class AsyncIOMainLoop(BaseAsyncIOLoop):
    """IOLoop bound to the loop returned by ``asyncio.get_event_loop()``.

    The loop is passed on with ``close_loop=False``, so closing this IOLoop
    leaves the underlying asyncio loop open.
    """

    def initialize(self):
        """Wrap the current asyncio event loop without taking ownership."""
        current_loop = asyncio.get_event_loop()
        super(AsyncIOMainLoop, self).initialize(current_loop,
                                                close_loop=False)
class AsyncIOLoop(BaseAsyncIOLoop):
    """IOLoop backed by a fresh loop from ``asyncio.new_event_loop()``.

    The loop is passed on with ``close_loop=True``, so closing this IOLoop
    also closes the asyncio loop it created.
    """

    def initialize(self):
        """Create a new asyncio event loop and take ownership of it."""
        private_loop = asyncio.new_event_loop()
        super(AsyncIOLoop, self).initialize(private_loop, close_loop=True)