Introducing FilePatternToChunks: IO with Pangeo-Forge's FilePattern interface. #31

Merged (20 commits) on Sep 22, 2021

Conversation

@alxmrs (Contributor) commented Aug 9, 2021

This is the first of a few changes that will let users read in datasets using Pangeo-Forge's `FilePattern` interface [0]. Here, users can describe how data is stored along concat and merge dimensions. This transform will read the datasets in as chunks. This module can be leveraged in pipelines to convert natively formatted datasets to Zarr.

To make use of this transform, the user will need to install `pangeo-forge-recipes` separately. This dependency is included in the test dependencies.

As of now, this transform is not exposed to the user (i.e., not included in the primary `__init__.py`). I plan to do this (and update the docs) once the module is tested and feature complete (#29).

[0]: https://pangeo-forge.readthedocs.io/en/latest/file_patterns.html

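For context, a minimal sketch of how a `FilePattern` describing such a dataset is built. The path template and dimension keys below are hypothetical, and the `FilePatternToChunks` invocation in the comment is an assumption, since the transform isn't exposed yet:

```python
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

def make_path(time: str) -> str:
  # Hypothetical layout: one netCDF file per month.
  return f'gs://my-bucket/dataset/{time}.nc'

pattern = FilePattern(make_path, ConcatDim('time', ['2021-01', '2021-02']))

# The transform introduced in this PR would consume the pattern and emit
# (xarray_beam.Key, xarray.Dataset) chunk pairs, roughly:
#   p | FilePatternToChunks(pattern)
```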
```python
) -> Iterator[Tuple[core.Key, xarray.Dataset]]:
  """Open datasets into chunks with XArray."""
  path = self.pattern[index]
  with FileSystems().open(path) as file:
```
Member commented:

Do Beam's filesystems really all work out of the box with Xarray? If so, that's awesome!

Can you verify that it works with both netCDF3 and netCDF4 files? These would be using different underlying storage backends (scipy vs h5netcdf).

To be honest, I'm a little skeptical that this will work well. I suspect we'll end up needing to copy temporary files to local disk (but I'd love to be proven wrong!)
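A quick way to check this (a sketch, assuming local test files `example3.nc` in netCDF3 format and `example4.nc` in netCDF4 format exist):

```python
import xarray
from apache_beam.io.filesystems import FileSystems

# netCDF3 files go through the scipy backend; netCDF4 through h5netcdf.
# Calling .load() inside the `with` block ensures the data is read before
# the file handle closes.
with FileSystems().open('example3.nc') as f:   # hypothetical netCDF3 file
  ds3 = xarray.open_dataset(f, engine='scipy').load()

with FileSystems().open('example4.nc') as f:   # hypothetical netCDF4 file
  ds4 = xarray.open_dataset(f, engine='h5netcdf').load()
```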

Contributor (Author) commented:

Let me experiment and see how this works. In my tests in the previous iteration of this change, this worked well with GCS's IO objects.

Member commented:

I experimented a bit more with this based on @mjwillson's suggestion.

Amazingly, it seems that using file-like objects in Xarray does actually work as used here, though making a local copy might still have better performance.

What doesn't work yet -- but hopefully with small upstream changes to Xarray could work -- is passing xarray datasets opened with these file-like objects into a Beam pipeline. That could let us do the actual data loading from netCDF in separate workers, which could be quite a win!

Contributor (Author) commented:

It's a bit unclear to me how this would not work in a Beam pipeline (or, what needs to be done to get this win). Can you explain a bit more?

Is this a correct understanding: With the change you're referring to, we could pickle the XArray open command (with the file-like object) as PCollections, which would allow us to split the open across workers?

Member commented:

> Is this a correct understanding: With the change you're referring to, we could pickle the XArray open command (with the file-like object) as PCollections, which would allow us to split the open across workers?

With this change, we could pickle lazy xarray.Dataset objects corresponding to open netCDF files and pass them between stages in a Beam pipeline.

Some data would still need to get loaded on the worker on which xarray.open_dataset() is called, but this could be much less data than the entire file (e.g., only the "metadata" part of the file). The bulk of the loading work could be split across multiple workers, which could be quite useful for processing large (GB+) netCDF files.
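In other words (a sketch of the idea, not something that worked at the time of this comment; the file and variable names are hypothetical):

```python
import xarray

# Opening with chunks={} yields a lazy, dask-backed Dataset; only the file's
# "metadata" (dimensions, dtypes, attributes) is read at this point.
ds = xarray.open_dataset('big.nc', chunks={})  # hypothetical local file

# If this lazy Dataset could be pickled into a PCollection, the actual byte
# reading would happen on whichever worker triggers the load:
subset = ds['temperature'].isel(time=slice(0, 10)).load()  # hypothetical variable
```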

…nks.

This transform will now only open file pattern datasets as whole chunks. Re-chunking (i.e., into "sub_chunks") can be delegated to a SplitChunk() transform layered after this one.
As a backup to the `FileSystems().open(...)` method, we use fsspec to create a local copy of the data for opening with `xr.open_dataset(...)`.
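The layering might look roughly like this (a sketch; the `SplitChunks` name, its chunk-size argument, and the use of `FilePatternToChunks` as a root transform are assumptions based on xarray-beam's existing transforms):

```python
import apache_beam as beam
import xarray_beam as xbeam

with beam.Pipeline() as p:
  (
      p
      | FilePatternToChunks(pattern)      # one whole chunk per file in the pattern
      | xbeam.SplitChunks({'time': 10})   # optional finer re-chunking downstream
  )
```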
@shoyer (Member) left a comment:

Looks great, thanks Alex!

Comment on lines 106 to 120
```python
try:
  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
except (TypeError, OSError) as e:
  if not self.local_copy:
    raise ValueError(f'cannot open {path!r} with buffering.') from e

  # The cfgrib engine (and others) may fail with the FileSystems method of
  # opening with BufferedReaders. Here, we open the data locally to make
  # it easier to work with XArray.
  with fsspec.open_local(
      f"simplecache::{path}",
      simplecache={'cache_storage': '/tmp/files'}
  ) as fs_file:
    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
```
Member commented:

Rather than using `local_copy` as a fall-back, can we just use an `if` statement?

Suggested change (the try/except above becomes a plain if/else):

```python
if self.local_copy:
  # The cfgrib engine (and others) may fail with the FileSystems method of
  # opening with BufferedReaders. Here, we open the data locally to make
  # it easier to work with XArray.
  with fsspec.open_local(
      f"simplecache::{path}",
      simplecache={'cache_storage': '/tmp/files'}
  ) as fs_file:
    yield xarray.open_dataset(fs_file, **self.xarray_open_kwargs)
else:
  yield xarray.open_dataset(file, **self.xarray_open_kwargs)
```
The old contextmanager approach wasn't applicable, since `open_local` returns a string (path to the open file).
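For example (a sketch reusing `path` and `xarray_open_kwargs` from the snippet above):

```python
import fsspec
import xarray

# fsspec.open_local() returns a local path string rather than a context
# manager, so the cached copy can be opened directly:
local_path = fsspec.open_local(
    f"simplecache::{path}",
    simplecache={'cache_storage': '/tmp/files'},
)
ds = xarray.open_dataset(local_path, **xarray_open_kwargs)
```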