Enable backend dispatching for Dask-DataFrame creation #11920

Merged Oct 20, 2022 (27 commits)

Commits
5c6b86f
define and register CudfBackendEntrypoint entrypoint
rjzamora Mar 8, 2022
2001c29
formatting
rjzamora Mar 8, 2022
a4e9292
Merge remote-tracking branch 'upstream/branch-22.10' into backend-class
rjzamora Sep 8, 2022
9afc190
tweak class names
rjzamora Sep 9, 2022
e4e3639
move fallback into backend entrypoints and revert to DaskBackendEntry…
rjzamora Sep 12, 2022
fbaa4ee
align CudfBackendEntrypoint with latest backend-class branch in dask
rjzamora Sep 12, 2022
8fe6ca8
fix bug in CudfBackendEntrypoint
rjzamora Sep 13, 2022
17919bd
remove timeseries dispatch for now
rjzamora Sep 13, 2022
81f8c36
define from_dict
rjzamora Sep 13, 2022
c7b7888
fix DaskDataFrameBackendEntrypoint import
rjzamora Sep 19, 2022
784dc38
fix backend typo
rjzamora Sep 27, 2022
1a72c55
Merge remote-tracking branch 'upstream/branch-22.10' into backend-class
rjzamora Oct 7, 2022
545a1a2
Merge remote-tracking branch 'upstream/branch-22.12' into backend-class
rjzamora Oct 7, 2022
f4704f4
update entrypoint path
rjzamora Oct 14, 2022
76118a8
cleanup - still need to add specific backend-dispatching tests
rjzamora Oct 14, 2022
3bb4701
Merge remote-tracking branch 'upstream/branch-22.12' into backend-class
rjzamora Oct 14, 2022
964e53a
formatting
rjzamora Oct 14, 2022
0f7d2f7
remove extra line
rjzamora Oct 14, 2022
41bc7ce
more formatting
rjzamora Oct 14, 2022
e4313e9
remove redundant entrypoint info
rjzamora Oct 14, 2022
22e53d3
add test coverage
rjzamora Oct 17, 2022
540f469
use DASK_BACKEND_SUPPORT
rjzamora Oct 17, 2022
7fb5272
code review
rjzamora Oct 17, 2022
8d0926c
improve documentation
rjzamora Oct 18, 2022
672024a
typo fix
rjzamora Oct 18, 2022
ffc444a
adding from_dask_array and from_array preemptively
rjzamora Oct 18, 2022
3719558
Roll back from_dask_array and from_array for now
rjzamora Oct 19, 2022
69 changes: 69 additions & 0 deletions python/dask_cudf/dask_cudf/backends.py
@@ -8,6 +8,8 @@
import pyarrow as pa
from pandas.api.types import is_scalar

+import dask.dataframe as dd
+from dask import config
from dask.dataframe.core import get_parallel_type, meta_nonempty
from dask.dataframe.dispatch import (
    categorical_dtype_dispatch,
@@ -426,3 +428,70 @@ def sizeof_cudf_dataframe(df):
@_dask_cudf_nvtx_annotate
def sizeof_cudf_series_index(obj):
    return obj.memory_usage()


try:

    # Define "cudf" backend engine to be registered with Dask
    from dask.dataframe.backends import DataFrameBackendEntrypoint

    class CudfBackendEntrypoint(DataFrameBackendEntrypoint):
        @staticmethod
        def from_dict(data, npartitions, orient="columns", **kwargs):
            from dask_cudf import from_cudf

            if orient != "columns":
                raise ValueError(f"orient={orient} is not supported")
            # TODO: Use cudf.from_dict
            # (See: https://github.com/rapidsai/cudf/issues/11934)
            return from_cudf(
                cudf.DataFrame(data),
                npartitions=npartitions,
            )

        @staticmethod
        def read_parquet(*args, engine=None, **kwargs):
            from dask_cudf.io.parquet import CudfEngine

            with config.set({"dataframe.backend": "pandas"}):
                return dd.read_parquet(
                    *args,
                    engine=CudfEngine,
                    **kwargs,
                )

        @staticmethod
        def read_json(*args, engine=None, **kwargs):
            with config.set({"dataframe.backend": "pandas"}):
                return dd.read_json(*args, engine=cudf.read_json, **kwargs)

        @staticmethod
        def read_orc(*args, **kwargs):
            from dask_cudf.io import read_orc

            return read_orc(*args, **kwargs)

        @staticmethod
        def read_csv(*args, **kwargs):
            from dask_cudf.io import read_csv

            chunksize = kwargs.pop("chunksize", None)
            blocksize = kwargs.pop("blocksize", "default")
            if chunksize is None and blocksize != "default":
                chunksize = blocksize
            return read_csv(
                *args,
                chunksize=chunksize,
                **kwargs,
            )

        @staticmethod
        def read_hdf(*args, **kwargs):
            from dask_cudf import from_dask_dataframe

            # HDF5 reader not yet implemented in cudf
            with config.set({"dataframe.backend": "pandas"}):
                return from_dask_dataframe(dd.read_hdf(*args, **kwargs))

except ImportError:
    pass
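
A note on the pattern above: readers that delegate to the generic Dask implementation (read_parquet, read_json, read_hdf) pin the backend to "pandas" inside a config.set context so that the nested dd.read_* call does not dispatch straight back into this entrypoint; cudf-specific behavior is injected via the engine argument or a trailing conversion instead. A small end-to-end exercise of the from_dict path defined above — a sketch assuming dask>=2022.10.0 and a CUDA environment with dask_cudf installed:

import dask
import dask.dataframe as dd
import dask_cudf

# Runs CudfBackendEntrypoint.from_dict via the generic dd.from_dict API.
with dask.config.set({"dataframe.backend": "cudf"}):
    ddf = dd.from_dict({"x": [0, 1, 2, 3]}, npartitions=2)
assert isinstance(ddf, dask_cudf.DataFrame)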
23 changes: 12 additions & 11 deletions python/dask_cudf/dask_cudf/core.py
@@ -2,10 +2,10 @@

import math
import warnings
-from distutils.version import LooseVersion

import numpy as np
import pandas as pd
+from packaging.version import parse as parse_version
from tlz import partition_all

import dask
@@ -31,7 +31,11 @@
from dask_cudf.accessors import ListMethods, StructMethods
from dask_cudf.sorting import _get_shuffle_type

-DASK_VERSION = LooseVersion(dask.__version__)
+DASK_BACKEND_SUPPORT = parse_version(dask.__version__) >= parse_version(
+    "2022.10.0"
+)
+# TODO: Remove DASK_BACKEND_SUPPORT throughout codebase
+# when dask_cudf is pinned to dask>=2022.10.0


class _Frame(dd.core._Frame, OperatorMethodMixin):
@@ -736,7 +740,7 @@ def from_dask_dataframe(df):
    return df.map_partitions(cudf.from_pandas)


-for name in [
+for name in (
    "add",
    "sub",
    "mul",
@@ -751,16 +755,13 @@
    "rfloordiv",
    "rmod",
    "rpow",
-]:
+):
    meth = getattr(cudf.DataFrame, name)
-    kwargs = {"original": cudf.DataFrame} if DASK_VERSION >= "2.11.1" else {}
-    DataFrame._bind_operator_method(name, meth, **kwargs)
+    DataFrame._bind_operator_method(name, meth, original=cudf.Series)

    meth = getattr(cudf.Series, name)
-    kwargs = {"original": cudf.Series} if DASK_VERSION >= "2.11.1" else {}
-    Series._bind_operator_method(name, meth, **kwargs)
+    Series._bind_operator_method(name, meth, original=cudf.Series)

-for name in ["lt", "gt", "le", "ge", "ne", "eq"]:
+for name in ("lt", "gt", "le", "ge", "ne", "eq"):
    meth = getattr(cudf.Series, name)
-    kwargs = {"original": cudf.Series} if DASK_VERSION >= "2.11.1" else {}
-    Series._bind_comparison_method(name, meth, **kwargs)
+    Series._bind_comparison_method(name, meth, original=cudf.Series)
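
The version gate above replaces the deprecated distutils.LooseVersion with packaging.version.parse, which compares CalVer release strings component-by-component. A quick sanity check of the cutoff, assuming only that the packaging library is installed:

from packaging.version import parse as parse_version

# Components compare numerically, so "2022.10.0" sorts above "2022.9.2"
# and backend dispatch stays disabled on older dask releases.
assert parse_version("2022.10.0") > parse_version("2022.9.2")
assert parse_version("2022.10.1") >= parse_version("2022.10.0")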
16 changes: 16 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_csv.py
@@ -16,6 +16,22 @@
import dask_cudf


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_csv_roundtrip_backend_dispatch(tmp_path):
    # Test ddf.read_csv cudf-backend dispatch
    df = cudf.DataFrame({"x": [1, 2, 3, 4], "id": ["a", "b", "c", "d"]})
    ddf = dask_cudf.from_cudf(df, npartitions=2)
    csv_path = str(tmp_path / "data-*.csv")
    ddf.to_csv(csv_path, index=False)
    with dask.config.set({"dataframe.backend": "cudf"}):
        ddf2 = dd.read_csv(csv_path)
    assert isinstance(ddf2, dask_cudf.DataFrame)
    dd.assert_eq(ddf, ddf2, check_divisions=False, check_index=False)


def test_csv_roundtrip(tmp_path):
    df = cudf.DataFrame({"x": [1, 2, 3, 4], "id": ["a", "b", "c", "d"]})
    ddf = dask_cudf.from_cudf(df, npartitions=2)
17 changes: 17 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_json.py
@@ -12,6 +12,23 @@
import dask_cudf


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_read_json_backend_dispatch(tmp_path):
    # Test ddf.read_json cudf-backend dispatch
    df1 = dask.datasets.timeseries(
        dtypes={"x": int, "y": int}, freq="120s"
    ).reset_index(drop=True)
    json_path = str(tmp_path / "data-*.json")
    df1.to_json(json_path)
    with dask.config.set({"dataframe.backend": "cudf"}):
        df2 = dd.read_json(json_path)
    assert isinstance(df2, dask_cudf.DataFrame)
    dd.assert_eq(df1, df2)


def test_read_json(tmp_path):
    df1 = dask.datasets.timeseries(
        dtypes={"x": int, "y": int}, freq="120s"
16 changes: 14 additions & 2 deletions python/dask_cudf/dask_cudf/io/tests/test_orc.py
@@ -6,18 +6,30 @@

import pytest

import dask
from dask import dataframe as dd

import cudf

import dask_cudf

# import pyarrow.orc as orc

cur_dir = os.path.dirname(__file__)
sample_orc = os.path.join(cur_dir, "data/orc/sample.orc")


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_read_orc_backend_dispatch():
    # Test ddf.read_orc cudf-backend dispatch
    df1 = cudf.read_orc(sample_orc)
    with dask.config.set({"dataframe.backend": "cudf"}):
        df2 = dd.read_orc(sample_orc)
    assert isinstance(df2, dask_cudf.DataFrame)
    dd.assert_eq(df1, df2, check_index=False)


def test_read_orc_defaults():
    df1 = cudf.read_orc(sample_orc)
    df2 = dask_cudf.read_orc(sample_orc)
14 changes: 14 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -46,6 +46,20 @@ def _divisions(setting):
    return {"gather_statistics": setting}


@pytest.mark.skipif(
    not dask_cudf.core.DASK_BACKEND_SUPPORT,
    reason="No backend-dispatch support",
)
def test_roundtrip_backend_dispatch(tmpdir):
    # Test ddf.read_parquet cudf-backend dispatch
    tmpdir = str(tmpdir)
    ddf.to_parquet(tmpdir, engine="pyarrow")
    with dask.config.set({"dataframe.backend": "cudf"}):
        ddf2 = dd.read_parquet(tmpdir, index=False)
    assert isinstance(ddf2, dask_cudf.DataFrame)
    dd.assert_eq(ddf.reset_index(drop=False), ddf2)


@pytest.mark.parametrize("write_metadata_file", [True, False])
@pytest.mark.parametrize("divisions", [True, False])
def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file):
17 changes: 17 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_core.py
@@ -17,6 +17,23 @@
import dask_cudf as dgd


@pytest.mark.skipif(
    not dgd.core.DASK_BACKEND_SUPPORT, reason="No backend-dispatch support"
)
def test_from_dict_backend_dispatch():
    # Test ddf.from_dict cudf-backend dispatch
    np.random.seed(0)
    data = {
        "x": np.random.randint(0, 5, size=10000),
        "y": np.random.normal(size=10000),
    }
    expect = cudf.DataFrame(data)
    with dask.config.set({"dataframe.backend": "cudf"}):
        ddf = dd.from_dict(data, npartitions=2)
    assert isinstance(ddf, dgd.DataFrame)
    dd.assert_eq(expect, ddf)


def test_from_cudf():
    np.random.seed(0)
4 changes: 4 additions & 0 deletions python/dask_cudf/setup.cfg
@@ -38,3 +38,7 @@ skip=
    buck-out
    build
    dist

[options.entry_points]
dask.dataframe.backends =
    cudf = dask_cudf.backends:CudfBackendEntrypoint
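
This [options.entry_points] section is what makes the backend discoverable: installing dask_cudf advertises CudfBackendEntrypoint under the dask.dataframe.backends entry-point group, which Dask scans when resolving the "dataframe.backend" config value. A minimal sketch of that discovery step, assuming Python 3.10+ importlib.metadata semantics:

from importlib.metadata import entry_points

# Lists every registered Dask DataFrame backend; with dask_cudf installed
# this includes: cudf -> dask_cudf.backends:CudfBackendEntrypoint
for ep in entry_points(group="dask.dataframe.backends"):
    print(ep.name, "->", ep.value)
    backend = ep.load()  # imports the entrypoint class lazily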