Skip to content

Commit

Permalink
feat: dask select method (#667)
Browse files Browse the repository at this point in the history
* feat: dask select method

* namespace methods

* coverage and pragma

* feedback adjustments
  • Loading branch information
FBruzzesi authored Jul 29, 2024
1 parent f10443f commit 9b60ed7
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 6 deletions.
24 changes: 24 additions & 0 deletions narwhals/_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

from narwhals._dask.utils import parse_exprs_and_named_exprs
from narwhals._expression_parsing import evaluate_into_exprs
from narwhals.dependencies import get_dask_dataframe
from narwhals.dependencies import get_pandas
from narwhals.utils import Implementation
Expand All @@ -14,6 +15,7 @@

from narwhals._dask.expr import DaskExpr
from narwhals._dask.namespace import DaskNamespace
from narwhals._dask.typing import IntoDaskExpr


class DaskLazyFrame:
Expand Down Expand Up @@ -52,3 +54,25 @@ def collect(self) -> Any:
implementation=Implementation.PANDAS,
backend_version=parse_version(get_pandas().__version__),
)

def lazy(self) -> Self:
return self

def select(
self: Self,
*exprs: IntoDaskExpr,
**named_exprs: IntoDaskExpr,
) -> Self:
dd = get_dask_dataframe()

if exprs and all(isinstance(x, str) for x in exprs) and not named_exprs:
# This is a simple slice => fastpath!
return self._from_native_dataframe(self._native_dataframe.loc[:, exprs])

new_series = evaluate_into_exprs(self, *exprs, **named_exprs)
if not new_series:
# return empty dataframe, like Polars does
pd = get_pandas()
return self._from_native_dataframe(dd.from_pandas(pd.DataFrame()))
df = dd.concat(new_series, axis=1)
return self._from_native_dataframe(df)
8 changes: 8 additions & 0 deletions narwhals/_dask/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing_extensions import Self

from narwhals._dask.dataframe import DaskLazyFrame
from narwhals._dask.namespace import DaskNamespace

from narwhals._dask.utils import maybe_evaluate

