Worker State Machine refactor: redesign TaskState and scheduler messages (#5922)
crusaderky authored Mar 14, 2022
1 parent 85bf1be commit 2fffe74
Showing 9 changed files with 636 additions and 486 deletions.
2 changes: 1 addition & 1 deletion distributed/active_memory_manager.py
@@ -14,7 +14,7 @@
 from distributed.metrics import time
 from distributed.utils import import_term, log_errors
 
-if TYPE_CHECKING:  # pragma: nocover
+if TYPE_CHECKING:
     from distributed.client import Client
     from distributed.scheduler import Scheduler, TaskState, WorkerState
2 changes: 1 addition & 1 deletion distributed/batched.py
@@ -128,7 +128,7 @@ def _background_send(self):
         self.stopped.set()
         self.abort()
 
-    def send(self, *msgs):
+    def send(self, *msgs: dict) -> None:
         """Schedule a message for sending to the other side
 
         This completes quickly and synchronously
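
For context on the new annotation: BatchedSend messages are plain dicts, and send() only appends to an internal buffer while a background coroutine flushes accumulated batches over the comm. A minimal usage sketch, assuming an already-established comm object (not shown in this diff):

    from distributed.batched import BatchedSend

    bs = BatchedSend(interval="10ms")  # flush buffered messages every 10 ms
    bs.start(comm)  # `comm` is an established distributed comm object (assumed)
    bs.send({"op": "ping"})  # synchronous: buffers the dict and returns immediately
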
2 changes: 2 additions & 0 deletions distributed/scheduler.py
@@ -1212,6 +1212,8 @@ def _to_dict_no_nest(self, *, exclude: "Container[str]" = ()) -> dict:
 class TaskState:
     """
     A simple object holding information about a task.
 
+    Not to be confused with :class:`distributed.worker_state_machine.TaskState`, which
+    holds similar information on the Worker side.
 
     .. attribute:: key: str
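
Because the scheduler and the worker state machine now each define a class named TaskState, code touching both sides can disambiguate with import aliases; a small illustration (not part of the commit):

    from distributed.scheduler import TaskState as SchedulerTaskState
    from distributed.worker_state_machine import TaskState as WorkerTaskState

    # Same name, different bookkeeping: the scheduler-side class tracks
    # cluster-wide task metadata, while the worker-side class tracks the
    # state of a task on one worker.
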
17 changes: 6 additions & 11 deletions distributed/tests/test_failed_workers.py
@@ -25,6 +25,7 @@
     slowadd,
     slowinc,
 )
+from distributed.worker_state_machine import TaskState
 
 pytestmark = pytest.mark.ci1

@@ -494,19 +495,13 @@ async def test_worker_time_to_live(c, s, a, b):
 
 @gen_cluster()
 async def test_forget_data_not_supposed_to_have(s, a, b):
-    """
-    If a depednecy fetch finishes on a worker after the scheduler already
-    released everything, the worker might be stuck with a redundant replica
-    which is never cleaned up.
+    """If a dependency fetch finishes on a worker after the scheduler already released
+    everything, the worker might be stuck with a redundant replica which is never
+    cleaned up.
     """
     # FIXME: Replace with "blackbox test" which shows an actual example where
-    # this situation is provoked if this is even possible.
-    # If this cannot be constructed, the entire superfuous_data handler and its
-    # corresponding pieces on the scheduler side may be removed
-    from distributed.worker import TaskState
-
-    ts = TaskState("key")
-    ts.state = "flight"
+    # this situation is provoked if this is even possible.
+    ts = TaskState("key", state="flight")
     a.tasks["key"] = ts
     recommendations = {ts: ("memory", 123)}
     a.transitions(recommendations, stimulus_id="test")
70 changes: 1 addition & 69 deletions distributed/tests/test_worker.py
@@ -59,14 +59,7 @@
     slowinc,
     slowsum,
 )
-from distributed.worker import (
-    TaskState,
-    UniqueTaskHeap,
-    Worker,
-    error_message,
-    logger,
-    parse_memory_limit,
-)
+from distributed.worker import Worker, error_message, logger, parse_memory_limit
 
 pytestmark = pytest.mark.ci1

@@ -3747,67 +3740,6 @@ async def test_Worker__to_dict(c, s, a):
     assert d["tasks"]["x"]["key"] == "x"
 
 
-@gen_cluster(client=True, nthreads=[("", 1)])
-async def test_TaskState__to_dict(c, s, a):
-    """tasks that are listed as dependencies of other tasks are dumped as a short repr
-    and always appear in full under Worker.tasks
-    """
-    x = c.submit(inc, 1, key="x")
-    y = c.submit(inc, x, key="y")
-    z = c.submit(inc, 2, key="z")
-    await wait([x, y, z])
-
-    tasks = a._to_dict()["tasks"]
-
-    assert isinstance(tasks["x"], dict)
-    assert isinstance(tasks["y"], dict)
-    assert isinstance(tasks["z"], dict)
-    assert tasks["x"]["dependents"] == ["<TaskState 'y' memory>"]
-    assert tasks["y"]["dependencies"] == ["<TaskState 'x' memory>"]
-
-
-def test_unique_task_heap():
-    heap = UniqueTaskHeap()
-
-    for x in range(10):
-        ts = TaskState(f"f{x}")
-        ts.priority = (0, 0, 1, x % 3)
-        heap.push(ts)
-
-    heap_list = list(heap)
-    # iteration does not empty heap
-    assert len(heap) == 10
-    assert heap_list == sorted(heap_list, key=lambda ts: ts.priority)
-
-    seen = set()
-    last_prio = (0, 0, 0, 0)
-    while heap:
-        peeked = heap.peek()
-        ts = heap.pop()
-        assert peeked == ts
-        seen.add(ts.key)
-        assert ts.priority
-        assert last_prio <= ts.priority
-        last_prio = last_prio
-
-    ts = TaskState("foo")
-    heap.push(ts)
-    heap.push(ts)
-    assert len(heap) == 1
-
-    assert repr(heap) == "<UniqueTaskHeap: 1 items>"
-
-    assert heap.pop() == ts
-    assert not heap
-
-    # Test that we're cleaning the seen set on pop
-    heap.push(ts)
-    assert len(heap) == 1
-    assert heap.pop() == ts
-
-    assert repr(heap) == "<UniqueTaskHeap: 0 items>"
-
-
 @gen_cluster(nthreads=[])
 async def test_do_not_block_event_loop_during_shutdown(s):
     loop = asyncio.get_running_loop()
94 changes: 94 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -0,0 +1,94 @@
+import pytest
+
+from distributed.utils import recursive_to_dict
+from distributed.worker_state_machine import (
+    ReleaseWorkerDataMsg,
+    SendMessageToScheduler,
+    TaskState,
+    UniqueTaskHeap,
+)
+
+
+def test_TaskState_get_nbytes():
+    assert TaskState("x", nbytes=123).get_nbytes() == 123
+    # Default to distributed.scheduler.default-data-size
+    assert TaskState("y").get_nbytes() == 1024
+
+
+def test_TaskState__to_dict():
+    """Tasks that are listed as dependencies or dependents of other tasks are dumped as
+    a short repr and always appear in full directly under Worker.tasks. Uninteresting
+    fields are omitted.
+    """
+    x = TaskState("x", state="memory", done=True)
+    y = TaskState("y", priority=(0,), dependencies={x})
+    x.dependents.add(y)
+    actual = recursive_to_dict([x, y])
+    assert actual == [
+        {
+            "key": "x",
+            "state": "memory",
+            "done": True,
+            "dependents": ["<TaskState 'y' released>"],
+        },
+        {
+            "key": "y",
+            "state": "released",
+            "dependencies": ["<TaskState 'x' memory>"],
+            "priority": [0],
+        },
+    ]
+
+
+def test_unique_task_heap():
+    heap = UniqueTaskHeap()
+
+    for x in range(10):
+        ts = TaskState(f"f{x}", priority=(0,))
+        ts.priority = (0, 0, 1, x % 3)
+        heap.push(ts)
+
+    heap_list = list(heap)
+    # iteration does not empty heap
+    assert len(heap) == 10
+    assert heap_list == sorted(heap_list, key=lambda ts: ts.priority)
+
+    seen = set()
+    last_prio = (0, 0, 0, 0)
+    while heap:
+        peeked = heap.peek()
+        ts = heap.pop()
+        assert peeked == ts
+        seen.add(ts.key)
+        assert ts.priority
+        assert last_prio <= ts.priority
+        last_prio = ts.priority
+
+    ts = TaskState("foo", priority=(0,))
+    heap.push(ts)
+    heap.push(ts)
+    assert len(heap) == 1
+
+    assert repr(heap) == "<UniqueTaskHeap: 1 items>"
+
+    assert heap.pop() == ts
+    assert not heap
+
+    # Test that we're cleaning the seen set on pop
+    heap.push(ts)
+    assert len(heap) == 1
+    assert heap.pop() == ts
+
+    assert repr(heap) == "<UniqueTaskHeap: 0 items>"
+
+
+@pytest.mark.parametrize("cls", SendMessageToScheduler.__subclasses__())
+def test_sendmsg_slots(cls):
+    smsg = cls(**dict.fromkeys(cls.__annotations__))
+    assert not hasattr(smsg, "__dict__")
+
+
+def test_sendmsg_to_dict():
+    # Arbitrary sample class
+    smsg = ReleaseWorkerDataMsg(key="x")
+    assert smsg.to_dict() == {"op": "release-worker-data", "key": "x"}
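
The heap test above pins down the UniqueTaskHeap contract: pushes are deduplicated by task key, peek/pop return tasks in priority order, iteration is non-destructive, and popping a key frees it for re-pushing. A minimal sketch of a structure satisfying those assertions, assuming a heapq-backed list plus a key set (the committed implementation in distributed.worker_state_machine may differ in detail):

    import heapq

    class UniqueTaskHeap:
        """Priority heap of TaskState objects, deduplicated by key."""

        def __init__(self):
            self._heap = []  # (priority, key, ts) triples
            self._known = set()  # keys currently in the heap

        def push(self, ts) -> None:
            # Re-pushing a key that is already in the heap is a no-op
            if ts.key not in self._known:
                heapq.heappush(self._heap, (ts.priority, ts.key, ts))
                self._known.add(ts.key)

        def peek(self):
            return self._heap[0][2]

        def pop(self):
            _, key, ts = heapq.heappop(self._heap)
            self._known.discard(key)  # popped keys may be pushed again
            return ts

        def __len__(self) -> int:
            return len(self._heap)

        def __iter__(self):
            # Non-destructive iteration in priority order
            return (ts for _, _, ts in sorted(self._heap))

        def __repr__(self) -> str:
            return f"<UniqueTaskHeap: {len(self)} items>"
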
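Likewise, the two sendmsg tests encode the design of the new scheduler-bound messages: each SendMessageToScheduler subclass is a slotted dataclass (no instance __dict__), and to_dict() serializes the annotated fields plus a class-level op code. A sketch of that pattern consistent with the assertions above (the field layout is illustrative, not the committed source):

    from dataclasses import dataclass
    from typing import ClassVar

    class SendMessageToScheduler:
        op: ClassVar[str]  # op code, set by each subclass
        __slots__ = ()

        def to_dict(self) -> dict:
            # Serialize the subclass's annotated fields, then add the op code
            d = {k: getattr(self, k) for k in self.__annotations__}
            d["op"] = self.op
            return d

    @dataclass
    class ReleaseWorkerDataMsg(SendMessageToScheduler):
        op = "release-worker-data"
        __slots__ = ("key",)
        key: str

    # Matches the roundtrip asserted in test_sendmsg_to_dict
    assert ReleaseWorkerDataMsg(key="x").to_dict() == {
        "op": "release-worker-data",
        "key": "x",
    }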
