Active Memory Management Control System #4982
This has the benefit of simplicity; however, it means that all rebalanced keys will be temporarily duplicated for several seconds, until the next rebalance pass. |
How do we manage competing interests between policies? Simple example using some of the policies you mentioned: we have a ... I'm reasonably confident that we can construct more elaborate scenarios with the potential to cause instabilities. Additional action: pause task execution / block / throttle? To manage the peak number of replicas we might want to explicitly block execution of a task's dependents to avoid unnecessary memory pressure and control the overall number of replicas. This would probably require an additional scheduler state but would otherwise also work with the proposed basic actions. |
We're going to have a copy around for a bit anyway. If it's simpler / more robust to leave things around for slightly longer (presumably the time of another cycle of this callback + coordination time) then that sounds like a good tradeoff to me. If this thing is running in the background all the time then waiting a few seconds sounds cheap to me. |
I think that short-term we probably don't worry about it / we think about it outside of the codebase. Longer term maybe the control system watches for signs of churn and raises warnings if so? |
I'm inclined to keep this out of scope for now. I think that this pattern of having various independent systems run on the scheduler, inspecting state, and making changes, is a good one though. I could imagine another that pauses workers? |
I'm going to try to lay out a set of steps that might enable an experiment here, with the goal of removing replicated data from the cluster after it is no longer necessary. Some steps:

Add and remove data

First we probably want to be able to ask workers to add and remove data. We already have this logic built out, but it isn't immediately accessible to the scheduler. We'll need to fix that by providing stream handlers. Some questions:
These two steps are probably testable in isolation:

@gen_cluster(client=True)
async def test_gather_dep(c, s, a, b):
    x = await c.scatter(123, address=a.address)
    b.collect_key(x.key)
    while x.key not in b.data:
        await asyncio.sleep(0.01)

    a.free_keys(keys=[x.key])
    while x.key in a.data:
        await asyncio.sleep(0.01)

(My apologies for the white-box test; this is mostly for the developer who writes this up, rather than for future testing.) This is likely to require/build understanding of both how the workers/scheduler use ...

Control system

Then, we can use these to build a SchedulerExtension that holds policies and enacts transfers. This was mostly implemented above, but I'll include a copy here for convenience.

class ActiveMemoryManager(SchedulerExtension):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.periodic_callbacks = {}

    def add_policy(self, policy: MemoryManagementPolicy):
        policy.scheduler = self.scheduler
        # PeriodicCallback takes milliseconds
        pc = PeriodicCallback(
            functools.partial(self.run, policy=policy),
            callback_time=parse_timedelta(policy.period) * 1000,
        )
        pc.start()
        self.periodic_callbacks[policy] = pc
        asyncio.ensure_future(self.cleanup(policy))  # TODO: avoid dangling futures

    def run(self, policy):
        recommendations = policy.update()
        for action, ws, ts in recommendations:
            # These are the operations that we built in the last section
            if action == "get":
                self.send_to_worker(ws.address, {"op": "gather_data", "keys": [ts.key]})
            elif action == "remove":
                self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})

    async def cleanup(self, policy):
        await policy.finished
        pc = self.periodic_callbacks.pop(policy)
        pc.stop()

On its own this is hard to verify that it works or is helpful, so we need a policy. I'm going to suggest the following policy that removes excessive copies. In particular, my recommendation is that we find tasks that are on more workers than they have pending dependents. So if a task still needs to be run by 100 tasks then we're probably not going to remove anything. If it is no longer needed by any dependent then we should start removing some replicas. In order to be more efficient when going through tasks we might instead go through workers and then go through the tasks that are currently in memory.

class ReduceReplicas(MemoryManagementPolicy):
    period = "1s"

    def __init__(self):
        pass

    def update(self):
        recommendations = []
        for ws in self.scheduler.workers.values():
            for ts in ws.has_what:
                if len(ts.who_has) > len(ts.waiters):
                    donor = decide_good_worker_to_remove_task(...)
                    recommendations.append(("remove", donor, ts))
        return recommendations

This seems like a policy that should be easy to implement and actually useful. We could get more clever by doing things like only running this on workers that are somewhat overburdened with memory, doing it on only a few workers at a time in order to keep things lively, etc. Those are all probably future work though.

Summary

I think that the only tricky thing here is taking the existing communication system that the workers use to gather dependencies and release data, and exposing it as an operation that the scheduler can use. Then I think that it's mostly code plumbing to make a scheduler extension, make some policy class, set up periodic callbacks, etc. This stuff should be somewhat straightforward. I don't think that any creativity is necessary here. The reduce replicas policy that I present above can probably be improved in several ways, and that's another place where creativity might be warranted, but I think that a brute force approach might do ok for a while. |
Also, for a general purpose test, I think that something like the following could work:

@gen_cluster(client=True)
async def test_clear_copies(c, s, a, b):
    x = da.random.random((1000, 1000), chunks=(100, 100)).persist()
    y = await x.dot(x.T).sum().persist()

    # check that copies are eventually cleared out
    while any(len(ts.who_has) > 1 for ts in s.tasks.values()):
        await asyncio.sleep(0.1)

I think that eventually this test may fail, as we decide to accept some copies, especially when we have a lot of excess memory, but it should serve during a proof of concept phase. |
    def update(self):
        recommendations = []
        for ws in self.scheduler.workers.values():
            for ts in ws.has_what:
                if len(ts.who_has) > len(ts.waiters):
                    donor = decide_good_worker_to_remove_task(...)
                    recommendations.append(("remove", donor, ts))
        return recommendations
It's going to be very expensive to do this full scan of all the in-memory keys in the cluster once you start having 100k+ such keys. The recent O(1) rebalance() PR gives you a good measure of how bad it would get. I doubt that optimization can be left to future work. |
I agree that data structures that index tasks by replication would make this faster.
I agree that we shouldn't make this on-by-default until we resolve this. I do think that we can get a proof of concept merged in with opt-in behavior before then though. Some other options for remediation that came to mind:
So in general I agree that this is an issue, but it's an issue with enough solutions that I'm ok ignoring it for now, knowing that we have a path (or several paths) to resolve it in the future. I think that we should build a simple system first before investing the time in the full solution (which we might find was wasted). |
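(A rough sketch of one such remediation, not taken from the discussion above: maintain an incrementally updated index of over-replicated tasks on the scheduler, so a policy only scans candidates instead of every in-memory key. The ReplicaIndex name and the on_replica_change hook are illustrative only.)

class ReplicaIndex:
    """Hypothetical scheduler-side index of tasks with more replicas than waiters."""

    def __init__(self):
        self.over_replicated = set()

    def on_replica_change(self, ts):
        # called whenever ts.who_has or ts.waiters changes
        if len(ts.who_has) > max(len(ts.waiters), 1):
            self.over_replicated.add(ts)
        else:
            self.over_replicated.discard(ts)

With something like this, ReduceReplicas.update() would iterate over index.over_replicated rather than over every worker's has_what.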
Looking a bit more closely at the Worker, I realize that we now have a new state machine that wasn't there the last time I looked. My hope is that the signal from the scheduler to the worker can just say "make a new dependent with this key and set it on a path to be collected" and then have all the rest of the machinery work. It probably makes sense to wait for @fjetter to get back and comment on the right way to ask a worker to collect data before going forwards. |
Indeed the worker state machine is a bit weird at the moment. I looked over the gather_dep functionality once more with this use case in mind and the worker is not really built for this use case.

Registering a task

The only way to properly register a task at the moment is via the ... We could easily provide another handler to register a dependency and construct the respective task state. That's what happens in add_task anyhow and we could probably reuse that logic, see distributed/distributed/worker.py Lines 1619 to 1659 in cbcec9c
Actual collection

This is where it gets tricky, since the "path to be collected" is usually done via the ... In pseudocode this looks like the following:

while self.data_needed:
    key = self.data_needed.pop()  # This is a key to-be-executed, i.e. a runnable task
    deps_to_fetch = get_dependencies_not_in_memory(key)
    while deps_to_fetch:
        dep = deps_to_fetch.pop()
        worker = select_good_worker(dep)
        # now that we picked a worker, we'll see if that worker has any more dependencies
        # for any task in data_needed, to reduce comm overhead
        more_dependencies = self.select_keys_for_gather(worker, dep)
        transition_all_to_fetch(more_dependencies)

Actually fetching the data

The actual fetching of the data is done in ... I think we should try reusing ...

Regarding deletion
The safety guard in ... On top of this, I believe workers should be allowed to reject a delete request if they still require the key themselves, i.e. if a dependent is to be executed. This is a very simple but probably stable option we could start with. It could later be refined to "the key will not be needed for the next X seconds, therefore release it and collect it later again" if necessary. From a stability point of view, I hope this is the only case where we actually need to reject a delete request. Summing up, the biggest problem I see is the ... |
Last note about the gathering: we do have a mechanism right now to gather keys on the worker side which circumvents the ordinary state machine and instead is an explicit RPC call, namely the gather / Worker.gather comm handler. This is much simpler and does not deal with busyness, retries, missing data, metrics collection, etc. For a simple mechanism to start with, this might be sufficient (that's what the current rebalance/replicate is built on). |
To try to summarize, there are maybe three options:

1. Move all logic into the state machine; then adding this is easy.
2. Sidestep the state machine entirely and use the current gather option. Easy to do, might not be entirely stable.
3. Use gather_dep and ensure_communicating, and add some logic so we check both data_needed as well as some new collection. A middle ground and stable solution, but not entirely trivial, and it increases the inertia that we'll need to rewrite when we do a state machine rewrite.

Is that a correct way of breaking things down? If so, do you have a recommendation or preference? |
Yes that captures it nicely.
I somehow dislike option 3 since I consider the worker too complicated already. A big problem I'm having with the code lately is that there are many handlers/methods which are doing five different things with subtle differences. Knowledge and understanding about these subtleties is lost after a while and engineering becomes forensics. Now that we have some kind of (almost deadlock-free) stability I would like to pick up my draft of a state refactoring to see where we are. I can tell more after I've spent an afternoon with it and can estimate how much work it is. In the meantime we can start with 2.) and even work towards 1.) simultaneously. One problem we can solve from the very start is trying to figure out what kind of stimuli are required from scheduler to worker. The stimuli we currently have are ...
If we add the necessary stimuli on the worker side right away, even if the implementation doesn't give us all the required guarantees, the refactoring and the AMM control system can evolve simultaneously without colliding, theoretically. IIUC the stimuli we require for the first few basic AMM policies are something like:

def acquire_replica(self, comm, key, who_has):
    """The worker is instructed to fetch a replica of Task/Data `key` from `who_has`"""
    # pseudo
    worker = pick_worker(who_has)
    busy, data = get_data_from_worker_if_not_busy(key, worker)
    if not busy:
        self.put_key_in_memory(data)

def remove_replica(self, comm, key):
    """Forget data for the given key if it is not actively needed"""
    ts = self.tasks[key]
    if not any(d.state in ["executing", "ready", "waiting"] for d in ts.dependents):
        self.release_key(key, report=True)

I think ... |
Ideally we would want the ...
OK, fair enough.
Yeah, I'm trying to find a path through here that lets @crusaderky get back to work on this topic after the CI work (thanks again for that @crusaderky ), but that also leads towards a better future.
OK, that sounds like a plan |
Well, if the worker hasn't heard of that task before, how is it supposed to know where to find the data without who_has? This who_has could of course be replaced by the worker calling the scheduler for a who_has. We do this occasionally but I don't see any benefit in not submitting this information right away.
Of course. We wouldn't handle busy workers properly, nor would we be able to leverage synergies from batched get_data calls. Also the missing data handling would not be implemented. Since the replication is less critical and hopefully less frequent (at least tunable), I could live with these things short term. |
I agree that we can submit something right away, but the worker should be able to reach back out on its own. The ...
Yeah, fair enough. What I'm hearing is that we have a path forward that is cheap and good-enough in the short term and that doesn't get in the way of more solid and robust work in the future. |
@fjetter checking in here. Do you have suggestions on the right way to proceed? I'm thinking about scheduling of @crusaderky's time. It seems like it is most time efficient to wait a bit to see if we can get the worker state machine rewrite in, and then maybe removing replicas becomes very easy. This is also a nice way to separate work between you and @crusaderky where you focus on enacting changes and @crusaderky focuses on specifying what changes should occur. However, this also seems like the kind of project that could take a day, or could take several weeks. My guess is that we maybe have a couple more days while @crusaderky fixes up CI (again, thank you for your service Guido) but after that we should probably start on this work again. So maybe my question is "how far can we get in the next couple of days?" and also "how long will fixing up CI take?" |
CI is now done. |
@fjetter and I had a chat. The agreement is that, in a first iteration, the system that deletes unwanted replicas will use Scheduler.rpc (what rebalance and replicate use now). This has been called "option 2" above in the discussion. This means that, as of the initial iteration, the deletion won't be safe to run in the middle of a computation. Also as of the initial iteration, the cost will be O(n) to the total number of tasks in the scheduler, as already discussed. In the meantime, @fjetter will have the time to complete #5046. The key feature that's not in the PR yet is that, whenever the scheduler sends a fire-and-forget request for the deletion of a key and the worker decides NOT to delete it, the worker must report back its decision. On the scheduler side, TaskState will keep track of the pending deletion requests. This extra state is necessary to avoid either accidentally deleting all copies of a key or accidentally ending up with 2 or more replicas of the same key in perpetuity. Once both of the above are done, it should be pretty much trivial to switch system (literally delete the 2 lines of self.rpc and replace them with the enqueueing into the bulk comms). This will be simple because already as of the initial design the whole thing won't need to wait for feedback from the workers. From a user's perspective, this will enable safely discarding unwanted key replicas in the middle of a computation. As a separate discussion, we are both very concerned about how the policy plugin system is going to handle conflicting decisions between policies, and we would really like to see some real-life use cases of what a user-defined policy could want to achieve (at functional analysis level) before a design is finalised. |
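(A rough sketch of the bookkeeping described above; pending_removals, request_removal, handle_removal_refused, and send_to_worker are illustrative names only, mirroring the pseudocode elsewhere in this thread rather than anything agreed on here.)

def request_removal(self, ts, ws):
    # hypothetical scheduler-side helper: fire-and-forget deletion request
    if len(ts.who_has) - len(ts.pending_removals) <= 1:
        return  # never risk dropping the last replica
    ts.pending_removals.add(ws)
    self.send_to_worker(ws.address, {"op": "remove_replica", "key": ts.key})

def handle_removal_refused(self, key=None, worker=None):
    # the worker decided to keep the key (e.g. a dependent is about to run)
    # and reported back, so this replica becomes a removal candidate again
    ts = self.tasks[key]
    ts.pending_removals.discard(self.workers[worker])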
OK, sounds good to me.
I think that deletion is easier than replication. There are, I think, two approaches:
I think that option 1 should be safe and easy to do.
I agree that this could be fertile ground for concerns. Personally I don't think we're going to get a finalized design before trying a few things out. I think that we want to start with a simple system, see how it performs, and then adjust that system as we add in more complex policies. |
Sounds good and robust to me. |
Question for those knowledgeable about the various GPU libraries that are typically used on dask (@mrocklin please ping whoever appropriate). If you have a hybrid cluster where some workers mount a GPU while others only have CPUs, is it ok for a rebalance operation to "park" a GPU object (e.g. a cupy array) on a CPU-only node? Or will it fail to unpickle? I think there will be different answers for different libraries and possibly different settings within the same library (e.g. with or without unified memory). |
I recommend that we not worry about the heterogeneous case just yet. But the answer to your question is that typically it is not ok to park GPU data on a non-GPU machine.

In my mind the next thing to do here is to test the "we can just replace a few lines of code for async communication" hypothesis, and then if that works see if we can solve the retire workers problem (which is probably the second main use case of this). |
Just got back today from PTO and am catching up generally. Still need to read through this thread, but wanted to link a few potentially related issues for discussion. |
System to Control Active Memory Management
There have been a lot of side conversations around active memory management, but I don't think we've ever written this up properly (aside from maybe #1002, which I'm now closing).
Dask currently replicates data around the cluster whenever a dependency on
one worker needs to be moved for a dependent on another worker. This passive
approach is good because highly needed data tends to spread around, but bad
because excessive copies may start to take up distributed memory.
Motivating use cases
There are many situations where we want to more actively control the management
of data on the cluster. I'll list a few below:
- Data that we know will be in high demand should be replicated proactively. Demand might be either because of many not-yet-run dependencies, or as insurance for workers that might go down
- Data that has been replicated may no longer be necessary and it may be a good idea to clear it out
- When we shut down a worker we want to offload its data before retiring it
- If we're able to reliably shut down workers then we would like to cycle their processes to avoid issues like memory leaks
- When a worker is running low on space we may want to offload its data rather than sending it to disk
- When new workers arrive they should probably proactively pull data away from some of their more saturated peers
- Advanced users will want to define their own special policies
- The user may specify replication requests to manually motivate one of the above reasons
- ...
Challenges
However, making a general purpose system that can solve all of these problems
(and more) is hard for a few reasons. I'll list a few below:
- ... even in cases where we have millions of tasks and thousands of workers
- ... This requires a subtle dance between the scheduler, possibly multiple workers, and maybe other active policies
- ... approach here and implement a fully general solution on the first try. Instead we probably need to make a system that allows for a variety of overlapping user-defined policies.
Existing work on rebalance logic
There is currently some work from @crusaderky on a special but important case of this problem: making rebalance faster and more robust. The idea here is that this is a good and general first step towards a larger system.
This is good work because it solves a tricky problem of how to quickly identify
data that should be moved, and good targets to accept this data. It also
starts to think about robust movement of data.
This is one example of logic that folks might want to implement
Proposed Control System
At the same time, I'd like for us to start thinking about a control system for
these policies. I'll outline something below to serve as a first pass. This
is the simplest system I could think of that can maybe solve the problems
listed above.
We have a variety of memory management policies which are invoked periodically; they also optionally have a finished criterion, which will clear them from the system.
Example, retire workers method
So retire workers might look like the following:
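(The code block from the original issue didn't survive here; below is an illustrative sketch only, assuming the MemoryManagementPolicy interface and the ("get", worker, task) recommendation format used elsewhere in this thread. pick_receiving_worker is a hypothetical helper.)

class RetireWorkerPolicy(MemoryManagementPolicy):
    period = "100ms"

    def __init__(self, workers_to_retire):
        self.workers_to_retire = workers_to_retire
        self.finished = asyncio.Future()  # resolves once all data is offloaded

    def update(self):
        recommendations = []
        remaining = False
        for ws in self.workers_to_retire:
            for ts in ws.has_what:
                remaining = True
                # ask some surviving worker to acquire a replica of this key
                recipient = pick_receiving_worker(ts, exclude=self.workers_to_retire)
                recommendations.append(("get", recipient, ts))
        if not remaining:
            self.finished.set_result(None)
        return recommendations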
These RetireWorkerPolicy objects would live in some plugin for a while until they finished up. There might be a variety of other policies in there, some long-lived, some short-lived.
Actions
Each policy, when invoked, emits a set of proposed actions.
Actions can be one of two forms:

- Ask a worker to acquire a piece of data (see "Acquire data" below)
- Ask a worker to remove a piece of data (see "Remove data" below)

Note that we don't merge these two, as we did in replicate/rebalance. This is error-prone and requires careful attention. I think that by separating these two we can achieve what we want with more stability.
Acquire data
Workers already have a solid way to acquire a piece of data in a noisy cluster.
They use this today to gather dependencies. Let's reuse that logic.
However, to avoid consistency issues I suspect that we're going to need to keep track of what data each worker is currently trying to obtain. Otherwise, for data that takes a long time to acquire we might mistakenly ask many workers to get the same piece of data.
To address this I think that we could start tracking not only what data each worker has, but what data it is trying to get. We would also have to track the same information on the task. This would be a dynamic version of TaskState.who_has and WorkerState.has_what. Something like TaskState.who_is_acquiring and WorkerState.collecting, but with better names.
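(An illustrative sketch of that bookkeeping, keeping the placeholder names above:)

class TaskState:
    who_has: set            # workers that currently hold this key in memory
    who_is_acquiring: set   # workers that have been asked to fetch it but don't hold it yet

class WorkerState:
    has_what: set    # tasks whose data is in memory on this worker
    collecting: set  # tasks this worker is currently trying to obtain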
Remove data
Similarly with adding data ... were just recently assigned a task that requires the piece of data as a dependency.
Unlike with adding data, I suspect that we can do this without adding extra
state to the workers or tasks. I suspect that we can ask the worker to remove
a piece of data, and then immediately mark that data as removed so that future
scheduling decisions won't depend on it. If for some reason the worker
disagrees then it can send back its usual "I now have this data" message and
things may be ok.
Example, Replicate policy
Then, something like replicate might look like the following:
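(Again, the original code block is missing; here is an illustrative sketch only, reusing the assumptions above — a MemoryManagementPolicy base class, the ("get", worker, task) recommendation format, and a hypothetical pick_receiving_worker helper.)

class ReplicatePolicy(MemoryManagementPolicy):
    period = "1s"

    def __init__(self, key, n):
        self.key = key
        self.n = n  # desired number of replicas

    def update(self):
        ts = self.scheduler.tasks[self.key]
        if len(ts.who_has) >= self.n:
            return []
        # only nudge one extra worker per pass; the next pass will add more
        recipient = pick_receiving_worker(ts, exclude=ts.who_has)
        return [("get", recipient, ts)]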
Some things to note here:
- I hope that the system can be relatively simple, given the simple operations that we have.
- ... wanted to by returning multiple actions, but we don't have to. We're making the world a bit better. This system will be called again shortly, so we don't need to be super-aggressive here. We just need to improve the world a bit.
Control loop
I am hopeful (probably naively so) that the control loop can be fairly simple. I hope that the following code gets my point across:
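(The code block is missing here as well; the copy included later in this thread is essentially it. A condensed, illustrative sketch, using the same hypothetical send_to_worker messages and policy interface:)

class ActiveMemoryManager(SchedulerExtension):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.periodic_callbacks = {}

    def add_policy(self, policy):
        policy.scheduler = self.scheduler
        # PeriodicCallback takes milliseconds
        pc = PeriodicCallback(
            functools.partial(self.run, policy=policy),
            callback_time=parse_timedelta(policy.period) * 1000,
        )
        pc.start()
        self.periodic_callbacks[policy] = pc

    def run(self, policy):
        for action, ws, ts in policy.update():
            if action == "get":
                self.send_to_worker(ws.address, {"op": "gather_data", "keys": [ts.key]})
            elif action == "remove":
                self.send_to_worker(ws.address, {"op": "remove_data", "keys": [ts.key]})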