Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial GPU support #793

Merged
merged 1 commit into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


@memoize
def compile_components(agg, schema, glyph):
def compile_components(agg, schema, glyph, cuda=False):
"""Given a ``Aggregation`` object and a schema, return 5 sub-functions.

Parameters
Expand Down Expand Up @@ -51,20 +51,20 @@ def compile_components(agg, schema, glyph):
reds = list(traverse_aggregation(agg))

# List of base reductions (actually computed)
bases = list(unique(concat(r._build_bases() for r in reds)))
bases = list(unique(concat(r._build_bases(cuda) for r in reds)))
dshapes = [b.out_dshape(schema) for b in bases]
# List of tuples of (append, base, input columns, temps)
calls = [_get_call_tuples(b, d, schema) for (b, d) in zip(bases, dshapes)]
calls = [_get_call_tuples(b, d, schema, cuda) for (b, d) in zip(bases, dshapes)]
# List of unique column names needed
cols = list(unique(concat(pluck(2, calls))))
# List of temps needed
temps = list(pluck(3, calls))

create = make_create(bases, dshapes)
create = make_create(bases, dshapes, cuda)
info = make_info(cols)
append = make_append(bases, cols, calls, glyph)
combine = make_combine(bases, dshapes, temps)
finalize = make_finalize(bases, agg, schema)
finalize = make_finalize(bases, agg, schema, cuda)

return create, info, append, combine, finalize

Expand All @@ -79,13 +79,18 @@ def traverse_aggregation(agg):
yield agg


def _get_call_tuples(base, dshape, schema):
return base._build_append(dshape, schema), (base,), base.inputs, base._build_temps()
def _get_call_tuples(base, dshape, schema, cuda):
return (base._build_append(dshape, schema, cuda),
(base,), base.inputs, base._build_temps(cuda))


def make_create(bases, dshapes):
def make_create(bases, dshapes, cuda):
creators = [b._build_create(d) for (b, d) in zip(bases, dshapes)]
array_module = np
if cuda:
import cupy
array_module = cupy
else:
array_module = np
return lambda shape: tuple(c(shape, array_module) for c in creators)


Expand Down Expand Up @@ -145,22 +150,22 @@ def combine(base_tuples):
return combine


def make_finalize(bases, agg, schema):
def make_finalize(bases, agg, schema, cuda):
arg_lk = dict((k, v) for (v, k) in enumerate(bases))
if isinstance(agg, summary):
calls = []
for key, val in zip(agg.keys, agg.values):
f = make_finalize(bases, val, schema)
f = make_finalize(bases, val, schema, cuda)
try:
# Override bases if possible
bases = val._build_bases()
bases = val._build_bases(cuda)
except AttributeError:
pass
inds = [arg_lk[b] for b in bases]
calls.append((key, f, inds))

def finalize(bases, **kwargs):
data = {key: finalizer(get(inds, bases), **kwargs)
def finalize(bases, cuda=False, **kwargs):
data = {key: finalizer(get(inds, bases), cuda, **kwargs)
for (key, finalizer, inds) in calls}
return xr.Dataset(data)
return finalize
Expand Down
14 changes: 12 additions & 2 deletions datashader/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
from .resampling import resample_2d, resample_2d_distributed
from . import reductions as rd

try:
import cudf
except ImportError:
cudf = None

try:
import dask_cudf
except ImportError:
dask_cudf = None

class Axis(object):
"""Interface for implementing axis transformations.
Expand Down Expand Up @@ -107,7 +116,7 @@ class LogAxis(Axis):
@staticmethod
@ngjit
def mapper(val):
return log10(val)
return log10(float(val))

@staticmethod
@ngjit
Expand Down Expand Up @@ -993,7 +1002,8 @@ def bypixel(source, canvas, glyph, agg):
source = source.drop([col for col in columns if col not in cols_to_keep])
source = source.to_dask_dataframe()

if isinstance(source, pd.DataFrame):
if (isinstance(source, pd.DataFrame) or
(cudf and isinstance(source, cudf.DataFrame))):
# Avoid datashape.Categorical instantiation bottleneck
# by only retaining the necessary columns:
# https://github.com/bokeh/datashader/issues/396
Expand Down
11 changes: 11 additions & 0 deletions datashader/data_libraries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,14 @@
from . import dask # noqa (API import)
except ImportError:
pass

try:
import cudf as _cudf # noqa (Test cudf installed)
import cupy as _cupy # noqa (Test cupy installed)
from . import cudf # noqa (API import)

