# Copyright 2015 The Tornado Authors # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function, with_statement import collections from tornado import gen, ioloop from tornado.concurrent import Future __all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock'] class _TimeoutGarbageCollector(object): """Base class for objects that periodically clean up timed-out waiters. Avoids memory leak in a common pattern like: while True: yield condition.wait(short_timeout) print('looping....') """ def __init__(self): self._waiters = collections.deque() # Futures. self._timeouts = 0 def _garbage_collect(self): # Occasionally clear timed-out waiters. self._timeouts += 1 if self._timeouts > 100: self._timeouts = 0 self._waiters = collections.deque( w for w in self._waiters if not w.done()) class Condition(_TimeoutGarbageCollector): """A condition allows one or more coroutines to wait until notified. Like a standard `threading.Condition`, but does not need an underlying lock that is acquired and released. With a `Condition`, coroutines can wait to be notified by other coroutines: .. testcode:: from tornado import gen from tornado.ioloop import IOLoop from tornado.locks import Condition condition = Condition() @gen.coroutine def waiter(): print("I'll wait right here") yield condition.wait() # Yield a Future. print("I'm done waiting") @gen.coroutine def notifier(): print("About to notify") condition.notify() print("Done notifying") @gen.coroutine def runner(): # Yield two Futures; wait for waiter() and notifier() to finish. yield [waiter(), notifier()] IOLoop.current().run_sync(runner) .. testoutput:: I'll wait right here About to notify Done notifying I'm done waiting `wait` takes an optional ``timeout`` argument, which is either an absolute timestamp:: io_loop = IOLoop.current() # Wait up to 1 second for a notification. yield condition.wait(timeout=io_loop.time() + 1) ...or a `datetime.timedelta` for a timeout relative to the current time:: # Wait up to 1 second. yield condition.wait(timeout=datetime.timedelta(seconds=1)) The method raises `tornado.gen.TimeoutError` if there's no notification before the deadline. """ def __init__(self): super(Condition, self).__init__() self.io_loop = ioloop.IOLoop.current() def __repr__(self): result = '<%s' % (self.__class__.__name__, ) if self._waiters: result += ' waiters[%s]' % len(self._waiters) return result + '>' def wait(self, timeout=None): """Wait for `.notify`. Returns a `.Future` that resolves ``True`` if the condition is notified, or ``False`` after a timeout. """ waiter = Future() self._waiters.append(waiter) if timeout: def on_timeout(): waiter.set_result(False) self._garbage_collect() io_loop = ioloop.IOLoop.current() timeout_handle = io_loop.add_timeout(timeout, on_timeout) waiter.add_done_callback( lambda _: io_loop.remove_timeout(timeout_handle)) return waiter def notify(self, n=1): """Wake ``n`` waiters.""" waiters = [] # Waiters we plan to run right now. while n and self._waiters: waiter = self._waiters.popleft() if not waiter.done(): # Might have timed out. n -= 1 waiters.append(waiter) for waiter in waiters: waiter.set_result(True) def notify_all(self): """Wake all waiters.""" self.notify(len(self._waiters)) class Event(object): """An event blocks coroutines until its internal flag is set to True. Similar to `threading.Event`. A coroutine can wait for an event to be set. Once it is set, calls to ``yield event.wait()`` will not block unless the event has been cleared: .. testcode:: from tornado import gen from tornado.ioloop import IOLoop from tornado.locks import Event event = Event() @gen.coroutine def waiter(): print("Waiting for event") yield event.wait() print("Not waiting this time") yield event.wait() print("Done") @gen.coroutine def setter(): print("About to set the event") event.set() @gen.coroutine def runner(): yield [waiter(), setter()] IOLoop.current().run_sync(runner) .. testoutput:: Waiting for event About to set the event Not waiting this time Done """ def __init__(self): self._future = Future() def __repr__(self): return '<%s %s>' % ( self.__class__.__name__, 'set' if self.is_set() else 'clear') def is_set(self): """Return ``True`` if the internal flag is true.""" return self._future.done() def set(self): """Set the internal flag to ``True``. All waiters are awakened. Calling `.wait` once the flag is set will not block. """ if not self._future.done(): self._future.set_result(None) def clear(self): """Reset the internal flag to ``False``. Calls to `.wait` will block until `.set` is called. """ if self._future.done(): self._future = Future() def wait(self, timeout=None): """Block until the internal flag is true. Returns a Future, which raises `tornado.gen.TimeoutError` after a timeout. """ if timeout is None: return self._future else: return gen.with_timeout(timeout, self._future) class _ReleasingContextManager(object): """Releases a Lock or Semaphore at the end of a "with" statement. with (yield semaphore.acquire()): pass # Now semaphore.release() has been called. """ def __init__(self, obj): self._obj = obj def __enter__(self): pass def __exit__(self, exc_type, exc_val, exc_tb): self._obj.release() class Semaphore(_TimeoutGarbageCollector): """A lock that can be acquired a fixed number of times before blocking. A Semaphore manages a counter representing the number of `.release` calls minus the number of `.acquire` calls, plus an initial value. The `.acquire` method blocks if necessary until it can return without making the counter negative. Semaphores limit access to a shared resource. To allow access for two workers at a time: .. testsetup:: semaphore from collections import deque from tornado import gen from tornado.ioloop import IOLoop from tornado.concurrent import Future # Ensure reliable doctest output: resolve Futures one at a time. futures_q = deque([Future() for _ in range(3)]) @gen.coroutine def simulator(futures): for f in futures: yield gen.moment f.set_result(None) IOLoop.current().add_callback(simulator, list(futures_q)) def use_some_resource(): return futures_q.popleft() .. testcode:: semaphore from tornado import gen from tornado.ioloop import IOLoop from tornado.locks import Semaphore sem = Semaphore(2) @gen.coroutine def worker(worker_id): yield sem.acquire() try: print("Worker %d is working" % worker_id) yield use_some_resource() finally: print("Worker %d is done" % worker_id) sem.release() @gen.coroutine def runner(): # Join all workers. yield [worker(i) for i in range(3)] IOLoop.current().run_sync(runner) .. testoutput:: semaphore Worker 0 is working Worker 1 is working Worker 0 is done Worker 2 is working Worker 1 is done Worker 2 is done Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until the semaphore has been released once, by worker 0. `.acquire` is a context manager, so ``worker`` could be written as:: @gen.coroutine def worker(worker_id): with (yield sem.acquire()): print("Worker %d is working" % worker_id) yield use_some_resource() # Now the semaphore has been released. print("Worker %d is done" % worker_id) In Python 3.5, the semaphore itself can be used as an async context manager:: async def worker(worker_id): async with sem: print("Worker %d is working" % worker_id) await use_some_resource() # Now the semaphore has been released. print("Worker %d is done" % worker_id) .. versionchanged:: 4.3 Added ``async with`` support in Python 3.5. """ def __init__(self, value=1): super(Semaphore, self).__init__() if value < 0: raise ValueError('semaphore initial value must be >= 0') self._value = value def __repr__(self): res = super(Semaphore, self).__repr__() extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format( self._value) if self._waiters: extra = '{0},waiters:{1}'.format(extra, len(self._waiters)) return '<{0} [{1}]>'.format(res[1:-1], extra) def release(self): """Increment the counter and wake one waiter.""" self._value += 1 while self._waiters: waiter = self._waiters.popleft() if not waiter.done(): self._value -= 1 # If the waiter is a coroutine paused at # # with (yield semaphore.acquire()): # # then the context manager's __exit__ calls release() at the end # of the "with" block. waiter.set_result(_ReleasingContextManager(self)) break def acquire(self, timeout=None): """Decrement the counter. Returns a Future. Block if the counter is zero and wait for a `.release`. The Future raises `.TimeoutError` after the deadline. """ waiter = Future() if self._value > 0: self._value -= 1 waiter.set_result(_ReleasingContextManager(self)) else: self._waiters.append(waiter) if timeout: def on_timeout(): waiter.set_exception(gen.TimeoutError()) self._garbage_collect() io_loop = ioloop.IOLoop.current() timeout_handle = io_loop.add_timeout(timeout, on_timeout) waiter.add_done_callback( lambda _: io_loop.remove_timeout(timeout_handle)) return waiter def __enter__(self): raise RuntimeError( "Use Semaphore like 'with (yield semaphore.acquire())', not like" " 'with semaphore'") __exit__ = __enter__ @gen.coroutine def __aenter__(self): yield self.acquire() @gen.coroutine def __aexit__(self, typ, value, tb): self.release() class BoundedSemaphore(Semaphore): """A semaphore that prevents release() being called too many times. If `.release` would increment the semaphore's value past the initial value, it raises `ValueError`. Semaphores are mostly used to guard resources with limited capacity, so a semaphore released too many times is a sign of a bug. """ def __init__(self, value=1): super(BoundedSemaphore, self).__init__(value=value) self._initial_value = value def release(self): """Increment the counter and wake one waiter.""" if self._value >= self._initial_value: raise ValueError("Semaphore released too many times") super(BoundedSemaphore, self).release() class Lock(object): """A lock for coroutines. A Lock begins unlocked, and `acquire` locks it immediately. While it is locked, a coroutine that yields `acquire` waits until another coroutine calls `release`. Releasing an unlocked lock raises `RuntimeError`. `acquire` supports the context manager protocol in all Python versions: >>> from tornado import gen, locks >>> lock = locks.Lock() >>> >>> @gen.coroutine ... def f(): ... with (yield lock.acquire()): ... # Do something holding the lock. ... pass ... ... # Now the lock is released. In Python 3.5, `Lock` also supports the async context manager protocol. Note that in this case there is no `acquire`, because ``async with`` includes both the ``yield`` and the ``acquire`` (just as it does with `threading.Lock`): >>> async def f(): # doctest: +SKIP ... async with lock: ... # Do something holding the lock. ... pass ... ... # Now the lock is released. .. versionchanged:: 4.3 Added ``async with`` support in Python 3.5. """ def __init__(self): self._block = BoundedSemaphore(value=1) def __repr__(self): return "<%s _block=%s>" % ( self.__class__.__name__, self._block) def acquire(self, timeout=None): """Attempt to lock. Returns a Future. Returns a Future, which raises `tornado.gen.TimeoutError` after a timeout. """ return self._block.acquire(timeout) def release(self): """Unlock. The first coroutine in line waiting for `acquire` gets the lock. If not locked, raise a `RuntimeError`. """ try: self._block.release() except ValueError: raise RuntimeError('release unlocked lock') def __enter__(self): raise RuntimeError( "Use Lock like 'with (yield lock)', not like 'with lock'") __exit__ = __enter__ @gen.coroutine def __aenter__(self): yield self.acquire() @gen.coroutine def __aexit__(self, typ, value, tb): self.release()