Queues with asyncio.Queue() | Asyncio in Practice
Python Asyncio in Depth

Queues with asyncio.Queue()


Semaphores limit how many coroutines run at once. Queues go further – they let you separate the work of producing items from the work of consuming them, with a fixed number of workers processing items as they arrive.

The Producer–Consumer Pattern

A queue connects producers (coroutines that generate work) with consumers (coroutines that process it). This is the standard pattern for pipelines:

import asyncio
import nest_asyncio

nest_asyncio.apply()

# Simulating a producer–consumer pipeline
async def producer(queue, items):
    for item in items:
        await queue.put(item)  # Adding items to the queue
        print(f"Produced: {item}")
    await queue.put(None)  # Sending a sentinel to signal completion

async def consumer(queue):
    while True:
        item = await queue.get()  # Waiting for an item
        if item is None:
            break
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(
        producer(queue, ["post_1", "post_2", "post_3"]),
        consumer(queue),
    )

asyncio.run(main())

Multiple Workers

The real power of queues is running multiple consumers in parallel, each pulling from the same queue:

import asyncio
import httpx
import nest_asyncio

nest_asyncio.apply()

# Fetching posts using a queue and multiple workers
async def worker(worker_id, queue, client, results):
    while True:
        post_id = await queue.get()
        if post_id is None:
            queue.task_done()
            break
        response = await client.get(
            f"https://jsonplaceholder.typicode.com/posts/{post_id}"
        )
        title = response.json()["title"]
        results.append(title)
        print(f"Worker {worker_id} fetched post {post_id}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    results = []
    num_workers = 3

    for post_id in range(1, 11):
        await queue.put(post_id)
    for _ in range(num_workers):
        await queue.put(None)  # Sending a sentinel per worker

    async with httpx.AsyncClient() as client:
        workers = [
            asyncio.create_task(worker(worker_id, queue, client, results))
            for worker_id in range(1, num_workers + 1)
        ]
        await asyncio.gather(*workers)

    print(f"\nFetched {len(results)} posts total")

asyncio.run(main())

Three workers pull post IDs from the queue concurrently. Each worker processes items independently until it receives a None sentinel.
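Sentinels are one way to shut workers down. An alternative sketch, using only standard asyncio APIs (queue.join() plus task cancellation), waits until every item has been marked done and then cancels the workers, so no sentinels are needed:

```python
import asyncio

async def worker(worker_id, queue, results):
    # Loop forever; the task is cancelled once the queue is drained
    while True:
        item = await queue.get()
        results.append(f"worker {worker_id}: {item}")
        queue.task_done()  # Marking the item as processed

async def main():
    queue = asyncio.Queue()
    results = []
    for item_id in range(1, 6):
        queue.put_nowait(item_id)

    workers = [
        asyncio.create_task(worker(worker_id, queue, results))
        for worker_id in range(1, 4)
    ]

    await queue.join()  # Blocks until task_done() has been called for every item
    for task in workers:
        task.cancel()  # Stopping the now-idle workers
    await asyncio.gather(*workers, return_exceptions=True)

    print(f"Processed {len(results)} items")

asyncio.run(main())
```

The trade-off: join()-plus-cancel keeps the worker loop simpler, while sentinels let each worker exit cleanly on its own, which matters if workers hold resources that need orderly teardown.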

Bounded Queues

Pass maxsize to limit how many items the queue can hold. When the queue is full, await queue.put() suspends the producer until a consumer makes space – a simple way to apply backpressure:

import asyncio
import nest_asyncio

nest_asyncio.apply()

# Bounded queue that limits in-flight items to 3
async def main():
    queue = asyncio.Queue(maxsize=3)

    async def slow_producer():
        for item_id in range(1, 7):
            await queue.put(item_id)
            print(f"Put item {item_id} (queue size: {queue.qsize()})")

    async def slow_consumer():
        while True:
            item = await queue.get()
            await asyncio.sleep(0.5)  # Simulating processing time
            print(f"Consumed item {item}")
            queue.task_done()
            if queue.empty():
                break

    await asyncio.gather(slow_producer(), slow_consumer())

asyncio.run(main())
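To observe the limit without suspending, a minimal sketch using the non-blocking variant: put_nowait() raises asyncio.QueueFull as soon as maxsize is reached, whereas await queue.put(...) would simply wait for space:

```python
import asyncio

async def main():
    queue = asyncio.Queue(maxsize=3)

    for item_id in range(1, 6):
        try:
            queue.put_nowait(item_id)  # Non-blocking put; raises when full
            print(f"Stored item {item_id}")
        except asyncio.QueueFull:
            print(f"Queue full, dropped item {item_id}")

    print(f"Queue holds {queue.qsize()} items")

asyncio.run(main())
```

Items 1–3 are stored and items 4–5 are dropped, which is the behavior you want when shedding load is preferable to slowing the producer down.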

