From 39722e7019e3af5f8079c8f3d4d734dd8c866aeb Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Fri, 4 Oct 2024 16:37:44 -0400 Subject: [PATCH 01/22] Save progress for next week --- kerchunk/combine.py | 8 ++++---- kerchunk/fits.py | 2 +- kerchunk/grib2.py | 4 ++-- kerchunk/hdf4.py | 2 +- kerchunk/netCDF3.py | 2 +- kerchunk/tests/test_combine.py | 6 +++--- kerchunk/tests/test_combine_concat.py | 20 ++++++++++---------- kerchunk/tests/test_fits.py | 10 +++++----- kerchunk/tests/test_grib.py | 10 +++++----- kerchunk/tests/test_hdf.py | 20 ++++++++++---------- kerchunk/tests/test_tiff.py | 4 ++-- kerchunk/tests/test_utils.py | 8 ++++---- kerchunk/utils.py | 2 +- pyproject.toml | 2 +- 14 files changed, 50 insertions(+), 50 deletions(-) diff --git a/kerchunk/combine.py b/kerchunk/combine.py index eb891de1..155ba4c9 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -203,7 +203,7 @@ def append( ds = xr.open_dataset( fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False} ) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) mzz = MultiZarrToZarr( path, out=fs.references, # dict or parquet/lazy @@ -360,7 +360,7 @@ def first_pass(self): fs._dircache_from_items() logger.debug("First pass: %s", i) - z = zarr.open_group(fs.get_mapper("")) + z = zarr.open_group(fs.get_mapper(""), zarr_version=2) for var in self.concat_dims: value = self._get_value(i, z, var, fn=self._paths[i]) if isinstance(value, np.ndarray): @@ -387,7 +387,7 @@ def store_coords(self): """ kv = {} store = zarr.storage.KVStore(kv) - group = zarr.open(store) + group = zarr.open(store, zarr_version=2) m = self.fss[0].get_mapper("") z = zarr.open(m) for k, v in self.coos.items(): @@ -461,7 +461,7 @@ def second_pass(self): for i, fs in enumerate(self.fss): to_download = {} m = fs.get_mapper("") - z = zarr.open(m) + z = zarr.open(m, zarr_version=2) if no_deps is None: # done first time only diff --git a/kerchunk/fits.py b/kerchunk/fits.py index 18729a9b..f714af97 100644 --- a/kerchunk/fits.py +++ b/kerchunk/fits.py @@ -72,7 +72,7 @@ def process_file( storage_options = storage_options or {} out = out or {} - g = zarr.open(out) + g = zarr.open(out, zarr_version=2) with fsspec.open(url, mode="rb", **storage_options) as f: infile = fits.open(f, do_not_scale_image_data=True) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index f105fe8b..06108db5 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -191,7 +191,7 @@ def scan_grib( if good is False: continue - z = zarr.open_group(store) + z = zarr.open_group(store, zarr_version=2) global_attrs = { f"GRIB_{k}": m[k] for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS @@ -398,7 +398,7 @@ def grib_tree( # TODO allow passing a LazyReferenceMapper as output? 
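# --- editor's note, not part of the patch: PATCH 01 pins every zarr.open*
# call to the version-2 metadata format, because kerchunk reference sets
# address the ".zgroup"/".zarray"/".zattrs" key layout that only exists in
# zarr format 2. A minimal hedged sketch of the pattern; the refs dict here
# is illustrative, not taken from the codebase:
#
#     import fsspec
#     import zarr
#
#     refs = {".zgroup": '{"zarr_format": 2}'}  # hand-built v2 group metadata
#     fs = fsspec.filesystem("reference", fo=refs)
#     zg = zarr.open_group(fs.get_mapper(""), zarr_version=2)
# --- end editor's note ---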
zarr_store = {} - zroot = zarr.open_group(store=zarr_store) + zroot = zarr.open_group(store=zarr_store, zarr_version=2) aggregations: Dict[str, List] = defaultdict(list) aggregation_dims: Dict[str, Set] = defaultdict(set) diff --git a/kerchunk/hdf4.py b/kerchunk/hdf4.py index 483ffba7..4235d139 100644 --- a/kerchunk/hdf4.py +++ b/kerchunk/hdf4.py @@ -144,7 +144,7 @@ def translate(self, filename=None, storage_options=None): remote_protocol=prot, remote_options=self.st, ) - g = zarr.open_group("reference://", storage_options=dict(fs=fs)) + g = zarr.open_group("reference://", storage_options=dict(fs=fs), zarr_version=2) refs = {} for k, v in output.items(): if isinstance(v, dict): diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py index d43b6b97..8e0994ca 100644 --- a/kerchunk/netCDF3.py +++ b/kerchunk/netCDF3.py @@ -167,7 +167,7 @@ def translate(self): import zarr out = self.out - z = zarr.open(out, mode="w") + z = zarr.open(out, mode="w", zarr_version=2) for dim, var in self.variables.items(): if dim in self.chunks: shape = self.chunks[dim][-1] diff --git a/kerchunk/tests/test_combine.py b/kerchunk/tests/test_combine.py index 13994921..1b5713b2 100644 --- a/kerchunk/tests/test_combine.py +++ b/kerchunk/tests/test_combine.py @@ -133,14 +133,14 @@ # simple time arrays - xarray can't make these! m = fs.get_mapper("time1.zarr") -z = zarr.open(m, mode="w") +z = zarr.open(m, mode="w", zarr_version=2) ar = z.create_dataset("time", data=np.array([1], dtype="M8[s]")) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) ar = z.create_dataset("data", data=arr) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) m = fs.get_mapper("time2.zarr") -z = zarr.open(m, mode="w") +z = zarr.open(m, mode="w", zarr_version=2) ar = z.create_dataset("time", data=np.array([2], dtype="M8[s]")) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) ar = z.create_dataset("data", data=arr) @@ -272,7 +272,7 @@ def test_get_coos(refs, selector, expected): mzz.first_pass() assert mzz.coos["time"].tolist() == expected mzz.store_coords() - g = zarr.open(mzz.out) + g = zarr.open(mzz.out, zarr_version=2) assert g["time"][:].tolist() == expected assert dict(g.attrs) diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index 3f7ff823..f51f10e8 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -51,7 +51,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): refs = [] for i, x in enumerate(arrays): fn = f"{tmpdir}/out{i}.zarr" - g = zarr.open(fn) + g = zarr.open(fn, zarr_version=2) g.create_dataset("x", data=x, chunks=chunks) fns.append(fn) ref = kerchunk.zarr.single_zarr(fn, inline=0) @@ -62,7 +62,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): ) mapper = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(mapper) + g = zarr.open(mapper, zarr_version=2) assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() try: @@ -76,7 +76,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): remote_protocol="file", skip_instance_cache=True, ) - g = zarr.open(mapper) + g = zarr.open(mapper, zarr_version=2) assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() kerchunk.df.refs_to_dataframe(out, "memory://out.parq", record_size=1) @@ -86,7 +86,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): remote_protocol="file", skip_instance_cache=True, ) - g = zarr.open(mapper) + g = zarr.open(mapper, zarr_version=2) assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() @@ -95,9 +95,9 @@ def test_fail_chunks(tmpdir): fn2 = 
f"{tmpdir}/out2.zarr" x1 = np.arange(10) x2 = np.arange(10, 20) - g = zarr.open(fn1) + g = zarr.open(fn1, zarr_version=2) g.create_dataset("x", data=x1, chunks=(2,)) - g = zarr.open(fn2) + g = zarr.open(fn2, zarr_version=2) g.create_dataset("x", data=x2, chunks=(3,)) ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) @@ -112,9 +112,9 @@ def test_fail_shape(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(12).reshape(6, 2) x2 = np.arange(12, 24) - g = zarr.open(fn1) + g = zarr.open(fn1, zarr_version=2) g.create_dataset("x", data=x1, chunks=(2,)) - g = zarr.open(fn2) + g = zarr.open(fn2, zarr_version=2) g.create_dataset("x", data=x2, chunks=(2,)) ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) @@ -129,9 +129,9 @@ def test_fail_irregular_chunk_boundaries(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(10) x2 = np.arange(10, 24) - g = zarr.open(fn1) + g = zarr.open(fn1, zarr_version=2) g.create_dataset("x", data=x1, chunks=(4,)) - g = zarr.open(fn2) + g = zarr.open(fn2, zarr_version=2) g.create_dataset("x", data=x2, chunks=(4,)) ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) diff --git a/kerchunk/tests/test_fits.py b/kerchunk/tests/test_fits.py index 14ea6fc0..e7211479 100644 --- a/kerchunk/tests/test_fits.py +++ b/kerchunk/tests/test_fits.py @@ -18,7 +18,7 @@ def test_ascii_table(): url = "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" out = kerchunk.fits.process_file(url, extension=1) m = fsspec.get_mapper("reference://", fo=out, remote_protocol="https") - g = zarr.open(m) + g = zarr.open(m, zarr_version=2) arr = g["u5780205r_cvt.c0h.tab"][:] with fsspec.open( "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" @@ -31,7 +31,7 @@ def test_ascii_table(): def test_binary_table(): out = kerchunk.fits.process_file(btable, extension=1) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=2) arr = z["1"] with open(btable, "rb") as f: hdul = fits.open(f) @@ -48,7 +48,7 @@ def test_binary_table(): def test_cube(): out = kerchunk.fits.process_file(range_im) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -61,7 +61,7 @@ def test_with_class(): out = ftz.translate() assert "fits" in repr(ftz) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -76,7 +76,7 @@ def test_var(): ftz = kerchunk.fits.FitsToZarr(var) out = ftz.translate() m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=2) arr = z["1"] vars = [_.tolist() for _ in arr["var"]] diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index 32092ced..91ae9ac7 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -119,7 +119,7 @@ def test_grib_tree(): corrected_msg_groups = [correct_hrrr_subhf_step(msg) for msg in scanned_msg_groups] result = grib_tree(corrected_msg_groups) fs = fsspec.filesystem("reference", fo=result) - zg = zarr.open_group(fs.get_mapper("")) + zg = zarr.open_group(fs.get_mapper(""), zarr_version=2) assert isinstance(zg["refc/instant/atmosphere/refc"], zarr.Array) assert isinstance(zg["vbdsf/avg/surface/vbdsf"], zarr.Array) assert set(zg["vbdsf/avg/surface"].attrs["coordinates"].split()) == set( @@ -147,14 +147,14 @@ def test_correct_hrrr_subhf_group_step(): scanned_msgs = ujson.load(fobj) original_zg = [ - 
zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper("")) + zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper(""), zarr_version=2) for val in scanned_msgs ] corrected_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] corrected_zg = [ - zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper("")) + zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper(""), zarr_version=2) for val in corrected_msgs ] @@ -177,7 +177,7 @@ def test_hrrr_subhf_corrected_grib_tree(): corrected_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] merged = grib_tree(corrected_msgs) - zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper("")) + zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper(""), zarr_version=2) # Check the values and shape of the time coordinates assert zg.u.instant.heightAboveGround.step[:].tolist() == [ 0.0, @@ -220,7 +220,7 @@ def test_hrrr_sfcf_grib_tree(): with open(fpath, "rb") as fobj: scanned_msgs = ujson.load(fobj) merged = grib_tree(scanned_msgs) - zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper("")) + zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper(""), zarr_version=2) # Check the heightAboveGround level shape of the time coordinates assert zg.u.instant.heightAboveGround.heightAboveGround[()] == 80.0 assert zg.u.instant.heightAboveGround.heightAboveGround.shape == () diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 69fd22b5..2f825e6d 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -193,7 +193,7 @@ def test_string_embed(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) assert txt in fs.references["vlen_str/0"] - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert z.vlen_str.dtype == "O" assert z.vlen_str[0] == txt assert (z.vlen_str[1:] == "").all() @@ -204,7 +204,7 @@ def test_string_null(): h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="null", inline_threshold=0) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert z.vlen_str.dtype == "O" assert (z.vlen_str[:] == None).all() @@ -217,7 +217,7 @@ def test_string_leave(): ) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert z.vlen_str.dtype == "S16" assert z.vlen_str[0] # some obscured ID assert (z.vlen_str[1:] == b"").all() @@ -232,7 +232,7 @@ def test_string_decode(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) assert txt in fs.cat("vlen_str/.zarray").decode() # stored in filter def - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert z.vlen_str[0] == txt assert (z.vlen_str[1:] == "").all() @@ -243,7 +243,7 @@ def test_compound_string_null(): h = kerchunk.hdf.SingleHdf5ToZarr(f, fn, vlen_encode="null", inline_threshold=0) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert z.vlen_str[0].tolist() == (10, None) assert (z.vlen_str["ints"][1:] == 0).all() assert (z.vlen_str["strs"][1:] == None).all() @@ -257,7 +257,7 @@ def test_compound_string_leave(): ) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert 
z.vlen_str["ints"][0] == 10 assert z.vlen_str["strs"][0] # random ID assert (z.vlen_str["ints"][1:] == 0).all() @@ -272,7 +272,7 @@ def test_compound_string_encode(): ) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) assert z.vlen_str["ints"][0] == 10 assert z.vlen_str["strs"][0] == "water" assert (z.vlen_str["ints"][1:] == 0).all() @@ -303,7 +303,7 @@ def test_compress(): continue out = h.translate() m = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(m) + g = zarr.open(m, zarr_version=2) assert np.mean(g.data) == 49.5 @@ -313,7 +313,7 @@ def test_embed(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:] assert data[0].tolist() == [ "2014-04-01 00:00:00.0", @@ -348,7 +348,7 @@ def test_translate_links(): preserve_linked_dsets=True ) fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper()) + z = zarr.open(fs.get_mapper(), zarr_version=2) # 1. Test the hard linked datasets were translated correctly # 2. Test the soft linked datasets were translated correctly diff --git a/kerchunk/tests/test_tiff.py b/kerchunk/tests/test_tiff.py index 3cc52471..4011a67a 100644 --- a/kerchunk/tests/test_tiff.py +++ b/kerchunk/tests/test_tiff.py @@ -16,7 +16,7 @@ def test_one(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) + z = zarr.open(m, zarr_version=2) assert list(z) == ["0", "1", "2"] assert z.attrs["multiscales"] == [ { @@ -34,7 +34,7 @@ def test_coord(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m) # highest res is the one xarray picks + z = zarr.open(m, zarr_version=2) # highest res is the one xarray picks out = kerchunk.tiff.generate_coords(z.attrs, z[0].shape) ds = xr.open_dataset(fn) diff --git a/kerchunk/tests/test_utils.py b/kerchunk/tests/test_utils.py index a1bb094d..8e4502c1 100644 --- a/kerchunk/tests/test_utils.py +++ b/kerchunk/tests/test_utils.py @@ -79,13 +79,13 @@ def test_inline_array(): assert "data/1" not in out2 assert json.loads(out2["data/.zattrs"]) == json.loads(refs["data/.zattrs"]) fs = fsspec.filesystem("reference", fo=out2) - g = zarr.open(fs.get_mapper()) + g = zarr.open(fs.get_mapper(), zarr_version=2) assert g.data[:].tolist() == [1, 2] out3 = kerchunk.utils.inline_array(refs, threshold=1000) # inlines because of size assert "data/1" not in out3 fs = fsspec.filesystem("reference", fo=out3) - g = zarr.open(fs.get_mapper()) + g = zarr.open(fs.get_mapper(), zarr_version=2) assert g.data[:].tolist() == [1, 2] @@ -99,7 +99,7 @@ def test_json(): @pytest.mark.parametrize("chunks", [[10, 10], [5, 10]]) def test_subchunk_exact(m, chunks): store = m.get_mapper("test.zarr") - g = zarr.open_group(store, mode="w") + g = zarr.open_group(store, mode="w", zarr_version=2) data = np.arange(100).reshape(10, 10) arr = g.create_dataset("data", data=data, chunks=chunks, compression=None) ref = kerchunk.zarr.single_zarr("memory://test.zarr")["refs"] @@ -114,7 +114,7 @@ def test_subchunk_exact(m, chunks): ] g2 = zarr.open_group( - "reference://", storage_options={"fo": out, "remote_protocol": "memory"} + "reference://", storage_options={"fo": out, "remote_protocol": "memory"}, zarr_version=2 ) assert (g2.data[:] == data).all() diff --git a/kerchunk/utils.py 
b/kerchunk/utils.py index 838c3cb1..4049ee63 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -226,7 +226,7 @@ def inline_array(store, threshold=1000, names=None, remote_options=None): fs = fsspec.filesystem( "reference", fo=store, **(remote_options or {}), skip_instance_cache=True ) - g = zarr.open_group(fs.get_mapper(), mode="r+") + g = zarr.open_group(fs.get_mapper(), mode="r+", zarr_version=2) _inline_array(g, threshold, names=names or []) return fs.references diff --git a/pyproject.toml b/pyproject.toml index 415c3cbd..680f4c2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "numcodecs", "numpy", "ujson", - "zarr<3", + "zarr==3.0.0a6", ] [project.optional-dependencies] From d3c7e372cfa6f6822361441df79e872c9b68ee4c Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Sat, 5 Oct 2024 09:49:38 -0400 Subject: [PATCH 02/22] Bump zarr python version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 680f4c2f..6e57e223 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "numcodecs", "numpy", "ujson", - "zarr==3.0.0a6", + "zarr==3.0.0a7", ] [project.optional-dependencies] From 25d7d14e5fb6e563012d1547013d92f28834bcec Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Sat, 5 Oct 2024 09:58:35 -0400 Subject: [PATCH 03/22] Get some tests working others failing --- kerchunk/hdf.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 549923d4..777201b5 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -21,11 +21,11 @@ "for more details." ) -try: - from zarr.meta import encode_fill_value -except ModuleNotFoundError: - # https://github.com/zarr-developers/zarr-python/issues/2021 - from zarr.v2.meta import encode_fill_value +# try: +# from zarr.meta import encode_fill_value +# except ModuleNotFoundError: +# # https://github.com/zarr-developers/zarr-python/issues/2021 +# from zarr.v2.meta import encode_fill_value lggr = logging.getLogger("h5-to-zarr") _HIDDEN_ATTRS = { # from h5netcdf.attrs @@ -465,9 +465,10 @@ def _translator( if h5py.h5ds.is_scale(h5obj.id) and not cinfo: return if h5obj.attrs.get("_FillValue") is not None: - fill = encode_fill_value( - h5obj.attrs.get("_FillValue"), dt or h5obj.dtype - ) + fill = h5obj.attrs.get("_FillValue") + # fill = encode_fill_value( + # h5obj.attrs.get("_FillValue"), dt or h5obj.dtype + # ) # Create a Zarr array equivalent to this HDF5 dataset... 
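# --- editor's note, not part of the patch: zarr 3 removed zarr.meta, so the
# encode_fill_value import above is stubbed out and the raw HDF5 _FillValue
# is stored for now; PATCH 09 below vendors the function into kerchunk.utils.
# Hedged sketch of what that encoding does for float fills, per the vendored
# implementation later in this series:
#
#     import numpy as np
#     encode_fill_value(np.float32("nan"), np.dtype("f4"))  # -> "NaN"
#     encode_fill_value(np.inf, np.dtype("f8"))             # -> "Infinity"
# --- end editor's note ---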
za = self._zroot.require_dataset( From ffe5f9d906381be23b41496e167d1d44835a5486 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 8 Oct 2024 17:07:53 -0400 Subject: [PATCH 04/22] get through single hdf to zarr --- kerchunk/combine.py | 8 +-- kerchunk/fits.py | 2 +- kerchunk/grib2.py | 4 +- kerchunk/hdf.py | 94 ++++++++++++++++++++++----- kerchunk/hdf4.py | 2 +- kerchunk/netCDF3.py | 2 +- kerchunk/tests/test_combine.py | 6 +- kerchunk/tests/test_combine_concat.py | 20 +++--- kerchunk/tests/test_fits.py | 10 +-- kerchunk/tests/test_grib.py | 10 +-- kerchunk/tests/test_hdf.py | 23 ++++--- kerchunk/tests/test_tiff.py | 4 +- kerchunk/tests/test_utils.py | 8 +-- kerchunk/utils.py | 2 +- 14 files changed, 129 insertions(+), 66 deletions(-) diff --git a/kerchunk/combine.py b/kerchunk/combine.py index 155ba4c9..b02fa395 100644 --- a/kerchunk/combine.py +++ b/kerchunk/combine.py @@ -203,7 +203,7 @@ def append( ds = xr.open_dataset( fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False} ) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) mzz = MultiZarrToZarr( path, out=fs.references, # dict or parquet/lazy @@ -360,7 +360,7 @@ def first_pass(self): fs._dircache_from_items() logger.debug("First pass: %s", i) - z = zarr.open_group(fs.get_mapper(""), zarr_version=2) + z = zarr.open_group(fs.get_mapper(""), zarr_format=2) for var in self.concat_dims: value = self._get_value(i, z, var, fn=self._paths[i]) if isinstance(value, np.ndarray): @@ -387,7 +387,7 @@ def store_coords(self): """ kv = {} store = zarr.storage.KVStore(kv) - group = zarr.open(store, zarr_version=2) + group = zarr.open(store, zarr_format=2) m = self.fss[0].get_mapper("") z = zarr.open(m) for k, v in self.coos.items(): @@ -461,7 +461,7 @@ def second_pass(self): for i, fs in enumerate(self.fss): to_download = {} m = fs.get_mapper("") - z = zarr.open(m, zarr_version=2) + z = zarr.open(m, zarr_format=2) if no_deps is None: # done first time only diff --git a/kerchunk/fits.py b/kerchunk/fits.py index f714af97..f50bef64 100644 --- a/kerchunk/fits.py +++ b/kerchunk/fits.py @@ -72,7 +72,7 @@ def process_file( storage_options = storage_options or {} out = out or {} - g = zarr.open(out, zarr_version=2) + g = zarr.open(out, zarr_format=2) with fsspec.open(url, mode="rb", **storage_options) as f: infile = fits.open(f, do_not_scale_image_data=True) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index 06108db5..7d75786f 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -191,7 +191,7 @@ def scan_grib( if good is False: continue - z = zarr.open_group(store, zarr_version=2) + z = zarr.open_group(store, zarr_format=2) global_attrs = { f"GRIB_{k}": m[k] for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS @@ -398,7 +398,7 @@ def grib_tree( # TODO allow passing a LazyReferenceMapper as output? 
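# --- editor's note, not part of the patch: with the bump to zarr==3.0.0a7,
# PATCH 04 mechanically renames the format pin at every call site, e.g.:
#
#     z = zarr.open_group(store, zarr_version=2)  # 3.0.0a6 spelling
#     z = zarr.open_group(store, zarr_format=2)   # 3.0.0a7 spelling
#
# Both keywords request the same v2 on-disk metadata layout.
# --- end editor's note ---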
zarr_store = {} - zroot = zarr.open_group(store=zarr_store, zarr_version=2) + zroot = zarr.open_group(store=zarr_store, zarr_format=2) aggregations: Dict[str, List] = defaultdict(list) aggregation_dims: Dict[str, Set] = defaultdict(set) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 777201b5..4073a2b3 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -1,7 +1,8 @@ import base64 import io import logging -from typing import Union, BinaryIO +from typing import Union, BinaryIO, Any, cast +from packaging.version import Version import fsspec.core from fsspec.implementations.reference import LazyReferenceMapper @@ -111,8 +112,13 @@ def __init__( if vlen_encode not in ["embed", "null", "leave", "encode"]: raise NotImplementedError self.vlen = vlen_encode - self.store = out or {} - self._zroot = zarr.group(store=self.store, overwrite=True) + self.store_dict = out or {} + if Version(zarr.__version__) < Version("3.0.0.a0"): + self.store = zarr.storage.KVStore(self.store_dict) + else: + self.store = zarr.storage.MemoryStore(mode="a", store_dict=self.store_dict) + # self.store = out or {} + self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True) self._uri = url self.error = error @@ -141,8 +147,12 @@ def translate(self, preserve_linked_dsets=False): lggr.debug("Translation begins") self._transfer_attrs(self._h5f, self._zroot) + print('transfer done') + self._h5f.visititems(self._translator) + print('visit done') + if preserve_linked_dsets: if not has_visititems_links(): raise RuntimeError( @@ -157,7 +167,10 @@ def translate(self, preserve_linked_dsets=False): self.store.flush() return self.store else: - store = _encode_for_JSON(self.store) + for k, v in self.store_dict.items(): + if isinstance(v, zarr.core.buffer.cpu.Buffer): + self.store_dict[k] = v.to_bytes() + store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} def _unref(self, ref): @@ -466,26 +479,30 @@ def _translator( return if h5obj.attrs.get("_FillValue") is not None: fill = h5obj.attrs.get("_FillValue") - # fill = encode_fill_value( - # h5obj.attrs.get("_FillValue"), dt or h5obj.dtype - # ) + fill = encode_fill_value( + h5obj.attrs.get("_FillValue"), dt or h5obj.dtype + ) + + adims = self._get_array_dims(h5obj) - # Create a Zarr array equivalent to this HDF5 dataset... - za = self._zroot.require_dataset( - h5obj.name, + # Create a Zarr array equivalent to this HDF5 dataset.. 
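# --- editor's note, not part of the patch: the hunk below tracks three zarr 3
# array-creation changes at once: Group.require_dataset becomes require_array,
# compression= becomes compressor=, and attributes are supplied at creation
# time rather than assigned via za.attrs afterwards. Shape of the new call,
# with illustrative values:
#
#     za = group.require_array(
#         name="x", shape=(10,), dtype="f4", chunks=(5,),
#         fill_value=None, compressor=None, filters=None,
#         attributes={"_ARRAY_DIMENSIONS": ["x"]},
#     )
# --- end editor's note ---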
+ za = self._zroot.require_array( + name=h5obj.name, shape=h5obj.shape, dtype=dt or h5obj.dtype, chunks=h5obj.chunks or False, fill_value=fill, - compression=None, + compressor=None, filters=filters, - overwrite=True, + attributes={ + "_ARRAY_DIMENSIONS": adims, + }, **kwargs, ) lggr.debug(f"Created Zarr array: {za}") - self._transfer_attrs(h5obj, za) - adims = self._get_array_dims(h5obj) - za.attrs["_ARRAY_DIMENSIONS"] = adims + #self._transfer_attrs(h5obj, za) + + # za.attrs["_ARRAY_DIMENSIONS"] = adims lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") if "data" in kwargs: @@ -497,6 +514,7 @@ def _translator( if h5obj.fletcher32: logging.info("Discarding fletcher32 checksum") v["size"] -= 4 + key = ".".join(map(str, k)) if ( self.inline and isinstance(v, dict) @@ -509,9 +527,10 @@ def _translator( data.decode("ascii") except UnicodeDecodeError: data = b"base64:" + base64.b64encode(data) - self.store[za._chunk_key(k)] = data + + self.store_dict[key] = data else: - self.store[za._chunk_key(k)] = [ + self.store_dict[key] = [ self._uri, v["offset"], v["size"], @@ -523,6 +542,7 @@ def _translator( self._transfer_attrs(h5obj, zgrp) except Exception as e: import traceback + raise e msg = "\n".join( [ @@ -682,3 +702,43 @@ def _is_netcdf_variable(dataset: h5py.Dataset): def has_visititems_links(): return hasattr(h5py.Group, "visititems_links") + +def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: + # early out + if v is None: + return v + if dtype.kind == "V" and dtype.hasobject: + if object_codec is None: + raise ValueError("missing object_codec for object array") + v = object_codec.encode(v) + v = str(base64.standard_b64encode(v), "ascii") + return v + if dtype.kind == "f": + if np.isnan(v): + return "NaN" + elif np.isposinf(v): + return "Infinity" + elif np.isneginf(v): + return "-Infinity" + else: + return float(v) + elif dtype.kind in "ui": + return int(v) + elif dtype.kind == "b": + return bool(v) + elif dtype.kind in "c": + c = cast(np.complex128, np.dtype(complex).type()) + v = ( + encode_fill_value(v.real, c.real.dtype, object_codec), + encode_fill_value(v.imag, c.imag.dtype, object_codec), + ) + return v + elif dtype.kind in "SV": + v = str(base64.standard_b64encode(v), "ascii") + return v + elif dtype.kind == "U": + return v + elif dtype.kind in "mM": + return int(v.view("i8")) + else: + return v diff --git a/kerchunk/hdf4.py b/kerchunk/hdf4.py index 4235d139..8339659b 100644 --- a/kerchunk/hdf4.py +++ b/kerchunk/hdf4.py @@ -144,7 +144,7 @@ def translate(self, filename=None, storage_options=None): remote_protocol=prot, remote_options=self.st, ) - g = zarr.open_group("reference://", storage_options=dict(fs=fs), zarr_version=2) + g = zarr.open_group("reference://", storage_options=dict(fs=fs), zarr_format=2) refs = {} for k, v in output.items(): if isinstance(v, dict): diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py index 8e0994ca..d44fc808 100644 --- a/kerchunk/netCDF3.py +++ b/kerchunk/netCDF3.py @@ -167,7 +167,7 @@ def translate(self): import zarr out = self.out - z = zarr.open(out, mode="w", zarr_version=2) + z = zarr.open(out, mode="w", zarr_format=2) for dim, var in self.variables.items(): if dim in self.chunks: shape = self.chunks[dim][-1] diff --git a/kerchunk/tests/test_combine.py b/kerchunk/tests/test_combine.py index 1b5713b2..868a39ff 100644 --- a/kerchunk/tests/test_combine.py +++ b/kerchunk/tests/test_combine.py @@ -133,14 +133,14 @@ # simple time arrays - xarray can't make these! 
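# --- editor's note, not part of the patch: the fixtures below hand-build two
# single-timestep zarr v2 stores, since xarray cannot create a bare length-1
# datetime64 coordinate this way; "_ARRAY_DIMENSIONS" is the v2 attribute
# xarray reads for dimension names. Hedged sketch of reading one back through
# the module's in-memory filesystem `fs`:
#
#     ds = xr.open_dataset(fs.get_mapper("time1.zarr"), engine="zarr",
#                          backend_kwargs={"consolidated": False})
#     assert ds["data"].dims == ("time", "x", "y")
# --- end editor's note ---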
m = fs.get_mapper("time1.zarr") -z = zarr.open(m, mode="w", zarr_version=2) +z = zarr.open(m, mode="w", zarr_format=2) ar = z.create_dataset("time", data=np.array([1], dtype="M8[s]")) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) ar = z.create_dataset("data", data=arr) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]}) m = fs.get_mapper("time2.zarr") -z = zarr.open(m, mode="w", zarr_version=2) +z = zarr.open(m, mode="w", zarr_format=2) ar = z.create_dataset("time", data=np.array([2], dtype="M8[s]")) ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]}) ar = z.create_dataset("data", data=arr) @@ -272,7 +272,7 @@ def test_get_coos(refs, selector, expected): mzz.first_pass() assert mzz.coos["time"].tolist() == expected mzz.store_coords() - g = zarr.open(mzz.out, zarr_version=2) + g = zarr.open(mzz.out, zarr_format=2) assert g["time"][:].tolist() == expected assert dict(g.attrs) diff --git a/kerchunk/tests/test_combine_concat.py b/kerchunk/tests/test_combine_concat.py index f51f10e8..23e785df 100644 --- a/kerchunk/tests/test_combine_concat.py +++ b/kerchunk/tests/test_combine_concat.py @@ -51,7 +51,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): refs = [] for i, x in enumerate(arrays): fn = f"{tmpdir}/out{i}.zarr" - g = zarr.open(fn, zarr_version=2) + g = zarr.open(fn, zarr_format=2) g.create_dataset("x", data=x, chunks=chunks) fns.append(fn) ref = kerchunk.zarr.single_zarr(fn, inline=0) @@ -62,7 +62,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): ) mapper = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(mapper, zarr_version=2) + g = zarr.open(mapper, zarr_format=2) assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() try: @@ -76,7 +76,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): remote_protocol="file", skip_instance_cache=True, ) - g = zarr.open(mapper, zarr_version=2) + g = zarr.open(mapper, zarr_format=2) assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() kerchunk.df.refs_to_dataframe(out, "memory://out.parq", record_size=1) @@ -86,7 +86,7 @@ def test_success(tmpdir, arrays, chunks, axis, m): remote_protocol="file", skip_instance_cache=True, ) - g = zarr.open(mapper, zarr_version=2) + g = zarr.open(mapper, zarr_format=2) assert (g.x[:] == np.concatenate(arrays, axis=axis)).all() @@ -95,9 +95,9 @@ def test_fail_chunks(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(10) x2 = np.arange(10, 20) - g = zarr.open(fn1, zarr_version=2) + g = zarr.open(fn1, zarr_format=2) g.create_dataset("x", data=x1, chunks=(2,)) - g = zarr.open(fn2, zarr_version=2) + g = zarr.open(fn2, zarr_format=2) g.create_dataset("x", data=x2, chunks=(3,)) ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) @@ -112,9 +112,9 @@ def test_fail_shape(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(12).reshape(6, 2) x2 = np.arange(12, 24) - g = zarr.open(fn1, zarr_version=2) + g = zarr.open(fn1, zarr_format=2) g.create_dataset("x", data=x1, chunks=(2,)) - g = zarr.open(fn2, zarr_version=2) + g = zarr.open(fn2, zarr_format=2) g.create_dataset("x", data=x2, chunks=(2,)) ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) @@ -129,9 +129,9 @@ def test_fail_irregular_chunk_boundaries(tmpdir): fn2 = f"{tmpdir}/out2.zarr" x1 = np.arange(10) x2 = np.arange(10, 24) - g = zarr.open(fn1, zarr_version=2) + g = zarr.open(fn1, zarr_format=2) g.create_dataset("x", data=x1, chunks=(4,)) - g = zarr.open(fn2, zarr_version=2) + g = zarr.open(fn2, zarr_format=2) g.create_dataset("x", data=x2, chunks=(4,)) ref1 = kerchunk.zarr.single_zarr(fn1, inline=0) diff --git a/kerchunk/tests/test_fits.py 
b/kerchunk/tests/test_fits.py index e7211479..5d7c3b6d 100644 --- a/kerchunk/tests/test_fits.py +++ b/kerchunk/tests/test_fits.py @@ -18,7 +18,7 @@ def test_ascii_table(): url = "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" out = kerchunk.fits.process_file(url, extension=1) m = fsspec.get_mapper("reference://", fo=out, remote_protocol="https") - g = zarr.open(m, zarr_version=2) + g = zarr.open(m, zarr_format=2) arr = g["u5780205r_cvt.c0h.tab"][:] with fsspec.open( "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" @@ -31,7 +31,7 @@ def test_ascii_table(): def test_binary_table(): out = kerchunk.fits.process_file(btable, extension=1) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_version=2) + z = zarr.open(m, zarr_format=2) arr = z["1"] with open(btable, "rb") as f: hdul = fits.open(f) @@ -48,7 +48,7 @@ def test_binary_table(): def test_cube(): out = kerchunk.fits.process_file(range_im) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_version=2) + z = zarr.open(m, zarr_format=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -61,7 +61,7 @@ def test_with_class(): out = ftz.translate() assert "fits" in repr(ftz) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_version=2) + z = zarr.open(m, zarr_format=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -76,7 +76,7 @@ def test_var(): ftz = kerchunk.fits.FitsToZarr(var) out = ftz.translate() m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_version=2) + z = zarr.open(m, zarr_format=2) arr = z["1"] vars = [_.tolist() for _ in arr["var"]] diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index 91ae9ac7..9102529e 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -119,7 +119,7 @@ def test_grib_tree(): corrected_msg_groups = [correct_hrrr_subhf_step(msg) for msg in scanned_msg_groups] result = grib_tree(corrected_msg_groups) fs = fsspec.filesystem("reference", fo=result) - zg = zarr.open_group(fs.get_mapper(""), zarr_version=2) + zg = zarr.open_group(fs.get_mapper(""), zarr_format=2) assert isinstance(zg["refc/instant/atmosphere/refc"], zarr.Array) assert isinstance(zg["vbdsf/avg/surface/vbdsf"], zarr.Array) assert set(zg["vbdsf/avg/surface"].attrs["coordinates"].split()) == set( @@ -147,14 +147,14 @@ def test_correct_hrrr_subhf_group_step(): scanned_msgs = ujson.load(fobj) original_zg = [ - zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper(""), zarr_version=2) + zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper(""), zarr_format=2) for val in scanned_msgs ] corrected_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] corrected_zg = [ - zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper(""), zarr_version=2) + zarr.open_group(fsspec.filesystem("reference", fo=val).get_mapper(""), zarr_format=2) for val in corrected_msgs ] @@ -177,7 +177,7 @@ def test_hrrr_subhf_corrected_grib_tree(): corrected_msgs = [correct_hrrr_subhf_step(msg) for msg in scanned_msgs] merged = grib_tree(corrected_msgs) - zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper(""), zarr_version=2) + zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper(""), zarr_format=2) # Check the values and shape of the time coordinates assert zg.u.instant.heightAboveGround.step[:].tolist() == [ 0.0, @@ -220,7 +220,7 @@ def test_hrrr_sfcf_grib_tree(): with open(fpath, "rb") as fobj: 
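# --- editor's note, not part of the patch: each grib test follows the same
# recipe seen below: load pre-scanned per-message references, merge them with
# grib_tree into one hierarchical reference set, and open the result as a
# nested group pinned to v2 metadata:
#
#     merged = grib_tree(scanned_msgs)
#     zg = zarr.open_group(
#         fsspec.filesystem("reference", fo=merged).get_mapper(""),
#         zarr_format=2,
#     )
# --- end editor's note ---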
scanned_msgs = ujson.load(fobj) merged = grib_tree(scanned_msgs) - zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper(""), zarr_version=2) + zg = zarr.open_group(fsspec.filesystem("reference", fo=merged).get_mapper(""), zarr_format=2) # Check the heightAboveGround level shape of the time coordinates assert zg.u.instant.heightAboveGround.heightAboveGround[()] == 80.0 assert zg.u.instant.heightAboveGround.heightAboveGround.shape == () diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 2f825e6d..e140ca48 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -18,6 +18,7 @@ def test_single(): """Test creating references for a single HDF file""" url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" so = dict(anon=True, default_fill_cache=False, default_cache_type="none") + with fsspec.open(url, **so) as f: h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() @@ -25,6 +26,8 @@ def test_single(): m = fsspec.get_mapper( "reference://", fo=test_dict, remote_protocol="s3", remote_options=so ) + x = [(k, v) for (k, v) in m.items()] + raise ValueError("foo") ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) with fsspec.open(url, **so) as f: @@ -193,7 +196,7 @@ def test_string_embed(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) assert txt in fs.references["vlen_str/0"] - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str.dtype == "O" assert z.vlen_str[0] == txt assert (z.vlen_str[1:] == "").all() @@ -204,7 +207,7 @@ def test_string_null(): h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="null", inline_threshold=0) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str.dtype == "O" assert (z.vlen_str[:] == None).all() @@ -217,7 +220,7 @@ def test_string_leave(): ) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str.dtype == "S16" assert z.vlen_str[0] # some obscured ID assert (z.vlen_str[1:] == b"").all() @@ -232,7 +235,7 @@ def test_string_decode(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) assert txt in fs.cat("vlen_str/.zarray").decode() # stored in filter def - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str[0] == txt assert (z.vlen_str[1:] == "").all() @@ -243,7 +246,7 @@ def test_compound_string_null(): h = kerchunk.hdf.SingleHdf5ToZarr(f, fn, vlen_encode="null", inline_threshold=0) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str[0].tolist() == (10, None) assert (z.vlen_str["ints"][1:] == 0).all() assert (z.vlen_str["strs"][1:] == None).all() @@ -257,7 +260,7 @@ def test_compound_string_leave(): ) out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str["ints"][0] == 10 assert z.vlen_str["strs"][0] # random ID assert (z.vlen_str["ints"][1:] == 0).all() @@ -272,7 +275,7 @@ def test_compound_string_encode(): ) out = h.translate() fs = fsspec.filesystem("reference", 
fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) assert z.vlen_str["ints"][0] == 10 assert z.vlen_str["strs"][0] == "water" assert (z.vlen_str["ints"][1:] == 0).all() @@ -303,7 +306,7 @@ def test_compress(): continue out = h.translate() m = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(m, zarr_version=2) + g = zarr.open(m, zarr_format=2) assert np.mean(g.data) == 49.5 @@ -313,7 +316,7 @@ def test_embed(): out = h.translate() fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:] assert data[0].tolist() == [ "2014-04-01 00:00:00.0", @@ -348,7 +351,7 @@ def test_translate_links(): preserve_linked_dsets=True ) fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_version=2) + z = zarr.open(fs.get_mapper(), zarr_format=2) # 1. Test the hard linked datasets were translated correctly # 2. Test the soft linked datasets were translated correctly diff --git a/kerchunk/tests/test_tiff.py b/kerchunk/tests/test_tiff.py index 4011a67a..74ba59a4 100644 --- a/kerchunk/tests/test_tiff.py +++ b/kerchunk/tests/test_tiff.py @@ -16,7 +16,7 @@ def test_one(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_version=2) + z = zarr.open(m, zarr_format=2) assert list(z) == ["0", "1", "2"] assert z.attrs["multiscales"] == [ { @@ -34,7 +34,7 @@ def test_coord(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_version=2) # highest res is the one xarray picks + z = zarr.open(m, zarr_format=2) # highest res is the one xarray picks out = kerchunk.tiff.generate_coords(z.attrs, z[0].shape) ds = xr.open_dataset(fn) diff --git a/kerchunk/tests/test_utils.py b/kerchunk/tests/test_utils.py index 8e4502c1..a951c36c 100644 --- a/kerchunk/tests/test_utils.py +++ b/kerchunk/tests/test_utils.py @@ -79,13 +79,13 @@ def test_inline_array(): assert "data/1" not in out2 assert json.loads(out2["data/.zattrs"]) == json.loads(refs["data/.zattrs"]) fs = fsspec.filesystem("reference", fo=out2) - g = zarr.open(fs.get_mapper(), zarr_version=2) + g = zarr.open(fs.get_mapper(), zarr_format=2) assert g.data[:].tolist() == [1, 2] out3 = kerchunk.utils.inline_array(refs, threshold=1000) # inlines because of size assert "data/1" not in out3 fs = fsspec.filesystem("reference", fo=out3) - g = zarr.open(fs.get_mapper(), zarr_version=2) + g = zarr.open(fs.get_mapper(), zarr_format=2) assert g.data[:].tolist() == [1, 2] @@ -99,7 +99,7 @@ def test_json(): @pytest.mark.parametrize("chunks", [[10, 10], [5, 10]]) def test_subchunk_exact(m, chunks): store = m.get_mapper("test.zarr") - g = zarr.open_group(store, mode="w", zarr_version=2) + g = zarr.open_group(store, mode="w", zarr_format=2) data = np.arange(100).reshape(10, 10) arr = g.create_dataset("data", data=data, chunks=chunks, compression=None) ref = kerchunk.zarr.single_zarr("memory://test.zarr")["refs"] @@ -114,7 +114,7 @@ def test_subchunk_exact(m, chunks): ] g2 = zarr.open_group( - "reference://", storage_options={"fo": out, "remote_protocol": "memory"}, zarr_version=2 + "reference://", storage_options={"fo": out, "remote_protocol": "memory"}, zarr_format=2 ) assert (g2.data[:] == data).all() diff --git a/kerchunk/utils.py b/kerchunk/utils.py index 4049ee63..b52a9c0b 100644 --- a/kerchunk/utils.py +++ 
b/kerchunk/utils.py @@ -226,7 +226,7 @@ def inline_array(store, threshold=1000, names=None, remote_options=None): fs = fsspec.filesystem( "reference", fo=store, **(remote_options or {}), skip_instance_cache=True ) - g = zarr.open_group(fs.get_mapper(), mode="r+", zarr_version=2) + g = zarr.open_group(fs.get_mapper(), mode="r+", zarr_format=2) _inline_array(g, threshold, names=names or []) return fs.references From 5aef233686c89dc9ca56325f1c654e35a80e8440 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 8 Oct 2024 17:13:36 -0400 Subject: [PATCH 05/22] Save progress --- kerchunk/tests/test_hdf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index e140ca48..4135495b 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -6,6 +6,7 @@ import pytest import xarray as xr import zarr +from zarr.storage import MemoryStore import h5py from kerchunk.hdf import SingleHdf5ToZarr, has_visititems_links @@ -26,9 +27,8 @@ def test_single(): m = fsspec.get_mapper( "reference://", fo=test_dict, remote_protocol="s3", remote_options=so ) - x = [(k, v) for (k, v) in m.items()] - raise ValueError("foo") - ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) + store = MemoryStore(m) + ds = xr.open_dataset(store, engine="zarr", backend_kwargs=dict(consolidated=False)) with fsspec.open(url, **so) as f: expected = xr.open_dataset(f, engine="h5netcdf") From b9323d2e227bd7b163492afe2e7a1f5eec6bda91 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 8 Oct 2024 20:37:52 -0400 Subject: [PATCH 06/22] Cleanup, almost working with hdf --- kerchunk/hdf.py | 12 +++------- kerchunk/tests/test_hdf.py | 45 +++++++++++++++++++++++++++----------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 4073a2b3..501de4f3 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -115,11 +115,11 @@ def __init__( self.store_dict = out or {} if Version(zarr.__version__) < Version("3.0.0.a0"): self.store = zarr.storage.KVStore(self.store_dict) + self._zroot = zarr.group(store=self.store, overwrite=True) else: self.store = zarr.storage.MemoryStore(mode="a", store_dict=self.store_dict) - # self.store = out or {} - self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True) - + self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True) + self._uri = url self.error = error lggr.debug(f"HDF5 file URI: {self._uri}") @@ -146,13 +146,8 @@ def translate(self, preserve_linked_dsets=False): """ lggr.debug("Translation begins") self._transfer_attrs(self._h5f, self._zroot) - - print('transfer done') - self._h5f.visititems(self._translator) - print('visit done') - if preserve_linked_dsets: if not has_visititems_links(): raise RuntimeError( @@ -542,7 +537,6 @@ def _translator( self._transfer_attrs(h5obj, zgrp) except Exception as e: import traceback - raise e msg = "\n".join( [ diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 4135495b..e2806545 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -1,6 +1,9 @@ import fsspec import os.path as osp +import fsspec.implementations +import fsspec.implementations.reference + import kerchunk.hdf import numpy as np import pytest @@ -9,6 +12,8 @@ from zarr.storage import MemoryStore import h5py +from packaging.version import Version + from kerchunk.hdf import SingleHdf5ToZarr, has_visititems_links from kerchunk.combine import MultiZarrToZarr, drop 
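# --- editor's note, not part of the patch: from here the tests stop handing
# xarray a raw mapper and instead pick a store for the installed zarr: an
# fsspec mapper under zarr 2, a RemoteStore wrapping a ReferenceFileSystem
# under zarr 3. PATCH 08 below extracts this into a create_store helper;
# condensed sketch of that helper:
#
#     def create_store(test_dict, remote_options=None):
#         if Version(zarr.__version__) < Version("3.0.0.a0"):
#             return fsspec.get_mapper("reference://", fo=test_dict,
#                                      remote_options=remote_options)
#         fs = fsspec.implementations.reference.ReferenceFileSystem(
#             fo=test_dict, remote_options=remote_options)
#         return zarr.storage.RemoteStore(fs, mode="r")
# --- end editor's note ---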
@@ -24,11 +29,15 @@ def test_single(): h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) - store = MemoryStore(m) - ds = xr.open_dataset(store, engine="zarr", backend_kwargs=dict(consolidated=False)) + if Version(zarr.__version__) < Version("3.0.0.a0"): + store = fsspec.get_mapper( + "reference://", fo=test_dict, remote_protocol="s3", remote_options=so + ) + else: + fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) + store = zarr.storage.RemoteStore(fs, mode="r") + + ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) with fsspec.open(url, **so) as f: expected = xr.open_dataset(f, engine="h5netcdf") @@ -45,22 +54,32 @@ def test_single_direct_open(): h5f=url, inline_threshold=300, storage_options=so ).translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) + if Version(zarr.__version__) < Version("3.0.0.a0"): + store = fsspec.get_mapper( + "reference://", fo=test_dict, remote_protocol="s3", remote_options=so + ) + else: + fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) + store = zarr.storage.RemoteStore(fs, mode="r") + ds_direct = xr.open_dataset( - m, engine="zarr", backend_kwargs=dict(consolidated=False) + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) with fsspec.open(url, **so) as f: h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) + if Version(zarr.__version__) < Version("3.0.0.a0"): + store = fsspec.get_mapper( + "reference://", fo=test_dict, remote_protocol="s3", remote_options=so + ) + else: + fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) + store = zarr.storage.RemoteStore(fs, mode="r") + ds_from_file_opener = xr.open_dataset( - m, engine="zarr", backend_kwargs=dict(consolidated=False) + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) xr.testing.assert_equal( From 0f1711944159edcbcce563cf5b7c8bde1e5e5348 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 8 Oct 2024 21:46:49 -0400 Subject: [PATCH 07/22] Closer... 
--- kerchunk/hdf.py | 14 +++++++++++--- kerchunk/tests/test_hdf.py | 7 +++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 501de4f3..5e4d2304 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -162,9 +162,16 @@ def translate(self, preserve_linked_dsets=False): self.store.flush() return self.store else: + keys_to_remove = [] + new_keys = {} for k, v in self.store_dict.items(): if isinstance(v, zarr.core.buffer.cpu.Buffer): - self.store_dict[k] = v.to_bytes() + key = str.removeprefix(k, "/") + new_keys[key] = v.to_bytes() + keys_to_remove.append(k) + for k in keys_to_remove: + del self.store_dict[k] + self.store_dict.update(new_keys) store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} @@ -495,7 +502,7 @@ def _translator( **kwargs, ) lggr.debug(f"Created Zarr array: {za}") - #self._transfer_attrs(h5obj, za) + self._transfer_attrs(h5obj, za) # za.attrs["_ARRAY_DIMENSIONS"] = adims lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") @@ -509,7 +516,8 @@ def _translator( if h5obj.fletcher32: logging.info("Discarding fletcher32 checksum") v["size"] -= 4 - key = ".".join(map(str, k)) + key = str.removeprefix(h5obj.name, "/") + "/" + ".".join(map(str, k)) + if ( self.inline and isinstance(v, dict) diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index e2806545..2fe4e1cf 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -1,3 +1,4 @@ +import asyncio import fsspec import os.path as osp @@ -9,8 +10,6 @@ import pytest import xarray as xr import zarr -from zarr.storage import MemoryStore -import h5py from packaging.version import Version @@ -20,6 +19,10 @@ here = osp.dirname(__file__) +async def list_dir(store, path): + [x async for x in store.list_dir(path)] + + def test_single(): """Test creating references for a single HDF file""" url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" From 5c8806bf272334b59cfdba13a9d746cef9e51329 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 9 Oct 2024 14:18:17 -0400 Subject: [PATCH 08/22] Updating tests --- kerchunk/hdf.py | 1 + kerchunk/tests/test_hdf.py | 63 ++++++++++++++------------------------ 2 files changed, 24 insertions(+), 40 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 5e4d2304..6bb16922 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -705,6 +705,7 @@ def _is_netcdf_variable(dataset: h5py.Dataset): def has_visititems_links(): return hasattr(h5py.Group, "visititems_links") + def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: # early out if v is None: diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 2fe4e1cf..ace45472 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -23,6 +23,16 @@ async def list_dir(store, path): [x async for x in store.list_dir(path)] +def create_store(test_dict: dict): + if Version(zarr.__version__) < Version("3.0.0.a0"): + return fsspec.get_mapper( + "reference://", fo=test_dict, remote_protocol="s3", remote_options=so + ) + else: + fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) + return zarr.storage.RemoteStore(fs, mode="r") + + def test_single(): """Test creating references for a single HDF file""" url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" @@ -32,13 +42,7 @@ def test_single(): h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() - if 
Version(zarr.__version__) < Version("3.0.0.a0"): - store = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) - else: - fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) - store = zarr.storage.RemoteStore(fs, mode="r") + store = create_store(test_dict) ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) @@ -57,13 +61,7 @@ def test_single_direct_open(): h5f=url, inline_threshold=300, storage_options=so ).translate() - if Version(zarr.__version__) < Version("3.0.0.a0"): - store = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) - else: - fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) - store = zarr.storage.RemoteStore(fs, mode="r") + store = create_store(test_dict) ds_direct = xr.open_dataset( store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) @@ -73,13 +71,7 @@ def test_single_direct_open(): h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() - if Version(zarr.__version__) < Version("3.0.0.a0"): - store = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) - else: - fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) - store = zarr.storage.RemoteStore(fs, mode="r") + store = create_store(test_dict) ds_from_file_opener = xr.open_dataset( store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) @@ -105,11 +97,8 @@ def test_multizarr(generate_mzz): """Test creating a combined reference file with MultiZarrToZarr""" mzz = generate_mzz test_dict = mzz.translate() - - m = fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so - ) - ds = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) + store = create_store(test_dict) + ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) with fsspec.open_files(urls, **so) as fs: expts = [xr.open_dataset(f, engine="h5netcdf") for f in fs] @@ -183,11 +172,8 @@ def test_times(times_data): h5chunks = SingleHdf5ToZarr(f, url) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", - fo=test_dict, - ) - result = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) + store = create_store(test_dict) + result = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) expected = x1.to_dataset() xr.testing.assert_equal(result, expected) @@ -199,11 +185,8 @@ def test_times_str(times_data): h5chunks = SingleHdf5ToZarr(url) test_dict = h5chunks.translate() - m = fsspec.get_mapper( - "reference://", - fo=test_dict, - ) - result = xr.open_dataset(m, engine="zarr", backend_kwargs=dict(consolidated=False)) + store = create_store(test_dict) + result = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) expected = x1.to_dataset() xr.testing.assert_equal(result, expected) @@ -327,8 +310,8 @@ def test_compress(): h.translate() continue out = h.translate() - m = fsspec.get_mapper("reference://", fo=out) - g = zarr.open(m, zarr_format=2) + store = create_store(out) + g = zarr.open(store, zarr_format=2) assert np.mean(g.data) == 49.5 @@ -337,8 +320,8 @@ def test_embed(): h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed") out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), 
zarr_format=2) + store = create_store(out) + z = zarr.open(store, zarr_format=2) data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:] assert data[0].tolist() == [ "2014-04-01 00:00:00.0", From 80fedcde9a6768761ee2f36bb2ae63b6310d4492 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 13:39:25 -0400 Subject: [PATCH 09/22] reorganize --- kerchunk/hdf.py | 51 ++------------------------------------ kerchunk/tests/test_hdf.py | 14 ++++++++--- kerchunk/utils.py | 44 ++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 53 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 6bb16922..6b7b443d 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -1,7 +1,7 @@ import base64 import io import logging -from typing import Union, BinaryIO, Any, cast +from typing import Union, BinaryIO from packaging.version import Version import fsspec.core @@ -11,7 +11,7 @@ import numcodecs from .codecs import FillStringsCodec -from .utils import _encode_for_JSON +from .utils import _encode_for_JSON, encode_fill_value try: import h5py @@ -22,12 +22,6 @@ "for more details." ) -# try: -# from zarr.meta import encode_fill_value -# except ModuleNotFoundError: -# # https://github.com/zarr-developers/zarr-python/issues/2021 -# from zarr.v2.meta import encode_fill_value - lggr = logging.getLogger("h5-to-zarr") _HIDDEN_ATTRS = { # from h5netcdf.attrs "REFERENCE_LIST", @@ -504,7 +498,6 @@ def _translator( lggr.debug(f"Created Zarr array: {za}") self._transfer_attrs(h5obj, za) - # za.attrs["_ARRAY_DIMENSIONS"] = adims lggr.debug(f"_ARRAY_DIMENSIONS = {adims}") if "data" in kwargs: @@ -705,43 +698,3 @@ def _is_netcdf_variable(dataset: h5py.Dataset): def has_visititems_links(): return hasattr(h5py.Group, "visititems_links") - -def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: - # early out - if v is None: - return v - if dtype.kind == "V" and dtype.hasobject: - if object_codec is None: - raise ValueError("missing object_codec for object array") - v = object_codec.encode(v) - v = str(base64.standard_b64encode(v), "ascii") - return v - if dtype.kind == "f": - if np.isnan(v): - return "NaN" - elif np.isposinf(v): - return "Infinity" - elif np.isneginf(v): - return "-Infinity" - else: - return float(v) - elif dtype.kind in "ui": - return int(v) - elif dtype.kind == "b": - return bool(v) - elif dtype.kind in "c": - c = cast(np.complex128, np.dtype(complex).type()) - v = ( - encode_fill_value(v.real, c.real.dtype, object_codec), - encode_fill_value(v.imag, c.imag.dtype, object_codec), - ) - return v - elif dtype.kind in "SV": - v = str(base64.standard_b64encode(v), "ascii") - return v - elif dtype.kind == "U": - return v - elif dtype.kind in "mM": - return int(v.view("i8")) - else: - return v diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index ace45472..665cd392 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -1,5 +1,6 @@ -import asyncio +from typing import Any import fsspec +import json import os.path as osp import fsspec.implementations @@ -23,25 +24,29 @@ async def list_dir(store, path): [x async for x in store.list_dir(path)] -def create_store(test_dict: dict): +def create_store(test_dict: dict, remote_options: Any = None): if Version(zarr.__version__) < Version("3.0.0.a0"): return fsspec.get_mapper( "reference://", fo=test_dict, remote_protocol="s3", remote_options=so ) else: - fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict) + fs = 
fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict, remote_options=remote_options) return zarr.storage.RemoteStore(fs, mode="r") def test_single(): """Test creating references for a single HDF file""" - url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" + #url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" + url = "s3://noaa-nos-ofs-pds/ngofs2/netcdf/202410/ngofs2.t03z.20241001.2ds.f020.nc" so = dict(anon=True, default_fill_cache=False, default_cache_type="none") with fsspec.open(url, **so) as f: h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() + with open("test_dict.json", "w") as f: + json.dump(test_dict, f) + store = create_store(test_dict) ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) @@ -97,6 +102,7 @@ def test_multizarr(generate_mzz): """Test creating a combined reference file with MultiZarrToZarr""" mzz = generate_mzz test_dict = mzz.translate() + store = create_store(test_dict) ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) diff --git a/kerchunk/utils.py b/kerchunk/utils.py index b52a9c0b..a0f9e96e 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -1,11 +1,13 @@ import base64 import copy import itertools +from typing import Any, cast import warnings import ujson import fsspec +import numpy as np import zarr @@ -134,6 +136,48 @@ def _encode_for_JSON(store): return store + +def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: + # early out + if v is None: + return v + if dtype.kind == "V" and dtype.hasobject: + if object_codec is None: + raise ValueError("missing object_codec for object array") + v = object_codec.encode(v) + v = str(base64.standard_b64encode(v), "ascii") + return v + if dtype.kind == "f": + if np.isnan(v): + return "NaN" + elif np.isposinf(v): + return "Infinity" + elif np.isneginf(v): + return "-Infinity" + else: + return float(v) + elif dtype.kind in "ui": + return int(v) + elif dtype.kind == "b": + return bool(v) + elif dtype.kind in "c": + c = cast(np.complex128, np.dtype(complex).type()) + v = ( + encode_fill_value(v.real, c.real.dtype, object_codec), + encode_fill_value(v.imag, c.imag.dtype, object_codec), + ) + return v + elif dtype.kind in "SV": + v = str(base64.standard_b64encode(v), "ascii") + return v + elif dtype.kind == "U": + return v + elif dtype.kind in "mM": + return int(v.view("i8")) + else: + return v + + def do_inline(store, threshold, remote_options=None, remote_protocol=None): """Replace short chunks with the value of that chunk and inline metadata From 1f69a0b129455ed712b1513ebf362c1c3be17b2f Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 13:48:28 -0400 Subject: [PATCH 10/22] Save progress --- kerchunk/netCDF3.py | 13 ++++++++++--- kerchunk/tests/test_hdf.py | 2 +- kerchunk/tests/test_netcdf.py | 20 ++++++++++++++++++-- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py index d44fc808..b9d47063 100644 --- a/kerchunk/netCDF3.py +++ b/kerchunk/netCDF3.py @@ -1,4 +1,5 @@ from functools import reduce +from packaging.version import Version from operator import mul import numpy as np @@ -167,7 +168,13 @@ def translate(self): import zarr out = self.out - z = zarr.open(out, mode="w", zarr_format=2) + if Version(zarr.__version__) < Version("3.0.0.a0"): + store = zarr.storage.KVStore(out) + z = 
zarr.group(store=store, overwrite=True) + else: + store = zarr.storage.MemoryStore(mode="a", store_dict=out) + z = zarr.open(store, mode="w", zarr_format=2) + for dim, var in self.variables.items(): if dim in self.chunks: shape = self.chunks[dim][-1] @@ -197,7 +204,7 @@ def translate(self): dtype=var.data.dtype, fill_value=fill, chunks=shape, - compression=None, + compressor=None, ) part = ".".join(["0"] * len(shape)) or "0" k = f"{dim}/{part}" @@ -251,7 +258,7 @@ def translate(self): dtype=base, fill_value=fill, chunks=(1,) + dtype.shape, - compression=None, + compressor=None, ) arr.attrs.update( { diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 665cd392..233a58e4 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -27,7 +27,7 @@ async def list_dir(store, path): def create_store(test_dict: dict, remote_options: Any = None): if Version(zarr.__version__) < Version("3.0.0.a0"): return fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=so + "reference://", fo=test_dict, remote_protocol="s3", remote_options=remote_options ) else: fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict, remote_options=remote_options) diff --git a/kerchunk/tests/test_netcdf.py b/kerchunk/tests/test_netcdf.py index 43b6021b..0036c0a3 100644 --- a/kerchunk/tests/test_netcdf.py +++ b/kerchunk/tests/test_netcdf.py @@ -1,4 +1,5 @@ import os +from typing import Any import fsspec @@ -7,6 +8,8 @@ import pytest from kerchunk import netCDF3 +import zarr + xr = pytest.importorskip("xarray") @@ -24,16 +27,29 @@ ) +def create_store(test_dict: dict, remote_options: Any = None): + if Version(zarr.__version__) < Version("3.0.0.a0"): + return fsspec.get_mapper( + "reference://", fo=test_dict, remote_protocol="s3", remote_options=remote_options + ) + else: + fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict, remote_options=remote_options) + return zarr.storage.RemoteStore(fs, mode="r") + + def test_one(m): m.pipe("data.nc3", bdata) h = netCDF3.netcdf_recording_file("memory://data.nc3") out = h.translate() + + store = create_store(out, remote_options={"remote_protocol": "memory"}) + ds = xr.open_dataset( - "reference://", + store, engine="zarr", backend_kwargs={ "consolidated": False, - "storage_options": {"fo": out, "remote_protocol": "memory"}, + "zarr_format": "2", }, ) assert (ds.data == data).all() From d556e528ab7f012afef68a9ec70f5bfd96c4470a Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 15:30:11 -0400 Subject: [PATCH 11/22] Refactor to clean things up --- kerchunk/hdf.py | 11 ++--- kerchunk/netCDF3.py | 4 +- kerchunk/tests/test_hdf.py | 90 +++++++++++++++++--------------------- kerchunk/utils.py | 37 +++++++++++++--- kerchunk/zarr.py | 35 +++++++++++++++ 5 files changed, 112 insertions(+), 65 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 6b7b443d..7d416f83 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -10,6 +10,8 @@ import zarr import numcodecs +from kerchunk.zarr import dict_to_store + from .codecs import FillStringsCodec from .utils import _encode_for_JSON, encode_fill_value @@ -107,13 +109,8 @@ def __init__( raise NotImplementedError self.vlen = vlen_encode self.store_dict = out or {} - if Version(zarr.__version__) < Version("3.0.0.a0"): - self.store = zarr.storage.KVStore(self.store_dict) - self._zroot = zarr.group(store=self.store, overwrite=True) - else: - self.store = zarr.storage.MemoryStore(mode="a", store_dict=self.store_dict) - self._zroot 
= zarr.group(store=self.store, zarr_format=2, overwrite=True) - + self.store = dict_to_store(self.store_dict) + self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True) self._uri = url self.error = error lggr.debug(f"HDF5 file URI: {self._uri}") diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py index b9d47063..078a5f7b 100644 --- a/kerchunk/netCDF3.py +++ b/kerchunk/netCDF3.py @@ -198,7 +198,7 @@ def translate(self): fill = float(fill) if fill is not None and var.data.dtype.kind == "i": fill = int(fill) - arr = z.create_dataset( + arr = z.create_array( name=dim, shape=shape, dtype=var.data.dtype, @@ -252,7 +252,7 @@ def translate(self): fill = float(fill) if fill is not None and base.kind == "i": fill = int(fill) - arr = z.create_dataset( + arr = z.create_array( name=name, shape=shape, dtype=base, diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 233a58e4..8e2117cc 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -1,42 +1,24 @@ -from typing import Any import fsspec import json import os.path as osp -import fsspec.implementations -import fsspec.implementations.reference - import kerchunk.hdf import numpy as np import pytest import xarray as xr import zarr -from packaging.version import Version - from kerchunk.hdf import SingleHdf5ToZarr, has_visititems_links from kerchunk.combine import MultiZarrToZarr, drop +from kerchunk.utils import refs_as_fs, refs_as_store +from kerchunk.zarr import fs_as_store here = osp.dirname(__file__) -async def list_dir(store, path): - [x async for x in store.list_dir(path)] - - -def create_store(test_dict: dict, remote_options: Any = None): - if Version(zarr.__version__) < Version("3.0.0.a0"): - return fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=remote_options - ) - else: - fs = fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict, remote_options=remote_options) - return zarr.storage.RemoteStore(fs, mode="r") - - def test_single(): """Test creating references for a single HDF file""" - #url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" + # url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" url = "s3://noaa-nos-ofs-pds/ngofs2/netcdf/202410/ngofs2.t03z.20241001.2ds.f020.nc" so = dict(anon=True, default_fill_cache=False, default_cache_type="none") @@ -47,9 +29,11 @@ def test_single(): with open("test_dict.json", "w") as f: json.dump(test_dict, f) - store = create_store(test_dict) + store = refs_as_store(test_dict) - ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) + ds = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) + ) with fsspec.open(url, **so) as f: expected = xr.open_dataset(f, engine="h5netcdf") @@ -66,7 +50,7 @@ def test_single_direct_open(): h5f=url, inline_threshold=300, storage_options=so ).translate() - store = create_store(test_dict) + store = refs_as_store(test_dict) ds_direct = xr.open_dataset( store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) @@ -76,7 +60,7 @@ def test_single_direct_open(): h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) test_dict = h5chunks.translate() - store = create_store(test_dict) + store = refs_as_store(test_dict) ds_from_file_opener = xr.open_dataset( store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) @@ -103,8 +87,10 @@ def test_multizarr(generate_mzz): mzz = 
generate_mzz test_dict = mzz.translate() - store = create_store(test_dict) - ds = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) + store = refs_as_store(test_dict) + ds = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) + ) with fsspec.open_files(urls, **so) as fs: expts = [xr.open_dataset(f, engine="h5netcdf") for f in fs] @@ -178,8 +164,10 @@ def test_times(times_data): h5chunks = SingleHdf5ToZarr(f, url) test_dict = h5chunks.translate() - store = create_store(test_dict) - result = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) + store = refs_as_store(test_dict) + result = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) + ) expected = x1.to_dataset() xr.testing.assert_equal(result, expected) @@ -191,8 +179,10 @@ def test_times_str(times_data): h5chunks = SingleHdf5ToZarr(url) test_dict = h5chunks.translate() - store = create_store(test_dict) - result = xr.open_dataset(store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)) + store = refs_as_store(test_dict) + result = xr.open_dataset( + store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) + ) expected = x1.to_dataset() xr.testing.assert_equal(result, expected) @@ -205,9 +195,10 @@ def test_string_embed(): fn = osp.join(here, "vlen.h5") h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="embed") out = h.translate() - fs = fsspec.filesystem("reference", fo=out) + fs = refs_as_fs(out) assert txt in fs.references["vlen_str/0"] - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = fs_as_store(fs) + z = zarr.open(store, zarr_format=2) assert z.vlen_str.dtype == "O" assert z.vlen_str[0] == txt assert (z.vlen_str[1:] == "").all() @@ -217,8 +208,8 @@ def test_string_null(): fn = osp.join(here, "vlen.h5") h = kerchunk.hdf.SingleHdf5ToZarr(fn, fn, vlen_encode="null", inline_threshold=0) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert z.vlen_str.dtype == "O" assert (z.vlen_str[:] == None).all() @@ -230,8 +221,8 @@ def test_string_leave(): f, fn, vlen_encode="leave", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert z.vlen_str.dtype == "S16" assert z.vlen_str[0] # some obscured ID assert (z.vlen_str[1:] == b"").all() @@ -244,9 +235,10 @@ def test_string_decode(): f, fn, vlen_encode="encode", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) + fs = refs_as_fs(out) assert txt in fs.cat("vlen_str/.zarray").decode() # stored in filter def - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = fs_as_store(fs) + z = zarr.open(store, zarr_format=2) assert z.vlen_str[0] == txt assert (z.vlen_str[1:] == "").all() @@ -256,8 +248,8 @@ def test_compound_string_null(): with open(fn, "rb") as f: h = kerchunk.hdf.SingleHdf5ToZarr(f, fn, vlen_encode="null", inline_threshold=0) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert z.vlen_str[0].tolist() == (10, None) assert (z.vlen_str["ints"][1:] == 0).all() assert (z.vlen_str["strs"][1:] == None).all() @@ -270,8 
+262,8 @@ def test_compound_string_leave(): f, fn, vlen_encode="leave", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert z.vlen_str["ints"][0] == 10 assert z.vlen_str["strs"][0] # random ID assert (z.vlen_str["ints"][1:] == 0).all() @@ -285,8 +277,8 @@ def test_compound_string_encode(): f, fn, vlen_encode="encode", inline_threshold=0 ) out = h.translate() - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert z.vlen_str["ints"][0] == 10 assert z.vlen_str["strs"][0] == "water" assert (z.vlen_str["ints"][1:] == 0).all() @@ -316,7 +308,7 @@ def test_compress(): h.translate() continue out = h.translate() - store = create_store(out) + store = refs_as_store(out) g = zarr.open(store, zarr_format=2) assert np.mean(g.data) == 49.5 @@ -326,7 +318,7 @@ def test_embed(): h = kerchunk.hdf.SingleHdf5ToZarr(fn, vlen_encode="embed") out = h.translate() - store = create_store(out) + store = refs_as_store(out) z = zarr.open(store, zarr_format=2) data = z["Domain_10"]["STER"]["min_1"]["boom_1"]["temperature"][:] assert data[0].tolist() == [ @@ -361,8 +353,8 @@ def test_translate_links(): out = kerchunk.hdf.SingleHdf5ToZarr(fn, inline_threshold=50).translate( preserve_linked_dsets=True ) - fs = fsspec.filesystem("reference", fo=out) - z = zarr.open(fs.get_mapper(), zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) # 1. Test the hard linked datasets were translated correctly # 2. Test the soft linked datasets were translated correctly diff --git a/kerchunk/utils.py b/kerchunk/utils.py index a0f9e96e..59aad1af 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -10,6 +10,28 @@ import numpy as np import zarr +from kerchunk.zarr import fs_as_store + + +def refs_as_fs(refs, remote_protocol=None, remote_options=None, **kwargs): + """Convert a reference set to an fsspec filesystem""" + fs = fsspec.filesystem( + "reference", + fo=refs, + remote_protocol=remote_protocol, + remote_options=remote_options, + **kwargs, + ) + return fs + + +def refs_as_store(refs, remote_protocol=None, remote_options=None): + """Convert a reference set to a zarr store""" + fs = refs_as_fs( + refs, remote_protocol=remote_protocol, remote_options=remote_options + ) + return fs_as_store(fs) + def class_factory(func): """Experimental uniform API across function-based file scanners""" @@ -74,7 +96,7 @@ def rename_target(refs, renames): ------- dict: the altered reference set, which can be saved """ - fs = fsspec.filesystem("reference", fo=refs) # to produce normalised refs + fs = refs_as_fs(refs) # to produce normalised refs refs = fs.references out = {} for k, v in refs.items(): @@ -136,7 +158,6 @@ def _encode_for_JSON(store): return store - def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any: # early out if v is None: @@ -190,6 +211,9 @@ def do_inline(store, threshold, remote_options=None, remote_protocol=None): remote_options=remote_options, remote_protocol=remote_protocol, ) + fs = refs_as_fs( + store, remote_protocol=remote_protocol, remote_options=remote_options + ) out = fs.references.copy() # Inlining is done when one of two conditions are satisfied: @@ -267,10 +291,9 @@ def inline_array(store, threshold=1000, names=None, remote_options=None): ------- amended references set (simple style) """ - fs = 
fsspec.filesystem( - "reference", fo=store, **(remote_options or {}), skip_instance_cache=True - ) - g = zarr.open_group(fs.get_mapper(), mode="r+", zarr_format=2) + fs = refs_as_fs(store, remote_options=remote_options or {}) + zarr_store = fs_as_store(store, mode="r+", remote_options=remote_options or {}) + g = zarr.open_group(zarr_store, mode="r+", zarr_format=2) _inline_array(g, threshold, names=names or []) return fs.references @@ -293,7 +316,7 @@ def subchunk(store, variable, factor): ------- modified store """ - fs = fsspec.filesystem("reference", fo=store) + fs = refs_as_fs(store) store = fs.references meta_file = f"{variable}/.zarray" meta = ujson.loads(fs.cat(meta_file)) diff --git a/kerchunk/zarr.py b/kerchunk/zarr.py index ea0612de..5560ea99 100644 --- a/kerchunk/zarr.py +++ b/kerchunk/zarr.py @@ -1,9 +1,44 @@ +from packaging.version import Version + import fsspec from fsspec.implementations.reference import LazyReferenceMapper +import zarr import kerchunk.utils +def is_zarr3(): + """Check if the installed zarr version is version 3""" + return Version(zarr.__version__) >= Version("3.0.0.a0") + + +def dict_to_store(store_dict: dict): + """Create an in memory zarr store backed by the given dictionary""" + if is_zarr3(): + return zarr.storage.MemoryStore(mode="a", store_dict=store_dict) + else: + return zarr.storage.KVStore(store_dict) + + +def fs_as_store(fs, mode='r', remote_protocol=None, remote_options=None): + """Open the refs as a zarr store + + Parameters + ---------- + refs: dict-like + the references to open + mode: str + + Returns + ------- + zarr.storage.Store or zarr.storage.Mapper, fsspec.AbstractFileSystem + """ + if is_zarr3(): + return zarr.storage.RemoteStore(fs, mode=mode) + else: + return fs.get_mapper() + + def single_zarr( uri_or_store, storage_options=None, From b27e64c5e0d0e13e83e9ae5adb297ec473d8eada Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 16:06:03 -0400 Subject: [PATCH 12/22] Fix circular import --- kerchunk/hdf.py | 5 +---- kerchunk/tests/test_netcdf.py | 17 +++-------------- kerchunk/utils.py | 35 +++++++++++++++++++++++++++++++++-- kerchunk/zarr.py | 35 ----------------------------------- 4 files changed, 37 insertions(+), 55 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 7d416f83..bc00517f 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -2,7 +2,6 @@ import io import logging from typing import Union, BinaryIO -from packaging.version import Version import fsspec.core from fsspec.implementations.reference import LazyReferenceMapper @@ -10,10 +9,8 @@ import zarr import numcodecs -from kerchunk.zarr import dict_to_store - from .codecs import FillStringsCodec -from .utils import _encode_for_JSON, encode_fill_value +from .utils import _encode_for_JSON, encode_fill_value, dict_to_store try: import h5py diff --git a/kerchunk/tests/test_netcdf.py b/kerchunk/tests/test_netcdf.py index 0036c0a3..755823da 100644 --- a/kerchunk/tests/test_netcdf.py +++ b/kerchunk/tests/test_netcdf.py @@ -1,5 +1,4 @@ import os -from typing import Any import fsspec @@ -8,7 +7,7 @@ import pytest from kerchunk import netCDF3 -import zarr +from kerchunk.utils import refs_as_store xr = pytest.importorskip("xarray") @@ -27,29 +26,19 @@ ) -def create_store(test_dict: dict, remote_options: Any = None): - if Version(zarr.__version__) < Version("3.0.0.a0"): - return fsspec.get_mapper( - "reference://", fo=test_dict, remote_protocol="s3", remote_options=remote_options - ) - else: - fs = 
fsspec.implementations.reference.ReferenceFileSystem(fo=test_dict, remote_options=remote_options) - return zarr.storage.RemoteStore(fs, mode="r") - - def test_one(m): m.pipe("data.nc3", bdata) h = netCDF3.netcdf_recording_file("memory://data.nc3") out = h.translate() - store = create_store(out, remote_options={"remote_protocol": "memory"}) + store = refs_as_store(out, remote_protocol="memory") ds = xr.open_dataset( store, engine="zarr", backend_kwargs={ "consolidated": False, - "zarr_format": "2", + "zarr_format": 2, }, ) assert (ds.data == data).all() diff --git a/kerchunk/utils.py b/kerchunk/utils.py index 59aad1af..c90f89fe 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -1,6 +1,7 @@ import base64 import copy import itertools +from packaging.version import Version from typing import Any, cast import warnings @@ -10,8 +11,6 @@ import numpy as np import zarr -from kerchunk.zarr import fs_as_store - def refs_as_fs(refs, remote_protocol=None, remote_options=None, **kwargs): """Convert a reference set to an fsspec filesystem""" @@ -33,6 +32,38 @@ def refs_as_store(refs, remote_protocol=None, remote_options=None): return fs_as_store(fs) +def is_zarr3(): + """Check if the installed zarr version is version 3""" + return Version(zarr.__version__) >= Version("3.0.0.a0") + + +def dict_to_store(store_dict: dict): + """Create an in memory zarr store backed by the given dictionary""" + if is_zarr3(): + return zarr.storage.MemoryStore(mode="a", store_dict=store_dict) + else: + return zarr.storage.KVStore(store_dict) + + +def fs_as_store(fs, mode='r', remote_protocol=None, remote_options=None): + """Open the refs as a zarr store + + Parameters + ---------- + refs: dict-like + the references to open + mode: str + + Returns + ------- + zarr.storage.Store or zarr.storage.Mapper, fsspec.AbstractFileSystem + """ + if is_zarr3(): + return zarr.storage.RemoteStore(fs, mode=mode) + else: + return fs.get_mapper() + + def class_factory(func): """Experimental uniform API across function-based file scanners""" diff --git a/kerchunk/zarr.py b/kerchunk/zarr.py index 5560ea99..ea0612de 100644 --- a/kerchunk/zarr.py +++ b/kerchunk/zarr.py @@ -1,44 +1,9 @@ -from packaging.version import Version - import fsspec from fsspec.implementations.reference import LazyReferenceMapper -import zarr import kerchunk.utils -def is_zarr3(): - """Check if the installed zarr version is version 3""" - return Version(zarr.__version__) >= Version("3.0.0.a0") - - -def dict_to_store(store_dict: dict): - """Create an in memory zarr store backed by the given dictionary""" - if is_zarr3(): - return zarr.storage.MemoryStore(mode="a", store_dict=store_dict) - else: - return zarr.storage.KVStore(store_dict) - - -def fs_as_store(fs, mode='r', remote_protocol=None, remote_options=None): - """Open the refs as a zarr store - - Parameters - ---------- - refs: dict-like - the references to open - mode: str - - Returns - ------- - zarr.storage.Store or zarr.storage.Mapper, fsspec.AbstractFileSystem - """ - if is_zarr3(): - return zarr.storage.RemoteStore(fs, mode=mode) - else: - return fs.get_mapper() - - def single_zarr( uri_or_store, storage_options=None, From 41d6e8e2eb36b09df844755ea4cb7f38a8d3f818 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 16:07:17 -0400 Subject: [PATCH 13/22] Iterate --- kerchunk/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kerchunk/utils.py b/kerchunk/utils.py index c90f89fe..5cab841d 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -323,7 +323,7 @@ def 
inline_array(store, threshold=1000, names=None, remote_options=None): amended references set (simple style) """ fs = refs_as_fs(store, remote_options=remote_options or {}) - zarr_store = fs_as_store(store, mode="r+", remote_options=remote_options or {}) + zarr_store = fs_as_store(fs, mode="r+", remote_options=remote_options or {}) g = zarr.open_group(zarr_store, mode="r+", zarr_format=2) _inline_array(g, threshold, names=names or []) return fs.references From 7ade1a6dc2369583869a2a6d34a6953b223a9e02 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 17:08:19 -0400 Subject: [PATCH 14/22] Change zarr dep --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 6e57e223..5eb7c0c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "numcodecs", "numpy", "ujson", - "zarr==3.0.0a7", + "zarr", ] [project.optional-dependencies] From 492ddeebac4d844ce63ee6aa93b14f5ce613efed Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Thu, 10 Oct 2024 17:31:49 -0400 Subject: [PATCH 15/22] More conversion --- kerchunk/fits.py | 7 ++++--- kerchunk/hdf.py | 13 ++----------- kerchunk/netCDF3.py | 11 ++++------- kerchunk/tests/test_fits.py | 22 ++++++++++++---------- kerchunk/tests/test_tiff.py | 10 ++++++---- kerchunk/utils.py | 37 ++++++++++++++++++++++++++++++++++--- 6 files changed, 62 insertions(+), 38 deletions(-) diff --git a/kerchunk/fits.py b/kerchunk/fits.py index f50bef64..f0d4fa8e 100644 --- a/kerchunk/fits.py +++ b/kerchunk/fits.py @@ -8,7 +8,7 @@ from fsspec.implementations.reference import LazyReferenceMapper -from kerchunk.utils import class_factory +from kerchunk.utils import class_factory, dict_to_store from kerchunk.codecs import AsciiTableCodec, VarArrCodec try: @@ -72,7 +72,8 @@ def process_file( storage_options = storage_options or {} out = out or {} - g = zarr.open(out, zarr_format=2) + store = dict_to_store(out) + g = zarr.open_group(store=store, zarr_format=2) with fsspec.open(url, mode="rb", **storage_options) as f: infile = fits.open(f, do_not_scale_image_data=True) @@ -164,7 +165,7 @@ def process_file( # TODO: we could sub-chunk on biggest dimension name = hdu.name or str(ext) arr = g.empty( - name, dtype=dtype, shape=shape, chunks=shape, compression=None, **kwargs + name=name, dtype=dtype, shape=shape, chunks=shape, compressor=None, zarr_format=2, **kwargs ) arr.attrs.update( { diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index bc00517f..7cb4b5f6 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -10,7 +10,7 @@ import numcodecs from .codecs import FillStringsCodec -from .utils import _encode_for_JSON, encode_fill_value, dict_to_store +from .utils import _encode_for_JSON, encode_fill_value, dict_to_store, translate_refs_serializable try: import h5py @@ -150,16 +150,7 @@ def translate(self, preserve_linked_dsets=False): self.store.flush() return self.store else: - keys_to_remove = [] - new_keys = {} - for k, v in self.store_dict.items(): - if isinstance(v, zarr.core.buffer.cpu.Buffer): - key = str.removeprefix(k, "/") - new_keys[key] = v.to_bytes() - keys_to_remove.append(k) - for k in keys_to_remove: - del self.store_dict[k] - self.store_dict.update(new_keys) + translate_refs_serializable(self.store_dict) store = _encode_for_JSON(self.store_dict) return {"version": 1, "refs": store} diff --git a/kerchunk/netCDF3.py b/kerchunk/netCDF3.py index 078a5f7b..31438bb0 100644 --- a/kerchunk/netCDF3.py +++ b/kerchunk/netCDF3.py @@ -6,7 +6,7 @@ from 
fsspec.implementations.reference import LazyReferenceMapper import fsspec -from kerchunk.utils import _encode_for_JSON, inline_array +from kerchunk.utils import _encode_for_JSON, dict_to_store, inline_array, translate_refs_serializable try: from scipy.io._netcdf import ZERO, NC_VARIABLE, netcdf_file, netcdf_variable @@ -168,12 +168,8 @@ def translate(self): import zarr out = self.out - if Version(zarr.__version__) < Version("3.0.0.a0"): - store = zarr.storage.KVStore(out) - z = zarr.group(store=store, overwrite=True) - else: - store = zarr.storage.MemoryStore(mode="a", store_dict=out) - z = zarr.open(store, mode="w", zarr_format=2) + store = dict_to_store(out) + z = zarr.open(store, mode="w", zarr_format=2, overwrite=True) for dim, var in self.variables.items(): if dim in self.chunks: @@ -302,6 +298,7 @@ def translate(self): out.flush() return out else: + translate_refs_serializable(out) out = _encode_for_JSON(out) return {"version": 1, "refs": out} diff --git a/kerchunk/tests/test_fits.py b/kerchunk/tests/test_fits.py index 5d7c3b6d..de2cad5f 100644 --- a/kerchunk/tests/test_fits.py +++ b/kerchunk/tests/test_fits.py @@ -2,6 +2,8 @@ import fsspec import pytest +from kerchunk.utils import refs_as_store + fits = pytest.importorskip("astropy.io.fits") import kerchunk.fits @@ -17,8 +19,8 @@ def test_ascii_table(): # this one directly hits a remote server - should cache? url = "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" out = kerchunk.fits.process_file(url, extension=1) - m = fsspec.get_mapper("reference://", fo=out, remote_protocol="https") - g = zarr.open(m, zarr_format=2) + store = refs_as_store(out, remote_protocol="https") + g = zarr.open(store, zarr_format=2) arr = g["u5780205r_cvt.c0h.tab"][:] with fsspec.open( "https://fits.gsfc.nasa.gov/samples/WFPC2u5780205r_c0fx.fits" @@ -30,8 +32,8 @@ def test_ascii_table(): def test_binary_table(): out = kerchunk.fits.process_file(btable, extension=1) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["1"] with open(btable, "rb") as f: hdul = fits.open(f) @@ -47,8 +49,8 @@ def test_binary_table(): def test_cube(): out = kerchunk.fits.process_file(range_im) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -60,8 +62,8 @@ def test_with_class(): ftz = kerchunk.fits.FitsToZarr(range_im) out = ftz.translate() assert "fits" in repr(ftz) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["PRIMARY"] with open(range_im, "rb") as f: hdul = fits.open(f) @@ -75,8 +77,8 @@ def test_var(): ftz = kerchunk.fits.FitsToZarr(var) out = ftz.translate() - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) arr = z["1"] vars = [_.tolist() for _ in arr["var"]] diff --git a/kerchunk/tests/test_tiff.py b/kerchunk/tests/test_tiff.py index 74ba59a4..b81e7bab 100644 --- a/kerchunk/tests/test_tiff.py +++ b/kerchunk/tests/test_tiff.py @@ -5,6 +5,8 @@ import pytest import xarray as xr +from kerchunk.utils import refs_as_store + pytest.importorskip("tifffile") pytest.importorskip("rioxarray") import kerchunk.tiff @@ -15,8 +17,8 @@ def test_one(): fn = files[0] out = 
kerchunk.tiff.tiff_to_zarr(fn) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_format=2) + store = refs_as_store(out) + z = zarr.open(store, zarr_format=2) assert list(z) == ["0", "1", "2"] assert z.attrs["multiscales"] == [ { @@ -33,8 +35,8 @@ def test_one(): def test_coord(): fn = files[0] out = kerchunk.tiff.tiff_to_zarr(fn) - m = fsspec.get_mapper("reference://", fo=out) - z = zarr.open(m, zarr_format=2) # highest res is the one xarray picks + store = refs_as_store(out) + z = zarr.open(out, zarr_format=2) # highest res is the one xarray picks out = kerchunk.tiff.generate_coords(z.attrs, z[0].shape) ds = xr.open_dataset(fn) diff --git a/kerchunk/utils.py b/kerchunk/utils.py index 5cab841d..71cee56a 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -45,15 +45,15 @@ def dict_to_store(store_dict: dict): return zarr.storage.KVStore(store_dict) -def fs_as_store(fs, mode='r', remote_protocol=None, remote_options=None): +def fs_as_store(fs, mode="r", remote_protocol=None, remote_options=None): """Open the refs as a zarr store - + Parameters ---------- refs: dict-like the references to open mode: str - + Returns ------- zarr.storage.Store or zarr.storage.Mapper, fsspec.AbstractFileSystem @@ -538,3 +538,34 @@ def templateize(strings, min_length=10, template_name="u"): else: template = {} return template, strings + + +def translate_refs_serializable(refs: dict): + """Translate a reference set to a serializable form, given that zarr + v3 memory stores store data in buffers by default. This modifies the + input dictionary in place, and returns a reference to it. + + It also fixes keys that have a leading slash, which is not appropriate for + zarr v3 keys + + Parameters + ---------- + refs: dict + The reference set + + Returns + ------- + dict + A serializable form of the reference set + """ + keys_to_remove = [] + new_keys = {} + for k, v in refs.items(): + if isinstance(v, zarr.core.buffer.cpu.Buffer): + key = k.removeprefix("/") + new_keys[key] = v.to_bytes() + keys_to_remove.append(k) + for k in keys_to_remove: + del refs[k] + refs.update(new_keys) + return refs \ No newline at end of file From 6e5741ca7d4fe25a9d37bbc3d72266e28c6695de Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Tue, 15 Oct 2024 09:48:05 -0400 Subject: [PATCH 16/22] Specify zarr version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5eb7c0c9..3c361a2d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "numcodecs", "numpy", "ujson", - "zarr", + "zarr==3.0.0b0", ] [project.optional-dependencies] From c0316ace9b18455aece8d0910a33cd4791e083ce Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 23 Oct 2024 09:31:10 -0400 Subject: [PATCH 17/22] Working remote hdf tests --- kerchunk/hdf.py | 2 +- kerchunk/tests/test_hdf.py | 22 +++++++++++----------- kerchunk/utils.py | 37 ++++++++++++++++++++++++------------- kerchunk/xarray_backend.py | 4 +++- pyproject.toml | 2 +- 5 files changed, 40 insertions(+), 27 deletions(-) diff --git a/kerchunk/hdf.py b/kerchunk/hdf.py index 7cb4b5f6..1d4d0054 100644 --- a/kerchunk/hdf.py +++ b/kerchunk/hdf.py @@ -461,7 +461,7 @@ def _translator( if h5obj.attrs.get("_FillValue") is not None: fill = h5obj.attrs.get("_FillValue") fill = encode_fill_value( - h5obj.attrs.get("_FillValue"), dt or h5obj.dtype + fill, dt or h5obj.dtype ) adims = self._get_array_dims(h5obj) diff --git a/kerchunk/tests/test_hdf.py b/kerchunk/tests/test_hdf.py index 
8e2117cc..f600a127 100644 --- a/kerchunk/tests/test_hdf.py +++ b/kerchunk/tests/test_hdf.py @@ -1,7 +1,12 @@ +import asyncio import fsspec import json import os.path as osp +import zarr.core +import zarr.core.buffer +import zarr.core.group + import kerchunk.hdf import numpy as np import pytest @@ -11,33 +16,28 @@ from kerchunk.hdf import SingleHdf5ToZarr, has_visititems_links from kerchunk.combine import MultiZarrToZarr, drop from kerchunk.utils import refs_as_fs, refs_as_store -from kerchunk.zarr import fs_as_store +from kerchunk.utils import fs_as_store here = osp.dirname(__file__) def test_single(): """Test creating references for a single HDF file""" - # url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" - url = "s3://noaa-nos-ofs-pds/ngofs2/netcdf/202410/ngofs2.t03z.20241001.2ds.f020.nc" + url = "s3://noaa-nwm-retro-v2.0-pds/full_physics/2017/201704010000.CHRTOUT_DOMAIN1.comp" so = dict(anon=True, default_fill_cache=False, default_cache_type="none") with fsspec.open(url, **so) as f: - h5chunks = SingleHdf5ToZarr(f, url, storage_options=so) + h5chunks = SingleHdf5ToZarr(f, url, storage_options=so, inline_threshold=1) test_dict = h5chunks.translate() with open("test_dict.json", "w") as f: json.dump(test_dict, f) - store = refs_as_store(test_dict) - - ds = xr.open_dataset( - store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) - ) + store = refs_as_store(test_dict, remote_options=dict(asynchronous=True, anon=True)) + ds = xr.open_zarr(store, zarr_format=2, consolidated=False) with fsspec.open(url, **so) as f: expected = xr.open_dataset(f, engine="h5netcdf") - xr.testing.assert_equal(ds.drop_vars("crs"), expected.drop_vars("crs")) @@ -164,7 +164,7 @@ def test_times(times_data): h5chunks = SingleHdf5ToZarr(f, url) test_dict = h5chunks.translate() - store = refs_as_store(test_dict) + store = refs_as_store(test_dict, remote_protocol="file") result = xr.open_dataset( store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False) ) diff --git a/kerchunk/utils.py b/kerchunk/utils.py index 71cee56a..8cc2f765 100644 --- a/kerchunk/utils.py +++ b/kerchunk/utils.py @@ -1,6 +1,7 @@ import base64 import copy import itertools +import fsspec.asyn from packaging.version import Version from typing import Any, cast import warnings @@ -24,12 +25,23 @@ def refs_as_fs(refs, remote_protocol=None, remote_options=None, **kwargs): return fs -def refs_as_store(refs, remote_protocol=None, remote_options=None): +def refs_as_store(refs, mode="r", remote_protocol=None, remote_options=None): """Convert a reference set to a zarr store""" + asynchronous = False + if is_zarr3(): + asynchronous = True + if remote_options is None: + remote_options = {"asynchronous": True} + else: + remote_options["asynchronous"] = True + fs = refs_as_fs( - refs, remote_protocol=remote_protocol, remote_options=remote_options + refs, + remote_protocol=remote_protocol, + remote_options=remote_options, + asynchronous=asynchronous, ) - return fs_as_store(fs) + return fs_as_store(fs, mode=mode) def is_zarr3(): @@ -40,18 +52,17 @@ def is_zarr3(): def dict_to_store(store_dict: dict): """Create an in memory zarr store backed by the given dictionary""" if is_zarr3(): - return zarr.storage.MemoryStore(mode="a", store_dict=store_dict) + return zarr.storage.MemoryStore(mode="w", store_dict=store_dict) else: return zarr.storage.KVStore(store_dict) -def fs_as_store(fs, mode="r", remote_protocol=None, remote_options=None): +def fs_as_store(fs: fsspec.asyn.AsyncFileSystem, 
mode="r"): """Open the refs as a zarr store Parameters ---------- - refs: dict-like - the references to open + fs: fsspec.async.AsyncFileSystem mode: str Returns @@ -541,18 +552,18 @@ def templateize(strings, min_length=10, template_name="u"): def translate_refs_serializable(refs: dict): - """Translate a reference set to a serializable form, given that zarr - v3 memory stores store data in buffers by default. This modifies the + """Translate a reference set to a serializable form, given that zarr + v3 memory stores store data in buffers by default. This modifies the input dictionary in place, and returns a reference to it. - It also fixes keys that have a leading slash, which is not appropriate for - zarr v3 keys + It also fixes keys that have a leading slash, which is not appropriate for + zarr v3 keys Parameters ---------- refs: dict The reference set - + Returns ------- dict @@ -568,4 +579,4 @@ def translate_refs_serializable(refs: dict): for k in keys_to_remove: del refs[k] refs.update(new_keys) - return refs \ No newline at end of file + return refs diff --git a/kerchunk/xarray_backend.py b/kerchunk/xarray_backend.py index ca377f6d..dfbbafba 100644 --- a/kerchunk/xarray_backend.py +++ b/kerchunk/xarray_backend.py @@ -43,4 +43,6 @@ def open_reference_dataset( m = fsspec.get_mapper("reference://", fo=filename_or_obj, **storage_options) - return xr.open_dataset(m, engine="zarr", consolidated=False, **open_dataset_options) + return xr.open_dataset( + m, engine="zarr", zarr_format=2, consolidated=False, **open_dataset_options + ) diff --git a/pyproject.toml b/pyproject.toml index 3c361a2d..5eb7c0c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "numcodecs", "numpy", "ujson", - "zarr==3.0.0b0", + "zarr", ] [project.optional-dependencies] From 59bd36cafd33b9ec3c29ddf90e9041197e38dc30 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 23 Oct 2024 10:03:25 -0400 Subject: [PATCH 18/22] Working grib impl --- kerchunk/grib2.py | 27 ++++++++------ kerchunk/tests/test_grib.py | 73 ++++++++++++++++++------------------- 2 files changed, 50 insertions(+), 50 deletions(-) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index 7d75786f..e4e64bf3 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -11,7 +11,7 @@ import xarray import numpy as np -from kerchunk.utils import class_factory, _encode_for_JSON +from kerchunk.utils import class_factory, _encode_for_JSON, dict_to_store, translate_refs_serializable from kerchunk.codecs import GRIBCodec from kerchunk.combine import MultiZarrToZarr, drop from kerchunk._grib_idx import parse_grib_idx, build_idx_grib_mapping, map_from_index @@ -71,13 +71,13 @@ def _store_array(store, z, data, var, inline_threshold, offset, size, attr): shape = tuple(data.shape or ()) if nbytes < inline_threshold: logger.debug(f"Store {var} inline") - d = z.create_dataset( + d = z.create_array( name=var, shape=shape, chunks=shape, dtype=data.dtype, fill_value=attr.get("missingValue", None), - compressor=False, + compressor=None, ) if hasattr(data, "tobytes"): b = data.tobytes() @@ -91,15 +91,14 @@ def _store_array(store, z, data, var, inline_threshold, offset, size, attr): store[f"{var}/0"] = b.decode("ascii") else: logger.debug(f"Store {var} reference") - d = z.create_dataset( + d = z.create_array( name=var, shape=shape, chunks=shape, dtype=data.dtype, fill_value=attr.get("missingValue", None), filters=[GRIBCodec(var=var, dtype=str(data.dtype))], - compressor=False, - overwrite=True, + compressor=None, ) store[f"{var}/" + 
".".join(["0"] * len(shape))] = ["{{u}}", offset, size] d.attrs.update(attr) @@ -153,7 +152,9 @@ def scan_grib( with fsspec.open(url, "rb", **storage_options) as f: logger.debug(f"File {url}") for offset, size, data in _split_file(f, skip=skip): - store = {} + store_dict = {} + store = dict_to_store(store_dict) + mid = eccodes.codes_new_from_message(data) m = cfgrib.cfmessage.CfMessage(mid) @@ -227,7 +228,7 @@ def scan_grib( varName = m["cfVarName"] if varName in ("undef", "unknown"): varName = m["shortName"] - _store_array(store, z, vals, varName, inline_threshold, offset, size, attrs) + _store_array(store_dict, z, vals, varName, inline_threshold, offset, size, attrs) if "typeOfLevel" in message_keys and "level" in message_keys: name = m["typeOfLevel"] coordinates.append(name) @@ -241,7 +242,7 @@ def scan_grib( attrs = {} attrs["_ARRAY_DIMENSIONS"] = [] _store_array( - store, z, data, name, inline_threshold, offset, size, attrs + store_dict, z, data, name, inline_threshold, offset, size, attrs ) dims = ( ["y", "x"] @@ -298,7 +299,7 @@ def scan_grib( dims = [coord] attrs = cfgrib.dataset.COORD_ATTRS[coord] _store_array( - store, + store_dict, z, x, coord, @@ -311,10 +312,11 @@ def scan_grib( if coordinates: z.attrs["coordinates"] = " ".join(coordinates) + translate_refs_serializable(store_dict) out.append( { "version": 1, - "refs": _encode_for_JSON(store), + "refs": _encode_for_JSON(store_dict), "templates": {"u": url}, } ) @@ -397,7 +399,8 @@ def grib_tree( filters = ["stepType", "typeOfLevel"] # TODO allow passing a LazyReferenceMapper as output? - zarr_store = {} + zarr_store_dict = {} + zarr_store = dict_to_store(zarr_store_dict) zroot = zarr.open_group(store=zarr_store, zarr_format=2) aggregations: Dict[str, List] = defaultdict(list) diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index 9102529e..74f24a6d 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -6,7 +6,7 @@ import pandas as pd import pytest import xarray as xr -import datatree +#import datatree import zarr import ujson from kerchunk.grib2 import ( @@ -21,6 +21,7 @@ extract_dataset_chunk_index, extract_datatree_chunk_index, ) +from kerchunk.utils import refs_as_store eccodes_ver = tuple(int(i) for i in eccodes.__version__.split(".")) cfgrib = pytest.importorskip("cfgrib") @@ -68,17 +69,13 @@ def _fetch_first(url): def test_archives(tmpdir, url): grib = GribToZarr(url, storage_options={"anon": True}, skip=1) out = grib.translate()[0] - ours = xr.open_dataset( - "reference://", - engine="zarr", - backend_kwargs={ - "consolidated": False, - "storage_options": { - "fo": out, - "remote_protocol": "s3", - "remote_options": {"anon": True}, - }, - }, + + store = refs_as_store(out) + + ours = xr.open_zarr( + store, + zarr_format=2, + consolidated=False, ) data = _fetch_first(url) @@ -266,22 +263,22 @@ def test_hrrr_sfcf_grib_tree(): assert zg.u.instant.isobaricInhPa.time.shape == (1,) -def test_hrrr_sfcf_grib_datatree(): - fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") - with open(fpath, "rb") as fobj: - scanned_msgs = ujson.load(fobj) - merged = grib_tree(scanned_msgs) - dt = datatree.open_datatree( - fsspec.filesystem("reference", fo=merged).get_mapper(""), - engine="zarr", - consolidated=False, - ) - # Assert a few things... but if it loads we are mostly done. 
- np.testing.assert_array_equal( - dt.u.instant.heightAboveGround.step.values[:], - np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), - ) - assert dt.u.attrs == dict(name="U component of wind") +# def test_hrrr_sfcf_grib_datatree(): +# fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") +# with open(fpath, "rb") as fobj: +# scanned_msgs = ujson.load(fobj) +# merged = grib_tree(scanned_msgs) +# dt = datatree.open_datatree( +# fsspec.filesystem("reference", fo=merged).get_mapper(""), +# engine="zarr", +# consolidated=False, +# ) +# # Assert a few things... but if it loads we are mostly done. +# np.testing.assert_array_equal( +# dt.u.instant.heightAboveGround.step.values[:], +# np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), +# ) +# assert dt.u.attrs == dict(name="U component of wind") def test_parse_grib_idx_invalid_url(): @@ -345,17 +342,17 @@ def test_parse_grib_idx_content(idx_url, storage_options): assert idx_df.iloc[message_no]["length"] == output[message_no]["refs"][variable][2] -@pytest.fixture -def zarr_tree_and_datatree_instance(): - fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") - tree_store = tree_store = grib_tree(scan_grib(fn)) - dt_instance = datatree.open_datatree( - fsspec.filesystem("reference", fo=tree_store).get_mapper(""), - engine="zarr", - consolidated=False, - ) +# @pytest.fixture +# def zarr_tree_and_datatree_instance(): +# fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") +# tree_store = tree_store = grib_tree(scan_grib(fn)) +# dt_instance = datatree.open_datatree( +# fsspec.filesystem("reference", fo=tree_store).get_mapper(""), +# engine="zarr", +# consolidated=False, +# ) - return tree_store, dt_instance, fn +# return tree_store, dt_instance, fn def test_extract_dataset_chunk_index(zarr_tree_and_datatree_instance): From 187ced261feeda286fae65dbe8dda7e9b3da7c7c Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 23 Oct 2024 10:04:22 -0400 Subject: [PATCH 19/22] Add back commented out code --- kerchunk/tests/test_grib.py | 56 ++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index 74f24a6d..f0e58f9d 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -6,7 +6,7 @@ import pandas as pd import pytest import xarray as xr -#import datatree +import datatree import zarr import ujson from kerchunk.grib2 import ( @@ -75,7 +75,7 @@ def test_archives(tmpdir, url): ours = xr.open_zarr( store, zarr_format=2, - consolidated=False, + consolidated=False ) data = _fetch_first(url) @@ -263,22 +263,22 @@ def test_hrrr_sfcf_grib_tree(): assert zg.u.instant.isobaricInhPa.time.shape == (1,) -# def test_hrrr_sfcf_grib_datatree(): -# fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") -# with open(fpath, "rb") as fobj: -# scanned_msgs = ujson.load(fobj) -# merged = grib_tree(scanned_msgs) -# dt = datatree.open_datatree( -# fsspec.filesystem("reference", fo=merged).get_mapper(""), -# engine="zarr", -# consolidated=False, -# ) -# # Assert a few things... but if it loads we are mostly done. 
-# np.testing.assert_array_equal( -# dt.u.instant.heightAboveGround.step.values[:], -# np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), -# ) -# assert dt.u.attrs == dict(name="U component of wind") +def test_hrrr_sfcf_grib_datatree(): + fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") + with open(fpath, "rb") as fobj: + scanned_msgs = ujson.load(fobj) + merged = grib_tree(scanned_msgs) + dt = datatree.open_datatree( + fsspec.filesystem("reference", fo=merged).get_mapper(""), + engine="zarr", + consolidated=False, + ) + # Assert a few things... but if it loads we are mostly done. + np.testing.assert_array_equal( + dt.u.instant.heightAboveGround.step.values[:], + np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), + ) + assert dt.u.attrs == dict(name="U component of wind") def test_parse_grib_idx_invalid_url(): @@ -342,17 +342,17 @@ def test_parse_grib_idx_content(idx_url, storage_options): assert idx_df.iloc[message_no]["length"] == output[message_no]["refs"][variable][2] -# @pytest.fixture -# def zarr_tree_and_datatree_instance(): -# fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") -# tree_store = tree_store = grib_tree(scan_grib(fn)) -# dt_instance = datatree.open_datatree( -# fsspec.filesystem("reference", fo=tree_store).get_mapper(""), -# engine="zarr", -# consolidated=False, -# ) +@pytest.fixture +def zarr_tree_and_datatree_instance(): + fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") + tree_store = tree_store = grib_tree(scan_grib(fn)) + dt_instance = datatree.open_datatree( + fsspec.filesystem("reference", fo=tree_store).get_mapper(""), + engine="zarr", + consolidated=False, + ) -# return tree_store, dt_instance, fn + return tree_store, dt_instance, fn def test_extract_dataset_chunk_index(zarr_tree_and_datatree_instance): From 690ed21922cd4255eb39a795674bf38372c87427 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 23 Oct 2024 11:28:58 -0400 Subject: [PATCH 20/22] Make grib codec a compressor since its bytes to array --- kerchunk/grib2.py | 4 +-- kerchunk/tests/test_grib.py | 54 ++++++++++++++++++------------------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index e4e64bf3..eb796e2e 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -97,8 +97,8 @@ def _store_array(store, z, data, var, inline_threshold, offset, size, attr): chunks=shape, dtype=data.dtype, fill_value=attr.get("missingValue", None), - filters=[GRIBCodec(var=var, dtype=str(data.dtype))], - compressor=None, + filters=[], + compressor=GRIBCodec(var=var, dtype=str(data.dtype)), ) store[f"{var}/" + ".".join(["0"] * len(shape))] = ["{{u}}", offset, size] d.attrs.update(attr) diff --git a/kerchunk/tests/test_grib.py b/kerchunk/tests/test_grib.py index f0e58f9d..7d9cf32b 100644 --- a/kerchunk/tests/test_grib.py +++ b/kerchunk/tests/test_grib.py @@ -6,7 +6,7 @@ import pandas as pd import pytest import xarray as xr -import datatree +#import datatree import zarr import ujson from kerchunk.grib2 import ( @@ -263,22 +263,22 @@ def test_hrrr_sfcf_grib_tree(): assert zg.u.instant.isobaricInhPa.time.shape == (1,) -def test_hrrr_sfcf_grib_datatree(): - fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") - with open(fpath, "rb") as fobj: - scanned_msgs = ujson.load(fobj) - merged = grib_tree(scanned_msgs) - dt = datatree.open_datatree( - fsspec.filesystem("reference", fo=merged).get_mapper(""), - engine="zarr", - consolidated=False, - ) - # Assert a few things... but if it loads we are mostly done. 
- np.testing.assert_array_equal( - dt.u.instant.heightAboveGround.step.values[:], - np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), - ) - assert dt.u.attrs == dict(name="U component of wind") +# def test_hrrr_sfcf_grib_datatree(): +# fpath = os.path.join(here, "hrrr.wrfsfcf.subset.json") +# with open(fpath, "rb") as fobj: +# scanned_msgs = ujson.load(fobj) +# merged = grib_tree(scanned_msgs) +# dt = datatree.open_datatree( +# fsspec.filesystem("reference", fo=merged).get_mapper(""), +# engine="zarr", +# consolidated=False, +# ) +# # Assert a few things... but if it loads we are mostly done. +# np.testing.assert_array_equal( +# dt.u.instant.heightAboveGround.step.values[:], +# np.array([0, 3600 * 10**9], dtype="timedelta64[ns]"), +# ) +# assert dt.u.attrs == dict(name="U component of wind") def test_parse_grib_idx_invalid_url(): @@ -342,17 +342,17 @@ def test_parse_grib_idx_content(idx_url, storage_options): assert idx_df.iloc[message_no]["length"] == output[message_no]["refs"][variable][2] -@pytest.fixture -def zarr_tree_and_datatree_instance(): - fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") - tree_store = tree_store = grib_tree(scan_grib(fn)) - dt_instance = datatree.open_datatree( - fsspec.filesystem("reference", fo=tree_store).get_mapper(""), - engine="zarr", - consolidated=False, - ) +# @pytest.fixture +# def zarr_tree_and_datatree_instance(): +# fn = os.path.join(here, "gfs.t00z.pgrb2.0p25.f006.test-limit-100") +# tree_store = tree_store = grib_tree(scan_grib(fn)) +# dt_instance = datatree.open_datatree( +# fsspec.filesystem("reference", fo=tree_store).get_mapper(""), +# engine="zarr", +# consolidated=False, +# ) - return tree_store, dt_instance, fn +# return tree_store, dt_instance, fn def test_extract_dataset_chunk_index(zarr_tree_and_datatree_instance): From 5019b154903199514a0484f71f625971879defe6 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Wed, 23 Oct 2024 11:36:59 -0400 Subject: [PATCH 21/22] Switch back --- kerchunk/grib2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kerchunk/grib2.py b/kerchunk/grib2.py index eb796e2e..e4e64bf3 100644 --- a/kerchunk/grib2.py +++ b/kerchunk/grib2.py @@ -97,8 +97,8 @@ def _store_array(store, z, data, var, inline_threshold, offset, size, attr): chunks=shape, dtype=data.dtype, fill_value=attr.get("missingValue", None), - filters=[], - compressor=GRIBCodec(var=var, dtype=str(data.dtype)), + filters=[GRIBCodec(var=var, dtype=str(data.dtype))], + compressor=None, ) store[f"{var}/" + ".".join(["0"] * len(shape))] = ["{{u}}", offset, size] d.attrs.update(attr) From d96cf469c3beca0ac28df23d2f96ec831d169069 Mon Sep 17 00:00:00 2001 From: Matthew Iannucci Date: Sat, 26 Oct 2024 16:42:03 -0400 Subject: [PATCH 22/22] Add first pass at grib zarr 3 codec --- kerchunk/codecs.py | 87 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 3 deletions(-) diff --git a/kerchunk/codecs.py b/kerchunk/codecs.py index 852076ea..4804423e 100644 --- a/kerchunk/codecs.py +++ b/kerchunk/codecs.py @@ -1,11 +1,22 @@ import ast +from dataclasses import dataclass import io +from typing import TYPE_CHECKING import numcodecs from numcodecs.abc import Codec import numpy as np import threading import zlib +from zarr.abc.codec import ArrayBytesCodec +from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer +from zarr.core.common import JSON, parse_enum, parse_named_configuration +from zarr.registry import register_codec + +if TYPE_CHECKING: + from typing import Self + + from 
zarr.core.array_spec import ArraySpec
 
 
 class FillStringsCodec(Codec):
@@ -115,6 +126,76 @@ def decode(self, buf, out=None):
 numcodecs.register_codec(GRIBCodec, "grib")
 
 
+@dataclass(frozen=True)
+class GRIBZarrCodec(ArrayBytesCodec):
+    eclock = threading.RLock()
+
+    var: str
+    dtype: np.dtype
+
+    def __init__(self, *, var: str, dtype: np.dtype) -> None:
+        object.__setattr__(self, "var", var)
+        object.__setattr__(self, "dtype", dtype)
+
+    @classmethod
+    def from_dict(cls, data: dict[str, JSON]) -> Self:
+        _, configuration_parsed = parse_named_configuration(
+            data, "grib", require_configuration=True
+        )
+        configuration_parsed = configuration_parsed or {}
+        return cls(**configuration_parsed)  # type: ignore[arg-type]
+
+    def to_dict(self) -> dict[str, JSON]:
+        # var and dtype are always set, so emit the full configuration
+        return {
+            "name": "grib",
+            "configuration": {"var": self.var, "dtype": self.dtype},
+        }
+
+    async def _decode_single(
+        self,
+        chunk_bytes: Buffer,
+        chunk_spec: ArraySpec,
+    ) -> NDBuffer:
+        assert isinstance(chunk_bytes, Buffer)
+        import eccodes
+
+        if self.var in ["latitude", "longitude"]:
+            var = self.var + "s"
+            dt = self.dtype or "float64"
+        else:
+            var = "values"
+            dt = self.dtype or "float32"
+
+        with self.eclock:
+            mid = eccodes.codes_new_from_message(chunk_bytes.to_bytes())
+            try:
+                data = eccodes.codes_get_array(mid, var)
+                missingValue = eccodes.codes_get_string(mid, "missingValue")
+                if var == "values" and missingValue:
+                    data[data == float(missingValue)] = np.nan
+                return data.astype(dt, copy=False)
+
+            finally:
+                eccodes.codes_release(mid)
+
+    async def _encode_single(
+        self,
+        chunk_array: NDBuffer,
+        chunk_spec: ArraySpec,
+    ) -> Buffer | None:
+        # This is a one way codec
+        raise NotImplementedError
+
+    def compute_encoded_size(
+        self, input_byte_length: int, _chunk_spec: ArraySpec
+    ) -> int:
+        raise NotImplementedError
+
+
+register_codec("grib", GRIBZarrCodec)
+
+
 class AsciiTableCodec(numcodecs.abc.Codec):
     """Decodes ASCII-TABLE extensions in FITS files"""
 
@@ -166,7 +249,6 @@ def decode(self, buf, out=None):
         arr2 = np.empty((self.nrow,), dtype=dt_out)
         heap = buf[arr.nbytes :]
         for name in dt_out.names:
-
             if dt_out[name] == "O":
                 dt = np.dtype(self.ftypes[self.types[name]])
                 counts = arr[name][:, 0]
@@ -244,8 +326,7 @@ def encode(self, buf):
 class ZlibCodec(Codec):
     codec_id = "zlib"
 
-    def __init__(self):
-        ...
+    def __init__(self): ...
 
     def decode(self, data, out=None):
         if out:
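
Taken together, the series converges on one usage pattern for opening kerchunk references under either zarr version. This is a minimal sketch assuming the final state of kerchunk.utils above; "example.nc" is a placeholder for any local HDF5/netCDF4 file:

    import xarray as xr

    from kerchunk.hdf import SingleHdf5ToZarr
    from kerchunk.utils import refs_as_store

    # Scan the file into a kerchunk reference dict (zarr v2 metadata + chunk offsets)
    refs = SingleHdf5ToZarr("example.nc", "example.nc").translate()

    # Wrap the references in a store: a RemoteStore over an async
    # ReferenceFileSystem on zarr v3, a plain fsspec mapper on zarr v2
    store = refs_as_store(refs, remote_protocol="file")

    # The references always describe zarr v2 metadata, so pin zarr_format=2
    ds = xr.open_dataset(
        store, engine="zarr", zarr_format=2, backend_kwargs=dict(consolidated=False)
    )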