Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make P2P more configurable #8469

Merged
merged 6 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions distributed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"distributed.scheduler.events-log-length": "distributed.admin.low-level-log-length",
"recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.comm.recent-messages-log-length": "distributed.admin.low-level-log-length",
"distributed.p2p.disk": "distributed.p2p.storage.disk",
}

# Affects yaml and env variables configs, as well as calls to dask.config.set()
Expand Down
48 changes: 44 additions & 4 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,28 @@ properties:
description: Configuration settings for Dask communications specific to P2P
properties:

buffer:
type:
- string
- integer
description: |
The maximum amount of data for P2P's comm buffers to buffer in-memory per worker.
This limit is not absolute but used to apply back pressure.
concurrency:
type: integer
description: Number of concurrent background tasks used for IO-intensive operations in per P2P comm buffer.
message-bytes-limit:
type:
- string
- integer
description: |
The maximum amount of data for P2P to send to another worker in a single operation

Data is sent in batches, and if the first shard is larger than this value,
the task shard still be sent to ensure progress. Hence, this limit is not absolute.
Note that this limit applies to a single send operation and a worker may send data to
multiple workers in parallel.

retry:
type: object
description: |
Expand All @@ -1067,10 +1089,28 @@ properties:
max:
type: string
description: The maximum delay between retries
disk:
type: boolean
description: |
Whether or not P2P stores intermediate data on disk instead of memory

storage:
type: object
description: Configuration settings for P2P storage
properties:

buffer:
type:
- string
- integer
description: |
The maximum amount of data for P2P's disk buffers to buffer in-memory per worker
This limit is not absolute but used to apply back pressure.
disk:
type: boolean
description: |
Whether or not P2P stores intermediate data on disk instead of memory
threads:
type:
- integer
- "null"
description: Number of threads used for CPU-intensive operations per worker. Defaults to number of worker threads.

dashboard:
type: object
Expand Down
8 changes: 7 additions & 1 deletion distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,18 @@ distributed:

p2p:
comm:
buffer: 1 GiB
concurrency: 10
message-bytes-limit: 2 MiB
retry:
count: 10
delay:
min: 1s # the first non-zero delay between re-tries
max: 30s # the maximum delay between re-tries
disk: True
storage:
buffer: 100 MiB
disk: True
threads: null

###################
# Bokeh dashboard #
Expand Down
1 change: 1 addition & 0 deletions distributed/shuffle/_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ def _continue() -> bool:
if self.max_message_size > 0:
size = 0
shards = []
# FIXME: We always exceed the limit, not just on the first shard.
while size < self.max_message_size:
try:
shard = self.shards[part_id].pop()
Expand Down
9 changes: 3 additions & 6 deletions distributed/shuffle/_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
from collections.abc import Awaitable, Callable
from typing import Any

from dask.utils import parse_bytes

