[Draft] Services for out-of-band operations #5948

Draft
gjoseph92 wants to merge 18 commits into main

Conversation

gjoseph92
Collaborator

This is a draft of the interface for a Service—a new interface in distributed that could facilitate doing out-of-band operations in a controlled way.

From the docstring:

Services provide a structured way to combine task graphs of pure functions (Dask's main
use case) with stateful, distributed subsystems. Some algorithms (DataFrame shuffling,
array rechunking, etc.) are inefficient when represented as task graphs, but can be
implemented simply and performantly as bespoke systems that handle their own low-level
data transfer and computation manually.

The Service interface offers a structured way to "hand off" data from the task-graph
paradigm to these out-of-band operations, and to "hand back" the results so downstream
tasks can use them, while maintaining resilience of the overall graph.

I've been kicking around this idea for a bit as a better framework for P2P shuffling (xref #5435, #5520, #5524, #5939) that solves both the resilience needs and some tricky edge cases around graph structure (culling, partial recomputation). I'm hoping it could be generally applicable to similar problems in the future (array rechunking, map-overlap, ML training, etc.).

I suggest reading the docstring for those interested, and reading the Service ABC for those really interested :)
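For orientation, here's a heavily condensed, hypothetical sketch of the rough shape. The parameter names are mine, pieced together from the discussion below (peer RPC handles, leader designation, input/output keys, `add_key`, a stop callback, the `Concierge` handle); the real distributed/service.py in this PR is the authoritative version.

```python
# Hypothetical, heavily condensed sketch of the Service shape, based on names used
# in this discussion (start/stop/add_key, Concierge, peer RPC handles). Not the
# actual interface from the PR.
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Mapping


class Service(ABC):
    @abstractmethod
    async def start(
        self,
        concierge: "Concierge",    # handle back to the worker/scheduler machinery (assumed name)
        peers: Mapping[str, Any],  # RPC handles to sibling instances on other workers
        input_keys: set[str],      # inputs this instance should expect
        output_keys: set[str],     # outputs this service task must produce overall
        leader: bool,              # one instance is designated leader at startup
    ) -> None:
        """Called on every worker when the service task starts running."""

    @abstractmethod
    async def add_key(self, key: str, data: Any) -> None:
        """Called by the Worker to hand off an input key's data to the service."""

    @abstractmethod
    async def stop(self) -> None:
        """Called when the last output is produced, or on error/restart."""
```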

I think actually implementing this would be somewhat similar to what it took to add Actors (Services and Actors are kinda similar), in terms of special-casing things in many scheduler transition functions, and adding support on Workers for running these.

The whole idea is a little hacky, and a little odd, and adding even more complexity right now feels like the wrong direction. But if we're serious about wanting to do things like P2P shuffles, building a first-class framework for expressing out-of-band operations within the task graph will be the best approach to resilience.

Is it over-optimized for P2P shuffles? Maybe. Could it all be implemented as a huge P2P shuffle scheduler plugin? Maybe. Personally though, I find it easier to think about (and test, and ensure correctness) in this generic sense than to merge with all the details of shuffling.

cc @mrocklin @fjetter

@github-actions
Contributor

github-actions bot commented Mar 16, 2022

Unit Test Results

       12 files  ±0        12 suites  ±0   5h 43m 40s ⏱️ +5m 36s
  2 653 tests  ±0    2 566 ✔️ −4     80 💤 ±0    7 ❌ +4
 13 021 runs   ±0   12 378 ✔️ −4    635 💤 ±0    8 ❌ +4

For more details on these failures, see this check.

Results for commit 1ac83b7. ± Comparison against base commit 7a69b5e.

♻️ This comment has been updated with latest results.

Member

@fjetter fjetter left a comment

This feels like a more technical description of what we called "virtual tasks" previously. At least what I was thinking about when talking about the virtual tasks is similar to the service_result in this document.
There are surely differences but this gives the entire thing a better foundation. I generally like the approach but I think there are still a few things missing. I'm particularly interested in discussing the necessary scheduler modifications to support this (on a high level).

From our recent discussions about design processes I believe this belongs in https://github.com/dask/design-docs

Comment on lines +85 to +88
Leader election
---------------
* At startup, one instance is designated as the "leader", making it easier to coordinate
initialization tasks that need to run exactly once.
Member

Do we require any leaders? There sure may be applications needing this but does the shuffle? Does the array rechunking or any other application that would soon benefit from this?

Collaborator Author

I don't think shuffling would. Not sure other things would either; it just seemed easy and useful to offer, but a pain to implement yourself. Both the shuffle PRs needed a form of leader election to do some initialization, but that was for doing things like starting up the shuffle plugin and sending the list of peer workers, both of which would be handled automatically by the Service framework.

I also realize a more useful implementation might be passing the leader ID to all instances, rather than just True/False.

Member

Rather than leader election, I would propose that we just embrace the scheduler in this case.

We have a natural leader that everyone agrees on. Let's use it.

Collaborator Author

I'd be fine with that for now, seeing as we don't have an immediate need for it.

As a creator of a Service, or maybe a third-party library that uses Services, it would just be nice to not have to create a scheduler plugin and get it registered just to do some simple initialization task.
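For illustration only, a minimal sketch of what "embrace the scheduler" plus passing the leader ID (rather than a bare True/False) could look like; all names here are hypothetical:

```python
# Hypothetical sketch: the scheduler (the "natural leader" everyone agrees on)
# designates one instance deterministically and tells every instance the leader's
# ID, rather than a True/False flag. Names are illustrative, not from the PR.
def designate_leader(instance_ids: list[str]) -> str:
    # Any deterministic rule works as long as everyone is told the same answer;
    # picking the smallest ID is the simplest.
    return min(instance_ids)


leader_id = designate_leader(["svc-w1", "svc-w2", "svc-w3"])
# Each instance receives `leader_id` at startup and compares it to its own ID.
```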

Comment on lines +80 to +83
* The RPC handle allows a `Service` to call arbitrary methods on its peers on other
workers.
* Loopback calls (using the RPC to call yourself) take a fastpath, bypassing
serialization and the network stack.
Member

These are attributes I would like to see generally from our RPC framework

Comment on lines +117 to +118
If a task depends on a `Service`, that must be its only dependency. For example, this
will raise an error when the graph is submitted to the scheduler:
Member

Can you elaborate a bit further why we need this restriction?

I get that a service is evaluated as some kind of "apply on everything", similar to a "map_partitions", and we cannot granularly pass dependencies to it; at the very least this may make things much more complex.

I could easily see a version and usages where we allow dependencies to be passed to the service, with any such dependency being passed to every service instance.

graph BT
O1 --> D1
O2 --> D2
O3 --> D3
K --> D1
K --> D2
K --> D3
foo-service --> O1
foo-service --> O2
foo-service --> O3
I1 --> foo-service
I2 --> foo-service

Collaborator Author

We need this restriction because the output tasks don't actually run—they're dummy keys/virtual tasks, and the scheduler will just wait until Concierge.produce_key is called for them, then transition them from waiting->memory.

If a dummy output key actually depends on two services, which service gets to mark it as ready? What if the two services mark it as ready on different workers? By having this restriction, we just eliminate all these edge cases.

I'm also open to changing this model. It's weird to have tasks in the graph that never actually run. It also would break optimizations like linear fusion. However, doing so makes things more complicated.

A more powerful model might actually be to have the Service tell the scheduler "this key/task is now available, run it on my worker", and the scheduler would then submit that task to that worker (basically like we do now with worker restrictions). It would be a mechanism to transition a task from waiting->ready while setting worker restrictions. I've tried out that idea in f1d977e.

Still, if that task depends on multiple service tasks, which one gets to mark it as ready? Which worker should it run on?

It's all much easier to reason about if a key can only depend on one service task.
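For concreteness, a hypothetical low-level graph illustrating the restriction; the task names and the service placeholder are made up:

```python
# Hypothetical raw graphs illustrating the restriction. "out-0" is a dummy/virtual
# output key whose only dependency is the service task; "bad" violates the rule by
# also depending on an ordinary task. Names are illustrative only.
import operator

ok_graph = {
    "shuffle-service": "SERVICE-PLACEHOLDER",           # the service task itself
    "out-0": "shuffle-service",                         # allowed: service is the sole dependency
    "downstream": (operator.add, "out-0", 1),           # ordinary task consuming the output
}

bad_graph = {
    "shuffle-service": "SERVICE-PLACEHOLDER",
    "extra": (sum, [1, 2, 3]),
    "bad": (operator.add, "shuffle-service", "extra"),  # rejected: service plus another dependency
}
```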

Composition
-----------

One Service's outputs can be another Service's inputs.
Member

I assume a join/merge would also be expressed as a service? I.e. two shuffle services and one combiner, correct?

This would remove our triangle fusion problem by transferring the responsibility for combining the correct shuffle outputs to produce a merged output. Does this combiner have all the necessary information for this? What kind of metadata would we need to track, and where, to enable this? E.g. is this additional TaskState metadata, or another out-of-band communication between the workers?

What would this look like for shuffling specifically?

Collaborator Author

I wasn't thinking there'd be any special combiner service. Just two shuffle services, which feed into tasks combining their outputs.

flowchart BT
subgraph A
a1 & a2 & a3 --> shuffle-a --> sa1 & sa2 & sa3
end
subgraph B
b1 & b2 & b3 --> shuffle-b --> sb1 & sb2 & sb3
end
sa1 & sb1 --> m1
sa2 & sb2 --> m2
sa3 & sb3 --> m3

I still don't have an answer to triangle fusion. d84ce78..ba08e61 was an attempt to support triangle fusion natively in services, but it feels kind of contrived for the general Service interface.
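For a sense of what this composition looks like at the collection level, here's a rough, hypothetical sketch; it assumes the dataframe shuffle is backed by a Service, which is not what dask.dataframe does today:

```python
# Rough sketch of the composition above at the collection level. This assumes the
# dataframe shuffle is service-backed; treat it as illustrative only.
import dask.dataframe as dd

a = dd.read_parquet("a/")  # illustrative inputs
b = dd.read_parquet("b/")

# Two shuffle service tasks (shuffle-a, shuffle-b in the diagram above), whose
# output partitions sa_i / sb_i are consumed pairwise by ordinary merge tasks m_i.
merged = a.merge(b, on="id")
```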

Comment on lines 72 to 74
* The service is informed at runtime which inputs to expect and which outputs to
produce, so partial recomputation (some outputs already in memory, some not) can be
handled correctly.
Member

Maybe related to my other comment on this line.

Would this solve our "culling" problem? If so, how? How would the scheduler detect that the graph is culled and how would the information be propagated to the services?

Collaborator Author

> Talking about the shuffling specifically, does this generate any value?

I'm not actually certain—I haven't actually tried writing out how shuffling would use this yet. To some degree, I was just adding things to the interface that seemed generally useful. I don't think shuffling needs to know about the inputs. But knowing the outputs makes it easy to address partial recomputation:

s = shuffle(df, "id")
futures = client.futures_of(s.persist())
del futures[:-2]  # retain some outputs but not others
s.compute()  # compute same graph again

This case breaks the current P2P shuffle, because in s.compute(), 2 of the unpack tasks will never run (because their results are already in memory, pinned by futures). The current shuffle only knows it's done (and cleans up state) by counting how many times unpack has been called on each worker, compared to the number of partitions that worker is responsible for. When those numbers don't match up, the shuffle "never ends" and leaves extra data around, among other problems.

Both knowing the output_keys and having a stop callback is overkill (you could solve partial recomputation with just one of them), but knowing the output keys allows for optimizations too, and I think it's just a nice (and easy to implement) interface.
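A minimal sketch of how knowing the outputs up front could replace that counting logic; the class and method names are hypothetical:

```python
# Hypothetical sketch: instead of counting unpack calls against the number of
# partitions a worker "should" produce, track completion against the exact set of
# output keys the scheduler says are actually needed for this run.
class ShuffleCompletionTracker:
    def __init__(self, output_keys: set[str]):
        self.remaining = set(output_keys)  # only outputs that will really be requested

    def produced(self, key: str) -> None:
        self.remaining.discard(key)

    @property
    def done(self) -> bool:
        # Outputs already pinned in memory by futures simply never appear in
        # output_keys, so the shuffle can still finish and clean up.
        return not self.remaining
```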

Collaborator Author

> Would this solve our "culling" problem? If so, how?

Maybe? We need the key for the "service task" to change if its outputs change.

graph BT
subgraph Culled
i1' & i2' --> shuffle-service-def567 --> o1' & o2'
end

subgraph Full computation
i1 & i2 --> shuffle-service-abc123 --> o1 & o2 & o3
end

shuffle-service-def567 -. Can't have same keys .- shuffle-service-abc123

We were planning to solve this with a custom graph optimization. I've implemented a version of that in b0e30d6. That optimization would

  1. Change the name of the shuffle task in its cull method
  2. (For the current P2P shuffle) during cull, edit the graph so the names of the output keys were passed into the task itself (so it could optimize to not transfer unneeded data)

Via output_keys we already get 2). We could move 1) to the scheduler if we wanted: if a task is a Service, change its key to include the hash of its dependent keys. But it's probably better to leave that to the graph side.
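A rough sketch of point 1), hypothetical and not the actual code in b0e30d6:

```python
# Hypothetical sketch of renaming the service task when the graph is culled, so a
# culled graph can never share a key with the full graph. Not the code in b0e30d6,
# just an illustration of the idea.
from dask.base import tokenize


def culled_service_key(base_name: str, output_keys: list[str]) -> str:
    # Include a hash of the (culled) outputs in the key, so "shuffle-service-abc123"
    # and "shuffle-service-def567" in the diagram above are distinct tasks.
    return f"{base_name}-{tokenize(sorted(output_keys))}"


culled_service_key("shuffle-service", ["o1'", "o2'"])
```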

Comment on lines +423 to +426
# https://stackoverflow.com/a/65564936/17100540
class _RPC(Protocol):
def __call__(self, *args, **kwargs) -> Awaitable:
...
Member

I don't think we should reinvent a new RPC framework but instead improve on the existing one. I don't see a reason why they should be different, is there?

Collaborator Author

There isn't. We'd use the existing one. This is just a typing.Protocol to represent the interface (because you can't use *args and **kwargs in typing.Callable).
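For context, a self-contained version of that snippet with its imports; typing.Callable can't express "any *args/**kwargs returning an Awaitable", but a callback Protocol can:

```python
# Self-contained version of the quoted snippet, with the imports it relies on.
from typing import Any, Awaitable, Protocol


class _RPC(Protocol):
    def __call__(self, *args: Any, **kwargs: Any) -> Awaitable:
        ...


async def _ping(*args: Any, **kwargs: Any) -> str:
    return "pong"


rpc_method: _RPC = _ping  # type-checks: any async callable satisfies the protocol
```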

@mrocklin
Member

Thanks for writing this up. I look forward to reading it over.

This way, the output tasks actually run. It's less weird. Can tasks depend on multiple Services now? Can the scheduler automatically deal with triangle fusion? See the next commit!
The goal of service families is to solve triangle fusion, but that's probably not a good idea. This is really complicated. And the assurance that services in the same service family will distribute outputs in the same way, or even have the same outputs at all, is tenuous.
Still doesn't seem like a great idea. Can think of other use-cases where you'd want the sibling services to run on different workers (GPU?), then transfer stuff at the end.
`output_ready` can/should probably stay. But service families seem like too much right now. Not sure how else to handle triangle fusion though (besides just not letting the tasks fuse).

This reverts commits ba08e61 d84ce78 f1d977e
@abstractmethod
async def add_key(self, key: str, data: Any) -> None:
"""
Called by the `Worker` to "hand off" data to the `Service`.
Collaborator Author

I'm wondering if this "virtual tasks" approach may not be a good design, and we should do f1d977e instead.

  1. Allowing services to pump data into workers as fast as they want is basically root task overproduction. Services would have to come up with their own form of backpressure, waiting to add more keys until the ones they've already added have been used. But what if they're producing poor combinations of keys, and downstream tasks aren't getting what they need in order to run?

    Telling the scheduler "this is the menu of data available, you pick which ones you actually want" seems more correct in theory. While root-task overproduction is still a problem, this won't do any good (the scheduler will just say "ALL OF THEM I LOVE KEYS!"), but in theory we could fix that in a couple of different ways, and then the service model would fit nicely into it. Letting the scheduler pick which outputs are best to produce feels, in principle, like the right thing, even though right now the scheduler has no ability to make that decision.

  2. It's just weird (and a little brittle) to have tasks in the graph that will never run?

  3. It means we can't have triangle fusion (the outputs of two different service tasks can't be fused together). In practice, triangle fusion via Blockwise is an important protection against root task overproduction, because it allows DataFrame operations downstream of the merge to fuse into the merge. But triangle fusion is also hard to get right in the context of Services.

Just to beat a decayed and mummified horse over here, we wouldn't have to worry about triangle fusion if we fixed root task overproduction! We really have to twist ourselves into strange knots in many other places because of this scheduling error.
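To make the contrast concrete, a very rough sketch of the "menu" alternative next to the current push-style add_key; none of these names exist in the PR:

```python
# Very rough, hypothetical sketch contrasting the two models discussed above.
# Push (current draft): the service calls something like concierge.produce_key(key, data)
# whenever it likes, which is where the backpressure/overproduction concern comes from.
# Pull ("menu of data available"): the service only announces what it *could* produce,
# and hands data over when asked. These names are made up for illustration.
from abc import ABC, abstractmethod
from typing import Any


class PullStyleService(ABC):
    @abstractmethod
    async def available_keys(self) -> set[str]:
        """Announce the menu: outputs this instance could produce right now."""

    @abstractmethod
    async def get_key(self, key: str) -> Any:
        """Produce one announced key, only when the scheduler/worker asks for it."""
```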

@mrocklin
Member

Very high level thoughts:

In general I like the concept. I like the idea of trying to marry something into the task scheduling framework, kinda like Actors.

If I were to have a critique it's that this feels over-specified for how much experience we have. Generally before solidifying an abstraction like this I'd like for us to have a few different examples (shuffling, XGBoost, MPI applications, what else?). If we have only a couple of these then it's hard to make an abstraction that will stand the test of time. That is, unless we make it very very simple. There is a lot of design in this proposal, and that makes me a little nervous.

So in general I like the broad idea, and I think that this is a sensible direction to go in, but I think that we don't yet have enough experience to nail something broad down. My next step here would be to see how few things we could specify to make shuffling and XGBoost both happy.

Member

@mrocklin mrocklin left a comment

Some smaller comments in line with my thoughts on how specified we want to be.

Also, it goes without saying that I could easily be wrong on all of this.


* Once the *first* input key to a service task is ready (state ``memory``), service
instances are started on all workers in the cluster.
* Once the last output key is produced by a service, all instances are told to stop.
Member

I wonder if instead we could rely on Dask scoping to handle this

Collaborator Author

As in, when the scheduler transitions a service task to a processing state, it tells all the instances to start, and when it transitions to a terminal state, it tells all the instances to stop? That's basically what I'm describing here. Just trying to avoid mentioning the implementation, and focusing on the contract services can expect. Services shouldn't really care who's telling them to stop, just that they get the message.

* Once the *first* input key to a service task is ready (state ``memory``), service
instances are started on all workers in the cluster.
* Once the last output key is produced by a service, all instances are told to stop.
* Any instance can error the service task; all other instances will be told to stop.
Member

I could imagine situations where this would not be desired (distributed training in some instances). This seems over-specified to me. Obviously it would be useful for shuffling though

Collaborator Author

Allowing for anything else (instances must come to some distributed consensus on whether to error the task overall?) feels far more complex and undesirable to me. What do you think the behavior should be instead?

Instances are fully at liberty to suppress errors if they want to. This just says they can error the entire task as a unilateral decision, and use that as a way to stop their peers. It's just a useful handle that's available. They're welcome not to pull it if it doesn't make sense for their case.

* Once the last output key is produced by a service, all instances are told to stop.
* Any instance can error the service task; all other instances will be told to stop.
* Any instance can restart the service task; all other instances will be told to stop,
then re-launched from scratch.
Member

I think that having this ability would be good. My guess is that we'll want to be flexible here.

----------

* If a worker that's running a service leaves, the entire service task is restarted on
all workers. TODO change once `peer_joined`/`peer_left` are added.
Member

This seems over-specified

Collaborator Author

Agreed. It's just far simpler to implement this as a first pass than peer_joined/peer_left. There are a number of edge cases you have to define to support peer_joined properly, and since that would be irrelevant for shuffling right now, I left it all out. This is very shuffle-targeted right now.

However, if you don't have peer_joined, it's not over-specified. If you're not giving services the hooks to know that a peer has joined/left, and decide whether that's a reason to restart, then the contract should make this conservative choice and assume the service always wants to restart. (Relying on broken RPCs is not sufficient for shuffling to identify lost workers if the workers holding inputs and the workers receiving outputs are disjoint sets, and an input worker goes down.)

* If a worker that's running a service leaves, the entire service task is restarted on
all workers. TODO change once `peer_joined`/`peer_left` are added.
* If keys downstream of a service need to be recomputed, the service will be rerun, just
like regular tasks are.
Member

I think it makes sense to leverage Dask's normal state machine here. When a service transitions from waiting to processing, it starts up. When it transitions from memory to released, it shuts down.

Collaborator Author

That's basically what this sentence is saying. It's probably superfluous; I just want to be clear to users of the Service interface that it's not something they have to worry about.

Peer discovery and RPC
----------------------

* At startup, services are given a list of their peers as RPC handles.
Member

A list of peers, yes, but I would use addresses instead. It may also be that they don't need everything. I would leave this programmable.

Collaborator Author

Note that these aren't RPCs to the other workers; they're RPCs only to the other instances of this Service belonging to the same service task on other workers. Doing it this way handles a whole lot for you that I would not want every Service implementation to have to do (or even be able to do) itself: opening an RPC properly, connecting to the correct worker handler routed to the correct Service instance, validating that the other instance hasn't changed out from under you, etc.

It's also a much stricter contract than "here's a worker, have at it", which means it's easier to develop against, much easier to support (random changes in worker.py/comms won't break existing Service code), and much easier to refactor/change in the future.

Again, this is a tool that's useful, but you don't have to use it. What's actually passed right now is peers: Mapping[ServiceId, ServiceHandle[T]]. (I originally used addresses, but switched to opaque IDs because a worker could restart at the same address yet effectively be a new peer; guaranteeing IDs are unique is a nicer contract for users to work with.) This could instead just be peers: Sequence[ServiceID], and we could have an open_rpc(id: ServiceID) -> RPC method, so you can choose whether or not you actually need the RPC.
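A small sketch of what that contract looks like from a Service author's side; the handle type and the ready method are made up, and only the "call arbitrary methods on peers" part comes from the proposal:

```python
# Approximate sketch of using the peers mapping described above. The handle type and
# the "ready" method are stand-ins; the point is that you only ever talk to sibling
# instances of the same service task, never to arbitrary workers.
import asyncio
from typing import Any, Mapping


async def broadcast_ready(peers: Mapping[str, Any]) -> None:
    # Call an arbitrary method on every sibling instance. Loopback calls to yourself
    # would take the serialization-free fastpath mentioned in the docstring.
    await asyncio.gather(*(handle.ready() for handle in peers.values()))
```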


* At startup, services are given a list of their peers as RPC handles.
* The RPC handle allows a `Service` to call arbitrary methods on its peers on other
workers.
Member

It would be nice to generalize the handlers system that we already have 👍

Collaborator Author

@gjoseph92 gjoseph92 left a comment

> If I were to have a critique it's that this feels over-specified for how much experience we have

Agreed that we don't have enough experience right now to know what the right interface would be in general. I basically wrote this as "what do I wish we had for shuffling that at least sounds like it could be reasonable in general?"

I don't think that means it's over-specified though. It's much easier to start with an overly-restrictive interface and make it broader. Whatever interface we start with, we're not going to get it right at first, and we'll have to make changes. It's way easier to change something that's tightly specified, because you can reason about what those changes will/won't break, and how you need to adjust existing usage.

A major (unwritten) goal of what I have here is that someone would be able to develop against it using only the interfaces described in this file. There's a reason that a Worker instance is never passed into Services and you interact via the Concierge thing, and that I defined a standalone RPC interface instead of saying it's a core.rpc. It also makes it much easier for us to reason about what a Service can and can't do. Which makes it easier to iterate on.

