Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic backend dispatching to dask-expr #728

Merged
merged 18 commits into from
Jan 19, 2024
Merged

Conversation

rjzamora
Copy link
Member

@rjzamora rjzamora commented Jan 12, 2024

It seems like we will temporarily need to allow external libraries (i.e. cudf) to register a DaskBackendEntrypoint for both dask.dataframe and dask_expr. I think the easiest approach is to let cudf use a "dask-expr.dataframe.backend" entrypoint path for dask-expr (and continue using "dask.dataframe.backend" in the legacy dask.dataframe API).

This PR also introduces a simple get_collection_type dispatch function (the dask-expr version of get_parallel_type). This completely breaks cudf support for now, but dask-expr + cudf is already broken, and this is probably the only way to fix it anyway.

NOTE: This PR does not introduce any new dispatching mechanisms (like #321). It just adds enough code to support the existing dispatching mechanisms used in dask.dataframe.

@rjzamora rjzamora changed the title [WIP] Add basic creation dispatching to dask-expr [WIP] Add basic backend dispatching to dask-expr Jan 16, 2024
@rjzamora rjzamora changed the title [WIP] Add basic backend dispatching to dask-expr Add basic backend dispatching to dask-expr Jan 16, 2024
@rjzamora rjzamora marked this pull request as ready for review January 16, 2024 21:06
@rjzamora
Copy link
Member Author

@phofl - I know you are busy with many things, but I'd be interested to know if you have any thoughts/concerns about this PR. My hope is that there is nothing controversial here: We are reusing the existing dispatching mechanisms and conventions from dask.dataframe, and nothing should get in the way of default/pandas development.

try:
impl = self._lookup[backend]
except KeyError:
entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason we cannot use CreationDispatch directly is the fact that dask_cudf is already using the "dask.dataframe.backends" path to expose the "cudf" backend entrypoint for dask.dataframe.

We can just use CreationDispatch directly after dask/dask#10794 makes it possible to modify the entrypoint path upon initialization.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, removed this now that dask/dask#10794 is in.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that - I should probably wait for the next dask/dask release to remove DXCreationDispatch

Comment on lines 28 to 33
dataframe_creation_dispatch = DXCreationDispatch(
module_name="dataframe",
default="pandas",
entrypoint_class=DataFrameBackendEntrypoint,
name="dataframe_creation_dispatch",
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we want to introduce a new CreationDispatch object for dask-expr, because we want to distinguish "legacy" dask-dataframe creation functions from newer expression-based creation functions (to be discovered at the "dask_expr.dataframe.backend" entrypoint path instead of "dask.dataframe.backend").

In both legacy and expr-based code, the same "dask.dataframe.backend" config value is used to specify the default backend.

Comment on lines 3106 to +3110
def new_collection(expr):
"""Create new collection from an expr"""

meta = expr._meta
expr._name # Ensure backend is imported
if is_dataframe_like(meta):
return DataFrame(expr)
elif is_series_like(meta):
return Series(expr)
elif is_index_like(meta):
return Index(expr)
else:
return Scalar(expr)
return get_collection_type(meta)(expr)
Copy link
Member Author

@rjzamora rjzamora Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we have been using new_collection from the beginning, because we knew we needed to make this change to support other backends (i.e. cudf) long term.

The behavior of new_collection is now consistent with the behavior of new_dd_object in dask.dataframe.


from dask.utils import Dispatch

get_collection_type = Dispatch("get_collection_type")
Copy link
Member Author

@rjzamora rjzamora Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we will need many dispatch functions here. So, we could also define this in dask.datafame.dispatch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now publicly exposed as dask_expr.get_collection_type. This can always be moved to dask.datafame.dispatch in the future.

@phofl
Copy link
Collaborator

phofl commented Jan 19, 2024

I've disabled released ci because we enabled a couple of other things that aren't in released dask yet. You can remove everything that depends on released here and wouldn't be necessary with main now.

I don't intend to release before the next dask release anyway, so testing against the released version isn't very important

@rjzamora
Copy link
Member Author

You can remove everything that depends on released here and wouldn't be necessary with main now.

Awesome. Thanks!

@phofl phofl merged commit 13af21d into dask:main Jan 19, 2024
5 checks passed
@phofl
Copy link
Collaborator

phofl commented Jan 19, 2024

thx

@rjzamora rjzamora deleted the creation-dispatch branch January 19, 2024 15:51
rapids-bot bot pushed a commit to rapidsai/cudf that referenced this pull request Mar 11, 2024
Mostly addresses #15027

dask/dask-expr#728 exposed the necessary mechanisms for us to define a custom dask-expr backend for `cudf`. The new dispatching mechanisms are effectively the same as those in `dask.dataframe`. The only difference is that we are now registering/implementing "expression-based" collections.

This PR does the following:
- Defines a basic `DataFrameBackendEntrypoint` class for collection creation, and registers new collections using `get_collection_type`.
- Refactors the `dask_cudf` import structure to properly support the `"dataframe.query-planning"` configuration.
- Modifies CI to test dask-expr support for some of the `dask_cudf` tests. This coverage can be expanded in follow-up work.

~**Experimental Change**: This PR patches `dask_expr._expr.Expr.__new__` to enable type-based dispatching. This effectively allows us to surgically replace problematic `Expr` subclasses that do not work for cudf-backed data. For example, this PR replaces the upstream `TakeLast` expression to avoid using `squeeze` (since this method is not supported by cudf). This particular fix can be moved upstream relatively easily. However, having this kind of "patching" mechanism may be valuable for more complicated pandas/cudf discrepancies.~

## Usage example

```python
from dask import config
config.set({"dataframe.query-planning": True})
import dask_cudf

df = dask_cudf.DataFrame.from_dict(
    {"x": range(100), "y":  [1, 2, 3, 4] * 25, "z": ["1", "2"] * 50},
    npartitions=10,
)
df["y2"] = df["x"] + df["y"]
agg = df.groupby("y").agg({"y2": "mean"})["y2"]
agg.simplify().pprint()
```
Dask cuDF should now be using dask-expr for "query planning":
```
Projection: columns='y2'
  GroupbyAggregation: arg={'y2': 'mean'} observed=True split_out=1'y'
    Assign: y2=
      Projection: columns=['y']
        FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y']
      Add:
        Projection: columns='x'
          FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y']
        Projection: columns='y'
          FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y']
```

## TODO

- [x] Add basic tests
- [x] Confirm that general design makes sense

**Follow Up Work**:

- Expand dask-expr test coverage
- Fix local and upstream bugs
- Add documentation once "critical mass" is reached

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Lawrence Mitchell (https://github.com/wence-)
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)
  - Ray Douglass (https://github.com/raydouglass)

URL: #14805
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants