Shuffle by partition to reduce memory usage significantly #1068
Conversation
Codecov Report: Base: 87.17% // Head: 63.56% // Decreases project coverage.
Additional details and impacted files:
@@ Coverage Diff @@
## branch-23.02 #1068 +/- ##
=================================================
- Coverage 87.17% 63.56% -23.62%
=================================================
Files 18 26 +8
Lines 2253 3134 +881
=================================================
+ Hits 1964 1992 +28
- Misses 289 1142 +853
☔ View full report at Codecov.
This is exciting to see. Thanks for working on this @madsbk !
Would it be possible to share some empirical benchmark data to show how different batchsize choices affect performance (and perhaps the system size you are able to handle on a single DGX)?
partitions: list of DataFrames
    List of dataframe-partitions
"""
# TODO: use cuDF's partition_by_hash() when `column_names[0] != "_partitions"`
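For context, a minimal sketch of the cuDF API the TODO refers to (assuming the usual cudf.DataFrame.partition_by_hash signature; the toy dataframe is made up):

import cudf

# Hash-partition a dataframe into `nparts` pieces based on the key column;
# partition_by_hash returns a list of cudf.DataFrame objects.
df = cudf.DataFrame({"key": [0, 1, 2, 3, 4, 5], "payload": list("abcdef")})
partitions = df.partition_by_hash(["key"], nparts=3)
assert len(partitions) == 3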
Ref #952
Thanks for the review @rjzamora, I think I have addressed all of your suggestions.
Mostly minor comments.
class Proxify(Protocol):
    """Proxify type hint"""

    def __call__(self, obj: T) -> T:
        ...
from typing import Callable
...
Proxify = Callable[[T], T]
?
I see this is what you used to have, so there's presumably some reason it's no longer sufficient?
It was mypy that complained at some point, but it seems to accept Proxify = Callable[[T], T]
now. I must have changed something :)
Anyways, I think we should begin using mypy in CI: #1077
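For completeness, a self-contained sketch of the Callable-based alias discussed above (the TypeVar declaration and the trivial proxify function are assumptions, not code from the PR):

from typing import Callable, TypeVar

T = TypeVar("T")

# The simpler spelling suggested in the review: a generic alias for a
# callable that maps an object of type T to an object of the same type.
Proxify = Callable[[T], T]

def identity_proxify(obj: T) -> T:
    # A trivial stand-in that satisfies the Proxify type hint.
    return obj

proxify: Proxify = identity_proxify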
@@ -324,6 +471,15 @@ def shuffle(
    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
    c.client.cancel(df)

    # Get batchsize
    max_num_inkeys = max(len(k) for k in rank_to_inkeys.values())
    batchsize = batchsize or dask.config.get("xcomm-batchsize", 1)
If we're going to introduce some (or more?) configuration parameters for explicit comms, can we think about the naming hierarchy a bit? E.g. we now have DASK_EXPLICIT_COMMS to turn on use of explicit comms for the shuffle, but now DASK_XCOMM_BATCHSIZE. How are we to know these two are related?
Would prefer something hierarchical like:
dask.explicit-comms.enabled = True
dask.explicit-comms.batchsize = ...
And however that translates to environment variables.
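For illustration, a minimal sketch of how the hierarchical layout could look through dask's config machinery (the key names are the reviewer's proposal, not keys that exist in dask-cuda):

import dask

# Nested keys are addressed with dots; under dask's usual mapping they would
# correspond to environment variables such as DASK_EXPLICIT_COMMS__ENABLED and
# DASK_EXPLICIT_COMMS__BATCHSIZE (a double underscore marks one nesting level).
with dask.config.set({"explicit-comms": {"enabled": True, "batchsize": 2}}):
    enabled = dask.config.get("explicit-comms.enabled")
    batchsize = dask.config.get("explicit-comms.batchsize", default=1)
    print(enabled, batchsize)  # True 2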
Good point, what about:
dask.explicit-comms = True
dask.explicit-comms-batchsize = ...
@wence- Perhaps we should consider how this choice would potentially interact with your proposal to make it possible to register the shuffle algorithm externally (e.g. dask/dask#9521). I don't have a clear design/pattern in my mind to suggest, but I thought it was worth mentioning.
@rjzamora I think there are two slightly orthogonal things here:
- How to enable the explicit comms shuffle
- How to configure aspects of the explicit comms shuffle
Right now, we basically enable through dask config variables, and this PR introduces a configuration slot.
I agree that it would be nice to be able to register the shuffle externally, but I think that is slightly different.
Here, I'm just wondering about how we minimise the proliferation of different naming hierarchies in the dask configuration dictionary (and potentially make them more discoverable).
We could (not in this PR) collect all of the dask-cuda related configuration slots into a dask-cuda.yaml that we load and insert into the dask config (as distributed and dask itself do). This would also allow us to maintain the defaults in one place for all configuration options.
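A rough sketch of that pattern, applied hypothetically to dask-cuda (the file name dask-cuda.yaml and its contents are illustrative, not an existing file):

import os

import dask
import yaml

# Load package-level defaults from a YAML file shipped alongside the module
# and register them with dask's configuration system, mirroring what
# distributed does at import time.
fn = os.path.join(os.path.dirname(__file__), "dask-cuda.yaml")
with open(fn) as f:
    defaults = yaml.safe_load(f)

dask.config.update_defaults(defaults)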
> Good point, what about:
> dask.explicit-comms = True
> dask.explicit-comms-batchsize = ...
I think I am +epsilon in favour of hierarchical separation, so explicit-comms.batchsize, explicit-comms.enabled, but I am not that fussed.
Yeah, I totally agree that the two goals are (mostly) orthogonal. However, if we were to deprecate something like dask.explicit-comms = True, it seems reasonable to consider if the change would also make sense in the future when external registration is hopefully a thing. (I don't think it matters much, but wasn't sure)
Agree, if dask/dask#9521 gets in, it makes sense to have a common config hierarchy for all the shuffle backends.
But I think we should keep dask.explicit-comms = True for now.
Thanks @madsbk ! As long as your local benchmark results make you confident that this PR provides a good balance between stability and performance, the changes seem good to me.
Thanks for the review @rjzamora. Yes, the performance is quite good: https://gist.github.com/madsbk/3bb5529610496661782284e11373a1a7
Thanks!
/merge
In order to reduce peak memory usage, this PR implements rounds in explicit-comms shuffle.
The idea is that each worker handles a number of dataframe partitions in each round instead of doing everything at once.
The number of partitions handled in each round can be controlled by setting DASK_EXPLICIT_COMMS_BATCHSIZE or directly when calling shuffle(). By default, each worker handles one partition per round.
Set DASK_EXPLICIT_COMMS_BATCHSIZE=-1 to handle all partitions in a single round (the previous behavior).
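For illustration, a rough sketch of the batching idea described above (the helper name and the toy partition list are made up; this is not the PR's implementation):

def batched(keys, batchsize):
    """Yield successive rounds of at most `batchsize` keys (-1 means one round)."""
    keys = list(keys)
    if batchsize == -1:
        yield keys
        return
    for i in range(0, len(keys), batchsize):
        yield keys[i : i + batchsize]

local_partitions = ["part-0", "part-1", "part-2", "part-3", "part-4"]
for round_of_keys in batched(local_partitions, batchsize=2):
    # In the real shuffle, each round's dataframes are split by output
    # partition, exchanged between workers, and concatenated before the next
    # round starts, so peak memory scales with the batchsize rather than with
    # the total number of partitions.
    print(round_of_keys)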