
Shuffle by partition to reduce memory usage significantly #1068

Merged: 18 commits merged into rapidsai:branch-23.02 on Jan 12, 2023

Conversation

madsbk (Member) commented on Dec 21, 2022:

To reduce peak memory usage, this PR implements rounds in the explicit-comms shuffle.
The idea is that each worker handles a number of dataframe partitions in each round instead of doing everything at once.
The number of partitions handled in each round can be controlled by setting DASK_EXPLICIT_COMMS_BATCHSIZE or by passing it directly when calling shuffle().

By default, each worker handles one partition per round.
Set DASK_EXPLICIT_COMMS_BATCHSIZE=-1 to handle all partitions in a single round (the previous behavior).
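
A minimal usage sketch (not taken from the PR itself; the import path and the column_names/batchsize keywords are read off the diff discussed below and should be treated as illustrative):

import os

# Option 1: configure via environment variable before the workers start.
# Each worker then handles two dataframe partitions per round.
os.environ["DASK_EXPLICIT_COMMS_BATCHSIZE"] = "2"

# Option 2: pass the batch size directly to the explicit-comms shuffle.
# from dask_cuda.explicit_comms.dataframe.shuffle import shuffle
# shuffled = shuffle(df, column_names=["key"], batchsize=2)

# A value of -1 restores the previous all-at-once behavior:
# os.environ["DASK_EXPLICIT_COMMS_BATCHSIZE"] = "-1"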

madsbk added the improvement (Improvement / enhancement to an existing function) and non-breaking (Non-breaking change) labels on Dec 21, 2022
madsbk requested a review from a team as a code owner on December 21, 2022 08:41
The github-actions bot added the python (python code needed) label on Dec 21, 2022
codecov-commenter commented on Dec 21, 2022:

Codecov Report

Base: 87.17% // Head: 63.56% // This PR decreases project coverage by 23.61%. ⚠️

Coverage data is based on head (0b1e4a2) compared to base (bdb7b56).
Patch coverage: 95.91% of modified lines in pull request are covered.

Additional details and impacted files
@@                Coverage Diff                @@
##           branch-23.02    #1068       +/-   ##
=================================================
- Coverage         87.17%   63.56%   -23.62%     
=================================================
  Files                18       26        +8     
  Lines              2253     3134      +881     
=================================================
+ Hits               1964     1992       +28     
- Misses              289     1142      +853     
Impacted Files                                      Coverage            Δ
dask_cuda/explicit_comms/dataframe/shuffle.py       98.65% <95.91%>     (-1.35%) ⬇️
dask_cuda/benchmarks/utils.py                        0.00% <0.00%>      (ø)
dask_cuda/benchmarks/local_cupy.py                   0.00% <0.00%>      (ø)
dask_cuda/benchmarks/common.py                       0.00% <0.00%>      (ø)
dask_cuda/_version.py                              100.00% <0.00%>      (ø)
dask_cuda/benchmarks/local_cudf_shuffle.py           0.00% <0.00%>      (ø)
dask_cuda/benchmarks/local_cudf_merge.py             0.00% <0.00%>      (ø)
dask_cuda/benchmarks/local_cudf_groupby.py           0.00% <0.00%>      (ø)
dask_cuda/benchmarks/local_cupy_map_overlap.py       0.00% <0.00%>      (ø)


rjzamora (Member) left a comment:

This is exciting to see. Thanks for working on this @madsbk !

Would it be possible to share some empirical benchmark data to show how different batchsize choices affect performance (and perhaps the system size you are able to handle on a single DGX)?

Inline review thread on dask_cuda/benchmarks/local_cudf_shuffle.py (outdated, resolved)
Inline review thread on dask_cuda/explicit_comms/dataframe/shuffle.py (resolved):
    partitions: list of DataFrames
        List of dataframe-partitions
    """
    # TODO: use cuDF's partition_by_hash() when `column_names[0] != "_partitions"`
Inline comment (Member):
Ref #952
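
A minimal sketch (not part of this PR) of what the TODO above refers to, assuming cuDF's DataFrame.partition_by_hash API: the dataframe is split into hash partitions on the GPU, one DataFrame per partition.

import cudf

df = cudf.DataFrame({"key": [1, 2, 3, 4, 5, 6], "val": list("abcdef")})

# Hash-partition on the "key" column into 3 parts; returns a list of
# cudf.DataFrame objects, one per partition.
partitions = df.partition_by_hash(["key"], nparts=3)
for i, part in enumerate(partitions):
    print(i, len(part))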

Two more inline review threads on dask_cuda/explicit_comms/dataframe/shuffle.py (resolved; one outdated)
rapidsai deleted a comment from pentschev on Jan 3, 2023
madsbk (Member, Author) commented on Jan 3, 2023:

Thanks for the review @rjzamora, I think I have addressed all of your suggestions.

wence- (Contributor) left a comment:

Mostly minor comments.

Two inline review threads on dask_cuda/explicit_comms/dataframe/shuffle.py (outdated, resolved)

Comment on lines 25 to 29:
class Proxify(Protocol):
    """Proxify type hint"""

    def __call__(self, obj: T) -> T:
        ...
Inline comment (Contributor):

from typing import Callable
...
Proxify = Callable[[T], T]

?

I see this is what you used to have, so there's presumably some reason it's no longer sufficient?

Inline comment (madsbk, Member Author):

It was mypy that complained at some point, but it seems to accept Proxify = Callable[[T], T] now. I must have changed something :)

Anyway, I think we should begin using mypy in CI: #1077
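
For reference, a minimal, self-contained sketch of the two spellings discussed above (the names here are illustrative); both describe a callable that returns its argument with the same static type:

from typing import Callable, Protocol, TypeVar

T = TypeVar("T")

# Protocol-based spelling, as in the diff:
class Proxify(Protocol):
    """Callable that returns its argument (possibly proxified) with the same type."""

    def __call__(self, obj: T) -> T:
        ...

# Callable-based spelling suggested in the review:
ProxifyCallable = Callable[[T], T]

# An identity function satisfies both; type checkers accept either annotation.
def no_op_proxify(obj: T) -> T:
    return obj

proxify: Proxify = no_op_proxify
print(proxify("unchanged"))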

Two more inline review threads on dask_cuda/explicit_comms/dataframe/shuffle.py (outdated, resolved)
@@ -324,6 +471,15 @@ def shuffle(
    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
    c.client.cancel(df)

    # Get batchsize
    max_num_inkeys = max(len(k) for k in rank_to_inkeys.values())
    batchsize = batchsize or dask.config.get("xcomm-batchsize", 1)
Inline comment (Contributor):

If we're going to introduce some (or more?) configuration parameters for explicit comms, can we think about the naming hierarchy a bit? E.g. we now have DASK_EXPLICIT_COMMS to turn on use of explicit comms for the shuffle, but now DASK_XCOMM_BATCHSIZE. How are we to know these two are related?

Would prefer something hierarchical like:

dask.explicit-comms.enabled = True
dask.explicit-comms.batchsize = ...

And however that translates to environment variables.

Inline comment (madsbk, Member Author):

Good point, what about:

dask.explicit-comms = True
dask.explicit-comms-batchsize = ...

Inline comment (Member):

@wence- Perhaps we should consider how this choice would potentially interact with your proposal to make it possible to register the shuffle algorithm externally (e.g. dask/dask#9521). I don't have a clear design/pattern in my mind to suggest, but I thought it was worth mentioning.

Inline comment (Contributor):

@rjzamora I think there are two slightly orthogonal things here:

  1. How to enable the explicit comms shuffle
  2. How to configure aspects of the explicit comms shuffle

Right now, we basically enable it through dask config variables, and this PR introduces a configuration slot.

I agree that it would be nice to be able to register the shuffle externally, but I think that is slightly different.

Here, I'm just wondering about how we minimise the proliferation of different naming hierarchies in the dask configuration dictionary (and potentially make them more discoverable).

We could (not in this PR) collect all of the dask-cuda related configuration slots into a dask-cuda.yaml that we load and insert into the dask config (as distributed and dask itself do). This would also allow us to maintain the defaults in one place for all configuration options.
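
A minimal sketch (not part of this PR) of how such package-level defaults could be loaded, assuming a hypothetical dask-cuda.yaml shipped next to the module; dask.config.update_defaults is the mechanism dask and distributed use for their own defaults:

import os

import dask
import yaml

# Hypothetical defaults file shipped with dask-cuda.
fn = os.path.join(os.path.dirname(__file__), "dask-cuda.yaml")
with open(fn) as f:
    defaults = yaml.safe_load(f)

# Register the defaults; existing user config and environment variables
# keep priority over values added here.
dask.config.update_defaults(defaults)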

Inline comment (Contributor):

> Good point, what about:
>
> dask.explicit-comms = True
> dask.explicit-comms-batchsize = ...

I think I am +epsilon in favour of hierarchical separation, so explicit-comms.batchsize and explicit-comms.enabled, but I am not that fussed.

Inline comment (Member):

Yeah, I totally agree that the two goals are (mostly) orthogonal. However, if we were to deprecate something like dask.explicit-comms = True, it seems reasonable to consider if the change would also make sense in the future when external registration is hopefully a thing. (I don't think it matters much, but wasn't sure)

Inline comment (madsbk, Member Author):

Agreed, if dask/dask#9521 gets in, it makes sense to have a common config hierarchy for all the shuffle backends.

But I think we should keep dask.explicit-comms = True for now.
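
A minimal sketch of how the flat keys proposed above would be set and read through dask.config (the defaults shown are assumptions, not taken from the merged code):

import dask

# Enable the explicit-comms shuffle and handle two partitions per worker
# per round, using the flat key names discussed above.
with dask.config.set({"explicit-comms": True, "explicit-comms-batchsize": 2}):
    enabled = dask.config.get("explicit-comms", default=False)
    batchsize = dask.config.get("explicit-comms-batchsize", default=1)
    print(enabled, batchsize)

# The corresponding environment variables would be DASK_EXPLICIT_COMMS=True
# and DASK_EXPLICIT_COMMS_BATCHSIZE=2, set before the workers start.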

madsbk and others added 3 commits January 9, 2023 21:15
madsbk requested reviews from wence- and rjzamora on January 10, 2023 07:07
rjzamora (Member) left a comment:

Thanks @madsbk ! As long as your local benchmark results make you confident that this PR provides a good balance between stability and performance, the changes seem good to me.

madsbk (Member, Author) commented on Jan 10, 2023:

Thanks for the review @rjzamora. Yes, the performance is quite good: https://gist.github.com/madsbk/3bb5529610496661782284e11373a1a7

wence- (Contributor) left a comment:

Thanks!

madsbk (Member, Author) commented on Jan 12, 2023:

/merge

rapids-bot merged commit b42151d into rapidsai:branch-23.02 on Jan 12, 2023
madsbk deleted the shuffle_by_partition branch on January 12, 2023 15:30
Labels: improvement (Improvement / enhancement to an existing function), non-breaking (Non-breaking change), python (python code needed)
5 participants