Use a layer for p2p shuffle (#7180)
fjetter authored Oct 26, 2022
1 parent a11dd02 commit c349f4f
Showing 2 changed files with 95 additions and 47 deletions.
128 changes: 85 additions & 43 deletions distributed/shuffle/shuffle.py
@@ -3,8 +3,8 @@
from typing import TYPE_CHECKING

from dask.base import tokenize
from dask.delayed import Delayed
from dask.highlevelgraph import HighLevelGraph, MaterializedLayer
from dask.highlevelgraph import HighLevelGraph
from dask.layers import SimpleShuffleLayer

from distributed.shuffle.shuffle_extension import ShuffleId, ShuffleWorkerExtension

@@ -70,49 +70,91 @@ def rearrange_by_column_p2p(
) # TODO: we fail at non-string object dtypes
empty[column] = empty[column].astype("int64")  # TODO: this shouldn't be necessary

transferred = df.map_partitions(
shuffle_transfer,
id=token,
npartitions=npartitions,
column=column,
meta=df,
enforce_metadata=False,
transform_divisions=False,
)

barrier_key = "shuffle-barrier-" + token
barrier_dsk = {barrier_key: (shuffle_barrier, token, transferred.__dask_keys__())}
barrier = Delayed(
barrier_key,
HighLevelGraph.from_collections(
barrier_key, barrier_dsk, dependencies=[transferred]
),
name = f"shuffle-p2p-{token}"
layer = P2PShuffleLayer(
name,
column,
npartitions,
npartitions_input=df.npartitions,
ignore_index=True,
name_input=df._name,
meta_input=empty,
)

name = "shuffle-unpack-" + token
dsk = {
(name, i): (shuffle_unpack, token, i, barrier_key) for i in range(npartitions)
}
layer = MaterializedLayer(dsk, annotations={"shuffle": lambda key: key[1]})
# TODO: update to use blockwise.
# Changes task names, so breaks setting worker restrictions at the moment.
# Also maybe would be nice if the `DataFrameIOLayer` interface supported this?
# dsk = blockwise(
# shuffle_unpack,
# name,
# "i",
# token,
# None,
# BlockwiseDepDict({(i,): i for i in range(npartitions)}),
# "i",
# barrier_key,
# None,
# numblocks={},
# )

return DataFrame(
HighLevelGraph.from_collections(name, layer, [barrier]),
HighLevelGraph.from_collections(name, layer, [df]),
name,
df._meta,
empty,
[None] * (npartitions + 1),
)


class P2PShuffleLayer(SimpleShuffleLayer):
def __init__(
self,
name,
column,
npartitions,
npartitions_input,
ignore_index,
name_input,
meta_input,
parts_out=None,
annotations=None,
):
annotations = annotations or {}
annotations.update({"shuffle": lambda key: key[1]})
super().__init__(
name,
column,
npartitions,
npartitions_input,
ignore_index,
name_input,
meta_input,
parts_out,
annotations=annotations,
)

def get_split_keys(self):
# TODO: This is doing some funky stuff to set priorities but we don't need this
return []

def __repr__(self):
return (
f"{type(self).__name__}<name='{self.name}', npartitions={self.npartitions}>"
)

def _cull(self, parts_out):
return P2PShuffleLayer(
self.name,
self.column,
self.npartitions,
self.npartitions_input,
self.ignore_index,
self.name_input,
self.meta_input,
parts_out=parts_out,
)

def _construct_graph(self, deserializing=None):
token = tokenize(self.name_input, self.column, self.npartitions, self.parts_out)
dsk = {}
barrier_key = "shuffle-barrier-" + token
name = "shuffle-transfer-" + token
transfer_keys = list()
for i in range(self.npartitions_input):
transfer_keys.append((name, i))
dsk[(name, i)] = (
shuffle_transfer,
(self.name_input, i),
token,
self.npartitions,
self.column,
)

dsk[barrier_key] = (shuffle_barrier, token, transfer_keys)

name = self.name
for part_out in self.parts_out:
dsk[(name, part_out)] = (shuffle_unpack, token, part_out, barrier_key)
return dsk
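
For orientation, here is a minimal sketch (not part of the commit) of the graph shape that _construct_graph returns: one transfer task per input partition, a single barrier that depends on all transfers, and one unpack task per output partition. The input name "frame", the token "abc123", the column "x", and the two-by-two partition counts are placeholders for illustration; the output-key prefix matches the "shuffle-p2p-{token}" name assigned in rearrange_by_column_p2p, and the import assumes the helpers are exposed by the module shown in the diff above.

# Illustrative only: graph produced for 2 input partitions and 2 output partitions.
from distributed.shuffle.shuffle import (  # assumed importable from this module
    shuffle_barrier,
    shuffle_transfer,
    shuffle_unpack,
)

dsk = {
    # one transfer task per input partition: (func, input key, token, npartitions, column)
    ("shuffle-transfer-abc123", 0): (shuffle_transfer, ("frame", 0), "abc123", 2, "x"),
    ("shuffle-transfer-abc123", 1): (shuffle_transfer, ("frame", 1), "abc123", 2, "x"),
    # a single barrier depending on every transfer task
    "shuffle-barrier-abc123": (
        shuffle_barrier,
        "abc123",
        [("shuffle-transfer-abc123", 0), ("shuffle-transfer-abc123", 1)],
    ),
    # one unpack task per output partition, each depending on the barrier
    ("shuffle-p2p-abc123", 0): (shuffle_unpack, "abc123", 0, "shuffle-barrier-abc123"),
    ("shuffle-p2p-abc123", 1): (shuffle_unpack, "abc123", 1, "shuffle-barrier-abc123"),
}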
14 changes: 10 additions & 4 deletions distributed/shuffle/tests/test_shuffle.py
@@ -288,10 +288,16 @@ async def test_tail(c, s, a, b):
dtypes={"x": float, "y": float},
freq="1 s",
)
shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p").tail(compute=False)
persisted = await shuffled.persist() # Only ask for one key
x = dd.shuffle.shuffle(df, "x", shuffle="p2p")
full = await x.persist()
ntasks_full = len(s.tasks)
del full
while s.tasks:
await asyncio.sleep(0)
partial = await x.tail(compute=False).persist() # Only ask for one key

assert len(s.tasks) < df.npartitions * 2
assert len(s.tasks) < ntasks_full
del partial

clean_worker(a)
clean_worker(b)
@@ -402,6 +408,7 @@ async def test_restrictions(c, s, a, b):
@pytest.mark.xfail(reason="Don't clean up forgotten shuffles")
@gen_cluster(client=True)
async def test_delete_some_results(c, s, a, b):
# FIXME: This works but not reliably; it fails in ~25% of runs
df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
@@ -421,7 +428,6 @@ async def test_delete_some_results(c, s, a, b):
clean_scheduler(s)


@pytest.mark.xfail(reason="Don't update ongoing shuffles")
@gen_cluster(client=True)
async def test_add_some_results(c, s, a, b):
df = dask.datasets.timeseries(

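For context, the user-facing entry point is unchanged: passing shuffle="p2p" (as the tests above do) routes the DataFrame shuffle through rearrange_by_column_p2p and hence through the new P2PShuffleLayer. Below is a minimal usage sketch, assuming a distributed Client is running so the shuffle worker extension is available; the timeseries arguments mirror the test above and are otherwise illustrative.

import dask.datasets
import dask.dataframe as dd
from distributed import Client

client = Client()  # p2p shuffling relies on the scheduler/worker shuffle extensions

df = dask.datasets.timeseries(
    start="2000-01-01",
    end="2000-01-10",
    dtypes={"x": float, "y": float},
    freq="1 s",
)
# shuffle="p2p" selects the peer-to-peer implementation, so the resulting graph
# contains a P2PShuffleLayer instead of the previous materialized barrier/unpack layer.
shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p")
result = shuffled.compute()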