
Backend-library dispatching for Dask collections: Initial Draft #1

Merged
merged 11 commits into dask:main from backend-dispatching on Nov 2, 2022

Conversation

rjzamora (Member)

Initial design-document draft for a new backend-library dispatching system for Dask-Array and Dask-Dataframe.

Abstract

We propose a mechanism for configurable backend-library dispatching in the dask.array.Array and dask.dataframe._Frame-based classes. In contrast to the data-type dispatching already used within Dask at computation time, the new system is designed to operate at the collection level, with the primary target being the creation of new objects (i.e. input IO). With this system in place, the user's Dask configuration file can be used to specify that a non-default backend library should be used to create new collections.
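
As a rough illustration (using the `dataframe.backend` configuration key from the reference implementation discussed later in this thread), backend selection could look like the following:

```python
import dask
import dask.dataframe as dd

# Select a non-default backend for new DataFrame collections.
# The setting can live in a Dask config file, be set globally, or
# (as here) be scoped with a context manager.
with dask.config.set({"dataframe.backend": "cudf"}):
    ddf = dd.from_dict({"a": range(10)}, npartitions=2)
    print(type(ddf))  # a cudf-backed collection (e.g. dask_cudf.core.DataFrame)
```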

rjzamora marked this pull request as ready for review on March 10, 2022 15:59
@rjzamora (Member, Author)

Feel free to advise on the general document organization/process used here @fjetter :)

@rjzamora (Member, Author)

@jsignell , @gjoseph92 , and @jcrist - Would the three of you be interested in reviewing this document/proposal? If any of you don't have time, just let me know and I can propose someone else :)

jsignell self-requested a review on March 15, 2022 16:57
@jsignell (Member) left a comment:


I think this proposal is sound. I have a bunch of small questions, but generally am in favor of this work! Thanks for writing this up so clearly :)


### Registering a New Backend (`DaskBackendEntrypoint`)

In order to allow backend registration outside of the Dask source code, we propose that Dask approximately follow [the approach taken by xarray for custom backend interfaces](https://xarray.pydata.org/en/stable/internals/how-to-add-new-backend.html). That is, external libraries should be able to leverage "entrypoints" to tell Dask to register compatible backends in Dask-Array and Dask-DataFrame at run time. To this end, the external library could be expected to define all dispatch IO logic within a `DaskBackendEntrypoint` subclass. For example, a cudf-based subclass would look something like the `CudfBackendEntrypoint` definition below:
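
For illustration, a minimal sketch of such a subclass is shown below. The import paths and the `fallback` attribute are assumptions based on the surrounding discussion rather than a fixed API:

```python
import cudf
import dask_cudf

# Assumed import location for the entrypoint base classes
from dask.dataframe.backends import DaskBackendEntrypoint, PandasBackendEntrypoint


class CudfBackendEntrypoint(DaskBackendEntrypoint):
    # Declare the pandas entrypoint as the fallback (rather than
    # inheriting from it), so that Dask can move data from pandas
    # for any IO functions that lack a cudf-specific definition.
    fallback = PandasBackendEntrypoint()

    def read_orc(self, *args, **kwargs):
        # Use the cudf-specific implementation directly
        return dask_cudf.read_orc(*args, **kwargs)

    def read_json(self, *args, engine=None, **kwargs):
        # Reuse the pandas-backed implementation, but parse with cudf
        return self.fallback.read_json(*args, engine=cudf.read_json, **kwargs)
```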
Member:

I really like the idea of following xarray. It would probably be worth checking in with some xarray folks to see if there is anything that they wish they had done differently.

Member (Author):

Good point! @crusaderky - Do you know who we can/should ask about this? :)

def read_json(self, *args, engine=None, **kwargs):
return self.fallback.read_json(*args, engine=cudf.read_json, **kwargs)

def read_orc(self, *args, **kwargs):
Member:

Which library is responsible for validating the args and kwargs? Does dask do any normalization before passing off to the backend?

Member (Author):

> Which library is responsible for validating the args and kwargs?

For the current reference implementation, the active backend is responsible. Dask will simply pass through `(*args, **kwargs)` as defined by the user.

Member:

Ok, that seems good to me. It just means that there might be a lot of duplication, right? But I guess there is probably already a lot of duplication because dask_cudf already exists.

...

def read_json(self, *args, engine=None, **kwargs):
return self.fallback.read_json(*args, engine=cudf.read_json, **kwargs)
Member:

When does the move_from_fallback get called? Is DaskBackendEntrypoint responsible for that?

Member (Author):

In the reference implementation, this happens in Dask during dispatching. The DaskBackendEntrypoint is responsible for defining what the fallback entrypoint is, but not for doing the actual falling back.

This can certainly be changed if we think of a better way to get what we want/need.


Once the `DaskBackendEntrypoint` subclass is defined, the new entrypoint can be declared in the library's `setup.py` file (specifying the class with a `"dask.backends"` entrypoint).
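
For example, a declaration for the `CudfBackendEntrypoint` above might look roughly like this (the `dask_cudf.backends` module path is a placeholder):

```python
# setup.py (sketch)
from setuptools import setup

setup(
    name="dask_cudf",
    entry_points={
        "dask.backends": [
            "cudf = dask_cudf.backends:CudfBackendEntrypoint",
        ],
    },
)
```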

Note that the `CudfBackendEntrypoint` example above selects `PandasBackendEntrypoint` as the fallback entrypoint class, but does not directly inherit from this reference class. This approach allows Dask to properly move data from the pandas fallback for any IO functions that lack cudf-specific definitions. If the cudf subclass were to directly inherit from `PandasBackendEntrypoint`, then "fallback" behavior would not result in data-movement or user warnings.
Member:

DaskBackendEntrypoint should have all possible methods, and they should all raise NotImplementedError right?

Member (Author):

We could do this, but we would probably want to add/use DataFrameBackendEntrypoint and ArrayBackendEntrypoint classes to avoid using a single class for both array and dataframe-specific functions.
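
A hypothetical shape for that split (method sets and names are illustrative only) might be:

```python
class DataFrameBackendEntrypoint:
    """Base class for DataFrame-creation dispatching (illustrative)."""

    def read_parquet(self, *args, **kwargs):
        raise NotImplementedError

    def read_csv(self, *args, **kwargs):
        raise NotImplementedError


class ArrayBackendEntrypoint:
    """Base class for Array-creation dispatching (illustrative)."""

    def ones(self, *args, **kwargs):
        raise NotImplementedError

    def arange(self, *args, **kwargs):
        raise NotImplementedError
```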

The actual dispatching of IO functions will require the definition of a new `BackendDispatch` class in `dask.utils`. In contrast to the existing `dask.utils.Dispatch` class, `BackendDispatch` will use a backend string label (e.g. "pandas") for registration and dispatching, and the dispatching logic will be implemented at the `__getattr__` level (rather than in `__call__`). More specifically, registered "keys" and "values" for the dispatch class will correspond to backend labels and `DaskBackendEntrypoint` subclasses, respectively. When some Dask-collection code calls something like `backend_dispatch.read_parquet`, dispatching logic will be used to return the appropriate `"read_parquet"` attribute for the current backend.
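
A rough sketch of that idea is shown below; the registration details and config-key handling are assumptions for illustration:

```python
import dask


class BackendDispatch:
    """Dispatch attribute access to the entrypoint of the active backend (sketch)."""

    def __init__(self, config_field):
        self._lookup = {}  # backend label -> DaskBackendEntrypoint instance
        self._config_field = config_field  # e.g. "dataframe.backend"

    def register(self, backend, entrypoint):
        self._lookup[backend] = entrypoint

    def __getattr__(self, item):
        # Resolve the active backend label from the Dask config, then
        # return the requested IO function from that backend's entrypoint
        backend = dask.config.get(self._config_field)
        return getattr(self._lookup[backend], item)


dataframe_backend_dispatch = BackendDispatch("dataframe.backend")
```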

In order to avoid moving numpy- or pandas-specific IO logic into `backends.py`, the existing IO functions will be renamed to `*_pandas` and referenced "in place". To ensure that the real IO functions are still defined at the same absolute and relative paths, and that the original doc-strings are recognized, we can add a few lines below the `*_pandas` definition to direct the original function name to the dispatching machinery:

Member:

I think I'd prefer to move the logic into backends. What is the rationale for not doing that?

Member (Author):

The rationale is just that we would be moving thousands of lines of code (all the current IO code) into backends.py. There is obviously a middle ground, but the "ideal" middle ground wasn't obvious to me.

Member:

For me, as long as we can keep git blame, I don't mind moving code. And I think we can keep git blame by renaming the existing files.

def read_parquet(*args, **kwargs):
return dataframe_backend_dispatch.read_parquet(*args, **kwargs)

read_parquet.__doc__ = read_parquet_pandas.__doc__
Member:

I'm interested in the docstrings. Often we inherit them from pandas and just augment them a bit, but maybe they should also be part of the dispatch mechanism.


The primary alternative to the dispatch-based changes proposed here is to standardize the `engine=` argument for all input-IO functions in the Array and DataFrame collection APIs. The default for this `engine` argument could depend on one or more fields in `dask.config`, but the logic for selecting/using the desired backend would need to be added to every IO function. There are already a few Dask-DataFrame IO functions (e.g. `read_parquet`, `read_json`) that leverage an `engine` keyword to effectively utilize different library backends for IO. However, the specific usage of `engine=` is inconsistent between the various IO functions, and does **not** necessarily correspond to the use of a distinct dataframe (or array) backend library. In fact, the “pandas” backend already supports multiple engine options in `read_parquet`, and so the concept of an “engine” is already a bit different from that of a DataFrame “backend”. Therefore, it may be a significant challenge to design a general mapping between `engine` options and registered backends.
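
For example, in today's API the `engine=` keyword for `read_parquet` selects a parquet-parsing library within the pandas backend rather than a different collection backend (the file path below is a placeholder):

```python
import dask.dataframe as dd

# Both engine options still produce pandas-backed Dask DataFrames;
# "engine" only chooses the parquet parser, not the DataFrame backend.
ddf = dd.read_parquet("data/*.parquet", engine="pyarrow")
```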

The alternative to the entrypoint-registration process proposed here is to follow the approach currently employed for `dask.utils.Dispatch`, where the user is expected to explicitly import the external code to ensure the alternative backend is properly registered. Otherwise, the backend definition would need to exist within the Dask source code itself.
Member:

Yeah, I think the entrypoints approach is much clearer for the user. There was some work a while ago that was trying to use entrypoints for some of our existing dispatches: dask/dask#7688

@rjzamora (Member, Author) commented Sep 9, 2022

@jsignell - I updated the document to include some open questions from your initial review. Do you think we should iterate on the document in this PR, or should we merge this and use follow-up PRs?

[EDIT: I'm actually planning to revise a few important details here - So, let's hold off on merging this]

rjzamora marked this pull request as draft on September 9, 2022 21:04
@jsignell (Member)

I think we should merge this whenever you are happy with it.

rjzamora marked this pull request as ready for review on September 13, 2022 16:06
rapids-bot pushed a commit to rapidsai/cudf that referenced this pull request on Oct 20, 2022
This PR depends on dask/dask#9475 (**Now Merged**)

After dask#9475, external libraries are now able to implement (and expose) their own `DataFrameBackendEntrypoint` definitions to specify custom creation functions for DataFrame collections. This PR introduces the `CudfBackendEntrypoint` class to create `dask_cudf.DataFrame` collections using the `dask.dataframe` API. By installing `dask_cudf` with this entrypoint definition in place, you get the following behavior in `dask.dataframe`:

```python
import dask.dataframe as dd
import dask

# Tell Dask that you want to create DataFrame collections
# with the "cudf" backend (for supported creation functions).
# This can also be used in a context, or set in a yaml file
dask.config.set({"dataframe.backend": "cudf"})

ddf = dd.from_dict({"a": range(10)}, npartitions=2)
type(ddf)  # dask_cudf.core.DataFrame
```

Note that the code snippet above does not require an explicit import of `cudf` or `dask_cudf`. The following creation functions will support backend dispatching after dask#9475:

- `from_dict`
- `read_parquet`
- `read_json`
- `read_orc`
- `read_csv`
- `read_hdf`

See also: dask/design-docs#1

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #11920
@quasiben (Member)

What should we do with this PR? Merge it in or close it?

@rjzamora (Member, Author)

> What should we do with this PR? Merge it in or close it?

Oops - forgot we never merged this. I'll update the text to be consistent with the code in main, and then I'll suggest a merge.

@rjzamora (Member, Author)

Update: This should be fine to merge. There may be minor typos, but the general information should be up-to-date.

@jacobtomlinson (Member) left a comment:

Merging! Thanks @rjzamora.

jacobtomlinson merged commit 143ab5e into dask:main on Nov 2, 2022
rjzamora deleted the backend-dispatching branch on November 2, 2022 13:30