diff --git a/_modules/kerchunk/grib2.html b/_modules/kerchunk/grib2.html index 520629a..a63133c 100644 --- a/_modules/kerchunk/grib2.html +++ b/_modules/kerchunk/grib2.html @@ -88,14 +88,8 @@
import io
import logging
from collections import defaultdict
-import warnings
-from typing import Iterable, List, Dict, Set, TYPE_CHECKING, Optional, Callable
+from typing import Iterable, List, Dict, Set
import ujson
-import itertools
-
-if TYPE_CHECKING:
- import pandas as pd
- import datatree
import fsspec
import zarr
@@ -105,6 +99,7 @@ Source code for kerchunk.grib2
from kerchunk.utils import class_factory, _encode_for_JSON
from kerchunk.codecs import GRIBCodec
from kerchunk.combine import MultiZarrToZarr, drop
+from kerchunk._grib_idx import parse_grib_idx, build_idx_grib_mapping, map_from_index
try:
@@ -118,10 +113,6 @@ Source code for kerchunk.grib2
)
-class DynamicZarrStoreError(ValueError):
- pass
-
-
# cfgrib copies over certain GRIB attributes
# but renames them to CF-compliant values
ATTRS_TO_COPY_OVER = {
@@ -682,394 +673,15 @@ Source code for kerchunk.grib2
return group
-def parse_grib_idx(
- basename: str,
- suffix: str = "idx",
- storage_options: Optional[Dict] = None,
- validate: bool = False,
-) -> "pd.DataFrame":
- """
- Parses per-message metadata from a grib2.idx file (text-type) to a dataframe of attributes
-
- The function uses the idx file, extracts the metadata known as attrs (variables with
- level and forecast time) from each idx entry and converts it into pandas
- DataFrame. The dataframe is later to build the one-to-one mapping to the grib file metadata.
-
- Parameters
- ----------
- basename : str
- The base name is the full path to the grib file.
- suffix : str
- The suffix is the ending for the idx file.
- storage_options: dict
- For accessing the data, passed to filesystem
- validate : bool
- The validation if the metadata table has duplicate attrs.
-
- Returns
- -------
- pandas.DataFrame : The data frame containing the results.
- """
- import pandas as pd
-
- fs, _ = fsspec.core.url_to_fs(basename, **(storage_options or {}))
-
- fname = f"{basename}.{suffix}"
-
- baseinfo = fs.info(basename)
-
- with fs.open(fname) as f:
- result = pd.read_csv(f, header=None, names=["raw_data"])
- result[["idx", "offset", "date", "attrs"]] = result["raw_data"].str.split(
- ":", expand=True, n=3
- )
- result["offset"] = result["offset"].astype(int)
- result["idx"] = result["idx"].astype(int)
-
- # dropping the original single "raw_data" column after formatting
- result.drop(columns=["raw_data"], inplace=True)
-
- result = result.assign(
- length=(
- result.offset.shift(periods=-1, fill_value=baseinfo["size"]) - result.offset
- ),
- idx_uri=fname,
- grib_uri=basename,
- )
-
- if validate and not result["attrs"].is_unique:
- raise ValueError(f"Attribute mapping for grib file {basename} is not unique")
-
- return result.set_index("idx")
-
-
-def build_path(path: Iterable[str | None], suffix: Optional[str] = None) -> str:
- """
- Returns the path to access the values in a zarr store without a leading "/"
-
- Parameters
- ----------
- path : Iterable[str | None]
- The path is the list of values to the element in zarr store
- suffix : str
- Last element if any
-
- Returns
- -------
- str : returns the path as a string
- """
- return "/".join([val for val in [*path, suffix] if val is not None]).lstrip("/")
-
-
-def extract_dataset_chunk_index(
- dset: "datatree.DataTree",
- ref_store: Dict,
- grib: bool = False,
-) -> list[dict]:
- """
- Process and extract a kerchunk index for an xarray dataset or datatree node.
-
- The data_vars from the dataset will be indexed.
- The coordinate vars for each dataset will be used for indexing.
- Datatrees generated by grib_tree have some nice properties which allow a denser index.
-
- Parameters
- ----------
- dset : datatree.DataTree
- The datatree node from the datatree instance
- ref_store : Dict
- The zarr store dictionary backed by the gribtree
- grib : bool
- boolean for treating coordinates as grib levels
-
- Returns
- -------
- list[dict] : returns the extracted grib metadata in the form of key-value pairs inside a list
- """
- import datatree
-
- result: list[dict] = []
- attributes = dset.attrs.copy()
-
- dpath = None
- if isinstance(dset, datatree.DataTree):
- dpath = dset.path
- walk_group = dset.parent
- while walk_group:
- attributes.update(walk_group.attrs)
- walk_group = walk_group.parent
-
- for dname, dvar in dset.data_vars.items():
- # Get the chunk size - `chunks` property only works for xarray native
- zarray = ujson.loads(ref_store[build_path([dpath, dname], suffix=".zarray")])
- dchunk = zarray["chunks"]
- dshape = dvar.shape
-
- index_dims = {}
- for ddim_nane, ddim_size, dchunk_size in zip(dvar.dims, dshape, dchunk):
- if dchunk_size == 1:
- index_dims[ddim_nane] = ddim_size
- elif dchunk_size != ddim_size:
- # Must be able to get a single coordinate value for each chunk to index it.
- raise ValueError(
- "Can not extract chunk index for dimension %s with non singleton chunk dimensions"
- % ddim_nane
- )
- # Drop the dim where each chunk covers the whole dimension - no indexing needed!
-
- for idx in itertools.product(*[range(v) for v in index_dims.values()]):
- # Build an iterator over each of the single dimension chunks
- dim_idx = {key: val for key, val in zip(index_dims.keys(), idx)}
-
- coord_vals = {}
- for cname, cvar in dvar.coords.items():
- if grib:
- # Grib data has only one level coordinate
- cname = (
- cname
- if cname
- in ("valid_time", "time", "step", "latitude", "longitude")
- else "level"
- )
-
- if all([dim_name in dim_idx for dim_name in cvar.dims]):
- coord_index = tuple([dim_idx[dim_name] for dim_name in cvar.dims])
- try:
- coord_vals[cname] = cvar.to_numpy()[coord_index]
- except Exception:
- raise DynamicZarrStoreError(
- f"Error reading coords for {dpath}/{dname} coord {cname} with index {coord_index}"
- )
-
- whole_dim_cnt = len(dvar.dims) - len(dim_idx)
- chunk_idx = map(str, [*idx, *[0] * whole_dim_cnt])
- chunk_key = build_path([dpath, dname], suffix=".".join(chunk_idx))
- chunk_ref = ref_store.get(chunk_key)
-
- if chunk_ref is None:
- logger.warning("Chunk not found: %s", chunk_key)
- continue
-
- elif isinstance(chunk_ref, list) and len(chunk_ref) == 3:
- chunk_data = dict(
- uri=chunk_ref[0],
- offset=chunk_ref[1],
- length=chunk_ref[2],
- inline_value=None,
- )
- elif isinstance(chunk_ref, (bytes, str)):
- chunk_data = dict(inline_value=chunk_ref, offset=-1, length=-1)
- else:
- raise ValueError(f"Key {chunk_key} has bad value '{chunk_ref}'")
- result.append(dict(varname=dname, **attributes, **coord_vals, **chunk_data))
-
- return result
-
-
-def extract_datatree_chunk_index(
- dtree: "datatree.DataTree", kerchunk_store: dict, grib: bool = False
-) -> "pd.DataFrame":
- """
- Recursive method to iterate over the data tree and extract the data variable chunks with index metadata
-
- Parameters
- ----------
- dtree : datatree.DataTree
- The xarray datatree representation of the reference filesystem
- kerchunk_store : dict
- the grib_tree output for a single grib file
- grib : bool
- boolean for treating coordinates as grib levels
-
- Returns
- -------
- pandas.Dataframe : The dataframe constructed from the grib metadata
- """
- import pandas as pd
-
- result: list[dict] = []
-
- for node in dtree.subtree:
- if node.has_data:
- result += extract_dataset_chunk_index(
- node, kerchunk_store["refs"], grib=grib
- )
-
- return pd.DataFrame.from_records(result)
-
-
-def _map_grib_file_by_group(
- fname: str,
- mapper: Optional[Callable] = None,
- storage_options: Optional[Dict] = None,
-) -> "pd.DataFrame":
- """
- Helper method used to read the cfgrib metadata associated with each message (group) in the grib file
- This method does not add metadata
-
- Parameters
- ----------
- fname : str
- the file name to read with scan_grib
- mapper : Optional[Callable]
- the mapper if any to apply (used for hrrr subhf)
-
- Returns
- -------
- pandas.Dataframe : The intermediate dataframe constructed from the grib metadata
- """
- import pandas as pd
-
- mapper = (lambda x: x) if mapper is None else mapper
- references = scan_grib(fname, storage_options=storage_options)
-
- with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- return pd.concat(
- # grib idx is fortran indexed (from one not zero)
- list(
- filter(
- # filtering out the cfgrib metadata dataframe in case it is None
- lambda item: item is not None,
- [
- # extracting the metadata from a single message
- _extract_single_group(mapper(group), i, storage_options)
- for i, group in enumerate(references, start=1)
- ],
- )
- )
- ).set_index("idx")
-
-
-def _extract_single_group(grib_group: dict, idx: int, storage_options: Dict):
- import datatree
-
- grib_tree_store = grib_tree(
- [
- grib_group,
- ],
- storage_options,
- )
-
- if len(grib_tree_store["refs"]) <= 1:
- logger.info("Empty DT: %s", grib_tree_store)
- return None
-
- dt = datatree.open_datatree(
- fsspec.filesystem("reference", fo=grib_tree_store).get_mapper(""),
- engine="zarr",
- consolidated=False,
- )
-
- k_ind = extract_datatree_chunk_index(dt, grib_tree_store, grib=True)
- if k_ind.empty:
- logger.warning("Empty Kind: %s", grib_tree_store)
- return None
-
- assert (
- len(k_ind) == 1
- ), f"expected a single variable grib group but produced: {k_ind}"
- k_ind.loc[:, "idx"] = idx
- return k_ind
-
-
-def build_idx_grib_mapping(
- basename: str,
- storage_options: Optional[Dict] = None,
- suffix: str = "idx",
- mapper: Optional[Callable] = None,
- validate: bool = True,
-) -> "pd.DataFrame":
- """
- Mapping method combines the idx and grib metadata to make a mapping from
- one to the other for a particular model horizon file. This should be generally
- applicable to all forecasts for the given horizon.
-
- Parameters
- ----------
- basename : str
- the full path for the grib2 file
- storage_options: dict
- For accessing the data, passed to filesystem
- suffix : str
- The suffix is the ending for the idx file.
- mapper : Optional[Callable]
- the mapper if any to apply (used for hrrr subhf)
- validate : bool
- to assert the mapping is correct or fail before returning
-
- Returns
- -------
- pandas.Dataframe : The merged dataframe with the results of the two operations
- joined on the grib message (group) number
- """
- import pandas as pd
-
- grib_file_index = _map_grib_file_by_group(
- fname=basename,
- mapper=mapper,
- storage_options=storage_options,
- )
- idx_file_index = parse_grib_idx(
- basename=basename, suffix=suffix, storage_options=storage_options
- )
- result = idx_file_index.merge(
- # Left merge because the idx file should be authoritative - one record per grib message
- grib_file_index,
- on="idx",
- how="left",
- suffixes=("_idx", "_grib"),
- )
-
- if validate:
- # If any of these conditions fail - inspect the result manually on colab.
- all_match_offset = (
- (result.loc[:, "offset_idx"] == result.loc[:, "offset_grib"])
- | pd.isna(result.loc[:, "offset_grib"])
- | ~pd.isna(result.loc[:, "inline_value"])
- )
- all_match_length = (
- (result.loc[:, "length_idx"] == result.loc[:, "length_grib"])
- | pd.isna(result.loc[:, "length_grib"])
- | ~pd.isna(result.loc[:, "inline_value"])
- )
-
- if not all_match_offset.all():
- vcs = all_match_offset.value_counts()
- raise ValueError(
- f"Failed to match message offset mapping for grib file {basename}: "
- f"{vcs[True]} matched, {vcs[False]} didn't"
- )
-
- if not all_match_length.all():
- vcs = all_match_length.value_counts()
- raise ValueError(
- f"Failed to match message offset mapping for grib file {basename}: "
- f"{vcs[True]} matched, {vcs[False]} didn't"
- )
-
- if not result["attrs"].is_unique:
- dups = result.loc[result["attrs"].duplicated(keep=False), :]
- logger.warning(
- "The idx attribute mapping for %s is not unique for %d variables: %s",
- basename,
- len(dups),
- dups.varname.tolist(),
- )
-
- r_index = result.set_index(
- ["varname", "typeOfLevel", "stepType", "level", "valid_time"]
- )
- if not r_index.index.is_unique:
- dups = r_index.loc[r_index.index.duplicated(keep=False), :]
- logger.warning(
- "The grib hierarchy in %s is not unique for %d variables: %s",
- basename,
- len(dups),
- dups.index.get_level_values("varname").tolist(),
- )
-
- return result
+__all__ = [
+ "scan_grib",
+ "grib_tree",
+ "correct_hrrr_subhf_step",
+ "example_combine",
+ "parse_grib_idx",
+ "build_idx_grib_mapping",
+ "map_from_index",
+]