[Draft] Services for out-of-band operations #5948
Unit Test Results: 12 files ±0, 12 suites ±0, 5h 43m 40s ⏱️ +5m 36s. For more details on these failures, see this check. Results for commit 1ac83b7. ± Comparison against base commit 7a69b5e. ♻️ This comment has been updated with latest results.
This feels like a more technical description of what we called "virtual tasks" previously. At least what I was thinking about when talking about the virtual tasks is similar to the `service_result` in this document.
There are surely differences but this gives the entire thing a better foundation. I generally like the approach but I think there are still a few things missing. I'm particularly interested in discussing the necessary scheduler modifications to support this (on a high level).
From our recent discussions about design processes I believe this belongs in https://github.com/dask/design-docs
> Leader election
> ---------------
> * At startup, one instance is designated as the "leader", making it easier to coordinate
>   initialization tasks that need to run exactly once.
Do we require any leaders? There may well be applications needing this, but does the shuffle? Does array rechunking, or any other application that would soon benefit from this?
I don't think shuffling would. Not sure if other things would either; it just seemed easy and useful to offer, but a pain to implement yourself. Both the shuffle PRs needed a form of leader election to do some initialization, but that was for doing things like starting up the shuffle plugin and sending the list of peer workers, both of which would be handled automatically by the `Service` framework.
I also realize a more useful implementation might be passing the leader ID to all instances, rather than just True/False.
Rather than leader election, I would propose that we just embrace the scheduler in this case.
We have a natural leader that everyone agrees on. Let's use it.
I'd be fine with that for now, seeing as we don't have an immediate need for it.

As a creator of a `Service`, or maybe a third-party library that uses `Service`s, it would just be nice to not have to create a scheduler plugin and get it registered just to do some simple initialization task.
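To illustrate the earlier point about passing the leader's ID instead of a boolean, here is a hedged sketch. `Service`, `start`, `instance_id`, and `leader_id` are all hypothetical names for this proposal, not an existing API:

```python
from abc import ABC, abstractmethod


class Service(ABC):
    """Hypothetical base class: every instance learns the leader's ID."""

    @abstractmethod
    async def start(self, instance_id: str, leader_id: str) -> None:
        ...


class MyService(Service):
    async def start(self, instance_id: str, leader_id: str) -> None:
        # Knowing *who* the leader is (not just True/False) lets followers
        # direct initialization RPCs at it later.
        self.leader_id = leader_id
        if instance_id == leader_id:
            await self.initialize_shared_state()  # runs exactly once

    async def initialize_shared_state(self) -> None:
        # e.g. register a plugin, broadcast the list of peer workers, ...
        pass
```

The framework would call `start` on every instance, so simple run-once initialization needs no scheduler plugin.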
> * The RPC handle allows a `Service` to call arbitrary methods on its peers on other
>   workers.
> * Loopback calls (using the RPC to call yourself) take a fastpath, bypassing
>   serialization and the network stack.
These are attributes I would like to see generally from our RPC framework
> If a task depends on a `Service`, that must be its only dependency. For example, this
> will raise an error when the graph is submitted to the scheduler:
Can you elaborate a bit further on why we need this restriction?

I get that a `service` is evaluated as some kind of "apply on everything", similar to a `map_partitions`, and we cannot granularly pass dependencies to it; at the very least this may make things much more complex.

I could easily see a version and usages where we allow a dependency to be passed to the service, but this dependency is passed to every service instance.
```mermaid
graph BT
  O1 --> D1
  O2 --> D2
  O3 --> D3
  K --> D1
  K --> D2
  K --> D3
  foo-service --> O1
  foo-service --> O2
  foo-service --> O3
  I1 --> foo-service
  I2 --> foo-service
```
We need this restriction because the output tasks don't actually run—they're dummy keys/virtual tasks, and the scheduler will just wait until `Concierge.produce_key` is called for them, then transition them from waiting->memory.
If a dummy output key actually depends on two services, which service gets to mark it as ready? What if the two services mark it as ready on different workers? By having this restriction, we just eliminate all these edge cases.
I'm also open to changing this model. It's weird to have tasks in the graph that never actually run. It also would break optimizations like linear fusion. However, doing so makes things more complicated.
A more powerful model might actually be to have the Service tell the scheduler "this key/task is now available, run it on my worker", and the scheduler would then submit that task to that worker (like we do now with worker restrictions). It would basically be a mechanism to transition a task from waiting->ready and set worker restrictions. I've tried out that idea in f1d977e.
Still, if that task depends on multiple service tasks, which one gets to mark it as ready? Which worker should it run on?
It's all much easier to reason about if a key can only depend on one service task.
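For illustration, a minimal sketch of the check the scheduler could run at graph submission time. The function name and error message are hypothetical, not the actual implementation:

```python
def validate_service_dependencies(dependencies, service_tasks):
    """Raise if any key depends on a service task plus anything else.

    ``dependencies`` maps each key to its set of dependency keys;
    ``service_tasks`` is the set of keys that are service tasks.
    """
    for key, deps in dependencies.items():
        service_deps = deps & service_tasks
        if service_deps and len(deps) > 1:
            raise ValueError(
                f"{key!r} depends on service task(s) {sorted(service_deps)} "
                "alongside other dependencies; a Service must be a task's "
                "only dependency."
            )
```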
> Composition
> -----------
>
> One Service's outputs can be another Service's inputs.
I assume a join/merge would also be expressed as a service? I.e. two shuffle services and one combiner, correct?
This would remove our triangle fusion problem by transferring the responsibility of combining the correct shuffle outputs to produce a merged output. Does this combiner have all the necessary information for this? What kind of metadata would we need to track, and where, to enable this? E.g. is this additional TaskState metadata, or another out-of-band communication between the workers?

What would this look like for shuffling specifically?
I wasn't thinking there'd be any special combiner service. Just two shuffle services, which feed into tasks combining their outputs.
```mermaid
flowchart BT
  subgraph A
    a1 & a2 & a3 --> shuffle-a --> sa1 & sa2 & sa3
  end
  subgraph B
    b1 & b2 & b3 --> shuffle-b --> sb1 & sb2 & sb3
  end
  sa1 & sb1 --> m1
  sa2 & sb2 --> m2
  sa3 & sb3 --> m3
```
I still don't have an answer to triangle fusion. d84ce78..ba08e61 was an attempt to support triangle fusion natively in services, but it feels kind of contrived for the general Service interface.
distributed/service.py
Outdated
> * The service is informed at runtime which inputs to expect and which outputs to
>   produce, so partial recomputation (some outputs already in memory, some not) can be
>   handled correctly.
Maybe related to my other comment on this line.
Would this solve our "culling" problem? If so, how? How would the scheduler detect that the graph is culled and how would the information be propagated to the services?
> Talking about the shuffling specifically, does this generate any value?
I'm not actually certain—I haven't actually tried writing out how shuffling would use this yet. To some degree, I was just adding things to the interface that seemed generally useful. I don't think shuffling needs to know about the inputs. But knowing the outputs makes it easy to address partial recomputation:
```python
s = shuffle(df, "id")
futures = client.futures_of(s.persist())
del futures[:-2]  # retain some outputs but not others
s.compute()  # compute same graph again
```
This case breaks the current P2P shuffle, because in `s.compute()`, 2 of the `unpack` tasks will never run (because their results are already in memory, pinned by futures). The current shuffle only knows it's done (and cleans up state) by counting how many times `unpack` has been called on each worker, compared to the number of partitions that worker is responsible for. When those numbers don't match up, the shuffle "never ends" and leaves extra data around, among other problems.
Both knowing the `output_keys` and having a `stop` callback is overkill (you could solve partial recomputation with just one of them), but knowing the output keys allows for optimizations too, and I think is just a nice (and easy to implement) interface.
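A sketch of how that could look inside a shuffle-like service; the class and method names here are illustrative, not part of the proposal's actual code:

```python
class UnpackTracker:
    """Track which outputs this instance still owes, given ``output_keys``.

    Instead of counting ``unpack`` calls against a partition count, the
    service is told up front exactly which keys it must produce; keys that
    are already in memory (pinned by futures) simply never appear here.
    """

    def __init__(self, output_keys):
        self.pending = set(output_keys)

    def mark_produced(self, key):
        """Record one produced output; return True once all are done."""
        self.pending.discard(key)
        return not self.pending  # done -> safe to clean up shuffle state
```

With this, the "shuffle never ends" failure mode disappears: completion is defined by the keys actually requested, not by a partition count.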
> Would this solve our "culling" problem? If so, how?

Maybe? We need the key for the "service task" to change if its outputs change.
```mermaid
graph BT
  subgraph Culled
    i1' & i2' --> shuffle-service-def567 --> o1' & o2'
  end
  subgraph Full computation
    i1 & i2 --> shuffle-service-abc123 --> o1 & o2 & o3
  end
  shuffle-service-def567 -. Can't have same keys .- shuffle-service-abc123
```
We were planning to solve this with a custom graph optimization. I've implemented a version of that in b0e30d6. That optimization would:

1. Change the name of the shuffle task in its `cull` method
2. (For the current P2P shuffle) during `cull`, edit the graph so the names of the output keys were passed into the task itself (so it could optimize to not transfer unneeded data)

Via `output_keys` we already get 2). We could move 1) to the scheduler if we wanted: if a task is a Service, change its key to include the hash of its dependent keys. But it's probably better to leave that to the graph side.
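For example, deriving the service key from its surviving outputs keeps the culled and full graphs distinct. A hedged sketch using a plain hash (a real implementation would more likely use `dask.base.tokenize`):

```python
import hashlib


def service_key(name, output_keys):
    """Append a token derived from the (sorted) output keys to the name."""
    token = hashlib.sha1(repr(sorted(output_keys)).encode()).hexdigest()[:8]
    return f"{name}-{token}"


# Culling away o3 changes the key, so the two graphs can't collide:
full = service_key("shuffle-service", ["o1", "o2", "o3"])
culled = service_key("shuffle-service", ["o1", "o2"])
assert full != culled
```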
> ```python
> # https://stackoverflow.com/a/65564936/17100540
> class _RPC(Protocol):
>     def __call__(self, *args, **kwargs) -> Awaitable:
>         ...
> ```
I don't think we should reinvent a new RPC framework but instead improve on the existing one. I don't see a reason why they should be different, is there?
There isn't. We'd use the existing one. This is just a `typing.Protocol` to represent the interface (because you can't use `*args` and `**kwargs` in `typing.Callable`).
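To make that concrete, here is a runnable sketch; `echo` is a made-up handler, and `_RPC` mirrors the Protocol in the diff:

```python
from typing import Any, Awaitable, Protocol


class _RPC(Protocol):
    # typing.Callable has no syntax for "*args, **kwargs"; a __call__
    # Protocol can spell the signature out exactly.
    def __call__(self, *args: Any, **kwargs: Any) -> Awaitable[Any]:
        ...


async def echo(x, *, double=False):
    # Any async callable taking arbitrary args/kwargs satisfies the protocol.
    return x * 2 if double else x


rpc: _RPC = echo  # accepted by a type checker; no runtime cost
```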
Thanks for writing this up. I look forward to reading it over.
This way, the output tasks actually run. It's less weird. Can tasks depend on multiple Services now? Can the scheduler automatically deal with triangle fusion? See the next commit!
The goal of service families is to solve triangle fusion, but that's probably not a good idea. This is really complicated. And the assurance that services in the same service family will distribute outputs in the same way, or even have the same outputs at all, is tenuous.
Still doesn't seem like a great idea. Can think of other use-cases where you'd want the sibling services to run on different workers (GPU?), then transfer stuff at the end.
> ```python
> @abstractmethod
> async def add_key(self, key: str, data: Any) -> None:
>     """
>     Called by the `Worker` to "hand off" data to the `Service`.
> ```
I'm wondering if this "virtual tasks" approach may not be a good design, and we should do f1d977e instead.

- Allowing services to pump data into workers as fast as they want is basically root task overproduction. Services would have to come up with their own form of backpressure, waiting to add more keys until the ones they've already added have been used. But what if they're producing poor combinations of keys, and downstream tasks aren't getting what they need in order to run?

  Telling the scheduler "this is the menu of data available, you pick which ones you actually want" seems more correct in theory. While root-task overproduction is still a problem, it won't do any good (the scheduler just says "ALL OF THEM I LOVE KEYS!"), but in theory we could fix that in a couple different ways, and then the service model would fit nicely into it. Letting the scheduler pick which outputs are best to produce feels, in principle, like the right thing, even though right now the scheduler has no ability to make that decision.
- It's just weird (and a little brittle) to have tasks in the graph that will never run?
- It means we can't have triangle fusion (the outputs of two different service tasks can't be fused together). In practice, triangle fusion via Blockwise is an important protection against root task overproduction, because it allows DataFrame operations downstream of the `merge` to fuse into the `merge`. But triangle fusion is also hard to get right in the context of Services.

Just to beat a decayed and mummified horse over here: we wouldn't have to worry about triangle fusion if we fixed root task overproduction! We really have to twist ourselves into strange knots in many other places because of this scheduling error.
Very high level thoughts: In general I like the concept. I like the idea of trying to marry something into the task scheduling framework, kinda like Actors.

If I were to have a critique it's that this feels over-specified for how much experience we have. Generally before solidifying an abstraction like this I'd like for us to have a few different examples (shuffling, XGBoost, MPI applications, what else?). If we have only a couple of these then it's hard to make an abstraction that will stand the test of time. That is, unless we make it very very simple. There is a lot of design in this proposal, and that makes me a little nervous.

So in general I like the broad idea, and I think that this is a sensible direction to go in, but I think that we don't yet have enough experience to nail something broad down. My next step here would be to see how few things we could specify to make shuffling and XGBoost both happy.
Some smaller comments in line with my thoughts on how specified we want to be.
Also, it goes without saying that I could easily be wrong on all of this.
> * Once the *first* input key to a service task is ready (state ``memory``), service
>   instances are started on all workers in the cluster.
> * Once the last output key is produced by a service, all instances are told to stop.
I wonder if instead we could rely on Dask scoping to handle this
As in, when the scheduler transitions a service task to a `processing` state, it tells all the instances to start, and when it transitions to a terminal state, it tells all the instances to stop? That's basically what I'm describing here. Just trying to avoid mentioning the implementation, and focusing on the contract services can expect. Services shouldn't really care who's telling them to stop, just that they get the message.
> * Once the *first* input key to a service task is ready (state ``memory``), service
>   instances are started on all workers in the cluster.
> * Once the last output key is produced by a service, all instances are told to stop.
> * Any instance can error the service task; all other instances will be told to stop.
I could imagine situations where this would not be desired (distributed training in some instances). This seems over-specified to me. Obviously it would be useful for shuffling though
Allowing for anything else (instances must come to some distributed consensus on whether to error the task overall?) feels far more complex and undesirable to me. What do you think the behavior should be instead?
Instances are fully at liberty to suppress errors if they want to. This just says they can error the entire task as a unilateral decision, and use that as a way to stop their peers. It's just a useful handle that's available. They're welcome not to pull it if it doesn't make sense for their case.
> * Once the last output key is produced by a service, all instances are told to stop.
> * Any instance can error the service task; all other instances will be told to stop.
> * Any instance can restart the service task; all other instances will be told to stop,
>   then re-launched from scratch.
I think that having this ability would be good. My guess is that we'll want to be flexible here.
> ----------
>
> * If a worker that's running a service leaves, the entire service task is restarted on
>   all workers. TODO change once `peer_joined`/`peer_left` are added.
This seems over-specified
Agreed. It's just far simpler to implement this as a first pass than `peer_joined`/`peer_left`. There are a number of edge cases you have to define to support `peer_joined` properly, so since that would be irrelevant for shuffling right now, I left it all out. This is very shuffle-targeted right now.

However, if you don't have `peer_joined`, it's not over-specified. If you're not giving services the hooks to know that a peer has joined/left, and decide whether that's a reason to restart, then the contract should make this conservative choice and assume the service always wants to restart. (Relying on broken RPCs is not sufficient for shuffling to identify lost workers if the workers holding inputs and the workers receiving outputs are disjoint sets, and an input worker goes down.)
> * If a worker that's running a service leaves, the entire service task is restarted on
>   all workers. TODO change once `peer_joined`/`peer_left` are added.
> * If keys downstream of a service need to be recomputed, the service will be rerun, just
>   like regular tasks are.
I think it makes sense to leverage Dask's normal state machine here. When a service transitions from waiting to processing, it starts up. When it transitions from memory to released, it shuts down.
That's basically what this sentence is saying. It's probably superfluous, just want to be clear to users of the Service interface that it's not something they have to worry about.
> Peer discovery and RPC
> ----------------------
>
> * At startup, services are given a list of their peers as RPC handles.
A list of peers yes, but I would use addresses instead. It may also be that they don't need everything. I would leave this programmable.
Note that these aren't RPCs to the other workers; they're RPCs only to the other instances of this `Service` belonging to the same service task on other workers. There's a whole lot that's handled for you by doing that that I would not want every `Service` implementation to have to do (or even be able to do) itself: opening an RPC properly, connecting to the correct worker handler routed to the correct Service instance, validating that the other instance hasn't changed out from under you, etc.

It's also a much stricter contract than "here's a worker, have at it", which means it's easier to develop against, much easier to support (random changes in `worker.py`/comms won't break existing `Service` code), and much easier to refactor/change in the future.

Again, this is a tool that's useful, but you don't have to use it. What's actually passed right now is `peers: Mapping[ServiceId, ServiceHandle[T]]`. (I originally used addresses, but switched to opaque IDs because a worker could restart at the same address, but effectively be a new peer—guaranteeing IDs are unique is a nicer contract for users to work with.) This could instead just be `peers: Sequence[ServiceID]`, and we could have an `open_rpc(id: ServiceID) -> RPC` method, so you can choose whether or not you actually need the RPC.
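That alternative could look roughly like this; `Concierge`, `peers`, and `open_rpc` are sketched names following the discussion above, not an existing API:

```python
class Concierge:
    """Sketch: expose opaque peer IDs, open RPC handles only on demand."""

    def __init__(self, handles):
        # Keyed by opaque ServiceID rather than worker address: a worker
        # restarting at the same address becomes a *new* peer with a new ID.
        self._handles = dict(handles)

    @property
    def peers(self):
        return list(self._handles)  # just IDs; no RPCs opened yet

    def open_rpc(self, service_id):
        # Instances that never need the RPC never pay for opening one.
        return self._handles[service_id]
```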
> * At startup, services are given a list of their peers as RPC handles.
> * The RPC handle allows a `Service` to call arbitrary methods on its peers on other
>   workers.
It would be nice to generalize the handlers system that we have 👍
> If I were to have a critique it's that this feels over-specified for how much experience we have
Agreed that we don't have enough experience right now to know what the right interface would be in general. I basically wrote this as "what do I wish we had for shuffling that at least sounds like it could be reasonable in general?"
I don't think that means it's over-specified though. It's much easier to start with an overly-restrictive interface and make it broader. Whatever interface we start with, we're not going to get it right at first, and we'll have to make changes. It's way easier to change something that's tightly specified, because you can reason about what those changes will/won't break, and how you need to adjust existing usage.
A major (unwritten) goal of what I have here is that someone would be able to develop against it using only the interfaces described in this file. There's a reason that a `Worker` instance is never passed into Services and you interact via the `Concierge` thing, and that I defined a standalone RPC interface instead of saying it's a `core.rpc`. It also makes it much easier for us to reason about what a `Service` can and can't do. Which makes it easier to iterate on.
|
||
* Once the *first* input key to a service task is ready (state ``memory``), service | ||
instances are started on all workers in the cluster. | ||
* Once the last output key is produced by a service, all instances are told to stop. |
As in, when the scheduler transitions a service task to a `processing` state, it tells all the instances to start, and when it transitions to a terminal state, it tells all the instances to stop? That's basically what I'm describing here. I'm just trying to avoid mentioning the implementation, and focusing on the contract services can expect. Services shouldn't really care who's telling them to stop, just that they get the message.
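A toy state machine can make the contract concrete. All names here are illustrative assumptions, not distributed's real scheduler API: instances start once the *first* input key reaches memory, and stop when the service task hits a terminal state.

```python
# Toy model of the lifecycle contract (assumed names, not distributed's API):
# start instances when the first input key is in memory, stop them when the
# service task reaches a terminal state (done or erred).

class ServiceTaskLifecycle:
    def __init__(self, input_keys):
        self.pending = set(input_keys)
        self.running = False
        self.events = []  # messages "broadcast" to worker instances

    def key_in_memory(self, key):
        self.pending.discard(key)
        if not self.running:
            self.running = True
            self.events.append("start-all")  # first input ready

    def task_terminal(self):  # last output produced, or task erred
        if self.running:
            self.running = False
            self.events.append("stop-all")


lc = ServiceTaskLifecycle({"x-0", "x-1"})
lc.key_in_memory("x-0")  # first input -> start instances on all workers
lc.key_in_memory("x-1")  # no additional start event
lc.task_terminal()       # terminal state -> stop all instances
assert lc.events == ["start-all", "stop-all"]
```

The point of the sketch is only that the service sees start/stop messages, not who sends them.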
> * If a worker that's running a service leaves, the entire service task is restarted on
>   all workers. TODO change once `peer_joined`/`peer_left` are added.
> * If keys downstream of a service need to be recomputed, the service will be rerun, just
>   like regular tasks are.
That's basically what this sentence is saying. It's probably superfluous; I just want to be clear to users of the Service interface that it's not something they have to worry about.
> * If a worker that's running a service leaves, the entire service task is restarted on
>   all workers. TODO change once `peer_joined`/`peer_left` are added.
Agreed. It's just far simpler to implement this as a first pass than `peer_joined`/`peer_left`. There are a number of edge cases you have to define to support `peer_joined` properly, so since that would be irrelevant for shuffling right now, I left it all out. This is very shuffle-targeted right now.

However, if you don't have `peer_joined`, it's not over-specified. If you're not giving services the hooks to know that a peer has joined/left, and decide whether that's a reason to restart, then the contract should make this conservative choice and assume the service always wants to restart. (Relying on broken RPCs is not sufficient for shuffling to identify lost workers if the workers holding inputs and the workers receiving outputs are disjoint sets, and an input worker goes down.)
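The conservative fallback can be sketched in a few lines. The `ServiceSupervisor` name and shape are hypothetical, invented purely to illustrate the contract: with no `peer_joined`/`peer_left` hooks, any participating worker leaving restarts the whole service task.

```python
# Toy model of the conservative restart contract (hypothetical names):
# without peer_joined/peer_left hooks, the departure of any participating
# worker unconditionally restarts the service task on all workers.

class ServiceSupervisor:
    def __init__(self, workers):
        self.workers = set(workers)
        self.restarts = 0

    def worker_left(self, worker):
        if worker in self.workers:
            self.workers.discard(worker)
            # no per-service choice here: always restart everywhere
            self.restarts += 1


sup = ServiceSupervisor({"w1", "w2", "w3"})
sup.worker_left("w2")      # a participant left -> restart the service task
sup.worker_left("other")   # a non-participant leaving has no effect
assert sup.restarts == 1
```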
> Peer discovery and RPC
> ----------------------
>
> * At startup, services are given a list of their peers as RPC handles.
Note that these aren't RPCs to the other workers; they're RPCs only to the other instances of this `Service` belonging to the same service task on other workers. There's a whole lot that's handled for you by doing that that I would not want every `Service` implementation to have to do (or even be able to do) itself: opening an RPC properly, connecting to the correct worker handler routed to the correct Service instance, validating that the other instance hasn't changed out from under you, etc.

It's also a much stricter contract than "here's a worker, have at it", which means it's both easier to develop against, much easier to support (random changes in `worker.py`/comms won't break existing `Service` code), and much easier to refactor/change in the future.

Again, this is a tool that's useful, but you don't have to use it. What's actually passed right now is `peers: Mapping[ServiceId, ServiceHandle[T]]`. (I originally used addresses, but switched to opaque IDs because a worker could restart at the same address, but effectively be a new peer—guaranteeing IDs are unique is a nicer contract for users to work with.) This could instead just be `peers: Sequence[ServiceID]`, and we could have an `open_rpc(id: ServiceID) -> RPC` method, so you can choose whether or not you actually need the RPC.
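A minimal sketch of the two shapes being weighed, using stand-in classes (`ServiceId`, `ServiceHandle`, and `Concierge` here are illustrations of the idea, not the PR's actual definitions): peers as a mapping of opaque IDs to scoped handles, versus IDs only plus a lazy `open_rpc`.

```python
# Sketch of the two proposed peer-discovery contracts (hypothetical names):
# eager Mapping[ServiceId, ServiceHandle] vs. IDs only + open_rpc on demand.

from dataclasses import dataclass, field
from typing import Dict, List, NewType

ServiceId = NewType("ServiceId", str)  # opaque; unique even if a worker restarts


@dataclass
class ServiceHandle:
    """An RPC handle scoped to one peer instance of the same service task."""

    id: ServiceId


@dataclass
class Concierge:
    """What a Service is handed at startup, instead of the Worker itself."""

    _peers: Dict[ServiceId, ServiceHandle] = field(default_factory=dict)

    def peer_ids(self) -> List[ServiceId]:
        # alternative contract: hand out only opaque IDs...
        return sorted(self._peers)

    def open_rpc(self, id: ServiceId) -> ServiceHandle:
        # ...and open an RPC lazily, only if the service actually needs one
        return self._peers[id]


c = Concierge({ServiceId("s-1"): ServiceHandle(ServiceId("s-1")),
               ServiceId("s-2"): ServiceHandle(ServiceId("s-2"))})
assert c.peer_ids() == ["s-1", "s-2"]
assert c.open_rpc(ServiceId("s-1")).id == "s-1"
```

Either way, the service never touches a raw worker or comm; it only ever sees peer IDs and scoped handles.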
> Leader election
> ---------------
>
> * At startup, one instance is designated as the "leader", making it easier to coordinate
>   initialization tasks that need to run exactly once.
I'd be fine with that for now, seeing as we don't have an immediate need for it.

As a creator of a `Service`, or maybe a third-party library that uses `Service`s, it would just be nice to not have to create a scheduler plugin and get it registered just to do some simple initialization task.
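The doc leaves the designation mechanism open; one assumed-for-illustration approach needs no real election at all: every instance deterministically agrees on, say, the lexicographically smallest service ID, so exactly one instance considers itself the leader and runs the once-only initialization.

```python
# Hypothetical leader designation without election: all instances apply the
# same deterministic rule (smallest ID wins), so they agree with no messages.

def am_i_leader(my_id: str, peer_ids: list) -> bool:
    return my_id == min([my_id, *peer_ids])


assert am_i_leader("s-a", ["s-b", "s-c"]) is True
assert am_i_leader("s-b", ["s-a", "s-c"]) is False
```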
> * Once the *first* input key to a service task is ready (state ``memory``), service
>   instances are started on all workers in the cluster.
> * Once the last output key is produced by a service, all instances are told to stop.
> * Any instance can error the service task; all other instances will be told to stop.
Allowing for anything else (instances must come to some distributed consensus on whether to error the task overall?) feels far more complex and undesirable to me. What do you think the behavior should be instead?
Instances are fully at liberty to suppress errors if they want to. This just says they can error the entire task as a unilateral decision, and use that as a way to stop their peers. It's just a useful handle that's available. They're welcome not to pull it if it doesn't make sense for their case.
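The unilateral-error contract is small enough to sketch. Names below are invented for illustration: the first instance to report an error moves the whole service task to erred, and every other instance is told to stop; later errors are ignored.

```python
# Toy model of the unilateral-error contract (illustrative names only):
# first error wins, the task goes to "erred", and all peers are stopped.

class ServiceTaskState:
    def __init__(self, instance_ids):
        self.instances = set(instance_ids)
        self.state = "running"
        self.exception = None
        self.told_to_stop = set()

    def error_from(self, instance_id, exc):
        if self.state != "running":
            return  # first error wins; later errors are ignored
        self.state = "erred"
        self.exception = exc
        self.told_to_stop = self.instances - {instance_id}


task = ServiceTaskState({"s-1", "s-2", "s-3"})
task.error_from("s-2", RuntimeError("disk full"))
task.error_from("s-3", RuntimeError("too late"))  # ignored: already erred
assert task.state == "erred"
assert str(task.exception) == "disk full"
assert task.told_to_stop == {"s-1", "s-3"}
```

An instance that wants to tolerate a failure simply catches it locally and never calls the error path.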
This is a draft of the interface for a `Service`—a new interface in distributed that could facilitate doing out-of-band operations in a controlled way. From the docstring:

I've been kicking around this idea for a bit as a better framework for P2P shuffling (xref #5435, #5520, #5524, #5939) that solves both the resilience needs, and some tricky edge cases around graph structure (culling, partial recomputation). I'm hoping it could be generally applicable to similar problems in the future (array rechunking, map-overlap, ML training, etc.)

I suggest reading the docstring for those interested, and reading the `Service` ABC for those really interested :)

I think actually implementing this would be somewhat similar to what it took to add Actors (Services and Actors are kinda similar), in terms of special-casing things in many scheduler transition functions, and adding support on Workers for running these.
The whole idea is a little hacky, and a little odd, and adding even more complexity right now feels like the wrong direction. But if we're serious about wanting to do things like P2P shuffles, building a first-class framework for expressing out-of-band operations within the task graph will be the best approach to resilience.
Is it over-optimized for P2P shuffles? Maybe. Could it all be implemented as a huge P2P shuffle scheduler plugin? Maybe. Personally though, I find it easier to think about (and test, and ensure correctness) in this generic sense than to merge with all the details of shuffling.
cc @mrocklin @fjetter
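The actual `Service` ABC lives in the PR diff and isn't reproduced here; purely as a reading aid, the contract discussed in this thread might look roughly like the following reconstruction (every name and signature below is an assumption, not the draft's code):

```python
# NOT the PR's actual ABC -- a rough reconstruction of the contract in this
# thread: a start hook (with peers and a leader flag) and a stop hook, where
# raising from either can err the whole service task.

from abc import ABC, abstractmethod
from typing import Mapping


class Service(ABC):
    @abstractmethod
    def start(self, peers: Mapping, is_leader: bool) -> None:
        """Called on every worker once the first input key is in memory."""

    @abstractmethod
    def stop(self) -> None:
        """Called when the service task reaches a terminal state."""


class EchoService(Service):  # trivial concrete example for illustration
    def __init__(self):
        self.log = []

    def start(self, peers, is_leader):
        self.log.append(("start", len(peers), is_leader))

    def stop(self):
        self.log.append("stop")


svc = EchoService()
svc.start({}, is_leader=True)
svc.stop()
assert svc.log == [("start", 0, True), "stop"]
```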