Respect task ordering when making worker assignments #4922
base: main
Conversation
In situations where tasks have many related tasks, and few dependencies among them, we try to co-schedule those tasks onto similar workers according to their dask.ordering. We do this in hopes that this reduces the burden of communication on their dependents.
It's not worth the effort otherwise
Thanks tests!
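For context on the co-scheduling idea described above: dask.order assigns each task a numeric priority, and the change is about handing sequentially-ordered tasks to the same worker. A minimal, hypothetical sketch of inspecting those priorities (this is just illustration of dask.order, not the scheduler code in this PR):

```python
# Hypothetical illustration: look at the dask.order priorities that the
# co-scheduling heuristic would follow.
import dask.array as da
from dask.order import order

x = da.zeros((8, 8), chunks=(4, 4))   # four independent "root" chunks
y = x.sum()

dsk = dict(y.__dask_graph__())        # flatten the HighLevelGraph to a plain dict
priorities = order(dsk)               # key -> dask.order priority (smaller runs earlier)
for key, priority in sorted(priorities.items(), key=lambda kv: kv[1]):
    print(priority, key)
```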
# If our group is large with few dependencies
# Then assign sequential tasks to similar workers, even if occupancy isn't ideal
if len(ts._group) > nthreads * 2 and sum(map(len, ts._group._dependencies)) < 5:
- Isn't the length of all dependencies of a TG potentially very expensive? The length of a group iterates over all TaskStates in a given group. For some topologies, this would require us to iterate over all tasks (-1), wouldn't it?
- Is there any way to reason about the numeric values here? I think I'm still lacking intuition for TGs to tell how stable this heuristic is.
The length of a group iterates over all TaskStates in a given group
The code looks like this
def __init__(self, ...):
    self._states = {"memory": 0, "processing": 0, ...}

def __len__(self):
    return sum(self._states.values())

So it's not as bad as it sounds. However, iterating a dict of a few elements could still be concerning. If so, we could always keep a _len value around. It would be cheap to maintain.
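A minimal sketch of what keeping such a counter could look like (hypothetical; `_len` is not an existing attribute, and the real scheduler TaskGroup tracks more states and is not structured exactly like this):

```python
# Hypothetical sketch: maintain an explicit counter instead of summing the
# per-state dict on every __len__ call.
class TaskGroupSketch:
    def __init__(self):
        self._states = {"memory": 0, "processing": 0, "released": 0}
        self._len = 0  # running total of tasks across all states

    def add(self, state):
        self._states[state] += 1
        self._len += 1

    def transition(self, old, new):
        self._states[old] -= 1
        self._states[new] += 1   # the total is unchanged

    def __len__(self):
        return self._len          # O(1), no dict iteration
```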
Is there any way to reason about the numeric values here? I think I'm still lacking intuition for TGs to tell how stable this heuristic is.
The 2 is because we want more than one task per worker to be allocated. If there are at least as many workers as tasks, then we're unlikely to co-schedule any tasks on similar workers, so this is a moot point.

The < 5 is really saying "we want there to be almost no dependencies for the tasks in this group, but we're going to accept a common case of all tasks depending on some parameter or something like an abstract zarr file". We're looking for cases where the dependency won't significantly affect the distribution of tasks throughout the cluster. This could be len(dependencies) in (0, 1), but we figured we'd allow a couple of these just in case.

I expect that the distribution here will be bi-modal, with tasks either in (0, 1) or in the hundreds or thousands. Five seemed like a good separator value in that distribution. I think that, given the distribution, this choice is stable and defensible.
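To make the two thresholds concrete, here is a toy restatement of the condition with made-up sizes (plain Python, not scheduler internals; `rootish` and the numbers are hypothetical):

```python
nthreads = 40  # e.g. 10 workers x 4 threads each; made-up cluster size

def rootish(group_size, dependency_group_sizes):
    # Toy restatement of the condition in the diff above.
    return group_size > nthreads * 2 and sum(dependency_group_sizes) < 5

# ~10,000 zarr-read tasks whose only dependency group is a single "open the array" task:
print(rootish(10_000, [1]))      # True  -> co-schedule by dask.order
# First layer of a tree reduction whose dependency group is those 10,000 tasks:
print(rootish(1_250, [10_000]))  # False -> usual occupancy-based scheduling
```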
The code looks like this
Right, that's state as in {Running, Memory, Released} and not state as in TaskState, and it is an aggregated dict. I was already a bit thrown off when I saw that. That's perfectly fine.
I expect that the distribution here will be bi-modal with tasks either in (0, 1) or in the hundreds or thousands.
Thanks for the detailed description. I think I was thrown off by the TaskGroup semantics again. I was thinking about our typical tree reductions, where we usually have task splits like 8 or 16. Those are the situations where one would want to group all dependencies for the first reduction.
However, for group dependencies this should be a trivial dependency of one, correct?
Then five is conservative, I agree 👍
Perhaps state should have been called state_counts. Oh well.

Ah, it's not len(ts._group._dependencies), which I think is what you're describing. It's sum(map(len, ts._group._dependencies)) < 5.
We're counting up all of the dependencies for all of the tasks that are like this task. So in a tree reduction, this number would likely be in the thousands for any non-trivially sized computation. It is non-zero and less than five only in cases like the following:
a1 a2 a3 a4 a5 a6 a7 a8
 \  \  \  \ /  /  /  /
           b
Really we're looking for cases where the number of dependencies, amortized over all similar tasks, is near-zero.
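Put as a toy formula (hypothetical helper name; the actual check uses the absolute `< 5` cutoff, this is only the intuition expressed numerically):

```python
# Hypothetical restatement: "root-ish" means the per-task dependency count,
# averaged over the whole group, is effectively zero.
def amortized_dependencies(group_size, dependency_group_sizes):
    return sum(dependency_group_sizes) / group_size

print(amortized_dependencies(10_000, [1]))      # ~0.0001 -> root-ish
print(amortized_dependencies(1_250, [10_000]))  # 8.0     -> not root-ish
```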
This is the "ish" in "root-ish" tasks that we sometimes talk about here.
Perhaps state should have been called state_counts. Oh well.
naming is hard
We're counting up all of the dependencies for all of the tasks that are like this task. So in a tree reduction, this number would likely be in the thousands for any non-trivially sized computation. It is non-zero and less than five only in cases like the following:
Really we're looking for cases where the number of dependencies, amortized over all similar tasks, is near-zero.
This is the "ish" in "root-ish" tasks that we sometimes talk about here.
I think I got it now. That's an interesting approach to gauge the local topology. What I'm currently wondering is whether this, or a closely related metric (e.g. the ratio of group dependents to dependencies), could be used to estimate whether a task has the potential to increase or decrease parallelism. That'd be an interesting metric for work stealing.
Anyhow, I don't want to increase the scope here; this is a discussion we can delay. I'll let the professionals get back to work! Thanks!
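For what it's worth, a hypothetical sketch of the suggested ratio (nothing like this exists in the scheduler today; names and numbers are invented):

```python
# Hypothetical metric: a group whose dependents vastly outnumber its
# dependencies tends to fan out (increase parallelism), and vice versa.
def fan_out_ratio(n_dependent_tasks, n_dependency_tasks):
    # +1 on both sides avoids division by zero for true root/leaf groups
    return (n_dependent_tasks + 1) / (n_dependency_tasks + 1)

print(fan_out_ratio(10_000, 1))   # >> 1: completing this work unlocks parallelism
print(fan_out_ratio(1, 10_000))   # << 1: this work consumes/reduces parallelism
```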
I think that it could be a useful metric for memory consuming/producing tasks.
It's also, yes, a good metric for increasing parallelism. My experience though is that we are always in a state of abundant parallelism, and that scheduling to increase parallelism is not worth considering in our domain.
Instead we should focus our scheduling decisions to reduce memory use and free intermediate tasks quickly.
This gets test_scheduler.py::test_reschedule to pass
The test failure is:
Running this code (uncomment the first bit to generate the zarr array)

import xarray as xr
import dask.array as da
from distributed import Client, LocalCluster
import coiled

if __name__ == "__main__":
    cluster = LocalCluster(
        processes=True, n_workers=4, threads_per_worker=1, memory_limit=0
    )
    client = Client(cluster)

    # Write a zarr array to disk (requires 100GB free disk space!)
    # Comment this out once you've run it once.
    # data = da.zeros((12500000, 1000), chunks=(12500000, 1))
    # ds = xr.Dataset({"data": (("x", "y"), data)})
    # ds.to_zarr("test.zarr")
    # print("Saved zarr")

    # Do the same array-sum example, but from zarr.
    ds_zarr = xr.open_zarr("test.zarr")
    with coiled.performance_report("zarr-4899.html"):
        ds_zarr.sum("y").compute()

causes a lot of transfers using this branch (performance report) compared to #4899 (performance report). I believe this is because, when moving on to a new worker, this is still using the typical candidate-restricting logic; see the commit message of 0fbb75e for an explanation.
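As an aside (not part of the original reproducer): if you don't have Coiled set up, distributed's built-in `performance_report` context manager produces a similar local HTML report:

```python
# Local alternative to coiled.performance_report.
import xarray as xr
from dask.distributed import performance_report

ds_zarr = xr.open_zarr("test.zarr")  # the array written by the script above
with performance_report(filename="zarr-4899-local.html"):
    ds_zarr.sum("y").compute()
```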
Ah, right. Would this be solved by your trick of including a few workers from the general pool into the mix? We might also consider applying the root-ish check when we check for dependencies. If there are far fewer dependencies than tasks in this group then we just fall back to the all_workers case.
Yes, but I think we should not consider dependencies at all when selecting candidates in this case (see distributed/distributed/scheduler.py, lines 7570 to 7576, at 0fbb75e).
So rather than picking candidates as usual and then adding a few random workers, I think we should only use random workers in this instance. The whole point of the "ish" in root-ish tasks is that it's a case where we've decided dependencies don't matter.
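A hypothetical sketch of that idea (not the actual patch; the helper and its signature are invented, though `dependencies` and `who_has` follow the scheduler's TaskState naming):

```python
# Hypothetical sketch: when a task has been classified as root-ish, skip the
# dependency-based candidate restriction entirely and sample from the full pool.
import random

def pick_candidates(ts, all_workers, rootish, n=20):
    if rootish:
        # Dependencies were already judged irrelevant, so sample from the
        # whole pool instead of restricting to workers holding the inputs.
        pool = list(all_workers)
        return set(random.sample(pool, min(n, len(pool))))
    # Usual path (simplified): workers that already hold any dependency.
    return {ws for dep in ts.dependencies for ws in dep.who_has}
```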
There are two proposals that came out of conversation here:
They might both make sense.