Layer Annotations #6701
Comments
We might make getting the annotations for tasks a method on a layer, rather than forcing a shared annotation across all tasks in a layer. This design lets us keep it simple (a single set of annotations across a collection of tasks) in the common case, but break out to something more complicated (per-task annotations) when needed. By putting this in the graph layers API we let the collections API decide which to use. We avoid the cost of storing per-task annotations when not needed, but still keep that option available.
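A minimal sketch of that idea (class and method names here, like `get_annotations`, are hypothetical, not dask's actual API): the base layer returns one shared dict for every task, and a subclass pays the per-task storage cost only when it actually needs it.

```python
class Layer:
    """Toy layer: one shared annotation dict for every task (the common case)."""

    def __init__(self, tasks, annotations=None):
        self.tasks = tasks                      # mapping: key -> task
        self.annotations = annotations or {}

    def get_annotations(self, key):
        # Default: every task in the layer shares the same annotations.
        return self.annotations


class PerTaskLayer(Layer):
    """Escape hatch: per-task annotations, stored only when needed."""

    def __init__(self, tasks, per_task_annotations):
        super().__init__(tasks)
        self._per_task = per_task_annotations   # mapping: key -> annotation dict

    def get_annotations(self, key):
        return self._per_task.get(key, {})


shared = Layer({("x", 0): None, ("x", 1): None}, annotations={"priority": 1})
special = PerTaskLayer({("y", 0): None, ("y", 1): None},
                       {("y", 0): {"priority": 10}})
```

Callers (e.g. the collections API) only ever go through `get_annotations(key)`, so they don't need to know which storage strategy a given layer chose.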
As an alternative / complement to that, would it make sense for collection methods to have an `annotations` keyword?

```python
z = x.map_blocks(myfunc, annotations=dict(priority=1, retries=2))
```

I don't have a good sense for how these will be used yet, so I'm having trouble judging.
Thanks for the write-up and suggestions @mrocklin, @jcrist and @TomAugspurger.
I actually went this way in #6217. In that PR the following is supported: `array = da.blockwise(..., annotation={...})`, and the (single) annotation was attached to the underlying Blockwise object. I thought about supporting callables for Blockwise annotations but ultimately removed it because of concerns about remote code execution on the distributed server. So @jcrist's comments are prescient:
If we're moving Layers to the distributed server, is it now OK to execute their methods on the server (given that they could be sub-classed)? It would certainly extend the power of the concept. @mrocklin's context manager suggestion is probably the most backwards-compatible and ergonomic, because it'd be laborious to pass annotations through to all Array/Dataframe API methods that call blockwise/map_blocks. I still need to get my head around the new layers work. Thanks for the comments and the direction, the way forward seems clearer to me.
In my mind this would all be executed client side to normalize the representation sent to the server. The main point was that I think we shouldn't limit ourselves to a layer only having a single set of annotations that applies to all tasks within the layer.
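A sketch of that client-side normalization (the function name and data shapes are assumptions, not dask's API): callable annotations are expanded over the layer's keys into plain data before anything is transmitted, so the scheduler never executes user code.

```python
def normalize_annotations(annotations, keys):
    """Expand any callable annotation into a concrete per-key mapping.

    Runs entirely on the client, so only plain values ever reach
    the scheduler.
    """
    out = {}
    for name, value in annotations.items():
        if callable(value):
            out[name] = {key: value(key) for key in keys}
        else:
            out[name] = value   # one concrete value for the whole layer
    return out


keys = [("y", 0), ("y", 1), ("y", 2)]
normalized = normalize_annotations(
    {"priority": lambda key: key[1], "retries": 2}, keys
)
```

Note that a concrete value (`retries=2`) passes through untouched, so the common case stays cheap to serialize.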
To help with the design, I think it would be useful to have examples of when per-task annotations within collections would have been useful. I can think of some situations, but I'm not sure whether they would actually occur in the wild.
As I said above, I think these would aid with efficient implementation of annotations for delayed tasks. Otherwise we'd end up with a layer per task.

Edit: I misread "per-task annotations within collections" as "within layers". Apologies.
I'll spend some time thinking of other possible examples.
I don't know if it would be possible, but it would be incredibly useful to specify the maximum number of instances of a specific task executing simultaneously. An example would be a map_blocks call that a developer knows has a large intermediate memory footprint per chunk. The capacity to do something like:

```python
def myfunc(arr):
    return np.sum(np.repeat(arr, 1e9), keepdims=True)

y = da.ones([100], chunks=10)
z = da.map_blocks(myfunc, y, annotations=dict(max_simul=3))
```

would help in cases where one specific step causes problems but where there is still value in having more threads/processes running.

Edit: Typos.
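For illustration only (the `max_simul` name is the hypothetical annotation from the snippet above, and real scheduling is dynamic rather than batched), a scheduler honouring such an annotation would effectively cap each "wave" of runnable tasks from the layer:

```python
def schedule_waves(keys, annotations):
    """Toy model: split a layer's tasks into sequential waves so that
    at most ``max_simul`` of them are ever in flight at once."""
    limit = annotations.get("max_simul", len(keys))
    return [keys[i:i + limit] for i in range(0, len(keys), limit)]


# 10 chunks with max_simul=3 -> waves of size 3, 3, 3, 1
waves = schedule_waves([("z", i) for i in range(10)], {"max_simul": 3})
```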
I think my use-case of grouping all tasks is relevant here: I want to report on how many jobs are running concurrently and, for each job, how many tasks are completed/remaining.
Just want to give another POV on @JSKenyon's use case regarding concurrency limitation.
This entire concept plays into how synchronization objects like Locks, Semaphores, Events, etc. are currently implemented in distributed. Currently the only way to block concurrent execution with these objects is by blocking within the task. However, with task annotations this could be implemented such that the "blocked" tasks are not even scheduled. This has been reported in dask/distributed#4104, and while I see both features as valuable, most users would probably choose "don't even schedule my task if it cannot be executed" over the "schedule the task and block the worker" we currently offer. I also have a hunch that limiting concurrency in shuffle operations could reduce the overall memory footprint, but as of now this is incredibly hard to verify. The bottom line is that, for concurrency limitation, I think it would be sufficient to have one scalar annotation per layer. This, in combination with an appropriate scheduler extension, should be sufficient to control concurrency at the task level.
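As a rough illustration of the "schedule the task and block the worker" pattern described above, here is a self-contained analogue using a plain `threading.Semaphore` in place of distributed's semaphore (all names are illustrative): every task is started and occupies a worker thread even while it waits for the semaphore.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

sem = threading.Semaphore(2)   # allow at most 2 tasks in the guarded section
lock = threading.Lock()
running = 0
peak = 0                       # highest observed concurrency in the section


def task(i):
    global running, peak
    # The task has already been scheduled and holds a worker thread;
    # it merely *blocks* here instead of being kept off the worker.
    with sem:
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(0.01)       # stand-in for memory-hungry work
        with lock:
            running -= 1
    return i


with ThreadPoolExecutor(max_workers=8) as pool:
    results = sorted(pool.map(task, range(8)))
```

With annotation-based limiting, the scheduler would instead hold back the excess tasks entirely, freeing those worker threads for other work.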
I think I'll probably work on an implementation when some of the HLG churn makes it into master. In particular, I think it would make sense to start when dask/distributed#4140 is merged.
OK, so I'll suggest that layers have an `annotations` attribute, which is a dictionary mapping annotation names (like `priority`) to values.

In the case of a callable we would expand it out on the client side before sending everything to the scheduler. We could keep a concrete value as a single concrete value and expand those on the scheduler side.

```python
with dask.annotate(priority=lambda key: key[1]):
    y = x + 1
```
Assuming that the attributes are resolved at layer serialization time, any dynamic generation of annotations can be done on the client side and shipped using either of the first two options (for delayed stuff, the dictionary mapping key names to values would be useful). I don't see the need for an optional callable; I'd assume any dynamic generation would have its own layer that could handle that code.
I agree that any use of the callable would have to be handled on the client side, and not shipped over. Mostly I think that callables will be more helpful for users who want to provide non-trivial annotations but don't yet understand the key names. Consider my final example:

```python
with dask.annotate(priority=lambda key: key[1]):
    y = x + 1
```

There isn't really a way for users to provide a dictionary here without knowing what the names of the underlying keys will be.
Yeah, I can see that being nice.
Currently underway in #6767.
Closed via #6767. Thanks all!
There are a variety of reasons to annotate tasks, including resources like GPUs, memory constraints, retries, worker restrictions, and so on. There are some ways to specify annotations separately from tasks, such as with

```python
compute(..., retries=...)
```

but this ends up being awkward. There have been multiple requests to annotate the tasks themselves, which would make it a bit easier to track annotations and apply them at the point of graph creation. There are at least two issues about this (#3783 and #6054) and an implementation at #6217.
Unfortunately this is hard because our task type, `tuple`, isn't well set up for extension. Changing to a `Task` type is possible, but it has some performance implications and would be a large change at the core of the project, so it would need to be done with some care.

Annotated Layers
An alternative approach would be to annotate high level graph layers, which are in flux right now and so are easy to modify and redesign. Layers may also side-step some of the performance concerns.

This also has some limitations, but I think that most people asking for this feature might be OK with layer-based annotations.
Current work with layers
We're currently working to turn all graph layers into `Layer(Mapping)` subclasses, and to communicate these layers directly to the scheduler. This gives us a nice conduit for potentially richer information. These will be applied universally across all major Dask collections maintained within the dask/dask repository.

API
I'm going to suggest that we recommend using context managers for annotations like the following:
The `Layer.__init__` method would look at some global state for annotations and apply those onto the layer on construction. Any layer made within the context block would be affected.

Limitations
I think that it's not yet clear what we would do with Delayed. Delayed does currently use HighLevelGraphs, but we're a bit sensitive about performance there, since there would be a separate layer per task and overheads might creep up.
cc @sjperkins @jcrist