Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker State Machine refactor: redesign TaskState and scheduler messages #5922

Merged
merged 14 commits into from
Mar 14, 2022

Conversation

crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented Mar 10, 2022

In scope

  • Moved state-related classes out of worker.py to a separate module
  • Replaced Smsgs dicts with a rigorous data model
  • TaskState is now a lot more compact
  • TaskState._to_dict produces a lot terser output
  • TaskState now uses __slots__ in Python >=3.10. This should lead to substantial savings in unmanaged memory.
  • TaskState.state is now a Literal. I would much rather have Enum for worker TaskState names #5444 but this was very cheap and non-intrusive.

Out of scope

@github-actions
Copy link
Contributor

github-actions bot commented Mar 10, 2022

Unit Test Results

       12 files  ±  0         12 suites  ±0   5h 38m 49s ⏱️ + 3m 56s
  2 647 tests +  8    2 564 ✔️ +10    80 💤  - 1  2  - 2  1 🔥 +1 
13 005 runs  +28  12 364 ✔️ +31  636 💤  - 3  4  - 1  1 🔥 +1 

For more details on these failures and errors, see this check.

Results for commit 297bed6. ± Comparison against base commit 85bf1be.

♻️ This comment has been updated with latest results.

@fjetter
Copy link
Member

fjetter commented Mar 10, 2022

CI still seems to be pretty upset but otherwise the changes look good, so far

@crusaderky
Copy link
Collaborator Author

All test failures are unrelated. This is ready for review and merge.

@crusaderky crusaderky marked this pull request as ready for review March 10, 2022 14:14
@crusaderky crusaderky self-assigned this Mar 10, 2022
Comment on lines 427 to 429
.. autoclass:: distributed.worker_state_machine.UniqueTaskHeap
:members:

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this should be publicly documented

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense - removing it

@@ -5516,7 +5520,7 @@ def handle_task_erred(self, key=None, **msg):
recommendations: dict
client_msgs: dict
worker_msgs: dict
r: tuple = self.stimulus_task_erred(key=key, **msg)
r: tuple = self.stimulus_task_erred(key=key, status="error", **msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the status kwarg for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleaned up and broken out to #5926, which this PR incorporates.

TaskFinishedMsg,
TaskState,
UniqueTaskHeap,
)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quite ugly but transitory. I expect that all *Msg classes and the state sets won't need to be imported after we move the state machine to the other module.

"rescheduled",
"resumed",
"waiting",
]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather have #5444 but this is the next best thing

