diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst
index 07e416f7d7..a7ac314758 100644
--- a/docs/src/whatsnew/latest.rst
+++ b/docs/src/whatsnew/latest.rst
@@ -36,7 +36,8 @@ This document explains the changes made to Iris for this release
 🐛 Bugs Fixed
 =============
 
-#. N/A
+#. `@bouweandela`_ updated the ``chunktype`` of Dask arrays so that it
+   corresponds to the array content. (:pull:`5801`)
 
 
 💣 Incompatible Changes
diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py
index 8d929c2af2..b25698da89 100644
--- a/lib/iris/_concatenate.py
+++ b/lib/iris/_concatenate.py
@@ -7,9 +7,9 @@
 from collections import defaultdict, namedtuple
 import warnings
 
-import dask.array as da
 import numpy as np
 
+from iris._lazy_data import concatenate as concatenate_arrays
 import iris.coords
 import iris.cube
 import iris.exceptions
@@ -1112,7 +1112,7 @@ def _build_cell_measures(self):
                 skton.signature.cell_measures_and_dims[i].coord.data
                 for skton in skeletons
             ]
-            data = np.concatenate(tuple(data), axis=dim)
+            data = concatenate_arrays(tuple(data), axis=dim)
 
             # Generate the associated metadata.
             kwargs = cube_signature.cm_metadata[i].defn._asdict()
@@ -1152,7 +1152,7 @@ def _build_ancillary_variables(self):
                 skton.signature.ancillary_variables_and_dims[i].coord.data
                 for skton in skeletons
             ]
-            data = np.concatenate(tuple(data), axis=dim)
+            data = concatenate_arrays(tuple(data), axis=dim)
 
             # Generate the associated metadata.
             kwargs = cube_signature.av_metadata[i].defn._asdict()
@@ -1245,7 +1245,7 @@ def _build_data(self):
         skeletons = self._skeletons
         data = [skeleton.data for skeleton in skeletons]
 
-        data = da.concatenate(data, self.axis)
+        data = concatenate_arrays(data, self.axis)
 
         return data
 
diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py
index 40984248d1..b1f8e9aa85 100644
--- a/lib/iris/_lazy_data.py
+++ b/lib/iris/_lazy_data.py
@@ -9,6 +9,7 @@
 """
 
 from functools import lru_cache, wraps
+from typing import Sequence
 
 import dask
 import dask.array as da
@@ -41,6 +42,11 @@ def is_lazy_data(data):
     return result
 
 
+def is_masked_data(data: np.ndarray | da.Array) -> bool:
+    """Return whether the argument is a masked array."""
+    return isinstance(da.utils.meta_from_array(data), np.ma.MaskedArray)
+
+
 def is_lazy_masked_data(data):
     """Determine whether managed data is lazy and masked.
 
@@ -48,7 +54,7 @@ def is_lazy_masked_data(data):
     underlying array is of masked type. Otherwise return False.
 
     """
-    return is_lazy_data(data) and ma.isMA(da.utils.meta_from_array(data))
+    return is_lazy_data(data) and is_masked_data(data)
 
 
 @lru_cache
@@ -220,9 +226,7 @@ def _optimum_chunksize(
     )
 
 
-def as_lazy_data(
-    data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False
-):
+def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None):
     """Convert the input array `data` to a :class:`dask.array.Array`.
 
     Parameters
     ----------
@@ -232,17 +236,18 @@
         This will be converted to a :class:`dask.array.Array`.
     chunks : list of int, optional
         If present, a source chunk shape, e.g. for a chunked netcdf variable.
+        If set to "auto", Iris chunking optimisation will be bypassed, and dask's
+        default chunking will be used instead.
     asarray : bool, default=False
         If True, then chunks will be converted to instances of `ndarray`.
         Set to False (default) to pass passed chunks through unchanged.
+    meta : numpy.ndarray, optional
+        Empty ndarray created with the same NumPy backend, ndim and dtype as
+        the Dask Array being created.
     dims_fixed : list of bool, optional
         If set, a list of values equal in length to 'chunks' or data.ndim.
         'True' values indicate a dimension which can not be changed, i.e. the
         result for that index must equal the value in 'chunks' or data.shape.
-    dask_chunking : bool, default=False
-        If True, Iris chunking optimisation will be bypassed, and dask's default
-        chunking will be used instead. Including a value for chunks while dask_chunking
-        is set to True will result in a failure.
 
     Returns
     -------
