-
Notifications
You must be signed in to change notification settings - Fork 6.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Data] [1/n] Async iter_batches
: Add Threadpool util
#33575
Conversation
It looks like there is an issue on line 169 of the file "python/ray/data/_internal/block_batching/block_batching.py". The text "prefetch_buffer_size" should be removed as it is not a part of the _make_async_gen function anymore. |
It should cause 10 of the failures to be fixed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly nits/questions; my biggest questions are:
- Whether making the threads daemonic will lead to improperly released resources, which might lead to some bad bugs.
- Whether
output_queue.join()
should be called from the thread that calls.task_done()
, which I think might be nonsensical.
output_queue.put(e, block=True) | ||
|
||
threads = [ | ||
threading.Thread(target=execute_computation, args=(i,), daemon=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we want these to be daemon threads? This is slightly concerning since resources might not be released properly when daemonic threads are terminated (i.e. no __exit__
s run for context managers or finally
blocks for try-finally patterns).
I'm guessing you went this route because we don't want to add the signaling mechanism necessary for graceful worker thread termination, but it doesn't seem like it would be too difficult to capture a threading.Event
in the execute_computation
closure and to check if it's set every 100ms when blocked while putting in the queue.
shutdown = threading.Event()
def try_put(item: Any) -> bool:
while not shutdown.is_set():
try:
output_queue.put(item, block=True, timeout=0.1)
except queue.Full:
pass
else:
return True
return False
def execute_computation(thread_index: int):
try:
for item in fn(thread_safe_generator):
if try_put(item):
return
try_put(Sentinel(thread_index))
except Exception as e:
try_put(e)
while True:
next_item = output_queue.get(block=True)
if isinstance(next_item, Exception):
output_queue.task_done()
shutdown.set()
raise next_item
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, the event is only set if one of the child threads raises an exception. But what happens if the main thread is interrupted? In that case, the event will not be set and the child threads will still be alive right?
@clarkzinzow regarding daemon threads, the main thing is not leaking drivers / worker processes when things aren't properly shut down. For non-daemon threads, those hold the entire process open until you force exit. The streaming executor is run on a daemon thread, for example, due to this issue. On process shutdown you don't care about finally blocks, anyways. |
@ericl Ah, so we're making the worker threads daemonic to prevent spurious blocking within |
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
…3575) Part 1 of async iter_batches support. Converts the existing _make_async_gen util to be able to use a threadpool instead of a single thread. --------- Signed-off-by: amogkam <amogkamsetty@yahoo.com> Signed-off-by: elliottower <elliot@elliottower.com>
…3575) Part 1 of async iter_batches support. Converts the existing _make_async_gen util to be able to use a threadpool instead of a single thread. --------- Signed-off-by: amogkam <amogkamsetty@yahoo.com> Signed-off-by: Jack He <jackhe2345@gmail.com>
Part 1 of async
iter_batches
support. Converts the existing_make_async_gen
util to be able to use a threadpool instead of a single thread.Why are these changes needed?
Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.