From 313728a2945c321a1b7c33a59384971251a7fbe7 Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Tue, 16 Aug 2016 02:45:05 -0700
Subject: [PATCH] More work on signatures, including auto-dask

---
 xarray/core/computation.py | 315 ++++++++++++++++++++++++++-----------
 1 file changed, 227 insertions(+), 88 deletions(-)

diff --git a/xarray/core/computation.py b/xarray/core/computation.py
index 72f66fee1e6..c80cd5a7205 100644
--- a/xarray/core/computation.py
+++ b/xarray/core/computation.py
@@ -1,6 +1,55 @@
 import itertools
 from collections import namedtuple
 
+from .pycompat import dask_array_type
+
+
+class Signature(object):
+    """Core dimensions signature for a given function
+
+    Based on the signature provided by generalized ufuncs in NumPy.
+
+    Attributes
+    ----------
+    input_core_dims : list of tuples
+        A list of tuples of dimension names expected on each input variable.
+    output_core_dims : list of tuples
+        A list of tuples of dimension names expected on each output variable.
+    """
+    def __init__(self, input_core_dims, output_core_dims=((),)):
+        if dtypes is None:
+            dtypes = {}
+        self.input_core_dims = input_core_dims
+        self.output_core_dims = output_core_dims
+        self._all_input_core_dims = None
+        self._all_output_core_dims = None
+
+    @property
+    def all_input_core_dims(self):
+        if self._all_input_core_dims is None:
+            self._all_input_core_dims = frozenset(
+                dim for dims in self.input_core_dims for dim in dims)
+        return self._all_input_core_dims
+
+    @property
+    def all_output_core_dims(self):
+        if self._all_output_core_dims is None:
+            self._all_output_core_dims = frozenset(
+                dim for dims in self.output_core_dims for dim in dims)
+        return self._all_output_core_dims
+
+    @classmethod
+    def from_string(cls, string):
+        raise NotImplementedError
+
+    @classmethod
+    def from_ufunc(cls, ufunc):
+        raise NotImplementedError
+
+
+def _default_signature(n_inputs):
+    return Signature([()] * n_inputs, [()])
+
 
 def result_name(objects):
     # use the same naming heuristics as pandas:
@@ -14,8 +63,36 @@ def result_name(objects):
     return name
 
 
-def apply_dataarray(func, args, join='inner', gufunc_signature=None,
-                    kwargs=None, combine_names=None):
+def _default_result_attrs(attrs, func, signature):
+    return [{}] * len(signature.output_core_dims)
+
+
+def _build_output_coords(args, signature, new_coords=None):
+
+    def get_coord_variables(arg):
+        return getattr(getattr(arg, 'coords', {}), 'variables')
+
+    coord_variables = [get_coord_variables(a) for a in args]
+    if new_coords is not None:
+        coord_variables.append(get_coord_variables(new_coords))
+
+    merged = merge_coords_without_align(coord_variables)
+
+    output = []
+    for output_dims in signature.output_core_dims:
+        dropped_dims = signature.all_input_core_dims - set(output_dims)
+        coords = OrderedDict((k, v) for k, v in merged.items()
+                             if set(v.dims).isdisjoint(dropped_dims))
+        output.append(coords)
+
+    return output
+
+
+def apply_dataarray(args, func, signature=None, join='inner',
+                    kwargs=None, new_coords=None, combine_names=None):
+    if signature is None:
+        signature = _default_signature(len(args))
+
     if kwargs is None:
         kwargs = {}
 
@@ -24,17 +101,21 @@ def apply_dataarray(func, args, join='inner', gufunc_signature=None,
     args = deep_align(*args, join=join, copy=False, raise_on_invalid=False)
 
-    coord_variables = [getattr(getattr(a, 'coords', {}), 'variables')
-                       for a in args]
-    coords = merge_coords_without_align(coord_variables)
-    name = combine_names(args)
+    list_of_names = combine_names(args)
+    list_of_coords = _build_output_coords(args, signature, new_coords)
     data_vars = [getattr(a, 'variable') for a in args]
-    variables = func(*data_vars, **kwargs)
-
-    # TODO handle gufunc_signature
+    variable_or_variables = func(*data_vars, **kwargs)
 
-    return DataArray(variable, coords, name=name, fastpath=True)
+    if len(signature.output_core_dims) > 1:
+        return tuple(DataArray(variable, coords, name=name, fastpath=True)
+                     for variable, coords, name in zip(
+                         variable_or_variables, list_of_coords, list_of_names))
+    else:
+        variable = variable_or_variables
+        coords, = list_of_coords
+        name, = list_of_names
+        return DataArray(variable, coords, name=name, fastpath=True)
 
 
 def join_dict_keys(objects, how='inner')
@@ -56,22 +137,24 @@ def collect_dict_values(objects, keys, fill_value=None)
     return result_values
 
 
-def apply_dataset(func, args, join='inner', fill_value=None, kwargs=None,
-                  combine_attrs=None):
+def apply_dataset(args, func, signature=None, join='inner', fill_value=None,
+                  kwargs=None, new_coords=None, result_attrs=None):
     if kwargs is None:
         kwargs = {}
 
-    if combine_attrs is None:
-        combine_attrs = lambda func, attrs: None
+    if signature is None:
+        signature = _default_signature(len(args))
+
+    if result_attrs is None:
+        result_attrs = _default_result_attrs
 
-    attrs = combine_attrs(getattr(func, 'func', func),
-                          [getattr(a, 'attrs') for a in args])
+    list_of_attrs = result_attrs([getattr(a, 'attrs', {}) for a in args],
+                                 getattr(func, 'func', func),
+                                 signature)
 
     args = deep_align(*args, join=join, copy=False, raise_on_invalid=False)
 
-    coord_variables = [getattr(getattr(a, 'coords', {}), 'variables')
-                       for a in args]
-    coords = merge_coords_without_align(coord_variables)
+    list_of_coords = _build_output_coords(args, signature, new_coords)
 
     list_of_data_vars = [getattr(a, 'data_vars', {}) for a in args]
     names = join_dict_keys(list_of_data_vars, how=join)
@@ -81,10 +164,34 @@ def apply_dataset(func, args, join='inner', fill_value=None, kwargs=None,
 
     result_vars = OrderedDict()
     for name, variable_args in zip(names, lists_of_args):
-        result[name] = func(*variable_args, **kwargs)
-    result_vars.update(coords)
-
-    return Dataset._from_vars_and_coord_names(result_vars, coords, attrs)
+        result_vars[name] = func(*variable_args, **kwargs)
+
+    def make_dataset(data_vars, coord_vars, attrs):
+        # Normally, we would copy data_vars to be safe, but we created the
+        # OrderedDict in this function and don't use it for anything else.
+        variables = data_vars
+        variables.update(coord_vars)
+        coord_names = set(coord_vars)
+        return Dataset._from_vars_and_coord_names(
+            variables, coord_names, attrs)
+
+    n_outputs = len(signature.output_core_dims)
+    if n_outputs > 1:
+        # we need to unpack result_vars from Dict[object, Tuple[Variable]] ->
+        # Tuple[Dict[object, Variable]].
+        list_of_result_vars = [OrderedDict() for _ in range(n_outputs)]
+        for name, values in result_vars.items():
+            for value, results_dict in zip(values, list_of_result_vars):
+                results_dict[name] = value
+
+        return tuple(make_dataset(data_vars, coord_vars, attrs)
+                     for data_vars, coord_vars, attrs in zip(
+                         list_of_result_vars, list_of_coords, list_of_attrs))
+    else:
+        data_vars = result_vars
+        coord_vars, = list_of_coords
+        attrs, = list_of_attrs
+        return make_dataset(data_vars, coord_vars, attrs)
 
 
 def _calculate_unified_dim_sizes(variables):
@@ -110,49 +217,9 @@ def _calculate_unified_dim_sizes(variables):
     return dim_sizes
 
 
+def _broadcast_variable_data_to(variable, broadcast_dims):
 
-def _as_sequence(arg, cls):
-    if is_scalar(arg):
-        return cls([arg])
-    else:
-        return cls(arg)
-
-
-_ElemwiseSignature = namedtuple(
-    '_ElemwiseSignature', 'broadcast_dims, output_dims')
-
-
-class GUFuncSignature(object):
-    def __init__(self, inputs, outputs):
-        self.inputs = inputs
-        self.outputs = outputs
-
-    @classmethod
-    def from_string(cls, string):
-        raise NotImplementedError
-
-
-def _build_and_check_signature(variables, gufunc_signature):
-    # core_dims are not broadcast over, and moved to the right with order
-    # preserved.
-
-    dim_sizes = _calculate_unified_dim_sizes(variables)
-
-    if gufunc_signature is None:
-        # broadcast everything, one output
-        dims = tuple(size_dims)
-        return _ElemwiseSignature(dims, [dims])
-
-    core_dims = set(itertools.chain.from_iterable(
-        itertools.chain(gufunc_signature.inputs, gufunc_signature.outputs)))
-    broadcast_dims = tuple(d for d in dim_sizes if d not in core_dims)
-    output_dims = [broadcast_dims + out for out in gufunc_signature.outputs]
-    return _ElemwiseSignature(broadcast_dims, output_dims)
-
-
-def _broadcast_variable_data_to(variable, broadcast_dims, allow_dask=True):
-
-    data_attr = 'data' if allow_dask else 'values'
-    data = getattr(variable, data_attr)
+    data = variable.data
 
     old_dims = variable.dims
     if broadcast_dims == old_dims:
@@ -174,58 +241,130 @@
     return data
 
 
-def apply_variable_ufunc(func, args, allow_dask=True, gufunc_signature=None,
-                         combine_attrs=None, kwargs=None):
+def _deep_unpack_list(arg):
+    if isinstance(arg, list):
+        arg, = arg
+        return _deep_unpack_list(arg)
+    return arg
+
+
+def _apply_with_dask_atop(func, list_of_input_data, signature, kwargs, dtype):
+    import toolz  # required dependency of dask.array
+
+    if len(signature.output_core_dims) > 1:
+        raise ValueError('cannot use dask.array.atop for '
+                         'multiple outputs')
+    if signature.all_output_core_dims - signature.all_input_core_dims:
+        raise ValueError('cannot create new dimensions in dask.array.atop')
+
+    input_dims = [broadcast_dims + inp for inp in signature.input_core_dims]
+    dropped = signature.all_input_core_dims - signature.all_output_core_dims
+    for data, dims in zip(list_of_input_data, input_dims):
+        if isinstance(data, dask_array_type):
+            for dropped_dim in dropped:
+                if (dropped_dim in dims and
+                        len(data.chunks[dims.index(dropped_dim)]) != 1):
+                    raise ValueError('dimension %r dropped in the output does not '
+                                     'consist of exactly one chunk on all arrays '
+                                     'in the inputs' % dropped_dim)
+
+    out_ind, = output_dims
+    atop_args = [ai for a in (list_of_input_data, input_dims) for ai in a]
+    func2 = toolz.functoolz.compose(func, _deep_unpack_list)
+    result_data = da.atop(func2, out_ind, *atop_args, dtype=dtype, **kwargs)
+
+
+def apply_variable_ufunc(args, func, signature=None, dask_array='forbidden',
+                         combine_attrs=None, kwargs=None, dtype=None):
+
+    if signature is None:
+        signature = _default_signature(len(args))
+    if dask_array not in {'forbidden', 'allowed', 'auto'}:
+        raise ValueError('unknown setting for dask array handling')
     if kwargs is None:
         kwargs = {}
-
     if combine_attrs is None:
         combine_attrs = lambda func, attrs: None
+    if result_attrs is None:
+        result_attrs = _default_result_attrs
 
-    sig = _build_and_check_signature(variables, gufunc_signature)
+    dim_sizes = _calculate_unified_dim_sizes(variables)
+    core_dims = signature.all_input_core_dims | signature.all_output_core_dims
+    broadcast_dims = tuple(d for d in dim_sizes if d not in core_dims)
+    output_dims = [broadcast_dims + out for out in signature.output_core_dims]
 
-    n_out = len(sig.output_dims)
-    input_attrs = [getattr(a, 'attrs', {}) for a in args]
-    result_attrs = [combine_attrs(input_attrs, func, n) for n in range(n_out)]
+    list_of_attrs = result_attrs([getattr(a, 'attrs', {}) for a in args],
+                                 getattr(func, 'func', func),
+                                 signature)
 
-    list_of_data = []
+    list_of_input_data = []
     for arg in args:
         if isinstance(arg, Variable):
-            data = _broadcast_variable_data_to(arg, sig.broadcast_dims,
-                                               allow_dask=allow_dask)
+            data = _broadcast_variable_data_to(arg, broadcast_dims)
         else:
             data = arg
-        list_of_data.append(data)
+        list_of_input_data.append(data)
 
-    result_data = func(*list_of_data, **kwargs)
+    contains_dask = any(isinstance(d, dask_array_type)
+                        for d in list_of_input_data)
+
+    if dask_array == 'forbidden' and contains_dask:
+        raise ValueError('encountered dask array')
+    elif dask_array == 'auto' and contains_dask:
+        result_data = _apply_with_dask_atop(func, list_of_input_data, signature,
+                                            kwargs, dtype)
+    else:
+        result_data = func(*list_of_input_data, **kwargs)
 
-    if n_out > 1:
+    if len(output_dims) > 1:
         output = []
         for dims, data, attrs in zip(
-                sig.output_dims, result_data, result_attrs):
+                output_dims, result_data, list_of_attrs):
             output.append(Variable(dims, data, attrs))
         return tuple(output)
     else:
-        dims, = sig.output_dims
-        data, = result_data
-        attrs = result_attrs
+        dims, = output_dims
+        data = result_data
+        attrs, = list_of_attrs
        return Variable(dims, data, attrs)
 
 
-def apply_ufunc(func, args, join='inner', allow_dask=True, kwargs=None,
-                combine_dataset_attrs=None, combine_variable_attrs=None):
+def apply_ufunc(args, func=None, signature=None, join='inner',
+                dask_array='forbidden', kwargs=None, combine_dataset_attrs=None,
+                combine_variable_attrs=None, dtype=None):
+
+    if signature is None:
+        signature = _default_signature(len(args))
 
     variables_ufunc = functools.partial(
-        apply_variable_ufunc, func=func, allow_dask=allow_dask,
+        apply_variable_ufunc, func=func, dask_array=dask_array,
         combine_attrs=combine_variable_attrs, kwargs=kwargs)
 
     if any(is_dict_like(a) for a in args):
-        return apply_dataset(variables_ufunc, args, join=join,
+        return apply_dataset(args, variables_ufunc, join=join,
                              combine_attrs=combine_dataset_attrs)
     elif any(isinstance(a, DataArray) for a in args):
-        return apply_dataarray(variables_ufunc, args, join=join)
+        return apply_dataarray(args, variables_ufunc, join=join)
    elif any(isinstance(a, Variable) for a in args):
-        return variables_ufunc(args=args)
+        return variables_ufunc(args)
+    elif dask_array == 'auto' and any(
+            isinstance(arg, dask_array_type) for arg in args):
+        import dask.array as da
+        if signature.all_input_core_dims or signature.all_output_core_dims:
+            raise ValueError("cannot use dask_array='auto' on unlabeled dask "
+                             'arrays with a function signature that uses core '
+                             'dimensions')
+        return da.elemwise(func, *args, dtype=dtype)
     else:
         return func(args)
+
+
+# def mean(xarray_object, dim=None):
+#     if dim is None:
+#         signature = Signature([(dim,)])
+#         kwargs = {'axis': -1}
+#     else:
+#         signature = Signature([xarray_object.dims])
+#         kwargs = {}
+#     return apply_ufunc([xarray_object], ops.mean, signature,
+#                        dask_array='allowed', kwargs=kwargs)
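
Usage sketch (not part of the patch): the snippet below illustrates how the Signature and apply_ufunc API introduced above might be called for a reduction over a core dimension, mirroring the NumPy gufunc signature '(y),(y)->()'. The inner_product function, the 'x'/'y' dimension names, and the import path are hypothetical; the argument order follows the work-in-progress signatures in this commit, and given the unfinished state of the code it shows the intended call pattern rather than something guaranteed to run against this exact revision.

import numpy as np
import xarray as xr
# hypothetical import path for the WIP helpers added in this patch
from xarray.core.computation import Signature, apply_ufunc

a = xr.DataArray(np.random.rand(3, 4), dims=['x', 'y'])
b = xr.DataArray(np.random.rand(3, 4), dims=['x', 'y'])

# 'y' is a core dimension on both inputs and is reduced away, so the
# output has no core dimensions: the equivalent gufunc signature is
# '(y),(y)->()'.
signature = Signature(input_core_dims=[('y',), ('y',)],
                      output_core_dims=[()])

def inner_product(x, y):
    # operates on the underlying arrays; core dimensions are assumed to
    # have been moved to the last axis, so 'y' is reduced with axis=-1
    return (x * y).sum(axis=-1)

result = apply_ufunc([a, b], inner_product, signature=signature)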