import dask_cudf as _dask_cudf # noqa (Test dask_cudf installed)
from . import dask_cudf # noqa (API import)

except ImportError:
pass
9 changes: 9 additions & 0 deletions datashader/data_libraries/cudf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import
from datashader.data_libraries.pandas import default
from datashader.core import bypixel
import cudf


@bypixel.pipeline.register(cudf.DataFrame)
def cudf_pipeline(df, schema, canvas, glyph, summary):
return default(glyph, df, schema, canvas, summary, cuda=True)
24 changes: 14 additions & 10 deletions datashader/data_libraries/dask.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import absolute_import, division

import dask
import pandas as pd
import dask.dataframe as dd
from collections import OrderedDict
from dask.base import tokenize, compute
Expand All @@ -16,8 +15,8 @@


@bypixel.pipeline.register(dd.DataFrame)
def dask_pipeline(df, schema, canvas, glyph, summary):
dsk, name = glyph_dispatch(glyph, df, schema, canvas, summary)
def dask_pipeline(df, schema, canvas, glyph, summary, cuda=False):
dsk, name = glyph_dispatch(glyph, df, schema, canvas, summary, cuda=cuda)

# Get user configured scheduler (if any), or fall back to default
# scheduler for dask DataFrame
Expand Down Expand Up @@ -61,12 +60,12 @@ def shape_bounds_st_and_axis(df, canvas, glyph):


@glyph_dispatch.register(Glyph)
def default(glyph, df, schema, canvas, summary):
def default(glyph, df, schema, canvas, summary, cuda=False):
shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph)

# Compile functions
create, info, append, combine, finalize = \
compile_components(summary, schema, glyph)
compile_components(summary, schema, glyph, cuda=cuda)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append)
Expand All @@ -81,25 +80,30 @@ def chunk(df):
keys2 = [(name, i) for i in range(len(keys))]
dsk = dict((k2, (chunk, k)) for (k2, k) in zip(keys2, keys))
dsk[name] = (apply, finalize, [(combine, keys2)],
dict(coords=axis, dims=[glyph.y_label, glyph.x_label]))
dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))
return dsk, name


@glyph_dispatch.register(LineAxis0)
def line(glyph, df, schema, canvas, summary):
def line(glyph, df, schema, canvas, summary, cuda=False):
if cuda:
from cudf import concat
else:
from pandas import concat

shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph)

# Compile functions
create, info, append, combine, finalize = \
compile_components(summary, schema, glyph)
compile_components(summary, schema, glyph, cuda=cuda)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append)

def chunk(df, df2=None):
plot_start = True
if df2 is not None:
df = pd.concat([df.iloc[-1:], df2])
df = concat([df.iloc[-1:], df2])
plot_start = False
aggs = create(shape)
extend(aggs, df, st, bounds, plot_start=plot_start)
Expand All @@ -112,5 +116,5 @@ def chunk(df, df2=None):
dsk[(name, i)] = (chunk, (old_name, i - 1), (old_name, i))
keys2 = [(name, i) for i in range(df.npartitions)]
dsk[name] = (apply, finalize, [(combine, keys2)],
dict(coords=axis, dims=[glyph.y_label, glyph.x_label]))
dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))
return dsk, name
9 changes: 9 additions & 0 deletions datashader/data_libraries/dask_cudf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import
from datashader.data_libraries.dask import dask_pipeline
from datashader.core import bypixel
import dask_cudf


@bypixel.pipeline.register(dask_cudf.DataFrame)
def dask_cudf_pipeline(df, schema, canvas, glyph, summary):
return dask_pipeline(df, schema, canvas, glyph, summary, cuda=True)
5 changes: 3 additions & 2 deletions datashader/data_libraries/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def pandas_pipeline(df, schema, canvas, glyph, summary):

@glyph_dispatch.register(_PointLike)
@glyph_dispatch.register(_AreaToLineLike)
def default(glyph, source, schema, canvas, summary):
create, info, append, _, finalize = compile_components(summary, schema, glyph)
def default(glyph, source, schema, canvas, summary, cuda=False):
create, info, append, _, finalize = compile_components(summary, schema, glyph, cuda)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append)
Expand All @@ -44,6 +44,7 @@ def default(glyph, source, schema, canvas, summary):
extend(bases, source, x_st + y_st, x_range + y_range)

return finalize(bases,
cuda=cuda,
coords=OrderedDict([(glyph.x_label, x_axis),
(glyph.y_label, y_axis)]),
dims=[glyph.y_label, glyph.x_label])
Loading