Not to be confused with :class:`distributed.scheduler.TaskState`, which holds
similar information on the scheduler side.
"""

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduced the size of the attributes declaration by a factor of 3 (docstring + class annotations + init method -> just the class annotations)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the dataclass

Re doc string I'm fine with this but we should be aware that this obviously removes any sphinx rendering. I know scheduler state tasks are rendered atm

#: The previous state of the task. This is a state machine implementation detail.
_previous: TaskStateState | None = None
#: The next state of the task. This is a state machine implementation detail.
_next: TaskStateState | None = None
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjetter wanna chip in on these two?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some documentation about this here

def _transition_from_resumed(
self, ts: TaskState, finish: str, *, stimulus_id: str
) -> tuple[Recs, Smsgs]:
"""`resumed` is an intermediate degenerate state which splits further up
into two states depending on what the last signal / next state is
intended to be. There are only two viable choices depending on whether
the task is required to be fetched from another worker `resumed(fetch)`
or the task shall be computed on this worker `resumed(waiting)`.
The only viable state transitions ending up here are
flight -> cancelled -> resumed(waiting)
or
executing -> cancelled -> resumed(fetch)
depending on the origin. Equally, only `fetch`, `waiting` or `released`
are allowed output states.
See also `transition_resumed_waiting`
"""

dc_slots = {"slots": True} if sys.version_info >= (3, 10) else {}


@dataclass(repr=False, eq=False, **dc_slots)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only way I could find to get __slots__ in Python 3.8/3.9 was not to use @dataclass, which in my opinion offers much bigger rewards in terms of readability.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slots are a nice performance boost for attribute access but I don't think it's required on the worker side. I'm fine with this, shouldn't cause any problems.

Comment on lines 427 to 429
.. autoclass:: distributed.worker_state_machine.UniqueTaskHeap
:members:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense - removing it

@crusaderky
Copy link
Collaborator Author

Blocked by #5926

@crusaderky crusaderky marked this pull request as draft March 10, 2022 18:45
@crusaderky
Copy link
Collaborator Author

#5926 no longer blocks this issue

traceback=ts.traceback,
exception_text=ts.exception_text,
traceback_text=ts.traceback_text,
)
Copy link
Collaborator Author

@crusaderky crusaderky Mar 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keys "status", "thread", and "startstops" were ignored by the scheduler

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not. They are subtly picked up by extensions such as TaskStream and EventStream. -__-

@crusaderky crusaderky marked this pull request as ready for review March 11, 2022 15:01
@crusaderky
Copy link
Collaborator Author

All test failures are unrelated. Ready for final review and merge.

crusaderky added a commit to crusaderky/distributed that referenced this pull request Mar 11, 2022
dc_slots = {"slots": True} if sys.version_info >= (3, 10) else {}


@dataclass(repr=False, eq=False, **dc_slots)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slots are a nice performance boost for attribute access but I don't think it's required on the worker side. I'm fine with this, shouldn't cause any problems.

Comment on lines +70 to +72
@lru_cache
def _default_data_size() -> int:
return parse_bytes(dask.config.get("distributed.scheduler.default-data-size"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason for this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To read the config on first use instead of when loading the module like it was before, thus avoiding headaches related to module load order.

Not to be confused with :class:`distributed.scheduler.TaskState`, which holds
similar information on the scheduler side.
"""

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the dataclass

Re doc string I'm fine with this but we should be aware that this obviously removes any sphinx rendering. I know scheduler state tasks are rendered atm

#: The previous state of the task. This is a state machine implementation detail.
_previous: TaskStateState | None = None
#: The next state of the task. This is a state machine implementation detail.
_next: TaskStateState | None = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is some documentation about this here

def _transition_from_resumed(
self, ts: TaskState, finish: str, *, stimulus_id: str
) -> tuple[Recs, Smsgs]:
"""`resumed` is an intermediate degenerate state which splits further up
into two states depending on what the last signal / next state is
intended to be. There are only two viable choices depending on whether
the task is required to be fetched from another worker `resumed(fetch)`
or the task shall be computed on this worker `resumed(waiting)`.
The only viable state transitions ending up here are
flight -> cancelled -> resumed(waiting)
or
executing -> cancelled -> resumed(fetch)
depending on the origin. Equally, only `fetch`, `waiting` or `released`
are allowed output states.
See also `transition_resumed_waiting`
"""

# Note: as of Python 3.10.2, @dataclass(slots=True) doesn't work with __init__subclass__
# https://bugs.python.org/issue46970
@dataclass
class TaskFinishedMsg(SendMessageToScheduler, op="task-finished"):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a functional difference to the case where I simply define the op in our subclasses, e.g.

class TaskFinishedMsg(SendMessageToScheduler):
    op = "task-finished"

The usage of metaclasses feels a bit complex. is there anything else going on that I'm not aware of or is this a style question?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's just style. Happy to remove it.

crusaderky added a commit to crusaderky/distributed that referenced this pull request Mar 14, 2022
@crusaderky
Copy link
Collaborator Author

Re doc string I'm fine with this but we should be aware that this obviously removes any sphinx rendering. I know scheduler state tasks are rendered atm

It renders fine

crusaderky added a commit to crusaderky/distributed that referenced this pull request Mar 14, 2022
@fjetter
Copy link
Member

fjetter commented Mar 14, 2022

Re doc string I'm fine with this but we should be aware that this obviously removes any sphinx rendering. I know scheduler state tasks are rendered atm

It renders fine

Interesting. Thanks for verifying. This is indeed much better and compact

@crusaderky
Copy link
Collaborator Author

@fjetter are there any outstanding points?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants