diff --git a/ci/upstream.yml b/ci/upstream.yml
index 184c6710..2c2680bc 100644
--- a/ci/upstream.yml
+++ b/ci/upstream.yml
@@ -24,7 +24,7 @@ dependencies:
   - fsspec
   - pip
   - pip:
-    - zarr==3.0.0b1 # beta release of zarr-python v3
+    - icechunk # installs zarr v3 as a dependency
     - git+https://github.com/pydata/xarray@zarr-v3 # zarr-v3 compatibility branch
     - git+https://github.com/zarr-developers/numcodecs@zarr3-codecs # zarr-v3 compatibility branch
     # - git+https://github.com/fsspec/kerchunk@main # kerchunk is currently incompatible with zarr-python v3 (https://github.com/fsspec/kerchunk/pull/516)
diff --git a/conftest.py b/conftest.py
index 3af4bf06..810fd833 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,6 +1,8 @@
 import h5py
+import numpy as np
 import pytest
 import xarray as xr
+from xarray.core.variable import Variable


 def pytest_addoption(parser):
@@ -96,3 +98,16 @@ def hdf5_scalar(tmpdir):
         dataset = f.create_dataset("scalar", data=0.1, dtype="float32")
         dataset.attrs["scalar"] = "true"
     return filepath
+
+
+@pytest.fixture
+def simple_netcdf4(tmpdir):
+    filepath = f"{tmpdir}/simple.nc"
+
+    arr = np.arange(12, dtype=np.dtype("int32")).reshape(3, 4)
+    var = Variable(data=arr, dims=["x", "y"])
+    ds = xr.Dataset({"foo": var})
+
+    ds.to_netcdf(filepath)
+
+    return filepath
diff --git a/docs/api.rst b/docs/api.rst
index 81d08a77..755713d0 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -39,6 +39,7 @@ Serialization

     VirtualiZarrDatasetAccessor.to_kerchunk
     VirtualiZarrDatasetAccessor.to_zarr
+    VirtualiZarrDatasetAccessor.to_icechunk


 Rewriting
diff --git a/docs/releases.rst b/docs/releases.rst
index ee1ae402..93a5fec9 100644
--- a/docs/releases.rst
+++ b/docs/releases.rst
@@ -31,6 +31,9 @@ New Features
 - Support empty files (:pull:`260`)
   By `Justus Magin <https://github.com/keewis>`_.

+- Can write virtual datasets to Icechunk stores using `virtualize.to_icechunk` (:pull:`256`)
+  By `Matt Iannucci <https://github.com/mpiannucci>`_.
+
 Breaking changes
 ~~~~~~~~~~~~~~~~
diff --git a/docs/usage.md b/docs/usage.md
index a0f9d058..30eab144 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -396,6 +396,23 @@ combined_ds = xr.open_dataset('combined.parq', engine="kerchunk")

 By default references are placed in separate parquet file when the total number of references exceeds `record_size`. If there are fewer than `categorical_threshold` unique urls referenced by a particular variable, url will be stored as a categorical variable.

+### Writing to an Icechunk Store
+
+We can also write these references out to an [Icechunk store](https://icechunk.io/). Icechunk is an open-source, cloud-native transactional tensor storage engine that is compatible with Zarr version 3. To export our virtual dataset to an Icechunk store, we simply use the {py:meth}`ds.virtualize.to_icechunk <virtualizarr.VirtualiZarrDatasetAccessor.to_icechunk>` accessor method.
+
+```python
+# create an icechunk store
+from icechunk import IcechunkStore, StorageConfig, StoreConfig, VirtualRefConfig
+storage = StorageConfig.filesystem('combined')
+store = IcechunkStore.create(storage=storage, mode="w", config=StoreConfig(
+    virtual_ref_config=VirtualRefConfig.s3_anonymous(region='us-east-1'),
+))
+
+combined_vds.virtualize.to_icechunk(store)
+```
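+
+Reading the data back is then just a matter of opening the same store as a regular Zarr store. A minimal sketch (this assumes the `store` object from above is still open in the same session, and the exact open options may vary between icechunk versions):
+
+```python
+import xarray as xr
+
+# open the virtual dataset through the icechunk store;
+# chunk data is only fetched from the original files on access
+combined_ds = xr.open_zarr(store, consolidated=False, chunks={})
+```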
+
+See the [Icechunk documentation](https://icechunk.io/icechunk-python/virtual/#creating-a-virtual-dataset-with-virtualizarr) for more details.
+
 ### Writing as Zarr

 Alternatively, we can write these references out as an actual Zarr store, at least one that is compliant with the [proposed "Chunk Manifest" ZEP](https://github.com/zarr-developers/zarr-specs/issues/287). To do this we simply use the {py:meth}`ds.virtualize.to_zarr <virtualizarr.VirtualiZarrDatasetAccessor.to_zarr>` accessor method.
diff --git a/virtualizarr/accessor.py b/virtualizarr/accessor.py
index cc251e63..336838f9 100644
--- a/virtualizarr/accessor.py
+++ b/virtualizarr/accessor.py
@@ -1,5 +1,6 @@
 from pathlib import Path
 from typing import (
+    TYPE_CHECKING,
     Callable,
     Literal,
     overload,
@@ -12,6 +13,9 @@
 from virtualizarr.writers.kerchunk import dataset_to_kerchunk_refs
 from virtualizarr.writers.zarr import dataset_to_zarr

+if TYPE_CHECKING:
+    from icechunk import IcechunkStore  # type: ignore[import-not-found]
+

 @register_dataset_accessor("virtualize")
 class VirtualiZarrDatasetAccessor:
@@ -39,6 +43,20 @@ def to_zarr(self, storepath: str) -> None:
         """
         dataset_to_zarr(self.ds, storepath)

+    def to_icechunk(self, store: "IcechunkStore") -> None:
+        """
+        Write an xarray dataset to an Icechunk store.
+
+        Any variables backed by ManifestArray objects will be written as virtual references; any other variables will be loaded into memory before their binary chunk data is written into the store.
+
+        Parameters
+        ----------
+        store: IcechunkStore
+        """
+        from virtualizarr.writers.icechunk import dataset_to_icechunk
+
+        dataset_to_icechunk(self.ds, store)
+
     @overload
     def to_kerchunk(
         self, filepath: None, format: Literal["dict"]
diff --git a/virtualizarr/readers/zarr_v3.py b/virtualizarr/readers/zarr_v3.py
index 6da81581..a1f4ab7d 100644
--- a/virtualizarr/readers/zarr_v3.py
+++ b/virtualizarr/readers/zarr_v3.py
@@ -150,5 +150,7 @@ def _configurable_to_num_codec_config(configurable: dict) -> dict:
     """
     configurable_copy = configurable.copy()
     codec_id = configurable_copy.pop("name")
+    if codec_id.startswith("numcodecs."):
+        codec_id = codec_id[len("numcodecs.") :]
     configuration = configurable_copy.pop("configuration")
     return numcodecs.get_codec({"id": codec_id, **configuration}).get_config()
diff --git a/virtualizarr/tests/test_integration.py b/virtualizarr/tests/test_integration.py
index c9e3e302..09d0c0a8 100644
--- a/virtualizarr/tests/test_integration.py
+++ b/virtualizarr/tests/test_integration.py
@@ -27,7 +27,7 @@ def test_kerchunk_roundtrip_in_memory_no_concat():
             chunks=(2, 2),
             compressor=None,
             filters=None,
-            fill_value=np.nan,
+            fill_value=None,
             order="C",
         ),
         chunkmanifest=manifest,
diff --git a/virtualizarr/tests/test_manifests/test_array.py b/virtualizarr/tests/test_manifests/test_array.py
index f3a9ee9f..06e54d95 100644
--- a/virtualizarr/tests/test_manifests/test_array.py
+++ b/virtualizarr/tests/test_manifests/test_array.py
@@ -47,7 +47,7 @@ def test_create_manifestarray_from_kerchunk_refs(self):
         assert marr.chunks == (2, 3)
         assert marr.dtype == np.dtype("int64")
         assert marr.zarray.compressor is None
-        assert marr.zarray.fill_value is np.nan
+        assert marr.zarray.fill_value == 0
         assert marr.zarray.filters is None
         assert marr.zarray.order == "C"
diff --git a/virtualizarr/tests/test_readers/test_kerchunk.py b/virtualizarr/tests/test_readers/test_kerchunk.py
index 50d4b19b..f693b370 100644
--- a/virtualizarr/tests/test_readers/test_kerchunk.py
+++ b/virtualizarr/tests/test_readers/test_kerchunk.py
@@ -37,7 +37,7 @@ def test_dataset_from_df_refs():
     assert da.data.zarray.compressor is None
     assert da.data.zarray.filters is None
-    assert da.data.zarray.fill_value is np.nan
+    assert da.data.zarray.fill_value == 0
     assert da.data.zarray.order == "C"

     assert da.data.manifest.dict() == {
diff --git a/virtualizarr/tests/test_writers/conftest.py b/virtualizarr/tests/test_writers/conftest.py
new file mode 100644
index 00000000..28c5b3db
--- /dev/null
+++ b/virtualizarr/tests/test_writers/conftest.py
@@ -0,0 +1,27 @@
+import numpy as np
+import pytest
+from xarray import Dataset
+from xarray.core.variable import Variable
+
+from virtualizarr.manifests import ChunkManifest, ManifestArray
+
+
+@pytest.fixture
+def vds_with_manifest_arrays() -> Dataset:
+    arr = ManifestArray(
+        chunkmanifest=ChunkManifest(
+            entries={"0.0": dict(path="/test.nc", offset=6144, length=48)}
+        ),
+        zarray=dict(
+            shape=(2, 3),
+            dtype=np.dtype("<i8"),
+            chunks=(2, 3),
+            compressor={"id": "zlib", "level": 1},
+            filters=None,
+            fill_value=0,
+            order="C",
+            zarr_format=3,
+        ),
+    )
+    var = Variable(dims=["x", "y"], data=arr, attrs={"units": "km"})
+    return Dataset({"a": var}, attrs={"something": 0})
diff --git a/virtualizarr/tests/test_writers/test_icechunk.py b/virtualizarr/tests/test_writers/test_icechunk.py
new file mode 100644
--- /dev/null
+++ b/virtualizarr/tests/test_writers/test_icechunk.py
+from typing import TYPE_CHECKING
+
+import numpy as np
+import pytest
+from xarray import Dataset
+from zarr import Array, Group, group
+
+from virtualizarr.writers.icechunk import dataset_to_icechunk
+
+if TYPE_CHECKING:
+    from icechunk import IcechunkStore
+
+
+@pytest.fixture
+def icechunk_filestore(tmpdir) -> "IcechunkStore":
+    from icechunk import IcechunkStore, StorageConfig
+
+    storage = StorageConfig.filesystem(str(tmpdir))
+
+    # TODO if icechunk exposed a synchronous version of .open then we wouldn't need to use asyncio.run here
+    # TODO is this the correct mode to use?
+    store = IcechunkStore.create(storage=storage, mode="w")
+
+    # TODO instead yield store then store.close() ??
+    return store
+
+
+def test_write_new_virtual_variable(
+    icechunk_filestore: "IcechunkStore", vds_with_manifest_arrays: Dataset
+):
+    vds = vds_with_manifest_arrays
+
+    dataset_to_icechunk(vds, icechunk_filestore)
+
+    # check attrs
+    root_group = group(store=icechunk_filestore)
+    assert isinstance(root_group, Group)
+    assert root_group.attrs == {"something": 0}
+
+    # TODO check against vds, then perhaps parametrize?
+
+    # check array exists
+    assert "a" in root_group
+    arr = root_group["a"]
+    assert isinstance(arr, Array)
+
+    # check array metadata
+    # TODO why doesn't a .zarr_format or .version attribute exist on zarr.Array?
+    # assert arr.zarr_format == 3
+    assert arr.shape == (2, 3)
+    assert arr.chunks == (2, 3)
+    assert arr.dtype == np.dtype("<i8")
diff --git a/virtualizarr/tests/test_writers/test_zarr.py b/virtualizarr/tests/test_writers/test_zarr.py
--- a/virtualizarr/tests/test_writers/test_zarr.py
+++ b/virtualizarr/tests/test_writers/test_zarr.py
-@pytest.fixture
-def vds_with_manifest_arrays() -> Dataset:
-    arr = ManifestArray(
-        chunkmanifest=ChunkManifest(
-            entries={"0.0": dict(path="test.nc", offset=6144, length=48)}
-        ),
-        zarray=dict(
-            shape=(2, 3),
-            dtype=np.dtype("<i8"),
-            chunks=(2, 3),
-            compressor={"id": "zlib", "level": 1},
-            filters=None,
-            fill_value=0,
-            order="C",
-            zarr_format=3,
-        ),
-    )
-    var = Variable(dims=["x", "y"], data=arr, attrs={"units": "km"})
-    return Dataset({"a": var}, attrs={"something": 0})
-
-
 def isconfigurable(value: dict) -> bool:
     """
     Several metadata attributes in ZarrV3 use a dictionary with keys "name" : str and "configuration" : dict
diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py
new file mode 100644
index 00000000..6dadbc08
--- /dev/null
+++ b/virtualizarr/writers/icechunk.py
@@ -0,0 +1,204 @@
+from typing import TYPE_CHECKING, cast
+
+import numpy as np
+from xarray import Dataset
+from xarray.backends.zarr import encode_zarr_attr_value
+from xarray.core.variable import Variable
+
+from virtualizarr.manifests import ChunkManifest, ManifestArray
+from virtualizarr.zarr import encode_dtype
+
+if TYPE_CHECKING:
+    from icechunk import IcechunkStore  # type: ignore[import-not-found]
+    from zarr import Group  # type: ignore
+
+
+VALID_URI_PREFIXES = {
+    "s3://",
+    # "gs://",  # https://github.com/earth-mover/icechunk/issues/265
+    # "azure://",  # https://github.com/earth-mover/icechunk/issues/266
+    # "r2://",
+    # "cos://",
+    # "minio://",
+    "file:///",
+}
+
+
+def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None:
+    """
+    Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store.
+
+    Any variables backed by ManifestArray objects will be written as virtual references; any other variables will be loaded into memory before their chunk data is written into the store.
+
+    Parameters
+    ----------
+    ds: xr.Dataset
+    store: IcechunkStore
+    """
+    try:
+        from icechunk import IcechunkStore  # type: ignore[import-not-found]
+        from zarr import Group  # type: ignore[import-untyped]
+    except ImportError:
+        raise ImportError(
+            "The 'icechunk' and 'zarr' version 3 libraries are required to use this function"
+        )
+
+    if not isinstance(store, IcechunkStore):
+        raise TypeError(f"expected type IcechunkStore, but got type {type(store)}")
+
+    if not store.supports_writes:
+        raise ValueError("supplied store does not support writes")
+
+    # TODO only supports writing to the root group currently
+    # TODO pass zarr_format kwarg?
+    root_group = Group.from_store(store=store)
+
+    # TODO this is Frozen, the API for setting attributes must be something else
+    # root_group.attrs = ds.attrs
+    # for k, v in ds.attrs.items():
+    #     root_group.attrs[k] = encode_zarr_attr_value(v)
+
+    return write_variables_to_icechunk_group(
+        ds.variables,
+        ds.attrs,
+        store=store,
+        group=root_group,
+    )
+
+
+def write_variables_to_icechunk_group(
+    variables,
+    attrs,
+    store,
+    group,
+):
+    virtual_variables = {
+        name: var
+        for name, var in variables.items()
+        if isinstance(var.data, ManifestArray)
+    }
+
+    loadable_variables = {
+        name: var for name, var in variables.items() if name not in virtual_variables
+    }
+
+    # First write all the non-virtual variables.
+    # NOTE: We set the attributes of the group before writing the dataset because the dataset
+    # will overwrite the root group's attributes with the dataset's attributes. We take advantage
+    # of xarray's zarr integration to avoid having to format the attributes ourselves.
+    ds = Dataset(loadable_variables, attrs=attrs)
+    ds.to_zarr(store, zarr_format=3, consolidated=False, mode="a")
+
+    # Then finish by writing the virtual variables to the same group
+    for name, var in virtual_variables.items():
+        write_virtual_variable_to_icechunk(
+            store=store,
+            group=group,
+            name=name,
+            var=var,
+        )
+
+
+def write_variable_to_icechunk(
+    store: "IcechunkStore",
+    group: "Group",
+    name: str,
+    var: Variable,
+) -> None:
+    """Write a single (possibly virtual) variable into an icechunk store"""
+    if isinstance(var.data, ManifestArray):
+        write_virtual_variable_to_icechunk(
+            store=store,
+            group=group,
+            name=name,
+            var=var,
+        )
+    else:
+        raise ValueError(
+            "Cannot write non-virtual variables as virtual variables to Icechunk stores"
+        )
+
+
+def write_virtual_variable_to_icechunk(
+    store: "IcechunkStore",
+    group: "Group",
+    name: str,
+    var: Variable,
+) -> None:
+    """Write a single virtual variable into an icechunk store"""
+    ma = cast(ManifestArray, var.data)
+    zarray = ma.zarray
+
+    # creates the array if it doesn't already exist
+    arr = group.require_array(
+        name=name,
+        shape=zarray.shape,
+        chunk_shape=zarray.chunks,
+        dtype=encode_dtype(zarray.dtype),
+        codecs=zarray._v3_codec_pipeline(),
+        dimension_names=var.dims,
+        fill_value=zarray.fill_value,
+        # TODO fill_value?
+    )
+
+    # TODO it would be nice if we could assign directly to the .attrs property
+    for k, v in var.attrs.items():
+        arr.attrs[k] = encode_zarr_attr_value(v)
+    arr.attrs["_ARRAY_DIMENSIONS"] = encode_zarr_attr_value(var.dims)
+
+    _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"}
+    for k, v in var.encoding.items():
+        if k in _encoding_keys:
+            arr.attrs[k] = encode_zarr_attr_value(v)
+
+    write_manifest_virtual_refs(
+        store=store,
+        group=group,
+        arr_name=name,
+        manifest=ma.manifest,
+    )
+
+
+def write_manifest_virtual_refs(
+    store: "IcechunkStore",
+    group: "Group",
+    arr_name: str,
+    manifest: ChunkManifest,
+) -> None:
+    """Write all the virtual references for one array manifest at once."""
+
+    key_prefix = f"{group.name}{arr_name}"
+
+    # loop over every reference in the ChunkManifest for that array
+    # TODO inefficient: this should be replaced with something that sets all (new) references for the array at once,
+    # but Icechunk needs to expose a suitable API first
+    it = np.nditer(
+        [manifest._paths, manifest._offsets, manifest._lengths],  # type: ignore[arg-type]
+        flags=[
+            "refs_ok",
+            "multi_index",
+            "c_index",
+        ],
+        op_flags=[["readonly"]] * 3,  # type: ignore
+    )
+    for path, offset, length in it:
+        index = it.multi_index
+        chunk_key = "/".join(str(i) for i in index)
+
+        # set each reference individually
+        store.set_virtual_ref(
+            # TODO it would be marginally neater if I could pass the group and name as separate args
+            key=f"{key_prefix}/c/{chunk_key}",  # should be of form 'group/arr_name/c/0/1/2', where c stands for chunks
+            location=as_file_uri(path.item()),
+            offset=offset.item(),
+            length=length.item(),
+        )
+
+
+def as_file_uri(path):
+    # TODO a more robust solution to this requirement exists in https://github.com/zarr-developers/VirtualiZarr/pull/243
+    if not any(path.startswith(prefix) for prefix in VALID_URI_PREFIXES) and path != "":
+        # assume path is local
+        return f"file://{path}"
+    else:
+        return path
diff --git a/virtualizarr/writers/zarr.py b/virtualizarr/writers/zarr.py
index b3dc8f1a..b9529ad5 100644
--- a/virtualizarr/writers/zarr.py
+++ b/virtualizarr/writers/zarr.py
@@ -80,7 +80,6 @@ def to_zarr_json(var: Variable, array_dir: Path) -> None:
 def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -> dict:
     """Construct a v3-compliant metadata dict from v2 zarray + information stored on the xarray variable."""
     # TODO it would be nice if we could use the zarr-python metadata.ArrayMetadata classes to do this conversion for us
-
     metadata = zarray.dict()

     # adjust to match v3 spec
@@ -95,7 +94,7 @@ def zarr_v3_array_metadata(zarray: ZArray, dim_names: list[str], attrs: dict) -
         "name": "default",
         "configuration": {"separator": "/"},
     }
-    metadata["codecs"] = zarray._v3_codec_pipeline()
+    metadata["codecs"] = tuple(c.to_dict() for c in zarray._v3_codec_pipeline())
     metadata.pop("filters")
     metadata.pop("compressor")
     metadata.pop("order")
diff --git a/virtualizarr/zarr.py b/virtualizarr/zarr.py
index 4b3fdd53..e339a3f4 100644
--- a/virtualizarr/zarr.py
+++ b/virtualizarr/zarr.py
@@ -72,8 +72,11 @@ def codec(self) -> Codec:
     @classmethod
     def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray":
         # coerce type of fill_value as kerchunk can be inconsistent with this
+        dtype = np.dtype(decoded_arr_refs_zarray["dtype"])
         fill_value = decoded_arr_refs_zarray["fill_value"]
-        if fill_value is None or fill_value == "NaN" or fill_value == "nan":
+        if np.issubdtype(dtype, np.floating) and (
+            fill_value is None or fill_value == "NaN" or fill_value == "nan"
"NaN" or fill_value == "nan" + ): fill_value = np.nan compressor = decoded_arr_refs_zarray["compressor"] @@ -84,7 +87,7 @@ def from_kerchunk_refs(cls, decoded_arr_refs_zarray) -> "ZArray": return ZArray( chunks=tuple(decoded_arr_refs_zarray["chunks"]), compressor=compressor, - dtype=np.dtype(decoded_arr_refs_zarray["dtype"]), + dtype=dtype, fill_value=fill_value, filters=decoded_arr_refs_zarray["filters"], order=decoded_arr_refs_zarray["order"], @@ -140,7 +143,7 @@ def replace( replacements["zarr_format"] = zarr_format return dataclasses.replace(self, **replacements) - def _v3_codec_pipeline(self) -> list: + def _v3_codec_pipeline(self) -> Any: """ VirtualiZarr internally uses the `filters`, `compressor`, and `order` attributes from zarr v2, but to create conformant zarr v3 metadata those 3 must be turned into `codecs` objects. @@ -153,46 +156,46 @@ def _v3_codec_pipeline(self) -> list: post_compressor: Iterable[BytesBytesCodec] #optional ``` """ - import numcodecs + try: + from zarr.core.metadata.v3 import ( # type: ignore[import-untyped] + parse_codecs, + ) + except ImportError: + raise ImportError("zarr v3 is required to generate v3 codec pipelines") - if self.filters: - filter_codecs_configs = [ - numcodecs.get_codec(filter).get_config() for filter in self.filters - ] - filters = [ - dict(name=codec.pop("id"), configuration=codec) - for codec in filter_codecs_configs - ] - else: - filters = [] - - # Noting here that zarr v3 has very few codecs specificed in the official spec, - # and that there are far more codecs in `numcodecs`. We take a gamble and assume - # that the codec names and configuration are simply mapped into zarrv3 "configurables". - if self.compressor: - compressor = [_num_codec_config_to_configurable(self.compressor)] - else: - compressor = [] + codec_configs = [] # https://zarr-specs.readthedocs.io/en/latest/v3/codecs/transpose/v1.0.html#transpose-codec-v1 # Either "C" or "F", defining the layout of bytes within each chunk of the array. # "C" means row-major order, i.e., the last dimension varies fastest; # "F" means column-major order, i.e., the first dimension varies fastest. - if self.order == "C": - order = tuple(range(len(self.shape))) - elif self.order == "F": + # For now, we only need transpose if the order is not "C" + if self.order == "F": order = tuple(reversed(range(len(self.shape)))) + transpose = dict(name="transpose", configuration=dict(order=order)) + codec_configs.append(transpose) - transpose = dict(name="transpose", configuration=dict(order=order)) # https://github.com/zarr-developers/zarr-python/pull/1944#issuecomment-2151994097 # "If no ArrayBytesCodec is supplied, we can auto-add a BytesCodec" bytes = dict( name="bytes", configuration={} ) # TODO need to handle endianess configuration + codec_configs.append(bytes) + + # Noting here that zarr v3 has very few codecs specificed in the official spec, + # and that there are far more codecs in `numcodecs`. We take a gamble and assume + # that the codec names and configuration are simply mapped into zarrv3 "configurables". + if self.filters: + codec_configs.extend( + [_num_codec_config_to_configurable(filter) for filter in self.filters] + ) + + if self.compressor: + codec_configs.append(_num_codec_config_to_configurable(self.compressor)) + + # convert the pipeline repr into actual v3 codec objects + codec_pipeline = parse_codecs(codec_configs) - # The order here is significant! 
-        # [ArrayArray] -> ArrayBytes -> [BytesBytes]
-        codec_pipeline = [transpose, bytes] + compressor + filters

         return codec_pipeline

@@ -220,5 +223,9 @@ def _num_codec_config_to_configurable(num_codec: dict) -> dict:
     """
     Convert a numcodecs codec into a zarr v3 configurable.
     """
+    if num_codec["id"].startswith("numcodecs."):
+        return num_codec
+
     num_codec_copy = num_codec.copy()
-    return {"name": num_codec_copy.pop("id"), "configuration": num_codec_copy}
+    return {"name": "numcodecs." + num_codec_copy.pop("id"), "configuration": num_codec_copy}
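The net effect of the last two hunks is a consistent naming scheme between numcodecs-style configs and Zarr v3 "configurables": the writer prefixes the codec id with `numcodecs.`, and the reader in `virtualizarr/readers/zarr_v3.py` strips that prefix again before handing the config back to numcodecs. A minimal sketch of that round trip (illustrative only; it inlines the logic of the two private helpers rather than importing them):

```python
import numcodecs

# a numcodecs (zarr v2-style) codec config, e.g. for a compressor
num_codec = {"id": "zlib", "level": 1}

# what _num_codec_config_to_configurable produces: the codec id is
# namespaced under "numcodecs." so zarr v3 can resolve it as a configurable
configurable = {
    "name": "numcodecs." + num_codec["id"],
    "configuration": {k: v for k, v in num_codec.items() if k != "id"},
}
assert configurable == {"name": "numcodecs.zlib", "configuration": {"level": 1}}

# _configurable_to_num_codec_config strips the prefix again on read,
# recovering a config that numcodecs.get_codec can instantiate
codec_id = configurable["name"][len("numcodecs."):]
roundtripped = {"id": codec_id, **configurable["configuration"]}
assert numcodecs.get_codec(roundtripped).get_config() == num_codec
```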