Add open_virtual_mfdataset #349
base: main
Conversation
        datasets, closers = dask.compute(datasets, closers)
    elif parallel == "lithops":

        def generate_refs(path):
This is the equivalent of @thodson-usgs's map_references function:

def map_references(fil):
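For context, a minimal sketch of what such a per-file worker function might look like, assuming it simply wraps open_virtual_dataset (the body shown here is an assumption, the diff only names the function):

```python
from virtualizarr import open_virtual_dataset

def generate_refs(path):
    # Runs inside one serverless worker: open a single file and return the
    # small, in-memory virtual dataset containing only chunk references.
    return open_virtual_dataset(path)
```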
virtualizarr/backend.py
Outdated
# wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
completed_futures, _ = fn_exec.wait(futures, download_results=True)
virtual_datasets = [future.get_result() for future in completed_futures]
IIUC this will cause every serverless worker to send a small virtual dataset back to the client process over the internet somehow
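A hedged sketch of that fan-out/fan-in, using the lithops calls visible in the diff (the executor configuration and the `paths` variable are assumptions):

```python
import lithops

fn_exec = lithops.FunctionExecutor()

# fan out: one serverless worker per file
futures = fn_exec.map(generate_refs, paths)

# fan in: each worker ships its kB-sized virtual dataset back to the client
completed_futures, _ = fn_exec.wait(futures, download_results=True)
virtual_datasets = [future.get_result() for future in completed_futures]
```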
virtualizarr/backend.py
Outdated
elif combine == "by_coords":
    # Redo ordering from coordinates, ignoring how they were ordered
    # previously
    combined = combine_by_coords(
This is only going to work if we have used loadable_variables to create indexes for 1D coordinates, so it's a good reason to implement the suggestion in #335 (comment).
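For example, something along these lines should give combine_by_coords real 1D indexes to order by (the file and coordinate names here are illustrative assumptions):

```python
import xarray as xr
from virtualizarr import open_virtual_dataset

# Load the 1D dimension coordinates eagerly so the datasets carry pandas
# indexes; everything else stays as virtual chunk references.
vds_list = [
    open_virtual_dataset(path, loadable_variables=["time", "lat", "lon"])
    for path in ["air_001.nc", "air_002.nc"]
]
combined = xr.combine_by_coords(vds_list)
```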
virtualizarr/backend.py
Outdated
from xarray.backends.api import _multi_file_closer
from xarray.backends.common import _find_absolute_paths
from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine
I don't like importing these deep xarray internals like this (though _infer_concat_order_from_positions and _nested_combine haven't changed since I wrote them 6 years ago), but the only alternative would be to make a general virtualizarr backend engine for xarray (see #35).
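For reference, that alternative would look roughly like the following minimal sketch of an xarray backend entrypoint. The class name and registration details are assumptions and not part of this PR:

```python
from xarray.backends import BackendEntrypoint
from virtualizarr import open_virtual_dataset

class VirtualiZarrBackendEntrypoint(BackendEntrypoint):
    """Would let xr.open_dataset / xr.open_mfdataset produce virtual datasets."""

    def open_dataset(self, filename_or_obj, *, drop_variables=None, **kwargs):
        # Return chunk manifests rather than loading any array data.
        return open_virtual_dataset(
            filename_or_obj, drop_variables=drop_variables, **kwargs
        )

# If registered under the "xarray.backends" entry-point group, this could be
# used as xr.open_mfdataset(paths, engine="virtualizarr").
```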
# lithops doesn't have a delayed primitive
open_ = open_virtual_dataset
I think the code would be more straightforward if the parallel primitive we used for lithops was the same as the one we used for dask.
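A hedged sketch of that suggestion: a tiny delayed-style wrapper around lithops so the dask and lithops branches could share the same open-then-compute structure (this wrapper is purely illustrative, not part of the PR):

```python
import lithops

class DelayedCall:
    """Record a function call now, execute it on a serverless worker later."""

    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs

def lithops_delayed(func):
    def wrapper(*args, **kwargs):
        return DelayedCall(func, *args, **kwargs)
    return wrapper

def lithops_compute(tasks):
    # Mirror dask.compute: run every recorded call and gather the results.
    fn_exec = lithops.FunctionExecutor()
    futures = fn_exec.map(lambda task: task.func(*task.args, **task.kwargs), tasks)
    completed, _ = fn_exec.wait(futures, download_results=True)
    return [f.get_result() for f in completed]
```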
elif parallel == "lithops":
    import lithops
I believe all of this could also be useful upstream in xr.open_mfdataset
Yes, that should work fine. We may want to loosen/generalize blockwise slightly in Cubed to return an arbitrary object so it can be done with Cubed - but that can be done later.
Agreed - it will be interesting to see this for large datasets. (It's also similar to the approach I've taken for storing data in Icechunk, where the changesets are returned to the client - again, small kB-sized objects.)
print(combined_vds)
print(expected_vds)
print(combined_vds.indexes)
print(expected_vds.indexes)
print(combined_vds["lat"].attrs)
print(expected_vds["lat"].attrs)
print(combined_vds["lat"].encoding)
print(expected_vds["lat"].encoding)
print(combined_vds["lat"].data)
print(expected_vds["lat"].data)
print(combined_vds["lat"].data.zarray)
print(expected_vds["lat"].data.zarray)
print(combined_vds["lat"].data.manifest.dict())
print(expected_vds["lat"].data.manifest.dict())

# TODO this assertion unintentionally triggers loading, see issue #354
# xrt.assert_identical(combined_vds.coords.variables['lat'], expected_vds.coords.variables['lat'])

# TODO I have no idea why this assertion fails for all the coords - everything about the coords looks identical
# xrt.assert_identical(combined_vds, expected_vds)
I'm stuck on why this assert_identical on the datasets doesn't pass. It complains that all the coordinates are different, but every attribute I can think to check looks identical 😖
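One way to narrow it down might be to diff each coordinate component explicitly rather than the whole dataset; a sketch, reusing the attributes already inspected in the prints above:

```python
for name in expected_vds.coords:
    actual, expected = combined_vds[name].variable, expected_vds[name].variable
    print(name, "dims match:", actual.dims == expected.dims)
    print(name, "attrs match:", actual.attrs == expected.attrs)
    print(name, "encoding matches:", actual.encoding == expected.encoding)
    print(name, "manifest matches:",
          actual.data.manifest.dict() == expected.data.manifest.dict())
```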
@@ -19,7 +19,7 @@ Reading
   :toctree: generated/

   open_virtual_dataset
   open_virtual_mfdataset
Note to self: the docs and especially the readme should be rewritten to put this function front and center.
Here I have copied the code from xr.open_mfdataset, changed it to use open_virtual_dataset, and added an option to parallelize with lithops as an alternative to using dask.delayed.

I haven't even tried to run this yet, but I think this is the right approach @tomwhite? I realised we don't need cubed's blockwise because xarray.open_mfdataset has internal logic to turn the N-dimensional concat into a 1D map already, so lithops.map should be fine?

Also I think based on our conversation we should be able to use lithops.map instead of lithops.map_reduce like @thodson-usgs did in #203, because the tiny size of the virtual datasets being returned to the client means we should be able to get away with a single reduction step on the client even at large scale (see also #104 for justification that we only need to send back kB-sized objects).

Adds open_virtual_mfdataset as suggested in #345 (open_virtual_mfdataset), but also sketches out how we might close both #95 (Trying to run open_virtual_dataset in parallel) and #123 (Serverless parallelization of reference generation).

docs/releases.rst
api.rst
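A hedged usage sketch of the new function (the keyword values follow the diff above; the import path, file names, and the kerchunk output step are assumptions for illustration):

```python
from virtualizarr import open_virtual_mfdataset

combined_vds = open_virtual_mfdataset(
    ["air_001.nc", "air_002.nc"],
    combine="by_coords",
    parallel="lithops",  # or parallel="dask", or False for serial
)

# The result is still virtual: persist the combined references, e.g. to kerchunk JSON.
combined_vds.virtualize.to_kerchunk("combined.json", format="json")
```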