Source code for kerchunk.hdf4

import fsspec
import numpy as np
import ujson


decoders = {}


def reg(name):
    def f(func):
        decoders[name] = func
        return func

    return f
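

# Registry of per-tag decoders: each @reg("TAG") function further down parses
# the payload of one HDF4 tag type; HDF4ToZarr.decode() dispatches through
# this dict and returns the raw info unchanged for tags with no decoder.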


class HDF4ToZarr:
    """Experimental: interface to HDF4 archival files"""

    def __init__(
        self,
        path,
        storage_options=None,
        inline_threshold=100,
        out=None,
    ):
        self.path = path
        self.st = storage_options
        self.thresh = inline_threshold
        self.out = out or {}

    def read_int(self, n):
        return int.from_bytes(self.f.read(n), "big")

    def read_ddh(self):
        return {"ndd": self.read_int(2), "next": self.read_int(4)}

    def read_dd(self):
        # one 12-byte data descriptor: tag (2 bytes), ref (2), offset (4),
        # length (4); the 0x4000 bit of the tag marks an extended element
        loc = self.f.tell()
        i = int.from_bytes(self.f.read(2), "big")
        if i & 0x4000:
            extended = True
            i = i - 0x4000
        else:
            extended = False
        tag = tags.get(i, i)
        no_data = tag not in {"NULL"}
        ref = (tag, int.from_bytes(self.f.read(2), "big"))
        info = {
            "offset": int.from_bytes(self.f.read(4), "big") * no_data,
            "length": int.from_bytes(self.f.read(4), "big") * no_data,
            "extended": extended,
            "loc": loc,
        }
        return ref, info

    def decode(self, tag, info):
        self.f.seek(info["offset"])
        ident = lambda _, __: info
        return decoders.get(tag, ident)(self, info)

    def translate(self, filename=None, storage_options=None):
        """Scan and return references

        Parameters
        ----------
        filename: if given, write to this as JSON
        storage_options: to interpret filename

        Returns
        -------
        references
        """
        import zarr
        from kerchunk.codecs import ZlibCodec

        fo = fsspec.open(self.path, **(self.st or {}))
        self.f = fo.open()

        # magic header
        assert self.f.read(4) == b"\x0e\x03\x13\x01"

        # all the data descriptors in a linked list
        self.tags = {}
        while True:
            ddh = self.read_ddh()

            for _ in range(ddh["ndd"]):
                ident, info = self.read_dd()
                self.tags[ident] = info
            if ddh["next"] == 0:
                # "finished" sentinel
                break
            # or continue
            self.f.seek(ddh["next"])

        # basic decode
        for tag, ref in self.tags:
            self._dec(tag, ref)

        # global attributes
        attrs = {}
        for (tag, ref), info in self.tags.items():
            if tag == "VH" and info["names"][0].upper() == "VALUES":
                # dtype = dtypes[info["types"][0]]
                inf2 = self.tags[("VS", ref)]
                self.f.seek(inf2["offset"])
                data = self.f.read(inf2["length"])
                # NASA conventions
                if info["name"].startswith(("CoreMetadata.", "ArchiveMetadata.")):
                    obj = None
                    for line in data.decode().split("\n"):
                        if "OBJECT" in line:
                            obj = line.split()[-1]
                        if "VALUE" in line:
                            attrs[obj] = line.split()[-1].lstrip('"').rstrip('"')

        # there should be only one root, and it's probably the last VG,
        # so maybe this loop isn't needed
        roots = set()
        children = set()
        child = {}
        for (tag, ref), info in self.tags.items():
            if tag == "VG":
                here = child.setdefault((tag, ref), set())
                for t, r in zip(info["tag"], info["refs"]):
                    if t == "VG":
                        children.add((t, r))
                        roots.discard((t, r))
                        here.add((t, r))
                if (tag, ref) not in children:
                    roots.add((tag, ref))

        # hierarchical output
        output = self._descend_vg(*list(roots)[0])
        prot = fo.fs.protocol
        prot = prot[0] if isinstance(prot, tuple) else prot
        fs = fsspec.filesystem(
            "reference",
            fo=self.out,
            remote_protocol=prot,
            remote_options=self.st,
        )
        g = zarr.open_group("reference://", storage_options=dict(fs=fs))
        refs = {}
        for k, v in output.items():
            if isinstance(v, dict):
                compression = ZlibCodec() if "refs" in v else None
                arr = g.create_dataset(
                    name=k,
                    shape=v["dims"],
                    dtype=v["dtype"],
                    chunks=v.get("chunks", v["dims"]),
                    compressor=compression,
                    overwrite=True,
                )
                arr.attrs.update(
                    dict(
                        _ARRAY_DIMENSIONS=[f"{k}_x", f"{k}_y"][: len(v["dims"])]
                        if "refs" in v
                        else ["0"],
                        **{
                            i: j
                            for i, j in v.items()
                            if i not in {"chunk", "dims", "dtype", "refs"}
                        },
                    )
                )
                for r in v.get("refs", []):
                    refs[f"{k}/{r[0]}"] = [self.path, r[1], r[2]]
            else:
                attrs[k] = v
        fs.references.update(refs)
        g.attrs.update(attrs)

        if filename is None:
            return fs.references
        with fsspec.open(filename, "wt", **(storage_options or {})) as f:
            ujson.dump(dict(fs.references), f)

    def _descend_vg(self, tag, ref):
        info = self.tags[(tag, ref)]
        out = {}
        for t, r in zip(info["tag"], info["refs"]):
            inf2 = self.tags[(t, r)]
            if t == "VG":
                tmp = self._descend_vg(t, r)
                if list(tmp)[0] == inf2["name"]:
                    tmp = tmp[inf2["name"]]
                out[inf2["name"]] = tmp
            elif t == "VH":
                if len(inf2["names"]) == 1 and inf2["names"][0].lower() == "values":
                    dtype = dtypes[inf2["types"][0]]
                    name = inf2["name"]
                    inf2 = self.tags[("VS", r)]
                    self.f.seek(inf2["offset"])
                    data = self.f.read(inf2["length"])
                    if dtype == "str":
                        out[name] = data.decode().lstrip('"').rstrip('"')  # decode() ?
                    else:
                        out[name] = np.frombuffer(data, dtype)[0]
            elif t == "NT":
                out["dtype"] = inf2["typ"]
            elif t == "SD":
                out["refs"] = inf2["data"][:-1]
                out["chunks"] = [_["chunk_length"] for _ in inf2["data"][-1]]
            elif t == "SDD":
                out["dims"] = inf2["dims"]
            else:
                # NDGs contain the same info as NT, SD and SDD
                pass
        return out

    def _dec(self, tag, ref):
        info = self.tags[(tag, ref)]
        if not set(info) - {"length", "offset", "extended", "loc"}:
            self.f.seek(info["offset"])
            if info["extended"]:
                info["data"] = self._dec_extended()
            else:
                info.update(self.decode(tag, info))
        return info

    def _dec_extended(self):
        ext_type = spec[self.read_int(2)]
        if ext_type == "CHUNKED":
            return self._dec_chunked()
        elif ext_type == "LINKED":
            return self._dec_linked_header()
        elif ext_type == "COMP":
            return self._dec_comp()

    def _dec_linked_header(self):
        # get the bytes of a linked set - these will always be inlined
        self.read_int(4)  # length
        self.read_int(4)  # blk_len
        self.read_int(4)  # num_blk
        next_ref = self.read_int(2)
        out = []
        while next_ref:
            next_ref, data = self._dec_linked_block(self.tags[("LINKED", next_ref)])
            out.extend([d for d in data if d])
        bits = []
        for ref in out:
            info = self.tags[("LINKED", ref)]
            self.f.seek(info["offset"])
            bits.append(self.f.read(info["length"]))
        return b"".join(bits)

    def _dec_linked_block(self, block):
        self.f.seek(block["offset"])
        next_ref = self.read_int(2)
        refs = [self.read_int(2) for _ in range((block["length"] // 2) - 1)]
        return next_ref, refs

    def _dec_chunked(self):
        # we want to turn the chunks table into references
        # tag_head_len = self.read_int(4)
        # version = self.f.read(1)[0]
        # flag = self.read_int(4)
        # elem_tot_len = self.read_int(4)
        # chunk_size = self.read_int(4)
        # nt_size = self.read_int(4)
        self.f.seek(21, 1)
        chk_tbl_tag = tags[self.read_int(2)]  # should be VH
        chk_tbl_ref = self.read_int(2)
        self.read_int(2)  # sp_tab = tags[self.read_int(2)]
        self.read_int(2)  # sp_ref
        ndims = self.read_int(4)

        dims = [  # we don't use these, could skip
            {
                "flag": self.read_int(4),
                "dim_length": self.read_int(4),
                "chunk_length": self.read_int(4),
            }
            for _ in range(ndims)
        ]
        self.f.read(  # fill_value
            self.read_int(4)
        )  # to be interpreted as a number later; but chunk table probably has no fill
        # self.f.seek(12 * ndims + 4, 1)  # if skipping

        header = self._dec(chk_tbl_tag, chk_tbl_ref)
        data = self._dec("VS", chk_tbl_ref)["data"]  # corresponding table

        # header gives the field pattern for the rows of data, one per chunk;
        # maybe faster to use struct and iter than numpy, since we iterate anyway
        dt = [(f"ind{i}", ">u4") for i in range(ndims)] + [
            ("tag", ">u2"),
            ("ref", ">u2"),
        ]
        rows = np.frombuffer(data, dtype=dt, count=header["nvert"])
        # rows["tag"] should always be 61 -> CHUNK
        refs = []
        for *ind, tag, ref in rows:
            # maybe ind needs reversing since everything is FORTRAN
            chunk_tag = self.tags[("CHUNK", ref)]
            if chunk_tag["extended"]:
                self.f.seek(chunk_tag["offset"])
                # these are always COMP?
                ctype, offset, length = self._dec_extended()
                refs.append([".".join(str(_) for _ in ind), offset, length, ctype])
            else:
                refs.append(
                    [
                        ".".join(str(_) for _ in ind),
                        chunk_tag["offset"],
                        chunk_tag["length"],
                    ]
                )
        refs.append(dims)
        return refs

    def _dec_comp(self):
        # version = self.read_int(2)  # always 0
        # len_uncomp = self.read_int(4)
        self.f.seek(6, 1)

        data_ref = self.read_int(2)
        # model = self.read_int(2)  # always 0
        ctype = "DEFLATE"  # comp[self.read_int(2)]
        tag = self.tags[("COMPRESSED", data_ref)]
        return ctype, tag["offset"], tag["length"]
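

# Per-tag decoders. Each returns a dict of fields parsed from the tag's
# payload; HDF4ToZarr._dec merges the result into that tag's info, and
# _pl() strips the bookkeeping locals ("self", "info", "f") from the result.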
@reg("NDG")
def _dec_ndg(self, info):
    # links together these things as a Data Group
    return {
        "tags": [
            (tags[self.read_int(2)], self.read_int(2))
            for _ in range(0, info["length"], 4)
        ]
    }


@reg("SDD")
def _dec_sdd(self, info):
    rank = self.read_int(2)
    dims = [self.read_int(4) for _ in range(rank)]
    data_tag = (tags[self.read_int(2)], self.read_int(2))
    scale_tags = [(tags[self.read_int(2)], self.read_int(2)) for _ in range(rank)]
    return _pl(locals())


@reg("VERSION")
def _dec_version(self, info):
    return {
        "major": self.read_int(4),
        "minor": self.read_int(4),
        "release": self.read_int(4),
        "string": _null_str(self.f.read(info["length"] - 10).decode()),
    }


@reg("VH")
def _dec_vh(self, info):
    # virtual group ("table") header
    interface = self.read_int(2)
    nvert = self.read_int(4)
    ivsize = self.read_int(2)
    nfields = self.read_int(2)
    types = [self.read_int(2) for _ in range(nfields)]
    isize = [self.read_int(2) for _ in range(nfields)]
    offsets = [self.read_int(2) for _ in range(nfields)]
    order = [self.read_int(2) for _ in range(nfields)]
    names = [self.f.read(self.read_int(2)).decode() for _ in range(nfields)]
    namelen = self.read_int(2)
    name = self.f.read(namelen).decode()
    classlen = self.read_int(2)
    cls = self.f.read(classlen).decode()
    ref = (self.read_int(2), self.read_int(2))
    return _pl(locals())


@reg("VG")
def _dec_vg(self, info):
    nelt = self.read_int(2)
    tag = [tags[self.read_int(2)] for _ in range(nelt)]
    refs = [self.read_int(2) for _ in range(nelt)]
    name = self.f.read(self.read_int(2)).decode()
    cls = self.f.read(self.read_int(2)).decode()
    return _pl(locals())


@reg("NT")
def _dec_nt(self, info):
    version, typ, width, cls = list(self.f.read(4))
    typ = dtypes[typ]
    return _pl(locals())


def _null_str(s):
    return s.split("\00", 1)[0]


def _pl(l):
    return {k: v for k, v in l.items() if k not in {"info", "f", "self"}}


# hdf/src/htags.h
tags = {
    1: "NULL",
    20: "LINKED",
    30: "VERSION",
    40: "COMPRESSED",
    50: "VLINKED",
    51: "VLINKED_DATA",
    60: "CHUNKED",
    61: "CHUNK",
    100: "FID",
    101: "FD",
    102: "TID",
    103: "TD",
    104: "DIL",
    105: "DIA",
    106: "NT",
    107: "MT",
    108: "FREE",
    200: "ID8",
    201: "IP8",
    202: "RI8",
    203: "CI8",
    204: "II8",
    300: "ID",
    301: "LUT",
    302: "RI",
    303: "CI",
    304: "NRI",
    306: "RIG",
    307: "LD",
    308: "MD",
    309: "MA",
    310: "CCN",
    311: "CFM",
    312: "AR",
    400: "DRAW",
    401: "RUN",
    500: "XYP",
    501: "MTO",
    602: "T14",
    603: "T105",
    700: "SDG",
    701: "SDD",
    702: "SD",
    703: "SDS",
    704: "SDL",
    705: "SDU",
    706: "SDF",
    707: "SDM",
    708: "SDC",
    709: "SDT",
    710: "SDLNK",
    720: "NDG",
    721: "RESERVED",
    # "Objects of tag 721 are never actually written to the file. The tag is
    # needed to make things easier mixing DFSD and SD style objects in the same file"
    731: "CAL",
    732: "FV",
    799: "BREQ",
    781: "SDRAG",
    780: "EREQ",
    1965: "VG",
    1962: "VH",
    1963: "VS",
    11: "RLE",
    12: "IMCOMP",
    13: "JPEG",
    14: "GREYJPEG",
    15: "JPEG5",
    16: "GREYJPEG5",
}
spec = {
    1: "LINKED",
    2: "EXT",
    3: "COMP",
    4: "VLINKED",
    5: "CHUNKED",
    6: "BUFFERED",
    7: "COMPRAS",
}

# hdf4/hdf/src/hntdefs.h
dtypes = {
    5: "f4",
    6: "f8",
    20: "i1",
    21: "u1",
    4: "str",  # special case, size given in header
    22: ">i2",
    23: ">u2",
    24: ">i4",
    25: ">u4",
    26: ">i8",
    27: ">u8",
}

# hdf4/hdf/src/hcomp.h
comp = {
    0: "NONE",
    1: "RLE",
    2: "NBIT",
    3: "SKPHUFF",
    4: "DEFLATE",  # called deflate, but code says "gzip" and doc says "GNU zip"; actually zlib?
    # see codecs.ZlibCodec
    5: "SZIP",
    7: "JPEG",
}
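

# Example usage (a minimal sketch; "local.hdf4" and "refs.json" are
# hypothetical paths, not part of this module):
if __name__ == "__main__":
    h = HDF4ToZarr("local.hdf4")
    refs = h.translate()  # scan the file and return the in-memory reference set
    print(list(refs)[:10])
    # or write the references straight to JSON:
    # HDF4ToZarr("local.hdf4").translate("refs.json")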