Implemented distributed regridding (#762)
* Implemented distributed regridding

* Add argument to set maximum memory load

* Fixes for memory handling

* Fix for nodata variable

* Added tests for distributed resampling

* Fixed partial offset issues in downsampling operations

* Fixed distributed upsampling

* Apply suggestions from code review

Co-Authored-By: James A. Bednar <jbednar@users.noreply.github.com>
philippjfr and jbednar authored Jul 18, 2019
1 parent 2491eac commit 1b9f300
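
The commit adds chunksize and max_mem arguments to Canvas.raster for regridding dask-backed DataArrays. A minimal usage sketch follows; it is not part of the commit, and the synthetic array, coordinate ranges, chunk sizes, and memory cap are illustrative only.

import dask.array as da
import numpy as np
import xarray as xr
import datashader as ds

# Out-of-core raster: 8000x8000 values held as 1000x1000 dask chunks
raster = xr.DataArray(
    da.random.random((8000, 8000), chunks=(1000, 1000)),
    dims=('y', 'x'),
    coords={'y': np.linspace(0, 10, 8000), 'x': np.linspace(0, 10, 8000)})

cvs = ds.Canvas(plot_width=800, plot_height=800,
                x_range=(0, 10), y_range=(0, 10))
agg = cvs.raster(raster,
                 agg=ds.mean(),           # reduction used when downsampling
                 interpolate='linear',    # interpolation used when upsampling
                 chunksize=(1000, 1000),  # output chunks; defaults to the source chunksize
                 max_mem=2**27)           # cap on bytes loaded into memory while regridding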
Showing 3 changed files with 507 additions and 112 deletions.
68 changes: 48 additions & 20 deletions datashader/core.py
@@ -5,7 +5,7 @@
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.array import Array
import dask.array as da
from six import string_types
from xarray import DataArray, Dataset
from collections import OrderedDict
@@ -14,7 +14,7 @@
from .utils import Dispatcher, ngjit, calc_res, calc_bbox, orient_array, compute_coords
from .utils import get_indices, dshape_from_pandas, dshape_from_dask
from .utils import Expr # noqa (API import)
from .resampling import resample_2d
from .resampling import resample_2d, resample_2d_distributed
from . import reductions as rd


@@ -657,7 +657,9 @@ def raster(self,
downsample_method=rd.mean(), # Deprecated as of datashader=0.6.4
nan_value=None,
agg=None,
interpolate=None):
interpolate=None,
chunksize=None,
max_mem=None):
"""Sample a raster dataset by canvas size and bounds.
Handles 2D or 3D xarray DataArrays, assuming that the last two
@@ -670,28 +672,44 @@
"nodata" attribute of the raster) are replaced with `NaN` if
floats, and 0 if int.
Also supports resampling out-of-core DataArrays backed by dask
Arrays. By default the output array keeps the chunksize of the
input, but a custom chunksize may be provided. If memory is
constrained, the max_mem parameter limits how many bytes may be
held in memory at once during the regridding.
Parameters
----------
source : xarray.DataArray or xr.Dataset
2D or 3D labelled array (if Dataset, the agg reduction must
define the data variable).
layer : float
For a 3D array, value along the z dimension : optional default=None
interpolate : str, optional default=linear
Resampling mode when upsampling raster.
options include: nearest, linear.
agg : Reduction, optional default=mean()
Resampling mode when downsampling raster.
options include: first, last, mean, mode, var, std, min, max
Also accepts string names, for backwards compatibility.
ds_method : str (optional)
Grid cell aggregation method for a possible downsampling.
us_method : str (optional)
Grid cell interpolation method for a possible upsampling.
nan_value : int or float, optional
Optional nan_value which will be masked out when applying
the resampling.
agg : Reduction, optional default=mean()
Resampling mode when downsampling raster.
options include: first, last, mean, mode, var, std, min, max
Accepts an executable function, function object, or string name.
interpolate : str, optional default=linear
Resampling mode when upsampling raster.
options include: nearest, linear.
chunksize : tuple(int, int) (optional)
Size of the output chunks. By default the chunk size is
inherited from the *src* array.
max_mem : int (optional)
The maximum number of bytes that should be loaded into memory
during the regridding operation.
Returns
-------
data : xarray.Dataset
"""
# For backwards compatibility
if agg is None: agg=downsample_method
@@ -786,21 +804,28 @@ def raster(self,
us_method=interpolate, fill_value=fill_value)
if array.ndim == 2:
source_window = array[rmin:rmax+1, cmin:cmax+1]
if isinstance(source_window, Array):
source_window = source_window.compute()
if ds_method in ['var', 'std']:
source_window = source_window.astype('f')
data = resample_2d(source_window, **kwargs)
if isinstance(source_window, da.Array):
data = resample_2d_distributed(
source_window, chunksize=chunksize, max_mem=max_mem,
**kwargs)
else:
data = resample_2d(source_window, **kwargs)
layers = 1
else:
source_window = array[:, rmin:rmax+1, cmin:cmax+1]
if ds_method in ['var', 'std']:
source_window = source_window.astype('f')
arrays = []
for arr in source_window:
if isinstance(arr, Array):
arr = arr.compute()
arrays.append(resample_2d(arr, **kwargs))
if isinstance(arr, da.Array):
arr = resample_2d_distributed(
arr, chunksize=chunksize, max_mem=max_mem,
**kwargs)
else:
arr = resample_2d(arr, **kwargs)
arrays.append(arr)
data = np.dstack(arrays)
layers = len(arrays)

@@ -830,8 +855,11 @@
top_pad = np.full(tshape, fill_value, source_window.dtype)
bottom_pad = np.full(bshape, fill_value, source_window.dtype)

data = np.concatenate((top_pad, data, bottom_pad), axis=0)
data = np.concatenate((left_pad, data, right_pad), axis=1)
concat = da.concatenate if isinstance(data, da.Array) else np.concatenate
if top_pad.shape[0] > 0:
data = concat((top_pad, data, bottom_pad), axis=0)
if left_pad.shape[1] > 0:
data = concat((left_pad, data, right_pad), axis=1)

# Reorient array to original orientation
if res[1] > 0: data = data[::-1]
@@ -850,7 +878,7 @@
coords = {xdim: xs, ydim: ys}
dims = [ydim, xdim]
attrs = dict(res=res[0])
if source._file_obj is not None:
if source._file_obj is not None and hasattr(source._file_obj, 'nodata'):
attrs['nodata'] = source._file_obj.nodata

# Handle DataArray with layers
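For dask-backed windows the code above now calls resample_2d_distributed, which lives in datashader/resampling.py and is not shown on this page. As a rough illustration of the approach, rather than the actual implementation, the sketch below downsamples chunk-wise with dask's coarsen, so peak memory stays on the order of a chunk instead of the whole array (the old code materialized the full window via source_window.compute()).

import numpy as np
import dask.array as da

# Lazy 4000x4000 source split into 1000x1000 chunks
source = da.random.random((4000, 4000), chunks=(1000, 1000))

# Downsample by a factor of 4 on each axis by averaging 4x4 blocks.
# Each output chunk depends only on the corresponding source chunks,
# so the work parallelizes and memory stays bounded by the chunk size.
downsampled = da.coarsen(np.mean, source, {0: 4, 1: 4})

result = downsampled.compute()  # NumPy array of shape (1000, 1000)

The real resample_2d_distributed additionally has to handle the fractional chunk offsets, upsampling, and max_mem limits mentioned in the commit message.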
