diff --git a/datashader/compiler.py b/datashader/compiler.py index 1fafac814..cef428c9d 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -15,7 +15,7 @@ @memoize -def compile_components(agg, schema, glyph): +def compile_components(agg, schema, glyph, cuda=False): """Given a ``Aggregation`` object and a schema, return 5 sub-functions. Parameters @@ -51,20 +51,20 @@ def compile_components(agg, schema, glyph): reds = list(traverse_aggregation(agg)) # List of base reductions (actually computed) - bases = list(unique(concat(r._build_bases() for r in reds))) + bases = list(unique(concat(r._build_bases(cuda) for r in reds))) dshapes = [b.out_dshape(schema) for b in bases] # List of tuples of (append, base, input columns, temps) - calls = [_get_call_tuples(b, d, schema) for (b, d) in zip(bases, dshapes)] + calls = [_get_call_tuples(b, d, schema, cuda) for (b, d) in zip(bases, dshapes)] # List of unique column names needed cols = list(unique(concat(pluck(2, calls)))) # List of temps needed temps = list(pluck(3, calls)) - create = make_create(bases, dshapes) + create = make_create(bases, dshapes, cuda) info = make_info(cols) append = make_append(bases, cols, calls, glyph) combine = make_combine(bases, dshapes, temps) - finalize = make_finalize(bases, agg, schema) + finalize = make_finalize(bases, agg, schema, cuda) return create, info, append, combine, finalize @@ -79,13 +79,18 @@ def traverse_aggregation(agg): yield agg -def _get_call_tuples(base, dshape, schema): - return base._build_append(dshape, schema), (base,), base.inputs, base._build_temps() +def _get_call_tuples(base, dshape, schema, cuda): + return (base._build_append(dshape, schema, cuda), + (base,), base.inputs, base._build_temps(cuda)) -def make_create(bases, dshapes): +def make_create(bases, dshapes, cuda): creators = [b._build_create(d) for (b, d) in zip(bases, dshapes)] - array_module = np + if cuda: + import cupy + array_module = cupy + else: + array_module = np return lambda shape: tuple(c(shape, array_module) for c in creators) @@ -145,22 +150,22 @@ def combine(base_tuples): return combine -def make_finalize(bases, agg, schema): +def make_finalize(bases, agg, schema, cuda): arg_lk = dict((k, v) for (v, k) in enumerate(bases)) if isinstance(agg, summary): calls = [] for key, val in zip(agg.keys, agg.values): - f = make_finalize(bases, val, schema) + f = make_finalize(bases, val, schema, cuda) try: # Override bases if possible - bases = val._build_bases() + bases = val._build_bases(cuda) except AttributeError: pass inds = [arg_lk[b] for b in bases] calls.append((key, f, inds)) - def finalize(bases, **kwargs): - data = {key: finalizer(get(inds, bases), **kwargs) + def finalize(bases, cuda=False, **kwargs): + data = {key: finalizer(get(inds, bases), cuda, **kwargs) for (key, finalizer, inds) in calls} return xr.Dataset(data) return finalize diff --git a/datashader/core.py b/datashader/core.py index 39ba7988d..319d1d161 100644 --- a/datashader/core.py +++ b/datashader/core.py @@ -19,6 +19,15 @@ from .resampling import resample_2d, resample_2d_distributed from . import reductions as rd +try: + import cudf +except ImportError: + cudf = None + +try: + import dask_cudf +except ImportError: + dask_cudf = None class Axis(object): """Interface for implementing axis transformations. 
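The `cuda` flag added to `compile_components` above is mostly plumbing: it reaches `make_create`, which picks CuPy or NumPy as the allocation module for the aggregate buffers. A minimal standalone sketch of that pattern, where the toy `creators` below are stand-ins for the reductions' `_build_create` closures and not datashader's own code:

    import numpy as np

    def make_create(creators, cuda=False):
        # Choose the array namespace once, then build every aggregate with it.
        if cuda:
            import cupy
            array_module = cupy
        else:
            array_module = np
        return lambda shape: tuple(c(shape, array_module) for c in creators)

    # Toy creators mirroring the count/any reductions' _create methods:
    creators = [
        lambda shape, xp: xp.zeros(shape, dtype='i4'),    # count: starts at 0
        lambda shape, xp: xp.zeros(shape, dtype='bool'),  # any: starts at False
    ]
    create = make_create(creators, cuda=False)
    count_agg, any_agg = create((3, 4))   # NumPy arrays here; cuda=True would give CuPy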
@@ -107,7 +116,7 @@ class LogAxis(Axis): @staticmethod @ngjit def mapper(val): - return log10(val) + return log10(float(val)) @staticmethod @ngjit @@ -993,7 +1002,8 @@ def bypixel(source, canvas, glyph, agg): source = source.drop([col for col in columns if col not in cols_to_keep]) source = source.to_dask_dataframe() - if isinstance(source, pd.DataFrame): + if (isinstance(source, pd.DataFrame) or + (cudf and isinstance(source, cudf.DataFrame))): # Avoid datashape.Categorical instantiation bottleneck # by only retaining the necessary columns: # https://github.com/bokeh/datashader/issues/396 diff --git a/datashader/data_libraries/__init__.py b/datashader/data_libraries/__init__.py index cdb5110f6..c8883a6a2 100644 --- a/datashader/data_libraries/__init__.py +++ b/datashader/data_libraries/__init__.py @@ -5,3 +5,14 @@ from . import dask # noqa (API import) except ImportError: pass + +try: + import cudf as _cudf # noqa (Test cudf installed) + import cupy as _cupy # noqa (Test cupy installed) + from . import cudf # noqa (API import) + + import dask_cudf as _dask_cudf # noqa (Test dask_cudf installed) + from . import dask_cudf # noqa (API import) + +except ImportError: + pass diff --git a/datashader/data_libraries/cudf.py b/datashader/data_libraries/cudf.py new file mode 100644 index 000000000..a9ad8834d --- /dev/null +++ b/datashader/data_libraries/cudf.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import +from datashader.data_libraries.pandas import default +from datashader.core import bypixel +import cudf + + +@bypixel.pipeline.register(cudf.DataFrame) +def cudf_pipeline(df, schema, canvas, glyph, summary): + return default(glyph, df, schema, canvas, summary, cuda=True) diff --git a/datashader/data_libraries/dask.py b/datashader/data_libraries/dask.py index 2750b8bcf..f166c3e19 100644 --- a/datashader/data_libraries/dask.py +++ b/datashader/data_libraries/dask.py @@ -1,7 +1,6 @@ from __future__ import absolute_import, division import dask -import pandas as pd import dask.dataframe as dd from collections import OrderedDict from dask.base import tokenize, compute @@ -16,8 +15,8 @@ @bypixel.pipeline.register(dd.DataFrame) -def dask_pipeline(df, schema, canvas, glyph, summary): - dsk, name = glyph_dispatch(glyph, df, schema, canvas, summary) +def dask_pipeline(df, schema, canvas, glyph, summary, cuda=False): + dsk, name = glyph_dispatch(glyph, df, schema, canvas, summary, cuda=cuda) # Get user configured scheduler (if any), or fall back to default # scheduler for dask DataFrame @@ -61,12 +60,12 @@ def shape_bounds_st_and_axis(df, canvas, glyph): @glyph_dispatch.register(Glyph) -def default(glyph, df, schema, canvas, summary): +def default(glyph, df, schema, canvas, summary, cuda=False): shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph) # Compile functions create, info, append, combine, finalize = \ - compile_components(summary, schema, glyph) + compile_components(summary, schema, glyph, cuda=cuda) x_mapper = canvas.x_axis.mapper y_mapper = canvas.y_axis.mapper extend = glyph._build_extend(x_mapper, y_mapper, info, append) @@ -81,17 +80,22 @@ def chunk(df): keys2 = [(name, i) for i in range(len(keys))] dsk = dict((k2, (chunk, k)) for (k2, k) in zip(keys2, keys)) dsk[name] = (apply, finalize, [(combine, keys2)], - dict(coords=axis, dims=[glyph.y_label, glyph.x_label])) + dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label])) return dsk, name @glyph_dispatch.register(LineAxis0) -def line(glyph, df, schema, canvas, summary): +def line(glyph, df, schema, canvas, 
summary, cuda=False): + if cuda: + from cudf import concat + else: + from pandas import concat + shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph) # Compile functions create, info, append, combine, finalize = \ - compile_components(summary, schema, glyph) + compile_components(summary, schema, glyph, cuda=cuda) x_mapper = canvas.x_axis.mapper y_mapper = canvas.y_axis.mapper extend = glyph._build_extend(x_mapper, y_mapper, info, append) @@ -99,7 +103,7 @@ def line(glyph, df, schema, canvas, summary): def chunk(df, df2=None): plot_start = True if df2 is not None: - df = pd.concat([df.iloc[-1:], df2]) + df = concat([df.iloc[-1:], df2]) plot_start = False aggs = create(shape) extend(aggs, df, st, bounds, plot_start=plot_start) @@ -112,5 +116,5 @@ def chunk(df, df2=None): dsk[(name, i)] = (chunk, (old_name, i - 1), (old_name, i)) keys2 = [(name, i) for i in range(df.npartitions)] dsk[name] = (apply, finalize, [(combine, keys2)], - dict(coords=axis, dims=[glyph.y_label, glyph.x_label])) + dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label])) return dsk, name diff --git a/datashader/data_libraries/dask_cudf.py b/datashader/data_libraries/dask_cudf.py new file mode 100644 index 000000000..e91eee7ca --- /dev/null +++ b/datashader/data_libraries/dask_cudf.py @@ -0,0 +1,9 @@ +from __future__ import absolute_import +from datashader.data_libraries.dask import dask_pipeline +from datashader.core import bypixel +import dask_cudf + + +@bypixel.pipeline.register(dask_cudf.DataFrame) +def dask_cudf_pipeline(df, schema, canvas, glyph, summary): + return dask_pipeline(df, schema, canvas, glyph, summary, cuda=True) diff --git a/datashader/data_libraries/pandas.py b/datashader/data_libraries/pandas.py index d4430152b..0f4ba627c 100644 --- a/datashader/data_libraries/pandas.py +++ b/datashader/data_libraries/pandas.py @@ -22,8 +22,8 @@ def pandas_pipeline(df, schema, canvas, glyph, summary): @glyph_dispatch.register(_PointLike) @glyph_dispatch.register(_AreaToLineLike) -def default(glyph, source, schema, canvas, summary): - create, info, append, _, finalize = compile_components(summary, schema, glyph) +def default(glyph, source, schema, canvas, summary, cuda=False): + create, info, append, _, finalize = compile_components(summary, schema, glyph, cuda) x_mapper = canvas.x_axis.mapper y_mapper = canvas.y_axis.mapper extend = glyph._build_extend(x_mapper, y_mapper, info, append) @@ -44,6 +44,7 @@ def default(glyph, source, schema, canvas, summary): extend(bases, source, x_st + y_st, x_range + y_range) return finalize(bases, + cuda=cuda, coords=OrderedDict([(glyph.x_label, x_axis), (glyph.y_label, y_axis)]), dims=[glyph.y_label, glyph.x_label]) diff --git a/datashader/glyphs/area.py b/datashader/glyphs/area.py index ed9610c6b..dca02e88b 100644 --- a/datashader/glyphs/area.py +++ b/datashader/glyphs/area.py @@ -1,12 +1,19 @@ from __future__ import absolute_import, division -from math import isnan import numpy as np from toolz import memoize -from datashader.glyphs.glyph import Glyph +from datashader.glyphs.glyph import Glyph, isnull from datashader.glyphs.line import _build_map_onto_pixel_for_line, _clipt from datashader.glyphs.points import _PointLike from datashader.utils import isreal, ngjit +from numba import cuda + +try: + import cudf + from ..transfer_functions._cuda_utils import cuda_args +except ImportError: + cudf = None + cuda_args = None class _AreaToLineLike(Glyph): @@ -96,7 +103,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): append, map_onto_pixel, 
expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_zero_axis0( + extend_cpu, extend_cuda = _build_extend_area_to_zero_axis0( draw_trapezoid_y, expand_aggs_and_cols ) x_name = self.x @@ -105,14 +112,21 @@ def _build_extend(self, x_mapper, y_mapper, info, append): def extend(aggs, df, vt, bounds, plot_start=True): sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds - - xs = df[x_name].values - ys = df[y_name].values aggs_and_cols = aggs + info(df) - extend_cpu( + + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_name) + ys = self.to_gpu_matrix(df, y_name) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[x_name].values + ys = df[y_name].values + do_extend = extend_cpu + + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - xs, ys, plot_start, *aggs_and_cols + plot_start, xs, ys, *aggs_and_cols ) return extend @@ -172,7 +186,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_line_axis0( + extend_cpu, extend_cuda = _build_extend_area_to_line_axis0( draw_trapezoid_y, expand_aggs_and_cols ) x_name = self.x @@ -182,15 +196,22 @@ def _build_extend(self, x_mapper, y_mapper, info, append): def extend(aggs, df, vt, bounds, plot_start=True): sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds - xs = df[x_name].values - ys = df[y_name].values - ys_stacks = df[y_stack_name].values - aggs_and_cols = aggs + info(df) - extend_cpu( - sx, tx, sy, ty, - xmin, xmax, ymin, ymax, - xs, ys, ys_stacks, plot_start, *aggs_and_cols + + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_name) + ys0 = self.to_gpu_matrix(df, y_name) + ys1 = self.to_gpu_matrix(df, y_stack_name) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[x_name].values + ys0 = df[y_name].values + ys1 = df[y_stack_name].values + do_extend = extend_cpu + + do_extend( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols ) return extend @@ -258,7 +279,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_zero_axis0_multi( + extend_cpu, extend_cuda = _build_extend_area_to_zero_axis0_multi( draw_trapezoid_y, expand_aggs_and_cols ) x_names = self.x @@ -267,14 +288,20 @@ def _build_extend(self, x_mapper, y_mapper, info, append): def extend(aggs, df, vt, bounds, plot_start=True): sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds - xs = df[list(x_names)].values - ys = df[list(y_names)].values - aggs_and_cols = aggs + info(df) - extend_cpu( - sx, tx, sy, ty, - xmin, xmax, ymin, ymax, - xs, ys, plot_start, *aggs_and_cols + + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_names) + ys = self.to_gpu_matrix(df, y_names) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + ys = df[list(y_names)].values + do_extend = extend_cpu + + do_extend( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols ) return extend @@ -342,7 +369,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_line_axis0_multi( + extend_cpu, extend_cuda = _build_extend_area_to_line_axis0_multi( draw_trapezoid_y, expand_aggs_and_cols ) x_names = self.x @@ -352,15 +379,22 @@ def _build_extend(self, x_mapper, y_mapper, 
info, append): def extend(aggs, df, vt, bounds, plot_start=True): sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds - xs = df[list(x_names)].values - ys = df[list(y_names)].values - y_stacks = df[list(y_stack_names)].values - aggs_and_cols = aggs + info(df) - extend_cpu( - sx, tx, sy, ty, - xmin, xmax, ymin, ymax, - xs, ys, y_stacks, plot_start, *aggs_and_cols + + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_names) + ys0 = self.to_gpu_matrix(df, y_names) + ys1 = self.to_gpu_matrix(df, y_stack_names) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + ys0 = df[list(y_names)].values + ys1 = df[list(y_stack_names)].values + do_extend = extend_cpu + + do_extend( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols ) return extend @@ -443,7 +477,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_zero_axis1_none_constant( + extend_cpu, extend_cuda = _build_extend_area_to_zero_axis1_none_constant( draw_trapezoid_y, expand_aggs_and_cols ) @@ -455,10 +489,16 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values - ys = df[list(y_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = df[list(x_names)].as_gpu_matrix() + ys = df[list(y_names)].as_gpu_matrix() + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + ys = df[list(y_names)].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols ) @@ -544,7 +584,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_line_axis1_none_constant( + extend_cpu, extend_cuda = _build_extend_area_to_line_axis1_none_constant( draw_trapezoid_y, expand_aggs_and_cols ) x_names = self.x @@ -556,11 +596,18 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values - ys = df[list(y_names)].values - y_stacks = df[list(y_stack_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = df[list(x_names)].as_gpu_matrix() + ys = df[list(y_names)].as_gpu_matrix() + y_stacks = df[list(y_stack_names)].as_gpu_matrix() + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + ys = df[list(y_names)].values + y_stacks = df[list(y_stack_names)].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, y_stacks, *aggs_and_cols ) @@ -606,7 +653,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_zero_axis1_x_constant( + extend_cpu, extend_cuda = _build_extend_area_to_zero_axis1_x_constant( draw_trapezoid_y, expand_aggs_and_cols ) @@ -618,9 +665,14 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - ys = df[list(y_names)].values + if cudf and isinstance(df, cudf.DataFrame): + ys = df[list(y_names)].as_gpu_matrix() + do_extend = extend_cuda[cuda_args(ys.shape)] + else: + ys = df[list(y_names)].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, 
xmax, ymin, ymax, x_values, ys, *aggs_and_cols @@ -680,7 +732,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_line_axis1_x_constant( + extend_cpu, extend_cuda = _build_extend_area_to_line_axis1_x_constant( draw_trapezoid_y, expand_aggs_and_cols ) @@ -693,10 +745,16 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - ys = df[list(y_names)].values - y_stacks = df[list(y_stack_names)].values + if cudf and isinstance(df, cudf.DataFrame): + ys = df[list(y_names)].as_gpu_matrix() + y_stacks = df[list(y_stack_names)].as_gpu_matrix() + do_extend = extend_cuda[cuda_args(ys.shape)] + else: + ys = df[list(y_names)].values + y_stacks = df[list(y_stack_names)].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, x_values, ys, y_stacks, *aggs_and_cols @@ -743,7 +801,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_zero_axis1_y_constant( + extend_cpu, extend_cuda = _build_extend_area_to_zero_axis1_y_constant( draw_trapezoid_y, expand_aggs_and_cols ) @@ -755,9 +813,14 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = df[list(x_names)].as_gpu_matrix() + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, y_values, *aggs_and_cols @@ -804,7 +867,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_trapezoid_y = _build_draw_trapezoid_y( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_area_to_line_axis1_y_constant( + extend_cpu, extend_cuda = _build_extend_area_to_line_axis1_y_constant( draw_trapezoid_y, expand_aggs_and_cols ) x_names = self.x @@ -816,9 +879,14 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = df[list(x_names)].as_gpu_matrix() + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, y_values, y_stack_values, *aggs_and_cols @@ -1240,12 +1308,12 @@ def _skip_or_clip_trapezoid_y( # If any of the coordinates are NaN, there's a discontinuity. # Skip the entire trapezoid. 
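Each rewritten `extend` closure above follows the same dispatch: pull device arrays out of the cudf frame, then launch the numba CUDA kernel via `extend_cuda[cuda_args(shape)](...)`. `cuda_args` comes from `transfer_functions._cuda_utils`, which is not part of this diff; presumably it maps an array shape to a (blocks-per-grid, threads-per-block) pair. A hypothetical sketch of that launch pattern, where both the `cuda_args` body and the `scale` kernel are illustrative and not the library's code:

    import math
    from numba import cuda

    def cuda_args(shape):
        # Hypothetical: derive a 1-D or 2-D launch configuration from a shape.
        if isinstance(shape, int):
            shape = (shape,)
        threads = (256,) if len(shape) == 1 else (16, 16)
        blocks = tuple(int(math.ceil(n / t)) for n, t in zip(shape, threads))
        return blocks, threads

    @cuda.jit
    def scale(xs, out, factor):
        i = cuda.grid(1)
        if i < xs.shape[0]:      # the grid may overshoot the data; guard as extend_cuda does
            out[i] = xs[i] * factor

    # Launch syntax matching the diff's do_extend = extend_cuda[cuda_args(xs.shape)]:
    # scale[cuda_args(xs.shape)](xs, out, 2.0)   # requires a CUDA-capable GPU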
- if (isnan(x0) or - isnan(x1) or - isnan(y0) or - isnan(y1) or - isnan(y2) or - isnan(y3)): + if (isnull(x0) or + isnull(x1) or + isnull(y0) or + isnull(y1) or + isnull(y2) or + isnull(y3)): skip = True # Check if trapezoid is out of bounds vertically @@ -1298,7 +1366,7 @@ def _build_extend_area_to_zero_axis0( @ngjit @expand_aggs_and_cols def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - area_start, xs, ys, *aggs_and_cols): + plot_start, xs, ys, *aggs_and_cols): stacked = False x0 = xs[i] @@ -1307,8 +1375,8 @@ def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, y1 = 0.0 y2 = 0.0 y3 = ys[i + 1] - trapezoid_start = (area_start if i == 0 else - (isnan(xs[i - 1]) or isnan(ys[i - 1]))) + trapezoid_start = (plot_start if i == 0 else + (isnull(xs[i - 1]) or isnull(ys[i - 1]))) draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1319,7 +1387,7 @@ def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @expand_aggs_and_cols def extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - xs, ys, plot_start, *aggs_and_cols + plot_start, xs, ys, *aggs_and_cols ): """Aggregate filled area along a line formed by ``xs`` and ``ys``, filled to the y=0 line""" @@ -1328,7 +1396,20 @@ def extend_cpu( perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, plot_start, xs, ys, *aggs_and_cols) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols + ): + i = cuda.grid(1) + if i < xs.shape[0] - 1: + perform_extend( + i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_line_axis0( @@ -1336,7 +1417,7 @@ def _build_extend_area_to_line_axis0( ): @ngjit @expand_aggs_and_cols - def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, area_start, + def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, plot_start, xs, ys0, ys1, *aggs_and_cols): x0 = xs[i] x1 = xs[i + 1] @@ -1344,10 +1425,10 @@ def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, area_start, y1 = ys1[i] y2 = ys1[i + 1] y3 = ys0[i + 1] - trapezoid_start = (area_start if i == 0 else - (isnan(xs[i - 1]) or - isnan(ys0[i - 1]) or - isnan(ys1[i - 1]))) + trapezoid_start = (plot_start if i == 0 else + (isnull(xs[i - 1]) or + isnull(ys0[i - 1]) or + isnull(ys1[i - 1]))) stacked = True draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1359,16 +1440,31 @@ def perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, area_start, @expand_aggs_and_cols def extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - xs, ys0, ys1, area_start, *aggs_and_cols + plot_start, xs, ys0, ys1, *aggs_and_cols ): """Aggregate filled area between the line formed by ``xs`` and ``ys0`` and the line formed by ``xs`` and ``ys1``""" nrows = xs.shape[0] for i in range(nrows - 1): - perform_extend(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - area_start, xs, ys0, ys1, *aggs_and_cols) + perform_extend( + i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols + ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols + ): + i = cuda.grid(1) + if i < xs.shape[0] - 1: + perform_extend( + i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_zero_axis0_multi( @@ -1377,16 +1473,16 @@ def 
_build_extend_area_to_zero_axis0_multi( @ngjit @expand_aggs_and_cols def perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - area_start, xs, ys, *aggs_and_cols): + plot_start, xs, ys, *aggs_and_cols): x0 = xs[i, j] x1 = xs[i + 1, j] y0 = ys[i, j] y1 = 0.0 y2 = 0.0 y3 = ys[i + 1, j] - trapezoid_start = (area_start if i == 0 else - (isnan(xs[i - 1, j]) or - isnan(ys[i - 1, j]))) + trapezoid_start = (plot_start if i == 0 else + (isnull(xs[i - 1, j]) or + isnull(ys[i - 1, j]))) stacked = False draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1398,7 +1494,7 @@ def perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @expand_aggs_and_cols def extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - xs, ys, area_start, *aggs_and_cols + plot_start, xs, ys, *aggs_and_cols ): """Aggregate filled area along a line formed by ``xs`` and ``ys``, filled to the y=0 line""" @@ -1407,9 +1503,22 @@ def extend_cpu( for j in range(ncols): for i in range(nrows - 1): perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - area_start, xs, ys, *aggs_and_cols) + plot_start, xs, ys, *aggs_and_cols) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] - 1 and j < xs.shape[1]: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_line_axis0_multi( @@ -1418,17 +1527,17 @@ def _build_extend_area_to_line_axis0_multi( @ngjit @expand_aggs_and_cols def perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - area_start, xs, ys0, ys1, *aggs_and_cols): + plot_start, xs, ys0, ys1, *aggs_and_cols): x0 = xs[i, j] x1 = xs[i + 1, j] y0 = ys0[i, j] y1 = ys1[i, j] y2 = ys1[i + 1, j] y3 = ys0[i + 1, j] - trapezoid_start = (area_start if i == 0 else - (isnan(xs[i - 1, j]) or - isnan(ys0[i - 1, j]) or - isnan(ys1[i - 1, j]))) + trapezoid_start = (plot_start if i == 0 else + (isnull(xs[i - 1, j]) or + isnull(ys0[i - 1, j]) or + isnull(ys1[i - 1, j]))) stacked = True draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1440,7 +1549,7 @@ def perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @expand_aggs_and_cols def extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - xs, ys0, ys1, area_start, *aggs_and_cols + plot_start, xs, ys0, ys1, *aggs_and_cols ): """Aggregate filled area along a line formed by ``xs`` and ``ys``, filled to the y=0 line""" @@ -1450,9 +1559,22 @@ def extend_cpu( for j in range(ncols): for i in range(nrows - 1): perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - area_start, xs, ys0, ys1, *aggs_and_cols) + plot_start, xs, ys0, ys1, *aggs_and_cols) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] - 1 and j < xs.shape[1]: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys0, ys1, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_zero_axis1_none_constant( @@ -1473,8 +1595,8 @@ def perform_extend( y3 = ys[i, j + 1] trapezoid_start = (j == 0 or - isnan(xs[i, j - 1]) or - isnan(ys[i, j - 1])) + isnull(xs[i, j - 1]) or + isnull(ys[i, j - 1])) stacked = False draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1494,7 +1616,19 @@ def extend_cpu( xs, ys, 
*aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] and j < xs.shape[1] - 1: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_line_axis1_none_constant( @@ -1515,9 +1649,9 @@ def perform_extend( y3 = ys0[i, j + 1] trapezoid_start = (j == 0 or - isnan(xs[i, j - 1]) or - isnan(ys0[i, j - 1]) or - isnan(ys1[i, j - 1])) + isnull(xs[i, j - 1]) or + isnull(ys0[i, j - 1]) or + isnull(ys1[i, j - 1])) stacked = True draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1541,7 +1675,20 @@ def extend_cpu( xs, ys0, ys1, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys0, ys1, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] and j < xs.shape[1] - 1: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys0, ys1, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_zero_axis1_x_constant( @@ -1561,8 +1708,8 @@ def perform_extend(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, y3 = ys[i, j + 1] trapezoid_start = (j == 0 or - isnan(xs[j - 1]) or - isnan(ys[i, j - 1])) + isnull(xs[j - 1]) or + isnull(ys[i, j - 1])) stacked = False draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1583,7 +1730,19 @@ def extend_cpu( xs, ys, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < ys.shape[0] and j < ys.shape[1] - 1: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_line_axis1_x_constant( @@ -1604,9 +1763,9 @@ def perform_extend( y3 = ys0[i, j + 1] trapezoid_start = (j == 0 or - isnan(xs[j - 1]) or - isnan(ys0[i, j - 1]) or - isnan(ys1[i, j - 1])) + isnull(xs[j - 1]) or + isnull(ys0[i, j - 1]) or + isnull(ys1[i, j - 1])) stacked = True draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1627,7 +1786,20 @@ def extend_cpu( xs, ys0, ys1, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys0, ys1, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < ys0.shape[0] and j < ys0.shape[1] - 1: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys0, ys1, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_zero_axis1_y_constant( @@ -1648,8 +1820,8 @@ def perform_extend( y3 = ys[j + 1] trapezoid_start = (j == 0 or - isnan(xs[i, j - 1]) or - isnan(ys[j - 1])) + isnull(xs[i, j - 1]) or + isnull(ys[j - 1])) stacked = False draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1670,7 +1842,19 @@ def extend_cpu( xs, ys, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] and j < xs.shape[1] - 1: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_line_axis1_y_constant( @@ -1691,9 +1875,9 @@ def perform_extend( y3 = ys0[j + 1] trapezoid_start = (j == 0 or 
- isnan(xs[i, j - 1]) or - isnan(ys0[j - 1]) or - isnan(ys1[j - 1])) + isnull(xs[i, j - 1]) or + isnull(ys0[j - 1]) or + isnull(ys1[j - 1])) stacked = True draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1716,7 +1900,20 @@ def extend_cpu( xs, ys0, ys1, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys0, ys1, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] and j < xs.shape[1] - 1: + perform_extend( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys0, ys1, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_area_to_zero_axis1_ragged( @@ -1774,8 +1971,8 @@ def perform_extend_area_to_zero_axis1_ragged( y3 = y_flat[y_start_i + j + 1] trapezoid_start = (j == 0 or - isnan(x_flat[x_start_i + j - 1]) or - isnan(y_flat[y_start_i + j - 1])) + isnull(x_flat[x_start_i + j - 1]) or + isnull(y_flat[y_start_i + j - 1])) stacked = False draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1792,7 +1989,7 @@ def perform_extend_area_to_zero_axis1_ragged( def _build_extend_area_to_line_axis1_ragged( draw_trapezoid_y, expand_aggs_and_cols ): - def extend_line( + def extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys0, ys1, *aggs_and_cols ): @@ -1851,9 +2048,9 @@ def perform_extend_area_to_line_axis1_ragged( y3 = y0_flat[y0_start_i + j + 1] trapezoid_start = (j == 0 or - isnan(x_flat[x_start_i + j - 1]) or - isnan(y0_flat[y0_start_i + j - 1]) or - isnan(y1_flat[y1_start_i + j] - 1)) + isnull(x_flat[x_start_i + j - 1]) or + isnull(y0_flat[y0_start_i + j - 1]) or + isnull(y1_flat[y1_start_i + j] - 1)) stacked = True draw_trapezoid_y( i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -1863,4 +2060,4 @@ def perform_extend_area_to_line_axis1_ragged( j += 1 i += 1 - return extend_line + return extend_cpu diff --git a/datashader/glyphs/glyph.py b/datashader/glyphs/glyph.py index d4cae2e12..efcdd34e4 100644 --- a/datashader/glyphs/glyph.py +++ b/datashader/glyphs/glyph.py @@ -10,6 +10,19 @@ from datashader.utils import Expr, ngjit from datashader.macros import expand_varargs +try: + import cudf +except ImportError: + cudf = None + + +@ngjit +def isnull(val): + """ + Equivalent to isnan for floats, but also numba compatible with integers + """ + return not (val <= 0 or val > 0) + class Glyph(Expr): """Base class for glyphs.""" @@ -39,7 +52,10 @@ def maybe_expand_bounds(bounds): @staticmethod def _compute_bounds(s): - if isinstance(s, pd.Series): + if cudf and isinstance(s, cudf.Series): + s = s.nans_to_nulls() + return (s.min(), s.max()) + elif isinstance(s, pd.Series): return Glyph._compute_bounds_numba(s.values) else: return Glyph._compute_bounds_numba(s) @@ -74,6 +90,15 @@ def _compute_bounds_2d(vals): return minval, maxval + @staticmethod + def to_gpu_matrix(df, columns): + if not isinstance(columns, (list, tuple)): + return df[columns].to_gpu_array() + else: + return cudf.concat([ + df[name].rename(str(i)) for i, name in enumerate(columns) + ], axis=1).as_gpu_matrix() + def expand_aggs_and_cols(self, append): """ Create a decorator that can be used on functions that accept diff --git a/datashader/glyphs/line.py b/datashader/glyphs/line.py index d0401de22..8d6a45c4a 100644 --- a/datashader/glyphs/line.py +++ b/datashader/glyphs/line.py @@ -1,10 +1,18 @@ from __future__ import absolute_import, division -from math import isnan import numpy as np from toolz import memoize +from datashader.glyphs.glyph import isnull from datashader.glyphs.points import 
_PointLike from datashader.utils import isreal, ngjit +from numba import cuda + +try: + import cudf + from ..transfer_functions._cuda_utils import cuda_args +except ImportError: + cudf = None + cuda_args = None class LineAxis0(_PointLike): @@ -20,7 +28,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): expand_aggs_and_cols = self.expand_aggs_and_cols(append) map_onto_pixel = _build_map_onto_pixel_for_line(x_mapper, y_mapper) draw_segment = _build_draw_segment(append, map_onto_pixel, expand_aggs_and_cols) - extend_cpu = _build_extend_line_axis0( + extend_cpu, extend_cuda = _build_extend_line_axis0( draw_segment, expand_aggs_and_cols ) x_name = self.x @@ -29,14 +37,20 @@ def _build_extend(self, x_mapper, y_mapper, info, append): def extend(aggs, df, vt, bounds, plot_start=True): sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds - xs = df[x_name].values - ys = df[y_name].values aggs_and_cols = aggs + info(df) + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_name) + ys = self.to_gpu_matrix(df, y_name) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[x_name].values + ys = df[y_name].values + do_extend = extend_cpu + # line may be clipped, then mapped to pixels - extend_cpu( - sx, tx, sy, ty, - xmin, xmax, ymin, ymax, + do_extend( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, plot_start, *aggs_and_cols ) @@ -99,7 +113,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_segment = _build_draw_segment( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_line_axis0_multi( + extend_cpu, extend_cuda = _build_extend_line_axis0_multi( draw_segment, expand_aggs_and_cols ) x_names = self.x @@ -108,13 +122,19 @@ def _build_extend(self, x_mapper, y_mapper, info, append): def extend(aggs, df, vt, bounds, plot_start=True): sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds + aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values - ys = df[list(y_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_names) + ys = self.to_gpu_matrix(df, y_names) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + ys = df[list(y_names)].values + do_extend = extend_cpu - aggs_and_cols = aggs + info(df) # line may be clipped, then mapped to pixels - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, plot_start, *aggs_and_cols @@ -201,7 +221,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_segment = _build_draw_segment( append, map_onto_pixel, expand_aggs_and_cols ) - extend_cpu = _build_extend_line_axis1_none_constant( + extend_cpu, extend_cuda = _build_extend_line_axis1_none_constant( draw_segment, expand_aggs_and_cols ) x_names = self.x @@ -212,10 +232,17 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values - ys = df[list(y_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_names) + ys = self.to_gpu_matrix(df, y_names) + do_extend = extend_cuda[cuda_args(xs.shape)] - extend_cpu( + else: + xs = df[list(x_names)].values + ys = df[list(y_names)].values + do_extend = extend_cpu + + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols ) @@ -263,7 +290,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): append, map_onto_pixel, expand_aggs_and_cols ) - perform_extend_cpu = _build_extend_line_axis1_x_constant( + extend_cpu, extend_cuda = 
_build_extend_line_axis1_x_constant( draw_segment, expand_aggs_and_cols ) @@ -275,9 +302,14 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - ys = df[list(y_names)].values + if cudf and isinstance(df, cudf.DataFrame): + ys = self.to_gpu_matrix(df, y_names) + do_extend = extend_cuda[cuda_args(ys.shape)] + else: + ys = df[list(y_names)].values + do_extend = extend_cpu - perform_extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, x_values, ys, *aggs_and_cols @@ -327,7 +359,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): draw_segment = _build_draw_segment( append, map_onto_pixel, expand_aggs_and_cols ) - perform_extend_cpu = _build_extend_line_axis1_y_constant( + extend_cpu, extend_cuda = _build_extend_line_axis1_y_constant( draw_segment, expand_aggs_and_cols ) @@ -339,9 +371,14 @@ def extend(aggs, df, vt, bounds, plot_start=True): xmin, xmax, ymin, ymax = bounds aggs_and_cols = aggs + info(df) - xs = df[list(x_names)].values + if cudf and isinstance(df, cudf.DataFrame): + xs = self.to_gpu_matrix(df, x_names) + do_extend = extend_cuda[cuda_args(xs.shape)] + else: + xs = df[list(x_names)].values + do_extend = extend_cpu - perform_extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, y_values, *aggs_and_cols @@ -397,7 +434,7 @@ def _build_extend(self, x_mapper, y_mapper, info, append): append, map_onto_pixel, expand_aggs_and_cols ) - perform_extend_cpu = _build_extend_line_axis1_ragged( + extend_cpu = _build_extend_line_axis1_ragged( draw_segment, expand_aggs_and_cols ) x_name = self.x @@ -412,7 +449,7 @@ def extend(aggs, df, vt, bounds, plot_start=True): aggs_and_cols = aggs + info(df) # line may be clipped, then mapped to pixels - perform_extend_cpu( + extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols @@ -474,7 +511,7 @@ def draw_segment( # If any of the coordinates are NaN, there's a discontinuity. # Skip the entire segment. 
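The isnan-to-isnull replacements in these kernels rely on the `isnull` helper added to glyphs/glyph.py: `not (val <= 0 or val > 0)` is True only for NaN, compiles under both `@ngjit` and `@cuda.jit(device=True)`, and, unlike `math.isnan`, also accepts integer columns (for which it is always False). A quick standalone check of that behaviour:

    import numpy as np
    from numba import njit

    @njit
    def isnull(val):
        # Both comparisons are False only for NaN, so this is a
        # numba-friendly isnan that also tolerates integer inputs.
        return not (val <= 0 or val > 0)

    assert isnull(np.nan)        # NaN -> True
    assert not isnull(0.0)       # ordinary floats -> False
    assert not isnull(-3.5)
    assert not isnull(7)         # integers are never null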
- if isnan(x0) or isnan(y0) or isnan(x1) or isnan(y1): + if isnull(x0) or isnull(y0) or isnull(x1) or isnull(y1): skip = True # Use Liang-Barsky (1992) to clip the segment to a bounding box @@ -590,31 +627,40 @@ def _build_extend_line_axis0(draw_segment, expand_aggs_and_cols): @ngjit @expand_aggs_and_cols def perform_extend_line(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - line_start, xs, ys, *aggs_and_cols): + plot_start, xs, ys, *aggs_and_cols): x0 = xs[i] y0 = ys[i] x1 = xs[i + 1] y1 = ys[i + 1] - segment_start = (line_start if i == 0 else - (isnan(xs[i - 1]) or isnan(ys[i - 1]))) + segment_start = (plot_start if i == 0 else + (isnull(xs[i - 1]) or isnull(ys[i - 1]))) draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, segment_start, x0, x1, y0, y1, *aggs_and_cols) @ngjit @expand_aggs_and_cols - def extend_cpu( - sx, tx, sy, ty, - xmin, xmax, ymin, ymax, - xs, ys, line_start, *aggs_and_cols + def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys, plot_start, *aggs_and_cols ): """Aggregate along a line formed by ``xs`` and ``ys``""" nrows = xs.shape[0] for i in range(nrows - 1): perform_extend_line(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - line_start, xs, ys, *aggs_and_cols) + plot_start, xs, ys, *aggs_and_cols) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys, plot_start, *aggs_and_cols): + i = cuda.grid(1) + if i < xs.shape[0] - 1: + perform_extend_line( + i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_line_axis0_multi(draw_segment, expand_aggs_and_cols): @@ -622,13 +668,13 @@ def _build_extend_line_axis0_multi(draw_segment, expand_aggs_and_cols): @ngjit @expand_aggs_and_cols def perform_extend_line(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - line_start, xs, ys, *aggs_and_cols): + plot_start, xs, ys, *aggs_and_cols): x0 = xs[i, j] y0 = ys[i, j] x1 = xs[i + 1, j] y1 = ys[i + 1, j] - segment_start = (line_start if i == 0 else - (isnan(xs[i - 1, j]) or isnan(ys[i - 1, j]))) + segment_start = (plot_start if i == 0 else + (isnull(xs[i - 1, j]) or isnull(ys[i - 1, j]))) draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, segment_start, x0, x1, y0, y1, *aggs_and_cols) @@ -637,16 +683,27 @@ def perform_extend_line(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, def extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - xs, ys, line_start, *aggs_and_cols): + xs, ys, plot_start, *aggs_and_cols): """Aggregate along a line formed by ``xs`` and ``ys``""" nrows, ncols = xs.shape for j in range(ncols): for i in range(nrows - 1): perform_extend_line(i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, - line_start, xs, ys, *aggs_and_cols) + plot_start, xs, ys, *aggs_and_cols) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, + plot_start, *aggs_and_cols): + i, j = cuda.grid(2) + if i < xs.shape[0] - 1 and j < xs.shape[1]: + perform_extend_line( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + plot_start, xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_line_axis1_none_constant(draw_segment, expand_aggs_and_cols): @@ -661,7 +718,7 @@ def perform_extend_line( x1 = xs[i, j + 1] y1 = ys[i, j + 1] segment_start = ( - (j == 0) or isnan(xs[i, j - 1]) or isnan(ys[i, j - 1]) + (j == 0) or isnull(xs[i, j - 1]) or isnull(ys[i, j - 1]) ) draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -680,7 +737,18 @@ def extend_cpu( xs, 
ys, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, + *aggs_and_cols): + i, j = cuda.grid(2) + if i < xs.shape[0] and j < xs.shape[1] - 1: + perform_extend_line( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, + *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_line_axis1_x_constant( @@ -697,7 +765,7 @@ def perform_extend_line( y1 = ys[i, j + 1] segment_start = ( - (j == 0) or isnan(xs[j - 1]) or isnan(ys[i, j - 1]) + (j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1]) ) draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -713,7 +781,18 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols): i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, + *aggs_and_cols): + i, j = cuda.grid(2) + if i < ys.shape[0] and j < ys.shape[1] - 1: + perform_extend_line( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, + *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_line_axis1_y_constant( @@ -730,7 +809,7 @@ def perform_extend_line( y1 = ys[j + 1] segment_start = ( - (j == 0) or isnan(xs[i, j - 1]) or isnan(ys[j - 1]) + (j == 0) or isnull(xs[i, j - 1]) or isnull(ys[j - 1]) ) draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, @@ -750,7 +829,20 @@ def extend_cpu( xs, ys, *aggs_and_cols ) - return extend_cpu + @cuda.jit + @expand_aggs_and_cols + def extend_cuda( + sx, tx, sy, ty, + xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols + ): + i, j = cuda.grid(2) + if i < xs.shape[0] and j < xs.shape[1] - 1: + perform_extend_line( + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, + xs, ys, *aggs_and_cols + ) + + return extend_cpu, extend_cuda def _build_extend_line_axis1_ragged( @@ -809,8 +901,8 @@ def extend_cpu_numba( segment_start = ( (j == 0) or - isnan(x_flat[x_start_index + j - 1]) or - isnan(y_flat[y_start_index + j] - 1) + isnull(x_flat[x_start_index + j - 1]) or + isnull(y_flat[y_start_index + j] - 1) ) draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, diff --git a/datashader/glyphs/points.py b/datashader/glyphs/points.py index 9d1b36573..ff975737a 100644 --- a/datashader/glyphs/points.py +++ b/datashader/glyphs/points.py @@ -5,6 +5,22 @@ from datashader.glyphs.glyph import Glyph from datashader.utils import isreal, ngjit +from numba import cuda + +try: + import cudf + from ..transfer_functions._cuda_utils import cuda_args +except ImportError: + cudf = None + cuda_args = None + + +def values(s): + if isinstance(s, cudf.Series): + return s.to_gpu_array(fillna=np.nan) + else: + return s.values + class _PointLike(Glyph): """Shared methods between Point and Line""" @@ -100,15 +116,28 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols): for i in range(xs.shape[0]): _perform_extend_points(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols) + @cuda.jit + @self.expand_aggs_and_cols(append) + def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols): + i = cuda.grid(1) + if i < xs.shape[0]: + _perform_extend_points(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols) + def extend(aggs, df, vt, bounds): aggs_and_cols = aggs + info(df) sx, tx, sy, ty = vt xmin, xmax, ymin, ymax = bounds - xs = df[x_name].values - ys = df[y_name].values + if cudf and isinstance(df, cudf.DataFrame): + xs = 
df[x_name].to_gpu_array(fillna=np.nan) + ys = df[y_name].to_gpu_array(fillna=np.nan) + do_extend = extend_cuda[cuda_args(xs.shape[0])] + else: + xs = df[x_name].values + ys = df[y_name].values + do_extend = extend_cpu - extend_cpu( + do_extend( sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, *aggs_and_cols ) diff --git a/datashader/reductions.py b/datashader/reductions.py index 037488b04..5c1d7b066 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -1,13 +1,19 @@ from __future__ import absolute_import, division, print_function -from math import isnan import numpy as np from datashape import dshape, isnumeric, Record, Option from datashape import coretypes as ct from toolz import concat, unique import xarray as xr -from .utils import Expr, ngjit, isrealfloat +from datashader.glyphs.glyph import isnull +from .utils import Expr, ngjit +from numba import cuda as nb_cuda + +try: + import cudf +except ImportError: + cudf = None class Preprocess(Expr): @@ -23,13 +29,24 @@ def inputs(self): class extract(Preprocess): """Extract a column from a dataframe as a numpy array of values.""" def apply(self, df): - return df[self.column].values + if cudf and isinstance(df, cudf.DataFrame): + import cupy + if df[self.column].dtype.kind == 'f': + nullval = np.nan + else: + nullval = 0 + return cupy.array(df[self.column].to_gpu_array(fillna=nullval)) + else: + return df[self.column].values class category_codes(Preprocess): """Extract just the category codes from a categorical column.""" def apply(self, df): - return df[self.column].cat.codes.values + if cudf and isinstance(df, cudf.DataFrame): + return df[self.column].cat.codes.to_gpu_array() + else: + return df[self.column].cat.codes.values class Reduction(Expr): @@ -50,22 +67,26 @@ def out_dshape(self, in_dshape): def inputs(self): return (extract(self.column),) - def _build_bases(self): + def _build_bases(self, cuda=False): return (self,) - def _build_temps(self): + def _build_temps(self, cuda=False): return () def _build_create(self, dshape): return self._create - def _build_append(self, dshape, schema): - if self.column is None: - return self._append_no_field - elif isrealfloat(schema[self.column]): - return self._append_float_field + def _build_append(self, dshape, schema, cuda=False): + if cuda: + if self.column is None: + return self._append_no_field_cuda + else: + return self._append_cuda else: - return self._append_int_field + if self.column is None: + return self._append_no_field + else: + return self._append def _build_combine(self, dshape): return self._combine @@ -87,7 +108,7 @@ def validate(self, in_dshape): pass @staticmethod - def _finalize(bases, **kwargs): + def _finalize(bases, cuda=False, **kwargs): return xr.DataArray(bases[0], **kwargs) @@ -102,22 +123,31 @@ class count(OptionalFieldReduction): """ _dshape = dshape(ct.int32) + # CPU append functions @staticmethod @ngjit def _append_no_field(x, y, agg): agg[y, x] += 1 - @staticmethod - @ngjit - def _append_int_field(x, y, agg, field): - agg[y, x] += 1 @staticmethod @ngjit - def _append_float_field(x, y, agg, field): - if not isnan(field): + def _append(x, y, agg, field): + if not isnull(field): agg[y, x] += 1 + # GPU append functions + @staticmethod + @nb_cuda.jit(device=True) + def _append_no_field_cuda(x, y, agg): + nb_cuda.atomic.add(agg, (y, x), 1) + + @staticmethod + @nb_cuda.jit(device=True) + def _append_cuda(x, y, agg, field): + if not isnull(field): + nb_cuda.atomic.add(agg, (y, x), 1) + @staticmethod def _create(shape, array_module): return 
array_module.zeros(shape, dtype='i4') @@ -141,17 +171,14 @@ class any(OptionalFieldReduction): @ngjit def _append_no_field(x, y, agg): agg[y, x] = True + _append_no_field_cuda = _append_no_field @staticmethod @ngjit - def _append_int_field(x, y, agg, field): - agg[y, x] = True - - @staticmethod - @ngjit - def _append_float_field(x, y, agg, field): - if not isnan(field): + def _append(x, y, agg, field): + if not isnull(field): agg[y, x] = True + _append_cuda =_append @staticmethod def _create(shape, array_module): @@ -171,15 +198,13 @@ def _create(shape, array_module): return array_module.full(shape, np.nan, dtype='f8') @staticmethod - def _finalize(bases, **kwargs): + def _finalize(bases, cuda=False, **kwargs): return xr.DataArray(bases[0], **kwargs) class _sum_zero(FloatingReduction): """Sum of all elements in ``column``. - Elements of resulting aggregate are zero if they are not updated. - Parameters ---------- column : str @@ -193,14 +218,15 @@ def _create(shape, array_module): @staticmethod @ngjit - def _append_int_field(x, y, agg, field): - agg[y, x] += field + def _append(x, y, agg, field): + if not isnull(field): + agg[y, x] += field @staticmethod @ngjit - def _append_float_field(x, y, agg, field): - if not isnan(field): - agg[y, x] += field + def _append_cuda(x, y, agg, field): + if not isnull(field): + nb_cuda.atomic.add(agg, (y, x), field) @staticmethod def _combine(aggs): @@ -219,19 +245,29 @@ class sum(FloatingReduction): """ _dshape = dshape(Option(ct.float64)) + # Cuda implementation + def _build_bases(self, cuda=False): + if cuda: + return (_sum_zero(self.column), any(self.column)) + else: + return (self,) + @staticmethod - @ngjit - def _append_int_field(x, y, agg, field): - if np.isnan(agg[y, x]): - agg[y, x] = field + def _finalize(bases, cuda=False, **kwargs): + if cuda: + sums, anys = bases + x = np.where(anys, sums, np.nan) + return xr.DataArray(x, **kwargs) else: - agg[y, x] += field + return xr.DataArray(bases[0], **kwargs) + # Single pass CPU implementation + # These methods will only be called if _build_bases returned (self,) @staticmethod @ngjit - def _append_float_field(x, y, agg, field): - if not np.isnan(field): - if np.isnan(agg[y, x]): + def _append(x, y, agg, field): + if not isnull(field): + if isnull(agg[y, x]): agg[y, x] = field else: agg[y, x] += field @@ -243,7 +279,6 @@ def _combine(aggs): set_to_zero = missing_vals & ~all_empty return np.where(set_to_zero, 0, aggs).sum(axis=0) - class m2(FloatingReduction): """Sum of square differences from the mean of all elements in ``column``. 
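One detail of the `sum` changes above worth spelling out: atomics cannot easily start from a NaN sentinel, so on the GPU `sum._build_bases` swaps in `_sum_zero` (zero-initialized, updated with `nb_cuda.atomic.add`) plus `any`, and `_finalize` masks never-touched pixels back to NaN. A small illustration of that masking step, using made-up aggregate buffers rather than real GPU output:

    import numpy as np

    # Pretend these came back from the two GPU base reductions:
    sums = np.array([[3.0, 0.0],
                     [0.0, 7.5]])          # _sum_zero result
    anys = np.array([[True, False],
                     [False, True]])       # any result: did a value hit this pixel?

    # Same idea as sum._finalize(..., cuda=True): empty bins become NaN,
    # so they stay distinguishable from bins whose values summed to zero.
    masked = np.where(anys, sums, np.nan)
    # masked == [[3.0, nan], [nan, 7.5]]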
@@ -261,33 +296,26 @@ class m2(FloatingReduction): def _create(shape, array_module): return array_module.full(shape, 0.0, dtype='f8') - def _build_temps(self): + def _build_temps(self, cuda=False): return (_sum_zero(self.column), count(self.column)) - def _build_append(self, dshape, schema): - return super(m2, self)._build_append(dshape, schema) + def _build_append(self, dshape, schema, cuda=False): + if cuda: + raise ValueError("""\ +The 'std' and 'var' reduction operations are not yet supported on the GPU""") + return super(m2, self)._build_append(dshape, schema, cuda) @staticmethod @ngjit - def _append_float_field(x, y, m2, field, sum, count): + def _append(x, y, m2, field, sum, count): # sum & count are the results of sum[y, x], count[y, x] before being # updated by field - if not isnan(field): + if not isnull(field): if count > 0: u1 = np.float64(sum) / count u = np.float64(sum + field) / (count + 1) m2[y, x] += (field - u1) * (field - u) - @staticmethod - @ngjit - def _append_int_field(x, y, m2, field, sum, count): - # sum & count are the results of sum[y, x], count[y, x] before being - # updated by field - if count > 0: - u1 = np.float64(sum) / count - u = np.float64(sum + field) / (count + 1) - m2[y, x] += (field - u1) * (field - u) - @staticmethod def _combine(Ms, sums, ns): with np.errstate(divide='ignore', invalid='ignore'): @@ -306,12 +334,16 @@ class min(FloatingReduction): """ @staticmethod @ngjit - def _append_float_field(x, y, agg, field): - if isnan(agg[y, x]): + def _append(x, y, agg, field): + if isnull(agg[y, x]): agg[y, x] = field elif agg[y, x] > field: agg[y, x] = field - _append_int_field = _append_float_field + + @staticmethod + @ngjit + def _append_cuda(x, y, agg, field): + nb_cuda.atomic.min(agg, (y, x), field) @staticmethod def _combine(aggs): @@ -329,12 +361,16 @@ class max(FloatingReduction): """ @staticmethod @ngjit - def _append_float_field(x, y, agg, field): - if isnan(agg[y, x]): + def _append(x, y, agg, field): + if isnull(agg[y, x]): agg[y, x] = field elif agg[y, x] < field: agg[y, x] = field - _append_int_field = _append_float_field + + @staticmethod + @ngjit + def _append_cuda(x, y, agg, field): + nb_cuda.atomic.max(agg, (y, x), field) @staticmethod def _combine(aggs): @@ -371,9 +407,13 @@ def _build_create(self, out_dshape): @staticmethod @ngjit - def _append_int_field(x, y, agg, field): + def _append(x, y, agg, field): agg[y, x, field] += 1 - _append_float_field = _append_int_field + + @staticmethod + @ngjit + def _append_cuda(x, y, agg, field): + nb_cuda.atomic.add(agg, (y, x, field), 1) @staticmethod def _combine(aggs): @@ -382,7 +422,7 @@ def _combine(aggs): def _build_finalize(self, dshape): cats = list(dshape[self.column].categories) - def finalize(bases, **kwargs): + def finalize(bases, cuda=False, **kwargs): dims = kwargs['dims'] + [self.column] coords = kwargs['coords'] @@ -402,11 +442,11 @@ class mean(Reduction): """ _dshape = dshape(Option(ct.float64)) - def _build_bases(self): + def _build_bases(self, cuda=False): return (_sum_zero(self.column), count(self.column)) @staticmethod - def _finalize(bases, **kwargs): + def _finalize(bases, cuda=False, **kwargs): sums, counts = bases with np.errstate(divide='ignore', invalid='ignore'): x = np.where(counts > 0, sums/counts, np.nan) @@ -424,11 +464,11 @@ class var(Reduction): """ _dshape = dshape(Option(ct.float64)) - def _build_bases(self): + def _build_bases(self, cuda=False): return (_sum_zero(self.column), count(self.column), m2(self.column)) @staticmethod - def _finalize(bases, **kwargs): + 
def _finalize(bases, cuda=False, **kwargs): sums, counts, m2s = bases with np.errstate(divide='ignore', invalid='ignore'): x = np.where(counts > 0, m2s / counts, np.nan) @@ -446,11 +486,11 @@ class std(Reduction): """ _dshape = dshape(Option(ct.float64)) - def _build_bases(self): + def _build_bases(self, cuda=False): return (_sum_zero(self.column), count(self.column), m2(self.column)) @staticmethod - def _finalize(bases, **kwargs): + def _finalize(bases, cuda=False, **kwargs): sums, counts, m2s = bases with np.errstate(divide='ignore', invalid='ignore'): x = np.where(counts > 0, np.sqrt(m2s / counts), np.nan) diff --git a/datashader/tests/benchmarks/test_extend_line.py b/datashader/tests/benchmarks/test_extend_line.py index 5a4b80a8b..c41a9191b 100644 --- a/datashader/tests/benchmarks/test_extend_line.py +++ b/datashader/tests/benchmarks/test_extend_line.py @@ -19,7 +19,7 @@ def append(i, x, y, agg): map_onto_pixel = _build_map_onto_pixel_for_line(mapper, mapper) expand_aggs_and_cols = Glyph._expand_aggs_and_cols(append, 1) draw_line = _build_draw_segment(append, map_onto_pixel, expand_aggs_and_cols) - return _build_extend_line_axis0(draw_line, expand_aggs_and_cols) + return _build_extend_line_axis0(draw_line, expand_aggs_and_cols)[0] @pytest.mark.parametrize('high', [0, 10**5]) diff --git a/datashader/tests/test_dask.py b/datashader/tests/test_dask.py index 86079c45d..a9c935e33 100644 --- a/datashader/tests/test_dask.py +++ b/datashader/tests/test_dask.py @@ -10,23 +10,52 @@ import pytest +from datashader.tests.test_pandas import ( + assert_eq_xr, assert_eq_ndarray, values +) + config.set(scheduler='synchronous') -df = pd.DataFrame({'x': np.array(([0.] * 10 + [1] * 10)), - 'y': np.array(([0.] * 5 + [1] * 5 + [0] * 5 + [1] * 5)), - 'log_x': np.array(([1.] * 10 + [10] * 10)), - 'log_y': np.array(([1.] * 5 + [10] * 5 + [1] * 5 + [10] * 5)), - 'i32': np.arange(20, dtype='i4'), - 'i64': np.arange(20, dtype='i8'), - 'f32': np.arange(20, dtype='f4'), - 'f64': np.arange(20, dtype='f8'), - 'empty_bin': np.array([0.] * 15 + [np.nan] * 5), - 'cat': ['a']*5 + ['b']*5 + ['c']*5 + ['d']*5}) -df.cat = df.cat.astype('category') -df.f32[2] = np.nan -df.f64[2] = np.nan - -ddf = dd.from_pandas(df, npartitions=3) +df_pd = pd.DataFrame({'x': np.array(([0.] * 10 + [1] * 10)), + 'y': np.array(([0.] * 5 + [1] * 5 + [0] * 5 + [1] * 5)), + 'log_x': np.array(([1.] * 10 + [10] * 10)), + 'log_y': np.array(([1.] * 5 + [10] * 5 + [1] * 5 + [10] * 5)), + 'i32': np.arange(20, dtype='i4'), + 'i64': np.arange(20, dtype='i8'), + 'f32': np.arange(20, dtype='f4'), + 'f64': np.arange(20, dtype='f8'), + 'empty_bin': np.array([0.] 
* 15 + [np.nan] * 5), + 'cat': ['a']*5 + ['b']*5 + ['c']*5 + ['d']*5}) +df_pd.cat = df_pd.cat.astype('category') +df_pd.f32[2] = np.nan +df_pd.f64[2] = np.nan + +_ddf = dd.from_pandas(df_pd, npartitions=2) + + +def dask_DataFrame(*args, **kwargs): + return dd.from_pandas(pd.DataFrame(*args, **kwargs), npartitions=2) + + +try: + import cudf + import cupy + import dask_cudf + ddfs = [_ddf, dask_cudf.from_dask_dataframe(_ddf)] + + def dask_cudf_DataFrame(*args, **kwargs): + cdf = cudf.DataFrame.from_pandas( + pd.DataFrame(*args, **kwargs), nan_as_null=False + ) + return dask_cudf.from_cudf(cdf, npartitions=2) + + DataFrames = [dask_DataFrame, dask_cudf_DataFrame] +except ImportError: + cudf = cupy = dask_cudf = None + ddfs = [_ddf] + DataFrames = [dask_DataFrame] + dask_cudf_DataFrame = None + c = ds.Canvas(plot_width=2, plot_height=2, x_range=(0, 1), y_range=(0, 1)) c_logx = ds.Canvas(plot_width=2, plot_height=2, x_range=(1, 10), @@ -53,103 +82,134 @@ def floats(n): n = n + np.spacing(n) -def test_count(): +@pytest.mark.parametrize('ddf', ddfs) +def test_count(ddf): out = xr.DataArray(np.array([[5, 5], [5, 5]], dtype='i4'), coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.count('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.count('i64')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.count()), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.count('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.count('i64')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.count()), out) out = xr.DataArray(np.array([[4, 5], [5, 5]], dtype='i4'), coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.count('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.count('f64')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.count('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.count('f64')), out) -def test_any(): +@pytest.mark.parametrize('ddf', ddfs) +def test_any(ddf): out = xr.DataArray(np.array([[True, True], [True, True]]), coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.any('i64')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.any('f64')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.any()), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.any('i64')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.any('f64')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.any()), out) out = xr.DataArray(np.array([[True, True], [True, False]]), coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.any('empty_bin')), out) - - -def test_sum(): - out = xr.DataArray(df.i32.values.reshape((2, 2, 5)).sum(axis=2, dtype='f8').T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.sum('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.sum('i64')), out) - out = xr.DataArray(np.nansum(df.f64.values.reshape((2, 2, 5)), axis=2).T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.sum('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.sum('f64')), out) - - -def test_min(): - out = xr.DataArray(df.i64.values.reshape((2, 2, 5)).min(axis=2).astype('f8').T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.min('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.min('i64')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.min('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.min('f64')), out) - - -def test_max(): - out = xr.DataArray(df.i64.values.reshape((2, 2, 5)).max(axis=2).astype('f8').T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.max('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', 
ds.max('i64')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.max('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.max('f64')), out) - - -def test_mean(): - out = xr.DataArray(df.i32.values.reshape((2, 2, 5)).mean(axis=2, dtype='f8').T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.mean('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.mean('i64')), out) - out = xr.DataArray(np.nanmean(df.f64.values.reshape((2, 2, 5)), axis=2).T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.mean('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.mean('f64')), out) - - -def test_var(): - out = xr.DataArray(df.i32.values.reshape((2, 2, 5)).var(axis=2, dtype='f8').T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.var('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.var('i64')), out) - out = xr.DataArray(np.nanvar(df.f64.values.reshape((2, 2, 5)), axis=2).T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.var('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.var('f64')), out) - - -def test_std(): - out = xr.DataArray(df.i32.values.reshape((2, 2, 5)).std(axis=2, dtype='f8').T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.std('i32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.std('i64')), out) - out = xr.DataArray(np.nanstd(df.f64.values.reshape((2, 2, 5)), axis=2).T, - coords=coords, dims=dims) - assert_eq(c.points(ddf, 'x', 'y', ds.std('f32')), out) - assert_eq(c.points(ddf, 'x', 'y', ds.std('f64')), out) - - -def test_count_cat(): + assert_eq_xr(c.points(ddf, 'x', 'y', ds.any('empty_bin')), out) + + +@pytest.mark.parametrize('ddf', ddfs) +def test_sum(ddf): + out = xr.DataArray( + values(df_pd.i32).reshape((2, 2, 5)).sum(axis=2, dtype='f8').T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.sum('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.sum('i64')), out) + + out = xr.DataArray( + np.nansum(values(df_pd.f64).reshape((2, 2, 5)), axis=2).T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.sum('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.sum('f64')), out) + + +@pytest.mark.parametrize('ddf', ddfs) +def test_min(ddf): + out = xr.DataArray( + values(df_pd.i64).reshape((2, 2, 5)).min(axis=2).astype('f8').T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.min('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.min('i64')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.min('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.min('f64')), out) + + +@pytest.mark.parametrize('ddf', ddfs) +def test_max(ddf): + out = xr.DataArray( + values(df_pd.i64).reshape((2, 2, 5)).max(axis=2).astype('f8').T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.max('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.max('i64')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.max('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.max('f64')), out) + + +@pytest.mark.parametrize('ddf', ddfs) +def test_mean(ddf): + out = xr.DataArray( + values(df_pd.i32).reshape((2, 2, 5)).mean(axis=2, dtype='f8').T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.mean('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.mean('i64')), out) + out = xr.DataArray( + np.nanmean(values(df_pd.f64).reshape((2, 2, 5)), axis=2).T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.mean('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.mean('f64')), out) + + 
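Note on the pattern used throughout this test module: each test now receives a ddf drawn from ddfs (a plain dask DataFrame plus, when the GPU stack imports cleanly, its dask_cudf counterpart), builds the expected values from df_pd via the values() helper, and compares through assert_eq_xr, which copies cupy-backed results to host memory before the comparison. Below is a minimal standalone sketch of that round trip, assuming cudf, cupy and dask_cudf are installed and a GPU is available; the toy frame, column names and sizes are illustrative only, not part of the test suite.

# Run the same aggregation on a plain dask DataFrame and a dask_cudf
# DataFrame, then move the GPU result to host memory before comparing.
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask_cudf
import cupy
import datashader as ds

pdf = pd.DataFrame({'x': [0., 0., 1., 1.],
                    'y': [0., 1., 0., 1.],
                    'f64': [1., 2., 3., 4.]})
ddf_cpu = dd.from_pandas(pdf, npartitions=2)
ddf_gpu = dask_cudf.from_dask_dataframe(ddf_cpu)

cvs = ds.Canvas(plot_width=2, plot_height=2, x_range=(0, 1), y_range=(0, 1))
agg_cpu = cvs.points(ddf_cpu, 'x', 'y', ds.mean('f64'))
agg_gpu = cvs.points(ddf_gpu, 'x', 'y', ds.mean('f64'))

# The GPU aggregate is backed by a cupy array; copy it to NumPy to compare.
np.testing.assert_allclose(cupy.asnumpy(agg_gpu.data), agg_cpu.data)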
+@pytest.mark.parametrize('ddf', ddfs) +def test_var(ddf): + if dask_cudf and isinstance(ddf, dask_cudf.DataFrame): + pytest.skip("var not supported with cudf") + + out = xr.DataArray( + values(df_pd.i32).reshape((2, 2, 5)).var(axis=2, dtype='f8').T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.var('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.var('i64')), out) + out = xr.DataArray( + np.nanvar(values(df_pd.f64).reshape((2, 2, 5)), axis=2).T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.var('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.var('f64')), out) + + +@pytest.mark.parametrize('ddf', ddfs) +def test_std(ddf): + if dask_cudf and isinstance(ddf, dask_cudf.DataFrame): + pytest.skip("std not supported with cudf") + + out = xr.DataArray( + values(df_pd.i32).reshape((2, 2, 5)).std(axis=2, dtype='f8').T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.std('i32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.std('i64')), out) + out = xr.DataArray( + np.nanstd(values(df_pd.f64).reshape((2, 2, 5)), axis=2).T, + coords=coords, dims=dims) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.std('f32')), out) + assert_eq_xr(c.points(ddf, 'x', 'y', ds.std('f64')), out) + + +@pytest.mark.parametrize('ddf', ddfs) +def test_count_cat(ddf): sol = np.array([[[5, 0, 0, 0], [0, 0, 5, 0]], [[0, 5, 0, 0], [0, 0, 0, 5]]]) - out = xr.DataArray(sol, coords=(coords + [['a', 'b', 'c', 'd']]), - dims=(dims + ['cat'])) + out = xr.DataArray( + sol, coords=(coords + [['a', 'b', 'c', 'd']]), dims=(dims + ['cat']) + ) agg = c.points(ddf, 'x', 'y', ds.count_cat('cat')) - assert_eq(agg, out) + assert_eq_xr(agg, out) + +@pytest.mark.parametrize('ddf', ddfs) +def test_multiple_aggregates(ddf): + if dask_cudf and isinstance(ddf, dask_cudf.DataFrame): + pytest.skip("std not supported with cudf") -def test_multiple_aggregates(): agg = c.points(ddf, 'x', 'y', ds.summary(f64_std=ds.std('f64'), f64_mean=ds.mean('f64'), @@ -157,39 +217,39 @@ def test_multiple_aggregates(): i32_count=ds.count('i32'))) f = lambda x: xr.DataArray(x, coords=coords, dims=dims) - assert_eq(agg.f64_std, f(np.nanstd(df.f64.values.reshape((2, 2, 5)), axis=2).T)) - assert_eq(agg.f64_mean, f(np.nanmean(df.f64.values.reshape((2, 2, 5)), axis=2).T)) - assert_eq(agg.i32_sum, f(df.i32.values.reshape((2, 2, 5)).sum(axis=2, dtype='f8').T)) - assert_eq(agg.i32_count, f(np.array([[5, 5], [5, 5]], dtype='i4'))) + assert_eq_xr(agg.f64_std, f(np.nanstd(values(df_pd.f64).reshape((2, 2, 5)), axis=2).T)) + assert_eq_xr(agg.f64_mean, f(np.nanmean(values(df_pd.f64).reshape((2, 2, 5)), axis=2).T)) + assert_eq_xr(agg.i32_sum, f(values(df_pd.i32).reshape((2, 2, 5)).sum(axis=2, dtype='f8').T)) + assert_eq_xr(agg.i32_count, f(np.array([[5, 5], [5, 5]], dtype='i4'))) -def test_auto_range_points(): +@pytest.mark.parametrize('DataFrame', DataFrames) +def test_auto_range_points(DataFrame): n = 10 data = np.arange(n, dtype='i4') - df = pd.DataFrame({'time': np.arange(n), - 'x': data, - 'y': data}) - ddf = dd.from_pandas(df, npartitions=2) + + ddf = DataFrame({'time': np.arange(n), + 'x': data, + 'y': data}) cvs = ds.Canvas(plot_width=n, plot_height=n) agg = cvs.points(ddf, 'x', 'y', ds.count('time')) sol = np.zeros((n, n), int) np.fill_diagonal(sol, 1) - np.testing.assert_equal(agg.data, sol) + assert_eq_ndarray(agg.data, sol) cvs = ds.Canvas(plot_width=n+1, plot_height=n+1) agg = cvs.points(ddf, 'x', 'y', ds.count('time')) sol = np.zeros((n+1, n+1), int) np.fill_diagonal(sol, 1) sol[5, 5] = 0 - 
np.testing.assert_equal(agg.data, sol) + assert_eq_ndarray(agg.data, sol) n = 4 data = np.arange(n, dtype='i4') - df = pd.DataFrame({'time': np.arange(n), - 'x': data, - 'y': data}) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame({'time': np.arange(n), + 'x': data, + 'y': data}) cvs = ds.Canvas(plot_width=2*n, plot_height=2*n) agg = cvs.points(ddf, 'x', 'y', ds.count('time')) @@ -197,7 +257,7 @@ def test_auto_range_points(): np.fill_diagonal(sol, 1) sol[[tuple(range(1, 4, 2))]] = 0 sol[[tuple(range(4, 8, 2))]] = 0 - np.testing.assert_equal(agg.data, sol) + assert_eq_ndarray(agg.data, sol) cvs = ds.Canvas(plot_width=2*n+1, plot_height=2*n+1) agg = cvs.points(ddf, 'x', 'y', ds.count('time')) @@ -206,44 +266,47 @@ def test_auto_range_points(): sol[3, 3] = 1 sol[6, 6] = 1 sol[8, 8] = 1 - np.testing.assert_equal(agg.data, sol) + assert_eq_ndarray(agg.data, sol) -def test_uniform_points(): +@pytest.mark.parametrize('DataFrame', DataFrames) +def test_uniform_points(DataFrame): n = 101 - df = pd.DataFrame({'time': np.ones(2*n, dtype='i4'), - 'x': np.concatenate((np.arange(n, dtype='f8'), - np.arange(n, dtype='f8'))), - 'y': np.concatenate(([0.] * n, [1.] * n))}) + ddf = DataFrame({'time': np.ones(2*n, dtype='i4'), + 'x': np.concatenate((np.arange(n, dtype='f8'), + np.arange(n, dtype='f8'))), + 'y': np.concatenate(([0.] * n, [1.] * n))}) cvs = ds.Canvas(plot_width=10, plot_height=2, y_range=(0, 1)) - agg = cvs.points(df, 'x', 'y', ds.count('time')) + agg = cvs.points(ddf, 'x', 'y', ds.count('time')) sol = np.array([[10] * 9 + [11], [10] * 9 + [11]], dtype='i4') - np.testing.assert_equal(agg.data, sol) + assert_eq_ndarray(agg.data, sol) +@pytest.mark.parametrize('DataFrame', DataFrames) @pytest.mark.parametrize('high', [9, 10, 99, 100]) @pytest.mark.parametrize('low', [0]) -def test_uniform_diagonal_points(low, high): +def test_uniform_diagonal_points(DataFrame, low, high): bounds = (low, high) x_range, y_range = bounds, bounds width = x_range[1] - x_range[0] height = y_range[1] - y_range[0] n = width * height - df = pd.DataFrame({'time': np.ones(n, dtype='i4'), - 'x': np.array([np.arange(*x_range, dtype='f8')] * width).flatten(), - 'y': np.array([np.arange(*y_range, dtype='f8')] * height).flatten()}) + ddf = DataFrame({'time': np.ones(n, dtype='i4'), + 'x': np.array([np.arange(*x_range, dtype='f8')] * width).flatten(), + 'y': np.array([np.arange(*y_range, dtype='f8')] * height).flatten()}) cvs = ds.Canvas(plot_width=2, plot_height=2, x_range=x_range, y_range=y_range) - agg = cvs.points(df, 'x', 'y', ds.count('time')) + agg = cvs.points(ddf, 'x', 'y', ds.count('time')) diagonal = agg.data.diagonal(0) assert sum(diagonal) == n assert abs(bounds[1] - bounds[0]) % 2 == abs(diagonal[1] / high - diagonal[0] / high) -def test_log_axis_points(): +@pytest.mark.parametrize('ddf', ddfs) +def test_log_axis_points(ddf): axis = ds.core.LogAxis() logcoords = axis.compute_index(axis.compute_scale_and_translate((1, 10), 2), 2) @@ -253,22 +316,22 @@ def test_log_axis_points(): sol = np.array([[5, 5], [5, 5]], dtype='i4') out = xr.DataArray(sol, coords=[lincoords, logcoords], dims=['y', 'log_x']) - assert_eq(c_logx.points(ddf, 'log_x', 'y', ds.count('i32')), out) + assert_eq_xr(c_logx.points(ddf, 'log_x', 'y', ds.count('i32')), out) out = xr.DataArray(sol, coords=[logcoords, lincoords], dims=['log_y', 'x']) - assert_eq(c_logy.points(ddf, 'x', 'log_y', ds.count('i32')), out) + assert_eq_xr(c_logy.points(ddf, 'x', 'log_y', ds.count('i32')), out) out = xr.DataArray(sol, coords=[logcoords, logcoords], 
dims=['log_y', 'log_x']) - assert_eq(c_logxy.points(ddf, 'log_x', 'log_y', ds.count('i32')), out) + assert_eq_xr(c_logxy.points(ddf, 'log_x', 'log_y', ds.count('i32')), out) -def test_line(): +@pytest.mark.parametrize('DataFrame', DataFrames) +def test_line(DataFrame): axis = ds.core.LinearAxis() lincoords = axis.compute_index(axis.compute_scale_and_translate((-3., 3.), 7), 7) - df = pd.DataFrame({'x': [4, 0, -4, -3, -2, -1.9, 0, 10, 10, 0, 4], - 'y': [0, -4, 0, 1, 2, 2.1, 4, 20, 30, 4, 0]}) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame({'x': [4, 0, -4, -3, -2, -1.9, 0, 10, 10, 0, 4], + 'y': [0, -4, 0, 1, 2, 2.1, 4, 20, 30, 4, 0]}) cvs = ds.Canvas(plot_width=7, plot_height=7, x_range=(-3, 3), y_range=(-3, 3)) agg = cvs.line(ddf, 'x', 'y', ds.count()) @@ -281,13 +344,14 @@ def test_line(): [0, 0, 1, 0, 1, 0, 0]], dtype='i4') out = xr.DataArray(sol, coords=[lincoords, lincoords], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) # # Line tests -@pytest.mark.parametrize('df,x,y,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [4, -4, 4], 'x1': [0, 0, 0], 'x2': [-4, 4, -4], @@ -297,20 +361,20 @@ def test_line(): }), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 1), # axis1 x constant - (pd.DataFrame({ + (dict(data={ 'y0': [0, 0, 0], 'y1': [0, 4, -4], 'y2': [0, 0, 0] }), np.array([-4, 0, 4]), ['y0', 'y1', 'y2'], 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [4, 0, -4, np.nan, -4, 0, 4, np.nan, 4, 0, -4], 'y': [0, -4, 0, np.nan, 0, 4, 0, np.nan, 0, 0, 0], }), 'x', 'y', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [4, 0, -4], 'x1': [-4, 0, 4], 'x2': [4, 0, -4], @@ -320,7 +384,7 @@ def test_line(): }), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 0), # axis0 multi with string - (pd.DataFrame({ + (dict(data={ 'x0': [-4, 0, 4], 'y0': [0, -4, 0], 'y1': [0, 4, 0], @@ -328,17 +392,20 @@ def test_line(): }), 'x0', ['y0', 'y1', 'y2'], 0), # axis1 RaggedArray - (pd.DataFrame({ + (dict(data={ 'x': [[4, 0, -4], [-4, 0, 4, 4, 0, -4]], 'y': [[0, -4, 0], [0, 4, 0, 0, 0, 0]], }, dtype='Ragged[int64]'), 'x', 'y', 1), ]) -def test_line_manual_range(df, x, y, ax): +def test_line_manual_range(DataFrame, df_kwargs, x, y, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords = axis.compute_index(axis.compute_scale_and_translate((-3., 3.), 7), 7) - ddf = dd.from_pandas(df, npartitions=2) - + ddf = DataFrame(**df_kwargs) cvs = ds.Canvas(plot_width=7, plot_height=7, x_range=(-3, 3), y_range=(-3, 3)) @@ -354,12 +421,13 @@ def test_line_manual_range(df, x, y, ax): out = xr.DataArray(sol, coords=[lincoords, lincoords], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -@pytest.mark.parametrize('df,x,y,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [0, 0, 0], 'x1': [-4, 0, 4], 'x2': [0, 0, 0], @@ -369,20 +437,20 @@ def test_line_manual_range(df, x, y, ax): }), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 1), # axis1 y constant - (pd.DataFrame({ + (dict(data={ 'x0': [0, 0, 0], 'x1': [-4, 0, 4], 'x2': [0, 0, 0], }), ['x0', 'x1', 'x2'], np.array([-4, 0, 4]), 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [0, -4, 0, np.nan, 0, 0, 0, np.nan, 0, 4, 0], 'y': [-4, 0, 4, np.nan, 4, 0, -4, np.nan, -4, 0, 
4], }), 'x', 'y', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [0, -4, 0], 'x1': [0, 0, 0], 'x2': [0, 4, 0], @@ -392,7 +460,7 @@ def test_line_manual_range(df, x, y, ax): }), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 0), # axis0 multi with string - (pd.DataFrame({ + (dict(data={ 'x0': [0, -4, 0], 'x1': [0, 0, 0], 'x2': [0, 4, 0], @@ -400,18 +468,22 @@ def test_line_manual_range(df, x, y, ax): }), ['x0', 'x1', 'x2'], 'y0', 0), # axis1 RaggedArray - (pd.DataFrame({ + (dict(data={ 'x': [[0, -4, 0], [0, 0, 0], [0, 4, 0]], 'y': [[-4, 0, 4], [4, 0, -4], [-4, 0, 4]], }, dtype='Ragged[int64]'), 'x', 'y', 1), ]) -def test_line_autorange(df, x, y, ax): +def test_line_autorange(DataFrame, df_kwargs, x, y, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords = axis.compute_index( axis.compute_scale_and_translate((-4., 4.), 9), 9) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame(**df_kwargs) cvs = ds.Canvas(plot_width=9, plot_height=9) @@ -429,17 +501,12 @@ def test_line_autorange(df, x, y, ax): out = xr.DataArray(sol, coords=[lincoords, lincoords], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -def test_line_x_constant_autorange(): +@pytest.mark.parametrize('DataFrame', DataFrames) +def test_line_x_constant_autorange(DataFrame): # axis1 y constant - df = pd.DataFrame({ - 'y0': [0, 0, 0], - 'y1': [-4, 0, 4], - 'y2': [0, 0, 0], - }) - x = np.array([-4, 0, 4]) y = ['y0', 'y1', 'y2'] ax = 1 @@ -448,7 +515,11 @@ def test_line_x_constant_autorange(): lincoords = axis.compute_index( axis.compute_scale_and_translate((-4., 4.), 9), 9) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame({ + 'y0': [0, 0, 0], + 'y1': [-4, 0, 4], + 'y2': [0, 0, 0], + }) cvs = ds.Canvas(plot_width=9, plot_height=9) @@ -466,10 +537,11 @@ def test_line_x_constant_autorange(): out = xr.DataArray(sol, coords=[lincoords, lincoords], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -def test_log_axis_line(): +@pytest.mark.parametrize('ddf', ddfs) +def test_log_axis_line(ddf): axis = ds.core.LogAxis() logcoords = axis.compute_index(axis.compute_scale_and_translate((1, 10), 2), 2) @@ -479,22 +551,24 @@ def test_log_axis_line(): sol = np.array([[5, 5], [5, 5]], dtype='i4') out = xr.DataArray(sol, coords=[lincoords, logcoords], dims=['y', 'log_x']) - assert_eq(c_logx.line(ddf, 'log_x', 'y', ds.count('i32')), out) + + assert_eq_xr(c_logx.line(ddf, 'log_x', 'y', ds.count('i32')), out) out = xr.DataArray(sol, coords=[logcoords, lincoords], dims=['log_y', 'x']) - assert_eq(c_logy.line(ddf, 'x', 'log_y', ds.count('i32')), out) + assert_eq_xr(c_logy.line(ddf, 'x', 'log_y', ds.count('i32')), out) out = xr.DataArray(sol, coords=[logcoords, logcoords], dims=['log_y', 'log_x']) - assert_eq(c_logxy.line(ddf, 'log_x', 'log_y', ds.count('i32')), out) + assert_eq_xr(c_logxy.line(ddf, 'log_x', 'log_y', ds.count('i32')), out) -def test_auto_range_line(): +@pytest.mark.parametrize('DataFrame', DataFrames) +def test_auto_range_line(DataFrame): axis = ds.core.LinearAxis() lincoords = axis.compute_index(axis.compute_scale_and_translate((-10., 10.), 5), 5) - df = pd.DataFrame({'x': [-10, 0, 10, 0, -10], - 'y': [ 0, 10, 0, -10, 0]}) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame({'x': [-10, 0, 10, 0, -10], + 'y': [ 0, 10, 0, -10, 0]}) + cvs = ds.Canvas(plot_width=5, plot_height=5) agg = cvs.line(ddf, 'x', 'y', ds.count()) sol = np.array([[0, 0, 1, 0, 
0], @@ -504,12 +578,13 @@ def test_auto_range_line(): [0, 0, 1, 0, 0]], dtype='i4') out = xr.DataArray(sol, coords=[lincoords, lincoords], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -@pytest.mark.parametrize('df,x,y,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, np.nan], 'x1': [-2, 2], 'x2': [0, 4], @@ -519,13 +594,13 @@ def test_auto_range_line(): }, dtype='float32'), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [-4, -2, 0, np.nan, 2, 4], 'y': [0, -4, 0, np.nan, 4, 0], }), 'x', 'y', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [np.nan, 2, 4], 'y0': [0, -4, 0], @@ -533,12 +608,16 @@ def test_auto_range_line(): }, dtype='float32'), ['x0', 'x1'], ['y0', 'y1'], 0), # axis1 ragged arrays - (pd.DataFrame({ - 'x': pd.array([[-4, -2, 0], [2, 4]], dtype='Ragged[float32]'), - 'y': pd.array([[0, -4, 0], [4, 0]], dtype='Ragged[float32]') - }), 'x', 'y', 1) + (dict(data={ + 'x': pd.array([[-4, -2, 0], [2, 4]]), + 'y': pd.array([[0, -4, 0], [4, 0]]) + }, dtype='Ragged[float32]'), 'x', 'y', 1) ]) -def test_area_to_zero_fixedrange(df, x, y, ax): +def test_area_to_zero_fixedrange(DataFrame, df_kwargs, x, y, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords_y = axis.compute_index( axis.compute_scale_and_translate((-2.25, 2.25), 5), 5) @@ -549,7 +628,7 @@ def test_area_to_zero_fixedrange(df, x, y, ax): cvs = ds.Canvas(plot_width=9, plot_height=5, x_range=[-3.75, 3.75], y_range=[-2.25, 2.25]) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame(**df_kwargs) agg = cvs.area(ddf, x, y, ds.count(), axis=ax) @@ -562,12 +641,13 @@ def test_area_to_zero_fixedrange(df, x, y, ax): out = xr.DataArray(sol, coords=[lincoords_y, lincoords_x], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -@pytest.mark.parametrize('df,x,y,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, 0], 'x1': [-2, 2], 'x2': [0, 4], @@ -577,7 +657,7 @@ def test_area_to_zero_fixedrange(df, x, y, ax): }, dtype='float32'), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 1), # axis1 y constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, 0], 'x1': [-2, 2], 'x2': [0, 4], @@ -585,13 +665,13 @@ def test_area_to_zero_fixedrange(df, x, y, ax): ['x0', 'x1', 'x2'], np.array([0, -4, 0], dtype='float32'), 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [-4, -2, 0, 0, 2, 4], 'y': [0, -4, 0, 0, -4, 0], }), 'x', 'y', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [0, 2, 4], 'y0': [0, -4, 0], @@ -599,19 +679,23 @@ def test_area_to_zero_fixedrange(df, x, y, ax): }, dtype='float32'), ['x0', 'x1'], ['y0', 'y1'], 0), # axis0 multi, y string - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [0, 2, 4], 'y0': [0, -4, 0], }, dtype='float32'), ['x0', 'x1'], 'y0', 0), # axis1 ragged arrays - (pd.DataFrame({ - 'x': pd.array([[-4, -2, 0], [0, 2, 4]], dtype='Ragged[float32]'), - 'y': pd.array([[0, -4, 0], [0, -4, 0]], dtype='Ragged[float32]') - }), 'x', 'y', 1) + (dict(data={ + 'x': [[-4, -2, 0], [0, 2, 4]], + 'y': [[0, -4, 0], [0, -4, 0]] + }, dtype='Ragged[float32]'), 'x', 'y', 1) ]) -def test_area_to_zero_autorange(df, x, y, ax): 
+def test_area_to_zero_autorange(DataFrame, df_kwargs, x, y, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords_y = axis.compute_index( axis.compute_scale_and_translate((-4., 0.), 7), 7) @@ -620,7 +704,7 @@ def test_area_to_zero_autorange(df, x, y, ax): cvs = ds.Canvas(plot_width=13, plot_height=7) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame(**df_kwargs) agg = cvs.area(ddf, x, y, ds.count(), axis=ax) sol = np.array([[0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0], @@ -634,12 +718,13 @@ def test_area_to_zero_autorange(df, x, y, ax): out = xr.DataArray(sol, coords=[lincoords_y, lincoords_x], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -@pytest.mark.parametrize('df,x,y,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, np.nan], 'x1': [-2, 2], 'x2': [0, 4], @@ -649,13 +734,13 @@ def test_area_to_zero_autorange(df, x, y, ax): }, dtype='float32'), ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [-4, -2, 0, np.nan, 2, 4], 'y': [0, -4, 0, np.nan, 4, 0], }), 'x', 'y', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [np.nan, 2, 4], 'y0': [0, -4, 0], @@ -663,12 +748,16 @@ def test_area_to_zero_autorange(df, x, y, ax): }, dtype='float32'), ['x0', 'x1'], ['y0', 'y1'], 0), # axis1 ragged arrays - (pd.DataFrame({ - 'x': pd.array([[-4, -2, 0], [2, 4]], dtype='Ragged[float32]'), - 'y': pd.array([[0, -4, 0], [4, 0]], dtype='Ragged[float32]') - }), 'x', 'y', 1) + (dict(data={ + 'x': [[-4, -2, 0], [2, 4]], + 'y': [[0, -4, 0], [4, 0]], + }, dtype='Ragged[float32]'), 'x', 'y', 1) ]) -def test_area_to_zero_autorange_gap(df, x, y, ax): +def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, x, y, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords_y = axis.compute_index( axis.compute_scale_and_translate((-4., 4.), 7), 7) @@ -677,7 +766,7 @@ def test_area_to_zero_autorange_gap(df, x, y, ax): cvs = ds.Canvas(plot_width=13, plot_height=7) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame(**df_kwargs) agg = cvs.area(ddf, x, y, ds.count(), axis=ax) @@ -692,12 +781,13 @@ def test_area_to_zero_autorange_gap(df, x, y, ax): out = xr.DataArray(sol, coords=[lincoords_y, lincoords_x], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -@pytest.mark.parametrize('df,x,y,y_stack,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,y_stack,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, 0], 'x1': [-2, 2], 'x2': [0, 4], @@ -711,7 +801,7 @@ def test_area_to_zero_autorange_gap(df, x, y, ax): ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], ['y3', 'y4', 'y5'], 1), # axis1 y constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, 0], 'x1': [-2, 2], 'x2': [0, 4], @@ -721,14 +811,14 @@ def test_area_to_zero_autorange_gap(df, x, y, ax): np.array([0, -2, 0], dtype='float32'), 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [-4, -2, 0, 0, 2, 4], 'y': [0, -4, 0, 0, -4, 0], 'y_stack': [0, -2, 0, 0, -2, 0], }), 'x', 'y', 'y_stack', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [0, 2, 4], 'y0': [0, -4, 0], @@ -738,7 +828,7 
@@ def test_area_to_zero_autorange_gap(df, x, y, ax): }, dtype='float32'), ['x0', 'x1'], ['y0', 'y1'], ['y2', 'y3'], 0), # axis0 multi, y string - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [0, 2, 4], 'y0': [0, -4, 0], @@ -746,13 +836,17 @@ def test_area_to_zero_autorange_gap(df, x, y, ax): }, dtype='float32'), ['x0', 'x1'], 'y0', 'y2', 0), # axis1 ragged arrays - (pd.DataFrame({ - 'x': pd.array([[-4, -2, 0], [0, 2, 4]], dtype='Ragged[float32]'), - 'y': pd.array([[0, -4, 0], [0, -4, 0]], dtype='Ragged[float32]'), - 'y_stack': pd.array([[0, -2, 0], [0, -2, 0]], dtype='Ragged[float32]') - }), 'x', 'y', 'y_stack', 1) + (dict(data={ + 'x': [[-4, -2, 0], [0, 2, 4]], + 'y': [[0, -4, 0], [0, -4, 0]], + 'y_stack': [[0, -2, 0], [0, -2, 0]] + }, dtype='Ragged[float32]'), 'x', 'y', 'y_stack', 1) ]) -def test_area_to_line_autorange(df, x, y, y_stack, ax): +def test_area_to_line_autorange(DataFrame, df_kwargs, x, y, y_stack, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords_y = axis.compute_index( axis.compute_scale_and_translate((-4., 0.), 7), 7) @@ -761,7 +855,7 @@ def test_area_to_line_autorange(df, x, y, y_stack, ax): cvs = ds.Canvas(plot_width=13, plot_height=7) - ddf = dd.from_pandas(df, npartitions=2) + ddf = DataFrame(**df_kwargs) agg = cvs.area(ddf, x, y, ds.count(), axis=ax, y_stack=y_stack) sol = np.array([[0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0], @@ -775,12 +869,13 @@ def test_area_to_line_autorange(df, x, y, y_stack, ax): out = xr.DataArray(sol, coords=[lincoords_y, lincoords_x], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) -@pytest.mark.parametrize('df,x,y,y_stack,ax', [ +@pytest.mark.parametrize('DataFrame', DataFrames) +@pytest.mark.parametrize('df_kwargs,x,y,y_stack,ax', [ # axis1 none constant - (pd.DataFrame({ + (dict(data={ 'x0': [-4, np.nan], 'x1': [-2, 2], 'x2': [0, 4], @@ -794,14 +889,14 @@ def test_area_to_line_autorange(df, x, y, y_stack, ax): ['x0', 'x1', 'x2'], ['y0', 'y1', 'y2'], ['y4', 'y5', 'y6'], 1), # axis0 single - (pd.DataFrame({ + (dict(data={ 'x': [-4, -2, 0, np.nan, 2, 4], 'y': [0, -4, 0, np.nan, 4, 0], 'y_stack': [0, 0, 0, 0, 0, 0], }), 'x', 'y', 'y_stack', 0), # axis0 multi - (pd.DataFrame({ + (dict(data={ 'x0': [-4, -2, 0], 'x1': [np.nan, 2, 4], 'y0': [0, -4, 0], @@ -811,13 +906,17 @@ def test_area_to_line_autorange(df, x, y, y_stack, ax): }, dtype='float32'), ['x0', 'x1'], ['y0', 'y1'], ['y2', 'y3'], 0), # axis1 ragged arrays - (pd.DataFrame({ - 'x': pd.array([[-4, -2, 0], [2, 4]], dtype='Ragged[float32]'), - 'y': pd.array([[0, -4, 0], [4, 0]], dtype='Ragged[float32]'), - 'y_stack': pd.array([[0, 0, 0], [0, 0]], dtype='Ragged[float32]') - }), 'x', 'y', 'y_stack', 1) + (dict(data={ + 'x': [[-4, -2, 0], [2, 4]], + 'y': [[0, -4, 0], [4, 0]], + 'y_stack': [[0, 0, 0], [0, 0]], + }, dtype='Ragged[float32]'), 'x', 'y', 'y_stack', 1) ]) -def test_area_to_line_autorange_gap(df, x, y, y_stack, ax): +def test_area_to_line_autorange_gap(DataFrame, df_kwargs, x, y, y_stack, ax): + if DataFrame is dask_cudf_DataFrame: + if df_kwargs.get('dtype', '').startswith('Ragged'): + pytest.skip("Ragged array not supported with cudf") + axis = ds.core.LinearAxis() lincoords_y = axis.compute_index( axis.compute_scale_and_translate((-4., 4.), 7), 7) @@ -826,7 +925,7 @@ def test_area_to_line_autorange_gap(df, x, y, y_stack, ax): cvs = ds.Canvas(plot_width=13, plot_height=7) - ddf = dd.from_pandas(df, npartitions=2) + 
ddf = DataFrame(**df_kwargs) # When a line is specified to fill to, this line is not included in # the fill. So we expect the y=0 line to not be filled. @@ -843,7 +942,7 @@ def test_area_to_line_autorange_gap(df, x, y, y_stack, ax): out = xr.DataArray(sol, coords=[lincoords_y, lincoords_x], dims=['y', 'x']) - assert_eq(agg, out) + assert_eq_xr(agg, out) def test_trimesh_no_double_edge(): diff --git a/datashader/tests/test_glyphs.py b/datashader/tests/test_glyphs.py index ff9991de8..8aaa3a602 100644 --- a/datashader/tests/test_glyphs.py +++ b/datashader/tests/test_glyphs.py @@ -54,7 +54,7 @@ def new_agg(): # Line rasterization expand_aggs_and_cols = Glyph._expand_aggs_and_cols(append, 1) _draw_segment = _build_draw_segment(append, map_onto_pixel_for_line, expand_aggs_and_cols) -extend_line = _build_extend_line_axis0(_draw_segment, expand_aggs_and_cols) +extend_line, _ = _build_extend_line_axis0(_draw_segment, expand_aggs_and_cols) # Triangles rasterization draw_triangle, draw_triangle_interp = _build_draw_triangle(tri_append) diff --git a/datashader/tests/test_pandas.py b/datashader/tests/test_pandas.py index a332d6f8a..61dd47871 100644 --- a/datashader/tests/test_pandas.py +++ b/datashader/tests/test_pandas.py @@ -8,6 +8,7 @@ import pytest +from datashader.datatypes import RaggedDtype df_pd = pd.DataFrame({'x': np.array(([0.] * 10 + [1] * 10)), 'y': np.array(([0.] * 5 + [1] * 5 + [0] * 5 + [1] * 5)), @@ -24,8 +25,20 @@ df_pd.f64[2] = np.nan dfs_pd = [df_pd] -dfs = [df_pd] -DataFrames = [pd.DataFrame] +try: + import cudf + import cupy + def cudf_DataFrame(*args, **kwargs): + return cudf.DataFrame.from_pandas( + pd.DataFrame(*args, **kwargs), nan_as_null=False + ) + df_cuda = cudf_DataFrame(df_pd) + dfs = [df_pd, df_cuda] + DataFrames = [pd.DataFrame, cudf_DataFrame] +except ImportError: + cudf = cupy = None + dfs = [df_pd] + DataFrames = [pd.DataFrame] c = ds.Canvas(plot_width=2, plot_height=2, x_range=(0, 1), y_range=(0, 1)) c_logx = ds.Canvas(plot_width=2, plot_height=2, x_range=(1, 10), @@ -44,12 +57,18 @@ def assert_eq_xr(agg, b): """Assert that two xarray DataArrays are equal, handling the possibility that the two DataArrays may be backed by ndarrays of different types""" + if cupy and isinstance(agg.data, cupy.ndarray): + agg = xr.DataArray( + cupy.asnumpy(agg.data), coords=agg.coords, dims=agg.dims + ) assert agg.equals(b) def assert_eq_ndarray(data, b): """Assert that two ndarrays are equal, handling the possibility that the ndarrays are of different types""" + if cupy and isinstance(data, cupy.ndarray): + data = cupy.asnumpy(data) np.testing.assert_equal(data, b) @@ -63,7 +82,10 @@ def floats(n): def values(s): """Get numpy array of values from pandas-like Series, handling Series of different types""" - return s.values + if cudf and isinstance(s, cudf.Series): + return s.to_array(fillna=np.nan) + else: + return s.values @pytest.mark.parametrize('df', dfs) @@ -685,6 +707,10 @@ def test_bug_570(): }], 'x', 'y', 1) ]) def test_line_manual_range(DataFrame, df_args, x, y, ax): + if cudf and DataFrame is cudf_DataFrame: + if isinstance(getattr(df_args[0].get('x', []), 'dtype', ''), RaggedDtype): + pytest.skip("cudf DataFrames do not support extension types") + df = DataFrame(*df_args) axis = ds.core.LinearAxis() @@ -757,6 +783,10 @@ def test_line_manual_range(DataFrame, df_args, x, y, ax): }], 'x', 'y', 1) ]) def test_line_autorange(DataFrame, df_args, x, y, ax): + if cudf and DataFrame is cudf_DataFrame: + if isinstance(getattr(df_args[0].get('x', []), 'dtype', ''), RaggedDtype): + 
pytest.skip("cudf DataFrames do not support extension types") + df = DataFrame(*df_args) axis = ds.core.LinearAxis() @@ -922,6 +952,10 @@ def test_line_autorange_axis1_ragged(): }), 'x', 'y', 1) ]) def test_area_to_zero_fixedrange(DataFrame, df_kwargs, x, y, ax): + if cudf and DataFrame is cudf_DataFrame: + if isinstance(getattr(df_kwargs['data'].get('x', []), 'dtype', ''), RaggedDtype): + pytest.skip("cudf DataFrames do not support extension types") + df = DataFrame(**df_kwargs) axis = ds.core.LinearAxis() @@ -996,6 +1030,10 @@ def test_area_to_zero_fixedrange(DataFrame, df_kwargs, x, y, ax): }), 'x', 'y', 1) ]) def test_area_to_zero_autorange(DataFrame, df_kwargs, x, y, ax): + if cudf and DataFrame is cudf_DataFrame: + if isinstance(getattr(df_kwargs['data'].get('x', []), 'dtype', ''), RaggedDtype): + pytest.skip("cudf DataFrames do not support extension types") + df = DataFrame(**df_kwargs) axis = ds.core.LinearAxis() @@ -1057,6 +1095,10 @@ def test_area_to_zero_autorange(DataFrame, df_kwargs, x, y, ax): }), 'x', 'y', 1) ]) def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, x, y, ax): + if cudf and DataFrame is cudf_DataFrame: + if isinstance(getattr(df_kwargs['data'].get('x', []), 'dtype', ''), RaggedDtype): + pytest.skip("cudf DataFrames do not support extension types") + df = DataFrame(**df_kwargs) axis = ds.core.LinearAxis() @@ -1142,6 +1184,10 @@ def test_area_to_zero_autorange_gap(DataFrame, df_kwargs, x, y, ax): }), 'x', 'y', 'y_stack', 1) ]) def test_area_to_line_autorange(DataFrame, df_kwargs, x, y, y_stack, ax): + if cudf and DataFrame is cudf_DataFrame: + if isinstance(getattr(df_kwargs['data'].get('x', []), 'dtype', ''), RaggedDtype): + pytest.skip("cudf DataFrames do not support extension types") + df = DataFrame(**df_kwargs) axis = ds.core.LinearAxis() diff --git a/datashader/tests/test_transfer_functions.py b/datashader/tests/test_transfer_functions.py index 0c231e950..8554ae3a4 100644 --- a/datashader/tests/test_transfer_functions.py +++ b/datashader/tests/test_transfer_functions.py @@ -27,8 +27,15 @@ def build_agg(array_module=np): agg = xr.Dataset(dict(a=s_a, b=s_b, c=s_c)) return agg -aggs = [build_agg(np)] -arrays = [np.array] + +try: + import cupy + aggs = [build_agg(np), build_agg(cupy)] + arrays = [np.array, cupy.array] +except ImportError: + cupy = None + aggs = [build_agg(np)] + arrays = [np.array] int_span = [11, 17] float_span = [11.0, 17.0] diff --git a/datashader/transfer_functions.py b/datashader/transfer_functions/__init__.py similarity index 94% rename from datashader/transfer_functions.py rename to datashader/transfer_functions/__init__.py index 572cf754e..717ace673 100644 --- a/datashader/transfer_functions.py +++ b/datashader/transfer_functions/__init__.py @@ -9,22 +9,28 @@ import xarray as xr from PIL.Image import fromarray +from datashader.colors import rgb, Sets1to3 +from datashader.composite import composite_op_lookup, over +from datashader.utils import ngjit, orient_array -from .colors import rgb, Sets1to3 -from .composite import composite_op_lookup, over -from .utils import ngjit, orient_array - +try: + import cupy +except ImportError: + cupy = None __all__ = ['Image', 'stack', 'shade', 'set_background', 'spread', 'dynspread'] class Image(xr.DataArray): - __slots__ = () + __slots__ = () __array_priority__ = 70 border=1 - + def to_pil(self, origin='lower'): - arr = np.flipud(self.data) if origin == 'lower' else self.data + data = self.data + if cupy: + data = cupy.asnumpy(data) + arr = np.flipud(data) if origin == 'lower' else data return 
fromarray(arr, 'RGBA') def to_bytesio(self, format='png', origin='lower'): @@ -58,8 +64,8 @@ class Images(object): A list of HTML-representable objects to display in a table. Primarily intended for Image objects, but could be anything that has _repr_html_. - """ - + """ + def __init__(self, *images): """Makes an HTML table from a list of HTML-representable arguments.""" for i in images: @@ -74,7 +80,7 @@ def cols(self,n): """ self.num_cols=n return self - + def _repr_html_(self): """Supports rich display in a Jupyter notebook, using an HTML table""" htmls = [] @@ -82,14 +88,14 @@ def _repr_html_(self): tr="""""" for i in self.images: label=i.name if hasattr(i,"name") and i.name is not None else "" - - htmls.append("""""" + label + + + htmls.append("""""" + label + """

{0}""".format(i._repr_html_())) col+=1 if self.num_cols is not None and col>=self.num_cols: col=0 htmls.append(""+tr) - + return """"""+ tr +\ "".join(htmls) + """
""" @@ -116,7 +122,7 @@ def stack(*imgs, **kwargs): if len(imgs) == 1: return imgs[0] imgs = xr.align(*imgs, copy=False, join='outer') - with np.errstate(divide='ignore', invalid='ignore'): + with np.errstate(divide='ignore', invalid='ignore'): out = tz.reduce(tz.flip(op), [i.data for i in imgs]) return Image(out, coords=imgs[0].coords, dims=imgs[0].dims, name=name) @@ -143,8 +149,10 @@ def eq_hist(data, mask=None, nbins=256*256): ---------- .. [1] http://scikit-image.org/docs/stable/api/skimage.exposure.html#equalize-hist """ - if not isinstance(data, np.ndarray): - raise TypeError("data must be np.ndarray") + if cupy and isinstance(data, cupy.ndarray): + from._cuda_utils import interp + elif not isinstance(data, np.ndarray): + raise TypeError("data must be an ndarray") else: interp = np.interp @@ -211,7 +219,11 @@ def masked_clip_2d(data, mask, lower, upper): data[i, j] = upper def _interpolate(agg, cmap, how, alpha, span, min_alpha, name): - interp = np.interp + if cupy and isinstance(agg.data, cupy.ndarray): + from ._cuda_utils import masked_clip_2d, interp + else: + from ._cpu_utils import masked_clip_2d + interp = np.interp if agg.ndim != 2: raise ValueError("agg must be 2D") @@ -259,9 +271,9 @@ def _interpolate(agg, cmap, how, alpha, span, min_alpha, name): # For eq_hist to work with span, we'll need to store the histogram # from the data and then apply it to the span argument. raise ValueError("span is not (yet) valid to use with eq_hist") - + span = interpolater([0, span[1] - span[0]], 0) - + if isinstance(cmap, Iterator): cmap = list(cmap) if isinstance(cmap, list): @@ -291,12 +303,20 @@ def _interpolate(agg, cmap, how, alpha, span, min_alpha, name): img = rgba.view(np.uint32).reshape(data.shape) + if cupy and isinstance(img, cupy.ndarray): + # Convert cupy array to numpy for final image + img = cupy.asnumpy(img) + return Image(img, coords=agg.coords, dims=agg.dims, name=name) def _colorize(agg, color_key, how, min_alpha, name): - interp = np.interp - array = np.array + if cupy and isinstance(agg.data, cupy.ndarray): + from ._cuda_utils import interp + array = cupy.array + else: + interp = np.interp + array = np.array if not agg.ndim == 3: raise ValueError("agg must be 3D") @@ -333,6 +353,10 @@ def _colorize(agg, color_key, how, min_alpha, name): r[mask] = g[mask] = b[mask] = 255 values = np.dstack([r, g, b, a]).view(np.uint32).reshape(a.shape) + if cupy and isinstance(values, cupy.ndarray): + # Convert cupy array to numpy for final image + values = cupy.asnumpy(values) + return Image(values, dims=agg.dims[:-1], coords=OrderedDict([ @@ -402,13 +426,13 @@ def shade(agg, cmap=["lightblue", "darkblue"], color_key=Sets1to3, Min and max data values to use for colormap interpolation, when wishing to override autoranging. name : string name, optional - Optional string name to give to the Image object to return, + Optional string name to give to the Image object to return, to label results for display. """ if not isinstance(agg, xr.DataArray): raise TypeError("agg must be instance of DataArray") name = agg.name if name is None else name - + if agg.ndim == 2: return _interpolate(agg, cmap, how, alpha, span, min_alpha, name) elif agg.ndim == 3: @@ -459,7 +483,7 @@ def spread(img, px=1, shape='circle', how='over', mask=None, name=None): with odd dimensions. Pixels are spread from the center of the mask to locations where the mask is True. 
name : string name, optional - Optional string name to give to the Image object to return, + Optional string name to give to the Image object to return, to label results for display. """ if not isinstance(img, Image): diff --git a/datashader/transfer_functions/_cpu_utils.py b/datashader/transfer_functions/_cpu_utils.py new file mode 100644 index 000000000..162f95ab6 --- /dev/null +++ b/datashader/transfer_functions/_cpu_utils.py @@ -0,0 +1,35 @@ +from datashader.utils import ngjit + + +@ngjit +def masked_clip_2d(data, mask, lower, upper): + """ + Clip the elements of an input array between lower and upper bounds, + skipping over elements that are masked out. + + Parameters + ---------- + data: np.ndarray + Numeric ndarray that will be clipped in-place + mask: np.ndarray + Boolean ndarray where True values indicate elements that should be + skipped + lower: int or float + Lower bound to clip to + upper: int or float + Upper bound to clip to + + Returns + ------- + None + data array is modified in-place + """ + for i in range(data.shape[0]): + for j in range(data.shape[1]): + if mask[i, j]: + continue + val = data[i, j] + if val < lower: + data[i, j] = lower + elif val > upper: + data[i, j] = upper diff --git a/datashader/transfer_functions/_cuda_utils.py b/datashader/transfer_functions/_cuda_utils.py new file mode 100644 index 000000000..523f0ebe5 --- /dev/null +++ b/datashader/transfer_functions/_cuda_utils.py @@ -0,0 +1,139 @@ +from __future__ import division +from math import ceil, isnan, nan +from numba import cuda +import cupy +import numpy as np + + +if cupy.result_type is np.result_type: + # Workaround until cupy release of https://github.com/cupy/cupy/pull/2249 + # Without this, cupy.histogram raises an error that cupy.result_type + # is not defined. + cupy.result_type = lambda *args: np.result_type( + *[arg.dtype if isinstance(arg, cupy.ndarray) else arg + for arg in args] + ) + + +def cuda_args(shape): + """ + Compute the blocks-per-grid and threads-per-block parameters for use when + invoking cuda kernels + + Parameters + ---------- + shape: int or tuple of ints + The shape of the input array that the kernel will parallelize over + + Returns + ------- + tuple + Tuple of (blocks_per_grid, threads_per_block) + """ + if isinstance(shape, int): + shape = (shape,) + + max_threads = cuda.get_current_device().MAX_THREADS_PER_BLOCK + # Note: We divide max_threads by 2.0 to leave room for the registers + # occupied by the kernel. For some discussion, see + # https://github.com/numba/numba/issues/3798. + threads_per_block = int(ceil(max_threads / 2.0) ** (1.0 / len(shape))) + tpb = (threads_per_block,) * len(shape) + bpg = tuple(int(ceil(d / threads_per_block)) for d in shape) + return bpg, tpb + + +# masked_clip_2d +# -------------- +def masked_clip_2d(data, mask, lower, upper): + """ + Clip the elements of an input array between lower and upper bounds, + skipping over elements that are masked out. 
+ + Parameters + ---------- + data: cupy.ndarray + Numeric ndarray that will be clipped in-place + mask: cupy.ndarray + Boolean ndarray where True values indicate elements that should be + skipped + lower: int or float + Lower bound to clip to + upper: int or float + Upper bound to clip to + + Returns + ------- + None + data array is modified in-place + """ + masked_clip_2d_kernel[cuda_args(data.shape)](data, mask, lower, upper) + + +@cuda.jit +def masked_clip_2d_kernel(data, mask, lower, upper): + i, j = cuda.grid(2) + maxi, maxj = data.shape + if i >= 0 and i < maxi and j >= 0 and j < maxj and not mask[i, j]: + cuda.atomic.max(data, (i, j), lower) + cuda.atomic.min(data, (i, j), upper) + + +# interp +# ------ +# When cupy adds cupy.interp support, this function can be removed +def interp(x, xp, fp, left=None, right=None): + """ + cupy implementation of np.interp. This function can be removed when an + official cupy.interp function is added to the cupy library. + """ + output_y = cupy.zeros(x.shape, dtype=cupy.float64) + assert len(x.shape) == 2 + if left is None: + left = fp[0] + left = float(left) + if right is None: + right = fp[-1] + right = float(right) + interp2d_kernel[cuda_args(x.shape)]( + x.astype(cupy.float64), xp.astype(cupy.float64), fp.astype(cupy.float64), left, right, output_y + ) + return output_y + + +@cuda.jit +def interp2d_kernel(x, xp, fp, left, right, output_y): + i, j = cuda.grid(2) + if i < x.shape[0] and j < x.shape[1]: + xval = x[i, j] + + if isnan(xval): + output_y[i, j] = nan + elif xval < xp[0]: + output_y[i, j] = left + elif xval >= xp[-1]: + output_y[i, j] = right + else: + # Find indices of xp that straddle xval + upper_i = len(xp) - 1 + lower_i = 0 + while True: + stop_i = 1 + (lower_i + upper_i) // 2 + if xp[stop_i] < xval: + lower_i = stop_i + elif xp[stop_i - 1] > xval: + upper_i = stop_i - 1 + else: + break + + # Compute interpolate y value + x0 = xp[stop_i - 1] + x1 = xp[stop_i] + y0 = fp[stop_i - 1] + y1 = fp[stop_i] + + slope = (y1 - y0) / (x1 - x0) + y_interp = y0 + slope * (xval - x0) + + # Update output + output_y[i, j] = y_interp diff --git a/datashader/utils.py b/datashader/utils.py index 6f8b679fa..10429ce8b 100644 --- a/datashader/utils.py +++ b/datashader/utils.py @@ -420,11 +420,16 @@ def dshape_from_pandas(df): def dshape_from_dask(df): """Return a datashape.DataShape object given a dask dataframe.""" cat_columns = [ - col for col in df.columns if (isinstance(df[col].dtype, type(pd.Categorical.dtype)) - or isinstance(df[col].dtype, pd.api.types.CategoricalDtype)) and not df[col].cat.known] + col for col in df.columns + if (isinstance(df[col].dtype, type(pd.Categorical.dtype)) or + isinstance(df[col].dtype, pd.api.types.CategoricalDtype)) + and not getattr(df['cat'].cat, 'known', True)] df = df.categorize(cat_columns, index=False) - return datashape.var * datashape.Record([(k, dshape_from_pandas_helper(df[k])) - for k in df.columns]) + # get_partition(0) used below because categories are sometimes repeated + # for dask-cudf DataFrames with multiple partitions + return datashape.var * datashape.Record([ + (k, dshape_from_pandas_helper(df[k].get_partition(0))) for k in df.columns + ]) def dshape_from_xarray_dataset(xr_ds):
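The cuda_args helper introduced above turns an array shape into a CUDA launch configuration: it halves the device's MAX_THREADS_PER_BLOCK to leave room for kernel registers, takes the len(shape)-th root of that budget to get a per-axis thread count, and then covers the array with ceil(dim / threads) blocks along each axis. A host-side sketch of the same arithmetic follows, assuming a device limit of 1024 threads per block (the real helper queries the current device), so the resulting numbers can be checked without a GPU.

# Host-side sketch of the arithmetic inside _cuda_utils.cuda_args.
# The max_threads=1024 default is an assumption; the real helper uses
# cuda.get_current_device().MAX_THREADS_PER_BLOCK.
from math import ceil

def cuda_args_sketch(shape, max_threads=1024):
    if isinstance(shape, int):
        shape = (shape,)
    # Halve the thread budget for register headroom, then spread the
    # remaining threads evenly over the dimensions of the array.
    threads_per_block = int(ceil(max_threads / 2.0) ** (1.0 / len(shape)))
    tpb = (threads_per_block,) * len(shape)
    bpg = tuple(int(ceil(d / threads_per_block)) for d in shape)
    return bpg, tpb

# A 600x800 aggregate gets 22x22 threads per block and a 28x37 grid of blocks.
print(cuda_args_sketch((600, 800)))   # ((28, 37), (22, 22))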
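Taken together, these changes keep aggregation and shading on the GPU until the final image is materialized: cudf DataFrames are dispatched to the cuda-enabled append kernels, the aggregate comes back as a cupy-backed xarray.DataArray, and shade converts to NumPy only when building the Image. A rough usage sketch, assuming cudf and cupy are installed; the random data, canvas size and column names are placeholders rather than anything defined in this diff.

# Aggregate a cudf DataFrame on the GPU and shade the result.
import numpy as np
import cudf
import datashader as ds
import datashader.transfer_functions as tf

n = 1_000_000
gdf = cudf.DataFrame({'x': np.random.standard_normal(n),
                      'y': np.random.standard_normal(n)})

cvs = ds.Canvas(plot_width=400, plot_height=400)
agg = cvs.points(gdf, 'x', 'y', ds.count())   # cupy-backed xarray.DataArray
img = tf.shade(agg, how='eq_hist')            # converted to a NumPy-backed Image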