gh-120924: Simplify consumers by adding *.Queue.iter() and *.Queue.iter_nowait() #15

Closed (42 commits)
Conversation

@nineteendo (Owner) commented Jun 24, 2024

Purpose

Currently, consuming items from a queue is more complex than necessary. You need to:

  1. use an infinite while loop
  2. check for a sentinel or add all worker tasks to a list
  3. call q.task_done() after processing each item
  4. join the queue
  5. cancel all worker tasks
  6. wait until all worker tasks are cancelled

By adding *.Queue.iter() and *.Queue.iter_nowait(), this becomes much simpler: once the producer is done, you only need to call queue.shutdown().

Overview of changes

asyncio

  • Calling asyncio.Queue.iter() returns an asynchronous generator which iterates over the items in the queue.
  • Calling asyncio.Queue.iter_nowait() returns a generator which iterates over the items in the queue without blocking.
  • A private _AsyncQueueIterator class has been added to handle the asynchronous iteration.
  • The example in the documentation is greatly simplified by the new addition.
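The patch itself isn't reproduced here, but the non-blocking variant can be sketched as a plain generator over get_nowait() (iter_nowait below is a hypothetical stand-in for the proposed method, written as a free function):

```python
import asyncio

def iter_nowait(q: asyncio.Queue):
    """Yield queued items without awaiting; stop when the queue is empty."""
    while True:
        try:
            yield q.get_nowait()
        except asyncio.QueueEmpty:
            return

async def main():
    q = asyncio.Queue()
    for i in range(3):
        q.put_nowait(i)
    # Drain the queue synchronously, without suspending the coroutine.
    print(list(iter_nowait(q)))  # -> [0, 1, 2]

asyncio.run(main())
```

Since get_nowait() never suspends, the PR can expose this as a regular generator even on asyncio.Queue; only the blocking iter() needs to be an asynchronous generator.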

queue.py

  • Calling queue.Queue.iter() returns a generator which iterates over the items in the queue.
  • Calling queue.Queue.iter_nowait() returns a generator which iterates over the items in the queue without blocking.
  • An example has been added to the documentation that is simpler than working with daemon threads.
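For the synchronous queue, iter_nowait() amounts to draining with get_nowait() until the queue is empty. A minimal sketch (a hypothetical free function, not the PR's actual implementation):

```python
import queue

def iter_nowait(q: queue.Queue):
    """Yield items until the queue is empty, without blocking."""
    while True:
        try:
            yield q.get_nowait()
        except queue.Empty:
            return

q = queue.Queue()
for i in range(3):
    q.put(i)
print(list(iter_nowait(q)))  # -> [0, 1, 2]
```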

Examples

asyncio

Without iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    while True:
        i = await queue.get()
        print(f'queue -> {i}')
        await asyncio.sleep(.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    tasks = []
    for _ in range(1):
        task = asyncio.create_task(consumer(queue))
        tasks.append(task)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue))

    print('produced everything')
    await queue.join()
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    print('consumed everything')

asyncio.run(main())

With iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    async for i in queue.iter():
        print(f'queue -> {i}')
        await asyncio.sleep(.3)

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg1:
        tg1.create_task(consumer(queue))
        async with asyncio.TaskGroup() as tg2:
            tg2.create_task(producer(queue))

        queue.shutdown()
        print('produced everything')

    print('consumed everything')

asyncio.run(main())

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

queue as final step

Without iteration

import threading
import queue

def worker(q):
    while True:
        item = q.get()
        print(f'queue -> {item}')
        q.task_done()

q = queue.Queue()
threading.Thread(target=worker, args=(q,), daemon=True).start()
for item in range(5):
    q.put(item)
    print(f'{item} -> queue')

print('produced everything')
q.join()
print('consumed everything') # Worker is still running

With iteration

import concurrent.futures
import queue

def worker(q):
    for item in q.iter():
        print(f'queue -> {item}')

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp:
    tp.submit(worker, q)
    for item in range(5):
        q.put(item)
        print(f'{item} -> queue')

    q.shutdown()
    print('produced everything')

print('consumed everything') # No worker is running

Output

0 -> queue
queue -> 0
1 -> queue
queue -> 1
queue -> 2
2 -> queue
3 -> queue
4 -> queue
produced everything
queue -> 3
queue -> 4
consumed everything

queue not as final step

Without iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    while True:
        i = q.get()
        if i is None:
            break

        print(f'queue -> {i}')
        time.sleep(.3)
        q.task_done()

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)

    print('produced everything')
    q.join()
    q.put(None)

print('consumed everything')

With iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    for i in q.iter():
        print(f'queue -> {i}')
        time.sleep(.3)

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)

    q.shutdown()
    print('produced everything')

print('consumed everything')

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: https://cpython-previews--15.org.readthedocs.build/

@nineteendo nineteendo closed this Jun 24, 2024