Add parallel module using joblib for Spark #5924
Conversation
Hi @lhoestq, I added the parallel module. Meanwhile I'm working on adding a parallel_backend API along the lines of:

```python
with parallel_backend('spark', steps=['downloading']) as backend:
    ds = load_dataset(..., parallel_backend=backend)
```
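For context, joblib's "spark" backend comes from the joblib-spark package and has to be registered before it can be selected; here is a minimal sketch, assuming a running Spark session is available (note that the steps=[...] argument above is the extension proposed in this PR, not a joblib parameter):

```python
from joblib import Parallel, delayed, parallel_backend
from joblibspark import register_spark


def square(x):
    return x * x


register_spark()  # registers the "spark" joblib backend provided by joblib-spark

# each joblib task is submitted to the Spark cluster behind the scenes
with parallel_backend("spark", n_jobs=4):
    results = Parallel()(delayed(square)(i) for i in range(8))
```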
The documentation is not available anymore as the PR was closed or merged.
Nice ! Added 2 comments :)
@lhoestq Thanks for the comments! The API now looks like:

```python
with parallel_backend('spark', steps=["download"]):
    dataset = load_dataset(..., num_proc=2)
```
Love it ! :)
Note that right now it would use Spark for all the map_nested calls when the parallel_backend context is used, whereas I think we only want to use Spark for the map_nested calls related to downloads.
One idea is to decorate the download method to set the current global step to "download", and then only use joblib if the current step is one of the steps provided in parallel_backend. Though this can be done in a subsequent PR - lmk what you think. I also left some minor comments.
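One possible shape for that decorator idea - a sketch only, with made-up names (`_current_step`, `set_step`, `should_use_joblib` are not this PR's actual code):

```python
import functools

# module-level record of which pipeline step is currently running (hypothetical)
_current_step = None


def set_step(step_name):
    """Decorator marking the wrapped method as a given step, e.g. 'download'."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            global _current_step
            previous, _current_step = _current_step, step_name
            try:
                return func(*args, **kwargs)
            finally:
                _current_step = previous
        return wrapper
    return decorator


def should_use_joblib(enabled_steps):
    """Only dispatch to joblib when the current step was requested via parallel_backend(steps=...)."""
    return _current_step in enabled_steps
```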
@lhoestq Can a maintainer help trigger the tests again?
Yes I think this is doable in a subsequent PR.
Just triggered the tests :)
I think any Dataset method that has a num_proc argument.
Hi maintainers, I've just addressed most of the comments, please take another look, thank you.
```python
split_kwds = []  # we organize the splits ourselves (contiguous splits)
for index in range(num_proc):
    div = len(iterable) // num_proc
    mod = len(iterable) % num_proc
    start = div * index + min(index, mod)
    end = start + div + (1 if index < mod else 0)
    split_kwds.append((function, iterable[start:end], types, index, disable_tqdm, desc))

if len(iterable) != sum(len(i[1]) for i in split_kwds):
    raise ValueError(
        f"Error dividing inputs iterable among processes. "
        f"Total number of objects {len(iterable)}, "
        f"length: {sum(len(i[1]) for i in split_kwds)}"
    )
```
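As a quick illustration of the splitting logic above (a standalone sketch, not code from this PR): 10 items across 3 processes yield contiguous chunks of sizes 4/3/3, with the first `mod` chunks taking one extra item.

```python
def contiguous_splits(iterable, num_proc):
    # same arithmetic as above: the first (len % num_proc) chunks get one extra element
    div, mod = divmod(len(iterable), num_proc)
    splits = []
    for index in range(num_proc):
        start = div * index + min(index, mod)
        end = start + div + (1 if index < mod else 0)
        splits.append(iterable[start:end])
    return splits


print(contiguous_splits(list(range(10)), 3))
# [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```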
Maybe this can still be in map_nested, so that the signature of parallel_map could be `def parallel_map(function, iterable, num_proc)`, and map_nested would call `parallel_map(_single_map_nested, split_kwds, num_proc=num_proc)`. This way it will be easier to start using parallel_map in other places in the code no ?
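A rough sketch of how that could look (hypothetical helper names; the real module may differ):

```python
from multiprocessing import Pool

from joblib import Parallel, delayed


class ParallelBackendConfig:
    # set/reset by the parallel_backend context manager proposed in this PR (name assumed)
    backend_name = None


def _map_with_multiprocessing_pool(function, iterable, num_proc):
    # default path: a plain multiprocessing pool
    with Pool(num_proc) as pool:
        return pool.map(function, iterable)


def _map_with_joblib(function, iterable, num_proc):
    # joblib path: dispatch to whatever backend is currently active (e.g. "spark")
    return Parallel(n_jobs=num_proc)(delayed(function)(item) for item in iterable)


def parallel_map(function, iterable, num_proc):
    """Apply `function` over `iterable` with `num_proc` workers."""
    if ParallelBackendConfig.backend_name is None:
        return _map_with_multiprocessing_pool(function, iterable, num_proc)
    return _map_with_joblib(function, iterable, num_proc)
```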
If so, _map_with_joblib would also take split_kwds as input, which is arbitrarily split according to num_proc rather than decided by joblib.
Are there any other places that you are thinking of using parallel_map for? I thought it's just a replacement for the multiprocessing part of map_nested.
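For comparison, a sketch of the pre-split variant being discussed here (assumed names; each element of split_kwds already bundles a contiguous slice of the inputs, so the split is fixed by num_proc rather than chosen by joblib):

```python
from joblib import Parallel, delayed


def _map_with_joblib(function, split_kwds, num_proc):
    # one joblib task per pre-computed contiguous split
    return Parallel(n_jobs=num_proc)(delayed(function)(*kwds) for kwds in split_kwds)
```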
n_jobs is specified to joblib anyway, no ? Not a big deal anyway.
Might leave it like this so that n_jobs=-1 could be used when the user wants to let joblib decide the number of workers / processes etc.
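For reference, plain joblib behaves as described: with n_jobs=-1 it sizes the worker pool itself (all available cores with the default backend), e.g.:

```python
from joblib import Parallel, delayed


def square(x):
    return x * x


# n_jobs=-1 lets joblib decide how many workers to use
results = Parallel(n_jobs=-1)(delayed(square)(i) for i in range(16))
print(results[:4])  # [0, 1, 4, 9]
```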
ah good idea !
LGTM for this one :) awesome !
Benchmarks (PyArrow==8.0.0): benchmark_array_xd.json, benchmark_getitem_100B.json, benchmark_indices_mapping.json, benchmark_iterating.json, benchmark_map_filter.json
Discussion in #5798