diff --git a/distributed/shuffle/shuffle.py b/distributed/shuffle/shuffle.py
index 01eba81b7b..a9c828c294 100644
--- a/distributed/shuffle/shuffle.py
+++ b/distributed/shuffle/shuffle.py
@@ -3,8 +3,8 @@
 from typing import TYPE_CHECKING
 
 from dask.base import tokenize
-from dask.delayed import Delayed
-from dask.highlevelgraph import HighLevelGraph, MaterializedLayer
+from dask.highlevelgraph import HighLevelGraph
+from dask.layers import SimpleShuffleLayer
 
 from distributed.shuffle.shuffle_extension import ShuffleId, ShuffleWorkerExtension
 
@@ -70,49 +70,91 @@ def rearrange_by_column_p2p(
     )
     # TODO: we fail at non-string object dtypes
     empty[column] = empty[column].astype("int64")  # TODO: this shouldn't be necesssary
-    transferred = df.map_partitions(
-        shuffle_transfer,
-        id=token,
-        npartitions=npartitions,
-        column=column,
-        meta=df,
-        enforce_metadata=False,
-        transform_divisions=False,
-    )
-
-    barrier_key = "shuffle-barrier-" + token
-    barrier_dsk = {barrier_key: (shuffle_barrier, token, transferred.__dask_keys__())}
-    barrier = Delayed(
-        barrier_key,
-        HighLevelGraph.from_collections(
-            barrier_key, barrier_dsk, dependencies=[transferred]
-        ),
+    name = f"shuffle-p2p-{token}"
+    layer = P2PShuffleLayer(
+        name,
+        column,
+        npartitions,
+        npartitions_input=df.npartitions,
+        ignore_index=True,
+        name_input=df._name,
+        meta_input=empty,
     )
-
-    name = "shuffle-unpack-" + token
-    dsk = {
-        (name, i): (shuffle_unpack, token, i, barrier_key) for i in range(npartitions)
-    }
-    layer = MaterializedLayer(dsk, annotations={"shuffle": lambda key: key[1]})
-    # TODO: update to use blockwise.
-    # Changes task names, so breaks setting worker restrictions at the moment.
-    # Also maybe would be nice if the `DataFrameIOLayer` interface supported this?
-    # dsk = blockwise(
-    #     shuffle_unpack,
-    #     name,
-    #     "i",
-    #     token,
-    #     None,
-    #     BlockwiseDepDict({(i,): i for i in range(npartitions)}),
-    #     "i",
-    #     barrier_key,
-    #     None,
-    #     numblocks={},
-    # )
-
     return DataFrame(
-        HighLevelGraph.from_collections(name, layer, [barrier]),
+        HighLevelGraph.from_collections(name, layer, [df]),
         name,
-        df._meta,
+        empty,
         [None] * (npartitions + 1),
     )
+
+
+class P2PShuffleLayer(SimpleShuffleLayer):
+    def __init__(
+        self,
+        name,
+        column,
+        npartitions,
+        npartitions_input,
+        ignore_index,
+        name_input,
+        meta_input,
+        parts_out=None,
+        annotations=None,
+    ):
+        annotations = annotations or {}
+        annotations.update({"shuffle": lambda key: key[1]})
+        super().__init__(
+            name,
+            column,
+            npartitions,
+            npartitions_input,
+            ignore_index,
+            name_input,
+            meta_input,
+            parts_out,
+            annotations=annotations,
+        )
+
+    def get_split_keys(self):
+        # TODO: This is doing some funky stuff to set priorities but we don't need this
+        return []
+
+    def __repr__(self):
+        return (
+            f"{type(self).__name__}<name='{self.name}', npartitions={self.npartitions}>"
+        )
+
+    def _cull(self, parts_out):
+        return P2PShuffleLayer(
+            self.name,
+            self.column,
+            self.npartitions,
+            self.npartitions_input,
+            self.ignore_index,
+            self.name_input,
+            self.meta_input,
+            parts_out=parts_out,
+        )
+
+    def _construct_graph(self, deserializing=None):
+        token = tokenize(self.name_input, self.column, self.npartitions, self.parts_out)
+        dsk = {}
+        barrier_key = "shuffle-barrier-" + token
+        name = "shuffle-transfer-" + token
+        transfer_keys = list()
+        for i in range(self.npartitions_input):
+            transfer_keys.append((name, i))
+            dsk[(name, i)] = (
+                shuffle_transfer,
+                (self.name_input, i),
+                token,
+                self.npartitions,
+                self.column,
+            )
+
+        dsk[barrier_key] = (shuffle_barrier, token, transfer_keys)
+
+        name = self.name
+        for part_out in self.parts_out:
+            dsk[(name, part_out)] = (shuffle_unpack, token, part_out, barrier_key)
+        return dsk
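
Note on the graph shape: `_construct_graph` above materializes a three-stage
graph, transfer -> barrier -> unpack, where the single barrier task depends on
every transfer task and every unpack task depends only on the barrier. A
minimal sketch of the resulting task dictionary for two input and two output
partitions; the key names, "<token>", "df", and the column name "x" are
illustrative stand-ins, not the real tokenized values:

    # Hypothetical sketch of what P2PShuffleLayer._construct_graph produces
    # for npartitions_input=2 and parts_out={0, 1}.
    from distributed.shuffle.shuffle import (
        shuffle_barrier,
        shuffle_transfer,
        shuffle_unpack,
    )

    dsk = {
        # one transfer task per input partition of the upstream collection "df"
        ("shuffle-transfer-<token>", 0): (shuffle_transfer, ("df", 0), "<token>", 2, "x"),
        ("shuffle-transfer-<token>", 1): (shuffle_transfer, ("df", 1), "<token>", 2, "x"),
        # a single barrier task gating all output tasks on all transfers
        "shuffle-barrier-<token>": (
            shuffle_barrier,
            "<token>",
            [("shuffle-transfer-<token>", 0), ("shuffle-transfer-<token>", 1)],
        ),
        # one unpack task per *output* partition, depending only on the barrier;
        # _cull shrinking parts_out is what drops unneeded unpack tasks
        ("shuffle-p2p-<token>", 0): (shuffle_unpack, "<token>", 0, "shuffle-barrier-<token>"),
        ("shuffle-p2p-<token>", 1): (shuffle_unpack, "<token>", 1, "shuffle-barrier-<token>"),
    }

Note that the unpack keys reuse `self.name` (built from the outer token in
`rearrange_by_column_p2p`), while the transfer and barrier keys use the token
re-derived inside `_construct_graph` from the culled `parts_out`.
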
diff --git a/distributed/shuffle/tests/test_shuffle.py b/distributed/shuffle/tests/test_shuffle.py
index 48f0194b15..63e245bb23 100644
--- a/distributed/shuffle/tests/test_shuffle.py
+++ b/distributed/shuffle/tests/test_shuffle.py
@@ -288,10 +288,16 @@ async def test_tail(c, s, a, b):
         dtypes={"x": float, "y": float},
         freq="1 s",
     )
-    shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p").tail(compute=False)
-    persisted = await shuffled.persist()  # Only ask for one key
+    x = dd.shuffle.shuffle(df, "x", shuffle="p2p")
+    full = await x.persist()
+    ntasks_full = len(s.tasks)
+    del full
+    while s.tasks:
+        await asyncio.sleep(0)
+    partial = await x.tail(compute=False).persist()  # Only ask for one key
 
-    assert len(s.tasks) < df.npartitions * 2
+    assert len(s.tasks) < ntasks_full
+    del partial
 
     clean_worker(a)
     clean_worker(b)
@@ -402,6 +408,7 @@ async def test_restrictions(c, s, a, b):
 @pytest.mark.xfail(reason="Don't clean up forgotten shuffles")
 @gen_cluster(client=True)
 async def test_delete_some_results(c, s, a, b):
+    # FIXME: This works but not reliably. It fails in roughly 25% of runs
     df = dask.datasets.timeseries(
         start="2000-01-01",
         end="2000-01-10",
@@ -421,7 +428,6 @@ async def test_delete_some_results(c, s, a, b):
     clean_scheduler(s)
 
 
-@pytest.mark.xfail(reason="Don't update ongoing shuffles")
@gen_cluster(client=True)
 async def test_add_some_results(c, s, a, b):
     df = dask.datasets.timeseries(
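
For context, a minimal end-to-end sketch of the user-facing path that
`test_tail` exercises. This is a hypothetical local reproduction, not part of
the test suite, and it assumes the shuffle worker extension is registered on
the workers (as `gen_cluster` arranges in these tests):

    import dask
    import dask.dataframe as dd
    from distributed import Client

    client = Client()  # local cluster; workers need the ShuffleWorkerExtension
    df = dask.datasets.timeseries(
        start="2000-01-01", end="2000-01-10", dtypes={"x": float, "y": float}
    )
    shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p")
    # tail() only needs the last output partition, so P2PShuffleLayer._cull
    # restricts parts_out and the culled graph stays well below the full task
    # count, which is what the reworked assertion in test_tail checks.
    print(shuffled.tail())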