Expand All @@ -35,6 +36,13 @@ def __init__(
self._output_names = output_names
self._backend_version = backend_version

def __narwhals_namespace__(self) -> DaskNamespace: # pragma: no cover
from narwhals._dask.namespace import DaskNamespace

return DaskNamespace(backend_version=self._backend_version)

def __narwhals_expr__(self) -> None: ...

@classmethod
def from_column_names(
cls: type[Self],
Expand Down
41 changes: 41 additions & 0 deletions narwhals/_dask/namespace.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any
from typing import NoReturn

from narwhals import dtypes
from narwhals._dask.expr import DaskExpr

if TYPE_CHECKING:
from typing import Callable

from narwhals._dask.dataframe import DaskLazyFrame


class DaskNamespace:
Int64 = dtypes.Int64
Expand Down Expand Up @@ -33,3 +42,35 @@ def col(self, *column_names: str) -> DaskExpr:
*column_names,
backend_version=self._backend_version,
)

def _create_expr_from_series(self, _: Any) -> NoReturn:
msg = "`_create_expr_from_series` for DaskNamespace exists only for compatibility"
raise NotImplementedError(msg)

def _create_compliant_series(self, _: Any) -> NoReturn:
msg = "`_create_compliant_series` for DaskNamespace exists only for compatibility"
raise NotImplementedError(msg)

def _create_series_from_scalar(self, *_: Any) -> NoReturn:
msg = (
"`_create_series_from_scalar` for DaskNamespace exists only for compatibility"
)
raise NotImplementedError(msg)

def _create_expr_from_callable( # pragma: no cover
self,
func: Callable[[DaskLazyFrame], list[DaskExpr]],
*,
depth: int,
function_name: str,
root_names: list[str] | None,
output_names: list[str] | None,
) -> DaskExpr:
return DaskExpr(
call=func,
depth=depth,
function_name=function_name,
root_names=root_names,
output_names=output_names,
backend_version=self._backend_version,
)
16 changes: 16 additions & 0 deletions narwhals/_dask/typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from __future__ import annotations # pragma: no cover

from typing import TYPE_CHECKING # pragma: no cover
from typing import Union # pragma: no cover

if TYPE_CHECKING:
import sys

if sys.version_info >= (3, 10):
from typing import TypeAlias
else:
from typing_extensions import TypeAlias

from narwhals._dask.expr import DaskExpr

IntoDaskExpr: TypeAlias = Union[DaskExpr, str]
26 changes: 20 additions & 6 deletions narwhals/_expression_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,27 @@
from narwhals._arrow.namespace import ArrowNamespace
from narwhals._arrow.series import ArrowSeries
from narwhals._arrow.typing import IntoArrowExpr
from narwhals._dask.dataframe import DaskLazyFrame
from narwhals._dask.expr import DaskExpr
from narwhals._dask.namespace import DaskNamespace
from narwhals._dask.typing import IntoDaskExpr
from narwhals._pandas_like.dataframe import PandasLikeDataFrame
from narwhals._pandas_like.expr import PandasLikeExpr
from narwhals._pandas_like.namespace import PandasLikeNamespace
from narwhals._pandas_like.series import PandasLikeSeries
from narwhals._pandas_like.typing import IntoPandasLikeExpr

CompliantNamespace = Union[PandasLikeNamespace, ArrowNamespace]
CompliantExpr = Union[PandasLikeExpr, ArrowExpr]
IntoCompliantExpr = Union[IntoPandasLikeExpr, IntoArrowExpr]
CompliantNamespace = Union[PandasLikeNamespace, ArrowNamespace, DaskNamespace]
CompliantExpr = Union[PandasLikeExpr, ArrowExpr, DaskExpr]
IntoCompliantExpr = Union[IntoPandasLikeExpr, IntoArrowExpr, IntoDaskExpr]
IntoCompliantExprT = TypeVar("IntoCompliantExprT", bound=IntoCompliantExpr)
CompliantExprT = TypeVar("CompliantExprT", bound=CompliantExpr)
CompliantSeries = Union[PandasLikeSeries, ArrowSeries]
ListOfCompliantSeries = Union[list[PandasLikeSeries], list[ArrowSeries]]
ListOfCompliantExpr = Union[list[PandasLikeExpr], list[ArrowExpr]]
CompliantDataFrame = Union[PandasLikeDataFrame, ArrowDataFrame]
ListOfCompliantSeries = Union[
list[PandasLikeSeries], list[ArrowSeries], list[DaskExpr]
]
ListOfCompliantExpr = Union[list[PandasLikeExpr], list[ArrowExpr], list[DaskExpr]]
CompliantDataFrame = Union[PandasLikeDataFrame, ArrowDataFrame, DaskLazyFrame]

T = TypeVar("T")

Expand Down Expand Up @@ -63,6 +69,14 @@ def evaluate_into_exprs(
) -> list[ArrowSeries]: ...


@overload
def evaluate_into_exprs(
df: DaskLazyFrame,
*exprs: IntoDaskExpr,
**named_exprs: IntoDaskExpr,
) -> list[DaskExpr]: ...


def evaluate_into_exprs(
df: CompliantDataFrame,
*exprs: IntoCompliantExprT,
Expand Down
32 changes: 32 additions & 0 deletions tests/dask_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,38 @@ def test_str_to_lowercase(
compare_dicts(result_frame, expected)


def test_select() -> None:
import dask.dataframe as dd

data = {"a": [1, 3, 2], "b": [4, 4, 6], "z": [7.0, 8, 9]}
df = nw.from_native(dd.from_pandas(pd.DataFrame(data)))
result = df.select("a", nw.col("b") + 1, (nw.col("z") * 2).alias("z*2"))
expected = {"a": [1, 3, 2], "b": [5, 5, 7], "z*2": [14.0, 16.0, 18.0]}
compare_dicts(result, expected)


def test_str_only_select() -> None:
import dask.dataframe as dd

data = {"a": [1, 3, 2], "b": [4, 4, 6], "z": [7.0, 8, 9]}
df = nw.from_native(dd.from_pandas(pd.DataFrame(data)))
result = df.select("a", "b")
expected = {"a": [1, 3, 2], "b": [4, 4, 6]}
compare_dicts(result, expected)


def test_empty_select() -> None:
import dask.dataframe as dd

result = (
nw.from_native(dd.from_pandas(pd.DataFrame({"a": [1, 2, 3]})))
.lazy()
.select()
.collect()
)
assert result.shape == (0, 0)


def test_dt_year() -> None:
import dask.dataframe as dd

Expand Down

0 comments on commit 9b60ed7

Please sign in to comment.