Opportunistic Caching #681
To be explicit, the mechanism to keep data on the cluster might look like this:

```python
from distributed.diagnostics.plugin import SchedulerPlugin

class CachingPlugin(SchedulerPlugin):
    def __init__(self, scheduler):
        self.scheduler = scheduler
        self.scheduler.add_plugin(self)

    def transition(self, key, start, finish, nbytes=None, startstops=None, *args, **kwargs):
        if start == 'processing' and finish == 'memory' and should_keep(nbytes, startstops, **kwargs):
            self.scheduler.client_desires_keys(keys=[key], client='fake-caching-client')
        no_longer_desired_keys = self.cleanup()
        self.scheduler.client_releases_keys(keys=no_longer_desired_keys, client='fake-caching-client')

client.run_on_scheduler(lambda dask_scheduler: CachingPlugin(dask_scheduler))
```

(`should_keep` and `cleanup` are placeholders for whatever caching policy you choose.)
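The `should_keep` helper referenced above is left undefined. A minimal sketch of one possible policy (hypothetical, not part of Dask) would keep any result whose compute time is large relative to its size. This assumes `startstops` entries are dicts with `'action'`, `'start'`, and `'stop'` keys, as in recent `distributed` versions:

```python
def should_keep(nbytes, startstops, threshold=1.0, **kwargs):
    """Hypothetical policy: keep results whose compute time (seconds)
    per megabyte stored exceeds `threshold`."""
    # Sum up time spent actually computing this key
    compute_time = sum(
        s['stop'] - s['start']
        for s in (startstops or [])
        if s.get('action') == 'compute'
    )
    megabytes = max((nbytes or 0) / 1e6, 1e-9)
    return compute_time / megabytes > threshold
```

A result that took 5 seconds to compute and occupies 1 MB would be kept; a 1 GB result that took half a second would not.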
Thanks. I followed the above example to write a customized cache plugin for our own Dask grid. I've been testing it for some time now.
@mrocklin is the scheduler API explained somewhere? Can you provide more explanation of how this works? What do `client_desires_keys` and `client_releases_keys` do?
Scheduler plugins are documented at https://distributed.dask.org/en/latest/plugins.html and the Scheduler API at https://distributed.dask.org/en/latest/scheduling-state.html#distributed.scheduler.Scheduler
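To the question above: `client_desires_keys` registers a client as wanting a key, which keeps the scheduler from releasing that key's data while any client still wants it, and `client_releases_keys` removes that registration. The idea can be illustrated with a small stand-in (a toy model, not Dask code) that mimics the per-key reference counting:

```python
class MiniScheduler:
    """Toy model (not Dask) of per-key client reference counting."""
    def __init__(self):
        self.wants = {}  # key -> set of client names that desire it

    def client_desires_keys(self, keys, client):
        for key in keys:
            self.wants.setdefault(key, set()).add(client)

    def client_releases_keys(self, keys, client):
        for key in keys:
            self.wants.get(key, set()).discard(client)

    def can_release(self, key):
        # Data may be freed once no client desires the key
        return not self.wants.get(key)

s = MiniScheduler()
s.client_desires_keys(['x'], 'fake-caching-client')
assert not s.can_release('x')
s.client_releases_keys(['x'], 'fake-caching-client')
assert s.can_release('x')
```

This is why the plugin above uses a fake client name: desiring a key under `'fake-caching-client'` pins it in cluster memory until the plugin releases it.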
I recently talked with the ilastik team; one of the wishlist items they brought up was two-level caching (in RAM or on disk) that would work with Dask distributed. @emilmelnikov, this issue is likely the best place for discussion.
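As a rough illustration of the two-level idea (purely a sketch, not an ilastik or Dask API): a small cache that keeps hot items in RAM and spills the least recently used entries to disk, promoting them back on access.

```python
import os
import pickle
import tempfile
from collections import OrderedDict

class TwoLevelCache:
    """Sketch of RAM-first caching with spill-to-disk (illustrative only)."""
    def __init__(self, max_in_memory=2, directory=None):
        self.max_in_memory = max_in_memory
        self.memory = OrderedDict()  # key -> value, in LRU order
        self.directory = directory or tempfile.mkdtemp()

    def _path(self, key):
        return os.path.join(self.directory, f"{key}.pkl")

    def put(self, key, value):
        self.memory[key] = value
        self.memory.move_to_end(key)
        # Spill the least recently used entry once RAM is over budget
        while len(self.memory) > self.max_in_memory:
            old_key, old_value = self.memory.popitem(last=False)
            with open(self._path(old_key), 'wb') as f:
                pickle.dump(old_value, f)

    def get(self, key):
        if key in self.memory:
            self.memory.move_to_end(key)
            return self.memory[key]
        with open(self._path(key), 'rb') as f:  # fall back to disk
            value = pickle.load(f)
        self.put(key, value)  # promote back to RAM
        return value
```

A real implementation would track byte sizes rather than entry counts and handle concurrent access, but the spill/promote structure is the core of the two-level design.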
@GenevieveBuckley We also care about this in napari now. With the large remote datasets (hundreds of gigabytes and up) we're fetching, it would be great to have some on-disk persistence, given that the datasets do not change and we frequently want to revisit the same dataset.
I will now. Thank you @GenevieveBuckley :)
This has been brought up in other issues (and is somewhat tangential to this issue), but I would recommend looking at …
Currently we clean up intermediate results quickly if they are not necessary for any further pending computation. This is good because it minimizes the memory footprint on the workers, often allowing us to process larger-than-distributed-memory computations.
However, this can sometimes be inefficient for interactive workloads when users submit related computations one after the other, so that the scheduler has no opportunity to plan ahead, and instead needs to recompute an intermediate result that was previously computed and garbage collected.
We could hold on to some of these results in the hope that the user will request them again. This trades active memory for potential CPU time. Ideally we would hold onto results that:

- took a long time to compute
- take up relatively little memory
- are likely to be requested again
We did this for the single-machine scheduler. We could do it in the distributed scheduler fairly easily by creating a SchedulerPlugin that watched all computations, selected computations to keep based on logic similar to what is currently in cachey, and created a fake client to keep an active reference to those keys in the scheduler.
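Cachey scores entries roughly by compute cost per byte, boosted by how often an entry is reused. A simplified sketch of that scoring idea (illustrative only, not cachey's actual implementation) might rank keys and greedily keep the best ones within a memory budget:

```python
def cache_score(compute_time, nbytes, hits, boost=1.5):
    """Simplified cachey-style score: expensive-to-compute, small, and
    frequently used results score highest (not cachey's real formula)."""
    return (compute_time / max(nbytes, 1)) * boost ** hits

def select_keys_to_keep(stats, budget_bytes):
    """Greedily keep the highest-scoring keys that fit in the byte budget.
    `stats` maps key -> (compute_time, nbytes, hits)."""
    ranked = sorted(stats, key=lambda k: cache_score(*stats[k]), reverse=True)
    kept, used = [], 0
    for key in ranked:
        nbytes = stats[key][1]
        if used + nbytes <= budget_bytes:
            kept.append(key)
            used += nbytes
    return kept
```

Plugged into the `CachingPlugin` sketch above, a `cleanup` method could call something like `select_keys_to_keep` and release everything not selected.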