Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 27, 2021
1 parent ab9d873 commit 5c021de
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 14 deletions.
13 changes: 5 additions & 8 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ properties:
the worker, ignore it for this long before considering it in
non-time-sensitive heuristics. This should be set to be longer than
the duration of most dask tasks.
rebalance:
type: object
description: >-
Expand All @@ -423,18 +422,16 @@ properties:
minimum: 0
maximum: 1
description: >-
Fraction of worker process memory at which we stop potentially
receiving data from other workers. Ignored when max_memory is not
set.
Fraction of worker process memory at which we start potentially
transferring data to other workers.
recipient-max:
type: number
minimum: 0
maximum: 1
description: >-
Fraction of worker process memory at which we start potentially
transferring data to other workers.
Fraction of worker process memory at which we stop potentially
receiving data from other workers. Ignored when max_memory is not
set.
sender-recipient-gap:
type: number
minimum: 0
Expand Down
11 changes: 10 additions & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2895,7 +2895,12 @@ def __reduce__(self):
await x


@gen_cluster(client=True, Worker=Nanny, worker_kwargs={"memory_limit": "1 GiB"})
@gen_cluster(
client=True,
Worker=Nanny,
worker_kwargs={"memory_limit": "1 GiB"},
config={"distributed.worker.memory.rebalance.sender-min": 0.3},
)
async def test_rebalance(c, s, *_):
"""Test Client.rebalance(). These are just to test the Client wrapper around
Scheduler.rebalance(); for more thorough tests on the latter see test_scheduler.py.
Expand Down Expand Up @@ -2967,6 +2972,7 @@ def test_rebalance_sync():
s = c.cluster.scheduler
a, b = [ws.address for ws in s.workers.values()]
futures = c.map(lambda _: "x" * (2 ** 29 // 10), range(10), workers=[a])
wait(futures)
# Wait for heartbeat
while s.memory.process < 2 ** 29:
sleep(0.1)
Expand All @@ -2984,7 +2990,10 @@ def test_rebalance_sync():
async def test_rebalance_unprepared(c, s, a, b):
"""Client.rebalance() internally waits for unfinished futures"""
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
# Let the futures reach the scheduler
await asyncio.sleep(0.1)
# We didn't wait enough for futures to complete. However, Client.rebalance() will
# block until all futures are completed before invoking Scheduler.rebalance().
await c.rebalance(futures)
s.validate_state()

Expand Down
21 changes: 17 additions & 4 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2528,7 +2528,12 @@ async def assert_ndata(client, by_addr, total=None):
raise AssertionError(f"Expected {by_addr}, total={total}; got {out}")


@gen_cluster(client=True, Worker=Nanny, worker_kwargs={"memory_limit": "1 GiB"})
@gen_cluster(
client=True,
Worker=Nanny,
worker_kwargs={"memory_limit": "1 GiB"},
config={"distributed.worker.memory.rebalance.sender-min": 0.3},
)
async def test_rebalance(c, s, *_):
# We used nannies to have separate processes for each worker
a, b = s.workers
Expand Down Expand Up @@ -2593,7 +2598,9 @@ async def test_rebalance_missing_data1(s, a, b):

@gen_cluster(client=True)
async def test_rebalance_missing_data2(c, s, a, b):
"""keys exist but belong to unfinished futures"""
"""keys exist but belong to unfinished futures. Unlike Client.rebalance(),
Scheduler.rebalance() does not wait for unfinished futures.
"""
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await asyncio.sleep(0.1)
out = await s.rebalance(keys=[f.key for f in futures])
Expand Down Expand Up @@ -2629,7 +2636,10 @@ async def test_rebalance_no_workers(s):
client=True,
Worker=Nanny,
worker_kwargs={"memory_limit": "1000 MiB"},
config={"distributed.worker.memory.rebalance.measure": "managed"},
config={
"distributed.worker.memory.rebalance.measure": "managed",
"distributed.worker.memory.rebalance.sender-min": 0.3,
},
)
async def test_rebalance_managed_memory(c, s, *_):
a, b = s.workers
Expand Down Expand Up @@ -2751,7 +2761,10 @@ async def test_rebalance_sender_below_mean(c, s, *_):
client=True,
Worker=Nanny,
worker_kwargs={"memory_limit": "1000 MiB"},
config={"distributed.worker.memory.rebalance.measure": "managed"},
config={
"distributed.worker.memory.rebalance.measure": "managed",
"distributed.worker.memory.rebalance.sender-min": 0.3,
},
)
async def test_rebalance_least_recently_inserted_sender_min(c, s, *_):
"""
Expand Down
7 changes: 6 additions & 1 deletion distributed/tests/test_tls_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ async def test_nanny(c, s, a, b):
assert result == 11


@gen_tls_cluster(client=True, Worker=Nanny, worker_kwargs={"memory_limit": "1 GiB"})
@gen_tls_cluster(
client=True,
Worker=Nanny,
worker_kwargs={"memory_limit": "1 GiB"},
config={"distributed.worker.memory.rebalance.sender-min": 0.3},
)
async def test_rebalance(c, s, *_):
# We used nannies to have separate processes for each worker
a, b = s.workers
Expand Down

0 comments on commit 5c021de

Please sign in to comment.