
Simplify consumers by adding queue.Queue.iter() #13

Closed
wants to merge 40 commits

Conversation


@nineteendo commented Jun 23, 2024

Purpose

Currently, consuming items from a queue is complex. You need to:

  1. run an infinite while loop
  2. check each item against a sentinel
  3. call q.task_done() after processing each item
  4. join the queue
  5. cancel all worker tasks by sending sentinels

With queue.Queue.iter(), this becomes much simpler: you only need to call q.shutdown().

Overview of changes

  • Calling queue.Queue.iter() returns a generator that iterates over the items in the queue
  • A documentation example has been added that is much simpler than the existing one using daemon threads
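The semantics sketched above can be emulated on any Python version with a sentinel. The IterQueue class below is a hypothetical stand-in for the proposed queue.Queue.iter() and the shutdown() behavior it relies on, not the actual patch:

```python
import queue

_SENTINEL = object()  # private marker signalling "no more items"

class IterQueue(queue.Queue):
    """Minimal sketch of the proposed queue.Queue.iter() semantics,
    emulated with a sentinel so it runs on any Python version."""

    def shutdown(self):
        # Tell consumers that no further items will arrive.
        self.put(_SENTINEL)

    def iter(self):
        # Yield items until shutdown() is observed.
        while True:
            item = self.get()
            if item is _SENTINEL:
                self.put(_SENTINEL)  # re-queue so other consumers stop too
                return
            yield item
            self.task_done()

q = IterQueue()
for i in range(3):
    q.put(i)
q.shutdown()
print(list(q.iter()))  # prints [0, 1, 2]
```

Re-queueing the sentinel lets multiple consumers share one queue; the real implementation would instead build on the queue's internal shutdown state.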

Example

Without iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    while True:
        i = q.get()
        if i is None:  # sentinel: producer is done
            break

        print(f'queue -> {i}')
        time.sleep(.3)
        q.task_done()

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)  # inner block waits for the producer to finish

    print('produced everything')
    q.join()     # wait until every item has been processed
    q.put(None)  # send the sentinel to stop the consumer

print('consumed everything')

With iteration

import concurrent.futures
import queue
import time

def producer(q):
    for i in range(5):
        print(f'{i} -> queue')
        q.put(i)
        time.sleep(.1)

def consumer(q):
    for i in q.iter():
        print(f'queue -> {i}')
        time.sleep(.3)

q = queue.Queue()
with concurrent.futures.ThreadPoolExecutor() as tp1:
    tp1.submit(consumer, q)
    with concurrent.futures.ThreadPoolExecutor() as tp2:
        tp2.submit(producer, q)  # inner block waits for the producer to finish

    q.shutdown()  # consumer's iterator stops once the queue is drained
    print('produced everything')

print('consumed everything')

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: https://cpython-previews--13.org.readthedocs.build/

@nineteendo closed this Jun 23, 2024