[Data] Refactor block batching to follow iterator pattern #31425
Conversation
Signed-off-by: amogkam <amogkamsetty@yahoo.com>
Looking good overall, main note is about using `collections.deque` rather than `queue.Queue` for the sliding prefetch window.
```python
sliding_window = queue.Queue(maxsize=window_size)

# Create the initial set of blocks to prefetch.
while not sliding_window.full():
    try:
        sliding_window.put(next(block_ref_iter))
    except StopIteration:
        break
with stats.iter_wait_s.timer() if stats else nullcontext():
    prefetcher.prefetch_blocks(list(sliding_window.queue))

while not sliding_window.empty():
    block_ref = sliding_window.get()
    try:
        sliding_window.put(next(block_ref_iter))
        with stats.iter_wait_s.timer() if stats else nullcontext():
            prefetcher.prefetch_blocks(list(sliding_window.queue))
    except StopIteration:
        pass
    yield block_ref
    if clear_block_after_read:
        ray._private.internal_api.free(block_ref, local_only=False)
```
I think that we'd want to stick to `collections.deque` for a single-threaded sliding window implementation (more efficient, less complicated semantics):
```python
sliding_window = collections.deque(
    itertools.islice(block_ref_iter, window_size), maxlen=window_size
)
while sliding_window:
    block_ref = sliding_window.popleft()
    try:
        sliding_window.append(next(block_ref_iter))
        with stats.iter_wait_s.timer() if stats else nullcontext():
            prefetcher.prefetch_blocks(list(sliding_window))
    except StopIteration:
        pass
    yield block_ref
    if clear_block_after_read:
        ray._private.internal_api.free(block_ref, local_only=False)
```

(Note: `deque` takes a `maxlen` keyword, not `maxsize`.)
Even after we have a background thread worker, I don't think that we'd want to have a multithreading queue inside of `_prefetch_blocks`. We can keep each of these "batch preprocessing" generators threading-agnostic by pushing the producer generator into the background thread and wiring up a multithreading queue between the producer generator and the consumer generator, which should be a lot cleaner and easier to evolve (and easier to test).
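A minimal sketch of that wiring (hypothetical names, not this PR's actual code): the producer generator runs in a background thread and feeds a `queue.Queue`, while the consumer stays a plain, threading-agnostic generator.

```python
import queue
import threading

_SENTINEL = object()

def make_background_iterator(producer_gen, maxsize=4):
    """Run `producer_gen` in a background thread, yielding its items in order."""
    q = queue.Queue(maxsize=maxsize)

    def _worker():
        try:
            for item in producer_gen:
                q.put(item)
        finally:
            q.put(_SENTINEL)  # Always signal completion, even on error.

    threading.Thread(target=_worker, daemon=True).start()

    while True:
        item = q.get()
        if item is _SENTINEL:
            break
        yield item

# Downstream "batch preprocessing" generators never touch the queue:
blocks = make_background_iterator(iter(range(10)))
print(list(blocks))  # items arrive in order
```

The queue is confined to this one adapter, so every other generator in the chain can be tested single-threaded.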
Good point, updated
```python
sliding_window = queue.Queue(maxsize=window_size)

# Create the initial set of blocks to prefetch.
while not sliding_window.full():
```
It should be noted that this LBYL pattern is not thread-safe in multithreaded code, since the `Queue` class makes no guarantee that a subsequent `put()` will not block even if `full()` returns `False`: https://docs.python.org/3/library/queue.html#queue.Queue.full

The more idiomatic/correct pattern is EAFP, where you try to `sliding_window.put_nowait()` and catch a `queue.Full` exception.

I know that this isn't an issue for single-threaded use of `Queue`, but just pointing it out for the follow-up PR.
```python
# Create the initial set of blocks to prefetch.
while not sliding_window.full():
    try:
        sliding_window.put(next(block_ref_iter))
```
This should probably be `sliding_window.put_nowait()`, since we'd rather throw an error if the `Queue` somehow ends up being full (e.g. due to a bug) than hang forever. Same with `sliding_window.put()` and `sliding_window.get()` below.
Good to know! Will keep this in mind for the next PR, since we changed to `collections.deque` in this one.
```python
for block_ref in block_ref_iter:
    yield block_ref
    if clear_block_after_read:
        ray._private.internal_api.free(block_ref, local_only=False)
```
An interesting thing to note is that this block ref clearing assumes that `block_ref` is no longer in use after control is returned to this generator, i.e. it assumes no buffering by downstream generators, which may or may not hold true for future tweaks. We should keep this in mind.
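To make the hazard concrete, here is a hypothetical downstream generator (not in this PR) that buffers blocks in pairs; the buffered ref is retained past the upstream `yield`, so an eager `free()` upstream would invalidate it:

```python
# Illustrative only: a downstream stage that batches blocks in pairs.
# `block` is held in `buffer` after the upstream generator has resumed,
# so upstream must not free block refs immediately after yielding.
def pairwise_batches(block_iter):
    buffer = []
    for block in block_iter:
        buffer.append(block)  # retained past the upstream yield
        if len(buffer) == 2:
            yield tuple(buffer)
            buffer.clear()
    if buffer:
        yield tuple(buffer)  # flush the leftover partial batch

print(list(pairwise_batches(iter([1, 2, 3]))))  # [(1, 2), (3,)]
```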
Nice refactoring, thanks @amogkam!
```python
# Signal to the batcher that there are no more blocks to add.
batcher.done_adding()

# Get any leftover batches in ShufflingBatcher.
```
nit: `ShufflingBatcher` -> `batcher`?
This is specifically `ShufflingBatcher`. A regular `Batcher` will no longer have any full batches at this point, but `ShufflingBatcher` may still have full batches if the shuffle buffer size is larger than the batch size.
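A toy stand-in (not Ray's `ShufflingBatcher` implementation) showing why: with a shuffle buffer minimum of 10 and a batch size of 4, no batch is released while filling, but two full batches remain to drain after `done_adding()`.

```python
import random

class ToyShufflingBatcher:
    """Minimal sketch: holds items until the shuffle buffer exceeds a
    minimum size, then emits shuffled fixed-size batches."""

    def __init__(self, batch_size, shuffle_buffer_min_size):
        self.batch_size = batch_size
        self.min_size = shuffle_buffer_min_size
        self.buffer = []
        self.done = False

    def add(self, item):
        self.buffer.append(item)

    def done_adding(self):
        self.done = True

    def has_batch(self):
        if self.done:
            return len(self.buffer) >= self.batch_size
        # While filling, only release items above the buffer minimum.
        return len(self.buffer) - self.min_size >= self.batch_size

    def next_batch(self):
        random.shuffle(self.buffer)
        batch = self.buffer[: self.batch_size]
        self.buffer = self.buffer[self.batch_size :]
        return batch

batcher = ToyShufflingBatcher(batch_size=4, shuffle_buffer_min_size=10)
for i in range(10):
    batcher.add(i)
print(batcher.has_batch())  # False: buffer not above the minimum yet

batcher.done_adding()
drained = []
while batcher.has_batch():
    drained.append(batcher.next_batch())
print(len(drained))  # 2 full batches of 4; 2 leftover items stay partial
```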
Thanks for the review guys! I updated the PR, please take another look!
LGTM
LGTM overall, the only thing is the `sliding_window.queue` line!
Just FYI, there seem to be some CI test failures (example):
As discussed offline with @clarkzinzow (#30190 (comment)), this PR refactors block batching to follow a chained iterators pattern.
This allows for more flexibility, composability, and better testing of components upstream of Iterator[Block] (formatting, shuffling, batching, prefetching).
This PR only does a refactor and adds tests. There are no API or functionality changes in this PR. This PR also consolidates the `map_batches` and `iter_batches` codepaths.

Why are these changes needed?
Related issue number

Checks
- I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.