@@ -258,13 +263,16 @@
     but reduced by a factor if that exceeds the dask default chunksize.
 
     """
-    if dask_chunking:
-        if chunks is not None:
-            raise ValueError(
-                f"Dask chunking chosen, but chunks already assigned value {chunks}"
-            )
-        lazy_params = {"asarray": asarray, "meta": np.ndarray}
-    else:
+    if isinstance(data, ma.core.MaskedConstant):
+        data = ma.masked_array(data.data, mask=data.mask)
+
+    if meta is None and not isinstance(data, (np.ndarray, da.Array)):
+        raise ValueError(
+            "For performance reasons, `meta` cannot be `None` if `data` is "
+            "anything other than a Numpy or Dask array."
+        )
+
+    if chunks != "auto":
         if chunks is None:
             # No existing chunks : Make a chunk the shape of the entire input array
             # (but we will subdivide it if too big).
@@ -281,15 +289,9 @@
             dtype=data.dtype,
             dims_fixed=dims_fixed,
         )
-        lazy_params = {
-            "chunks": chunks,
-            "asarray": asarray,
-            "meta": np.ndarray,
-        }
-    if isinstance(data, ma.core.MaskedConstant):
-        data = ma.masked_array(data.data, mask=data.mask)
+
     if not is_lazy_data(data):
-        data = da.from_array(data, **lazy_params)
+        data = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta)
     return data
 
 
@@ -351,7 +353,95 @@ def as_concrete_data(data):
     return data
 
 
-def multidim_lazy_stack(stack):
+def _combine(
+    arrays: Sequence[da.Array | np.ndarray],
+    operation: str,
+    **kwargs,
+) -> da.Array | np.ndarray:
+    """Combine multiple arrays into a single array.
+
+    Provides enhanced versions of :func:`~dask.array.concatenate` and :func:`~dask.array.stack`,
+    which ensure that all computed results are masked arrays if the combined ``.meta`` is masked.
+
+    Parameters
+    ----------
+    arrays :
+        The arrays to combine.
+    operation :
+        The combination operation to apply.
+    **kwargs :
+        Any keyword arguments to pass to the combination operation.
+
+    """
+    lazy = any(is_lazy_data(a) for a in arrays)
+    masked = any(is_masked_data(a) for a in arrays)
+
+    array_module = np
+    if masked:
+        if lazy:
+            # Avoid inconsistent array type when slicing resulting array
+            arrays = tuple(
+                a if is_lazy_masked_data(a) else da.ma.masked_array(a) for a in arrays
+            )
+        else:
+            # Avoid dropping the masks
+            array_module = np.ma
+
+    func = getattr(array_module, operation)
+    return func(arrays, **kwargs)
+
+
+def concatenate(
+    arrays: Sequence[da.Array | np.ndarray],
+    axis: int = 0,
+) -> da.Array | np.ndarray:
+    """Concatenate a sequence of arrays along an existing axis.
+
+    Improves on the regular :func:`dask.array.concatenate` by always respecting a masked
+    ``.meta``, as described for :func:`_combine`.
+
+    Parameters
+    ----------
+    arrays :
+        The arrays must have the same shape, except in the dimension
+        corresponding to `axis` (the first, by default).
+    axis :
+        Dimension along which to align all of the arrays. If axis is None,
+        arrays are flattened before use.
+
+    Returns
+    -------
+    The concatenated array.
+ + """ + return _combine(arrays, operation="concatenate", axis=axis) + + +def stack( + arrays: Sequence[da.Array | np.ndarray], + axis: int = 0, +) -> da.Array | np.ndarray: + """Stack a sequence of arrays along a new axis. + + Improves on the regular :func:`dask.array.stack` by always respecting a masked + ``.meta``, as described for :func:`_combine`. + + Parameters + ---------- + arrays : + The arrays must have the same shape. + axis : + Dimension along which to align all of the arrays. + + Returns + ------- + The stacked array. + + """ + return _combine(arrays, operation="stack", axis=axis) + + +def multidim_lazy_stack(arr): """Recursively build a multidimensional stacked dask array. This is needed because :meth:`dask.array.Array.stack` only accepts a @@ -359,7 +449,7 @@ def multidim_lazy_stack(stack): Parameters ---------- - stack : + arr : An ndarray of :class:`dask.array.Array`. Returns @@ -367,15 +457,15 @@ def multidim_lazy_stack(stack): The input array converted to a lazy :class:`dask.array.Array`. """ - if stack.ndim == 0: + if arr.ndim == 0: # A 0-d array cannot be stacked. - result = stack.item() - elif stack.ndim == 1: + result = arr.item() + elif arr.ndim == 1: # Another base case : simple 1-d goes direct in dask. - result = da.stack(list(stack)) + result = stack(list(arr)) else: # Recurse because dask.stack does not do multi-dimensional. - result = da.stack([multidim_lazy_stack(subarray) for subarray in stack]) + result = stack([multidim_lazy_stack(subarray) for subarray in arr]) return result diff --git a/lib/iris/aux_factory.py b/lib/iris/aux_factory.py index cd59575f93..41e1e9f573 100644 --- a/lib/iris/aux_factory.py +++ b/lib/iris/aux_factory.py @@ -11,6 +11,7 @@ import dask.array as da import numpy as np +from iris._lazy_data import concatenate from iris.common import CFVariableMixin, CoordMetadata, metadata_manager_factory import iris.coords from iris.warnings import IrisIgnoringBoundsWarning @@ -1076,7 +1077,7 @@ def _derive(self, sigma, eta, depth, depth_c, zlev, nsigma, coord_dims_func): result_rest_levs = zlev[z_slices_rest] * ones_full_result[z_slices_rest] # Combine nsigma and 'rest' levels for the final result. - result = da.concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim) + result = concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim) return result def make_coord(self, coord_dims_func): diff --git a/lib/iris/cube.py b/lib/iris/cube.py index fecbcaf0da..8418a630b5 100644 --- a/lib/iris/cube.py +++ b/lib/iris/cube.py @@ -25,7 +25,6 @@ import zlib from cf_units import Unit -import dask.array as da import numpy as np import numpy.ma as ma @@ -3134,12 +3133,7 @@ def make_chunk(key): result = chunks[0] else: chunk_data = [chunk.core_data() for chunk in chunks] - if self.has_lazy_data(): - func = da.concatenate - else: - module = ma if ma.isMaskedArray(self.data) else np - func = module.concatenate - data = func(chunk_data, dim) + data = _lazy.concatenate(chunk_data, axis=dim) result = iris.cube.Cube(data) result.metadata = deepcopy(self.metadata) @@ -4432,13 +4426,10 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs): # Choose appropriate data and functions for data aggregation. if aggregator.lazy_func is not None and self.has_lazy_data(): - stack = da.stack input_data = self.lazy_data() agg_method = aggregator.lazy_aggregate else: input_data = self.data - # Note numpy.stack does not preserve masks. 
- stack = ma.stack if ma.isMaskedArray(input_data) else np.stack agg_method = aggregator.aggregate # Create data and weights slices. @@ -4475,11 +4466,11 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs): # before combining the different slices. if return_weights: result, weights_result = list(zip(*result)) - aggregateby_weights = stack(weights_result, axis=dimension_to_groupby) + aggregateby_weights = _lazy.stack(weights_result, axis=dimension_to_groupby) else: aggregateby_weights = None - aggregateby_data = stack(result, axis=dimension_to_groupby) + aggregateby_data = _lazy.stack(result, axis=dimension_to_groupby) # Ensure plain ndarray is output if plain ndarray was input. if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray(input_data): aggregateby_data = ma.getdata(aggregateby_data) diff --git a/lib/iris/fileformats/netcdf/_thread_safe_nc.py b/lib/iris/fileformats/netcdf/_thread_safe_nc.py index 675a151868..3a556f5447 100644 --- a/lib/iris/fileformats/netcdf/_thread_safe_nc.py +++ b/lib/iris/fileformats/netcdf/_thread_safe_nc.py @@ -323,6 +323,10 @@ def ndim(self): # noqa: D102 return len(self.shape) + @property + def dask_meta(self): + return np.ma.array(np.empty((0,) * self.ndim, dtype=self.dtype), mask=True) + def __getitem__(self, keys): # Using a DatasetWrapper causes problems with invalid ID's and the # netCDF4 library, presumably because __getitem__ gets called so many diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index 16fe567c9b..95cbc120f5 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -240,7 +240,7 @@ def _get_cf_var_data(cf_var, filename): # Get the chunking specified for the variable : this is either a shape, or # maybe the string "contiguous". if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK: - result = as_lazy_data(proxy, chunks=None, dask_chunking=True) + result = as_lazy_data(proxy, meta=proxy.dask_meta, chunks="auto") else: chunks = cf_var.cf_data.chunking() if chunks is None: @@ -285,7 +285,10 @@ def _get_cf_var_data(cf_var, filename): if dims_fixed is None: dims_fixed = [dims_fixed] result = as_lazy_data( - proxy, chunks=chunks, dims_fixed=tuple(dims_fixed) + proxy, + meta=proxy.dask_meta, + chunks=chunks, + dims_fixed=tuple(dims_fixed), ) return result diff --git a/lib/iris/fileformats/pp.py b/lib/iris/fileformats/pp.py index 0a159b46b6..c2660d022c 100644 --- a/lib/iris/fileformats/pp.py +++ b/lib/iris/fileformats/pp.py @@ -606,6 +606,10 @@ def fill_value(self): def ndim(self): return len(self.shape) + @property + def dask_meta(self): + return np.empty((0,) * self.ndim, dtype=self.dtype) + def __getitem__(self, keys): with open(self.path, "rb") as pp_file: pp_file.seek(self.offset, os.SEEK_SET) @@ -1756,7 +1760,7 @@ def _create_field_data(field, data_shape, land_mask_field=None): if land_mask_field is None: # For a "normal" (non-landsea-masked) field, the proxy can be # wrapped directly as a deferred array. 
- field.data = as_lazy_data(proxy, chunks=block_shape) + field.data = as_lazy_data(proxy, meta=proxy.dask_meta, chunks=block_shape) else: # This is a landsea-masked field, and its data must be handled in # a different way : Because data shape/size is not known in @@ -1807,8 +1811,12 @@ def calc_array(mask, values): return result delayed_result = calc_array(mask_field_array, delayed_valid_values) + meta = np.ma.array(np.empty((0,) * proxy.ndim, dtype=dtype), mask=True) lazy_result_array = da.from_delayed( - delayed_result, shape=block_shape, dtype=dtype + delayed_result, + shape=block_shape, + dtype=dtype, + meta=meta, ) field.data = lazy_result_array diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py index d4c813502f..3051754423 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py @@ -203,7 +203,7 @@ def test_as_dask(tmp_filepath, save_cubelist_with_sigma): except RuntimeError as e: if str(e) != message: raise e - as_lazy_data.assert_called_with(ANY, chunks=None, dask_chunking=True) + as_lazy_data.assert_called_with(ANY, meta=ANY, chunks="auto") def test_pinned_optimisation(tmp_filepath, save_cubelist_with_sigma): diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py index 92cb93496e..efa291a0b4 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py @@ -10,7 +10,7 @@ from unittest import mock -from dask.array import Array as dask_array +import dask.array as da import numpy as np from iris._lazy_data import _optimum_chunksize @@ -50,7 +50,8 @@ def test_cf_data_type(self): chunks = [1, 12, 100] cf_var = self._make(chunks) lazy_data = _get_cf_var_data(cf_var, self.filename) - self.assertIsInstance(lazy_data, dask_array) + self.assertIsInstance(lazy_data, da.Array) + self.assertIsInstance(da.utils.meta_from_array(lazy_data), np.ma.MaskedArray) def test_cf_data_chunks(self): chunks = [2500, 240, 200] @@ -90,7 +91,7 @@ def test_cf_data_contiguous(self): def test_type__1kf8_is_lazy(self): cf_var = self._make(shape=(1000,), dtype="f8") var_data = _get_cf_var_data(cf_var, self.filename) - self.assertIsInstance(var_data, dask_array) + self.assertIsInstance(var_data, da.Array) def test_arraytype__1ki2_is_real(self): cf_var = self._make(shape=(1000,), dtype="i2") diff --git a/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py b/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py index ab80332186..266502253a 100644 --- a/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py +++ b/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py @@ -55,6 +55,7 @@ def test_deferred_bytes(self): data_shape = (100, 120) proxy = mock.Mock( dtype=np.dtype("f4"), + dask_meta=np.empty((0,) * len(data_shape), dtype=np.dtype("f4")), shape=data_shape, spec=pp.PPDataProxy, ndim=len(data_shape), diff --git a/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py b/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py index 91b22a3c0e..9b9929a156 100644 --- a/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py +++ b/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py @@ -60,7 +60,8 @@ def test_lazy_mask_data(self): def test_lazy_scalar_proxy(self): a = np.array(5) proxy = 
MyProxy(a) - lazy_array = as_lazy_data(proxy) + meta = np.empty((0,) * proxy.ndim, dtype=proxy.dtype) + lazy_array = as_lazy_data(proxy, meta=meta) self.assertTrue(is_lazy_data(lazy_array)) result = as_concrete_data(lazy_array) self.assertFalse(is_lazy_data(result)) @@ -69,7 +70,8 @@ def test_lazy_scalar_proxy(self): def test_lazy_scalar_proxy_masked(self): a = np.ma.masked_array(5, True) proxy = MyProxy(a) - lazy_array = as_lazy_data(proxy) + meta = np.ma.array(np.empty((0,) * proxy.ndim, dtype=proxy.dtype), mask=True) + lazy_array = as_lazy_data(proxy, meta=meta) self.assertTrue(is_lazy_data(lazy_array)) result = as_concrete_data(lazy_array) self.assertFalse(is_lazy_data(result)) diff --git a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py index 0f741b10a3..90166b5e78 100644 --- a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py +++ b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py @@ -46,26 +46,28 @@ def test_dask_chunking(self): chunks = (12,) optimum = self.patch("iris._lazy_data._optimum_chunksize") optimum.return_value = chunks - _ = as_lazy_data(data, chunks=None, dask_chunking=True) + _ = as_lazy_data(data, chunks="auto") self.assertFalse(optimum.called) - def test_dask_chunking_error(self): - data = np.arange(24) - chunks = (12,) - optimum = self.patch("iris._lazy_data._optimum_chunksize") - optimum.return_value = chunks - with self.assertRaisesRegex( - ValueError, - r"Dask chunking chosen, but chunks already assigned value", - ): - as_lazy_data(data, chunks=chunks, dask_chunking=True) - def test_with_masked_constant(self): masked_data = ma.masked_array([8], mask=True) masked_constant = masked_data[0] result = as_lazy_data(masked_constant) self.assertIsInstance(result, da.core.Array) + def test_missing_meta(self): + class MyProxy: + pass + + data = MyProxy() + + with self.assertRaisesRegex( + ValueError, + r"`meta` cannot be `None` if `data` is anything other than a Numpy " + r"or Dask array.", + ): + as_lazy_data(data) + class Test__optimised_chunks(tests.IrisTest): # Stable, known chunksize for testing. diff --git a/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py b/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py index 3b265d615d..4e304d4910 100644 --- a/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py +++ b/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py @@ -21,6 +21,7 @@ def __init__(self, array): self.ndim = array.ndim self._array = array self.access_count = 0 + self.meta = np.empty((0,) * array.ndim, dtype=array.dtype) def __getitem__(self, keys): self.access_count += 1 @@ -55,7 +56,7 @@ def test_multi(self): def test_combined_access(self): wrapped_array = ArrayAccessCounter(np.arange(3.0)) - lazy_array = as_lazy_data(wrapped_array) + lazy_array = as_lazy_data(wrapped_array, meta=wrapped_array.meta) derived_a = lazy_array + 1 derived_b = lazy_array + 2 derived_c = lazy_array + 3 diff --git a/lib/iris/tests/unit/lazy_data/test_stack.py b/lib/iris/tests/unit/lazy_data/test_stack.py new file mode 100644 index 0000000000..880bad7df1 --- /dev/null +++ b/lib/iris/tests/unit/lazy_data/test_stack.py @@ -0,0 +1,22 @@ +# Copyright Iris contributors +# +# This file is part of Iris and is released under the BSD license. +# See LICENSE in the root of the repository for full licensing details. 
+"""Test function :func:`iris._lazy data.stack`.""" + +import dask.array as da +import numpy as np + +from iris._lazy_data import stack + + +def test_stack(): + seq = [ + da.arange(2), + da.ma.masked_array(da.arange(2)), + ] + result = stack(seq) + + assert isinstance(result[0].compute(), np.ma.MaskedArray) + assert isinstance(result[1].compute(), np.ma.MaskedArray) + np.testing.assert_array_equal(stack(seq).compute(), da.stack(seq).compute())