From 9ec764b7c9310095aa3794c27a7709786010a08b Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 8 Sep 2022 16:26:09 +0200 Subject: [PATCH 01/28] Initial test for smarter stealing with dependencies --- distributed/tests/test_steal.py | 72 ++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index cc00b50428..cd8f1677f4 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -6,6 +6,7 @@ import logging import math import random +import sys import weakref from operator import mul from time import sleep @@ -703,8 +704,8 @@ def block(*args, event, **kwargs): @pytest.mark.parametrize( "inp,expected", [ - ([[1], []], [[1], []]), # don't move unnecessarily - ([[0, 0], []], [[0], [0]]), # balance + pytest.param([[1], []], [[1], []], id="don't move unnecessarily"), + pytest.param([[0, 0], []], [[0], [0]], id="balance"), ([[0.1, 0.1], []], [[0], [0]]), # balance even if results in even ([[0, 0, 0], []], [[0, 0], [0]]), # don't over balance ([[0, 0], [0, 0, 0], []], [[0, 0], [0, 0], [0]]), # move from larger @@ -1412,3 +1413,70 @@ def func(*args): ideal = ntasks / len(workers) assert (ntasks_per_worker > ideal * 0.5).all(), (ideal, ntasks_per_worker) assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker) + + +@gen_cluster( + client=True, + nthreads=[("", 1)] * 3, + config={"distributed.scheduler.unknown-task-duration": "1s"}, +) +async def test_steal_to_dependency(c, s, *workers): + expected = [[1], [], [1]] + steal = s.extensions["stealing"] + await steal.stop() + ev = Event() + + def block(*args, event, **kwargs): + event.wait() + + class Sizeof: + def __init__(self, nbytes): + self._nbytes = nbytes - sys.getsizeof(object()) + + def __sizeof__(self) -> int: + return self._nbytes + + futures = [] + t = 1 + [dat] = await c.scatter( + [Sizeof(int(t * s.bandwidth))], workers=[workers[0].address, workers[2].address] + ) + for i in range(2): + f = c.submit( + block, + dat, + event=ev, + key=f"{int(t)}-{i}", + workers=workers[0].address, + allow_other_workers=True, + pure=False, + priority=-i, + ) + futures.append(f) + + while len([ts for ts in s.tasks.values() if ts.processing_on]) < len(futures): + await asyncio.sleep(0.001) + if True: # recompute_saturation: + for ws in s.workers.values(): + s._reevaluate_occupancy_worker(ws) + try: + for _ in range(20): + steal.balance() + + while steal.in_flight: + await asyncio.sleep(0.001) + + result = [ + sorted( + (int(key_split(ts.key)) for ts in s.workers[w.address].processing), + reverse=True, + ) + for w in workers + ] + + if result == expected: + # Release the threadpools + return + finally: + await ev.set() + raise Exception(f"Expected: {expected}; got: {result}") From ce1df7a41a3a80576610c1ae7185d4b9644de936 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 8 Sep 2022 18:10:43 +0200 Subject: [PATCH 02/28] Parametrized test --- distributed/tests/test_steal.py | 171 ++++++++++++++++++++++++++------ 1 file changed, 140 insertions(+), 31 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index cd8f1677f4..c1916bf9d3 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -8,6 +8,7 @@ import random import sys import weakref +from collections import defaultdict from operator import mul from time import sleep @@ -49,6 +50,16 @@ teardown_module = nodebug_teardown_module +class Sizeof: + """Helper class for creating task results of a given size""" + + def __init__(self, nbytes): + self._nbytes = nbytes - sys.getsizeof(object) + + def __sizeof__(self) -> int: + return self._nbytes + + @gen_cluster(client=True, nthreads=[("", 2), ("", 2)]) async def test_work_stealing(c, s, a, b): [x] = await c._scatter([1], workers=a.address) @@ -1415,13 +1426,112 @@ def func(*args): assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker) -@gen_cluster( - client=True, - nthreads=[("", 1)] * 3, - config={"distributed.scheduler.unknown-task-duration": "1s"}, +@pytest.mark.parametrize("recompute_saturation", [True, False]) +@pytest.mark.parametrize( + "dependencies, dependency_placement, task_placement, expected_placement", + [ + pytest.param( + {"a": 1}, + [["a"], []], + [[["a"], ["a"], ["a"]], []], + [[["a"], ["a"]], [["a"]]], + id="test_balance[be willing to move costly items]", + ), + pytest.param( + {"a": 1}, + [["a"], []], + [[["a"], ["a"], ["a"], ["a"]], []], + [[["a"], ["a"], ["a"]], [["a"]]], + id="test_balance[but don't move too many]", + ), + pytest.param( + {"a": 1}, + [["a"], []], + [[["a"], ["a"]], []], + [[["a"]], [["a"]]], + id="balance without dependency", + ), + pytest.param( + {"a": 1}, + [["a"], ["a"]], + [[["a"], ["a"]], []], + [[["a"]], [["a"]]], + id="balance with dependency", + ), + pytest.param( + {"a": 1}, + [["a"], [], ["a"]], + [[["a"], ["a"]], [], []], + [[["a"]], [], [["a"]]], + id="balance to dependency", + ), + ], ) -async def test_steal_to_dependency(c, s, *workers): - expected = [[1], [], [1]] +def test_balancing_with_dependencies( + dependencies, + dependency_placement, + task_placement, + expected_placement, + recompute_saturation, +): + async def _test_balance(*args, **kwargs): + await assert_balanced_with_dependencies( + dependencies, + dependency_placement, + task_placement, + expected_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_test_balance)() + + +async def place_dependencies(dependencies, placement, c, s, workers): + dependencies_to_workers = defaultdict(set) + for worker_idx, placed in enumerate(placement): + for dependency in placed: + dependencies_to_workers[dependency].add(workers[worker_idx].address) + + dependency_futures = {} + for name, multiplier in dependencies.items(): + worker_addresses = dependencies_to_workers[name] + [fut] = await c.scatter( + {name: Sizeof(int(multiplier * s.bandwidth))}, + workers=worker_addresses, + broadcast=True, + ) + dependency_futures[name] = fut + + assert_dependency_placement(placement, workers) + + return dependency_futures + + +def assert_dependency_placement(expected, workers): + actual = [] + for worker in workers: + actual.append(list(worker.state.tasks.keys())) + + assert actual == expected + + +async def assert_balanced_with_dependencies( + dependencies, + dependency_placement, + task_placement, + expected_placement, + recompute_saturation, + c, + s, + *workers, +): steal = s.extensions["stealing"] await steal.stop() ev = Event() @@ -1429,34 +1539,33 @@ async def test_steal_to_dependency(c, s, *workers): def block(*args, event, **kwargs): event.wait() - class Sizeof: - def __init__(self, nbytes): - self._nbytes = nbytes - sys.getsizeof(object()) + counter = itertools.count() - def __sizeof__(self) -> int: - return self._nbytes + dependency_futures = await place_dependencies( + dependencies, dependency_placement, c, s, workers + ) futures = [] - t = 1 - [dat] = await c.scatter( - [Sizeof(int(t * s.bandwidth))], workers=[workers[0].address, workers[2].address] - ) - for i in range(2): - f = c.submit( - block, - dat, - event=ev, - key=f"{int(t)}-{i}", - workers=workers[0].address, - allow_other_workers=True, - pure=False, - priority=-i, - ) - futures.append(f) + for idx, tasks in enumerate(task_placement): + for dependencies in tasks: + i = next(counter) + dep_key = "".join(sorted(dependencies)) + key = f"{dep_key}-{i}" + f = c.submit( + block, + [dependency_futures[dependency] for dependency in dependencies], + event=ev, + key=key, + workers=workers[idx].address, + allow_other_workers=True, + pure=False, + priority=-i, + ) + futures.append(f) while len([ts for ts in s.tasks.values() if ts.processing_on]) < len(futures): await asyncio.sleep(0.001) - if True: # recompute_saturation: + if recompute_saturation: for ws in s.workers.values(): s._reevaluate_occupancy_worker(ws) try: @@ -1468,15 +1577,15 @@ def __sizeof__(self) -> int: result = [ sorted( - (int(key_split(ts.key)) for ts in s.workers[w.address].processing), + (list(key_split(ts.key)) for ts in s.workers[w.address].processing), reverse=True, ) for w in workers ] - if result == expected: + if result == expected_placement: # Release the threadpools return finally: await ev.set() - raise Exception(f"Expected: {expected}; got: {result}") + raise Exception(f"Expected: {expected_placement}; got: {result}") From a6cc92864a43bf2ec5e5a1c672df5caf2e1c2c85 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Sep 2022 09:24:19 +0200 Subject: [PATCH 03/28] Add tests and make parts parametrizable --- distributed/tests/test_steal.py | 282 +++++++++++++++++++++----------- 1 file changed, 182 insertions(+), 100 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index c1916bf9d3..d190e50982 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1427,59 +1427,115 @@ def func(*args): @pytest.mark.parametrize("recompute_saturation", [True, False]) -@pytest.mark.parametrize( - "dependencies, dependency_placement, task_placement, expected_placement", - [ - pytest.param( - {"a": 1}, - [["a"], []], - [[["a"], ["a"], ["a"]], []], - [[["a"], ["a"]], [["a"]]], - id="test_balance[be willing to move costly items]", - ), - pytest.param( - {"a": 1}, - [["a"], []], - [[["a"], ["a"], ["a"], ["a"]], []], - [[["a"], ["a"], ["a"]], [["a"]]], - id="test_balance[but don't move too many]", - ), - pytest.param( - {"a": 1}, - [["a"], []], - [[["a"], ["a"]], []], - [[["a"]], [["a"]]], - id="balance without dependency", - ), - pytest.param( - {"a": 1}, - [["a"], ["a"]], - [[["a"], ["a"]], []], - [[["a"]], [["a"]]], - id="balance with dependency", - ), - pytest.param( - {"a": 1}, - [["a"], [], ["a"]], - [[["a"], ["a"]], [], []], - [[["a"]], [], [["a"]]], - id="balance to dependency", - ), - ], -) -def test_balancing_with_dependencies( - dependencies, - dependency_placement, - task_placement, - expected_placement, - recompute_saturation, -): - async def _test_balance(*args, **kwargs): - await assert_balanced_with_dependencies( +def test_balance_willing_to_move_costly_items(recompute_saturation): + dependencies = {"a": 1, "b": 1, "c": 1} + dependency_placement = [["a", "b", "c"], []] + task_placement = [[["a"], ["b"], ["c"]], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [2, 1] + + async def _run_test(*args, **kwargs): + await _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_run_test)() + + +@pytest.mark.parametrize("recompute_saturation", [True, False]) +def test_balance_but_dont_move_too_many(recompute_saturation): + dependencies = {"a": 1, "b": 1, "c": 1, "d": 1} + dependency_placement = [["a", "b", "c", "d"], []] + task_placement = [[["a"], ["b"], ["c"], ["d"]], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [3, 1] + + async def _run_test(*args, **kwargs): + await _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_run_test)() + + +@pytest.mark.parametrize("recompute_saturation", [True, False]) +def test_balance_even_with_replica(recompute_saturation): + dependencies = {"a": 1} + dependency_placement = [["a"], ["a"]] + task_placement = [[["a"], ["a"]], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 1, + 1, + ] + + async def _run_test(*args, **kwargs): + await _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_run_test)() + + +@pytest.mark.parametrize("recompute_saturation", [True, False]) +def test_balance_to_replica(recompute_saturation): + dependencies = {"a": 1} + dependency_placement = [["a"], ["a"], []] + task_placement = [[["a"], ["a"], ["a"]], [], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 2, + 1, + 0, + ] # Note: The success of this test currently depends on worker ordering + + async def _run_test(*args, **kwargs): + await _run_balance_test( dependencies, dependency_placement, task_placement, - expected_placement, + _correct_placement, recompute_saturation, *args, **kwargs, @@ -1490,31 +1546,73 @@ async def _test_balance(*args, **kwargs): client=True, nthreads=[("", 1)] * len(task_placement), config=config, - )(_test_balance)() + )(_run_test)() -async def place_dependencies(dependencies, placement, c, s, workers): +async def _run_balance_test( + dependencies, + dependency_placement, + task_placement, + correct_placement_fn, + recompute_saturation, + c, + s, + *workers, +): + steal = s.extensions["stealing"] + await steal.stop() + ev = Event() + + dependency_futures = await _place_dependencies( + dependencies, dependency_placement, c, s, workers + ) + + futures = await _place_tasks(ev, task_placement, dependency_futures, c, s, workers) + if recompute_saturation: + for ws in s.workers.values(): + s._reevaluate_occupancy_worker(ws) + try: + for _ in range(20): + steal.balance() + + while steal.in_flight: + await asyncio.sleep(0.001) + + result = _get_task_placement(s, workers) + + if correct_placement_fn(result): + # Release the threadpools + return + finally: + await ev.set() + + raise AssertionError(result) + + +async def _place_dependencies(dependencies, placement, c, s, workers): dependencies_to_workers = defaultdict(set) for worker_idx, placed in enumerate(placement): for dependency in placed: dependencies_to_workers[dependency].add(workers[worker_idx].address) - dependency_futures = {} + futures = {} for name, multiplier in dependencies.items(): worker_addresses = dependencies_to_workers[name] - [fut] = await c.scatter( + futs = await c.scatter( {name: Sizeof(int(multiplier * s.bandwidth))}, workers=worker_addresses, broadcast=True, ) - dependency_futures[name] = fut + futures[name] = futs[name] - assert_dependency_placement(placement, workers) + await c.gather(futures.values()) - return dependency_futures + _assert_dependency_placement(placement, workers) + return futures -def assert_dependency_placement(expected, workers): + +def _assert_dependency_placement(expected, workers): actual = [] for worker in workers: actual.append(list(worker.state.tasks.keys())) @@ -1522,31 +1620,13 @@ def assert_dependency_placement(expected, workers): assert actual == expected -async def assert_balanced_with_dependencies( - dependencies, - dependency_placement, - task_placement, - expected_placement, - recompute_saturation, - c, - s, - *workers, -): - steal = s.extensions["stealing"] - await steal.stop() - ev = Event() - +async def _place_tasks(ev, placement, dependency_futures, c, s, workers): def block(*args, event, **kwargs): event.wait() counter = itertools.count() - - dependency_futures = await place_dependencies( - dependencies, dependency_placement, c, s, workers - ) - futures = [] - for idx, tasks in enumerate(task_placement): + for worker_idx, tasks in enumerate(placement): for dependencies in tasks: i = next(counter) dep_key = "".join(sorted(dependencies)) @@ -1556,7 +1636,7 @@ def block(*args, event, **kwargs): [dependency_futures[dependency] for dependency in dependencies], event=ev, key=key, - workers=workers[idx].address, + workers=workers[worker_idx].address, allow_other_workers=True, pure=False, priority=-i, @@ -1565,27 +1645,29 @@ def block(*args, event, **kwargs): while len([ts for ts in s.tasks.values() if ts.processing_on]) < len(futures): await asyncio.sleep(0.001) - if recompute_saturation: - for ws in s.workers.values(): - s._reevaluate_occupancy_worker(ws) - try: - for _ in range(20): - steal.balance() - while steal.in_flight: - await asyncio.sleep(0.001) + assert_task_placement(placement, s, workers) - result = [ - sorted( - (list(key_split(ts.key)) for ts in s.workers[w.address].processing), - reverse=True, - ) - for w in workers - ] + return futures - if result == expected_placement: - # Release the threadpools - return - finally: - await ev.set() - raise Exception(f"Expected: {expected_placement}; got: {result}") + +def _get_task_placement(s, workers): + actual = [] + for w in workers: + actual.append( + [list(key_split(ts.key)) for ts in s.workers[w.address].processing] + ) + return actual + + +def _equal_placement(actual, expected): + return _comparable_placement(actual) == _comparable_placement(expected) + + +def _comparable_placement(placement): + return [sorted(placed) for placed in placement] + + +def assert_task_placement(expected, s, workers): + actual = _get_task_placement(s, workers) + assert _equal_placement(actual, expected) From bb9fdfa184d70acd5300cb222cbc4ca36a212faa Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Sep 2022 11:06:48 +0200 Subject: [PATCH 04/28] Add more tests [WIP] --- distributed/tests/test_steal.py | 100 ++++++++++++++++++++++++++++---- 1 file changed, 88 insertions(+), 12 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index d190e50982..82a533f090 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -717,18 +717,27 @@ def block(*args, event, **kwargs): [ pytest.param([[1], []], [[1], []], id="don't move unnecessarily"), pytest.param([[0, 0], []], [[0], [0]], id="balance"), - ([[0.1, 0.1], []], [[0], [0]]), # balance even if results in even - ([[0, 0, 0], []], [[0, 0], [0]]), # don't over balance - ([[0, 0], [0, 0, 0], []], [[0, 0], [0, 0], [0]]), # move from larger - ([[0, 0, 0], [0], []], [[0, 0], [0], [0]]), # move to smaller - ([[0, 1], []], [[1], [0]]), # choose easier first - ([[0, 0, 0, 0], [], []], [[0, 0], [0], [0]]), # spread evenly - ([[1, 0, 2, 0], [], []], [[2, 1], [0], [0]]), # move easier - ([[1, 1, 1], []], [[1, 1], [1]]), # be willing to move costly items - ([[1, 1, 1, 1], []], [[1, 1, 1], [1]]), # but don't move too many - ( - [[0, 0], [0, 0], [0, 0], []], # no one clearly saturated + pytest.param( + [[0.1, 0.1], []], [[0], [0]], id="balance even if results in even" + ), + pytest.param([[0, 0, 0], []], [[0, 0], [0]], id="don't over balance"), + pytest.param( + [[0, 0], [0, 0, 0], []], [[0, 0], [0, 0], [0]], id="move from larger" + ), + pytest.param([[0, 0, 0], [0], []], [[0, 0], [0], [0]], id="move to smaller"), + pytest.param([[0, 1], []], [[1], [0]], id="choose easier first"), + pytest.param([[0, 0, 0, 0], [], []], [[0, 0], [0], [0]], id="spread evenly"), + pytest.param([[1, 0, 2, 0], [], []], [[2, 1], [0], [0]], id="move easier"), + pytest.param( + [[1, 1, 1], []], [[1, 1], [1]], id="be willing to move costly items" + ), + pytest.param( + [[1, 1, 1, 1], []], [[1, 1, 1], [1]], id="but don't move too many" + ), + pytest.param( + [[0, 0], [0, 0], [0, 0], []], [[0, 0], [0, 0], [0], [0]], + id="no one clearly saturated", ), # NOTE: There is a timing issue that workers may already start executing # tasks before we call balance, i.e. the workers will reject the @@ -736,9 +745,10 @@ def block(*args, event, **kwargs): # Particularly tests with many input tasks are more likely to fail since # the test setup takes longer and allows the workers more time to # schedule a task on the threadpool - ( + pytest.param( [[4, 2, 2, 2, 2, 1, 1], [4, 2, 1, 1], [], [], []], [[4, 2, 2, 2], [4, 2, 1, 1], [2], [1], [1]], + id="balance multiple saturated workers", ), ], ) @@ -1549,6 +1559,72 @@ async def _run_test(*args, **kwargs): )(_run_test)() +@pytest.mark.parametrize("recompute_saturation", [True, False]) +def test_balance_to_larger_dependency(recompute_saturation): + dependencies = {"a": 2, "b": 1} + dependency_placement = [["a", "b"], ["a"], ["b"]] + task_placement = [[["a", "b"], ["a", "b"], ["a", "b"]], [], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 2, + 1, + 0, + ] # Note: The success of this test currently depends on worker ordering + + async def _run_test(*args, **kwargs): + await _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_run_test)() + + +@pytest.mark.parametrize("recompute_saturation", [True, False]) +def test_balance_move_to_busier_with_dependency(recompute_saturation): + dependencies = {"a": 4, "b": 1} + dependency_placement = [["a"], ["a", "b"], []] + task_placement = [[["a"], ["a"], ["a"]], ["b"], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 2, + 2, + 0, + ] # Note: The success of this test currently depends on worker ordering + + async def _run_test(*args, **kwargs): + await _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_run_test)() + + async def _run_balance_test( dependencies, dependency_placement, From f4374ea6d9339ca80f2f8b9cd6f43eabd817be61 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Sep 2022 11:20:00 +0200 Subject: [PATCH 05/28] Fix test --- distributed/tests/test_steal.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 82a533f090..eede69b263 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1594,14 +1594,14 @@ async def _run_test(*args, **kwargs): @pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_move_to_busier_with_dependency(recompute_saturation): - dependencies = {"a": 4, "b": 1} + dependencies = {"a": 2, "b": 1} dependency_placement = [["a"], ["a", "b"], []] - task_placement = [[["a"], ["a"], ["a"]], ["b"], []] + task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], [["b"]], []] def _correct_placement(actual): actual_task_counts = [len(placed) for placed in actual] return actual_task_counts == [ - 2, + 5, 2, 0, ] # Note: The success of this test currently depends on worker ordering From 0074060d7c20e26df32778d412ea568b0fbc1e57 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Sep 2022 14:13:29 +0200 Subject: [PATCH 06/28] additional tests --- distributed/tests/test_steal.py | 38 ++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index eede69b263..14b1ecc478 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1601,9 +1601,41 @@ def test_balance_move_to_busier_with_dependency(recompute_saturation): def _correct_placement(actual): actual_task_counts = [len(placed) for placed in actual] return actual_task_counts == [ - 5, - 2, - 0, + 3, + 3, + 1, + ] # Note: The success of this test currently depends on worker ordering + + async def _run_test(*args, **kwargs): + await _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + *args, + **kwargs, + ) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )(_run_test)() + + +@pytest.mark.parametrize("recompute_saturation", [True, False]) +def test_balance_when_moving_dependency(recompute_saturation): + dependencies = {"a": 1} + dependency_placement = [["a"], []] + task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 4, + 3, ] # Note: The success of this test currently depends on worker ordering async def _run_test(*args, **kwargs): From bbfa401a2e033ea111eb6cb12af57720a98f85ea Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Sep 2022 14:15:55 +0200 Subject: [PATCH 07/28] Naming/docstring --- distributed/tests/test_steal.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 14b1ecc478..2630b3c3f3 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1438,6 +1438,7 @@ def func(*args): @pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_willing_to_move_costly_items(recompute_saturation): + """See also test_balance""" dependencies = {"a": 1, "b": 1, "c": 1} dependency_placement = [["a", "b", "c"], []] task_placement = [[["a"], ["b"], ["c"]], []] @@ -1467,6 +1468,7 @@ async def _run_test(*args, **kwargs): @pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_but_dont_move_too_many(recompute_saturation): + """See also test_balance""" dependencies = {"a": 1, "b": 1, "c": 1, "d": 1} dependency_placement = [["a", "b", "c", "d"], []] task_placement = [[["a"], ["b"], ["c"], ["d"]], []] @@ -1593,7 +1595,7 @@ async def _run_test(*args, **kwargs): @pytest.mark.parametrize("recompute_saturation", [True, False]) -def test_balance_move_to_busier_with_dependency(recompute_saturation): +def test_balance_prefers_busier_with_dependency(recompute_saturation): dependencies = {"a": 2, "b": 1} dependency_placement = [["a"], ["a", "b"], []] task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], [["b"]], []] @@ -1626,7 +1628,7 @@ async def _run_test(*args, **kwargs): @pytest.mark.parametrize("recompute_saturation", [True, False]) -def test_balance_when_moving_dependency(recompute_saturation): +def test_balance_after_acquiring_dependency(recompute_saturation): dependencies = {"a": 1} dependency_placement = [["a"], []] task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], []] From 09a6ff82d925ef2e297e3fe052cdfa6814f16eb9 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 9 Sep 2022 14:23:00 +0200 Subject: [PATCH 08/28] Cleanup --- distributed/tests/test_steal.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 2630b3c3f3..eddff6be6c 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -60,6 +60,11 @@ def __sizeof__(self) -> int: return self._nbytes +@pytest.fixture(params=[True, False]) +def recompute_saturation(request): + yield request.params + + @gen_cluster(client=True, nthreads=[("", 2), ("", 2)]) async def test_work_stealing(c, s, a, b): [x] = await c._scatter([1], workers=a.address) @@ -711,7 +716,6 @@ def block(*args, event, **kwargs): raise Exception(f"Expected: {expected2}; got: {result2}") -@pytest.mark.parametrize("recompute_saturation", [True, False]) @pytest.mark.parametrize( "inp,expected", [ @@ -1436,7 +1440,6 @@ def func(*args): assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker) -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_willing_to_move_costly_items(recompute_saturation): """See also test_balance""" dependencies = {"a": 1, "b": 1, "c": 1} @@ -1466,7 +1469,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_but_dont_move_too_many(recompute_saturation): """See also test_balance""" dependencies = {"a": 1, "b": 1, "c": 1, "d": 1} @@ -1496,7 +1498,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_even_with_replica(recompute_saturation): dependencies = {"a": 1} dependency_placement = [["a"], ["a"]] @@ -1528,7 +1529,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_to_replica(recompute_saturation): dependencies = {"a": 1} dependency_placement = [["a"], ["a"], []] @@ -1561,7 +1561,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_to_larger_dependency(recompute_saturation): dependencies = {"a": 2, "b": 1} dependency_placement = [["a", "b"], ["a"], ["b"]] @@ -1594,7 +1593,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_prefers_busier_with_dependency(recompute_saturation): dependencies = {"a": 2, "b": 1} dependency_placement = [["a"], ["a", "b"], []] @@ -1627,7 +1625,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -@pytest.mark.parametrize("recompute_saturation", [True, False]) def test_balance_after_acquiring_dependency(recompute_saturation): dependencies = {"a": 1} dependency_placement = [["a"], []] @@ -1684,9 +1681,7 @@ async def _run_balance_test( try: for _ in range(20): steal.balance() - - while steal.in_flight: - await asyncio.sleep(0.001) + await steal.stop() result = _get_task_placement(s, workers) From d383ea783ef2432905e81bfc070fac5451711fbb Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 12 Sep 2022 14:18:55 +0200 Subject: [PATCH 09/28] Fix fixture --- distributed/tests/test_steal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index eddff6be6c..26ad98c0f5 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -62,7 +62,7 @@ def __sizeof__(self) -> int: @pytest.fixture(params=[True, False]) def recompute_saturation(request): - yield request.params + yield request.param @gen_cluster(client=True, nthreads=[("", 2), ("", 2)]) From 98ac89506ffd5419151d6b80936d1fae3d9335d6 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 12 Sep 2022 16:22:56 +0200 Subject: [PATCH 10/28] Make _get_thief smarter --- distributed/stealing.py | 17 +++++++++++++---- distributed/tests/test_steal.py | 1 + 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index cdbcce30c4..c9dd121dd1 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -4,6 +4,7 @@ import logging from collections import defaultdict, deque from collections.abc import Container +from functools import partial from math import log2 from time import time from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast @@ -450,6 +451,9 @@ def balance(self) -> None: stealable.discard(ts) continue + if not (thief := _pop_thief(s, ts, potential_thieves)): + continue + occ_thief = self._combined_occupancy(thief) occ_victim = self._combined_occupancy(victim) comm_cost = self.scheduler.get_comm_cost(ts, thief) @@ -520,14 +524,19 @@ def story(self, *keys_or_ts: str | TaskState) -> list: def _get_thief( scheduler: SchedulerState, ts: TaskState, potential_thieves: set[WorkerState] ) -> WorkerState | None: + if not potential_thieves: + return None + valid_workers = scheduler.valid_workers(ts) if valid_workers is not None: - subset = potential_thieves & valid_workers - if subset: - return next(iter(subset)) + valid_thieves = potential_thieves & valid_workers + if valid_thieves: + potential_thieves = valid_thieves elif not ts.loose_restrictions: return None - return next(iter(potential_thieves)) + + thief = min(potential_thieves, key=partial(scheduler.worker_objective, ts)) + return thief fast_tasks = {"split-shuffle"} diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 26ad98c0f5..2a1994d5ed 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -753,6 +753,7 @@ def block(*args, event, **kwargs): [[4, 2, 2, 2, 2, 1, 1], [4, 2, 1, 1], [], [], []], [[4, 2, 2, 2], [4, 2, 1, 1], [2], [1], [1]], id="balance multiple saturated workers", + marks=pytest.mark.skip(reason="This test is highly timing-sensitive"), ), ], ) From 450690d2430eba2f4129d96ce44ccce5be9d02d2 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 15:00:17 +0200 Subject: [PATCH 11/28] Fix rebase issues --- distributed/stealing.py | 7 +------ distributed/tests/test_steal.py | 16 ++-------------- 2 files changed, 3 insertions(+), 20 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index c9dd121dd1..82bb47df69 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -451,9 +451,6 @@ def balance(self) -> None: stealable.discard(ts) continue - if not (thief := _pop_thief(s, ts, potential_thieves)): - continue - occ_thief = self._combined_occupancy(thief) occ_victim = self._combined_occupancy(victim) comm_cost = self.scheduler.get_comm_cost(ts, thief) @@ -534,9 +531,7 @@ def _get_thief( potential_thieves = valid_thieves elif not ts.loose_restrictions: return None - - thief = min(potential_thieves, key=partial(scheduler.worker_objective, ts)) - return thief + return min(potential_thieves, key=partial(scheduler.worker_objective, ts)) fast_tasks = {"split-shuffle"} diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 2a1994d5ed..589135d47e 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -6,7 +6,6 @@ import logging import math import random -import sys import weakref from collections import defaultdict from operator import mul @@ -25,7 +24,6 @@ from distributed.metrics import time from distributed.system import MEMORY_LIMIT from distributed.utils_test import ( - SizeOf, captured_logger, freeze_batched_send, gen_cluster, @@ -50,16 +48,6 @@ teardown_module = nodebug_teardown_module -class Sizeof: - """Helper class for creating task results of a given size""" - - def __init__(self, nbytes): - self._nbytes = nbytes - sys.getsizeof(object) - - def __sizeof__(self) -> int: - return self._nbytes - - @pytest.fixture(params=[True, False]) def recompute_saturation(request): yield request.param @@ -670,7 +658,7 @@ def block(*args, event, **kwargs): for t in sorted(ts, reverse=True): if t: [dat] = await c.scatter( - [SizeOf(int(t * s.bandwidth))], workers=w.address + [gen_nbytes(int(t * s.bandwidth))], workers=w.address ) else: dat = 123 @@ -1705,7 +1693,7 @@ async def _place_dependencies(dependencies, placement, c, s, workers): for name, multiplier in dependencies.items(): worker_addresses = dependencies_to_workers[name] futs = await c.scatter( - {name: Sizeof(int(multiplier * s.bandwidth))}, + {name: gen_nbytes(int(multiplier * s.bandwidth))}, workers=worker_addresses, broadcast=True, ) From 1f52338138237e30413896086c88b008d9dcf832 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 16:53:55 +0200 Subject: [PATCH 12/28] Review comment --- distributed/tests/test_steal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 589135d47e..090058ac16 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1675,9 +1675,9 @@ async def _run_balance_test( result = _get_task_placement(s, workers) if correct_placement_fn(result): - # Release the threadpools return finally: + # Release the threadpools await ev.set() raise AssertionError(result) From 4d1cd0c78ba7d8c3581386b26440babac6888683 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 17:28:29 +0200 Subject: [PATCH 13/28] Adjust expected test results --- distributed/tests/test_steal.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 090058ac16..1168b85504 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -709,6 +709,11 @@ def block(*args, event, **kwargs): [ pytest.param([[1], []], [[1], []], id="don't move unnecessarily"), pytest.param([[0, 0], []], [[0], [0]], id="balance"), + pytest.param( + [[0, 0, 0, 0, 0, 0, 0, 0], []], + [[0, 0, 0, 0, 0, 0], [0, 0]], + id="balance until none idle", + ), pytest.param( [[0.1, 0.1], []], [[0], [0]], id="balance even if results in even" ), @@ -741,7 +746,6 @@ def block(*args, event, **kwargs): [[4, 2, 2, 2, 2, 1, 1], [4, 2, 1, 1], [], [], []], [[4, 2, 2, 2], [4, 2, 1, 1], [2], [1], [1]], id="balance multiple saturated workers", - marks=pytest.mark.skip(reason="This test is highly timing-sensitive"), ), ], ) @@ -1590,8 +1594,8 @@ def test_balance_prefers_busier_with_dependency(recompute_saturation): def _correct_placement(actual): actual_task_counts = [len(placed) for placed in actual] return actual_task_counts == [ - 3, - 3, + 4, + 2, 1, ] # Note: The success of this test currently depends on worker ordering @@ -1617,13 +1621,13 @@ async def _run_test(*args, **kwargs): def test_balance_after_acquiring_dependency(recompute_saturation): dependencies = {"a": 1} dependency_placement = [["a"], []] - task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], []] + task_placement = [[["a"]] * 8, []] def _correct_placement(actual): actual_task_counts = [len(placed) for placed in actual] return actual_task_counts == [ - 4, - 3, + 6, + 2, ] # Note: The success of this test currently depends on worker ordering async def _run_test(*args, **kwargs): From 34ac7c24658bd0c2ba903927fd6e6e5ee0bb700a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 17:30:04 +0200 Subject: [PATCH 14/28] Remove baseline tests --- distributed/tests/test_steal.py | 58 --------------------------------- 1 file changed, 58 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 1168b85504..0133ff6223 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1433,64 +1433,6 @@ def func(*args): assert (ntasks_per_worker < ideal * 1.5).all(), (ideal, ntasks_per_worker) -def test_balance_willing_to_move_costly_items(recompute_saturation): - """See also test_balance""" - dependencies = {"a": 1, "b": 1, "c": 1} - dependency_placement = [["a", "b", "c"], []] - task_placement = [[["a"], ["b"], ["c"]], []] - - def _correct_placement(actual): - actual_task_counts = [len(placed) for placed in actual] - return actual_task_counts == [2, 1] - - async def _run_test(*args, **kwargs): - await _run_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - *args, - **kwargs, - ) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() - - -def test_balance_but_dont_move_too_many(recompute_saturation): - """See also test_balance""" - dependencies = {"a": 1, "b": 1, "c": 1, "d": 1} - dependency_placement = [["a", "b", "c", "d"], []] - task_placement = [[["a"], ["b"], ["c"], ["d"]], []] - - def _correct_placement(actual): - actual_task_counts = [len(placed) for placed in actual] - return actual_task_counts == [3, 1] - - async def _run_test(*args, **kwargs): - await _run_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - *args, - **kwargs, - ) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() - - def test_balance_even_with_replica(recompute_saturation): dependencies = {"a": 1} dependency_placement = [["a"], ["a"]] From b6171b251eed512e3ec222338545faec5a0e962f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 21:23:55 +0200 Subject: [PATCH 15/28] Permutations (WIP) --- distributed/tests/test_steal.py | 152 +++++++++++++++++++++----------- 1 file changed, 102 insertions(+), 50 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 0133ff6223..be234303ec 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1509,23 +1509,113 @@ def _correct_placement(actual): 0, ] # Note: The success of this test currently depends on worker ordering - async def _run_test(*args, **kwargs): - await _run_balance_test( + _run_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) + + +def _run_balance_test( + dependencies, + dependency_placement, + task_placement, + correct_placement_fn, + recompute_saturation, +): + nworkers = len(task_placement) + + async def _test( + dependencies, + dependency_placement, + task_placement, + correct_placement_fn, + recompute_saturation, + permutation, + c, + s, + *workers, + ): + steal = s.extensions["stealing"] + await steal.stop() + ev = Event() + + dependency_placement = [dependency_placement[i] for i in permutation] + + dependency_futures = await _place_dependencies( + dependencies, dependency_placement, c, s, workers + ) + + task_placement = [task_placement[i] for i in permutation] + + futures = await _place_tasks( + ev, task_placement, dependency_futures, c, s, workers + ) + if recompute_saturation: + for ws in s.workers.values(): + s._reevaluate_occupancy_worker(ws) + try: + for _ in range(20): + steal.balance() + await steal.stop() + + result = _get_task_placement(s, workers) + result = [result[i] for i in permutation] + if correct_placement_fn(result): + return + finally: + # Release the threadpools + await ev.set() + + raise AssertionError(result, permutation) + + config = {"distributed.scheduler.unknown-task-duration": "1s"} + + for permutation in itertools.permutations(range(nworkers)): + + def _permutation_run( dependencies, dependency_placement, task_placement, - _correct_placement, + correct_placement_fn, recompute_saturation, - *args, - **kwargs, - ) + permutation, + ): + _permutation = permutation + + async def _( + *args, + **kwargs, + ): + await _test( + dependencies, + dependency_placement, + task_placement, + correct_placement_fn, + recompute_saturation, + _permutation, + *args, + **kwargs, + ) - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() + return _ + + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config=config, + )( + _permutation_run( + dependencies, + dependency_placement, + task_placement, + correct_placement_fn, + recompute_saturation, + permutation, + ) + )() def test_balance_prefers_busier_with_dependency(recompute_saturation): @@ -1591,44 +1681,6 @@ async def _run_test(*args, **kwargs): )(_run_test)() -async def _run_balance_test( - dependencies, - dependency_placement, - task_placement, - correct_placement_fn, - recompute_saturation, - c, - s, - *workers, -): - steal = s.extensions["stealing"] - await steal.stop() - ev = Event() - - dependency_futures = await _place_dependencies( - dependencies, dependency_placement, c, s, workers - ) - - futures = await _place_tasks(ev, task_placement, dependency_futures, c, s, workers) - if recompute_saturation: - for ws in s.workers.values(): - s._reevaluate_occupancy_worker(ws) - try: - for _ in range(20): - steal.balance() - await steal.stop() - - result = _get_task_placement(s, workers) - - if correct_placement_fn(result): - return - finally: - # Release the threadpools - await ev.set() - - raise AssertionError(result) - - async def _place_dependencies(dependencies, placement, c, s, workers): dependencies_to_workers = defaultdict(set) for worker_idx, placed in enumerate(placement): From 5597c6e26074156d88a3d045782454f310c358ee Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 21:46:24 +0200 Subject: [PATCH 16/28] Permutations --- distributed/tests/test_steal.py | 237 ++++++++++++-------------------- 1 file changed, 87 insertions(+), 150 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index be234303ec..e7cefab18f 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1445,23 +1445,13 @@ def _correct_placement(actual): 1, ] - async def _run_test(*args, **kwargs): - await _run_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - *args, - **kwargs, - ) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) def test_balance_to_replica(recompute_saturation): @@ -1477,23 +1467,13 @@ def _correct_placement(actual): 0, ] # Note: The success of this test currently depends on worker ordering - async def _run_test(*args, **kwargs): - await _run_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - *args, - **kwargs, - ) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) def test_balance_to_larger_dependency(recompute_saturation): @@ -1509,7 +1489,7 @@ def _correct_placement(actual): 0, ] # Note: The success of this test currently depends on worker ordering - _run_balance_test( + _run_dependency_balance_test( dependencies, dependency_placement, task_placement, @@ -1518,7 +1498,7 @@ def _correct_placement(actual): ) -def _run_balance_test( +def _run_dependency_balance_test( dependencies, dependency_placement, task_placement, @@ -1526,96 +1506,73 @@ def _run_balance_test( recompute_saturation, ): nworkers = len(task_placement) - - async def _test( - dependencies, - dependency_placement, - task_placement, - correct_placement_fn, - recompute_saturation, - permutation, - c, - s, - *workers, - ): - steal = s.extensions["stealing"] - await steal.stop() - ev = Event() - - dependency_placement = [dependency_placement[i] for i in permutation] - - dependency_futures = await _place_dependencies( - dependencies, dependency_placement, c, s, workers - ) - - task_placement = [task_placement[i] for i in permutation] - - futures = await _place_tasks( - ev, task_placement, dependency_futures, c, s, workers - ) - if recompute_saturation: - for ws in s.workers.values(): - s._reevaluate_occupancy_worker(ws) - try: - for _ in range(20): - steal.balance() - await steal.stop() - - result = _get_task_placement(s, workers) - result = [result[i] for i in permutation] - if correct_placement_fn(result): - return - finally: - # Release the threadpools - await ev.set() - - raise AssertionError(result, permutation) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - for permutation in itertools.permutations(range(nworkers)): - def _permutation_run( - dependencies, - dependency_placement, - task_placement, - correct_placement_fn, - recompute_saturation, - permutation, + async def _run_permutation( + *args, + permutation=permutation, + **kwargs, ): - _permutation = permutation - - async def _( - *args, - **kwargs, - ): - await _test( - dependencies, - dependency_placement, - task_placement, - correct_placement_fn, - recompute_saturation, - _permutation, - *args, - **kwargs, - ) - - return _ - - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )( - _permutation_run( + await _dependency_balance_test( dependencies, dependency_placement, task_placement, correct_placement_fn, recompute_saturation, permutation, + *args, + **kwargs, ) - )() + + gen_cluster( + client=True, + nthreads=[("", 1)] * len(task_placement), + config={"distributed.scheduler.unknown-task-duration": "1s"}, + )(_run_permutation)() + + +async def _dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + correct_placement_fn, + recompute_saturation, + permutation, + c, + s, + *workers, +): + steal = s.extensions["stealing"] + await steal.stop() + ev = Event() + + inverse = [permutation.index(i) for i in range(len(permutation))] + dependency_placement = [dependency_placement[i] for i in permutation] + task_placement = [task_placement[i] for i in permutation] + + dependency_futures = await _place_dependencies( + dependencies, dependency_placement, c, s, workers + ) + + futures = await _place_tasks(ev, task_placement, dependency_futures, c, s, workers) + if recompute_saturation: + for ws in s.workers.values(): + s._reevaluate_occupancy_worker(ws) + try: + for _ in range(20): + steal.balance() + await steal.stop() + + result = _get_task_placement(s, workers) + result = [result[i] for i in inverse] + + if correct_placement_fn(result): + return + finally: + # Release the threadpools + await ev.set() + + raise AssertionError(result, permutation) def test_balance_prefers_busier_with_dependency(recompute_saturation): @@ -1631,23 +1588,13 @@ def _correct_placement(actual): 1, ] # Note: The success of this test currently depends on worker ordering - async def _run_test(*args, **kwargs): - await _run_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - *args, - **kwargs, - ) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) def test_balance_after_acquiring_dependency(recompute_saturation): @@ -1662,23 +1609,13 @@ def _correct_placement(actual): 2, ] # Note: The success of this test currently depends on worker ordering - async def _run_test(*args, **kwargs): - await _run_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - *args, - **kwargs, - ) - - config = {"distributed.scheduler.unknown-task-duration": "1s"} - gen_cluster( - client=True, - nthreads=[("", 1)] * len(task_placement), - config=config, - )(_run_test)() + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) async def _place_dependencies(dependencies, placement, c, s, workers): From 946fdb9d1281126f3942fff3c1a617fe8380bbe5 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 21:47:41 +0200 Subject: [PATCH 17/28] Reorder --- distributed/tests/test_steal.py | 86 ++++++++++++++++----------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index e7cefab18f..0636704e45 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1498,6 +1498,49 @@ def _correct_placement(actual): ) +def test_balance_prefers_busier_with_dependency(recompute_saturation): + dependencies = {"a": 2, "b": 1} + dependency_placement = [["a"], ["a", "b"], []] + task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], [["b"]], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 4, + 2, + 1, + ] # Note: The success of this test currently depends on worker ordering + + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) + + +def test_balance_after_acquiring_dependency(recompute_saturation): + dependencies = {"a": 1} + dependency_placement = [["a"], []] + task_placement = [[["a"]] * 8, []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + return actual_task_counts == [ + 6, + 2, + ] # Note: The success of this test currently depends on worker ordering + + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) + + def _run_dependency_balance_test( dependencies, dependency_placement, @@ -1575,49 +1618,6 @@ async def _dependency_balance_test( raise AssertionError(result, permutation) -def test_balance_prefers_busier_with_dependency(recompute_saturation): - dependencies = {"a": 2, "b": 1} - dependency_placement = [["a"], ["a", "b"], []] - task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], [["b"]], []] - - def _correct_placement(actual): - actual_task_counts = [len(placed) for placed in actual] - return actual_task_counts == [ - 4, - 2, - 1, - ] # Note: The success of this test currently depends on worker ordering - - _run_dependency_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - ) - - -def test_balance_after_acquiring_dependency(recompute_saturation): - dependencies = {"a": 1} - dependency_placement = [["a"], []] - task_placement = [[["a"]] * 8, []] - - def _correct_placement(actual): - actual_task_counts = [len(placed) for placed in actual] - return actual_task_counts == [ - 6, - 2, - ] # Note: The success of this test currently depends on worker ordering - - _run_dependency_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - ) - - async def _place_dependencies(dependencies, placement, c, s, workers): dependencies_to_workers = defaultdict(set) for worker_idx, placed in enumerate(placement): From 708b14233f9424f258ca76ac6427454e81838003 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 15 Sep 2022 21:49:26 +0200 Subject: [PATCH 18/28] Remove comment --- distributed/tests/test_steal.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 0636704e45..0bb7bbada6 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1465,7 +1465,7 @@ def _correct_placement(actual): 2, 1, 0, - ] # Note: The success of this test currently depends on worker ordering + ] _run_dependency_balance_test( dependencies, @@ -1487,7 +1487,7 @@ def _correct_placement(actual): 2, 1, 0, - ] # Note: The success of this test currently depends on worker ordering + ] _run_dependency_balance_test( dependencies, @@ -1509,7 +1509,7 @@ def _correct_placement(actual): 4, 2, 1, - ] # Note: The success of this test currently depends on worker ordering + ] _run_dependency_balance_test( dependencies, @@ -1530,7 +1530,7 @@ def _correct_placement(actual): return actual_task_counts == [ 6, 2, - ] # Note: The success of this test currently depends on worker ordering + ] _run_dependency_balance_test( dependencies, From f8a27ce5965e5645c47a446628d5ec52fa647585 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 16 Sep 2022 09:46:15 +0200 Subject: [PATCH 19/28] Improve test --- distributed/tests/test_steal.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 0bb7bbada6..e4ca94832e 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1498,17 +1498,22 @@ def _correct_placement(actual): ) -def test_balance_prefers_busier_with_dependency(recompute_saturation): - dependencies = {"a": 2, "b": 1} +def test_balance_prefers_busier_with_dependency(): + recompute_saturation = True + dependencies = {"a": 5, "b": 1} dependency_placement = [["a"], ["a", "b"], []] - task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], [["b"]], []] + task_placement = [ + [["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], + [["b"]], + [], + ] def _correct_placement(actual): - actual_task_counts = [len(placed) for placed in actual] - return actual_task_counts == [ - 4, - 2, - 1, + actual_task_placements = [sorted(placed) for placed in actual] + return actual_task_placements == [ + [["a"], ["a"], ["a"], ["a"], ["a"]], + [["a"], ["b"]], + [], ] _run_dependency_balance_test( From eacbbae41b91347b44fed6f3a4f3c986f3299e48 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Fri, 16 Sep 2022 09:58:24 +0200 Subject: [PATCH 20/28] Remove failing test --- distributed/tests/test_steal.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index e4ca94832e..5e133c0f91 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1525,27 +1525,6 @@ def _correct_placement(actual): ) -def test_balance_after_acquiring_dependency(recompute_saturation): - dependencies = {"a": 1} - dependency_placement = [["a"], []] - task_placement = [[["a"]] * 8, []] - - def _correct_placement(actual): - actual_task_counts = [len(placed) for placed in actual] - return actual_task_counts == [ - 6, - 2, - ] - - _run_dependency_balance_test( - dependencies, - dependency_placement, - task_placement, - _correct_placement, - recompute_saturation, - ) - - def _run_dependency_balance_test( dependencies, dependency_placement, From ffda2cbb306b97d48db13b94305061ab4ebbb0fe Mon Sep 17 00:00:00 2001 From: fjetter Date: Fri, 16 Sep 2022 20:06:00 +0200 Subject: [PATCH 21/28] Review comments --- distributed/stealing.py | 3 --- distributed/tests/test_steal.py | 7 +++++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index 82bb47df69..2d0917710a 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -521,9 +521,6 @@ def story(self, *keys_or_ts: str | TaskState) -> list: def _get_thief( scheduler: SchedulerState, ts: TaskState, potential_thieves: set[WorkerState] ) -> WorkerState | None: - if not potential_thieves: - return None - valid_workers = scheduler.valid_workers(ts) if valid_workers is not None: valid_thieves = potential_thieves & valid_workers diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 5e133c0f91..250f1e0bc3 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1510,6 +1510,13 @@ def test_balance_prefers_busier_with_dependency(): def _correct_placement(actual): actual_task_placements = [sorted(placed) for placed in actual] + # FIXME: A better task placement would even be but the current balancing + # logic aborts as soon as a worker is no longer classified as idle + # return actual_task_placements == [ + # [["a"], ["a"], ["a"], ["a"]], + # [["a"], ["a"], ["b"]], + # [], + # ] return actual_task_placements == [ [["a"], ["a"], ["a"], ["a"], ["a"]], [["a"], ["b"]], From 1f2f55b8e779a6ef931a7d2db0079b703b080a6f Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 26 Sep 2022 14:37:13 +0200 Subject: [PATCH 22/28] Add docstring for _run_dependency_balance_test --- distributed/tests/test_steal.py | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 250f1e0bc3..a26b16d9db 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -10,6 +10,7 @@ from collections import defaultdict from operator import mul from time import sleep +from typing import Callable, Mapping import numpy as np import pytest @@ -1510,7 +1511,7 @@ def test_balance_prefers_busier_with_dependency(): def _correct_placement(actual): actual_task_placements = [sorted(placed) for placed in actual] - # FIXME: A better task placement would even be but the current balancing + # FIXME: A better task placement would be even but the current balancing # logic aborts as soon as a worker is no longer classified as idle # return actual_task_placements == [ # [["a"], ["a"], ["a"], ["a"]], @@ -1533,12 +1534,29 @@ def _correct_placement(actual): def _run_dependency_balance_test( - dependencies, - dependency_placement, - task_placement, - correct_placement_fn, - recompute_saturation, -): + dependencies: Mapping[str, int], + dependency_placement: list[list[str]], + task_placement: list[list[list[str]]], + correct_placement_fn: Callable[[list[list[list[str]]]], bool], + recompute_saturation: bool, +) -> None: + """Run a test for balancing with task dependencies according to the provided specifications. + + Parameters + ---------- + dependencies + Mapping of task dependencies to their weight. + dependency_placement + List of list of dependencies to be placed on the worker corresponding + to the index of the outer list. + task_placement + List of list of tasks to be placed on the worker corresponding to the + index of the outer list. Each task is a list of names of dependencies. + correct_placement_fn + Callable used to determine if stealing placed the tasks as expected. + recompute_saturation + Whether to recompute worker saturation before stealing. + """ nworkers = len(task_placement) for permutation in itertools.permutations(range(nworkers)): From 6e0b4689724efc751a405e97ab4a88c4beeff825 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 26 Sep 2022 15:31:02 +0200 Subject: [PATCH 23/28] Even more docstrings --- distributed/tests/test_steal.py | 158 ++++++++++++++++++++++++++------ 1 file changed, 129 insertions(+), 29 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index a26b16d9db..4c57a2a832 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -10,7 +10,7 @@ from collections import defaultdict from operator import mul from time import sleep -from typing import Callable, Mapping +from typing import Callable, Iterable, Mapping, Sequence import numpy as np import pytest @@ -19,7 +19,18 @@ import dask from dask.utils import key_split -from distributed import Event, Lock, Nanny, Worker, profile, wait, worker_client +from distributed import ( + Client, + Event, + Lock, + Nanny, + Scheduler, + Worker, + profile, + wait, + worker_client, +) +from distributed.client import Future from distributed.compatibility import LINUX from distributed.core import Status from distributed.metrics import time @@ -1540,7 +1551,11 @@ def _run_dependency_balance_test( correct_placement_fn: Callable[[list[list[list[str]]]], bool], recompute_saturation: bool, ) -> None: - """Run a test for balancing with task dependencies according to the provided specifications. + """Run a test for balancing with task dependencies according to the provided + specifications. + + This method executes the test logic for all permutations of worker placements + and generates a new cluster for each one. Parameters ---------- @@ -1556,16 +1571,20 @@ def _run_dependency_balance_test( Callable used to determine if stealing placed the tasks as expected. recompute_saturation Whether to recompute worker saturation before stealing. + + See Also + -------- + _dependency_balance_test_permutation """ nworkers = len(task_placement) for permutation in itertools.permutations(range(nworkers)): - async def _run_permutation( + async def _run( *args, permutation=permutation, **kwargs, ): - await _dependency_balance_test( + await _dependency_balance_test_permutation( dependencies, dependency_placement, task_placement, @@ -1580,23 +1599,46 @@ async def _run_permutation( client=True, nthreads=[("", 1)] * len(task_placement), config={"distributed.scheduler.unknown-task-duration": "1s"}, - )(_run_permutation)() - - -async def _dependency_balance_test( - dependencies, - dependency_placement, - task_placement, - correct_placement_fn, - recompute_saturation, - permutation, - c, - s, - *workers, -): + )(_run)() + + +async def _dependency_balance_test_permutation( + dependencies: Mapping[str, int], + dependency_placement: list[list[str]], + task_placement: list[list[list[str]]], + correct_placement_fn: Callable[[list[list[list[str]]]], bool], + recompute_saturation: bool, + permutation: list[int], + c: Client, + s: Scheduler, + *workers: Worker, +) -> None: + """Run a test for balancing with task dependencies according to the provided + specifications and worker permutations. + + Parameters + ---------- + dependencies + Mapping of task dependencies to their weight. + dependency_placement + List of list of dependencies to be placed on the worker corresponding + to the index of the outer list. + task_placement + List of list of tasks to be placed on the worker corresponding to the + index of the outer list. Each task is a list of names of dependencies. + correct_placement_fn + Callable used to determine if stealing placed the tasks as expected. + recompute_saturation + Whether to recompute worker saturation before stealing. + permutation + Permutation of workers to use for this run. + + See Also + -------- + _run_dependency_balance_test + """ steal = s.extensions["stealing"] await steal.stop() - ev = Event() inverse = [permutation.index(i) for i in range(len(permutation))] dependency_placement = [dependency_placement[i] for i in permutation] @@ -1606,7 +1648,7 @@ async def _dependency_balance_test( dependencies, dependency_placement, c, s, workers ) - futures = await _place_tasks(ev, task_placement, dependency_futures, c, s, workers) + ev, futures = await _place_tasks(task_placement, dependency_futures, c, s, workers) if recompute_saturation: for ws in s.workers.values(): s._reevaluate_occupancy_worker(ws) @@ -1627,7 +1669,31 @@ async def _dependency_balance_test( raise AssertionError(result, permutation) -async def _place_dependencies(dependencies, placement, c, s, workers): +async def _place_dependencies( + dependencies: Mapping[str, int], + placement: list[list[str]], + c: Client, + s: Scheduler, + workers: Sequence[Worker], +) -> dict[str, Future]: + """Places the dependencies on the workers as specified. + + Parameters + ---------- + dependencies + Mapping of task dependencies to their weight. + placement + List of list of dependencies to be placed on the worker corresponding to the + index of the outer list. + + Returns + ------- + Dictionary of futures matching the input dependencies. + + See Also + -------- + _run_dependency_balance_test + """ dependencies_to_workers = defaultdict(set) for worker_idx, placed in enumerate(placement): for dependency in placed: @@ -1651,6 +1717,7 @@ async def _place_dependencies(dependencies, placement, c, s, workers): def _assert_dependency_placement(expected, workers): + """Assert that dependencies are placed on the workers as expected.""" actual = [] for worker in workers: actual.append(list(worker.state.tasks.keys())) @@ -1658,7 +1725,34 @@ def _assert_dependency_placement(expected, workers): assert actual == expected -async def _place_tasks(ev, placement, dependency_futures, c, s, workers): +async def _place_tasks( + placement: list[list[list[str]]], + dependency_futures: Mapping[str, Future], + c: Client, + s: Scheduler, + workers: Sequence[Worker], +) -> tuple[Event, list[Future]]: + """Places the tasks on the workers as specified. + + Parameters + ---------- + placement + List of list of tasks to be placed on the worker corresponding to the + index of the outer list. Each task is a list of names of dependencies. + dependency_futures + Mapping of dependency names to their corresponding futures. + + Returns + ------- + Tuple of the event blocking the placed tasks and list of futures matching + the input task placement. + + See Also + -------- + _run_dependency_balance_test + """ + ev = Event() + def block(*args, event, **kwargs): event.wait() @@ -1686,26 +1780,32 @@ def block(*args, event, **kwargs): assert_task_placement(placement, s, workers) - return futures + return ev, futures -def _get_task_placement(s, workers): +def _get_task_placement( + s: Scheduler, workers: Iterable[Worker] +) -> list[list[list[str]]]: + """Return the placement of tasks on this worker""" actual = [] for w in workers: actual.append( [list(key_split(ts.key)) for ts in s.workers[w.address].processing] ) - return actual + return _deterministic_placement(actual) -def _equal_placement(actual, expected): - return _comparable_placement(actual) == _comparable_placement(expected) +def _equal_placement(left, right): + """Return True IFF the two input placements are equal.""" + return _deterministic_placement(left) == _deterministic_placement(right) -def _comparable_placement(placement): +def _deterministic_placement(placement): + """Return a deterministic ordering of the tasks or dependencies on each worker.""" return [sorted(placed) for placed in placement] def assert_task_placement(expected, s, workers): + """Assert that tasks are placed on the workers as expected.""" actual = _get_task_placement(s, workers) assert _equal_placement(actual, expected) From d0c3ff78b20cab71bd628d251b20b531111afd70 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 26 Sep 2022 15:35:57 +0200 Subject: [PATCH 24/28] Rename variables --- distributed/tests/test_steal.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 4c57a2a832..e740511fdf 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1641,14 +1641,16 @@ async def _dependency_balance_test_permutation( await steal.stop() inverse = [permutation.index(i) for i in range(len(permutation))] - dependency_placement = [dependency_placement[i] for i in permutation] - task_placement = [task_placement[i] for i in permutation] + permutated_dependency_placement = [dependency_placement[i] for i in permutation] + permutated_task_placement = [task_placement[i] for i in permutation] dependency_futures = await _place_dependencies( - dependencies, dependency_placement, c, s, workers + dependencies, permutated_dependency_placement, c, s, workers ) - ev, futures = await _place_tasks(task_placement, dependency_futures, c, s, workers) + ev, futures = await _place_tasks( + permutated_task_placement, dependency_futures, c, s, workers + ) if recompute_saturation: for ws in s.workers.values(): s._reevaluate_occupancy_worker(ws) @@ -1657,16 +1659,16 @@ async def _dependency_balance_test_permutation( steal.balance() await steal.stop() - result = _get_task_placement(s, workers) - result = [result[i] for i in inverse] + permutated_actual_placement = _get_task_placement(s, workers) + actual_placement = [permutated_actual_placement[i] for i in inverse] - if correct_placement_fn(result): + if correct_placement_fn(actual_placement): return finally: # Release the threadpools await ev.set() - raise AssertionError(result, permutation) + raise AssertionError(actual_placement, permutation) async def _place_dependencies( From 89ecd8214b62392981931ea7829b11ae928bdfc7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 26 Sep 2022 15:38:00 +0200 Subject: [PATCH 25/28] Cleanup --- distributed/tests/test_steal.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index e740511fdf..a67eab01aa 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1667,6 +1667,7 @@ async def _dependency_balance_test_permutation( finally: # Release the threadpools await ev.set() + await c.gather(futures) raise AssertionError(actual_placement, permutation) From 288305ff7017c6a728f6745ec36f7eb47d0fa2a7 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Mon, 26 Sep 2022 15:47:54 +0200 Subject: [PATCH 26/28] Additional test --- distributed/tests/test_steal.py | 35 ++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index a67eab01aa..64f0523907 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1467,15 +1467,44 @@ def _correct_placement(actual): def test_balance_to_replica(recompute_saturation): - dependencies = {"a": 1} + dependencies = {"a": 2} dependency_placement = [["a"], ["a"], []] - task_placement = [[["a"], ["a"], ["a"]], [], []] + task_placement = [[["a"], ["a"]], [], []] def _correct_placement(actual): actual_task_counts = [len(placed) for placed in actual] return actual_task_counts == [ - 2, 1, + 1, + 0, + ] + + _run_dependency_balance_test( + dependencies, + dependency_placement, + task_placement, + _correct_placement, + recompute_saturation, + ) + + +def test_balance_multiple_to_replica(recompute_saturation): + dependencies = {"a": 6} + dependency_placement = [["a"], ["a"], []] + task_placement = [[["a"], ["a"], ["a"], ["a"], ["a"], ["a"], ["a"], ["a"]], [], []] + + def _correct_placement(actual): + actual_task_counts = [len(placed) for placed in actual] + # FIXME: A better task placement would be even but the current balancing + # logic aborts as soon as a worker is no longer classified as idle + # return actual_task_counts == [ + # 4, + # 4, + # 0, + # ] + return actual_task_counts == [ + 6, + 2, 0, ] From 6b08c7fd05acc2234cec316a0e3b5c06f79b271a Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Wed, 28 Sep 2022 18:27:56 +0200 Subject: [PATCH 27/28] Wait until tasks arrive on worker --- distributed/tests/test_steal.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 64f0523907..c873fa5f46 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1678,8 +1678,14 @@ async def _dependency_balance_test_permutation( ) ev, futures = await _place_tasks( - permutated_task_placement, dependency_futures, c, s, workers + permutated_task_placement, + permutated_dependency_placement, + dependency_futures, + c, + s, + workers, ) + if recompute_saturation: for ws in s.workers.values(): s._reevaluate_occupancy_worker(ws) @@ -1759,6 +1765,7 @@ def _assert_dependency_placement(expected, workers): async def _place_tasks( placement: list[list[list[str]]], + dependency_placement: list[list[str]], dependency_futures: Mapping[str, Future], c: Client, s: Scheduler, @@ -1771,6 +1778,9 @@ async def _place_tasks( placement List of list of tasks to be placed on the worker corresponding to the index of the outer list. Each task is a list of names of dependencies. + dependency_placement + List of list of dependencies to be placed on the worker corresponding to the + index of the outer list. dependency_futures Mapping of dependency names to their corresponding futures. @@ -1810,6 +1820,12 @@ def block(*args, event, **kwargs): while len([ts for ts in s.tasks.values() if ts.processing_on]) < len(futures): await asyncio.sleep(0.001) + while any( + len(w.state.tasks) < (len(tasks) + len(dependencies)) + for w, dependencies, tasks in zip(workers, dependency_placement, placement) + ): + await asyncio.sleep(0.001) + assert_task_placement(placement, s, workers) return ev, futures From b5afd77e3e3d67107c48c8610f2f67cba79fa6c2 Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 29 Sep 2022 13:41:20 +0200 Subject: [PATCH 28/28] Disable queueing on test_balance_prefers_busier_with_dependency --- distributed/tests/test_steal.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index c873fa5f46..c323c3746d 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -14,7 +14,7 @@ import numpy as np import pytest -from tlz import sliding_window +from tlz import merge, sliding_window import dask from dask.utils import key_split @@ -1570,6 +1570,10 @@ def _correct_placement(actual): task_placement, _correct_placement, recompute_saturation, + # This test relies on disabling queueing to flag workers as idle + config={ + "distributed.scheduler.worker-saturation": float("inf"), + }, ) @@ -1579,6 +1583,7 @@ def _run_dependency_balance_test( task_placement: list[list[list[str]]], correct_placement_fn: Callable[[list[list[list[str]]]], bool], recompute_saturation: bool, + config: dict | None = None, ) -> None: """Run a test for balancing with task dependencies according to the provided specifications. @@ -1600,7 +1605,8 @@ def _run_dependency_balance_test( Callable used to determine if stealing placed the tasks as expected. recompute_saturation Whether to recompute worker saturation before stealing. - + config + Optional configuration to apply to the test. See Also -------- _dependency_balance_test_permutation @@ -1627,7 +1633,12 @@ async def _run( gen_cluster( client=True, nthreads=[("", 1)] * len(task_placement), - config={"distributed.scheduler.unknown-task-duration": "1s"}, + config=merge( + config or {}, + { + "distributed.scheduler.unknown-task-duration": "1s", + }, + ), )(_run)()