Skip to content

Commit

Permalink
Add parallel chunk_getitems for grib reader
Browse files Browse the repository at this point in the history
  • Loading branch information
emfdavid committed Feb 27, 2024
1 parent 0b0ac88 commit ceb7c44
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions zarr/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import numpy as np
from numcodecs.compat import ensure_bytes

import logging
logger = logging.getLogger(__name__)

from zarr._storage.store import _prefix_to_attrs_key, assert_zarr_v3_api_available
from zarr.attrs import Attributes
from zarr.codecs import AsType, get_codec
Expand Down Expand Up @@ -2163,6 +2166,42 @@ def _chunk_getitems(
partial_read_decode = False
values = self.chunk_store.get_partial_values([(ckey, (0, None)) for ckey in ckeys])
cdatas = {key: value for key, value in zip(ckeys, values) if value is not None}


elif "GRIBCodec" in list(map(lambda x: str(x.__class__.__name__), self.filters or [])):
# Start parallel grib hack
# Make this really specific to GRIBCodec for now - we can make this more general later?
from joblib import Parallel, delayed

def parallel_io_method(instance, c_key, c_select, out_sel, my_out):
try:
cdata = instance.chunk_store[c_key]
chunk = instance._decode_chunk(cdata)
tmp = chunk[c_select]
if drop_axes:
tmp = np.squeeze(tmp, axis=drop_axes)
my_out[out_sel] = tmp

except Exception:
# TODO: get more context from the mapper about what chunk failed!
logger.exception("Error reading chunk %s", c_key)
my_out[out_sel] = instance._fill_value

with tempfile.NamedTemporaryFile(mode="w+b", prefix="zarr_memmap") as f:
logger.warning("Creating memmap array of shape %s - this could oom", out.shape)
output = np.memmap(f, dtype=out.dtype, shape=out.shape, mode='w+')

# Just setting mmap_mode to w+ doesn't seem to copy the data back to out...
Parallel()(
delayed(parallel_io_method)(self, ckey, chunk_select, out_select, output)
for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection)
)

out[:] = output[:]

return
# End parallel grib hack

else:
partial_read_decode = False
contexts = {}
Expand Down

0 comments on commit ceb7c44

Please sign in to comment.