Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Layer Annotations #6701

Closed
mrocklin opened this issue Oct 2, 2020 · 17 comments
Closed

Layer Annotations #6701

mrocklin opened this issue Oct 2, 2020 · 17 comments

Comments

@mrocklin
Copy link
Member

mrocklin commented Oct 2, 2020

There are a variety of reasons to annotate tasks, including resources like GPUs, memory constraints, retries, worker restrictions, and so on. There are some ways to specify annotations separately from tasks, such as with compute(..., retries=...) but this ends up being awkward.

There have been multiple requests to annotate the tasks themselves, which would make it a bit easier to track annotations and apply them at the point of graph creation. There are at least two issues about this #3783 and #6054 and an implementation at #6217

Unfortunately this is hard because our task type, tuple, isn't well set up for extension. Changing to a Task type is possible, but has some performance implications, and would be a large change at the core of the project, and so would need to be done with some care.

Annotated Layers

An alternative approach would be to annotate high level graph layers which are easy to modify and in flux now and so easy to change designs. Layers maybe also side-step some of the performance concerns.

This also has some limitations, but I think that most people asking for this feature might be ok with layer-based annotations.

Current work with layers

We're currently working to include all graph layers in Layer(Mapping) subclasses, and communicate these layers directly to the scheduler. This gives us a nice conduit of potentially richer information. These will be applied universally across all major Dask collections maintained within the dask/dask repository.

API

I'm going to suggest that we recommend using context managers for annotations like the following:

x = da.ones(10)
y = da.ones(10)

with dask.annotate(priority=1, retries=2):
    z = x + y

The Layer.__init__ method would look at some global state for annotations, and apply those onto the layer on construction. Any layer made within the context block would be affected.

Limitations

I think that it's not yet clear what we would do with Delayed. Delayed does currently use HighLevelGraphs, but we're a bit sensitive here on performance grounds, just because there would be a separate layer per task, and overheads might creep up a little here.

cc @sjperkins @jcrist

@jcrist
Copy link
Member

jcrist commented Oct 2, 2020

I think that it's not yet clear what we would do with Delayed. Delayed does currently use HighLevelGraphs, but we're a bit sensitive here on performance grounds, just because there would be a separate layer per task, and overheads might creep up a little here.

We might make getting the annotations for tasks a method on a layer, rather than force a shared annotation across all tasks in a layer. For things like Blockwise the layer might only store a static annotation, and the method returns that annotation mapped across all tasks (or when we move to sending layers directly to the scheduler, the static version could be sent and interpreted there). We then could add a new DelayedSubgraph layer that would hold all tasks in a delayed object as well as a collection of relevant annotations.

This design lets us keep it simple (single set of annotations across a collection of tasks) in the common case, but break out to something more complicated (per-task annotations) when needed. By putting this in the graph layers api we let the collections api decide which to use. We avoid the cost of storing per-task annotations when not needed, but still keep that available when needed.

@TomAugspurger
Copy link
Member

I'm going to suggest that we recommend using context managers

As an alternative / complement to that, would it make sense for collection methods to have an annotations keyword argument, when you only want to annotate a single layer?

z = x.map_blocks(myfunc, annotations=dict(priority=1, retries=2))

I don't have a good sense for how these will be used yet, so I'm having trouble judging.

@sjperkins
Copy link
Member

Thanks for the write-up and suggestions @mrocklin, @jcrist and @TomAugspurger.

I'm going to suggest that we recommend using context managers

As an alternative / complement to that, would it make sense for collection methods to have an annotations keyword argument, when you only want to annotate a single layer?

z = x.map_blocks(myfunc, annotations=dict(priority=1, retries=2))

I don't have a good sense for how these will be used yet, so I'm having trouble judging.

I actually went this way in #6217. In that PR the following is supported:

array = da.blockwise(...,  annotation={...})

and the (single) annotation was attached to the underlying Blockwise object. I thought about supporting callable's for Blockwise annotations but ultimately removed it because of concerns about remote code execution on the distributed server. So @jcrist's comments are prescient:

We might make getting the annotations for tasks a method on a layer, rather than force a shared annotation across all tasks in a layer. For things like Blockwise the layer might only store a static annotation, and the method returns that annotation mapped across all tasks (or when we move to sending layers directly to the scheduler, the static version could be sent and interpreted there). We then could add a new DelayedSubgraph layer that would hold all tasks in a delayed object as well as a collection of relevant annotations.

If we're moving Layer's to the distributed server, is it now OK to execute their methods on the server (given that they could be sub-classed)? It would certainly extend the power of the concept.

@mrocklin's context manager suggestion is probably the most backwards-compatible and ergonomic, because it'd be laborious to pass through annotations to all Array/Dataframe API methods that call blockwise/map_blocks.

I need to get my head around the new Layer(Mapping) functionality introduced in #6510 (but now reverted). I'll probably wait for this to be re-introduced before experimenting further.

Thanks for the comments and the direction, the way forward seems clearer to me.

@jcrist
Copy link
Member

jcrist commented Oct 5, 2020

If we're moving Layer's to the distributed server, is it now OK to execute their methods on the server (given that they could be sub-classed)? It would certainly extend the power of the concept

In my mind this would all be executed client side to normalize the representation sent to the server. Whether the annotations is a property or a method I don't particularly care, my saying it's a method I just meant that for certain representations we would only need to store a single annotation on the layer that could later be broadcast to all tasks. Alternatively, we could make the server understand annotation broadcasting itself, and have .annotations be Union[Set[str], List[Set[Str]]] or something like that. Lots of options here.

The main point was I think we shouldn't limit ourselves to a layer only having a set of annotations that apply to all tasks within the layer.

@mrocklin
Copy link
Member Author

mrocklin commented Oct 5, 2020

