
[Data] [1/n] Async iter_batches: Add Threadpool util #33575

Merged: 9 commits merged into ray-project:master from the async-iter-batches-1 branch on Mar 23, 2023

Conversation

@amogkam (Contributor) commented Mar 22, 2023

Part 1 of async iter_batches support. Converts the existing _make_async_gen util to be able to use a threadpool instead of a single thread.
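
For context, the sketch below shows the general shape of such a util: several worker threads pull from one shared iterator, apply a transformation function, and push results onto a bounded queue that the caller drains lazily. This is a simplified, hypothetical illustration (the names make_async_gen and _ThreadSafeIterator and all parameters here are made up for this sketch), not the code in this PR:

    import queue
    import threading
    from typing import Callable, Iterator, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")

    class _ThreadSafeIterator:
        """Serializes next() calls so several workers can share one iterator."""

        def __init__(self, it: Iterator[T]):
            self._it = it
            self._lock = threading.Lock()

        def __iter__(self) -> "_ThreadSafeIterator":
            return self

        def __next__(self) -> T:
            with self._lock:
                return next(self._it)

    def make_async_gen(
        base_iterator: Iterator[T],
        fn: Callable[[Iterator[T]], Iterator[U]],
        num_workers: int = 1,
        queue_size: int = 2,
    ) -> Iterator[U]:
        output = queue.Queue(maxsize=queue_size)
        sentinel = object()
        shared = _ThreadSafeIterator(base_iterator)

        def worker() -> None:
            try:
                for out in fn(shared):
                    output.put(out, block=True)
            except Exception as e:
                # Forward worker errors to the consumer instead of dropping them.
                output.put(e, block=True)
            finally:
                output.put(sentinel, block=True)  # one "done" marker per worker

        # Daemon threads so a blocked worker cannot keep the process alive on
        # exit (the trade-off discussed in the review below).
        threads = [
            threading.Thread(target=worker, daemon=True) for _ in range(num_workers)
        ]
        for t in threads:
            t.start()

        finished = 0
        while finished < num_workers:
            item = output.get(block=True)
            if item is sentinel:
                finished += 1
            elif isinstance(item, Exception):
                raise item
            else:
                yield item

For example, list(make_async_gen(iter(range(8)), fn=lambda it: (x * x for x in it), num_workers=4)) yields all eight squares, in whatever order the workers finish.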

Why are these changes needed?

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: amogkam <amogkamsetty@yahoo.com>
@ericl added the @author-action-required label (The PR author is responsible for the next step. Remove tag to send back to the reviewer.) on Mar 22, 2023
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
@ollie-iterators

It looks like there is an issue on line 169 of "python/ray/data/_internal/block_batching/block_batching.py": the "prefetch_buffer_size" argument should be removed, since it is no longer a parameter of the _make_async_gen function.

@ollie-iterators

Fixing it should resolve 10 of the test failures.

amogkam added 4 commits March 22, 2023 11:39
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
@clarkzinzow (Contributor) left a comment

Mostly nits/questions; my biggest questions are:

  1. Whether making the threads daemonic will lead to improperly released resources, which might lead to some bad bugs.
  2. Whether output_queue.join() should be called from the thread that calls .task_done(), which I think might be nonsensical (see the short queue.join() illustration below).
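
For reference on point 2: queue.Queue.join() blocks until task_done() has been called once for every item that was ever put(), so if the only thread calling task_done() is the same one that calls join(), the join() is either an immediate no-op (everything already marked done) or a guaranteed deadlock. A tiny standalone illustration, not code from this PR:

    import queue

    q = queue.Queue()
    for i in range(3):
        q.put(i)

    # Drain and mark every item done from this same thread.
    while not q.empty():
        q.get()
        q.task_done()

    # Returns immediately: the unfinished-task count is already zero.
    # Calling q.join() *before* the loop above would deadlock, since no
    # other thread would ever call task_done() to release it.
    q.join()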

Quoted lines from the diff under review:

    output_queue.put(e, block=True)

    threads = [
        threading.Thread(target=execute_computation, args=(i,), daemon=True)
@clarkzinzow (Contributor) commented:

Why do we want these to be daemon threads? This is slightly concerning since resources might not be released properly when daemonic threads are terminated (i.e. no __exit__s run for context managers or finally blocks for try-finally patterns).

I'm guessing you went this route because we don't want to add the signaling mechanism necessary for graceful worker thread termination, but it doesn't seem like it would be too difficult to capture a threading.Event in the execute_computation closure and to check if it's set every 100ms when blocked while putting in the queue.

    shutdown = threading.Event()

    def try_put(item: Any) -> bool:
        """Put `item` on the queue, checking the shutdown event every 100ms.

        Returns True if shutdown was signaled before the put succeeded.
        """
        while not shutdown.is_set():
            try:
                output_queue.put(item, block=True, timeout=0.1)
            except queue.Full:
                pass
            else:
                # Put succeeded; the worker should keep producing.
                return False
        # Shutdown was signaled while waiting for space in the queue.
        return True

    def execute_computation(thread_index: int):
        try:
            for item in fn(thread_safe_generator):
                if try_put(item):
                    return
            try_put(Sentinel(thread_index))
        except Exception as e:
            try_put(e)

    while True:
        next_item = output_queue.get(block=True)
        if isinstance(next_item, Exception):
            output_queue.task_done()
            shutdown.set()
            raise next_item

@amogkam (Contributor, Author) replied:

In this case, the event is only set if one of the child threads raises an exception. But what happens if the main thread is interrupted? In that case, the event will not be set and the child threads will still be alive, right?

@ericl (Contributor) commented Mar 22, 2023

@clarkzinzow Regarding daemon threads, the main thing is not leaking drivers / worker processes when things aren't properly shut down. Non-daemon threads hold the entire process open until you force exit.

The streaming executor is run on a daemon thread, for example, due to this issue. On process shutdown you don't care about finally blocks anyway.
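
A minimal standalone demonstration of that difference (not from the Ray codebase): with daemon=False the script below would never exit after printing, while with daemon=True it exits immediately and the thread is killed without running any cleanup.

    import threading
    import time

    def sleep_forever():
        while True:
            time.sleep(1)

    # daemon=True: the interpreter exits as soon as the main thread returns,
    # terminating this thread without running finally/__exit__ blocks.
    # daemon=False: the process would stay alive until the thread ends (never).
    threading.Thread(target=sleep_forever, daemon=True).start()
    print("main thread done")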

@clarkzinzow (Contributor)

@ericl Ah, so we're making the worker threads daemonic to prevent spurious blocking within fn (which would circumvent our shutdown event checking) from keeping the process from exiting, and we're only terminating these threads asynchronously when the process is exiting so we don't care about properly releasing resources. Got it, sounds good!

amogkam added 2 commits March 22, 2023 16:20
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
@amogkam amogkam merged commit 888d270 into ray-project:master Mar 23, 2023
@amogkam amogkam deleted the async-iter-batches-1 branch March 23, 2023 01:06
@amogkam added the tests-ok label (The tagger certifies test failures are unrelated and assumes personal liability.) and removed the @author-action-required label (The PR author is responsible for the next step. Remove tag to send back to the reviewer.) on Mar 23, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
…3575)

Part 1 of async iter_batches support. Converts the existing _make_async_gen util to be able to use a threadpool instead of a single thread.

---------

Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: elliottower <elliot@elliottower.com>
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023
…3575)

Part 1 of async iter_batches support. Converts the existing _make_async_gen util to be able to use a threadpool instead of a single thread.

---------

Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Signed-off-by: Jack He <jackhe2345@gmail.com>