-
-
Notifications
You must be signed in to change notification settings - Fork 718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Select queued tasks in stimuli, not transitions #7402
Changes from all commits
4582d6c
93f3c9e
55021bc
0135882
e98f416
231dbcb
afc4ada
89da28e
7952579
31975e5
6de7c45
1450522
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||||||||||||||||||||||||||||||||||
from __future__ import annotations | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
import asyncio | ||||||||||||||||||||||||||||||||||||||
import itertools | ||||||||||||||||||||||||||||||||||||||
import json | ||||||||||||||||||||||||||||||||||||||
import logging | ||||||||||||||||||||||||||||||||||||||
import math | ||||||||||||||||||||||||||||||||||||||
|
@@ -86,14 +87,14 @@ async def test_administration(s, a, b): | |||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) | ||||||||||||||||||||||||||||||||||||||
async def test_respect_data_in_memory(c, s, a): | ||||||||||||||||||||||||||||||||||||||
x = delayed(inc)(1) | ||||||||||||||||||||||||||||||||||||||
y = delayed(inc)(x) | ||||||||||||||||||||||||||||||||||||||
x = delayed(inc)(1, dask_key_name="x") | ||||||||||||||||||||||||||||||||||||||
y = delayed(inc)(x, dask_key_name="y") | ||||||||||||||||||||||||||||||||||||||
f = c.persist(y) | ||||||||||||||||||||||||||||||||||||||
await wait([f]) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
assert s.tasks[y.key].who_has == {s.workers[a.address]} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
z = delayed(operator.add)(x, y) | ||||||||||||||||||||||||||||||||||||||
z = delayed(operator.add)(x, y, dask_key_name="z") | ||||||||||||||||||||||||||||||||||||||
f2 = c.persist(z) | ||||||||||||||||||||||||||||||||||||||
while f2.key not in s.tasks or not s.tasks[f2.key]: | ||||||||||||||||||||||||||||||||||||||
assert s.tasks[y.key].who_has | ||||||||||||||||||||||||||||||||||||||
|
@@ -371,6 +372,80 @@ def __del__(self): | |||||||||||||||||||||||||||||||||||||
assert max(Refcount.log) <= s.total_nthreads | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
@gen_cluster(client=True, nthreads=[("", 1)]) | ||||||||||||||||||||||||||||||||||||||
async def test_forget_tasks_while_processing(c, s, a, b): | ||||||||||||||||||||||||||||||||||||||
events = [Event() for _ in range(10)] | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
futures = c.map(Event.wait, events) | ||||||||||||||||||||||||||||||||||||||
await events[0].set() | ||||||||||||||||||||||||||||||||||||||
await futures[0] | ||||||||||||||||||||||||||||||||||||||
await c.close() | ||||||||||||||||||||||||||||||||||||||
assert not s.tasks | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
@pytest.mark.slow | ||||||||||||||||||||||||||||||||||||||
@gen_cluster(client=True, nthreads=[("", 1)], Worker=Nanny) | ||||||||||||||||||||||||||||||||||||||
async def test_restart_while_processing(c, s, a, b): | ||||||||||||||||||||||||||||||||||||||
events = [Event() for _ in range(10)] | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
futures = c.map(Event.wait, events) | ||||||||||||||||||||||||||||||||||||||
await events[0].set() | ||||||||||||||||||||||||||||||||||||||
await futures[0] | ||||||||||||||||||||||||||||||||||||||
# TODO slow because worker waits a while for the task to finish | ||||||||||||||||||||||||||||||||||||||
await c.restart() | ||||||||||||||||||||||||||||||||||||||
assert not s.tasks | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
@gen_cluster( | ||||||||||||||||||||||||||||||||||||||
client=True, | ||||||||||||||||||||||||||||||||||||||
nthreads=[("", 1)] * 3, | ||||||||||||||||||||||||||||||||||||||
config={"distributed.scheduler.worker-saturation": 1.0}, | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
async def test_queued_release_multiple_workers(c, s, *workers): | ||||||||||||||||||||||||||||||||||||||
async with Client(s.address, asynchronous=True) as c2: | ||||||||||||||||||||||||||||||||||||||
event = Event(client=c2) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
rootish_threshold = s.total_nthreads * 2 + 1 | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
first_batch = c.map( | ||||||||||||||||||||||||||||||||||||||
lambda i: event.wait(), | ||||||||||||||||||||||||||||||||||||||
range(rootish_threshold), | ||||||||||||||||||||||||||||||||||||||
key=[f"first-{i}" for i in range(rootish_threshold)], | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
await async_wait_for(lambda: s.queued, 5) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
second_batch = c2.map( | ||||||||||||||||||||||||||||||||||||||
lambda i: event.wait(), | ||||||||||||||||||||||||||||||||||||||
range(rootish_threshold), | ||||||||||||||||||||||||||||||||||||||
key=[f"second-{i}" for i in range(rootish_threshold)], | ||||||||||||||||||||||||||||||||||||||
fifo_timeout=0, | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
await async_wait_for(lambda: second_batch[0].key in s.tasks, 5) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
# All of the second batch should be queued after the first batch | ||||||||||||||||||||||||||||||||||||||
assert [ts.key for ts in s.queued.sorted()] == [ | ||||||||||||||||||||||||||||||||||||||
f.key | ||||||||||||||||||||||||||||||||||||||
for f in itertools.chain(first_batch[s.total_nthreads :], second_batch) | ||||||||||||||||||||||||||||||||||||||
] | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
# Cancel the first batch. | ||||||||||||||||||||||||||||||||||||||
# Use `Client.close` instead of `del first_batch` because deleting futures sends cancellation | ||||||||||||||||||||||||||||||||||||||
# messages one at a time. We're testing here that when multiple workers have open slots, we don't | ||||||||||||||||||||||||||||||||||||||
# recommend the same queued tasks for every worker, so we need a bulk cancellation operation. | ||||||||||||||||||||||||||||||||||||||
await c.close() | ||||||||||||||||||||||||||||||||||||||
del c, first_batch | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
await async_wait_for(lambda: len(s.tasks) == len(second_batch), 5) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
# Second batch should move up the queue and start processing | ||||||||||||||||||||||||||||||||||||||
assert len(s.queued) == len(second_batch) - s.total_nthreads, list( | ||||||||||||||||||||||||||||||||||||||
s.queued.sorted() | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
await event.set() | ||||||||||||||||||||||||||||||||||||||
await c2.gather(second_batch) | ||||||||||||||||||||||||||||||||||||||
Comment on lines
+399
to
+446
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test feels a bit heavy weight considering that we have a couple of very simple reproducers, see #7396 (comment) If you feel strongly about this test, fine, but please add the other two very simple reproducers as well. Regardless of all the intricate timings and queuing, etc. The reproducers there should be true regardless of what internals look like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the reproducer from #7396 (comment) as well (with an Event to avoid timing issues): distributed/distributed/tests/test_scheduler.py Lines 375 to 383 in 7952579
This test is for #7401, which is a different issue (and I think maybe the same as #7398). EDIT: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a simple test with distributed/distributed/tests/test_scheduler.py Lines 386 to 394 in 31975e5
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
@gen_cluster( | ||||||||||||||||||||||||||||||||||||||
client=True, | ||||||||||||||||||||||||||||||||||||||
nthreads=[("", 2)] * 2, | ||||||||||||||||||||||||||||||||||||||
|
@@ -4237,7 +4312,7 @@ def assert_rootish(): | |||||||||||||||||||||||||||||||||||||
await asyncio.sleep(0.005) | ||||||||||||||||||||||||||||||||||||||
assert_rootish() | ||||||||||||||||||||||||||||||||||||||
if rootish: | ||||||||||||||||||||||||||||||||||||||
assert all(s.tasks[k] in s.queued for k in keys) | ||||||||||||||||||||||||||||||||||||||
assert all(s.tasks[k] in s.queued for k in keys), [s.tasks[k] for k in keys] | ||||||||||||||||||||||||||||||||||||||
await block.set() | ||||||||||||||||||||||||||||||||||||||
# At this point we need/want to wait for the task-finished message to | ||||||||||||||||||||||||||||||||||||||
# arrive on the scheduler. There is no proper hook to wait, therefore we | ||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is hardly worth mentioning, but potential for a negligible scheduler performance change here (which may not actually be possible):
Now, every time a task completes, we run
_task_slots_available
on all idle workers. Before, we only ran it on the one worker that just completed a task. In most of those cases,len(self.idle_task_count)
should be 1, so no difference.I mention this because
_task_slots_available
does already show up in py-spy profiles of the scheduler (usually around 0.5-1%), because it's already called frequently. This would maybe allow it to be called even more frequently.But I don't think it can actually run more than it needs to:
When there are more threads than root tasks, so many workers are idle,
queued
would be empty, so this wouldn't run.When there are more root tasks than threads, and we're queuing, at most one worker should ever be idle: as soon as it becomes idle, it gets another queued task and is no longer idle.
The one exception is
client.close()
releasing many processing tasks at once, while a different client has tasks on the queue. In that case, there's a lot of rescheduling to do, and we do need to look at all the workers that just became idle, so no unnecessary work there either.tl;dr I don't see a theoretical way for this to be a problem, but I haven't benchmarked or profiled to be sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for highlighting.
Apart from the theoretical analysis, this is also something we can easily optimize if it becomes a problem, e.g.
saturation * nthreads
is not something we need to compute every time and even theprocessing - long running
could be replaced with a counter we inc/dec during transitions.TLDR Not concerned and "fixing" it right now feels premature