Queues with asyncio.Queue()
Desliza para mostrar el menú
Semaphores limit how many coroutines run at once. Queues go further – they let you separate the work of producing items from the work of consuming them, with a fixed number of workers processing items as they arrive.
The Producer–Consumer Pattern
A queue connects producers (coroutines that generate work) with consumers (coroutines that process it). This is the standard pattern for pipelines:
1234567891011121314151617181920212223242526272829import asyncio import nest_asyncio nest_asyncio.apply() # Simulating a producer–consumer pipeline async def producer(queue, items): for item in items: await queue.put(item) # Adding items to the queue print(f"Produced: {item}") await queue.put(None) # Sending a sentinel to signal completion async def consumer(queue): while True: item = await queue.get() # Waiting for an item if item is None: break print(f"Consumed: {item}") queue.task_done() async def main(): queue = asyncio.Queue() await asyncio.gather( producer(queue, ["post_1", "post_2", "post_3"]), consumer(queue), ) asyncio.run(main())
Multiple Workers
The real power of queues is running multiple consumers in parallel, each pulling from the same queue:
12345678910111213141516171819202122232425262728293031323334353637383940414243import asyncio import httpx import nest_asyncio nest_asyncio.apply() # Fetching posts using a queue and multiple workers async def worker(worker_id, queue, client, results): while True: post_id = await queue.get() if post_id is None: queue.task_done() break response = await client.get( f"https://jsonplaceholder.typicode.com/posts/{post_id}" ) title = response.json()["title"] results.append(title) print(f"Worker {worker_id} fetched post {post_id}") queue.task_done() async def main(): queue = asyncio.Queue() results = [] num_workers = 3 for post_id in range(1, 11): await queue.put(post_id) for _ in range(num_workers): await queue.put(None) # Sending a sentinel per worker async with httpx.AsyncClient() as client: workers = [ asyncio.create_task(worker(worker_id, queue, client, results)) for worker_id in range(1, num_workers + 1) ] await asyncio.gather(*workers) print(f"\nFetched {len(results)} posts total") asyncio.run(main())
Three workers pull post IDs from the queue concurrently. Each worker processes items independently until it receives a None sentinel.
Bounded Queues
Pass maxsize to limit how many items the queue can hold. If the queue is full, put() blocks until a consumer makes space – useful for backpressure:
1234567891011121314151617181920212223242526import asyncio import nest_asyncio nest_asyncio.apply() # Bounded queue that limits in-flight items to 3 async def main(): queue = asyncio.Queue(maxsize=3) async def slow_producer(): for item_id in range(1, 7): await queue.put(item_id) print(f"Put item {item_id} (queue size: {queue.qsize()})") async def slow_consumer(): while True: item = await queue.get() await asyncio.sleep(0.5) # Simulating processing time print(f"Consumed item {item}") queue.task_done() if queue.empty(): break await asyncio.gather(slow_producer(), slow_consumer()) asyncio.run(main())
Semaphore vs Queue
¡Gracias por tus comentarios!
Pregunte a AI
Pregunte a AI
Pregunte lo que quiera o pruebe una de las preguntas sugeridas para comenzar nuestra charla