From 9b60ed75b9bf2b850b649503d928f15f39a66da1 Mon Sep 17 00:00:00 2001 From: Francesco Bruzzesi <42817048+FBruzzesi@users.noreply.github.com> Date: Mon, 29 Jul 2024 18:27:50 +0200 Subject: [PATCH] feat: dask `select` method (#667) * feat: dask select method * namespace methods * coverage and pragma * feedback adjustments --- narwhals/_dask/dataframe.py | 24 +++++++++++++++++++ narwhals/_dask/expr.py | 8 +++++++ narwhals/_dask/namespace.py | 41 +++++++++++++++++++++++++++++++++ narwhals/_dask/typing.py | 16 +++++++++++++ narwhals/_expression_parsing.py | 26 ++++++++++++++++----- tests/dask_test.py | 32 +++++++++++++++++++++++++ 6 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 narwhals/_dask/typing.py diff --git a/narwhals/_dask/dataframe.py b/narwhals/_dask/dataframe.py index e85c1fa1a..34849c133 100644 --- a/narwhals/_dask/dataframe.py +++ b/narwhals/_dask/dataframe.py @@ -4,6 +4,7 @@ from typing import Any from narwhals._dask.utils import parse_exprs_and_named_exprs +from narwhals._expression_parsing import evaluate_into_exprs from narwhals.dependencies import get_dask_dataframe from narwhals.dependencies import get_pandas from narwhals.utils import Implementation @@ -14,6 +15,7 @@ from narwhals._dask.expr import DaskExpr from narwhals._dask.namespace import DaskNamespace + from narwhals._dask.typing import IntoDaskExpr class DaskLazyFrame: @@ -52,3 +54,25 @@ def collect(self) -> Any: implementation=Implementation.PANDAS, backend_version=parse_version(get_pandas().__version__), ) + + def lazy(self) -> Self: + return self + + def select( + self: Self, + *exprs: IntoDaskExpr, + **named_exprs: IntoDaskExpr, + ) -> Self: + dd = get_dask_dataframe() + + if exprs and all(isinstance(x, str) for x in exprs) and not named_exprs: + # This is a simple slice => fastpath! + return self._from_native_dataframe(self._native_dataframe.loc[:, exprs]) + + new_series = evaluate_into_exprs(self, *exprs, **named_exprs) + if not new_series: + # return empty dataframe, like Polars does + pd = get_pandas() + return self._from_native_dataframe(dd.from_pandas(pd.DataFrame())) + df = dd.concat(new_series, axis=1) + return self._from_native_dataframe(df) diff --git a/narwhals/_dask/expr.py b/narwhals/_dask/expr.py index 1c00d2866..36033afd9 100644 --- a/narwhals/_dask/expr.py +++ b/narwhals/_dask/expr.py @@ -12,6 +12,7 @@ from typing_extensions import Self from narwhals._dask.dataframe import DaskLazyFrame + from narwhals._dask.namespace import DaskNamespace from narwhals._dask.utils import maybe_evaluate @@ -35,6 +36,13 @@ def __init__( self._output_names = output_names self._backend_version = backend_version + def __narwhals_namespace__(self) -> DaskNamespace: # pragma: no cover + from narwhals._dask.namespace import DaskNamespace + + return DaskNamespace(backend_version=self._backend_version) + + def __narwhals_expr__(self) -> None: ... + @classmethod def from_column_names( cls: type[Self], diff --git a/narwhals/_dask/namespace.py b/narwhals/_dask/namespace.py index f30adba1a..7ae42ea1a 100644 --- a/narwhals/_dask/namespace.py +++ b/narwhals/_dask/namespace.py @@ -1,8 +1,17 @@ from __future__ import annotations +from typing import TYPE_CHECKING +from typing import Any +from typing import NoReturn + from narwhals import dtypes from narwhals._dask.expr import DaskExpr +if TYPE_CHECKING: + from typing import Callable + + from narwhals._dask.dataframe import DaskLazyFrame + class DaskNamespace: Int64 = dtypes.Int64 @@ -33,3 +42,35 @@ def col(self, *column_names: str) -> DaskExpr: *column_names, backend_version=self._backend_version, ) + + def _create_expr_from_series(self, _: Any) -> NoReturn: + msg = "`_create_expr_from_series` for DaskNamespace exists only for compatibility" + raise NotImplementedError(msg) + + def _create_compliant_series(self, _: Any) -> NoReturn: + msg = "`_create_compliant_series` for DaskNamespace exists only for compatibility" + raise NotImplementedError(msg) + + def _create_series_from_scalar(self, *_: Any) -> NoReturn: + msg = ( + "`_create_series_from_scalar` for DaskNamespace exists only for compatibility" + ) + raise NotImplementedError(msg) + + def _create_expr_from_callable( # pragma: no cover + self, + func: Callable[[DaskLazyFrame], list[DaskExpr]], + *, + depth: int, + function_name: str, + root_names: list[str] | None, + output_names: list[str] | None, + ) -> DaskExpr: + return DaskExpr( + call=func, + depth=depth, + function_name=function_name, + root_names=root_names, + output_names=output_names, + backend_version=self._backend_version, + ) diff --git a/narwhals/_dask/typing.py b/narwhals/_dask/typing.py new file mode 100644 index 000000000..23719eac0 --- /dev/null +++ b/narwhals/_dask/typing.py @@ -0,0 +1,16 @@ +from __future__ import annotations # pragma: no cover + +from typing import TYPE_CHECKING # pragma: no cover +from typing import Union # pragma: no cover + +if TYPE_CHECKING: + import sys + + if sys.version_info >= (3, 10): + from typing import TypeAlias + else: + from typing_extensions import TypeAlias + + from narwhals._dask.expr import DaskExpr + + IntoDaskExpr: TypeAlias = Union[DaskExpr, str] diff --git a/narwhals/_expression_parsing.py b/narwhals/_expression_parsing.py index 1cc9d3327..4c1bb3a22 100644 --- a/narwhals/_expression_parsing.py +++ b/narwhals/_expression_parsing.py @@ -20,21 +20,27 @@ from narwhals._arrow.namespace import ArrowNamespace from narwhals._arrow.series import ArrowSeries from narwhals._arrow.typing import IntoArrowExpr + from narwhals._dask.dataframe import DaskLazyFrame + from narwhals._dask.expr import DaskExpr + from narwhals._dask.namespace import DaskNamespace + from narwhals._dask.typing import IntoDaskExpr from narwhals._pandas_like.dataframe import PandasLikeDataFrame from narwhals._pandas_like.expr import PandasLikeExpr from narwhals._pandas_like.namespace import PandasLikeNamespace from narwhals._pandas_like.series import PandasLikeSeries from narwhals._pandas_like.typing import IntoPandasLikeExpr - CompliantNamespace = Union[PandasLikeNamespace, ArrowNamespace] - CompliantExpr = Union[PandasLikeExpr, ArrowExpr] - IntoCompliantExpr = Union[IntoPandasLikeExpr, IntoArrowExpr] + CompliantNamespace = Union[PandasLikeNamespace, ArrowNamespace, DaskNamespace] + CompliantExpr = Union[PandasLikeExpr, ArrowExpr, DaskExpr] + IntoCompliantExpr = Union[IntoPandasLikeExpr, IntoArrowExpr, IntoDaskExpr] IntoCompliantExprT = TypeVar("IntoCompliantExprT", bound=IntoCompliantExpr) CompliantExprT = TypeVar("CompliantExprT", bound=CompliantExpr) CompliantSeries = Union[PandasLikeSeries, ArrowSeries] - ListOfCompliantSeries = Union[list[PandasLikeSeries], list[ArrowSeries]] - ListOfCompliantExpr = Union[list[PandasLikeExpr], list[ArrowExpr]] - CompliantDataFrame = Union[PandasLikeDataFrame, ArrowDataFrame] + ListOfCompliantSeries = Union[ + list[PandasLikeSeries], list[ArrowSeries], list[DaskExpr] + ] + ListOfCompliantExpr = Union[list[PandasLikeExpr], list[ArrowExpr], list[DaskExpr]] + CompliantDataFrame = Union[PandasLikeDataFrame, ArrowDataFrame, DaskLazyFrame] T = TypeVar("T") @@ -63,6 +69,14 @@ def evaluate_into_exprs( ) -> list[ArrowSeries]: ... +@overload +def evaluate_into_exprs( + df: DaskLazyFrame, + *exprs: IntoDaskExpr, + **named_exprs: IntoDaskExpr, +) -> list[DaskExpr]: ... + + def evaluate_into_exprs( df: CompliantDataFrame, *exprs: IntoCompliantExprT, diff --git a/tests/dask_test.py b/tests/dask_test.py index 77657f028..24d0fb0a7 100644 --- a/tests/dask_test.py +++ b/tests/dask_test.py @@ -262,6 +262,38 @@ def test_str_to_lowercase( compare_dicts(result_frame, expected) +def test_select() -> None: + import dask.dataframe as dd + + data = {"a": [1, 3, 2], "b": [4, 4, 6], "z": [7.0, 8, 9]} + df = nw.from_native(dd.from_pandas(pd.DataFrame(data))) + result = df.select("a", nw.col("b") + 1, (nw.col("z") * 2).alias("z*2")) + expected = {"a": [1, 3, 2], "b": [5, 5, 7], "z*2": [14.0, 16.0, 18.0]} + compare_dicts(result, expected) + + +def test_str_only_select() -> None: + import dask.dataframe as dd + + data = {"a": [1, 3, 2], "b": [4, 4, 6], "z": [7.0, 8, 9]} + df = nw.from_native(dd.from_pandas(pd.DataFrame(data))) + result = df.select("a", "b") + expected = {"a": [1, 3, 2], "b": [4, 4, 6]} + compare_dicts(result, expected) + + +def test_empty_select() -> None: + import dask.dataframe as dd + + result = ( + nw.from_native(dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]}))) + .lazy() + .select() + .collect() + ) + assert result.shape == (0, 0) + + def test_dt_year() -> None: import dask.dataframe as dd