zarr-python v3 compatibility #516

Draft · wants to merge 22 commits into base: main
Changes from 8 commits
8 changes: 4 additions & 4 deletions kerchunk/combine.py
@@ -203,7 +203,7 @@ def append(
ds = xr.open_dataset(
fs.get_mapper(), engine="zarr", backend_kwargs={"consolidated": False}
)
z = zarr.open(fs.get_mapper())
z = zarr.open(fs.get_mapper(), zarr_format=2)
mzz = MultiZarrToZarr(
path,
out=fs.references, # dict or parquet/lazy
@@ -360,7 +360,7 @@ def first_pass(self):
fs._dircache_from_items()

logger.debug("First pass: %s", i)
z = zarr.open_group(fs.get_mapper(""))
z = zarr.open_group(fs.get_mapper(""), zarr_format=2)
for var in self.concat_dims:
value = self._get_value(i, z, var, fn=self._paths[i])
if isinstance(value, np.ndarray):
@@ -387,7 +387,7 @@ def store_coords(self):
"""
kv = {}
store = zarr.storage.KVStore(kv)
group = zarr.open(store)
group = zarr.open(store, zarr_format=2)
m = self.fss[0].get_mapper("")
z = zarr.open(m)
for k, v in self.coos.items():
@@ -461,7 +461,7 @@ def second_pass(self):
for i, fs in enumerate(self.fss):
to_download = {}
m = fs.get_mapper("")
z = zarr.open(m)
z = zarr.open(m, zarr_format=2)

if no_deps is None:
# done first time only
2 changes: 1 addition & 1 deletion kerchunk/fits.py
@@ -72,7 +72,7 @@ def process_file(

storage_options = storage_options or {}
out = out or {}
g = zarr.open(out)
g = zarr.open(out, zarr_format=2)

with fsspec.open(url, mode="rb", **storage_options) as f:
infile = fits.open(f, do_not_scale_image_data=True)
4 changes: 2 additions & 2 deletions kerchunk/grib2.py
@@ -191,7 +191,7 @@ def scan_grib(
if good is False:
continue

z = zarr.open_group(store)
z = zarr.open_group(store, zarr_format=2)
global_attrs = {
f"GRIB_{k}": m[k]
for k in cfgrib.dataset.GLOBAL_ATTRIBUTES_KEYS
@@ -398,7 +398,7 @@ def grib_tree(

# TODO allow passing a LazyReferenceMapper as output?
zarr_store = {}
zroot = zarr.open_group(store=zarr_store)
zroot = zarr.open_group(store=zarr_store, zarr_format=2)

aggregations: Dict[str, List] = defaultdict(list)
aggregation_dims: Dict[str, Set] = defaultdict(set)
104 changes: 84 additions & 20 deletions kerchunk/hdf.py
@@ -1,7 +1,8 @@
import base64
import io
import logging
from typing import Union, BinaryIO
from typing import Union, BinaryIO, Any, cast
from packaging.version import Version

import fsspec.core
from fsspec.implementations.reference import LazyReferenceMapper
@@ -21,11 +22,11 @@
"for more details."
)

try:
from zarr.meta import encode_fill_value
except ModuleNotFoundError:
# https://github.com/zarr-developers/zarr-python/issues/2021
from zarr.v2.meta import encode_fill_value
# try:
# from zarr.meta import encode_fill_value
# except ModuleNotFoundError:
# # https://github.com/zarr-developers/zarr-python/issues/2021
# from zarr.v2.meta import encode_fill_value

lggr = logging.getLogger("h5-to-zarr")
_HIDDEN_ATTRS = { # from h5netcdf.attrs
@@ -111,9 +112,14 @@ def __init__(
if vlen_encode not in ["embed", "null", "leave", "encode"]:
raise NotImplementedError
self.vlen = vlen_encode
self.store = out or {}
self._zroot = zarr.group(store=self.store, overwrite=True)

self.store_dict = out or {}
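# zarr-python 2.x can wrap a plain dict in a KVStore, but 3.x removed
# KVStore, so there we back the group with a MemoryStore over the same
# dict and pin the output to zarr format 2.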
if Version(zarr.__version__) < Version("3.0.0.a0"):
self.store = zarr.storage.KVStore(self.store_dict)
self._zroot = zarr.group(store=self.store, overwrite=True)
else:
self.store = zarr.storage.MemoryStore(mode="a", store_dict=self.store_dict)
self._zroot = zarr.group(store=self.store, zarr_format=2, overwrite=True)

self._uri = url
self.error = error
lggr.debug(f"HDF5 file URI: {self._uri}")
@@ -140,7 +146,6 @@ def translate(self, preserve_linked_dsets=False):
"""
lggr.debug("Translation begins")
self._transfer_attrs(self._h5f, self._zroot)

self._h5f.visititems(self._translator)

if preserve_linked_dsets:
@@ -157,7 +162,17 @@
self.store.flush()
return self.store
else:
store = _encode_for_JSON(self.store)
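# zarr v3's MemoryStore yields keys with a leading "/" (e.g. "/depth/.zarray"
# instead of "depth/.zarray") and stores values as Buffer objects, which
# cannot be JSON-encoded; strip the prefix and convert Buffers to bytes.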
keys_to_remove = []
new_keys = {}
for k, v in self.store_dict.items():
if isinstance(v, zarr.core.buffer.cpu.Buffer):
key = str.removeprefix(k, "/")
new_keys[key] = v.to_bytes()
keys_to_remove.append(k)
for k in keys_to_remove:
del self.store_dict[k]
Member:

This is the hacky bit and could use some explanation. Even when requesting "v2", zarr makes Buffer objects, and the keys are also wrong?

Contributor Author:

Yeah, so there are two issues here:

  1. The keys we get from HDF are, for example, /depth/.zarray, which then need to be depth/.zarray.
  2. We can't JSON-serialize Buffer objects, which is how the internal MemoryStore in v3 stores its data, so we need to convert the buffers to bytes to be serialized.

Member:

OK - would appreciate comments on the code saying this.

self.store_dict.update(new_keys)
store = _encode_for_JSON(self.store_dict)
return {"version": 1, "refs": store}

def _unref(self, ref):
@@ -465,26 +480,31 @@ def _translator(
if h5py.h5ds.is_scale(h5obj.id) and not cinfo:
return
if h5obj.attrs.get("_FillValue") is not None:
fill = h5obj.attrs.get("_FillValue")
fill = encode_fill_value(
h5obj.attrs.get("_FillValue"), dt or h5obj.dtype
)

# Create a Zarr array equivalent to this HDF5 dataset...
za = self._zroot.require_dataset(
h5obj.name,
adims = self._get_array_dims(h5obj)

# Create a Zarr array equivalent to this HDF5 dataset..
za = self._zroot.require_array(
name=h5obj.name,
shape=h5obj.shape,
dtype=dt or h5obj.dtype,
chunks=h5obj.chunks or False,
fill_value=fill,
compression=None,
compressor=None,
Member:

So here, you could reintroduce the compressor:

compressor = filters[-1]
filters = filters[:-1]

but obviously it depends on whether there are indeed any filters at all.

It would still need back compat, since filters-only datasets definitely exist.
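A guarded version of that sketch, assuming filters is the numcodecs filter list assembled for the dataset (a hypothetical rearrangement, not code from this PR):

if filters:
    # pop the last numcodecs filter and present it as the v2 compressor
    compressor = filters[-1]
    filters = filters[:-1]
else:
    compressor = None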

Contributor Author:

Yeah, the big issue is that v3 cares about what type of operation it is, and v2 doesn't, so moving them around doesn't necessarily fix that bug.

Member:

So there needs to be a change upstream?

filters=filters,
overwrite=True,
attributes={
"_ARRAY_DIMENSIONS": adims,
},
**kwargs,
)
lggr.debug(f"Created Zarr array: {za}")
self._transfer_attrs(h5obj, za)
adims = self._get_array_dims(h5obj)
za.attrs["_ARRAY_DIMENSIONS"] = adims

# za.attrs["_ARRAY_DIMENSIONS"] = adims
lggr.debug(f"_ARRAY_DIMENSIONS = {adims}")

if "data" in kwargs:
@@ -496,6 +516,8 @@
if h5obj.fletcher32:
logging.info("Discarding fletcher32 checksum")
v["size"] -= 4
key = str.removeprefix(h5obj.name, "/") + "/" + ".".join(map(str, k))
Member:

This is the same as what _chunk_key did? Maybe make it a function with a comment saying it's a copy/reimplementation.

By the way, is h5obj.name not actually a string, so you could have done h5obj.name.removeprefix()?
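A sketch of the suggested helper (hypothetical name; mirrors the "name/i.j" key layout that zarr v2's Array._chunk_key produced):

def _chunk_key_v2(name, idx):
    # name is an HDF5 object path like "/depth"; idx is a chunk index tuple like (0, 1)
    return name.removeprefix("/") + "/" + ".".join(map(str, idx))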


if (
self.inline
and isinstance(v, dict)
@@ -508,9 +530,10 @@
data.decode("ascii")
except UnicodeDecodeError:
data = b"base64:" + base64.b64encode(data)
self.store[za._chunk_key(k)] = data

self.store_dict[key] = data
else:
self.store[za._chunk_key(k)] = [
self.store_dict[key] = [
self._uri,
v["offset"],
v["size"],
@@ -681,3 +704,44 @@ def _is_netcdf_variable(dataset: h5py.Dataset):

def has_visititems_links():
return hasattr(h5py.Group, "visititems_links")


def encode_fill_value(v: Any, dtype: np.dtype, object_codec: Any = None) -> Any:
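"""Encode a fill value for zarr v2 JSON metadata.

Stand-in for zarr v2's zarr.meta.encode_fill_value, which zarr-python
v3 no longer provides (see the commented-out import at the top of this
file).
"""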
# early out
if v is None:
return v
if dtype.kind == "V" and dtype.hasobject:
if object_codec is None:
raise ValueError("missing object_codec for object array")
v = object_codec.encode(v)
v = str(base64.standard_b64encode(v), "ascii")
return v
if dtype.kind == "f":
if np.isnan(v):
return "NaN"
elif np.isposinf(v):
return "Infinity"
elif np.isneginf(v):
return "-Infinity"
else:
return float(v)
elif dtype.kind in "ui":
return int(v)
elif dtype.kind == "b":
return bool(v)
elif dtype.kind in "c":
c = cast(np.complex128, np.dtype(complex).type())
v = (
encode_fill_value(v.real, c.real.dtype, object_codec),
encode_fill_value(v.imag, c.imag.dtype, object_codec),
)
return v
elif dtype.kind in "SV":
v = str(base64.standard_b64encode(v), "ascii")
return v
elif dtype.kind == "U":
return v
elif dtype.kind in "mM":
return int(v.view("i8"))
else:
return v
2 changes: 1 addition & 1 deletion kerchunk/hdf4.py
@@ -144,7 +144,7 @@ def translate(self, filename=None, storage_options=None):
remote_protocol=prot,
remote_options=self.st,
)
g = zarr.open_group("reference://", storage_options=dict(fs=fs))
g = zarr.open_group("reference://", storage_options=dict(fs=fs), zarr_format=2)
refs = {}
for k, v in output.items():
if isinstance(v, dict):
2 changes: 1 addition & 1 deletion kerchunk/netCDF3.py
@@ -167,7 +167,7 @@ def translate(self):
import zarr

out = self.out
z = zarr.open(out, mode="w")
z = zarr.open(out, mode="w", zarr_format=2)
for dim, var in self.variables.items():
if dim in self.chunks:
shape = self.chunks[dim][-1]
6 changes: 3 additions & 3 deletions kerchunk/tests/test_combine.py
@@ -133,14 +133,14 @@

# simple time arrays - xarray can't make these!
m = fs.get_mapper("time1.zarr")
z = zarr.open(m, mode="w")
z = zarr.open(m, mode="w", zarr_format=2)
ar = z.create_dataset("time", data=np.array([1], dtype="M8[s]"))
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_dataset("data", data=arr)
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time", "x", "y"]})

m = fs.get_mapper("time2.zarr")
z = zarr.open(m, mode="w")
z = zarr.open(m, mode="w", zarr_format=2)
ar = z.create_dataset("time", data=np.array([2], dtype="M8[s]"))
ar.attrs.update({"_ARRAY_DIMENSIONS": ["time"]})
ar = z.create_dataset("data", data=arr)
@@ -272,7 +272,7 @@ def test_get_coos(refs, selector, expected):
mzz.first_pass()
assert mzz.coos["time"].tolist() == expected
mzz.store_coords()
g = zarr.open(mzz.out)
g = zarr.open(mzz.out, zarr_format=2)
assert g["time"][:].tolist() == expected
assert dict(g.attrs)

20 changes: 10 additions & 10 deletions kerchunk/tests/test_combine_concat.py
@@ -51,7 +51,7 @@ def test_success(tmpdir, arrays, chunks, axis, m):
refs = []
for i, x in enumerate(arrays):
fn = f"{tmpdir}/out{i}.zarr"
g = zarr.open(fn)
g = zarr.open(fn, zarr_format=2)
g.create_dataset("x", data=x, chunks=chunks)
fns.append(fn)
ref = kerchunk.zarr.single_zarr(fn, inline=0)
@@ -62,7 +62,7 @@
)

mapper = fsspec.get_mapper("reference://", fo=out)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()

try:
@@ -76,7 +76,7 @@
remote_protocol="file",
skip_instance_cache=True,
)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()

kerchunk.df.refs_to_dataframe(out, "memory://out.parq", record_size=1)
@@ -86,7 +86,7 @@
remote_protocol="file",
skip_instance_cache=True,
)
g = zarr.open(mapper)
g = zarr.open(mapper, zarr_format=2)
assert (g.x[:] == np.concatenate(arrays, axis=axis)).all()


@@ -95,9 +95,9 @@ def test_fail_chunks(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(10)
x2 = np.arange(10, 20)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(2,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(3,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
@@ -112,9 +112,9 @@ def test_fail_shape(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(12).reshape(6, 2)
x2 = np.arange(12, 24)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(2,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(2,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)
@@ -129,9 +129,9 @@ def test_fail_irregular_chunk_boundaries(tmpdir):
fn2 = f"{tmpdir}/out2.zarr"
x1 = np.arange(10)
x2 = np.arange(10, 24)
g = zarr.open(fn1)
g = zarr.open(fn1, zarr_format=2)
g.create_dataset("x", data=x1, chunks=(4,))
g = zarr.open(fn2)
g = zarr.open(fn2, zarr_format=2)
g.create_dataset("x", data=x2, chunks=(4,))

ref1 = kerchunk.zarr.single_zarr(fn1, inline=0)