From ceb7c44da759cd6c5daf7dc85ebc6e18c60500fd Mon Sep 17 00:00:00 2001
From: David Stuebe
Date: Tue, 27 Feb 2024 09:14:45 +0000
Subject: [PATCH] Add parallel chunk_getitems for grib reader

---
 zarr/core.py | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 51 insertions(+)

diff --git a/zarr/core.py b/zarr/core.py
index d22a9d79c3..5b045d0b04 100644
--- a/zarr/core.py
+++ b/zarr/core.py
@@ -10,6 +10,9 @@
 import numpy as np
 from numcodecs.compat import ensure_bytes
 
+import logging
+logger = logging.getLogger(__name__)
+
 from zarr._storage.store import _prefix_to_attrs_key, assert_zarr_v3_api_available
 from zarr.attrs import Attributes
 from zarr.codecs import AsType, get_codec
@@ -2163,6 +2166,54 @@ def _chunk_getitems(
             partial_read_decode = False
             values = self.chunk_store.get_partial_values([(ckey, (0, None)) for ckey in ckeys])
             cdatas = {key: value for key, value in zip(ckeys, values) if value is not None}
+
+
+        elif any(f.__class__.__name__ == "GRIBCodec" for f in self.filters or []):
+            # Start parallel grib hack
+            # Make this really specific to GRIBCodec for now - we can make this more general later?
+            # tempfile is imported locally because this patch adds no module-level
+            # import for it; without this the branch would fail with NameError.
+            import tempfile
+
+            from joblib import Parallel, delayed
+
+            def parallel_io_method(instance, c_key, c_select, out_sel, my_out):
+                """Read and decode one chunk, then write its selection into ``my_out``.
+
+                Any failure is logged and the target region is filled with
+                ``instance._fill_value`` instead of propagating the error.
+                """
+                try:
+                    cdata = instance.chunk_store[c_key]
+                    chunk = instance._decode_chunk(cdata)
+                    tmp = chunk[c_select]
+                    if drop_axes:
+                        tmp = np.squeeze(tmp, axis=drop_axes)
+                    my_out[out_sel] = tmp
+
+                except Exception:
+                    # TODO: get more context from the mapper about what chunk failed!
+                    logger.exception("Error reading chunk %s", c_key)
+                    my_out[out_sel] = instance._fill_value
+
+            with tempfile.NamedTemporaryFile(mode="w+b", prefix="zarr_memmap") as f:
+                logger.warning("Creating memmap array of shape %s - this could oom", out.shape)
+                output = np.memmap(f, dtype=out.dtype, shape=out.shape, mode='w+')
+
+                # Just setting mmap_mode to w+ doesn't seem to copy the data back to out...
+                # NOTE(review): Parallel() is built with joblib's defaults, so the
+                # backend and worker count come from the ambient joblib
+                # configuration - confirm callers set one if parallelism is expected.
+                Parallel()(
+                    delayed(parallel_io_method)(self, ckey, chunk_select, out_select, output)
+                    for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection)
+                )
+
+                out[:] = output[:]
+
+            return
+            # End parallel grib hack
+
         else:
             partial_read_decode = False
             contexts = {}