Internal worker state transitions #4413

Closed · fjetter opened this issue Jan 8, 2021 · 6 comments · Fixed by #5046

Comments

@fjetter
Member

fjetter commented Jan 8, 2021

In #4360 various deadlock situations are investigated, some of which are connected to the way valid states and state transitions of the worker's[1] TaskState objects are defined.

The current documentation is slightly outdated since dependencies and runnable tasks were consolidated in #4107. The state transitions now follow the pipeline below (omitting long-running, error, and constrained for the sake of clarity [2]):

[Diagram: worker-state-master]

What's remarkable about this transition pipeline is that virtually all states allow a transition to memory, and there are multiple transition paths which are only allowed in very specific circumstances and only upon intervention by the scheduler. For instance, a task in state flight may be transitioned via ready to executing, but this is only possible if the worker actually possesses the knowledge of how to execute the given task, i.e. the TaskState object has its runspec attribute set. This attribute is usually only known to the worker if the scheduler intends for this worker to actually execute the task. The transition path is allowed nonetheless because a dependency, i.e. a task the worker does not know how to compute, may be reassigned by the scheduler for computation on this worker. This can happen if the worker on which the task was originally intended to be computed is shut down.
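To make the implicit guard concrete, here is a minimal, hypothetical sketch. The TaskState below is heavily simplified and transition_flight_ready is an invented name, not the actual Worker code; it only illustrates that the flight → ready path depends on runspec being set.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskState:
    # Heavily simplified stand-in for the worker's TaskState
    key: str
    state: str = "flight"
    runspec: Optional[dict] = None  # only set if this worker is meant to execute the task


def transition_flight_ready(ts: TaskState) -> None:
    # The flight -> ready path is only meaningful if the worker knows how
    # to execute the task, i.e. the scheduler has provided a runspec.
    if ts.runspec is None:
        raise RuntimeError(f"cannot transition {ts.key} flight->ready: no runspec known")
    ts.state = "ready"
```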

This ambiguity is essentially introduced by no longer distinguishing between dependencies and runnable tasks. What I would propose is to make this distinction explicit in the states of the tasks. Consider the following pipeline:

[Diagram: worker-state-proposed]

Every task starts off in new. This is effectively a dummy state and could be omitted. It represents a known task which hasn't yet been classified by the question "can the worker compute this task or not?". Based on the answer to this question, it is put into one of the following states (a rough sketch of this classification follows after their definitions):

waiting_for_dependencies : This task is intended to be computed by this worker. Once all dependencies are available on this worker, it is supposed to be transitioned to ready to be queued up for execution. (No dependencies is a special subset of this case)

waiting_to_fetch : This task is not intended to be computed on this worker; the TaskState here is merely a reference to a remote data key we are about to fetch.
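A rough sketch of the proposed classification out of the new dummy state; the function and argument names below are illustrative only, not actual Worker internals:

```python
def classify_new_task(intended_for_execution: bool) -> str:
    if intended_for_execution:
        # The scheduler wants this worker to compute the task; wait until all
        # dependencies are in local memory (no dependencies is the trivial case).
        return "waiting_for_dependencies"
    # Otherwise the TaskState is merely a reference to a remote key to fetch.
    return "waiting_to_fetch"
```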

The red transition is only possible via scheduler intervention, i.e. once the scheduler reassigns a task to be computed on this worker. This is relatively painless as long as the TaskState is in a valid state (in particular, runspec is set).

The purple transition is similar, but in this case the worker was already trying to fetch the dependency: a gather_dep was already scheduled and the worker is currently trying to fetch the result. If that fetch is actually successful, we might be in a position to fast-track the "to be executed" task.

I believe defining these transitions properly is essential, and we should strive to set up a similar, if not identical, state machine to the one in the scheduler (with recommendations / chained state transitions; a rough sketch of such a loop is shown below). This is especially important since there are multiple data structures to keep synchronized (e.g. Worker.ready, Worker.data_needed, Worker.in_flight_workers to name a few) on top of the tasks themselves.
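A rough sketch of a chained-recommendation transition loop modelled on the scheduler's approach; names and signatures are illustrative, not the distributed API. Here `table` maps (start, finish) pairs to transition functions, and each transition function may return further recommendations (key → desired state):

```python
def transitions(tasks: dict, table: dict, recommendations: dict) -> None:
    # Keep applying recommendations until none are left; each individual
    # transition may emit further recommendations that are processed in turn.
    while recommendations:
        key, finish = recommendations.popitem()
        ts = tasks[key]
        func = table[ts.state, finish]
        new_recommendations = func(ts) or {}
        ts.state = finish
        recommendations.update(new_recommendations)
```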

Last but not least, there have been questions around how Worker.release_key works, when it is called, and what data is actually stored in Worker.data (is it always a subset of tasks or not?). I believe settling the allowed state transitions should help answer these questions.

Alternative: instead of implementing red/purple we could just reset to new and start all transitions from scratch. That would help reduce the number of edges/allowed transitions, but would pose problems similar to the purple path in case the gather_dep is still running.

My open questions:

  • How would speculative task assignment fit into this?
  • How is work stealing mapped here? (That's kind of the reverse of red/purple, isn't it?)
  • Do Actors impact this in any kind of way?

cc @gforsyth

[1] The TaskState objects of the scheduler follow a different definition and allow different state transitions. I consider the consolidation of the two out of scope for this issue.
[2] The state error in particular is very loosely defined, and tasks can be transitioned to error from almost every start state.


Possibly related issues
#4724
#4587
#4439
#4550
#4133
#4721
#4800
#4446

@mrocklin
Member

mrocklin commented Jan 8, 2021

Thank you for investigating this and for writing this up.

Alternative: Instead of implementing red/purple we could just reset to new and start all transitions from scratch

In the scheduler we do this as a catch-all. If a transition doesn't exist we transition to released and then transition to the desired state. Ideally we implement all valid transitions, but this allows us to be robust to cases that we have not anticipated.
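A minimal sketch of this scheduler-style catch-all, with illustrative names only: if no direct start → finish transition is implemented, reset the task via "released" and then transition to the desired state from there.

```python
def transition(ts, finish: str, table: dict) -> None:
    # `table` maps (start, finish) -> transition function.
    start = ts.state
    if (start, finish) in table:
        table[start, finish](ts)
    else:
        # No direct transition known: fall back to released, then retry.
        table[start, "released"](ts)
        table["released", finish](ts)
```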

@fjetter
Member Author

fjetter commented Apr 12, 2021

I've been working on a more complete definition of the worker state machine which is still WIP, but I'd like to share it already to get some feedback. This proposal is in some respects much more complex than what we have right now, but in other respects much simpler, and I believe the bottom line is a more robust state machine which is also easier to extend. In particular, I can see much more room for optimistic task assignments or even the possibility of graph reconstruction upon scheduler failure (I won't go into this other than saying that by not deleting stuff prematurely we have all necessary information on the cluster).

I will try not to discuss implementation unless absolutely required, but rather focus on how things should eventually look.

Below you can see the state transition diagram for what I would propose.

  • We will never remove or delete information from a TaskState instance. In particular, the intention is never inferred from whether or not a given attribute exists, is null, etc. This is particularly important for the runnable vs. not-runnable distinction, which should be expressed by a dedicated attribute instead of basing this decision on runspec is None. This allows for easier recovery of the state machine by simply "starting from the beginning".
  • We try to eliminate as many "cross transitions" as possible. By cross transitions, I'm referring to the theoretically possible N-to-N transitions between the states of the graph. In theory a lot of transitions are possible which occur quite unnaturally, e.g. Ready->Memory, and we're trying to eliminate those by evolving the TaskState through the sequential chain of states using a chained recommendation system like the one applied in the scheduler.
  • The state machine is driven by three external actors which have different latencies and may sometimes disagree, which is one of the major sources of the above-mentioned "unnatural" state transitions. The three actors are the scheduler, the threadpool executor and the event loop. For instance, assume a task is currently executing and a thread is actively working on it. Even if the scheduler decides to cancel this task, we do not have the possibility to kill the thread (at the very least this is not a very good idea for various reasons; maybe I'm wrong, see the discussion in "Allow workers to cancel running task?" #4694), so our intention of killing/canceling this task cannot be fulfilled immediately but at best with a certain delay. Therefore, instead of removing the task immediately from memory, the task should be transitioned into an intermediate state Canceled which signals that the task is to be forgotten but is still waiting for an external resource to be released, in this case the thread. If during this cancelation period an external trigger (e.g. the scheduler) decides to reschedule the task, we can continue as before by transitioning it back into the executing state (a rough sketch of this follows after this list). Similar arguments can be constructed for the event loop, where we're not waiting for a thread to finish but rather a coroutine, e.g. a coro fetching a dependency.
  • Work stealing creates the need to introduce a way to transition between runnable and not-runnable. My proposal would be to handle this via a dedicated rescheduled state with safety checks similar to those of the canceled state. In particular, the Rescheduled state may only be exited once there are no resources allocated by the task anymore. In practice this should not pose a problem for work stealing since stealing is only allowed for tasks without an allocated resource (e.g. Waiting, Ready, Fetch).
  • Ultimately, we only allow the removal of a TaskState from memory if the scheduler allows / signals this, e.g. via "forget". The mechanism for this still needs some consideration, but this protects us from losing TaskState metadata prematurely. One example of prematurely lost metadata is the suspicious counter which flags tasks as erroneous once enough failures of some sort have accumulated (xref "Worker can no longer detect faulty dependencies causing infinite loop" #4439).
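A rough sketch, assuming a simplified TaskState and invented helper names (release, put_in_memory), of how the Canceled holding state described above could behave:

```python
def release(ts):
    # Placeholder for whatever "forget this task" ends up meaning.
    ts.state = "released"


def put_in_memory(ts, result):
    # Placeholder for storing the result in Worker.data.
    ts.state = "memory"


def handle_cancel(ts):
    # Scheduler asked us to drop the task.
    if ts.state == "executing":
        # We cannot kill the thread, so park the task instead of releasing it.
        ts.state = "canceled"
    else:
        release(ts)


def handle_compute(ts):
    # Scheduler re-assigned the task while we were waiting to cancel it:
    # simply resume, the thread is still working on it.
    if ts.state == "canceled":
        ts.state = "executing"


def execution_done(ts, result):
    # Called on the event loop once the thread actually finishes.
    if ts.state == "canceled":
        release(ts)  # the deferred cancelation can finally be honoured
    else:
        put_in_memory(ts, result)
```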

(Note: the PNG might be out of date. For the most recent version check the link below; feel free to leave comments, but it's still WIP.)
[Diagram: Worker state transitions]

https://lucid.app/lucidchart/invitations/accept/inv_6298eb14-8172-4eb8-9f68-e4f6cb0b36ef

cc @gforsyth

@gforsyth
Contributor

This is great, @fjetter -- thanks for sharing it!

  • We will never remove or delete information from a TaskState instance. In particular, the intention is never inferred from whether or not a given attribute exists, is null, etc. This is particularly important for the runnable vs. not-runnable distinction, which should be expressed by a dedicated attribute instead of basing this decision on runspec is None. This allows for easier recovery of the state machine by simply "starting from the beginning".

I'm very much on board with this (and the rest of your points). A clarifying question: state changes of a TaskState instance are currently tracked in self.story, but would we want to include a history of previous states / transitions in the TaskState instance itself?

@mrocklin
Member

mrocklin commented Apr 12, 2021 via email

@fjetter
Member Author

fjetter commented Apr 14, 2021

Short summary of an offline discussion with @gforsyth, with some hints about possible implementations:

  • Intermediate states like Rescheduled, Canceled, etc. are probably a good idea. Work stealing, for instance, would always go via these intermediate states, and we'd hope to gain more stability in this system by starting from scratch, transitioning all states through the ordinary, intended paths, and keeping the overall number of allowed transitions minimal. However, this is probably not the most important area to focus on first and might require a transition/recommendation system to allow for properly cascading transitions first (see "[NEVER MERGE] Initial experiment into adding chained recommendation to workers" #4705 for why this is not easily introduced).
  • One of the biggest sources of confusion is currently the release_key implementation, since it covers both "forget everything about this task" and "we're done with this, but hold on to it until X happens", and possibly more scenarios. Pulling this apart probably requires two components. Firstly, we'll need to figure out what the scheduler actually intends to do with the key on the given worker ("delete everything" vs. "don't compute this anymore" vs. ?). This could be made more transparent by offering dedicated coroutines/handlers and refactoring release_key accordingly (a rough sketch follows after this list). Secondly, on the worker we currently apply a rather unstable heuristic to distinguish between the above-mentioned forget/done scenarios. This could be resolved by introducing finished or released states and figuring out under which conditions the worker actually decides to forget, which is not entirely clear at the moment. Eventually, this heuristic could be replaced by an active scheduler decision (see diagram, "scheduler: forget").
  • We currently do not properly distinguish between dynamic task properties (e.g. which dependencies a task is currently working on) and the static task specification (which dependencies the task actually has). That introduces many awkward situations in add_task and in work stealing. Pulling this apart and treating the static metadata / task spec as immutable should resolve a lot of ambiguity.
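A minimal sketch of what splitting release_key into intent-specific handlers could look like; the handler names and signatures are invented here for illustration and are not actual distributed handlers:

```python
def handle_forget_key(key: str, tasks: dict, data: dict) -> None:
    # Scheduler intent: delete everything we know about this key,
    # including metadata such as the suspicious counter.
    data.pop(key, None)
    tasks.pop(key, None)


def handle_stop_computing(key: str, tasks: dict) -> None:
    # Scheduler intent: do not compute this key anymore, but keep the
    # TaskState (and its metadata) around until an explicit "forget".
    ts = tasks[key]
    ts.state = "released"
```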

@fjetter
Member Author

fjetter commented Jun 18, 2021

We've recently merged a big PR which addresses some of the deadlock situations we've seen lately. See #4784
We currently have no reason to believe that there are more of these deadlock situations and will therefore pause the big worker state refactoring this issue triggered, in favour of maintaining stability for a while. We will, of course, try to address missing state transitions as we go, but will no longer refactor the entire state machine unless necessary.

The deadlock fixes will be released later today, see dask/community#165
