From e2fcf1203f46364c460b5c5685b32a198f1f6c76 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Mon, 11 Mar 2024 16:52:31 -0500 Subject: [PATCH] Introduce basic "cudf" backend for Dask Expressions (#14805) Mostly addresses https://github.com/rapidsai/cudf/issues/15027 https://github.com/dask-contrib/dask-expr/pull/728 exposed the necessary mechanisms for us to define a custom dask-expr backend for `cudf`. The new dispatching mechanisms are effectively the same as those in `dask.dataframe`. The only difference is that we are now registering/implementing "expression-based" collections. This PR does the following: - Defines a basic `DataFrameBackendEntrypoint` class for collection creation, and registers new collections using `get_collection_type`. - Refactors the `dask_cudf` import structure to properly support the `"dataframe.query-planning"` configuration. - Modifies CI to test dask-expr support for some of the `dask_cudf` tests. This coverage can be expanded in follow-up work. ~**Experimental Change**: This PR patches `dask_expr._expr.Expr.__new__` to enable type-based dispatching. This effectively allows us to surgically replace problematic `Expr` subclasses that do not work for cudf-backed data. For example, this PR replaces the upstream `TakeLast` expression to avoid using `squeeze` (since this method is not supported by cudf). This particular fix can be moved upstream relatively easily. 
However, having this kind of "patching" mechanism may be valuable for more complicated pandas/cudf discrepancies.~ ## Usage example ```python from dask import config config.set({"dataframe.query-planning": True}) import dask_cudf df = dask_cudf.DataFrame.from_dict( {"x": range(100), "y": [1, 2, 3, 4] * 25, "z": ["1", "2"] * 50}, npartitions=10, ) df["y2"] = df["x"] + df["y"] agg = df.groupby("y").agg({"y2": "mean"})["y2"] agg.simplify().pprint() ``` Dask cuDF should now be using dask-expr for "query planning": ``` Projection: columns='y2' GroupbyAggregation: arg={'y2': 'mean'} observed=True split_out=1'y' Assign: y2= Projection: columns=['y'] FromPandas: frame='' npartitions=10 columns=['x', 'y'] Add: Projection: columns='x' FromPandas: frame='' npartitions=10 columns=['x', 'y'] Projection: columns='y' FromPandas: frame='' npartitions=10 columns=['x', 'y'] ``` ## TODO - [x] Add basic tests - [x] Confirm that general design makes sense **Follow Up Work**: - Expand dask-expr test coverage - Fix local and upstream bugs - Add documentation once "critical mass" is reached Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) - Lawrence Mitchell (https://github.com/wence-) - Vyas Ramasubramani (https://github.com/vyasr) - Bradley Dice (https://github.com/bdice) Approvers: - Lawrence Mitchell (https://github.com/wence-) - Ray Douglass (https://github.com/raydouglass) URL: https://github.com/rapidsai/cudf/pull/14805 --- ci/test_python_other.sh | 8 ++ ci/test_wheel_dask_cudf.sh | 9 ++ python/dask_cudf/dask_cudf/__init__.py | 62 ++++++++-- python/dask_cudf/dask_cudf/backends.py | 63 +++++++++- python/dask_cudf/dask_cudf/core.py | 18 ++- python/dask_cudf/dask_cudf/expr/__init__.py | 22 ++++ .../dask_cudf/dask_cudf/expr/_collection.py | 110 ++++++++++++++++++ python/dask_cudf/dask_cudf/expr/_expr.py | 58 +++++++++ python/dask_cudf/dask_cudf/expr/_groupby.py | 48 ++++++++ .../dask_cudf/dask_cudf/io/tests/test_json.py | 4 + .../dask_cudf/dask_cudf/io/tests/test_orc.py | 
4 + .../dask_cudf/io/tests/test_parquet.py | 28 +++-- .../dask_cudf/dask_cudf/io/tests/test_s3.py | 6 +- .../dask_cudf/dask_cudf/io/tests/test_text.py | 6 +- .../dask_cudf/tests/test_accessor.py | 8 +- .../dask_cudf/tests/test_applymap.py | 6 +- python/dask_cudf/dask_cudf/tests/test_core.py | 64 ++++------ .../dask_cudf/dask_cudf/tests/test_groupby.py | 89 +++++++------- python/dask_cudf/dask_cudf/tests/test_join.py | 4 +- .../dask_cudf/dask_cudf/tests/test_onehot.py | 6 +- .../dask_cudf/tests/test_reductions.py | 2 +- python/dask_cudf/dask_cudf/tests/test_sort.py | 23 +++- python/dask_cudf/dask_cudf/tests/utils.py | 16 ++- python/dask_cudf/pyproject.toml | 4 +- 24 files changed, 545 insertions(+), 123 deletions(-) create mode 100644 python/dask_cudf/dask_cudf/expr/__init__.py create mode 100644 python/dask_cudf/dask_cudf/expr/_collection.py create mode 100644 python/dask_cudf/dask_cudf/expr/_expr.py create mode 100644 python/dask_cudf/dask_cudf/expr/_groupby.py diff --git a/ci/test_python_other.sh b/ci/test_python_other.sh index 9cdceb295db..8ecd02f70a1 100755 --- a/ci/test_python_other.sh +++ b/ci/test_python_other.sh @@ -29,6 +29,14 @@ rapids-logger "pytest dask_cudf" --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cudf-coverage.xml" \ --cov-report=term +# Run tests in dask_cudf/tests and dask_cudf/io/tests with dask-expr +rapids-logger "pytest dask_cudf + dask_expr" +DASK_DATAFRAME__QUERY_PLANNING=True ./ci/run_dask_cudf_pytests.sh \ + --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-expr.xml" \ + --numprocesses=8 \ + --dist=loadscope \ + . + rapids-logger "pytest custreamz" ./ci/run_custreamz_pytests.sh \ --junitxml="${RAPIDS_TESTS_DIR}/junit-custreamz.xml" \ diff --git a/ci/test_wheel_dask_cudf.sh b/ci/test_wheel_dask_cudf.sh index 59f6ecd8483..398eed43ea4 100755 --- a/ci/test_wheel_dask_cudf.sh +++ b/ci/test_wheel_dask_cudf.sh @@ -38,3 +38,12 @@ python -m pytest \ --numprocesses=8 \ . 
popd + +# Run tests in dask_cudf/tests and dask_cudf/io/tests with dask-expr +rapids-logger "pytest dask_cudf + dask_expr" +pushd python/dask_cudf/dask_cudf +DASK_DATAFRAME__QUERY_PLANNING=True python -m pytest \ + --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cudf-expr.xml" \ + --numprocesses=8 \ + . +popd diff --git a/python/dask_cudf/dask_cudf/__init__.py b/python/dask_cudf/dask_cudf/__init__.py index c152a9e6a81..c66e85ed2af 100644 --- a/python/dask_cudf/dask_cudf/__init__.py +++ b/python/dask_cudf/dask_cudf/__init__.py @@ -1,29 +1,75 @@ -# Copyright (c) 2018-2023, NVIDIA CORPORATION. +# Copyright (c) 2018-2024, NVIDIA CORPORATION. +from dask import config + +# For dask>2024.2.0, we can silence the loud deprecation +# warning before importing `dask.dataframe` (this won't +# do anything for dask==2024.2.0) +config.set({"dataframe.query-planning-warning": False}) + +import dask.dataframe as dd from dask.dataframe import from_delayed import cudf from . import backends from ._version import __git_commit__, __version__ -from .core import DataFrame, Series, concat, from_cudf, from_dask_dataframe -from .groupby import groupby_agg -from .io import read_csv, read_json, read_orc, read_text, to_orc +from .core import concat, from_cudf, from_dask_dataframe +from .expr import QUERY_PLANNING_ON + + +def read_csv(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_csv(*args, **kwargs) + + +def read_json(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_json(*args, **kwargs) + + +def read_orc(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_orc(*args, **kwargs) + + +def read_parquet(*args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return dd.read_parquet(*args, **kwargs) + + +def raise_not_implemented_error(attr_name): + def inner_func(*args, **kwargs): + raise NotImplementedError( + f"Top-level {attr_name} API is not available for dask-expr." 
+ ) + + return inner_func + + +if QUERY_PLANNING_ON: + from .expr._collection import DataFrame, Index, Series + + groupby_agg = raise_not_implemented_error("groupby_agg") + read_text = raise_not_implemented_error("read_text") + to_orc = raise_not_implemented_error("to_orc") +else: + from .core import DataFrame, Index, Series + from .groupby import groupby_agg + from .io import read_text, to_orc -try: - from .io import read_parquet -except ImportError: - pass __all__ = [ "DataFrame", "Series", + "Index", "from_cudf", "from_dask_dataframe", "concat", "from_delayed", ] + if not hasattr(cudf.DataFrame, "mean"): cudf.DataFrame.mean = None del cudf diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 317c45ba582..c7b4a1c4c6a 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -627,13 +627,68 @@ def read_csv(*args, **kwargs): @staticmethod def read_hdf(*args, **kwargs): - from dask_cudf import from_dask_dataframe - # HDF5 reader not yet implemented in cudf warnings.warn( "read_hdf is not yet implemented in cudf/dask_cudf. " "Moving to cudf from pandas. Expect poor performance!" ) - return from_dask_dataframe( - _default_backend(dd.read_hdf, *args, **kwargs) + return _default_backend(dd.read_hdf, *args, **kwargs).to_backend( + "cudf" + ) + + +# Define "cudf" backend entrypoint for dask-expr +class CudfDXBackendEntrypoint(DataFrameBackendEntrypoint): + """Backend-entrypoint class for Dask-Expressions + + This class is registered under the name "cudf" for the + ``dask-expr.dataframe.backends`` entrypoint in ``setup.cfg``. + Dask-DataFrame will use the methods defined in this class + in place of ``dask_expr.<creation-method>`` when the + "dataframe.backend" configuration is set to "cudf": + + Examples + -------- + >>> import dask + >>> import dask_expr as dx + >>> with dask.config.set({"dataframe.backend": "cudf"}): + ... 
ddf = dx.from_dict({"a": range(10)}) + >>> type(ddf._meta) + + """ + + @classmethod + def to_backend_dispatch(cls): + return CudfBackendEntrypoint.to_backend_dispatch() + + @classmethod + def to_backend(cls, *args, **kwargs): + return CudfBackendEntrypoint.to_backend(*args, **kwargs) + + @staticmethod + def from_dict( + data, + npartitions, + orient="columns", + dtype=None, + columns=None, + constructor=cudf.DataFrame, + ): + import dask_expr as dx + + return _default_backend( + dx.from_dict, + data, + npartitions=npartitions, + orient=orient, + dtype=dtype, + columns=columns, + constructor=constructor, ) + + +# Import/register cudf-specific classes for dask-expr +try: + import dask_cudf.expr # noqa: F401 +except ImportError: + pass diff --git a/python/dask_cudf/dask_cudf/core.py b/python/dask_cudf/dask_cudf/core.py index b051b21790e..bfe58531a73 100644 --- a/python/dask_cudf/dask_cudf/core.py +++ b/python/dask_cudf/dask_cudf/core.py @@ -685,18 +685,27 @@ def reduction( @_dask_cudf_nvtx_annotate def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): + from dask_cudf import QUERY_PLANNING_ON + if isinstance(getattr(data, "index", None), cudf.MultiIndex): raise NotImplementedError( "dask_cudf does not support MultiIndex Dataframes." ) - name = name or ("from_cudf-" + tokenize(data, npartitions or chunksize)) + # Dask-expr doesn't support the `name` argument + name = {} + if not QUERY_PLANNING_ON: + name = { + "name": name + or ("from_cudf-" + tokenize(data, npartitions or chunksize)) + } + return dd.from_pandas( data, npartitions=npartitions, chunksize=chunksize, sort=sort, - name=name, + **name, ) @@ -711,7 +720,10 @@ def from_cudf(data, npartitions=None, chunksize=None, sort=True, name=None): rather than pandas objects.\n """ ) - + textwrap.dedent(dd.from_pandas.__doc__) + # TODO: `dd.from_pandas.__doc__` is empty when + # `DASK_DATAFRAME__QUERY_PLANNING=True` + # since dask-expr does not provide a docstring for from_pandas. 
+ + textwrap.dedent(dd.from_pandas.__doc__ or "") ) diff --git a/python/dask_cudf/dask_cudf/expr/__init__.py b/python/dask_cudf/dask_cudf/expr/__init__.py new file mode 100644 index 00000000000..c36dd0abcb9 --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/__init__.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from dask import config + +# Check if dask-dataframe is using dask-expr. +# For dask>=2024.3.0, a null value will default to True +QUERY_PLANNING_ON = config.get("dataframe.query-planning", None) is not False + +# Register custom expressions and collections +try: + import dask_cudf.expr._collection + import dask_cudf.expr._expr + +except ImportError as err: + if QUERY_PLANNING_ON: + # Dask *should* raise an error before this. + # However, we can still raise here to be certain. + raise RuntimeError( + "Failed to register the 'cudf' backend for dask-expr." + " Please make sure you have dask-expr installed.\n" + f"Error Message: {err}" + ) diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py new file mode 100644 index 00000000000..b2f92aeddda --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -0,0 +1,110 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from functools import cached_property + +from dask_expr import ( + DataFrame as DXDataFrame, + FrameBase, + Index as DXIndex, + Series as DXSeries, + get_collection_type, +) +from dask_expr._collection import new_collection +from dask_expr._util import _raise_if_object_series + +from dask import config +from dask.dataframe.core import is_dataframe_like + +import cudf + +## +## Custom collection classes +## + + +# VarMixin can be removed if cudf#15179 is addressed. 
+# See: https://github.com/rapidsai/cudf/issues/15179 +class VarMixin: + def var( + self, + axis=0, + skipna=True, + ddof=1, + numeric_only=False, + split_every=False, + **kwargs, + ): + _raise_if_object_series(self, "var") + axis = self._validate_axis(axis) + self._meta.var(axis=axis, skipna=skipna, numeric_only=numeric_only) + frame = self + if is_dataframe_like(self._meta) and numeric_only: + # Convert to pandas - cudf does something weird here + index = self._meta.to_pandas().var(numeric_only=True).index + frame = frame[list(index)] + return new_collection( + frame.expr.var( + axis, skipna, ddof, numeric_only, split_every=split_every + ) + ) + + +class DataFrame(VarMixin, DXDataFrame): + @classmethod + def from_dict(cls, *args, **kwargs): + with config.set({"dataframe.backend": "cudf"}): + return DXDataFrame.from_dict(*args, **kwargs) + + def groupby( + self, + by, + group_keys=True, + sort=None, + observed=None, + dropna=None, + **kwargs, + ): + from dask_cudf.expr._groupby import GroupBy + + if isinstance(by, FrameBase) and not isinstance(by, DXSeries): + raise ValueError( + f"`by` must be a column name or list of columns, got {by}." 
+ ) + + return GroupBy( + self, + by, + group_keys=group_keys, + sort=sort, + observed=observed, + dropna=dropna, + **kwargs, + ) + + +class Series(VarMixin, DXSeries): + def groupby(self, by, **kwargs): + from dask_cudf.expr._groupby import SeriesGroupBy + + return SeriesGroupBy(self, by, **kwargs) + + @cached_property + def list(self): + from dask_cudf.accessors import ListMethods + + return ListMethods(self) + + @cached_property + def struct(self): + from dask_cudf.accessors import StructMethods + + return StructMethods(self) + + +class Index(DXIndex): + pass # Same as pandas (for now) + + +get_collection_type.register(cudf.DataFrame, lambda _: DataFrame) +get_collection_type.register(cudf.Series, lambda _: Series) +get_collection_type.register(cudf.BaseIndex, lambda _: Index) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py new file mode 100644 index 00000000000..cbe7a71cb73 --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -0,0 +1,58 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from dask_expr._cumulative import CumulativeBlockwise, TakeLast +from dask_expr._reductions import Var + +## +## Custom expression patching +## + + +# This can be removed after cudf#15176 is addressed. +# See: https://github.com/rapidsai/cudf/issues/15176 +class PatchCumulativeBlockwise(CumulativeBlockwise): + @property + def _args(self) -> list: + return self.operands[:1] + + @property + def _kwargs(self) -> dict: + # Must pass axis and skipna as kwargs in cudf + return {"axis": self.axis, "skipna": self.skipna} + + +CumulativeBlockwise._args = PatchCumulativeBlockwise._args +CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs + + +# This can be removed if squeeze support is added to cudf, +# or if squeeze is removed from the dask-expr logic. 
+# See: https://github.com/rapidsai/cudf/issues/15177 +def _takelast(a, skipna=True): + if not len(a): + return a + if skipna: + a = a.bfill() + # Cannot use `squeeze` with cudf + return a.tail(n=1).iloc[0] + + +TakeLast.operation = staticmethod(_takelast) + + +# This patch accounts for differences between +# numpy and cupy behavior. It may make sense +# to move this logic upstream. +_dx_reduction_aggregate = Var.reduction_aggregate + + +def _reduction_aggregate(*args, **kwargs): + result = _dx_reduction_aggregate(*args, **kwargs) + if result.ndim == 0: + # cupy will sometimes produce a 0d array, and + # we need to convert it to a scalar. + return result.item() + return result + + +Var.reduction_aggregate = staticmethod(_reduction_aggregate) diff --git a/python/dask_cudf/dask_cudf/expr/_groupby.py b/python/dask_cudf/dask_cudf/expr/_groupby.py new file mode 100644 index 00000000000..7f275151f75 --- /dev/null +++ b/python/dask_cudf/dask_cudf/expr/_groupby.py @@ -0,0 +1,48 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from dask_expr._groupby import ( + GroupBy as DXGroupBy, + SeriesGroupBy as DXSeriesGroupBy, +) +from dask_expr._util import is_scalar + +## +## Custom groupby classes +## + +# TODO: These classes are mostly a work-around for missing +# `observed=False` support. 
+# See: https://github.com/rapidsai/cudf/issues/15173 + + +class GroupBy(DXGroupBy): + def __init__(self, *args, observed=None, **kwargs): + observed = observed if observed is not None else True + super().__init__(*args, observed=observed, **kwargs) + + def __getitem__(self, key): + if is_scalar(key): + return SeriesGroupBy( + self.obj, + by=self.by, + slice=key, + sort=self.sort, + dropna=self.dropna, + observed=self.observed, + ) + g = GroupBy( + self.obj, + by=self.by, + slice=key, + sort=self.sort, + dropna=self.dropna, + observed=self.observed, + group_keys=self.group_keys, + ) + return g + + +class SeriesGroupBy(DXSeriesGroupBy): + def __init__(self, *args, observed=None, **kwargs): + observed = observed if observed is not None else True + super().__init__(*args, observed=observed, **kwargs) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index 5e06832ed94..a2b1d7fc114 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -10,6 +10,10 @@ from dask.utils import tmpfile import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() def test_read_json_backend_dispatch(tmp_path): diff --git a/python/dask_cudf/dask_cudf/io/tests/test_orc.py b/python/dask_cudf/dask_cudf/io/tests/test_orc.py index c2be75e8ddd..8ccb7a7bfe7 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_orc.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_orc.py @@ -12,6 +12,10 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() cur_dir = os.path.dirname(__file__) sample_orc = os.path.join(cur_dir, "data/orc/sample.orc") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 5e4ea578101..de2a735b2ce 100644 --- 
a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -15,6 +15,7 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr # Check if create_metadata_file is supported by # the current dask.dataframe version @@ -71,7 +72,7 @@ def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file): ddf2 = dask_cudf.read_parquet( files, columns="y", calculate_divisions=divisions ) - dd.assert_eq(ddf[["y"]], ddf2, check_divisions=divisions) + dd.assert_eq(ddf["y"], ddf2, check_divisions=divisions) # Now include metadata ddf2 = dask_cudf.read_parquet(tmpdir, calculate_divisions=divisions) @@ -87,7 +88,7 @@ def test_roundtrip_from_dask(tmpdir, divisions, write_metadata_file): ddf2 = dask_cudf.read_parquet( tmpdir, columns="y", calculate_divisions=divisions ) - dd.assert_eq(ddf[["y"]], ddf2, check_divisions=divisions) + dd.assert_eq(ddf["y"], ddf2, check_divisions=divisions) def test_roundtrip_from_dask_index_false(tmpdir): @@ -184,6 +185,7 @@ def test_dask_timeseries_from_dask(tmpdir, index, divisions): ) +@xfail_dask_expr("Categorical column support") @pytest.mark.parametrize("index", [False, None]) @pytest.mark.parametrize("divisions", [False, True]) def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions): @@ -292,7 +294,11 @@ def test_filters_at_row_group_level(tmpdir): assert a.npartitions == 1 assert (a.shape[0] == 1).compute() - ddf.to_parquet(tmp_path, engine="pyarrow", row_group_size=1) + # Overwrite=True can be removed for dask-expr>=0.4.1 + # See: https://github.com/dask-contrib/dask-expr/issues/800 + ddf.to_parquet( + tmp_path, engine="pyarrow", row_group_size=1, overwrite=True + ) b = dask_cudf.read_parquet( tmp_path, filters=[("x", "==", 1)], split_row_groups=True @@ -436,6 +442,7 @@ def test_create_metadata_file(tmpdir, partition_on): dd.assert_eq(ddf1, ddf2) +@xfail_dask_expr("dtypes are inconsistent") @need_create_meta def 
test_create_metadata_file_inconsistent_schema(tmpdir): # NOTE: This test demonstrates that the CudfEngine @@ -516,15 +523,19 @@ def test_cudf_list_struct_write(tmpdir): dd.assert_eq(df, new_ddf) +@skip_dask_expr("Not necessary in dask-expr") def test_check_file_size(tmpdir): # Test simple file-size check to help warn users # of upstream change to `split_row_groups` default fn = str(tmpdir.join("test.parquet")) cudf.DataFrame({"a": np.arange(1000)}).to_parquet(fn) with pytest.warns(match="large parquet file"): - dask_cudf.read_parquet(fn, check_file_size=1).compute() + # Need to use `dask_cudf.io` path + # TODO: Remove outdated `check_file_size` functionality + dask_cudf.io.read_parquet(fn, check_file_size=1).compute() +@xfail_dask_expr("HivePartitioning cannot be hashed") def test_null_partition(tmpdir): import pyarrow as pa from pyarrow.dataset import HivePartitioning @@ -554,11 +565,10 @@ def test_nullable_schema_mismatch(tmpdir): path1 = str(tmpdir.join("test.1.parquet")) cudf.DataFrame.from_dict({"a": [1, 2, 3]}).to_parquet(path0) cudf.DataFrame.from_dict({"a": [4, 5, None]}).to_parquet(path1) - with dask.config.set({"dataframe.backend": "cudf"}): - ddf = dd.read_parquet( - [path0, path1], split_row_groups=2, aggregate_files=True - ) - expect = pd.read_parquet([path0, path1]) + ddf = dask_cudf.read_parquet( + [path0, path1], split_row_groups=2, aggregate_files=True + ) + expect = pd.read_parquet([path0, path1]) dd.assert_eq(ddf, expect, check_index=False) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 7614ea38d6a..f4a6fabdb60 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. 
import os import socket @@ -10,6 +10,10 @@ import pytest import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_text.py b/python/dask_cudf/dask_cudf/io/tests/test_text.py index a14eec1fea9..d3dcd386d0d 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_text.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_text.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import os @@ -9,6 +9,10 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr + +# No dask-expr support +pytestmark = skip_dask_expr() cur_dir = os.path.dirname(__file__) text_file = os.path.join(cur_dir, "data/text/sample.pgn") diff --git a/python/dask_cudf/dask_cudf/tests/test_accessor.py b/python/dask_cudf/dask_cudf/tests/test_accessor.py index 8c9ce45df59..ebb8e4be187 100644 --- a/python/dask_cudf/dask_cudf/tests/test_accessor.py +++ b/python/dask_cudf/dask_cudf/tests/test_accessor.py @@ -12,6 +12,7 @@ from cudf.testing._utils import assert_eq, does_not_raise import dask_cudf +from dask_cudf.tests.utils import xfail_dask_expr ############################################################################# # Datetime Accessor # @@ -110,6 +111,7 @@ def test_categorical_accessor_initialization2(data): dsr.cat +@xfail_dask_expr("TODO: Unexplained dask-expr failure") @pytest.mark.parametrize("data", [data_cat_1()]) def test_categorical_basic(data): cat = data.copy() @@ -201,10 +203,11 @@ def test_categorical_compare_unordered(data): dsr < dsr +@xfail_dask_expr("TODO: Unexplained dask-expr failure") @pytest.mark.parametrize("data", [data_cat_3()]) def test_categorical_compare_ordered(data): - cat1 = data[0] - cat2 = data[1] + cat1 = data[0].copy() + cat2 = data[1].copy() pdsr1 = pd.Series(cat1) pdsr2 = 
pd.Series(cat2) sr1 = Series(cat1) @@ -271,6 +274,7 @@ def test_categorical_categories(): ) +@xfail_dask_expr("TODO: Unexplained dask-expr failure") def test_categorical_as_known(): df = dask_cudf.from_cudf(DataFrame({"col_1": [0, 1, 2, 3]}), npartitions=2) df["col_1"] = df["col_1"].astype("category") diff --git a/python/dask_cudf/dask_cudf/tests/test_applymap.py b/python/dask_cudf/dask_cudf/tests/test_applymap.py index 929f00ec296..d84235481c3 100644 --- a/python/dask_cudf/dask_cudf/tests/test_applymap.py +++ b/python/dask_cudf/dask_cudf/tests/test_applymap.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. import pytest from pandas import NA @@ -24,6 +24,6 @@ def test_applymap_basic(func, has_na): dpdf = dd.from_pandas(pdf, npartitions=dgdf.npartitions) - expect = dpdf.applymap(func) - got = dgdf.applymap(func) + expect = dpdf.map(func) + got = dgdf.map(func) dd.assert_eq(expect, got, check_dtype=False) diff --git a/python/dask_cudf/dask_cudf/tests/test_core.py b/python/dask_cudf/dask_cudf/tests/test_core.py index ecad2220ba5..8a2f3414fd1 100644 --- a/python/dask_cudf/dask_cudf/tests/test_core.py +++ b/python/dask_cudf/dask_cudf/tests/test_core.py @@ -15,6 +15,7 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import skip_dask_expr, xfail_dask_expr def test_from_dict_backend_dispatch(): @@ -83,7 +84,7 @@ def test_to_backend_kwargs(): gser_null.to_backend("pandas", bad_arg=True) -def test_from_cudf(): +def test_from_pandas(): np.random.seed(0) df = pd.DataFrame( @@ -95,16 +96,16 @@ def test_from_cudf(): gdf = cudf.DataFrame.from_pandas(df) - # Test simple around to/from dask + # Test simple around to/from cudf ingested = dd.from_pandas(gdf, npartitions=2) dd.assert_eq(ingested, df) - # Test conversion to dask.dataframe - ddf = ingested.to_dask_dataframe() + # Test conversion back to pandas + ddf = ingested.to_backend("pandas") dd.assert_eq(ddf, df) -def test_from_cudf_multiindex_raises(): +def 
test_from_pandas_multiindex_raises(): df = cudf.DataFrame({"x": list("abc"), "y": [1, 2, 3], "z": [1, 2, 3]}) with pytest.raises(NotImplementedError): @@ -112,7 +113,7 @@ def test_from_cudf_multiindex_raises(): dask_cudf.from_cudf(df.set_index(["x", "y"])) -def test_from_cudf_with_generic_idx(): +def test_from_pandas_with_generic_idx(): cdf = cudf.DataFrame( { "a": list(range(20)), @@ -187,22 +188,8 @@ def test_head(): dd.assert_eq(dgf.head(), df.head()) -def test_from_dask_dataframe(): - np.random.seed(0) - df = pd.DataFrame( - {"x": np.random.randint(0, 5, size=20), "y": np.random.normal(size=20)} - ) - ddf = dd.from_pandas(df, npartitions=2) - dgdf = ddf.map_partitions(cudf.from_pandas) - got = dgdf.compute().to_pandas() - expect = df - - dd.assert_eq(got, expect) - - @pytest.mark.parametrize("nelem", [10, 200, 1333]) -@pytest.mark.parametrize("divisions", [None, "quantile"]) -def test_set_index(nelem, divisions): +def test_set_index(nelem): with dask.config.set(scheduler="single-threaded"): np.random.seed(0) # Use unique index range as the sort may not be stable-ordering @@ -212,14 +199,15 @@ def test_set_index(nelem, divisions): {"x": x, "y": np.random.randint(0, nelem, size=nelem)} ) ddf = dd.from_pandas(df, npartitions=2) - dgdf = ddf.map_partitions(cudf.from_pandas) + ddf2 = ddf.to_backend("cudf") expect = ddf.set_index("x") - got = dgdf.set_index("x", divisions=divisions) + got = ddf2.set_index("x") dd.assert_eq(expect, got, check_index=False, check_divisions=False) +@xfail_dask_expr("missing support for divisions='quantile'") @pytest.mark.parametrize("by", ["a", "b"]) @pytest.mark.parametrize("nelem", [10, 500]) @pytest.mark.parametrize("nparts", [1, 10]) @@ -269,7 +257,6 @@ def test_set_index_2(nelem): assert_frame_equal_by_index_group(expect, got) -@pytest.mark.xfail(reason="dask's index name '__dask_cudf.index' is correct") def test_set_index_w_series(): with dask.config.set(scheduler="single-threaded"): nelem = 20 @@ -349,7 +336,8 @@ def 
test_assign(): newcol = dd.from_pandas(cudf.Series(pdcol), npartitions=dgf.npartitions) got = dgf.assign(z=newcol) - dd.assert_eq(got.loc[:, ["x", "y"]], df) + # Using `loc[:, ["x", "y"]]` was broken for dask-expr 0.4.0 + dd.assert_eq(got[["x", "y"]], df) np.testing.assert_array_equal(got["z"].compute().values_host, pdcol) @@ -400,6 +388,7 @@ def test_setitem_scalar_datetime(): np.testing.assert_array_equal(got["z"], df["z"]) +@skip_dask_expr("Not relevant for dask-expr") @pytest.mark.parametrize( "func", [ @@ -756,13 +745,13 @@ def test_dataframe_assign_col(): ddf = dask_cudf.from_cudf(df, npartitions=4) ddf["fold"] = 0 ddf["fold"] = ddf["fold"].map_partitions( - lambda cudf_df: cp.random.randint(0, 4, len(cudf_df)) + lambda cudf_df: cudf.Series(cp.random.randint(0, 4, len(cudf_df))) ) pddf = dd.from_pandas(pdf, npartitions=4) pddf["fold"] = 0 pddf["fold"] = pddf["fold"].map_partitions( - lambda p_df: np.random.randint(0, 4, len(p_df)) + lambda p_df: pd.Series(np.random.randint(0, 4, len(p_df))) ) dd.assert_eq(ddf[0], pddf[0]) @@ -787,6 +776,7 @@ def test_dataframe_set_index(): assert_eq(ddf.compute(), pddf.compute()) +@xfail_dask_expr("Insufficient describe support in dask-expr") def test_series_describe(): random.seed(0) sr = cudf.datasets.randomdata(20)["x"] @@ -802,6 +792,7 @@ def test_series_describe(): ) +@xfail_dask_expr("Insufficient describe support in dask-expr") def test_dataframe_describe(): random.seed(0) df = cudf.datasets.randomdata(20) @@ -815,6 +806,7 @@ def test_dataframe_describe(): ) +@xfail_dask_expr("Insufficient describe support in dask-expr") def test_zero_std_describe(): num = 84886781 df = cudf.DataFrame( @@ -858,15 +850,6 @@ def test_index_map_partitions(): def test_merging_categorical_columns(): - try: - from dask.dataframe.dispatch import ( # noqa: F401 - union_categoricals_dispatch, - ) - except ImportError: - pytest.skip( - "need a version of dask that has union_categoricals_dispatch" - ) - df_1 = cudf.DataFrame( {"id_1": [0, 1, 2, 
3], "cat_col": ["a", "b", "f", "f"]} ) @@ -882,6 +865,7 @@ def test_merging_categorical_columns(): ddf_2 = dask_cudf.from_cudf(df_2, npartitions=2) ddf_2 = dd.categorical.categorize(ddf_2, columns=["cat_col"]) + expected = cudf.DataFrame( { "id_1": [2, 3], @@ -894,15 +878,11 @@ def test_merging_categorical_columns(): "id_2": [113, 113], } ) - dd.assert_eq(ddf_1.merge(ddf_2), expected) + with pytest.warns(UserWarning, match="mismatch"): + dd.assert_eq(ddf_1.merge(ddf_2), expected) def test_correct_meta(): - try: - from dask.dataframe.dispatch import make_meta_obj # noqa: F401 - except ImportError: - pytest.skip("need make_meta_obj to be preset") - # Need these local imports in this specific order. # For context: https://github.com/rapidsai/cudf/issues/7946 import pandas as pd diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 30251b88dea..3bb3e3b0bb8 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -12,6 +12,17 @@ import dask_cudf from dask_cudf.groupby import OPTIMIZED_AGGS, _aggs_optimized +from dask_cudf.tests.utils import QUERY_PLANNING_ON, xfail_dask_expr + +# XFAIL "collect" tests for now +agg_params = [agg for agg in OPTIMIZED_AGGS if agg != "collect"] +if QUERY_PLANNING_ON: + agg_params.append( + # TODO: "collect" not supported with dask-expr yet + pytest.param("collect", marks=pytest.mark.xfail) + ) +else: + agg_params.append("collect") def assert_cudf_groupby_layers(ddf): @@ -46,48 +57,42 @@ def pdf(request): return pdf -@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) +@pytest.mark.parametrize("aggregation", agg_params) @pytest.mark.parametrize("series", [False, True]) def test_groupby_basic(series, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) - gdf_grouped = gdf.groupby("xx") - ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("xx") + gdf_grouped = gdf.groupby("xx", dropna=True) + 
ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby( + "xx", dropna=True + ) if series: - gdf_grouped = gdf_grouped.xx - ddf_grouped = ddf_grouped.xx + gdf_grouped = gdf_grouped.x + ddf_grouped = ddf_grouped.x check_dtype = aggregation != "count" expect = getattr(gdf_grouped, aggregation)() actual = getattr(ddf_grouped, aggregation)() - assert_cudf_groupby_layers(actual) + if not QUERY_PLANNING_ON: + assert_cudf_groupby_layers(actual) dd.assert_eq(expect, actual, check_dtype=check_dtype) - expect = gdf_grouped.agg({"xx": aggregation}) - actual = ddf_grouped.agg({"xx": aggregation}) + if not series: + expect = gdf_grouped.agg({"x": aggregation}) + actual = ddf_grouped.agg({"x": aggregation}) - assert_cudf_groupby_layers(actual) + if not QUERY_PLANNING_ON: + assert_cudf_groupby_layers(actual) - dd.assert_eq(expect, actual, check_dtype=check_dtype) + dd.assert_eq(expect, actual, check_dtype=check_dtype) # TODO: explore adding support with `.agg()` @pytest.mark.parametrize("series", [True, False]) -@pytest.mark.parametrize( - "aggregation", - [ - "cumsum", - pytest.param( - "cumcount", - marks=pytest.mark.xfail( - reason="https://github.com/rapidsai/cudf/issues/13390" - ), - ), - ], -) +@pytest.mark.parametrize("aggregation", ["cumsum", "cumcount"]) def test_groupby_cumulative(aggregation, pdf, series): gdf = cudf.DataFrame.from_pandas(pdf) ddf = dask_cudf.from_cudf(gdf, npartitions=5) @@ -105,7 +110,7 @@ def test_groupby_cumulative(aggregation, pdf, series): dd.assert_eq(a, b) -@pytest.mark.parametrize("aggregation", OPTIMIZED_AGGS) +@pytest.mark.parametrize("aggregation", agg_params) @pytest.mark.parametrize( "func", [ @@ -119,7 +124,6 @@ def test_groupby_cumulative(aggregation, pdf, series): ) def test_groupby_agg(func, aggregation, pdf): gdf = cudf.DataFrame.from_pandas(pdf) - ddf = dask_cudf.from_cudf(gdf, npartitions=5) actual = func(ddf, aggregation) @@ -127,11 +131,12 @@ def test_groupby_agg(func, aggregation, pdf): check_dtype = aggregation != 
"count" - assert_cudf_groupby_layers(actual) + if not QUERY_PLANNING_ON: + assert_cudf_groupby_layers(actual) - # groupby.agg should add an explicit getitem layer - # to improve/enable column projection - assert hlg_layer(actual.dask, "getitem") + # groupby.agg should add an explicit getitem layer + # to improve/enable column projection + assert hlg_layer(actual.dask, "getitem") dd.assert_eq(expect, actual, check_names=False, check_dtype=check_dtype) @@ -574,6 +579,7 @@ def test_groupby_categorical_key(): dd.assert_eq(expect, got) +@xfail_dask_expr("as_index not supported in dask-expr") @pytest.mark.parametrize("as_index", [True, False]) @pytest.mark.parametrize("split_out", ["use_dask_default", 1, 2]) @pytest.mark.parametrize("split_every", [False, 4]) @@ -662,6 +668,7 @@ def test_groupby_agg_params(npartitions, split_every, split_out, as_index): dd.assert_eq(gf, pf) +@xfail_dask_expr("Newer dask-expr version needed") @pytest.mark.parametrize( "aggregations", [(sum, "sum"), (max, "max"), (min, "min")] ) @@ -700,6 +707,7 @@ def test_is_supported(arg, supported): assert _aggs_optimized(arg, OPTIMIZED_AGGS) is supported +@xfail_dask_expr("Fails on older versions of dask-expr") def test_groupby_unique_lists(): df = pd.DataFrame({"a": [0, 0, 0, 1, 1, 1], "b": [10, 10, 10, 7, 8, 9]}) gdf = cudf.from_pandas(df) @@ -746,6 +754,7 @@ def test_groupby_first_last(data, agg): ) +@xfail_dask_expr("Co-alignment check fails in dask-expr") def test_groupby_with_list_of_series(): df = cudf.DataFrame({"a": [1, 2, 3, 4, 5]}) gdf = dask_cudf.from_cudf(df, npartitions=2) @@ -760,6 +769,7 @@ def test_groupby_with_list_of_series(): ) +@xfail_dask_expr("Nested renamer not supported in dask-expr") @pytest.mark.parametrize( "func", [ @@ -812,12 +822,12 @@ def test_groupby_all_columns(func): ) ddf = dd.from_pandas(pdf, npartitions=5) - gddf = ddf.map_partitions(cudf.from_pandas) + gddf = ddf.to_backend("cudf") expect = func(ddf) actual = func(gddf) - dd.assert_eq(expect, actual) + 
dd.assert_eq(expect, actual, check_names=not QUERY_PLANNING_ON) def test_groupby_shuffle(): @@ -855,13 +865,14 @@ def test_groupby_shuffle(): got = gddf.groupby("a", sort=False).agg(spec, split_out=2) dd.assert_eq(expect, got.compute().sort_index()) - # Sorted aggregation fails with split_out>1 when shuffle is False - # (sort=True, split_out=2, shuffle_method=False) - with pytest.raises(ValueError): - gddf.groupby("a", sort=True).agg( - spec, shuffle_method=False, split_out=2 - ) + if not QUERY_PLANNING_ON: + # Sorted aggregation fails with split_out>1 when shuffle is False + # (sort=True, split_out=2, shuffle_method=False) + with pytest.raises(ValueError): + gddf.groupby("a", sort=True).agg( + spec, shuffle_method=False, split_out=2 + ) - # Check shuffle kwarg deprecation - with pytest.warns(match="'shuffle' keyword is deprecated"): - gddf.groupby("a", sort=True).agg(spec, shuffle=False) + # Check shuffle kwarg deprecation + with pytest.warns(match="'shuffle' keyword is deprecated"): + gddf.groupby("a", sort=True).agg(spec, shuffle=False) diff --git a/python/dask_cudf/dask_cudf/tests/test_join.py b/python/dask_cudf/dask_cudf/tests/test_join.py index eb500ad2462..42ecc130298 100644 --- a/python/dask_cudf/dask_cudf/tests/test_join.py +++ b/python/dask_cudf/dask_cudf/tests/test_join.py @@ -163,7 +163,7 @@ def test_merge_left( } ) - expect = left.merge(right, on=("x", "y"), how=how) + expect = left.merge(right, on=["x", "y"], how=how) def normalize(df): return ( @@ -176,7 +176,7 @@ def normalize(df): left = dask_cudf.from_cudf(left, chunksize=chunksize) right = dask_cudf.from_cudf(right, chunksize=chunksize) - result = left.merge(right, on=("x", "y"), how=how).compute( + result = left.merge(right, on=["x", "y"], how=how).compute( scheduler="single-threaded" ) diff --git a/python/dask_cudf/dask_cudf/tests/test_onehot.py b/python/dask_cudf/dask_cudf/tests/test_onehot.py index 6453d843467..96646f85f74 100644 --- a/python/dask_cudf/dask_cudf/tests/test_onehot.py +++ 
b/python/dask_cudf/dask_cudf/tests/test_onehot.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2022, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. import pandas as pd import pytest @@ -8,6 +8,10 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import xfail_dask_expr + +# No dask-expr support +pytestmark = xfail_dask_expr("limited get_dummy support in dask-expr + cudf") def test_get_dummies_cat(): diff --git a/python/dask_cudf/dask_cudf/tests/test_reductions.py b/python/dask_cudf/dask_cudf/tests/test_reductions.py index 8688f830dcb..c3056f2607c 100644 --- a/python/dask_cudf/dask_cudf/tests/test_reductions.py +++ b/python/dask_cudf/dask_cudf/tests/test_reductions.py @@ -68,7 +68,7 @@ def test_series_reduce(reducer): ) def test_rowwise_reductions(data, op): gddf = dask_cudf.from_cudf(data, npartitions=10) - pddf = gddf.to_dask_dataframe() + pddf = gddf.to_backend("pandas") with dask.config.set({"dataframe.convert-string": False}): if op in ("var", "std"): diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 8cf621da1bf..9184ad996ad 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -10,10 +10,26 @@ import cudf import dask_cudf +from dask_cudf.tests.utils import xfail_dask_expr @pytest.mark.parametrize("ascending", [True, False]) -@pytest.mark.parametrize("by", ["a", "b", "c", "d", ["a", "b"], ["c", "d"]]) +@pytest.mark.parametrize( + "by", + [ + "a", + "b", + "c", + pytest.param( + "d", + marks=xfail_dask_expr( + "Dask-expr fails to sort by categorical column." 
+ ), + ), + ["a", "b"], + ["c", "d"], + ], +) @pytest.mark.parametrize("nelem", [10, 500]) @pytest.mark.parametrize("nparts", [1, 10]) def test_sort_values(nelem, nparts, by, ascending): @@ -56,6 +72,7 @@ def test_sort_repartition(): dd.assert_eq(len(new_ddf), len(ddf)) +@xfail_dask_expr("dask-expr code path fails with nulls") @pytest.mark.parametrize("na_position", ["first", "last"]) @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) @@ -117,10 +134,6 @@ def test_sort_values_empty_string(by): def test_disk_shuffle(): - try: - from dask.dataframe.dispatch import partd_encode_dispatch # noqa: F401 - except ImportError: - pytest.skip("need a version of dask that has partd_encode_dispatch") df = cudf.DataFrame({"a": [1, 2, 3] * 20, "b": [4, 5, 6, 7] * 15}) ddf = dd.from_pandas(df, npartitions=4) got = dd.DataFrame.shuffle(ddf, "a", shuffle_method="disk") diff --git a/python/dask_cudf/dask_cudf/tests/utils.py b/python/dask_cudf/dask_cudf/tests/utils.py index 88a2116fb0a..e838b8d63bc 100644 --- a/python/dask_cudf/dask_cudf/tests/utils.py +++ b/python/dask_cudf/dask_cudf/tests/utils.py @@ -1,12 +1,15 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. 
import numpy as np import pandas as pd +import pytest import dask.dataframe as dd import cudf +from dask_cudf.expr import QUERY_PLANNING_ON + def _make_random_frame(nelem, npartitions=2, include_na=False): df = pd.DataFrame( @@ -19,3 +22,14 @@ def _make_random_frame(nelem, npartitions=2, include_na=False): gdf = cudf.DataFrame.from_pandas(df) dgf = dd.from_pandas(gdf, npartitions=npartitions) return df, dgf + + +_default_reason = "Not compatible with dask-expr" + + +def skip_dask_expr(reason=_default_reason): + return pytest.mark.skipif(QUERY_PLANNING_ON, reason=reason) + + +def xfail_dask_expr(reason=_default_reason): + return pytest.mark.xfail(QUERY_PLANNING_ON, reason=reason) diff --git a/python/dask_cudf/pyproject.toml b/python/dask_cudf/pyproject.toml index 4ecfc4f3f85..21aaa17a6c7 100644 --- a/python/dask_cudf/pyproject.toml +++ b/python/dask_cudf/pyproject.toml @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. [build-system] build-backend = "setuptools.build_meta" @@ -39,6 +39,8 @@ classifiers = [ [project.entry-points."dask.dataframe.backends"] cudf = "dask_cudf.backends:CudfBackendEntrypoint" +[project.entry-points."dask_expr.dataframe.backends"] +cudf = "dask_cudf.backends:CudfDXBackendEntrypoint" [project.optional-dependencies] test = [