Added support for dask arrays in GridInterface #2305
Conversation
Thanks @philippjfr. Sorry for getting back to you so late. How would I use this to, say, instantiate a Dataset?
It should work something like this:

```python
import numpy as np
import dask.array as da
import holoviews as hv

darr = da.from_array(np.random.rand(10, 20))

# Tuple constructor
hv.Dataset((range(20), range(10), darr), ['x', 'y'], 'z')

# Dict constructor
hv.Dataset({'x': range(20), 'y': range(10), 'z': darr}, ['x', 'y'], 'z')
```

Basically you declare coordinate arrays describing the values along the axes of your dask array and then pass them in as a tuple or dictionary. The order of key dimensions should be the reverse of the order of the array's axes.
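To make the axis-order convention concrete, here is a small sketch of a check (the `chunks` argument and the `dimension_values` call are my own illustration, assuming it returns the gridded values when `flat=False`): the array has shape `(10, 20)`, i.e. `(len(y), len(x))`, while the key dimensions are listed as `['x', 'y']`.

```python
import numpy as np
import dask.array as da
import holoviews as hv

# Shape (10, 20) == (len(y), len(x)); kdims are given in reverse axis order.
darr = da.from_array(np.random.rand(10, 20), chunks=(5, 10))
ds = hv.Dataset((range(20), range(10), darr), ['x', 'y'], 'z')

# dimension_values(..., flat=False) should hand back the gridded values.
print(ds.dimension_values('z', flat=False).shape)  # expected: (10, 20)
```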
Thanks for the tips. Managed to get that to work. Minor note:
Anything else needed before this can go in?
The Dataset tests for the
Looks like the errors were in the test code itself:
This is ready to merge once tests pass.
```diff
@@ -301,6 +310,8 @@ def values(cls, dataset, dim, expanded=True, flat=True):
         if dim in dataset.vdims or dataset.data[dim.name].ndim > 1:
             data = dataset.data[dim.name]
             data = cls.canonicalize(dataset, data)
+            if da and isinstance(data, da.Array):
+                data = data.compute()
```
When does this get called, and on what data is it being called?
This should only be called during plotting, and only on the subset of data that you are actually displaying. If you use a datashader operation this won't end up being called at all, and all the processing should happen out-of-core.
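As a hedged illustration of that point (the `rasterize` import, the example data, and the width/height values are my own, not from the comment): wrapping the element in a datashader operation aggregates the dask array chunk by chunk, so the full array is never pulled into memory via `.compute()`.

```python
import numpy as np
import dask.array as da
import holoviews as hv
from holoviews.operation.datashader import rasterize

# A large dask-backed image; construction does not trigger any computation.
darr = da.from_array(np.random.rand(1000, 2000), chunks=(250, 250))
img = hv.Image((range(2000), range(1000), darr), ['x', 'y'], 'z')

# The datashader operation aggregates out-of-core; only the small
# aggregated grid is materialized, never the full source array.
agg = rasterize(img, width=400, height=300)
```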
Ok, thanks for clarifying. Sounds reasonable.
Does this get cached?
No, but if you've got ideas about caching I'd love to hear them. I'm currently running into issues with some remote datasets, which end up being downloaded multiple times.
For the local scheduler there is opportunistic caching: http://dask.pydata.org/en/latest/caching.html
No opportunistic caching exists today for the distributed scheduler. It wouldn't be particularly hard to do though if someone wants to dive in. Relevant issue here: dask/distributed#681
Naively I was thinking we might be able to call `persist` here. It would still be blocking in some cases (single-machine scheduler), but no worse than calling `compute`. Though given we would want the array as well (in cases where we would get Futures), we would need somewhere to hang on to the Futures. Perhaps in Holoviews (i.e. an object of some kind), a Distributed `Client` (if using Distributed), or somewhere else?
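A minimal sketch of the idea being discussed (the cluster setup and variable names are my own assumptions, not from the thread): on the distributed scheduler, `persist` returns immediately with a dask array backed by Futures, and those Futures stay alive only as long as something holds a reference to the persisted collection.

```python
import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client()  # assumes a local distributed cluster, for illustration

darr = da.from_array(np.random.rand(100, 100), chunks=(10, 10))

# Non-blocking on the distributed scheduler: returns a dask array whose
# chunks are Futures computing in the background.
persisted = darr.persist()

# Dropping `persisted` would release the Futures, so some object (e.g. the
# interface or dataset) would need to keep this reference alive.
result = persisted.sum().compute()
```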
Is the use case here a multi-dimensional array which should be loaded into memory incrementally as you are exploring it? For example let's say you had a stack of images with x, y and z axes and you are browsing through the z-stack, you want each 2D array to persist in memory once it has been displayed? I think that would be a very useful option but I'd have to think about the best approach to support that.
Currently our interfaces are stateless, but I could imagine passing through a `persist` option to the interfaces in the constructor, which tells them to call `.persist()` before calling `.compute()`. To make this concrete:

```python
darr = da.from_array(np.random.rand(30, 20, 20), (1, 1, 3))
ds = hv.Dataset((range(20), range(20), range(30), darr),
                ['x', 'y', 'z'], 'Value', persist=True)
z_stack = ds.to(hv.Image, ['x', 'y'])
z_stack[10]['Value']
```

In this example we create the 3D array, use `.to` to group it by the z-axis, fetch the 2D array corresponding to z-value 10, and then use getitem to load the 'Value' array into memory. If we add the persist flag the array would then stay in memory (ignoring the fact that this example is in memory anyway) and subsequent access to `z_stack[10]['Value']` would be much faster. Is this what you had in mind? If so we'll need to discuss whether interfaces could/should have state for backend-specific options like this.
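For reference, the speedup being described can be sketched directly in dask terms (the array, chunking, and timing pattern below are my own illustration, assuming the proposed `persist=True` flag simply keeps a persisted handle around):

```python
import numpy as np
import dask.array as da

darr = da.from_array(np.random.rand(30, 20, 20), chunks=(1, 20, 20))

# Without persist: every access recomputes/reloads the needed chunks.
slice_a = darr[10].compute()
slice_b = darr[10].compute()  # pays the full cost again

# With persist: chunks stay in memory as long as `held` is referenced,
# so repeated access to the same z-slice is cheap.
held = darr.persist()
slice_c = held[10].compute()
slice_d = held[10].compute()  # served from the in-memory chunks
```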
I think that medium-term this is the sort of thing that should be handled on the Dask side. We're well-positioned to solve this with minimal code-complexity elsewhere. Again, you can probably do this on the local (non-distributed) scheduler today.
It would be interesting to see how calling these lines ahead of time affected performance:
```python
from dask.cache import Cache
cache = Cache(2e9)
cache.register()
```
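A hedged illustration of the intended effect (the array and the repeated computation are my own example): once the opportunistic cache is registered, expensive intermediate chunk results can be reused across subsequent local-scheduler computations.

```python
import numpy as np
import dask.array as da
from dask.cache import Cache

cache = Cache(2e9)  # opportunistic cache holding up to ~2 GB of results
cache.register()    # turns the cache on for local-scheduler computations

darr = da.from_array(np.random.rand(2000, 2000), chunks=(500, 500))
first = darr.mean().compute()   # populates the cache with chunk results
second = darr.mean().compute()  # may be served largely from the cache
```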
To answer your question, @philippjfr, yes, basically as you described. We are looking at time instead of z (and the axis order differs), but for all intents and purposes this difference is merely semantic as far as Holoviews is concerned.
FWIW, as @mrocklin points out, we do have other options on the Dask side that could be helpful here or could be extended a bit (e.g. Cache support on Distributed). Issue dask/distributed#681 outlines this problem a bit more. Alternatively there are some existing Dask options which could allow this to be solved in user code.
There's a separate discussion that is a bit more interesting, which essentially involves having something like Holoviews drive computational priority in Dask. IOW one requests that a large array be persisted (in memory, with `persist`, or on disk, with `return_stored=True`), but would like to prioritize computations based on what a user would like to look at first. Outlined a bit more in dask/distributed#1753.
Thanks @philippjfr. Had one question above. Otherwise LGTM.
Looks good to me as well, and I am happy to see a good number of unit tests. I'm also happy to see that
Note that what you are seeing is only a small fraction of the unit tests; it's running the entire dataset test suite, which now includes about 130 tests.
Thanks all. :)
@mrocklin, this may be of interest. ;)
Going to have to follow up on this; it seems the groupby implementation is loading all the data into memory.
```diff
@@ -491,7 +502,10 @@ def sample(cls, dataset, samples=[]):
                 data[d].append(arr)
             for vdim, array in zip(dataset.vdims, arrays):
                 flat_index = np.ravel_multi_index(tuple(int_inds)[::-1], array.shape)
-                data[vdim.name].append(array.flat[flat_index])
+                if da and isinstance(array, da.Array):
+                    data[vdim.name].append(array.flatten()[tuple(flat_index)])
```
Think the indexing is the issue here. If you switch to `vindex`, IIUC what `flat_index` means, this should be lazy.
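For context, a small sketch of the suggestion (the example array and index values are mine; `np.unravel_index` is one way to turn flat indices back into per-axis coordinates for `vindex`):

```python
import numpy as np
import dask.array as da

darr = da.from_array(np.random.rand(10, 20), chunks=(5, 5))

# Flat indices into the (10, 20) array, e.g. from np.ravel_multi_index.
flat_index = np.array([7, 42, 105])

# vindex does pointwise fancy indexing lazily: the result is still a dask
# array, so no chunks are pulled into memory until .compute() is called.
coords = np.unravel_index(flat_index, darr.shape)
lazy_samples = darr.vindex[coords]

print(lazy_samples.compute())  # materializes only the sampled points
```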
Thanks, the same issue also applied in `select`, which was causing most of the problems. Followed up in #2424.
This pull request has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
As requested in #2176, this PR adds support for using dask arrays with the gridded interface. This turned out to be completely trivial (as you can see, most of the new code is tests) and lets users use dask arrays without relying on the heavier xarray dependency. @jakirkham Would you mind testing this and seeing whether it addresses your requirements?