Handle edge cases between queued and no-worker #7259
Conversation
it's possible for tasks to not be rootish when they go into no-worker, but to be rootish when they come out.
Unit Test Results: 15 files, 15 suites, 6h 30m 41s ⏱️. Results for commit 63c649d. See the test report for an extended history of previous test failures; this is useful for diagnosing flaky tests. For more details on these failures, see this check. ♻️ This comment has been updated with latest results.
A simpler approach would be to just remove the assertion from c901823. That is, to say it's okay for decide_worker_non_rootish to occasionally be used on tasks with no dependencies and no restrictions.
I'd originally thought "but if you submit tasks to an empty cluster and then it scales up, you won't get co-assignment". But as soon as the first worker joins, all unrunnable root-ish tasks will be assigned to it. So you won't have co-assignment anyway.
I'm leaning towards that approach instead of this, since it feels a little more consistent.
After seeing what caching is_rootish would look like (https://github.com/dask/distributed/pull/7262/files), I think this style is actually better, since it's closer to how this would look when root-ish-ness is static.
Again, another option would be to cache is_rootish on the TaskState, just so it can't change. I like the consistency, though it feels a bit heavy-handed. I also haven't thought about other consequences it could have.
We should maybe just cache it so we don't have to spend more time thinking about these consistency issues.
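The caching idea can be sketched in isolation. This is a hypothetical standalone model (the `TaskState` and `is_rootish` below are simplified stand-ins, not dask's real classes, and the size cutoff is reduced to the `len(tg) > total_nthreads * 2` check discussed in this thread): the first call computes root-ish-ness and pins it, so later growth of the group or shrinkage of the cluster can't flip the answer.

```python
# Simplified, hypothetical model of caching root-ish-ness on the task state.
# Not dask's actual implementation; names and the cutoff are illustrative only.

class TaskState:
    def __init__(self, group_size: int, total_nthreads: int,
                 has_restrictions: bool = False):
        self.group_size = group_size
        self.total_nthreads = total_nthreads
        self.has_restrictions = has_restrictions
        self._rootish = None  # unset until first classification


def is_rootish(ts: TaskState) -> bool:
    # Compute once, then reuse the cached answer forever after,
    # so the classification can never change mid-lifetime.
    if ts._rootish is None:
        ts._rootish = (
            not ts.has_restrictions
            and ts.group_size > ts.total_nthreads * 2
        )
    return ts._rootish


# A task submitted to an empty cluster (0 threads) classifies as root-ish;
# scaling the cluster up afterwards does not flip the cached answer.
ts = TaskState(group_size=1, total_nthreads=0)
first = is_rootish(ts)   # True: 1 > 0 * 2
ts.total_nthreads = 8    # cluster scaled up
second = is_rootish(ts)  # still True, served from the cache
```

The trade-off this sketch makes concrete: the decision stays consistent, but it is stateful, which is exactly the "heavy-handed" aspect mentioned above.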
# Rather than implementing some `no-worker->queued` transition, we
# just live with our original assessment and treat it as though queuing were disabled.
# If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
# are idle, which would leave it in `unrunnable` and cause a deadlock.
Note that this deadlock case is covered by test_queued_rootish_changes_while_paused. If you apply this diff (using decide_worker_rootish_queuing_enabled when "appropriate"), that test will deadlock:
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 105a45e9..f1b966b5 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -22,7 +22,7 @@ distributed:
events-log-length: 100000
work-stealing: True # workers should steal tasks from each other
work-stealing-interval: 100ms # Callback time for work stealing
- worker-saturation: .inf # Send this fraction of nthreads root tasks to workers
+ worker-saturation: 1.0 # Send this fraction of nthreads root tasks to workers
worker-ttl: "5 minutes" # like '60s'. Time to live for workers. They must heartbeat faster than this
pickle: True # Is the scheduler allowed to deserialize arbitrary bytestrings
preload: [] # Run custom modules with Scheduler
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 87ffce4e..5400c50a 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -2020,9 +2020,13 @@ class SchedulerState:
assert ts in self.unrunnable
decide_worker = (
- self.decide_worker_rootish_queuing_disabled
+ (
+ partial(self.decide_worker_rootish_queuing_disabled, ts)
+ if math.isinf(self.WORKER_SATURATION)
+ else self.decide_worker_rootish_queuing_enabled
+ )
if self.is_rootish(ts)
- else self.decide_worker_non_rootish
+ else partial(self.decide_worker_non_rootish, ts)
)
# NOTE: it's possible that queuing is enabled and `is_rootish(ts)`,
# meaning this task should have been queued and `decide_worker_rootish_queuing_enabled`
@@ -2034,13 +2038,13 @@ class SchedulerState:
# If we used `decide_worker_rootish_queuing_enabled` here, it's possible that no workers
# are idle, which would leave it in `unrunnable` and cause a deadlock.
- if ws := decide_worker(ts):
+ if ws := decide_worker():
self.unrunnable.discard(ts)
worker_msgs = _add_to_processing(self, ts, ws)
# If no worker, task just stays in `no-worker`
- if self.validate and self.is_rootish(ts):
- assert ws is not None
+ # if self.validate and self.is_rootish(ts):
+ # assert ws is not None
return recommendations, client_msgs, worker_msgs
except Exception as e:
Alternative to dask#7259. I'm quite torn about which is cleaner. I'm leaning towards this because I think it's even weirder to call `decide_worker_rootish_queuing_disabled` on a root-ish task when queuing is enabled than to call `decide_worker_non_rootish` on a root-ish task. This also feels more consistent with the philosophy of "stick with the original decision". And if root-ish were a static property, this is what would happen.
Tests from dask#7259
decide_worker = (
    self.decide_worker_rootish_queuing_disabled
    if self.is_rootish(ts)
    else self.decide_worker_non_rootish
)
One of the biggest problems I have right now with the queuing/root-ish scheduling is that we have three different decide_* functions. From an (internal) API perspective, I don't want the burden of deciding which of these APIs to call in which circumstance. I just want to call a single decide_worker, provide it with sufficient context, and have it return the proper worker. Wouldn't this already avoid the problem?
Naively I would expect that this new decide_worker would look approximately like the block in transition_waiting_processing (distributed/scheduler.py, lines 2230 to 2242 in 17156e9):
if self.is_rootish(ts):
    # NOTE: having two root-ish methods is temporary. When the feature flag is removed,
    # there should only be one, which combines co-assignment and queuing.
    # Eventually, special-casing root tasks might be removed entirely, with better heuristics.
    if math.isinf(self.WORKER_SATURATION):
        if not (ws := self.decide_worker_rootish_queuing_disabled(ts)):
            return {ts.key: "no-worker"}, {}, {}
    else:
        if not (ws := self.decide_worker_rootish_queuing_enabled()):
            return {ts.key: "queued"}, {}, {}
else:
    if not (ws := self.decide_worker_non_rootish(ts)):
        return {ts.key: "no-worker"}, {}, {}
Isn't this always the correct logic when deciding on a worker?
Yes, and this is how I'd originally implemented it in #6614. But based on your feedback (which I agree with), we split it into multiple decide_worker_* functions for different cases.
The reason we didn't wrap the three decide_worker_* cases into one overall decide_worker function, which always "does the right thing", is that the recommendation you make (no-worker vs queued) changes depending on which function you use.
So then this decide_worker function would have to take and mutate a recommendations dict, or at least somehow return what recommendation to make. I thought we'd decided this was a pattern we wanted to avoid.
Moreover, we'd then have to implement no-worker->queued and queued->no-worker transitions. That's not hard, just more complexity. If we don't do #7262, it's maybe the right thing to do instead of this PR.
# would be the most appropriate function to use. But if, at submission time,
# it didn't look root-ish (`TaskGroup` too small, or cluster too big) and there were
# no running workers, it would have gone to `no-worker` instead.
To my understanding, this should be impossible.
A non-rootish task will go to no-worker if there are no workers available that satisfy its worker/host/resource restrictions, or if there are no workers at all.
Tasks with restrictions cannot be rootish, so this leaves us only with the second option: 0 total threads in the cluster, in which case a TaskGroup with a single task can qualify as rootish, and neither adding tasks to the group nor removing them will change that.
If you add workers, you can flip a task from rootish to non-rootish, but a rootish task would not be in no-worker to begin with.
Please see the tests I've added covering these cases.

"or if there are no workers at all"

If there are no running workers. Paused or retiring workers will still contribute to total_nthreads, allowing a task to look non-root-ish (TaskGroup smaller than total_nthreads * 2), go into no-worker, then look root-ish when it comes out, either by adding tasks or removing workers.

client.submit in a for loop is probably the most common way to make a task look non-root-ish when it enters and root-ish when it leaves, because the TaskGroup grows larger each iteration. The first nthreads * 2 tasks are non-root-ish; the rest are root-ish.
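The cutoff behaviour just described can be illustrated with a standalone sketch (`looks_rootish` below is a hypothetical helper capturing only the size check; dask's actual is_rootish also considers restrictions and dependencies):

```python
# Hypothetical helper: a task "looks root-ish" when its TaskGroup exceeds
# twice the cluster's total thread count. Simplified from the real heuristic.

def looks_rootish(group_size: int, total_nthreads: int) -> bool:
    return group_size > total_nthreads * 2


# Suppose one paused worker with 4 threads still counts toward total_nthreads.
total_nthreads = 4

# client.submit in a for loop grows the TaskGroup by one each iteration,
# so the classification of newly submitted tasks eventually flips:
classifications = [looks_rootish(n, total_nthreads) for n in range(1, 11)]
# The first nthreads * 2 = 8 tasks look non-root-ish; tasks 9 and 10 look root-ish.
```

The same flip happens in reverse when workers are removed: a fixed-size group can cross the cutoff because total_nthreads shrinks, which is the no-worker-to-root-ish path the tests exercise.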
Florian, Guido and I talked offline, and decided that we'll take this approach: #7259 (comment). That is, we'll explicitly implement the no-worker->queued and queued->no-worker transitions. That is the opposite of #7262. It seems like it'll be the most maintainable, since it won't do anything stateful.
While working on #7221, I discovered some edge cases with queuing related to the queued vs no-worker states, and the fact that is_rootish can change. (These edge cases were not created by #7221; that just made it easier to find them, since we could remove the almost-dead round-robin code c901823.)

When queuing is enabled, and there are no running workers (0 workers, or all paused/retiring):

- Root-ish tasks go into Scheduler.queued. Cool.
- Non-root-ish tasks go into Scheduler.unrunnable. Possible issue.

Once worker(s) are running, we schedule tasks in unrunnable. By this time, is_rootish(ts) may now be True. This happens if the TaskGroup grew, or the cluster shrank, passing the len(tg) > total_nthreads * 2 cutoff. transition_no_worker_processing used to always assume that tasks in unrunnable were non-rootish. This is usually the case (most of the time, they're restricted tasks), but not always.

If we remove the round-robin code path, this case then fails an assertion. So we should use decide_worker_rootish_queuing_disabled versus decide_worker_non_rootish depending on whether the task is root-ish or not.

More broadly, it's awkward that is_rootish isn't static. #6922 will be very nice once we have it.

I thought about storing root-ish-ness per task (on the first call, is_rootish would cache it) so at least it can't change. But I don't think that's necessary, because it doesn't really matter that much which decide_worker function we use in this very rare case, so long as the task gets scheduled. I could certainly see going the other way, but I felt better about loosening the assertions in the decide_worker functions for now.

cc @fjetter @crusaderky