from distributed.core import ErrorMessage, OKMessage, clean_exception
from distributed.metrics import context_meter
from distributed.shuffle._disk import ShardsBuffer
Expand Down Expand Up @@ -49,20 +47,19 @@ class CommShardsBuffer(ShardsBuffer):
Number of background tasks to run.
"""

max_message_size = parse_bytes("2 MiB")

def __init__(
self,
send: Callable[
[str, list[tuple[Any, Any]]], Awaitable[OKMessage | ErrorMessage]
],
memory_limiter: ResourceLimiter,
concurrency_limit: int = 10,
max_message_size: int,
concurrency_limit: int,
):
super().__init__(
memory_limiter=memory_limiter,
concurrency_limit=concurrency_limit,
max_message_size=CommShardsBuffer.max_message_size,
max_message_size=max_message_size,
)
self.send = send

Expand Down
11 changes: 9 additions & 2 deletions distributed/shuffle/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import dask.config
from dask.core import flatten
from dask.typing import Key
from dask.utils import parse_timedelta
from dask.utils import parse_bytes, parse_timedelta

from distributed.core import ErrorMessage, OKMessage, PooledRPCCall, error_message
from distributed.exceptions import Reschedule
Expand Down Expand Up @@ -132,8 +132,15 @@ def __init__(
self._disk_buffer = MemoryShardsBuffer(deserialize=self.deserialize)

with self._capture_metrics("background-comms"):
max_message_size = parse_bytes(
dask.config.get("distributed.p2p.comm.message-bytes-limit")
)
concurrency_limit = dask.config.get("distributed.p2p.comm.concurrency")
self._comm_buffer = CommShardsBuffer(
send=self.send, memory_limiter=memory_limiter_comms
send=self.send,
max_message_size=max_message_size,
memory_limiter=memory_limiter_comms,
concurrency_limit=concurrency_limit,
)

# TODO: reduce number of connections to number of workers
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def hash_join_p2p(
lhs = _calculate_partitions(lhs, left_on, npartitions)
rhs = _calculate_partitions(rhs, right_on, npartitions)
merge_name = "hash-join-" + tokenize(lhs, rhs, **merge_kwargs)
disk: bool = dask.config.get("distributed.p2p.disk")
disk: bool = dask.config.get("distributed.p2p.storage.disk")
join_layer = HashJoinP2PLayer(
name=merge_name,
name_input_left=lhs._name,
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def rechunk_p2p(

token = tokenize(x, chunks)
name = rechunk_name(token)
disk: bool = dask.config.get("distributed.p2p.disk")
disk: bool = dask.config.get("distributed.p2p.storage.disk")

layer = P2PRechunkLayer(
name=name,
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def rearrange_by_column_p2p(
)

name = f"shuffle_p2p-{token}"
disk: bool = dask.config.get("distributed.p2p.disk")
disk: bool = dask.config.get("distributed.p2p.storage.disk")

layer = P2PShuffleLayer(
name,
Expand Down
12 changes: 9 additions & 3 deletions distributed/shuffle/_worker_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, overload

import dask
from dask.context import thread_state
from dask.typing import Key
from dask.utils import parse_bytes
Expand Down Expand Up @@ -283,14 +284,19 @@ def setup(self, worker: Worker) -> None:
# Initialize
self.worker = worker
self.shuffle_runs = _ShuffleRunManager(self)
comm_limit = parse_bytes(dask.config.get("distributed.p2p.comm.buffer"))
self.memory_limiter_comms = ResourceLimiter(
parse_bytes("100 MiB"), metrics_label="p2p-comms-limiter"
comm_limit, metrics_label="p2p-comms-limiter"
)
storage_limit = parse_bytes(dask.config.get("distributed.p2p.storage.buffer"))
self.memory_limiter_disk = ResourceLimiter(
parse_bytes("1 GiB"), metrics_label="p2p-disk-limiter"
storage_limit, metrics_label="p2p-disk-limiter"
)
self.closed = False
self._executor = ThreadPoolExecutor(self.worker.state.nthreads)
nthreads = (
dask.config.get("distributed.p2p.threads") or self.worker.state.nthreads
)
self._executor = ThreadPoolExecutor(nthreads)

def __str__(self) -> str:
return f"ShuffleWorkerPlugin on {self.worker.address}"
Expand Down
29 changes: 24 additions & 5 deletions distributed/shuffle/tests/test_comm_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ async def test_basic(tmp_path):
async def send(address, shards):
d[address].extend(shards)

mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None))
mc = CommShardsBuffer(
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(None),
concurrency_limit=10,
)
await mc.write({"x": b"0" * 1000, "y": b"1" * 500})
await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

Expand All @@ -37,7 +42,12 @@ async def test_exceptions(tmp_path):
async def send(address, shards):
raise Exception(123)

mc = CommShardsBuffer(send=send, memory_limiter=ResourceLimiter(None))
mc = CommShardsBuffer(
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(None),
concurrency_limit=10,
)
await mc.write({"x": b"0" * 1000, "y": b"1" * 500})

while not mc._exception:
Expand Down Expand Up @@ -65,7 +75,10 @@ async def send(address, shards):
return {"status": "OK"}

mc = CommShardsBuffer(
send=send, concurrency_limit=1, memory_limiter=ResourceLimiter(None)
send=send,
max_message_size=parse_bytes("2 MiB"),
concurrency_limit=1,
memory_limiter=ResourceLimiter(None),
)
await mc.write({"x": b"0", "y": b"1"})
await mc.write({"x": b"0", "y": b"1"})
Expand Down Expand Up @@ -96,7 +109,10 @@ async def send(address, shards):
nshards = 10
nputs = 20
comm_buffer = CommShardsBuffer(
send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB"))
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
concurrency_limit=10,
)
payload = {
x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)
Expand Down Expand Up @@ -138,7 +154,10 @@ async def send(address, shards):
nshards = 10
nputs = 20
comm_buffer = CommShardsBuffer(
send=send, memory_limiter=ResourceLimiter(parse_bytes("100 MiB"))
send=send,
max_message_size=parse_bytes("2 MiB"),
memory_limiter=ResourceLimiter(parse_bytes("100 MiB")),
concurrency_limit=10,
)
payload = {
x: gen_bytes(frac, comm_buffer.memory_limiter.limit) for x in range(nshards)
Expand Down
2 changes: 1 addition & 1 deletion distributed/shuffle/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ async def test_merge(c, s, a, b, how, disk):
b = dd.repartition(B, [0, 2, 5])

with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
res = await c.compute(joined)
assert_eq(
Expand Down
6 changes: 3 additions & 3 deletions distributed/shuffle/tests/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ async def test_rechunk_2d(c, s, *ws, disk):
a = np.random.default_rng().uniform(0, 1, 300).reshape((10, 30))
x = da.from_array(a, chunks=((1, 2, 3, 4), (5,) * 6))
new = ((5, 5), (15,) * 2)
with dask.config.set({"distributed.p2p.disk": disk}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
x2 = rechunk(x, chunks=new, method="p2p")
assert x2.chunks == new
assert np.all(await c.compute(x2) == a)
Expand All @@ -357,7 +357,7 @@ async def test_rechunk_4d(c, s, *ws, disk):
a = np.random.default_rng().uniform(0, 1, 10000).reshape((10,) * 4)
x = da.from_array(a, chunks=old)
new = ((10,),) * 4
with dask.config.set({"distributed.p2p.disk": disk}):
with dask.config.set({"distributed.p2p.storage.disk": disk}):
x2 = rechunk(x, chunks=new, method="p2p")
assert x2.chunks == new
await c.compute(x2)
Expand Down Expand Up @@ -1272,7 +1272,7 @@ async def test_preserve_writeable_flag(c, s, a, b):
assert out.tolist() == [True, True]


@gen_cluster(client=True, config={"distributed.p2p.disk": False})
@gen_cluster(client=True, config={"distributed.p2p.storage.disk": False})
async def test_rechunk_in_memory_shards_dont_share_buffer(c, s, a, b):
"""Test that, if two shards are sent in the same RPC call and they contribute to
different output chunks, downstream tasks don't need to consume all output chunks in
Expand Down
8 changes: 4 additions & 4 deletions distributed/shuffle/tests/test_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ async def test_basic_integration(c, s, a, b, npartitions, disk):
freq="10 s",
)
with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
{"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
):
shuffled = df.shuffle("x", npartitions=npartitions)
if npartitions is None:
Expand Down Expand Up @@ -2055,7 +2055,7 @@ def _write_frames(self, frames, id):
freq="10 s",
)
with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": True}
{"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": True}
):
shuffled = df.shuffle("x", npartitions=10)
with pytest.raises(P2POutOfDiskError, match="out of available disk space"):
Expand Down Expand Up @@ -2861,7 +2861,7 @@ def make_partition(partition_id, size):
df = dd.from_map(make_partition, np.arange(19), args=(250,))

with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
{"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
):
shuffled = df.shuffle("b")
result, expected = await c.compute([shuffled, df], sync=True)
Expand All @@ -2882,7 +2882,7 @@ async def test_drop_duplicates_stable_ordering(c, s, a, b, keep, disk):
df = dask.datasets.timeseries()

with dask.config.set(
{"dataframe.shuffle.method": "p2p", "distributed.p2p.disk": disk}
{"dataframe.shuffle.method": "p2p", "distributed.p2p.storage.disk": disk}
):
result, expected = await c.compute(
[
Expand Down
Loading