Add apply_gufunc #149

Merged · 4 commits · Feb 17, 2023
2 changes: 2 additions & 0 deletions cubed/__init__.py
@@ -21,6 +21,7 @@
    measure_reserved_memory,
    visualize,
)
from .core.gufunc import apply_gufunc
from .core.ops import from_array, from_zarr, map_blocks, store, to_zarr

__all__ = [
@@ -29,6 +30,7 @@
    "Array",
    "Spec",
    "TaskEndEvent",
    "apply_gufunc",
    "compute",
    "from_array",
    "from_zarr",
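
With this re-export in place, the function is available from the top-level cubed namespace. A quick smoke check (assuming an environment with this branch installed):

import cubed
from cubed import apply_gufunc
assert cubed.apply_gufunc is apply_gufunc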
1 change: 1 addition & 0 deletions cubed/core/__init__.py
@@ -9,6 +9,7 @@
    measure_reserved_memory,
    visualize,
)
from .gufunc import apply_gufunc
from .ops import (
    blockwise,
    elemwise,
145 changes: 145 additions & 0 deletions cubed/core/gufunc.py
@@ -0,0 +1,145 @@
import numpy as np
from dask.array.gufunc import _parse_gufunc_signature, _validate_normalize_axes
from tlz import concat, merge, unique


def apply_gufunc(
    func,
    signature,
    *args,
    axes=None,
    axis=None,
    output_dtypes=None,
    vectorize=None,
    **kwargs,
):
    """
    Apply a generalized ufunc or similar Python function to arrays.

    This is a cut-down version of the
    `equivalent function <https://docs.dask.org/en/stable/generated/dask.array.gufunc.apply_gufunc.html>`_
    in Dask. Refer there for usage information.

    Current limitations: ``keepdims``, ``output_sizes``, and ``allow_rechunk`` are not supported,
    and neither are multiple outputs.

    Cubed assumes that ``func`` will allocate a new output array. However, if it allocates more
    memory than that, then you need to tell Cubed about it by setting the ``extra_required_mem``
    parameter to the amount needed in bytes (per task).
    """

    # Currently the following parameters cannot be changed
    keepdims = False
    output_sizes = None
    allow_rechunk = False

    # based on dask's apply_gufunc

    # Input processing:

    # Signature
    if not isinstance(signature, str):
        raise TypeError("`signature` has to be of type string")
    input_coredimss, output_coredimss = _parse_gufunc_signature(signature)
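    # e.g. "(i,j),(j,k)->(i,k)" parses to input_coredimss=[("i", "j"), ("j", "k")]
    # and output_coredimss=("i", "k") (a plain tuple, not a list, for a single output)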

    # Determine nout: nout = None for functions of one direct return; nout = int for return tuples
    nout = None if not isinstance(output_coredimss, list) else len(output_coredimss)

    if nout is not None:
        raise NotImplementedError(
            "Multiple outputs are not yet supported, see https://github.com/tomwhite/cubed/issues/69"
        )

    # Vectorize function, if required
    if vectorize:
        otypes = output_dtypes
        func = np.vectorize(func, signature=signature, otypes=otypes)

    # Miscellaneous
    if output_sizes is None:
        output_sizes = {}

    # Axes
    input_axes, output_axes = _validate_normalize_axes(
        axes, axis, keepdims, input_coredimss, output_coredimss
    )

    # Main code:

    # Cast all input arrays to cubed
    # args = [asarray(a) for a in args] # TODO: do we need to support casting?

    if len(input_coredimss) != len(args):
        raise ValueError(
            "According to `signature`, `func` requires %d arguments, but %s given"
            % (len(input_coredimss), len(args))
        )

    # Note (cubed): since we don't support allow_rechunk=True, there is no need to
    # transpose args (and outputs back again)

    # Assess input args for loop dims
    input_shapes = [a.shape for a in args]
    input_chunkss = [a.chunks for a in args]
    num_loopdims = [len(s) - len(cd) for s, cd in zip(input_shapes, input_coredimss)]
    max_loopdims = max(num_loopdims) if num_loopdims else None
    core_input_shapes = [
        dict(zip(icd, s[n:]))
        for s, n, icd in zip(input_shapes, num_loopdims, input_coredimss)
    ]
    core_shapes = merge(*core_input_shapes)
    core_shapes.update(output_sizes)

    loop_input_dimss = [
        tuple("__loopdim%d__" % d for d in range(max_loopdims - n, max_loopdims))
        for n in num_loopdims
    ]
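    # (arrays with fewer loop dims get only the trailing loop-dim names above,
    # so loop dimensions align on the right, as in NumPy broadcasting)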
    input_dimss = [lp + c for lp, c in zip(loop_input_dimss, input_coredimss)]

    loop_output_dims = max(loop_input_dimss, key=len) if loop_input_dimss else tuple()

    # Assess input args for same size and chunk sizes
    # Collect sizes and chunksizes of all dims in all arrays
    dimsizess = {}
    chunksizess = {}
    for dims, shape, chunksizes in zip(input_dimss, input_shapes, input_chunkss):
        for dim, size, chunksize in zip(dims, shape, chunksizes):
            dimsizes = dimsizess.get(dim, [])
            dimsizes.append(size)
            dimsizess[dim] = dimsizes
            chunksizes_ = chunksizess.get(dim, [])
            chunksizes_.append(chunksize)
            chunksizess[dim] = chunksizes_
    # Assert correct partitioning of sizes and chunks for each dimension
    for dim, sizes in dimsizess.items():
        # Check that the arrays have same length for same dimensions or dimension `1`
        if set(sizes) | {1} != {1, max(sizes)}:
            raise ValueError(f"Dimension `'{dim}'` with different lengths in arrays")
        if not allow_rechunk:
            chunksizes = chunksizess[dim]
            # Check if core dimensions consist of only one chunk
            if (dim in core_shapes) and (chunksizes[0][0] < core_shapes[dim]):
                raise ValueError(
                    f"Core dimension `'{dim}'` consists of multiple chunks. "
                    "To fix, rechunk into a single chunk along this dimension."
                )
            # Check if loop dimensions consist of same chunksizes, when they have sizes > 1
            relevant_chunksizes = list(
                unique(c for s, c in zip(sizes, chunksizes) if s > 1)
            )
            if len(relevant_chunksizes) > 1:
                raise ValueError(
                    f"Dimension `'{dim}'` with different chunksize present"
                )

    # Apply function - use blockwise here
    arginds = list(concat(zip(args, input_dimss)))

    from cubed.core.ops import blockwise

    # Note (cubed): use blockwise on all output dimensions, not just loop_output_dims like in the original
    out_ind = loop_output_dims + output_coredimss

    return blockwise(func, out_ind, *arginds, dtype=output_dtypes, **kwargs)
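
For orientation, here is a minimal usage sketch of the new function, modelled on the tests below (the work directory and memory budget are illustrative, not prescriptive):

import numpy as np
import cubed
from cubed import apply_gufunc

spec = cubed.Spec("tmp", max_mem=1_000_000)  # illustrative work dir and memory budget

def mean_last_axis(x):
    return np.mean(x, axis=-1)

a = cubed.from_array(np.random.normal(size=(10, 20, 30)), chunks=(5, 5, 30), spec=spec)
# "(i)->()": the last axis is a core dimension, so each chunk must span it fully
b = apply_gufunc(mean_last_axis, "(i)->()", a, output_dtypes="f", vectorize=True)
print(b.compute().shape)  # (10, 20)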
82 changes: 82 additions & 0 deletions cubed/tests/test_gufunc.py
@@ -0,0 +1,82 @@
import numpy as np
import pytest
from numpy.testing import assert_allclose, assert_equal

import cubed
import cubed.array_api as xp
from cubed import apply_gufunc


@pytest.fixture()
def spec(tmp_path):
    return cubed.Spec(tmp_path, max_mem=1000000)


@pytest.mark.parametrize("vectorize", [False, True])
def test_apply_reduction(spec, vectorize):
    def stats(x):
        return np.mean(x, axis=-1)

    r = np.random.normal(size=(10, 20, 30))
    a = cubed.from_array(r, chunks=(5, 5, 30), spec=spec)
    actual = apply_gufunc(stats, "(i)->()", a, output_dtypes="f", vectorize=vectorize)
    expected = stats(r)

    assert actual.compute().shape == expected.shape
    assert_allclose(actual.compute(), expected)


def test_apply_gufunc_elemwise_01(spec):
    def add(x, y):
        return x + y

    a = cubed.from_array(np.array([1, 2, 3]), chunks=2, spec=spec)
    b = cubed.from_array(np.array([1, 2, 3]), chunks=2, spec=spec)
    z = apply_gufunc(add, "(),()->()", a, b, output_dtypes=a.dtype)
    assert_equal(z, np.array([2, 4, 6]))


def test_apply_gufunc_elemwise_loop(spec):
    def foo(x):
        assert x.shape in ((2,), (1,))
        return 2 * x

    a = cubed.from_array(np.array([1, 2, 3]), chunks=2, spec=spec)
    z = apply_gufunc(foo, "()->()", a, output_dtypes=int)
    assert z.chunks == ((2, 1),)
    assert_equal(z, np.array([2, 4, 6]))


def test_apply_gufunc_elemwise_core(spec):
    def foo(x):
        assert x.shape == (3,)
        return 2 * x

    a = cubed.from_array(np.array([1, 2, 3]), chunks=3, spec=spec)
    z = apply_gufunc(foo, "(i)->(i)", a, output_dtypes=int)
    assert z.chunks == ((3,),)
    assert_equal(z, np.array([2, 4, 6]))


def test_gufunc_two_inputs(spec):
    def foo(x, y):
        return np.einsum("...ij,...jk->ik", x, y)

    a = xp.ones((2, 3), chunks=100, dtype=int, spec=spec)
    b = xp.ones((3, 4), chunks=100, dtype=int, spec=spec)
    x = apply_gufunc(foo, "(i,j),(j,k)->(i,k)", a, b, output_dtypes=int)
    assert_equal(x, 3 * np.ones((2, 4), dtype=int))


def test_apply_gufunc_axes_two_kept_coredims(spec):
    ra = np.random.normal(size=(20, 30))
    rb = np.random.normal(size=(10, 1, 40))

    a = cubed.from_array(ra, chunks=(10, 30), spec=spec)
    b = cubed.from_array(rb, chunks=(5, 1, 40), spec=spec)

    def outer_product(x, y):
        return np.einsum("i,j->ij", x, y)

    c = apply_gufunc(outer_product, "(i),(j)->(i,j)", a, b, vectorize=True)
    assert c.compute().shape == (10, 20, 30, 40)
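
A note on the expected shape in the last test: with signature "(i),(j)->(i,j)", ``a`` contributes loop dims (20,) and core dim i=30, while ``b`` contributes loop dims (10, 1) and core dim j=40; the loop dims broadcast to (10, 20) and the core dims are appended, giving (10, 20, 30, 40). The same shape rules can be checked in plain NumPy (np.vectorize is only a reference for the broadcasting semantics, not how cubed executes the computation):

import numpy as np

vec = np.vectorize(
    lambda x, y: np.einsum("i,j->ij", x, y), signature="(i),(j)->(i,j)"
)
out = vec(np.ones((20, 30)), np.ones((10, 1, 40)))
assert out.shape == (10, 20, 30, 40)  # loop dims broadcast, core dims appended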
1 change: 1 addition & 0 deletions docs/api.rst
@@ -41,6 +41,7 @@ Chunk-specific functions
   :nosignatures:
   :toctree: generated/

   apply_gufunc
   map_blocks

Random number generation