Commit: map_blocks (#3276)
* map_block attempt 2

* Address reviews: errors, args + kwargs support.

* Works with datasets!

* remove wrong comment.

* Support chunks.

* infer template.

* cleanup

* cleanup2

* api.rst

* simple shape change error check.

* Make test more complicated.

* Fix for when user function doesn't set DataArray.name

* Now _to_temp_dataset works.

* Add whats-new

* chunks kwarg makes no sense right now.

* review feedback:

1. skip index graph nodes.
2. var → name
3. quicker dataarray creation.
4. Add restrictions to docstring.
5. rename chunk construction task.
6. error when non-xarray object is returned.
7. restore non-coord dims.

review

* Support nondim coords in make_meta.

* Add Dataset.unify_chunks

* doc updates.

* minor.

* update comment.

* More complicated test dataset. Tests fail :X

* Don't know why compute is needed.

* work with DataArray nondim coords.

* fastpath unify_chunks

* comment.

* much improved tests.

* Change args, kwargs syntax.

* Add dataset, dataarray methods.

* api.rst

* docstrings.

* Fix unify_chunks.

* Move assert_chunks_equal to xarray.testing.

* minor changes.

* Better error handling when inferring returned object

* wip

* wip

* better to_array

* Docstrings + nicer error message.

* remove unify_chunks in map_blocks + better tests.

* typing for unify_chunks

* address more review comments.

* more unify_chunks tests.

* Just use dask.core.utils.meta_from_array

* get tests working. assert_equal needs a lot of fixing.

* more unify_chunks test.

* assert_chunks_equal fixes.

* copy over meta_from_array.

* minor fixes.

* raise chunks error earlier and test for map_blocks raising chunk error

* fix.

* Type annotations

* py35 compat

* make sure unify_chunks does not compute.

* Make tests functional by call compute before assert_equal

* Update whats-new

* Work with attributes.

* Support attrs and name changes.

* more assert_equal

* test changing coord attribute

* fix whats new

* rework tests to use fixtures (kind of)

* more review changes.

* cleanup

* more review feedback.

* fix unify_chunks.

* read dask_array_compat :)

* Dask 1.2.0 compat.

* documentation polish

* make_meta reflow

* cosmetic

* polish

* Fix tests

* isort

* isort

* Add func call to docstrings.
dcherian authored and crusaderky committed Oct 10, 2019
1 parent 291cb80 commit 3f29551
Showing 15 changed files with 910 additions and 26 deletions.
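The commit messages above sketch the evolution of the new API. As a rough illustration only (the data, names, and function below are invented, and dask is assumed to be installed), the top-level function added by this commit might be used like this:

import numpy as np
import xarray as xr

# A chunked (dask-backed) DataArray; map_blocks applies `double` to each chunk,
# passing an xarray object per chunk rather than a bare numpy array.
da = xr.DataArray(np.arange(10.0), dims="x", name="a").chunk({"x": 5})

def double(obj):
    # Must return a DataArray or Dataset and may not change dimension sizes.
    return obj * 2

result = xr.map_blocks(double, da)  # lazy; builds a dask graph
result.compute()
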
6 changes: 6 additions & 0 deletions doc/api.rst
@@ -30,6 +30,7 @@ Top-level functions
zeros_like
ones_like
dot
map_blocks

Dataset
=======
@@ -499,6 +500,8 @@ Dataset methods
Dataset.persist
Dataset.load
Dataset.chunk
Dataset.unify_chunks
Dataset.map_blocks
Dataset.filter_by_attrs
Dataset.info

@@ -529,6 +532,8 @@ DataArray methods
DataArray.persist
DataArray.load
DataArray.chunk
DataArray.unify_chunks
DataArray.map_blocks

GroupBy objects
===============
@@ -629,6 +634,7 @@ Testing
testing.assert_equal
testing.assert_identical
testing.assert_allclose
testing.assert_chunks_equal

Exceptions
==========
5 changes: 5 additions & 0 deletions doc/whats-new.rst
@@ -49,6 +49,11 @@ Breaking changes
New functions/methods
~~~~~~~~~~~~~~~~~~~~~

- Added :py:func:`~xarray.map_blocks`, modeled after :py:func:`dask.array.map_blocks`.
  Also added :py:meth:`Dataset.unify_chunks`, :py:meth:`DataArray.unify_chunks` and
  :py:meth:`testing.assert_chunks_equal`. By `Deepak Cherian <https://github.com/dcherian>`_
  and `Guido Imperiale <https://github.com/crusaderky>`_.

Enhancements
~~~~~~~~~~~~

1 change: 1 addition & 0 deletions xarray/__init__.py
@@ -17,6 +17,7 @@
from .core.dataarray import DataArray
from .core.merge import merge, MergeError
from .core.options import set_options
from .core.parallel import map_blocks

from .backends.api import (
open_dataset,
1 change: 0 additions & 1 deletion xarray/coding/times.py
@@ -22,7 +22,6 @@
    unpack_for_encoding,
)


# standard calendars recognized by cftime
_STANDARD_CALENDARS = {"standard", "gregorian", "proleptic_gregorian"}

91 changes: 91 additions & 0 deletions xarray/core/dask_array_compat.py
@@ -0,0 +1,91 @@
from distutils.version import LooseVersion

import dask.array as da
import numpy as np
from dask import __version__ as dask_version

if LooseVersion(dask_version) >= LooseVersion("2.0.0"):
    meta_from_array = da.utils.meta_from_array
else:
    # Copied from dask v2.4.0
    # Used under the terms of Dask's license, see licenses/DASK_LICENSE.
    import numbers

    def meta_from_array(x, ndim=None, dtype=None):
        """ Normalize an array to appropriate meta object
        Parameters
        ----------
        x: array-like, callable
            Either an object that looks sufficiently like a Numpy array,
            or a callable that accepts shape and dtype keywords
        ndim: int
            Number of dimensions of the array
        dtype: Numpy dtype
            A valid input for ``np.dtype``
        Returns
        -------
        array-like with zero elements of the correct dtype
        """
        # If using x._meta, x must be a Dask Array, some libraries (e.g. zarr)
        # implement a _meta attribute that are incompatible with Dask Array._meta
        if hasattr(x, "_meta") and isinstance(x, da.Array):
            x = x._meta

        if dtype is None and x is None:
            raise ValueError("You must specify the meta or dtype of the array")

        if np.isscalar(x):
            x = np.array(x)

        if x is None:
            x = np.ndarray

        if isinstance(x, type):
            x = x(shape=(0,) * (ndim or 0), dtype=dtype)

        if (
            not hasattr(x, "shape")
            or not hasattr(x, "dtype")
            or not isinstance(x.shape, tuple)
        ):
            return x

        if isinstance(x, list) or isinstance(x, tuple):
            ndims = [
                0
                if isinstance(a, numbers.Number)
                else a.ndim
                if hasattr(a, "ndim")
                else len(a)
                for a in x
            ]
            a = [a if nd == 0 else meta_from_array(a, nd) for a, nd in zip(x, ndims)]
            return a if isinstance(x, list) else tuple(x)

        if ndim is None:
            ndim = x.ndim

        try:
            meta = x[tuple(slice(0, 0, None) for _ in range(x.ndim))]
            if meta.ndim != ndim:
                if ndim > x.ndim:
                    meta = meta[
                        (Ellipsis,) + tuple(None for _ in range(ndim - meta.ndim))
                    ]
                    meta = meta[tuple(slice(0, 0, None) for _ in range(meta.ndim))]
                elif ndim == 0:
                    meta = meta.sum()
                else:
                    meta = meta.reshape((0,) * ndim)
        except Exception:
            meta = np.empty((0,) * ndim, dtype=dtype or x.dtype)

        if np.isscalar(meta):
            meta = np.array(meta)

        if dtype and meta.dtype != dtype:
            meta = meta.astype(dtype)

        return meta
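
A small, assumed illustration (not part of the diff) of what this vendored helper returns:

from xarray.core.dask_array_compat import meta_from_array
import numpy as np

# For a 2-d float array, the meta is an empty array with matching ndim and dtype.
meta = meta_from_array(np.ones((4, 4)), ndim=2)
print(meta.shape, meta.dtype)  # (0, 0) float64

# Scalars are first promoted to 0-d numpy arrays.
meta0 = meta_from_array(5.0, ndim=0)
print(meta0.shape, meta0.dtype)  # () float64
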
76 changes: 76 additions & 0 deletions xarray/core/dataarray.py
@@ -14,6 +14,7 @@
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    cast,
    overload,
@@ -63,6 +64,8 @@
)

if TYPE_CHECKING:
    T_DSorDA = TypeVar("T_DSorDA", "DataArray", Dataset)

    try:
        from dask.delayed import Delayed
    except ImportError:
@@ -3038,6 +3041,79 @@ def integrate(
        ds = self._to_temp_dataset().integrate(dim, datetime_unit)
        return self._from_temp_dataset(ds)

    def unify_chunks(self) -> "DataArray":
        """ Unify chunk size along all chunked dimensions of this DataArray.
        Returns
        -------
        DataArray with consistent chunk sizes for all dask-array variables
        See Also
        --------
        dask.array.core.unify_chunks
        """
        ds = self._to_temp_dataset().unify_chunks()
        return self._from_temp_dataset(ds)

    def map_blocks(
        self,
        func: "Callable[..., T_DSorDA]",
        args: Sequence[Any] = (),
        kwargs: Mapping[str, Any] = None,
    ) -> "T_DSorDA":
        """
        Apply a function to each chunk of this DataArray. This method is experimental
        and its signature may change.
        Parameters
        ----------
        func: callable
            User-provided function that accepts a DataArray as its first parameter. The
            function will receive a subset of this DataArray, corresponding to one chunk
            along each chunked dimension. ``func`` will be executed as
            ``func(obj_subset, *args, **kwargs)``.
            The function will be first run on mocked-up data, that looks like this array
            but has sizes 0, to determine properties of the returned object such as
            dtype, variable names, new dimensions and new indexes (if any).
            This function must return either a single DataArray or a single Dataset.
            This function cannot change size of existing dimensions, or add new chunked
            dimensions.
        args: Sequence
            Passed verbatim to func after unpacking, after the sliced DataArray. xarray
            objects, if any, will not be split by chunks. Passing dask collections is
            not allowed.
        kwargs: Mapping
            Passed verbatim to func after unpacking. xarray objects, if any, will not be
            split by chunks. Passing dask collections is not allowed.
        Returns
        -------
        A single DataArray or Dataset with dask backend, reassembled from the outputs of
        the function.
        Notes
        -----
        This method is designed for when one needs to manipulate a whole xarray object
        within each chunk. In the more common case where one can work on numpy arrays,
        it is recommended to use apply_ufunc.
        If none of the variables in this DataArray is backed by dask, calling this
        method is equivalent to calling ``func(self, *args, **kwargs)``.
        See Also
        --------
        dask.array.map_blocks, xarray.apply_ufunc, xarray.map_blocks,
        xarray.Dataset.map_blocks
        """
        from .parallel import map_blocks

        return map_blocks(func, self, args, kwargs)

    # this needs to be at the end, or mypy will confuse with `str`
    # https://mypy.readthedocs.io/en/latest/common_issues.html#dealing-with-conflicting-names
    str = property(StringAccessor)
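
A hypothetical sketch of the two new DataArray methods defined above (invented data; dask assumed installed):

import numpy as np
import xarray as xr

da = xr.DataArray(np.ones((6, 4)), dims=("x", "y")).chunk({"x": 2})

def scale(block, factor=1.0):
    # Receives one chunk of `da` as a DataArray.
    return block * factor

# Method form of map_blocks; the kwargs mapping is forwarded to `scale` unchanged.
out = da.map_blocks(scale, kwargs={"factor": 2.0})

# unify_chunks returns a DataArray whose dask variables share consistent chunking.
unified = da.unify_chunks()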
