tornado.queues – Queues for coroutines
New in version 4.2.
Classes
Queue
- class tornado.queues.Queue(maxsize=0)
Coordinate producer and consumer coroutines.
If maxsize is 0 (the default) the queue size is unbounded.
```python
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)
```

```text
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done
```
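In Python 3.5 and later, Queue implements the async iterator protocol, so consumer() could be rewritten as a native coroutine:

```python
async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            # Use await here: a yield inside async def would turn
            # consumer() into an async generator instead of a coroutine.
            await gen.sleep(0.01)
        finally:
            q.task_done()
```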
Changed in version 4.3: Added async for support in Python 3.5.
- put(item, timeout=None)
Put an item into the queue, perhaps waiting until there is room.
Returns a Future that resolves once the item has been added to the queue, or raises tornado.gen.TimeoutError after a timeout.
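As a hedged illustration of the timeout behaviour, the sketch below fills a one-slot queue and then attempts a second put that nothing will ever make room for. It assumes the timeout is passed as a datetime.timedelta (a deadline relative to now); the 0.05-second value, the main() coroutine and the returned strings are illustrative choices, not part of the API.

```python
from datetime import timedelta

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue


@gen.coroutine
def main():
    q = Queue(maxsize=1)
    yield q.put('first')  # Fits: the queue has one free slot.
    try:
        # The queue is now full and nothing consumes from it, so this
        # put can only finish by timing out.
        yield q.put('second', timeout=timedelta(seconds=0.05))
    except gen.TimeoutError:
        return 'timed out'
    return 'enqueued'


result = IOLoop.current().run_sync(main)
print(result)  # timed out
```

A producer that catches the timeout like this can decide to drop the item, retry later, or report backpressure to its caller.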
- put_nowait(item)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
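A minimal sketch of put_nowait on a bounded queue follows. The accepted and rejected lists are hypothetical bookkeeping added for illustration; the sketch only assumes that QueueFull is raised once maxsize items are waiting.

```python
from tornado.queues import Queue, QueueFull

q = Queue(maxsize=2)
accepted, rejected = [], []

for item in range(4):
    try:
        q.put_nowait(item)      # Succeeds while a slot is free.
        accepted.append(item)
    except QueueFull:
        rejected.append(item)   # No consumer is running, so the queue stays full.

print(accepted, rejected)  # [0, 1] [2, 3]
```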
- get(timeout=None)
Remove and return an item from the queue.
Returns a Future which resolves once an item is available, or raises tornado.gen.TimeoutError after a timeout.
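One possible use of the get timeout is a consumer that stops after the queue has been idle for a while. The sketch below is an assumption-laden example rather than a recommended pattern: the idle parameter, the 0.05-second default and the seen list are all hypothetical names introduced here.

```python
from datetime import timedelta

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue


@gen.coroutine
def consumer(q, idle=timedelta(seconds=0.05)):
    seen = []
    while True:
        try:
            # Wait at most `idle` for the next item.
            item = yield q.get(timeout=idle)
        except gen.TimeoutError:
            return seen          # Nothing arrived in time: stop consuming.
        seen.append(item)
        q.task_done()


@gen.coroutine
def main():
    q = Queue()
    for item in range(3):
        yield q.put(item)
    seen = yield consumer(q)
    return seen


seen = IOLoop.current().run_sync(main)
print(seen)  # [0, 1, 2]
```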
- get_nowait()
Remove and return an item from the queue without blocking.
Return an item if one is immediately available, else raise QueueEmpty.
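The sketch below drains whatever is currently queued without waiting, using QueueEmpty as the stop signal. The drained list and the sample items are illustrative.

```python
from tornado.queues import Queue, QueueEmpty

q = Queue()
for item in ('a', 'b', 'c'):
    q.put_nowait(item)

drained = []
while True:
    try:
        drained.append(q.get_nowait())  # Items come back in FIFO order.
    except QueueEmpty:
        break                           # Nothing left that is immediately available.

print(drained)  # ['a', 'b', 'c']
```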
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get used to fetch a task, a subsequent call to task_done tells the queue that the processing on the task is complete.
If a join is blocking, it resumes when all items have been processed; that is, when every put is matched by a task_done.
Raises ValueError if called more times than there were items put into the queue.
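The accounting described above can be sketched as follows: one put is matched by one task_done, and a second task_done has nothing left to match. The raised flag is a hypothetical variable used only to record the outcome.

```python
from tornado.queues import Queue

q = Queue()
q.put_nowait('job')
q.get_nowait()
q.task_done()        # Matches the single put above.

raised = False
try:
    q.task_done()    # No unfinished task remains.
except ValueError:
    raised = True

print(raised)  # True
```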
- join(timeout=None)
Block until all items in the queue are processed.
Returns a Future that resolves once all items have been processed, or raises tornado.gen.TimeoutError after a timeout.
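As a rough sketch of join with a timeout, the example below fetches an item but delays the matching task_done, so the first join cannot complete. It assumes a datetime.timedelta timeout of 0.05 seconds; main() and timed_out are illustrative names.

```python
from datetime import timedelta

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue


@gen.coroutine
def main():
    q = Queue()
    yield q.put('job')
    q.get_nowait()               # Fetched, but not yet marked done.
    try:
        yield q.join(timeout=timedelta(seconds=0.05))
    except gen.TimeoutError:
        timed_out = True         # One task is still unfinished.
    else:
        timed_out = False
    q.task_done()
    yield q.join()               # Every put is now matched, so this resolves.
    return timed_out


timed_out = IOLoop.current().run_sync(main)
print(timed_out)  # True
```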
PriorityQueue
- class tornado.queues.PriorityQueue(maxsize=0)
A Queue that retrieves entries in priority order, lowest first.
Entries are typically tuples like (priority number, data).
```python
from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
```

```text
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')
```
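When two entries share a priority, tuple comparison falls through to the data, which fails for payloads that cannot be ordered (dicts, for example). One common workaround, shown here as a sketch and not as part of the Tornado API, is to add a monotonically increasing sequence number as a tie-breaker. The put_job helper and the counter are hypothetical names.

```python
import itertools

from tornado.queues import PriorityQueue

q = PriorityQueue()
counter = itertools.count()  # Hypothetical tie-breaker, not provided by Tornado.


def put_job(priority, payload):
    # Entries are (priority, sequence, payload); the unique sequence number
    # means the payload itself is never compared.
    q.put_nowait((priority, next(counter), payload))


put_job(1, {'name': 'b'})
put_job(0, {'name': 'a'})
put_job(1, {'name': 'c'})

order = [q.get_nowait()[2]['name'] for _ in range(3)]
print(order)  # ['a', 'b', 'c']
```

Entries with equal priority then come out in insertion order.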
Exceptions
QueueEmpty
- exception tornado.queues.QueueEmpty
Raised by Queue.get_nowait when the queue has no items.
QueueFull
- exception tornado.queues.QueueFull
Raised by Queue.put_nowait when a queue is at its maximum size.