.. currentmodule:: asyncio
asyncio queues are designed to be similar to the classes of the :mod:`queue` module. Unlike those classes, asyncio queues are not thread-safe; they are designed specifically for use in async/await code.
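Because the queues are not thread-safe, code running in another thread should not call queue methods directly. One common pattern, sketched below, is to hand the call to the event loop with ``loop.call_soon_threadsafe``. The names ``producer`` and ``consume`` are hypothetical and exist only for this illustration.

```python
import asyncio
import threading


def producer(loop, queue):
    # Runs in a separate thread, so it must not touch the queue directly.
    # Instead, ask the event loop to run put_nowait() in the loop's thread.
    for n in range(3):
        loop.call_soon_threadsafe(queue.put_nowait, n)


async def consume():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    thread = threading.Thread(target=producer, args=(loop, queue))
    thread.start()
    # Callbacks scheduled from one thread run in the order they were scheduled.
    received = [await queue.get() for _ in range(3)]
    thread.join()
    return received


received = asyncio.run(consume())
print(received)
```

This sketch uses an unbounded queue, so ``put_nowait`` cannot raise :exc:`QueueFull` inside the scheduled callback; a bounded queue would need a different hand-off.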
Note that methods of asyncio queues don't have a *timeout* parameter; use the :func:`asyncio.wait_for` function to perform queue operations with a timeout.
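A minimal sketch of a queue read with a timeout might look like the following. The helper ``get_with_timeout`` is hypothetical (it is not part of asyncio), and returning ``None`` on timeout is an assumption made for brevity; it would be ambiguous if ``None`` were a valid queue item.

```python
import asyncio


async def get_with_timeout(queue, timeout):
    """Return the next item, or None if nothing arrives within *timeout* seconds."""
    try:
        return await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait('job-1')
    first = await get_with_timeout(queue, 0.1)   # an item is already available
    second = await get_with_timeout(queue, 0.1)  # queue is empty, so this times out
    return first, second


result = asyncio.run(demo())
print(result)
```

When the timeout expires, ``wait_for`` cancels the pending ``queue.get()`` call, so no item is consumed by the abandoned read.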
See also the Examples section below.
.. class:: PriorityQueue

   A variant of :class:`Queue`; retrieves entries in priority order
   (lowest first).

   Entries are typically tuples of the form
   ``(priority_number, data)``.
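As an illustration of priority ordering, the sketch below stores ``(priority_number, data)`` tuples in an ``asyncio.PriorityQueue`` and drains them. The function name ``drain_by_priority`` and the sample entries are made up for this example.

```python
import asyncio


async def drain_by_priority():
    queue = asyncio.PriorityQueue()
    # Entries are (priority_number, data) tuples, added in arbitrary order.
    for entry in [(3, 'write report'), (1, 'fix outage'), (2, 'review patch')]:
        queue.put_nowait(entry)

    order = []
    while not queue.empty():
        priority, data = await queue.get()
        order.append(data)
        queue.task_done()
    return order


order = asyncio.run(drain_by_priority())
print(order)  # ['fix outage', 'review patch', 'write report']
```

Entries are compared as whole tuples, so if two entries share a priority number the comparison falls through to the data element, which must then be orderable.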
.. class:: LifoQueue

   A variant of :class:`Queue` that retrieves most recently added
   entries first (last in, first out).
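The last-in, first-out behaviour can be seen in a short sketch such as this one, which uses ``asyncio.LifoQueue``. The function name ``drain_lifo`` and the item values are placeholders.

```python
import asyncio


async def drain_lifo():
    queue = asyncio.LifoQueue()
    for item in ['first', 'second', 'third']:
        queue.put_nowait(item)

    items = []
    while not queue.empty():
        # The most recently added entry comes out first.
        items.append(queue.get_nowait())
    return items


items = asyncio.run(drain_lifo())
print(items)  # ['third', 'second', 'first']
```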
.. exception:: QueueEmpty

   This exception is raised when the :meth:`~Queue.get_nowait` method
   is called on an empty queue.

.. exception:: QueueFull

   Exception raised when the :meth:`~Queue.put_nowait` method is called
   on a queue that has reached its *maxsize*.

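Both exceptions can be triggered with a queue whose *maxsize* is 1, as in the following sketch. The function name ``probe`` and the ``events`` list are illustrative only.

```python
import asyncio


async def probe():
    queue = asyncio.Queue(maxsize=1)
    events = []

    # Nothing has been queued yet, so a non-blocking get fails.
    try:
        queue.get_nowait()
    except asyncio.QueueEmpty:
        events.append('empty')

    # The first put fills the only slot; the second one has no room.
    queue.put_nowait('only slot')
    try:
        queue.put_nowait('one too many')
    except asyncio.QueueFull:
        events.append('full')

    return events


events = asyncio.run(probe())
print(events)  # ['empty', 'full']
```

The awaitable counterparts, ``get()`` and ``put()``, wait instead of raising, which is usually preferable inside a coroutine.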
Queues can be used to distribute workload between several concurrent tasks::

   import asyncio
   import random
   import time


   async def worker(name, queue):
       while True:
           # Get a "work item" out of the queue.
           sleep_for = await queue.get()

           # Sleep for the "sleep_for" seconds.
           await asyncio.sleep(sleep_for)

           # Notify the queue that the "work item" has been processed.
           queue.task_done()

           print(f'{name} has slept for {sleep_for:.2f} seconds')


   async def main():
       # Create a queue that we will use to store our "workload".
       queue = asyncio.Queue()

       # Generate random timings and put them into the queue.
       total_sleep_time = 0
       for _ in range(20):
           sleep_for = random.uniform(0.05, 1.0)
           total_sleep_time += sleep_for
           queue.put_nowait(sleep_for)

       # Create three worker tasks to process the queue concurrently.
       tasks = []
       for i in range(3):
           task = asyncio.create_task(worker(f'worker-{i}', queue))
           tasks.append(task)

       # Wait until the queue is fully processed.
       started_at = time.monotonic()
       await queue.join()
       total_slept_for = time.monotonic() - started_at

       # Cancel our worker tasks.
       for task in tasks:
           task.cancel()
       # Wait until all worker tasks are cancelled.
       await asyncio.gather(*tasks, return_exceptions=True)

       print('====')
       print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
       print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


   asyncio.run(main())