diff --git a/datashader/core.py b/datashader/core.py
index b657558e4..13e9d16ea 100644
--- a/datashader/core.py
+++ b/datashader/core.py
@@ -5,7 +5,7 @@
 import numpy as np
 import pandas as pd
 import dask.dataframe as dd
-from dask.array import Array
+import dask.array as da
 from six import string_types
 from xarray import DataArray, Dataset
 from collections import OrderedDict
@@ -14,7 +14,7 @@
 from .utils import Dispatcher, ngjit, calc_res, calc_bbox, orient_array, compute_coords
 from .utils import get_indices, dshape_from_pandas, dshape_from_dask
 from .utils import Expr # noqa (API import)
-from .resampling import resample_2d
+from .resampling import resample_2d, resample_2d_distributed
 from . import reductions as rd
@@ -657,7 +657,9 @@ def raster(self,
                downsample_method=rd.mean(),   # Deprecated as of datashader=0.6.4
                nan_value=None,
                agg=None,
-               interpolate=None):
+               interpolate=None,
+               chunksize=None,
+               max_mem=None):
         """Sample a raster dataset by canvas size and bounds.
 
         Handles 2D or 3D xarray DataArrays, assuming that the last two
@@ -670,6 +672,13 @@ def raster(self,
         "nodata" attribute of the raster) are replaced with `NaN` if
         floats, and 0 if int.
 
+        Also supports resampling out-of-core DataArrays backed by dask
+        arrays. By default the output array keeps the chunksize of the
+        input, but a custom chunksize may be provided. If there are
+        memory constraints, they may be expressed with the max_mem
+        parameter, which limits how large the chunks held in memory
+        may be.
+
         Parameters
         ----------
         source : xarray.DataArray or xr.Dataset
@@ -677,21 +686,30 @@ def raster(self,
             define the data variable).
         layer : float
             For a 3D array, value along the z dimension : optional default=None
-        interpolate : str, optional default=linear
-            Resampling mode when upsampling raster.
-            options include: nearest, linear.
-        agg : Reduction, optional default=mean()
-            Resampling mode when downsampling raster.
-            options include: first, last, mean, mode, var, std, min, max
-            Also accepts string names, for backwards compatibility.
+        ds_method : str (optional)
+            Grid cell aggregation method for a possible downsampling.
+        us_method : str (optional)
+            Grid cell interpolation method for a possible upsampling.
         nan_value : int or float, optional
             Optional nan_value which will be masked out when applying
             the resampling.
+        agg : Reduction, optional default=mean()
+            Resampling mode when downsampling raster.
+            options include: first, last, mean, mode, var, std, min, max
+            Accepts an executable function, function object, or string name.
+        interpolate : str, optional default=linear
+            Resampling mode when upsampling raster.
+            options include: nearest, linear.
+        chunksize : tuple(int, int) (optional)
+            Size of the output chunks. By default the chunk size is
+            inherited from the *src* array.
+        max_mem : int (optional)
+            The maximum number of bytes that should be loaded into memory
+            during the regridding operation.
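For orientation, a minimal usage sketch of the dask-backed path added here (array size, chunk size and the byte budget are illustrative; the call pattern follows the tests added in this PR):

    import dask.array as da
    import numpy as np
    import xarray as xr
    import datashader as ds

    arr = da.from_array(np.random.rand(1000, 1000), chunks=(100, 100))
    xarr = xr.DataArray(arr, coords=[('y', np.arange(1000)), ('x', np.arange(1000))], name='z')

    cvs = ds.Canvas(plot_width=400, plot_height=400)
    # agg/interpolate select the down/upsampling method; chunksize and max_mem
    # control how the dask-backed output is chunked and how much is held in memory.
    agg = cvs.raster(xarr, agg='mean', chunksize=(100, 100), max_mem=10 * 2**20)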
         Returns
         -------
         data : xarray.Dataset
-        """
         # For backwards compatibility
         if agg is None: agg=downsample_method
@@ -786,11 +804,14 @@ def raster(self,
                       us_method=interpolate,
                       fill_value=fill_value)
         if array.ndim == 2:
             source_window = array[rmin:rmax+1, cmin:cmax+1]
-            if isinstance(source_window, Array):
-                source_window = source_window.compute()
             if ds_method in ['var', 'std']:
                 source_window = source_window.astype('f')
-            data = resample_2d(source_window, **kwargs)
+            if isinstance(source_window, da.Array):
+                data = resample_2d_distributed(
+                    source_window, chunksize=chunksize, max_mem=max_mem,
+                    **kwargs)
+            else:
+                data = resample_2d(source_window, **kwargs)
             layers = 1
         else:
             source_window = array[:, rmin:rmax+1, cmin:cmax+1]
@@ -798,9 +819,13 @@ def raster(self,
                 source_window = source_window.astype('f')
             arrays = []
             for arr in source_window:
-                if isinstance(arr, Array):
-                    arr = arr.compute()
-                arrays.append(resample_2d(arr, **kwargs))
+                if isinstance(arr, da.Array):
+                    arr = resample_2d_distributed(
+                        arr, chunksize=chunksize, max_mem=max_mem,
+                        **kwargs)
+                else:
+                    arr = resample_2d(arr, **kwargs)
+                arrays.append(arr)
             data = np.dstack(arrays)
             layers = len(arrays)

@@ -830,8 +855,11 @@ def raster(self,
             top_pad = np.full(tshape, fill_value, source_window.dtype)
             bottom_pad = np.full(bshape, fill_value, source_window.dtype)

-            data = np.concatenate((top_pad, data, bottom_pad), axis=0)
-            data = np.concatenate((left_pad, data, right_pad), axis=1)
+            concat = da.concatenate if isinstance(data, da.Array) else np.concatenate
+            if top_pad.shape[0] > 0:
+                data = concat((top_pad, data, bottom_pad), axis=0)
+            if left_pad.shape[1] > 0:
+                data = concat((left_pad, data, right_pad), axis=1)

         # Reorient array to original orientation
         if res[1] > 0: data = data[::-1]
@@ -850,7 +878,7 @@ def raster(self,
             coords = {xdim: xs, ydim: ys}
             dims = [ydim, xdim]
             attrs = dict(res=res[0])
-            if source._file_obj is not None:
+            if source._file_obj is not None and hasattr(source._file_obj, 'nodata'):
                 attrs['nodata'] = source._file_obj.nodata

         # Handle DataArray with layers
diff --git a/datashader/resampling.py b/datashader/resampling.py
index fe23c65a1..9e0baa02a 100644
--- a/datashader/resampling.py
+++ b/datashader/resampling.py
@@ -26,9 +26,14 @@
 from __future__ import absolute_import, division, print_function

+from itertools import groupby
+from math import floor, ceil
+
+import dask.array as da
 import numpy as np
 import numba as nb

+from dask.delayed import delayed
 from numba import prange

 from .utils import ngjit

@@ -79,30 +84,241 @@
                               mean=DS_MEAN, var=DS_VAR, std=DS_STD,
                               min=DS_MIN, max=DS_MAX)

-def resample_2d(src, w, h, ds_method='mean', us_method='linear', fill_value=None, mode_rank=1, out=None):
+
+def map_chunks(in_shape, out_shape, out_chunks):
+    """
+    Maps indices in the source array to chunks in the target array.
+
+    For each chunk in the target array this function computes the
+    indices into the source array that will be fed into the regridding
+    operation.
+
+    Parameters
+    ----------
+    in_shape : tuple(int, int)
+        The shape of the input array
+    out_shape : tuple(int, int)
+        The shape of the output array
+    out_chunks : tuple(int, int)
+        The shape of each chunk in the output array
+
+    Returns
+    -------
+    Dictionary mapping each chunk to its indices
+    in the input and output arrays.
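To make the mapping described above concrete, a small hand-checked sketch of what map_chunks produces (shapes chosen purely for illustration):

    from datashader.resampling import map_chunks

    # 4x4 source regridded to a 2x2 output, using 1x1 output chunks
    mapping = map_chunks(in_shape=(4, 4), out_shape=(2, 2), out_chunks=(1, 1))

    # Each output chunk records the output slice it fills and the source slab
    # (plus fractional offsets) it reads from, e.g. for the top-left chunk:
    # mapping[(0, 0)] == {'out': {'x': (0, 1), 'y': (0, 1), 'w': 1, 'h': 1},
    #                     'in':  {'x': (0, 2), 'y': (0, 2),
    #                             'xoffset': (0.0, 0.0), 'yoffset': (0.0, 0.0)}}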
+ """ + outy, outx = out_shape + cys, cxs = out_chunks + xchunks = list(range(0, outx, cxs)) + [outx] + ychunks = list(range(0, outy, cys)) + [outy] + iny, inx = in_shape + xscale = inx/outx + yscale = iny/outy + mapping = {} + for i in range(len(ychunks)-1): + cumy0, cumy1 = ychunks[i:i+2] + iny0, iny1 = cumy0*yscale, cumy1*yscale + iny0r, iny1r = floor(iny0), ceil(iny1) + y0_off, y1_off = iny0-iny0r, iny1r-iny1 + for j in range(len(xchunks)-1): + cumx0, cumx1 = xchunks[j:j+2] + inx0, inx1 = cumx0*xscale, cumx1*xscale + inx0r, inx1r = floor(inx0), ceil(inx1) + x0_off, x1_off = inx0-inx0r, inx1r-inx1 + mapping[(i, j)] = { + 'out': { + 'x': (cumx0, cumx1), + 'y': (cumy0, cumy1), + 'w': (cumx1-cumx0), + 'h': (cumy1-cumy0), + }, + 'in': { + 'x': (inx0r, inx1r), + 'y': (iny0r, iny1r), + 'xoffset': (x0_off, x1_off), + 'yoffset': (y0_off, y1_off), + } + } + return mapping + + +def compute_chunksize(src, w, h, chunksize=None, max_mem=None): + """ + Attempts to compute a chunksize for the resampling output array + that is as close as possible to the input array chunksize, while + also respecting the maximum memory constraint to avoid loading + to much data into memory at the same time. + + Parameters + ---------- + src : dask.array.Array + The source array to resample + w : int + New grid width + h : int + New grid height + chunksize : tuple(int, int) (optional) + Size of the output chunks. By default the chunk size is + inherited from the *src* array. + max_mem : int (optional) + The maximum number of bytes that should be loaded into memory + during the regridding operation. + + Returns + ------- + chunksize : tuple(int, int) + Size of the output chunks. + """ + start_chunksize = src.chunksize if chunksize is None else chunksize + if max_mem is None: + return start_chunksize + + sh, sw = src.shape + height_fraction = float(sh)/h + width_fraction = float(sw)/w + ch, cw = start_chunksize + dim = True + nbytes = src.dtype.itemsize + while ((ch * height_fraction) * (cw * width_fraction) * nbytes) > max_mem: + if dim: + cw -= 1 + else: + ch -= 1 + dim = not dim + if ch == 0 or cw == 0: + min_mem = height_fraction * width_fraction * nbytes + raise ValueError( + "Given the memory constraints the resampling operation " + "could not find a chunksize that avoids loading too much " + "data into memory. Either relax the memory constraint to " + "a minimum of %d bytes or resample to a larger grid size. " + "Note: A future implementation could handle this condition " + "by declaring temporary arrays." % min_mem) + return ch, cw + + +def resample_2d_distributed(src, w, h, ds_method='mean', us_method='linear', + fill_value=None, mode_rank=1, chunksize=None, + max_mem=None): + """ + A distributed version of 2-d grid resampling which operates on + dask arrays and performs regridding on a chunked array. + + Parameters + ---------- + src : dask.array.Array + The source array to resample + w : int + New grid width + h : int + New grid height + ds_method : str (optional) + Grid cell aggregation method for a possible downsampling + (one of the *DS_* constants). + us_method : str (optional) + Grid cell interpolation method for a possible upsampling + (one of the *US_* constants, optional). + fill_value : scalar (optional) + If None, numpy's default value is used. + mode_rank : scalar (optional) + The rank of the frequency determined by the *ds_method* + ``DS_MODE``. One (the default) means most frequent value, two + means second most frequent value, and so forth. 
+    chunksize : tuple(int, int) (optional)
+        Size of the output chunks. By default the chunk size is
+        inherited from the *src* array.
+    max_mem : int (optional)
+        The maximum number of bytes that should be loaded into memory
+        during the regridding operation.
+
+    Returns
+    -------
+    resampled : dask.array.Array
+        A resampled version of the *src* array.
+    """
+    temp_chunks = compute_chunksize(src, w, h, chunksize, max_mem)
+    if chunksize is None:
+        chunksize = src.chunksize
+
+    chunk_map = map_chunks(src.shape, (h, w), temp_chunks)
+    out_chunks = {}
+    for (i, j), chunk in chunk_map.items():
+        inds = chunk['in']
+        inx0, inx1 = inds['x']
+        iny0, iny1 = inds['y']
+        out = chunk['out']
+        chunk_array = src[iny0:iny1, inx0:inx1]
+        resampled = _resample_2d_delayed(
+            chunk_array, out['w'], out['h'], ds_method, us_method,
+            fill_value, mode_rank, inds['xoffset'], inds['yoffset'])
+        out_chunks[(i, j)] = {
+            'array': resampled,
+            'shape': (out['h'], out['w']),
+            'dtype': src.dtype,
+            'in': chunk['in'],
+            'out': out
+        }
+
+    rows = groupby(out_chunks.items(), lambda x: x[0][0])
+    cols = []
+    for i, row in rows:
+        row = da.concatenate([
+            da.from_delayed(chunk['array'], chunk['shape'], chunk['dtype'])
+            for _, chunk in row], 1)
+        cols.append(row)
+    out = da.concatenate(cols, 0)
+
+    # Ensure chunksize conforms to specified chunksize
+    if chunksize is not None and out.chunksize != chunksize:
+        out = out.rechunk(chunksize)
+    return out
+
+
+def resample_2d(src, w, h, ds_method='mean', us_method='linear',
+                fill_value=None, mode_rank=1, x_offset=(0, 0),
+                y_offset=(0, 0), out=None):
     """
     Resample a 2-D grid to a new resolution.

-    :param src: 2-D *ndarray*
-    :param w: *int*
+    Parameters
+    ----------
+    src : np.ndarray
+        The source array to resample
+    w : int
         New grid width
-    :param h: *int*
+    h : int
         New grid height
-    :param ds_method: one of the *DS_* constants, optional
+    ds_method : str (optional)
         Grid cell aggregation method for a possible downsampling
-    :param us_method: one of the *US_* constants, optional
+        (one of the *DS_* constants).
+    us_method : str (optional)
         Grid cell interpolation method for a possible upsampling
-    :param fill_value: *scalar*, optional
+        (one of the *US_* constants, optional).
+    fill_value : scalar (optional)
         If ``None``, it is taken from **src** if it is a masked array,
         otherwise from *out* if it is a masked array,
         otherwise numpy's default value is used.
-    :param mode_rank: *scalar*, optional
-        The rank of the frequency determined by the *ds_method* ``DS_MODE``. One (the default) means
-        most frequent value, zwo means second most frequent value, and so forth.
-    :param out: 2-D *ndarray*, optional
-        Alternate output array in which to place the result. The default is *None*; if provided, it must have the same
-        shape as the expected output.
-    :return: A resampled version of the *src* array.
+    mode_rank : scalar (optional)
+        The rank of the frequency determined by the *ds_method*
+        ``DS_MODE``. One (the default) means most frequent value, two
+        means second most frequent value, and so forth.
+    x_offset : tuple(float, float) (optional)
+        Offsets for the x-axis indices in the source array (useful
+        for distributed regridding where chunks are not aligned with
+        the underlying array).
+    y_offset : tuple(float, float) (optional)
+        Offsets for the y-axis indices in the source array (useful
+        for distributed regridding where chunks are not aligned with
+        the underlying array).
+    out : numpy.ndarray (optional)
+        Alternate output array in which to place the result. The
+        default is *None*; if provided, it must have the same shape as
+        the expected output.
+
+    Returns
+    -------
+    resampled : numpy.ndarray or dask.array.Array
+        A resampled version of the *src* array.
     """
     out = _get_out(out, src, (h, w))
     if out is None:
@@ -112,30 +328,38 @@ def resample_2d(src, w, h, ds_method='mean', us_method='linear', fill_value=None
     us_method=upsample_methods[us_method]
     ds_method=downsample_methods[ds_method]
-
-    return _mask_or_not(_resample_2d(src, mask, use_mask, ds_method, us_method, fill_value, mode_rank, out),
-                        src, fill_value)
+
+    resampled = _resample_2d(src, mask, use_mask, ds_method, us_method,
+                             fill_value, mode_rank, x_offset, y_offset, out)
+    return _mask_or_not(resampled, src, fill_value)
+
+
+_resample_2d_delayed = delayed(resample_2d)


 def upsample_2d(src, w, h, method=US_LINEAR, fill_value=None, out=None):
     """
     Upsample a 2-D grid to a higher resolution by interpolating original grid cells.

-    :param src: 2-D *ndarray*
-    :param w: *int*
+    src: 2-D *ndarray*
+    w: *int*
         Grid width, which must be greater than or equal to *src.shape[-1]*
-    :param h: *int*
+    h: *int*
         Grid height, which must be greater than or equal to *src.shape[-2]*
-    :param method: one of the *US_* constants, optional
+    method: one of the *US_* constants, optional
         Grid cell interpolation method
-    :param fill_value: *scalar*, optional
+    fill_value: *scalar*, optional
         If ``None``, it is taken from **src** if it is a masked array,
         otherwise from *out* if it is a masked array,
         otherwise numpy's default value is used.
-    :param out: 2-D *ndarray*, optional
+    out: 2-D *ndarray*, optional
         Alternate output array in which to place the result. The default is *None*; if provided, it must have the same
         shape as the expected output.
-    :return: An upsampled version of the *src* array.
+
+    Returns
+    -------
+    upsampled : numpy.ndarray or dask.array.Array
+        An upsampled version of the *src* array.
     """
     out = _get_out(out, src, (h, w))
     if out is None:
@@ -147,31 +371,43 @@ def upsample_2d(src, w, h, method=US_LINEAR, fill_value=None, out=None):
         raise ValueError('invalid upsampling method')

     upsampling_method = UPSAMPLING_METHODS[method]
-    return _mask_or_not(upsampling_method(src, mask, use_mask, fill_value, out), src, fill_value)
+    upsampled = upsampling_method(
+        src, mask, use_mask, fill_value, (0, 0), (0, 0), out)
+    return _mask_or_not(upsampled, src, fill_value)


 def downsample_2d(src, w, h, method=DS_MEAN, fill_value=None, mode_rank=1, out=None):
     """
     Downsample a 2-D grid to a lower resolution by aggregating original grid cells.

-    :param src: 2-D *ndarray*
-    :param w: *int*
-        Grid width, which must be less than or equal to *src.shape[-1]*
-    :param h: *int*
-        Grid height, which must be less than or equal to *src.shape[-2]*
-    :param method: one of the *DS_* constants, optional
-        Grid cell aggregation method
-    :param fill_value: *scalar*, optional
+    Parameters
+    ----------
+    src : numpy.ndarray or dask.array.Array
+        The source array to resample
+    w : int
+        New grid width
+    h : int
+        New grid height
+    method : str (optional)
+        Grid cell aggregation method for a possible downsampling
+        (one of the *DS_* constants).
+    fill_value : scalar (optional)
         If ``None``, it is taken from **src** if it is a masked array,
         otherwise from *out* if it is a masked array,
         otherwise numpy's default value is used.
-    :param mode_rank: *scalar*, optional
-        The rank of the frequency determined by the *method* ``DS_MODE``. One (the default) means
-        most frequent value, zwo means second most frequent value, and so forth.
-    :param out: 2-D *ndarray*, optional
-        Alternate output array in which to place the result. The default is *None*; if provided, it must have the same
-        shape as the expected output.
-    :return: A downsampled version of the *src* array.
+    mode_rank : scalar (optional)
+        The rank of the frequency determined by the *method*
+        ``DS_MODE``. One (the default) means most frequent value, two
+        means second most frequent value, and so forth.
+    out : numpy.ndarray (optional)
+        Alternate output array in which to place the result. The
+        default is *None*; if provided, it must have the same shape as
+        the expected output.
+
+    Returns
+    -------
+    downsampled : numpy.ndarray or dask.array.Array
+        A downsampled version of the *src* array.
     """
     if method == DS_MODE and mode_rank < 1:
         raise ValueError('mode_rank must be >= 1')
@@ -185,7 +421,10 @@ def downsample_2d(src, w, h, method=DS_MEAN, fill_value=None, mode_rank=1, out=N
         raise ValueError('invalid downsampling method')

     downsampling_method = DOWNSAMPLING_METHODS[method]
-    return _mask_or_not(downsampling_method(src, mask, use_mask, method, fill_value, mode_rank, out), src, fill_value)
+    downsampled = downsampling_method(
+        src, mask, use_mask, method, fill_value, mode_rank, (0, 0),
+        (0, 0), out)
+    return _mask_or_not(downsampled, src, fill_value)


 def _get_out(out, src, shape):
@@ -240,8 +479,13 @@ def _get_dimensions(src, out):
     return src_w, src_h, out_w, out_h


-def _resample_2d(src, mask, use_mask, ds_method, us_method, fill_value, mode_rank, out):
+def _resample_2d(src, mask, use_mask, ds_method, us_method, fill_value,
+                 mode_rank, x_offset, y_offset, out):
     src_w, src_h, out_w, out_h = _get_dimensions(src, out)
+    x0_off, x1_off = x_offset
+    y0_off, y1_off = y_offset
+    src_wo = (src_w - x0_off - x1_off)
+    src_ho = (src_h - y0_off - y1_off)

     if us_method not in UPSAMPLING_METHODS:
         raise ValueError('invalid upsampling method')
@@ -253,32 +497,50 @@ def _resample_2d(src, mask, use_mask, ds_method, us_method, fill_value, mode_ran
     if src_h == 0 or src_w == 0 or out_h == 0 or out_w == 0:
         return np.zeros((out_h, out_w), dtype=src.dtype)
-    elif out_w < src_w and out_h < src_h:
-        return downsampling_method(src, mask, use_mask, ds_method, fill_value, mode_rank, out)
-    elif out_w < src_w:
-        if out_h > src_h:
+    elif out_w < src_wo and out_h < src_ho:
+        return downsampling_method(src, mask, use_mask, ds_method,
+                                   fill_value, mode_rank, x_offset,
+                                   y_offset, out)
+    elif out_w < src_wo:
+        if out_h > src_ho:
             temp = np.zeros((src_h, out_w), dtype=src.dtype)
-            temp = downsampling_method(src, mask, use_mask, ds_method, fill_value, mode_rank, temp)
+            temp = downsampling_method(src, mask, use_mask, ds_method,
+                                       fill_value, mode_rank, x_offset,
+                                       y_offset, temp)
             # todo - write test & fix: must use mask=np.ma.getmaskarray(temp) here if use_mask==True
-            return upsampling_method(temp, mask, use_mask, fill_value, out)
+            return upsampling_method(temp, mask, use_mask, fill_value,
+                                     x_offset, y_offset, out)
         else:
-            return downsampling_method(src, mask, use_mask, ds_method, fill_value, mode_rank, out)
-    elif out_h < src_h:
-        if out_w > src_w:
+            return downsampling_method(src, mask, use_mask, ds_method,
+                                       fill_value, mode_rank, x_offset,
+                                       y_offset, out)
+    elif out_h < src_ho:
+        if out_w > src_wo:
             temp = np.zeros((out_h, src_w), dtype=src.dtype)
-            temp = downsampling_method(src, mask, use_mask, ds_method, fill_value, mode_rank, temp)
+            temp = downsampling_method(src, mask, use_mask, ds_method,
+                                       fill_value, mode_rank, x_offset,
+                                       y_offset, temp)
             # todo - write test & fix: must
use mask=np.ma.getmaskarray(temp) here if use_mask==True - return upsampling_method(temp, mask, use_mask, fill_value, out) + return upsampling_method(temp, mask, use_mask, fill_value, + x_offset, y_offset, out) else: - return downsampling_method(src, mask, use_mask, ds_method, fill_value, mode_rank, out) - elif out_w > src_w or out_h > src_h: - return upsampling_method(src, mask, use_mask, fill_value, out) + return downsampling_method(src, mask, use_mask, ds_method, + fill_value, mode_rank, x_offset, + y_offset, out) + elif out_w > src_wo or out_h > src_ho: + return upsampling_method(src, mask, use_mask, fill_value, + x_offset, y_offset, out) return src @ngjit_parallel -def _upsample_2d_nearest(src, mask, use_mask, fill_value, out): +def _upsample_2d_nearest(src, mask, use_mask, fill_value, x_offset, y_offset, out): src_w, src_h, out_w, out_h = _get_dimensions(src, out) + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + src_w = (src_w - x0_off - x1_off) + src_h = (src_h - y0_off - y1_off) + if src_w == out_w and src_h == out_h: return src @@ -287,10 +549,11 @@ def _upsample_2d_nearest(src, mask, use_mask, fill_value, out): scale_x = src_w / out_w scale_y = src_h / out_h + for out_y in prange(out_h): - src_y = int(scale_y * out_y) + src_y = int((scale_y * out_y) + y0_off) for out_x in range(out_w): - src_x = int(scale_x * out_x) + src_x = int((scale_x * out_x) + x0_off) value = src[src_y, src_x] if np.isfinite(value) and not (use_mask and mask[src_y, src_x]): out[out_y, out_x] = value @@ -300,25 +563,30 @@ def _upsample_2d_nearest(src, mask, use_mask, fill_value, out): @ngjit_parallel -def _upsample_2d_linear(src, mask, use_mask, fill_value, out): +def _upsample_2d_linear(src, mask, use_mask, fill_value, x_offset, y_offset, out): src_w, src_h, out_w, out_h = _get_dimensions(src, out) - if src_w == out_w and src_h == out_h: + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + src_wo = (src_w - x0_off - x1_off) + src_ho = (src_h - y0_off - y1_off) + + if src_wo == out_w and src_ho == out_h: return src if out_w < src_w or out_h < src_h: raise ValueError("invalid target size") - scale_x = (src_w - 1.0) / ((out_w - 1.0) if out_w > 1 else 1.0) - scale_y = (src_h - 1.0) / ((out_h - 1.0) if out_h > 1 else 1.0) + scale_x = (src_wo - 1.0) / ((out_w - 1.0) if out_w > 1 else 1.0) + scale_y = (src_ho - 1.0) / ((out_h - 1.0) if out_h > 1 else 1.0) for out_y in prange(out_h): - src_yf = scale_y * out_y + src_yf = (scale_y * out_y) + y0_off src_y0 = int(src_yf) wy = src_yf - src_y0 src_y1 = src_y0 + 1 if src_y1 >= src_h: src_y1 = src_y0 for out_x in range(out_w): - src_xf = scale_x * out_x + src_xf = (scale_x * out_x) + x0_off src_x0 = int(src_xf) wx = src_xf - src_x0 src_x1 = src_x0 + 1 @@ -371,7 +639,8 @@ def _upsample_2d_linear(src, mask, use_mask, fill_value, out): @ngjit_parallel -def _downsample_2d_first_last(src, mask, use_mask, method, fill_value, mode_rank, out): +def _downsample_2d_first_last(src, mask, use_mask, method, fill_value, + mode_rank, x_offset, y_offset, out): src_w, src_h, out_w, out_h = _get_dimensions(src, out) if src_w == out_w and src_h == out_h: @@ -380,11 +649,13 @@ def _downsample_2d_first_last(src, mask, use_mask, method, fill_value, mode_rank if out_w > src_w or out_h > src_h: raise ValueError("invalid target size") - scale_x = src_w / out_w - scale_y = src_h / out_h + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + scale_x = (src_w - x0_off - x1_off) / out_w + scale_y = (src_h - y0_off - y1_off) / out_h for out_y in prange(out_h): - src_yf0 = scale_y * 
out_y + src_yf0 = (scale_y * out_y) + y0_off src_yf1 = src_yf0 + scale_y src_y0 = int(src_yf0) src_y1 = int(src_yf1) @@ -392,7 +663,7 @@ def _downsample_2d_first_last(src, mask, use_mask, method, fill_value, mode_rank if wy1 < _EPS and src_y1 > src_y0: src_y1 -= 1 for out_x in range(out_w): - src_xf0 = scale_x * out_x + src_xf0 = (scale_x * out_x) + x0_off src_xf1 = src_xf0 + scale_x src_x0 = int(src_xf0) src_x1 = int(src_xf1) @@ -416,7 +687,8 @@ def _downsample_2d_first_last(src, mask, use_mask, method, fill_value, mode_rank @ngjit_parallel -def _downsample_2d_min_max(src, mask, use_mask, method, fill_value, mode_rank, out): +def _downsample_2d_min_max(src, mask, use_mask, method, fill_value, + mode_rank, x_offset, y_offset, out): src_w, src_h, out_w, out_h = _get_dimensions(src, out) if src_w == out_w and src_h == out_h: @@ -425,11 +697,13 @@ def _downsample_2d_min_max(src, mask, use_mask, method, fill_value, mode_rank, o if out_w > src_w or out_h > src_h: raise ValueError("invalid target size") - scale_x = src_w / out_w - scale_y = src_h / out_h + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + scale_x = (src_w - x0_off - x1_off) / out_w + scale_y = (src_h - y0_off - y1_off) / out_h for out_y in prange(out_h): - src_yf0 = scale_y * out_y + src_yf0 = (scale_y * out_y) + y0_off src_yf1 = src_yf0 + scale_y src_y0 = int(src_yf0) src_y1 = int(src_yf1) @@ -437,7 +711,7 @@ def _downsample_2d_min_max(src, mask, use_mask, method, fill_value, mode_rank, o if wy1 < _EPS and src_y1 > src_y0: src_y1 -= 1 for out_x in range(out_w): - src_xf0 = scale_x * out_x + src_xf0 = (scale_x * out_x) + x0_off src_xf1 = src_xf0 + scale_x src_x0 = int(src_xf0) src_x1 = int(src_xf1) @@ -466,7 +740,8 @@ def _downsample_2d_min_max(src, mask, use_mask, method, fill_value, mode_rank, o @ngjit_parallel -def _downsample_2d_mode(src, mask, use_mask, method, fill_value, mode_rank, out): +def _downsample_2d_mode(src, mask, use_mask, method, fill_value, + mode_rank, x_offset, y_offset, out): src_w, src_h, out_w, out_h = _get_dimensions(src, out) if src_w == out_w and src_h == out_h: @@ -475,8 +750,10 @@ def _downsample_2d_mode(src, mask, use_mask, method, fill_value, mode_rank, out) if out_w > src_w or out_h > src_h: raise ValueError("invalid target size") - scale_x = src_w / out_w - scale_y = src_h / out_h + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + scale_x = (src_w - x0_off - x1_off) / out_w + scale_y = (src_h - y0_off - y1_off) / out_h max_value_count = int(scale_x + 1) * int(scale_y + 1) if mode_rank >= max_value_count: @@ -486,7 +763,7 @@ def _downsample_2d_mode(src, mask, use_mask, method, fill_value, mode_rank, out) values = np.zeros((max_value_count,), dtype=src.dtype) frequencies = np.zeros((max_value_count,), dtype=np.uint32) - src_yf0 = scale_y * out_y + src_yf0 = (scale_y * out_y) + y0_off src_yf1 = src_yf0 + scale_y src_y0 = int(src_yf0) src_y1 = int(src_yf1) @@ -497,7 +774,7 @@ def _downsample_2d_mode(src, mask, use_mask, method, fill_value, mode_rank, out) if src_y1 > src_y0: src_y1 -= 1 for out_x in range(out_w): - src_xf0 = scale_x * out_x + src_xf0 = (scale_x * out_x) + x0_off src_xf1 = src_xf0 + scale_x src_x0 = int(src_xf0) src_x1 = int(src_xf1) @@ -549,7 +826,8 @@ def _downsample_2d_mode(src, mask, use_mask, method, fill_value, mode_rank, out) @ngjit_parallel -def _downsample_2d_mean(src, mask, use_mask, method, fill_value, mode_rank, out): +def _downsample_2d_mean(src, mask, use_mask, method, fill_value, + mode_rank, x_offset, y_offset, out): src_w, src_h, out_w, out_h = 
_get_dimensions(src, out) if src_w == out_w and src_h == out_h: @@ -558,14 +836,17 @@ def _downsample_2d_mean(src, mask, use_mask, method, fill_value, mode_rank, out) if out_w > src_w or out_h > src_h: raise ValueError("invalid target size") - scale_x = src_w / out_w - scale_y = src_h / out_h - + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + scale_x = (src_w - x0_off - x1_off) / out_w + scale_y = (src_h - y0_off - y1_off) / out_h + for out_y in prange(out_h): - src_yf0 = scale_y * out_y - src_yf1 = src_yf0 + scale_y + src_yf0 = (scale_y * out_y) + y0_off + src_yf1 = (src_yf0 + scale_y) src_y0 = int(src_yf0) src_y1 = int(src_yf1) + wy0 = 1.0 - (src_yf0 - src_y0) wy1 = src_yf1 - src_y1 if wy1 < _EPS: @@ -573,7 +854,7 @@ def _downsample_2d_mean(src, mask, use_mask, method, fill_value, mode_rank, out) if src_y1 > src_y0: src_y1 -= 1 for out_x in range(out_w): - src_xf0 = scale_x * out_x + src_xf0 = (scale_x * out_x) + x0_off src_xf1 = src_xf0 + scale_x src_x0 = int(src_xf0) src_x1 = int(src_xf1) @@ -602,7 +883,8 @@ def _downsample_2d_mean(src, mask, use_mask, method, fill_value, mode_rank, out) @ngjit_parallel -def _downsample_2d_std_var(src, mask, use_mask, method, fill_value, mode_rank, out): +def _downsample_2d_std_var(src, mask, use_mask, method, fill_value, + mode_rank, x_offset, y_offset, out): src_w, src_h, out_w, out_h = _get_dimensions(src, out) if src_w == out_w and src_h == out_h: @@ -611,11 +893,13 @@ def _downsample_2d_std_var(src, mask, use_mask, method, fill_value, mode_rank, o if out_w > src_w or out_h > src_h: raise ValueError("invalid target size") - scale_x = src_w / out_w - scale_y = src_h / out_h + x0_off, x1_off = x_offset + y0_off, y1_off = y_offset + scale_x = (src_w - x0_off - x1_off) / out_w + scale_y = (src_h - y0_off - y1_off) / out_h for out_y in prange(out_h): - src_yf0 = scale_y * out_y + src_yf0 = (scale_y * out_y) + y0_off src_yf1 = src_yf0 + scale_y src_y0 = int(src_yf0) src_y1 = int(src_yf1) @@ -626,7 +910,7 @@ def _downsample_2d_std_var(src, mask, use_mask, method, fill_value, mode_rank, o if src_y1 > src_y0: src_y1 -= 1 for out_x in range(out_w): - src_xf0 = scale_x * out_x + src_xf0 = (scale_x * out_x) + x0_off src_xf1 = src_xf0 + scale_x src_x0 = int(src_xf0) src_x1 = int(src_xf1) diff --git a/datashader/tests/test_raster.py b/datashader/tests/test_raster.py index 4072098c1..0f951146c 100644 --- a/datashader/tests/test_raster.py +++ b/datashader/tests/test_raster.py @@ -2,10 +2,14 @@ rasterio = pytest.importorskip("rasterio") from os import path +from itertools import product import datashader as ds import xarray as xr import numpy as np +import dask.array as da + +from datashader.resampling import compute_chunksize BASE_PATH = path.split(__file__)[0] DATA_PATH = path.abspath(path.join(BASE_PATH, 'data')) @@ -358,6 +362,85 @@ def test_raster_single_pixel_range_with_padding(): assert np.allclose(agg.y.values, np.array([-0.40625, -0.21875, -0.03125, 0.15625])) +@pytest.mark.parametrize('in_size, out_size, agg', product(range(5, 8), range(2, 5), ['mean', 'min', 'max', 'first', 'last', 'var', 'std', 'mode'])) +def test_raster_distributed_downsample(in_size, out_size, agg): + """ + Ensure that distributed regrid is equivalent to regular regrid. 
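For reference, a minimal sketch of calling the new distributed path directly (array size and memory budget are illustrative; the tests below assert the same equivalence through Canvas.raster):

    import dask.array as da
    import numpy as np
    from datashader.resampling import resample_2d, resample_2d_distributed

    src = da.from_array(np.random.rand(100, 100), chunks=(10, 10))
    # Lazily downsample to 25x25, keeping each in-memory block under ~20 kB
    out = resample_2d_distributed(src, 25, 25, ds_method='mean', max_mem=20000)
    result = out.compute()

    # The result should match the in-memory code path on the same data
    expected = resample_2d(np.asarray(src), 25, 25, ds_method='mean')
    assert np.allclose(result, expected)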
+ """ + cvs = ds.Canvas(plot_height=out_size, plot_width=out_size) + + vs = np.linspace(-1, 1, in_size) + xs, ys = np.meshgrid(vs, vs) + arr = np.sin(xs*ys) + + darr = da.from_array(arr, (2, 2)) + coords = [('y', range(in_size)), ('x', range(in_size))] + xr_darr = xr.DataArray(darr, coords=coords, name='z') + xr_arr = xr.DataArray(arr, coords=coords, name='z') + + agg_arr = cvs.raster(xr_arr, agg=agg) + agg_darr = cvs.raster(xr_darr, agg=agg) + + assert np.allclose(agg_arr.data, agg_darr.data.compute()) + assert np.allclose(agg_arr.x.values, agg_darr.x.values) + assert np.allclose(agg_arr.y.values, agg_darr.y.values) + + +@pytest.mark.parametrize('in_size, out_size', product(range(2, 5), range(7, 9))) +def test_raster_distributed_upsample(in_size, out_size): + """ + Ensure that distributed regrid is equivalent to regular regrid. + """ + cvs = ds.Canvas(plot_height=out_size, plot_width=out_size) + + vs = np.linspace(-1, 1, in_size) + xs, ys = np.meshgrid(vs, vs) + arr = np.sin(xs*ys) + + darr = da.from_array(arr, (2, 2)) + coords = [('y', range(in_size)), ('x', range(in_size))] + xr_darr = xr.DataArray(darr, coords=coords, name='z') + xr_arr = xr.DataArray(arr, coords=coords, name='z') + + agg_arr = cvs.raster(xr_arr, interpolate='nearest') + agg_darr = cvs.raster(xr_darr, interpolate='nearest') + + assert np.allclose(agg_arr.data, agg_darr.data.compute()) + assert np.allclose(agg_arr.x.values, agg_darr.x.values) + assert np.allclose(agg_arr.y.values, agg_darr.y.values) + + +def test_raster_distributed_regrid_chunksize(): + """ + Ensure that distributed regrid respects explicit chunk size. + """ + cvs = ds.Canvas(plot_height=2, plot_width=2) + + size = 4 + vs = np.linspace(-1, 1, size) + xs, ys = np.meshgrid(vs, vs) + arr = np.sin(xs*ys) + + darr = da.from_array(arr, (2, 2)) + xr_darr = xr.DataArray(darr, coords=[('y', range(size)), ('x', range(size))], name='z') + + agg_darr = cvs.raster(xr_darr, chunksize=(1, 1)) + + assert agg_darr.data.chunksize == (1, 1) + + +def test_resample_compute_chunksize(): + """ + Ensure chunksize computation is correct. + """ + darr = da.from_array(np.zeros((100, 100)), (10, 10)) + + mem_limited_chunksize = compute_chunksize(darr, 10, 10, max_mem=2000) + assert mem_limited_chunksize == (2, 1) + + explicit_chunksize = compute_chunksize(darr, 10, 10, chunksize=(5, 4)) + assert explicit_chunksize == (5, 4) + def test_resample_methods(): """Assert that an error is raised when incorrect upsample and/or downsample