
Add open_virtual_mfdataset #349

Draft · TomNicholas wants to merge 16 commits into main
Conversation

TomNicholas (Member) commented Dec 16, 2024

Here I have copied the code from xr.open_mfdataset, changed it to use open_virtual_dataset, and added an option to parallelize with lithops as an alternative to using dask.delayed.

I haven't even tried to run this yet, but I think this is the right approach, @tomwhite? I realised we don't need cubed's blockwise because xarray.open_mfdataset has internal logic to turn the N-dimensional concat into a 1D map already, so lithops.map should be fine?

Also, based on our conversation, I think we can use lithops.map instead of the lithops.map_reduce approach @thodson-usgs took in #203: the virtual datasets returned to the client are so small that a single reduction step on the client should be fine even at large scale (see also #104 for justification that we only need to send back kB-sized objects).
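For concreteness, here is a minimal sketch of the lithops path described above (illustrative only: the helper name, the call signature, and the use of combine_nested are assumptions, not the final API):

import lithops
import xarray as xr

from virtualizarr import open_virtual_dataset

def open_virtual_mfdataset_sketch(paths, concat_dim):
    # fan out: each serverless worker opens one file and builds its virtual dataset
    fn_exec = lithops.FunctionExecutor()
    futures = fn_exec.map(open_virtual_dataset, paths)

    # fan in: gather the kB-sized virtual datasets on the client in a single step
    virtual_datasets = fn_exec.get_result(futures)

    # xarray's combine logic stitches them together entirely client-side
    return xr.combine_nested(virtual_datasets, concat_dim=concat_dim)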

    datasets, closers = dask.compute(datasets, closers)
elif parallel == "lithops":

    def generate_refs(path):
TomNicholas (Member Author):
This is the equivalent of @thodson-usgs's map_references function.
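Presumably the truncated body above just wraps open_virtual_dataset, something like this (a guess at the elided code; any extra keyword arguments are omitted):

def generate_refs(path):
    # each worker returns its small virtual dataset directly to the client,
    # so the combine step can happen once, client-side, rather than via map_reduce
    return open_virtual_dataset(path)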


# wait for all the serverless workers to finish, and send their resulting virtual datasets back to the client
completed_futures, _ = fn_exec.wait(futures, download_results=True)
virtual_datasets = [future.get_result() for future in completed_futures]
TomNicholas (Member Author):

IIUC this will cause every serverless worker to send a small virtual dataset back to the client process over the internet somehow

elif combine == "by_coords":
    # Redo ordering from coordinates, ignoring how they were ordered
    # previously
    combined = combine_by_coords(
TomNicholas (Member Author):
This is only going to work if we have used loadable_variables to create indexes for 1D coordinates, so it's a good reason to implement the suggestion in #335 (comment)
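For context, creating those indexes currently means naming the 1D coordinates in loadable_variables, along these lines (file and coordinate names are illustrative):

# loading the 1D dimension coordinates gives them real pandas indexes,
# which combine_by_coords needs in order to infer the concatenation order
vds = open_virtual_dataset(
    "air1.nc",
    loadable_variables=["time", "lat", "lon"],
)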

Comment on lines 18 to 20
from xarray.backends.api import _multi_file_closer
from xarray.backends.common import _find_absolute_paths
from xarray.core.combine import _infer_concat_order_from_positions, _nested_combine
TomNicholas (Member Author):
I don't like importing these deep xarray internals like this (though _infer_concat_order_from_positions and _nested_combine haven't changed since I wrote them 6 years ago), but the only alternative would be to make a general virtualizarr backend engine for xarray (see #35).

Comment on lines +351 to +352
# lithops doesn't have a delayed primitive
open_ = open_virtual_dataset
TomNicholas (Member Author) commented Dec 16, 2024:
I think the code would be more straightforward if the parallel primitive we used for lithops were the same as the one we use for dask.
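One way to get that would be a small map-shaped shim that both backends share, so the call site looks the same either way (purely illustrative, not part of this PR):

import lithops

def lithops_compute(func, inputs):
    # eager equivalent of dask.compute(*[dask.delayed(func)(x) for x in inputs]),
    # since lithops exposes map rather than a delayed primitive
    fn_exec = lithops.FunctionExecutor()
    futures = fn_exec.map(func, inputs)
    return fn_exec.get_result(futures)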

Comment on lines +344 to +345
elif parallel == "lithops":
import lithops
TomNicholas (Member Author):

I believe all of this could also be useful upstream in xr.open_mfdataset

tomwhite (Collaborator):

> I realised we don't need cubed's blockwise because xarray.open_mfdataset has internal logic to turn the N-dimensional concat into a 1D map already, so lithops.map should be fine?

Yes, that should work fine. We may want to loosen/generalize blockwise slightly in Cubed so it can return arbitrary objects, which would allow this to be done with Cubed too, but that can come later.

> Also, based on our conversation, I think we can use lithops.map instead of the lithops.map_reduce approach @thodson-usgs took in #203: the virtual datasets returned to the client are so small that a single reduction step on the client should be fine even at large scale (see also #104 for justification that we only need to send back kB-sized objects).

Agreed - it will be interesting to see this for large datasets. (It's also similar to the approach I've taken for storing data in Icechunk where the changesets are returned to the client - again, small kB-sized UUIDs.)

Comment on lines +510 to +529
print(combined_vds)
print(expected_vds)
print(combined_vds.indexes)
print(expected_vds.indexes)
print(combined_vds["lat"].attrs)
print(expected_vds["lat"].attrs)
print(combined_vds["lat"].encoding)
print(expected_vds["lat"].encoding)
print(combined_vds["lat"].data)
print(expected_vds["lat"].data)
print(combined_vds["lat"].data.zarray)
print(expected_vds["lat"].data.zarray)
print(combined_vds["lat"].data.manifest.dict())
print(expected_vds["lat"].data.manifest.dict())

# TODO this assertion unintentionally triggers loading, see issue #354
# xrt.assert_identical(combined_vds.coords.variables['lat'], expected_vds.coords.variables['lat'])

# TODO I have no idea why this assertion fails for all the coords - everything about the coords looks identical
# xrt.assert_identical(combined_vds, expected_vds)
TomNicholas (Member Author):

I'm stuck on why this assert_identical on the datasets doesn't pass. It complains that all the coordinates are different, but every attribute I can think to check looks identical 😖
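A throwaway comparison helper along these lines might at least narrow down which piece of each coordinate differs (field access mirrors the prints above; purely a debugging sketch):

def diff_coord(name):
    a, b = combined_vds[name], expected_vds[name]
    # compare each piece that the prints above inspect by eye
    for field, equal in [
        ("attrs", a.attrs == b.attrs),
        ("encoding", a.encoding == b.encoding),
        ("zarray", a.data.zarray == b.data.zarray),
        ("manifest", a.data.manifest.dict() == b.data.manifest.dict()),
    ]:
        if not equal:
            print(f"{name}.{field} differs")

for name in combined_vds.coords:
    diff_coord(name)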

@@ -19,7 +19,7 @@ Reading
:toctree: generated/

open_virtual_dataset

open_virtual_mfdataset
TomNicholas (Member Author):

Note to self: the docs and especially the readme should be rewritten to put this function front and center.
