
Add parallel module using joblib for Spark #5924

Merged: 13 commits, merged Jun 14, 2023

Conversation

es94129 (Contributor) commented Jun 2, 2023

Discussion in #5798

es94129 (Contributor, Author) commented Jun 2, 2023

Hi @lhoestq, I added the parallel part according to the discussion we had. Could you take a look to see if this is aligned with your proposal?

Meanwhile I'm working on adding a parallel_backend parameter to load_dataset so that it can be used like:

with parallel_backend('spark', steps=['downloading']) as backend:
  ds = load_dataset(..., parallel_backend=backend)

where parallel_backend is a ParallelBackend class.

HuggingFaceDocBuilderDev commented Jun 5, 2023

The documentation is not available anymore as the PR was closed or merged.

lhoestq (Member) left a comment

Nice! Added 2 comments :)

src/datasets/parallel/parallel.py: 2 comments (outdated, resolved)
@es94129 es94129 changed the title [wip] Add parallel module using joblib for Spark Add parallel module using joblib for Spark Jun 6, 2023
@es94129 es94129 requested a review from lhoestq June 6, 2023 02:12
es94129 (Contributor, Author) commented Jun 6, 2023

@lhoestq Thanks for the comments!
With your suggestion, no changes are needed in load_dataset, and I validated that downloading with Spark now works with:

with parallel_backend('spark', steps=["download"]):
    dataset = load_dataset(..., num_proc=2)

lhoestq (Member) left a comment

Love it! :)

Note that right now it would use Spark for all the map_nested calls when the parallel_backend context is used. I think we should use Spark only for the map_nested calls related to downloads.

One idea is to decorate the download method to set the current global step to "download", and then only use joblib if the current step is one of the steps provided in parallel_backend. Though this can be done in a subsequent PR - lmk what you think,

and some minor comments:

src/datasets/parallel/parallel.py: 3 comments (outdated, resolved)
tests/test_parallel.py: 4 comments (resolved)
setup.py: 1 comment (resolved)
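The step-gating idea above (decorate the download method to set a global step, then only take the joblib path when that step was requested) could be sketched roughly as follows. This is a hypothetical sketch, not the merged code; set_step, maybe_parallel, and the context variables are invented names, and the "joblib"/"serial" tags stand in for the real dispatch so the routing is visible:

```python
import contextvars
from functools import wraps

# Which step is currently executing, and which steps were opted in.
_CURRENT_STEP = contextvars.ContextVar("step", default=None)
_ENABLED_STEPS = contextvars.ContextVar("enabled", default=frozenset())

def set_step(step):
    """Decorator: mark `step` as the current global step while func runs."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            token = _CURRENT_STEP.set(step)
            try:
                return func(*args, **kwargs)
            finally:
                _CURRENT_STEP.reset(token)
        return wrapper
    return decorator

def maybe_parallel(func, items):
    """Tag each result with the path taken, so gating is observable."""
    if _CURRENT_STEP.get() in _ENABLED_STEPS.get():
        return [("joblib", func(x)) for x in items]
    return [("serial", func(x)) for x in items]

@set_step("download")
def download_all(urls):
    return maybe_parallel(lambda u: u.upper(), urls)

_ENABLED_STEPS.set(frozenset({"download"}))
result = download_all(["a", "b"])
```

Outside the decorated function the current step is None, so any other map_nested-style call would keep taking the serial path even with "download" enabled.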
es94129 (Contributor, Author) commented Jun 7, 2023

@lhoestq Can a maintainer help trigger the tests again?

> One idea is to decorate the download method to set the current global step to "download", and then only use joblib if the current step is one of the steps provided in parallel_backend.

Yes, I think this is doable in a subsequent PR.
For throwing NotImplementedError, I also think it can be done in a subsequent PR, because I'm not sure whether Dataset.map is the only function that a user would expect to run with parallel_backend.

lhoestq (Member) commented Jun 7, 2023

Just triggered the tests :)

> Yes, I think this is doable in a subsequent PR.
> For throwing NotImplementedError, I also think it can be done in a subsequent PR, because I'm not sure whether Dataset.map is the only function that a user would expect to run with parallel_backend.

I think any Dataset method that has a num_proc argument: Dataset.map (the other methods like filter or cast are based on map), and later we can see for the to_xxx methods (to_csv, to_parquet, etc.)

@es94129 es94129 requested a review from lhoestq June 7, 2023 21:57
es94129 (Contributor, Author) commented Jun 7, 2023

Hi maintainers, I've just addressed most of the comments, please take another look, thank you.

Comment on lines +44 to +57

split_kwds = []  # We organize the splits ourselves (contiguous splits)
for index in range(num_proc):
    div = len(iterable) // num_proc
    mod = len(iterable) % num_proc
    start = div * index + min(index, mod)
    end = start + div + (1 if index < mod else 0)
    split_kwds.append((function, iterable[start:end], types, index, disable_tqdm, desc))

if len(iterable) != sum(len(i[1]) for i in split_kwds):
    raise ValueError(
        f"Error dividing inputs iterable among processes. "
        f"Total number of objects {len(iterable)}, "
        f"length: {sum(len(i[1]) for i in split_kwds)}"
    )
lhoestq (Member) commented Jun 8, 2023

Maybe this can still be in map_nested, so that the signature of parallel_map could be

def parallel_map(function, iterable, num_proc):

and map_nested would call

parallel_map(_single_map_nested, split_kwds, num_proc=num_proc)

This way it will be easier to start using parallel_map in other places in the code, no?
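A rough sketch of the suggested signature, using a sequential stand-in for the pool dispatch; the split arithmetic mirrors the snippet under review, but parallel_map here is only an illustration, not the merged implementation:

```python
def parallel_map(function, iterable, num_proc):
    """Split `iterable` into `num_proc` contiguous chunks and apply
    `function` to each chunk. Chunks are processed sequentially here,
    where the real helper would hand them to a process pool or joblib."""
    div, mod = divmod(len(iterable), num_proc)
    chunks = []
    for index in range(num_proc):
        # First `mod` chunks get one extra element, keeping splits contiguous.
        start = div * index + min(index, mod)
        end = start + div + (1 if index < mod else 0)
        chunks.append(iterable[start:end])
    # Same sanity check as the snippet above: nothing dropped or duplicated.
    assert sum(len(c) for c in chunks) == len(iterable)
    return [function(chunk) for chunk in chunks]

# map_nested could then call, per the suggestion:
# parallel_map(_single_map_nested, split_kwds, num_proc=num_proc)
results = parallel_map(sum, list(range(10)), num_proc=3)
```

With 10 items and num_proc=3 the chunks are [0..3], [4..6], [7..9], so the first chunk absorbs the remainder.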

es94129 (Contributor, Author) replied

If so, _map_with_joblib would also take split_kwds as input, which is arbitrarily split according to num_proc rather than decided by joblib.
Are there any other places where you are thinking of using parallel_map? I thought it was just a replacement for the multiprocessing part of map_nested.

lhoestq (Member) replied

n_jobs is specified to joblib anyway, no? Not a big deal anyway.

es94129 (Contributor, Author) replied

Might leave it like this so that n_jobs=-1 can be used when the user wants to let joblib decide the number of workers / processes.
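For reference, joblib's documented convention is that n_jobs=-1 uses all CPUs, and for n_jobs below -1 it uses cpu_count() + 1 + n_jobs workers (so -2 means all but one). A small helper illustrating that resolution; resolve_n_jobs is an invented name, not part of joblib or datasets:

```python
import os

def resolve_n_jobs(n_jobs):
    """Resolve joblib's n_jobs convention to an effective worker count:
    positive values are taken as-is; negative values map to
    cpu_count() + 1 + n_jobs (so -1 is all CPUs, -2 all but one)."""
    cpus = os.cpu_count() or 1
    if n_jobs < 0:
        return max(cpus + 1 + n_jobs, 1)
    return n_jobs

print(resolve_n_jobs(4))  # 4
```

Passing n_jobs=-1 straight through to joblib therefore delegates the sizing decision entirely to the backend.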

lhoestq (Member) replied

Ah, good idea!

@es94129 es94129 requested a review from lhoestq June 13, 2023 07:18
lhoestq (Member) left a comment

LGTM for this one :) awesome !

@lhoestq lhoestq merged commit accaaf2 into huggingface:main Jun 14, 2023
github-actions commented

Show benchmarks

PyArrow==8.0.0

Benchmark: benchmark_array_xd.json (new / old (diff))
  read_batch_formatted_as_numpy after write_array2d: 0.008422 / 0.011353 (-0.002931)
  read_batch_formatted_as_numpy after write_flattened_sequence: 0.005658 / 0.011008 (-0.005350)
  read_batch_formatted_as_numpy after write_nested_sequence: 0.135372 / 0.038508 (0.096864)
  read_batch_unformated after write_array2d: 0.044766 / 0.023109 (0.021657)
  read_batch_unformated after write_flattened_sequence: 0.417876 / 0.275898 (0.141978)
  read_batch_unformated after write_nested_sequence: 0.462785 / 0.323480 (0.139305)
  read_col_formatted_as_numpy after write_array2d: 0.005485 / 0.007986 (-0.002501)
  read_col_formatted_as_numpy after write_flattened_sequence: 0.005640 / 0.004328 (0.001311)
  read_col_formatted_as_numpy after write_nested_sequence: 0.105020 / 0.004250 (0.100770)
  read_col_unformated after write_array2d: 0.049114 / 0.037052 (0.012062)
  read_col_unformated after write_flattened_sequence: 0.490450 / 0.258489 (0.231961)
  read_col_unformated after write_nested_sequence: 0.467693 / 0.293841 (0.173852)
  read_formatted_as_numpy after write_array2d: 0.050929 / 0.128546 (-0.077617)
  read_formatted_as_numpy after write_flattened_sequence: 0.014644 / 0.075646 (-0.061002)
  read_formatted_as_numpy after write_nested_sequence: 0.452373 / 0.419271 (0.033101)
  read_unformated after write_array2d: 0.074897 / 0.043533 (0.031364)
  read_unformated after write_flattened_sequence: 0.425816 / 0.255139 (0.170677)
  read_unformated after write_nested_sequence: 0.420415 / 0.283200 (0.137215)
  write_array2d: 0.134121 / 0.141683 (-0.007561)
  write_flattened_sequence: 1.927744 / 1.452155 (0.475589)
  write_nested_sequence: 2.014417 / 1.492716 (0.521701)

Benchmark: benchmark_getitem_100B.json (new / old (diff))
  get_batch_of_1024_random_rows: 0.254811 / 0.018006 (0.236805)
  get_batch_of_1024_rows: 0.550011 / 0.000490 (0.549521)
  get_first_row: 0.004913 / 0.000200 (0.004714)
  get_last_row: 0.000117 / 0.000054 (0.000062)

Benchmark: benchmark_indices_mapping.json (new / old (diff))
  select: 0.032644 / 0.037411 (-0.004768)
  shard: 0.135672 / 0.014526 (0.121146)
  shuffle: 0.158984 / 0.176557 (-0.017572)
  sort: 0.218267 / 0.737135 (-0.518869)
  train_test_split: 0.150348 / 0.296338 (-0.145991)

Benchmark: benchmark_iterating.json (new / old (diff))
  read 5000: 0.625723 / 0.215209 (0.410514)
  read 50000: 6.247559 / 2.077655 (4.169905)
  read_batch 50000 10: 2.626785 / 1.504120 (1.122666)
  read_batch 50000 100: 2.195224 / 1.541195 (0.654030)
  read_batch 50000 1000: 2.232140 / 1.468490 (0.763650)
  read_formatted numpy 5000: 0.943082 / 4.584777 (-3.641695)
  read_formatted pandas 5000: 5.799262 / 3.745712 (2.053550)
  read_formatted tensorflow 5000: 2.849411 / 5.269862 (-2.420450)
  read_formatted torch 5000: 1.744160 / 4.565676 (-2.821516)
  read_formatted_batch numpy 5000 10: 0.119056 / 0.424275 (-0.305219)
  read_formatted_batch numpy 5000 1000: 0.014233 / 0.007607 (0.006626)
  shuffled read 5000: 0.795238 / 0.226044 (0.569194)
  shuffled read 50000: 7.569586 / 2.268929 (5.300657)
  shuffled read_batch 50000 10: 3.179481 / 55.444624 (-52.265143)
  shuffled read_batch 50000 100: 2.519772 / 6.876477 (-4.356704)
  shuffled read_batch 50000 1000: 2.714570 / 2.142072 (0.572498)
  shuffled read_formatted numpy 5000: 1.107197 / 4.805227 (-3.698030)
  shuffled read_formatted_batch numpy 5000 10: 0.229986 / 6.500664 (-6.270678)
  shuffled read_formatted_batch numpy 5000 1000: 0.087993 / 0.075469 (0.012524)

Benchmark: benchmark_map_filter.json (new / old (diff))
  filter: 1.535610 / 1.841788 (-0.306178)
  map fast-tokenizer batched: 18.639369 / 8.074308 (10.565061)
  map identity: 21.081844 / 10.191392 (10.890452)
  map identity batched: 0.253247 / 0.680424 (-0.427177)
  map no-op batched: 0.026711 / 0.534201 (-0.507490)
  map no-op batched numpy: 0.503790 / 0.579283 (-0.075493)
  map no-op batched pandas: 0.600124 / 0.434364 (0.165760)
  map no-op batched pytorch: 0.617944 / 0.540337 (0.077607)
  map no-op batched tensorflow: 0.766947 / 1.386936 (-0.619989)

PyArrow==latest

Benchmark: benchmark_array_xd.json (new / old (diff))
  read_batch_formatted_as_numpy after write_array2d: 0.007885 / 0.011353 (-0.003468)
  read_batch_formatted_as_numpy after write_flattened_sequence: 0.004761 / 0.011008 (-0.006248)
  read_batch_formatted_as_numpy after write_nested_sequence: 0.097995 / 0.038508 (0.059487)
  read_batch_unformated after write_array2d: 0.033624 / 0.023109 (0.010515)
  read_batch_unformated after write_flattened_sequence: 0.504307 / 0.275898 (0.228409)
  read_batch_unformated after write_nested_sequence: 0.534803 / 0.323480 (0.211323)
  read_col_formatted_as_numpy after write_array2d: 0.006048 / 0.007986 (-0.001937)
  read_col_formatted_as_numpy after write_flattened_sequence: 0.005042 / 0.004328 (0.000714)
  read_col_formatted_as_numpy after write_nested_sequence: 0.102288 / 0.004250 (0.098038)
  read_col_unformated after write_array2d: 0.048695 / 0.037052 (0.011643)
  read_col_unformated after write_flattened_sequence: 0.559086 / 0.258489 (0.300597)
  read_col_unformated after write_nested_sequence: 0.553233 / 0.293841 (0.259392)
  read_formatted_as_numpy after write_array2d: 0.044596 / 0.128546 (-0.083950)
  read_formatted_as_numpy after write_flattened_sequence: 0.013696 / 0.075646 (-0.061950)
  read_formatted_as_numpy after write_nested_sequence: 0.109875 / 0.419271 (-0.309397)
  read_unformated after write_array2d: 0.059993 / 0.043533 (0.016460)
  read_unformated after write_flattened_sequence: 0.485579 / 0.255139 (0.230440)
  read_unformated after write_nested_sequence: 0.519835 / 0.283200 (0.236635)
  write_array2d: 0.123504 / 0.141683 (-0.018179)
  write_flattened_sequence: 1.820506 / 1.452155 (0.368351)
  write_nested_sequence: 1.963448 / 1.492716 (0.470732)

Benchmark: benchmark_getitem_100B.json (new / old (diff))
  get_batch_of_1024_random_rows: 0.292663 / 0.018006 (0.274656)
  get_batch_of_1024_rows: 0.557783 / 0.000490 (0.557293)
  get_first_row: 0.001330 / 0.000200 (0.001130)
  get_last_row: 0.000112 / 0.000054 (0.000057)

Benchmark: benchmark_indices_mapping.json (new / old (diff))
  select: 0.036890 / 0.037411 (-0.000522)
  shard: 0.140373 / 0.014526 (0.125847)
  shuffle: 0.140176 / 0.176557 (-0.036381)
  sort: 0.237378 / 0.737135 (-0.499757)
  train_test_split: 0.160186 / 0.296338 (-0.136152)

Benchmark: benchmark_iterating.json (new / old (diff))
  read 5000: 0.673599 / 0.215209 (0.458390)
  read 50000: 6.510280 / 2.077655 (4.432625)
  read_batch 50000 10: 2.981617 / 1.504120 (1.477497)
  read_batch 50000 100: 2.684664 / 1.541195 (1.143469)
  read_batch 50000 1000: 2.760471 / 1.468490 (1.291981)
  read_formatted numpy 5000: 0.975413 / 4.584777 (-3.609364)
  read_formatted pandas 5000: 5.708933 / 3.745712 (1.963220)
  read_formatted tensorflow 5000: 2.772069 / 5.269862 (-2.497793)
  read_formatted torch 5000: 1.763627 / 4.565676 (-2.802049)
  read_formatted_batch numpy 5000 10: 0.111632 / 0.424275 (-0.312643)
  read_formatted_batch numpy 5000 1000: 0.013223 / 0.007607 (0.005616)
  shuffled read 5000: 0.791545 / 0.226044 (0.565500)
  shuffled read 50000: 8.063287 / 2.268929 (5.794359)
  shuffled read_batch 50000 10: 3.671920 / 55.444624 (-51.772704)
  shuffled read_batch 50000 100: 3.057248 / 6.876477 (-3.819229)
  shuffled read_batch 50000 1000: 3.083569 / 2.142072 (0.941497)
  shuffled read_formatted numpy 5000: 1.118136 / 4.805227 (-3.687092)
  shuffled read_formatted_batch numpy 5000 10: 0.214655 / 6.500664 (-6.286009)
  shuffled read_formatted_batch numpy 5000 1000: 0.083074 / 0.075469 (0.007605)

Benchmark: benchmark_map_filter.json (new / old (diff))
  filter: 1.761731 / 1.841788 (-0.080056)
  map fast-tokenizer batched: 18.874200 / 8.074308 (10.799892)
  map identity: 22.383693 / 10.191392 (12.192301)
  map identity batched: 0.240292 / 0.680424 (-0.440132)
  map no-op batched: 0.028850 / 0.534201 (-0.505351)
  map no-op batched numpy: 0.557334 / 0.579283 (-0.021949)
  map no-op batched pandas: 0.627732 / 0.434364 (0.193369)
  map no-op batched pytorch: 0.634484 / 0.540337 (0.094146)
  map no-op batched tensorflow: 0.767372 / 1.386936 (-0.619564)