To help with design I think that it would be useful to have examples of when per-task annotations within collections would have been useful. I can think of some situations, but I'm not sure if they would actually occur in the wild or not.

@jcrist
Copy link
Member

jcrist commented Oct 5, 2020

As I said above, I think these would aid with efficient implementation of annotations for delayed tasks. Otherwise we'd end up with a layer per task.

Edit: misread "per-task annotation within collections" as "within layers". Apologies

@sjperkins
Copy link
Member

To help with design I think that it would be useful to have examples of when per-task annotations within collections would have been useful. I can think of some situations, but I'm not sure if they would actually occur in the wild or not.

  • Estimation of the memory output per task, particularly for the Array + Dataframe collections. This would help schedulers prioritise graph paths that minimise memory footprint, as well as preferring small tasks for transfer to other node transfer/work-stealing. Heterogenously chunked arrays/dataframes would be the obvious use case here: I can imagine mapping the Array/Dataframe chunk spec over the key space to produce size annotations per task as per @jcrist's mapping concept. Also see Support Task Annotations within the graph #3783 (comment) for further motivation.

  • Support different priorities per task to support re-ordering algorithms as suggested by @madsbk in [FEA] Task Graph Annotation #6054. I think this may have application in things like Dataframe shuffling but I've thought about it less.

I'll spend sometime thinking of other possible examples.

@JSKenyon
Copy link
Contributor

JSKenyon commented Oct 6, 2020

To help with design I think that it would be useful to have examples of when per-task annotations within collections would have been useful. I can think of some situations, but I'm not sure if they would actually occur in the wild or not.

I don't know it would be possible, but it would be incredibly useful to specify the maximum number of a specific task to be executed simultaneously. An example would be a map_blocks call that a developer knows has a large intermediary memory footprint per chunk. The capacity to do something like:

def myfunc(arr):
    return np.sum(np.repeat(arr, 1e9), keepdims=True)

y = da.ones([100], chunks=10)
z = da.map_blocks(myfunc, y, annotations=dict(max_simul=3))

would help in cases where one specific step causes problems but where there is still value in having more threads/processes running.

Edit: Typos.

@dhirschfeld
Copy link

To help with design I think that it would be useful to have examples of when per-task annotations within collections would have been useful.

I think my use-case of grouping all tasks by job-name (where job-name is an annotation) fits the bill?

I want to report on how many jobs are running concurrently and for each job how many tasks completed/remaining there are.

@fjetter
Copy link
Member

fjetter commented Oct 12, 2020

To help with design I think that it would be useful to have examples of when per-task annotations within collections would have been useful.

Just want to give another POV to @JSKenyon use case regarding concurrency limitation

An example would be a map_blocks call that a developer knows has a large intermediary memory footprint per chunk. The capacity to do something like:

This entire concepts plays into how synchronization objects like Locks, Semaphores, events, etc. are currently implemented in distributed. Currently the only possibility to block concurrent execution with these objects is by blocking within the task. However, with task annotations this could be implemented such that the "blocked" tasks are not even scheduled. This has been reported in dask/distributed#4104 and while I see both features to be of value, most users would probably choose the "don't even schedule my task if it cannot be executed" instead of "schedule task and block the worker" we currently offer.

I have also a hunch that limiting concurrency in shuffle operations could prove to reduce the overall memory footprint but as of now this is incredibly hard to verify.

The bottom line is, for concurrency limitation I think it would be sufficient to have one scalar annotation per layer. This in combination with an appropriate scheduler extension should be sufficient to control concurrency on task level.

@sjperkins
Copy link
Member

I think I'll probably work on an implementation when some of the HLG churn makes it into master. In particular, I think it would make sense to start when dask/distributed#4140 is merged.

@mrocklin
Copy link
Member Author

OK, so I'll suggest that layers have an annotations attribute, which is a dictionary mapping attribute names (like "priority") to any of ...

  1. A concrete value to be applied to all keys in the layer

    annotations={"priority": 1}
  2. A dictionary mapping key names to values

    annotations={"priority": {("x": 0): 0, ("x", 1): 1, ("x", 2): 2}}  # higher priority to later keys
  3. A callable mapping key names to values

    annotations={"priority": lambda key: key[1]}  # higher priority to later keys

In the case of the callable we would expand it out on the client side before sending everything to the scheduler. We could keep the concrete value as a single concrete value and expand those on the scheduler side.

with dask.annotate(priority=lambda key: key[1]):
    y = x + 1

@jcrist
Copy link
Member

jcrist commented Oct 14, 2020

A callable mapping key names to values

Assuming that the attributes are resolved at layer serialization time, any dynamic generation of annotations can be done on the client side and shipped using either of the first 2 options (for delayed stuff, the dictionary mapping key names to values would be useful). I don't see the need for an optional callable, I'd assume any dynamic generation would have its own layer that could handle that code as an @property implementation annotations attribute.

@mrocklin
Copy link
Member Author

I agree that any use of the callable would have to be handled on the client side, and not shipped over. Mostly I think that callables will be more helpful for users who want to provide non-trivial annotations, but don't yet understand the key names. Consider my final example

with dask.annotate(priority=lambda key: key[1]):
    y = x + 1

There isn't really a way for users to provide a dictionary here without knowing what the name of y is going to be. However, maybe your point is that we can expand this immediately, and never store the callable on the layer. I'm fine with that.

@jcrist
Copy link
Member

jcrist commented Oct 14, 2020

However, maybe your point is that we can expand this immediately, and never store the callable on the layer. I'm fine with that.

Yeah, I can see that being nice for an annotate context manager, but I think at the layer level the first two options should be sufficient.

@sjperkins
Copy link
Member

Currently underway in #6767

@jrbourbeau
Copy link
Member

Closed via #6767. Thanks all!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

8 participants