# Backend-library dispatching for Dask collections: Initial Draft #1

## Conversation
Feel free to advise on the general document organization/process used here @fjetter :)

@jsignell, @gjoseph92, and @jcrist - Would the three of you be interested in reviewing this document/proposal? If any of you don't have time, just let me know and I can propose someone else :)
I think this proposal is sound. I have a bunch of small questions, but generally am in favor of this work! Thanks for writing this up so clearly :)
### Registering a New Backend (`DaskBackendEntrypoint`)

In order to allow backend registration outside of the Dask source code, we propose that Dask approximately follow [the approach taken by xarray for custom backend interfaces](https://xarray.pydata.org/en/stable/internals/how-to-add-new-backend.html). That is, external libraries should be able to leverage "entrypoints" to tell Dask to register compatible backends in Dask-Array and Dask-DataFrame at run time. To this end, the external library would be expected to define all dispatched IO logic within a `DaskBackendEntrypoint` subclass. For example, a cudf-based subclass would look something like the `CudfBackendEntrypoint` definition below:
I really like the idea of following xarray. It would probably be worth checking in with some xarray folks to see if there is anything that they wish they had done differently.
Good point! @crusaderky - Do you know who we can/should ask about this? :)
```python
def read_json(self, *args, engine=None, **kwargs):
    return self.fallback.read_json(*args, engine=cudf.read_json, **kwargs)

def read_orc(self, *args, **kwargs):
    ...
```
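To make the entrypoint pattern above concrete, here is a minimal, self-contained sketch of the shape such a subclass could take. The class names, the `fallback` attribute, and the reader return values below are simplified stand-ins for the real interfaces (neither `cudf` nor `dask` is imported here):

```python
class DataFrameBackendEntrypoint:
    """Base-class sketch: one method per dispatchable IO function."""

    fallback = None  # entrypoint instance to defer to for missing methods

    def read_json(self, *args, **kwargs):
        raise NotImplementedError

    def read_orc(self, *args, **kwargs):
        raise NotImplementedError


class PandasLikeEntrypoint(DataFrameBackendEntrypoint):
    """Stand-in for the default pandas-backed entrypoint."""

    def read_json(self, *args, engine=None, **kwargs):
        # A real implementation would build a Dask-DataFrame collection;
        # here we just record which engine function was used.
        return ("pandas-read_json", engine)

    def read_orc(self, *args, **kwargs):
        return ("pandas-read_orc",)


class CudfLikeEntrypoint(DataFrameBackendEntrypoint):
    """Stand-in for CudfBackendEntrypoint: reuse the pandas reader,
    swapping in a cudf-specific engine function."""

    fallback = PandasLikeEntrypoint()

    def read_json(self, *args, engine=None, **kwargs):
        cudf_read_json = "cudf.read_json"  # placeholder for the real function
        return self.fallback.read_json(*args, engine=cudf_read_json, **kwargs)

    # read_orc is intentionally not overridden: the dispatch machinery
    # would fall back to the pandas entrypoint (with data movement).
```

In this sketch, `read_json` delegates to the fallback entrypoint while overriding only the engine, which mirrors the cudf snippet quoted above.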
Which library is responsible for validating the args and kwargs? Does dask do any normalization before passing off to the backend?
> Which library is responsible for validating the args and kwargs?

For the current reference implementation: the active backend is responsible. Dask will simply pass through `(*args, **kwargs)` as defined by the user.
Ok, that seems good to me. It just means that there might be a lot of duplication, right? But I guess there is probably already a lot of duplication because `dask_cudf` already exists.
```python
...

def read_json(self, *args, engine=None, **kwargs):
    return self.fallback.read_json(*args, engine=cudf.read_json, **kwargs)
```
When does the `move_from_fallback` get called? Is `DaskBackendEntrypoint` responsible for that?
In the reference implementation, this happens in Dask during dispatching. The `DaskBackendEntrypoint` is responsible for defining what the fallback entrypoint is, but not for doing the actual falling back. This can certainly be changed if we think of a better way to get what we want/need.
Once the `DaskBackendEntrypoint` subclass is defined, the new entrypoint can be declared in the library's `setup.py` file (specifying the class with a `"dask.backends"` entrypoint).
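As a sketch of that declaration (the package name, module path, and class name below are hypothetical, following the cudf example), the `setup.py` registration could look like:

```python
# setup.py (hypothetical package layout)
from setuptools import setup

setup(
    name="dask_cudf",
    entry_points={
        "dask.backends": [
            # "<backend label> = <module>:<DaskBackendEntrypoint subclass>"
            "cudf = dask_cudf.backends:CudfBackendEntrypoint",
        ],
    },
)
```

At run time, Dask would discover these declarations via the standard entrypoint machinery (e.g. `importlib.metadata.entry_points`) without the user importing the backend library explicitly.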
Note that the `CudfBackendEntrypoint` example above selects `PandasBackendEntrypoint` as the fallback entrypoint class, but does not directly inherit from this reference class. This approach allows Dask to properly move data from the pandas fallback for any IO functions that lack cudf-specific definitions. If the cudf subclass were to directly inherit from `PandasBackendEntrypoint`, then "fallback" behavior would not result in data movement or user warnings.
`DaskBackendEntrypoint` should have all possible methods, and they should all raise `NotImplementedError`, right?
We could do this, but we would probably want to add/use `DataFrameBackendEntrypoint` and `ArrayBackendEntrypoint` classes to avoid using a single class for both array- and dataframe-specific functions.
The actual dispatching of IO functions will require the definition of a new `BackendDispatch` class in `dask.utils`. In contrast to the existing `dask.utils.Dispatch` class, `BackendDispatch` will use a backend string label (e.g. "pandas") for registration and dispatching, and the dispatching logic will be implemented at the `__getattr__` level (rather than in `__call__`). More specifically, registered "keys" and "values" for the dispatch class will correspond to backend labels and `DaskBackendEntrypoint` subclasses, respectively. When some Dask-collection code calls something like `backend_dispatch.read_parquet`, dispatching logic will be used to return the appropriate `read_parquet` attribute for the current backend.
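The `__getattr__`-level dispatching described above can be sketched as follows. The config dictionary and the entrypoint class are simplified stand-ins for `dask.config` and a real `DaskBackendEntrypoint` subclass:

```python
_config = {"dataframe.backend": "pandas"}  # stand-in for dask.config


class BackendDispatch:
    """Dispatch attribute access to the entrypoint registered for the
    currently configured backend label."""

    def __init__(self, config_key):
        self._config_key = config_key
        self._lookup = {}  # backend label -> entrypoint instance

    def register(self, label, entrypoint):
        self._lookup[label] = entrypoint

    def __getattr__(self, item):
        # Called for attributes like "read_parquet" that are not
        # defined on the dispatch object itself.
        backend = _config[self._config_key]
        entrypoint = self._lookup[backend]
        return getattr(entrypoint, item)


class PandasEntrypoint:
    def read_parquet(self, path):
        return f"pandas-parquet:{path}"


dataframe_backend_dispatch = BackendDispatch("dataframe.backend")
dataframe_backend_dispatch.register("pandas", PandasEntrypoint())
```

With this shape, `dataframe_backend_dispatch.read_parquet(...)` resolves the configured backend label first, then forwards the call to that backend's method.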
In order to avoid moving numpy- or pandas-specific IO logic into `backends.py`, the existing IO functions will be renamed to `*_pandas`, and referenced "in place". To ensure that the real IO functions are still defined at the same absolute and relative paths, and that the original doc-strings are recognized, we can add a few lines below the `*_pandas` definition to direct the original function name to the dispatching machinery:
I think I'd prefer to move the logic into backends. What is the rationale for not doing that?
The rationale is just that we would be moving thousands of lines of code (all the current IO code) into `backends.py`. There is obviously a middle ground, but the "ideal" middle ground wasn't obvious to me.
For me, as long as we can keep `git blame`, I don't mind moving code. And I think we can keep `git blame` by renaming the existing files.
```python
def read_parquet(*args, **kwargs):
    return dataframe_backend_dispatch.read_parquet(*args, **kwargs)


read_parquet.__doc__ = read_parquet_pandas.__doc__
```
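The renaming-plus-forwarding pattern above can be exercised in isolation; the dispatch object below is a trivial stand-in for the proposed `dataframe_backend_dispatch`:

```python
class _Dispatch:
    """Trivial stand-in for dataframe_backend_dispatch."""

    @staticmethod
    def read_parquet(*args, **kwargs):
        return "dispatched"


dataframe_backend_dispatch = _Dispatch()


def read_parquet_pandas(*args, **kwargs):
    """Read a Parquet file into a DataFrame collection."""
    return "pandas"


def read_parquet(*args, **kwargs):
    # The public name now routes through the dispatch machinery
    return dataframe_backend_dispatch.read_parquet(*args, **kwargs)


# Re-attach the original docstring so help(read_parquet) is unchanged
read_parquet.__doc__ = read_parquet_pandas.__doc__
```

Callers importing `read_parquet` from the original module path see an unchanged signature and docstring, while the body now dispatches on the configured backend.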
I'm interested in the docstrings. Often we inherit them from pandas and just augment them a bit, but maybe they should also be part of the dispatch mechanism.
The primary alternative to the dispatch-based changes proposed here is to standardize the `engine=` argument for all input-IO functions in the Array and DataFrame collection APIs. The default for this `engine` argument could depend on one or more fields in `dask.config`, but the logic for selecting/using the desired backend would need to be added to every IO function. There are already a few Dask-DataFrame IO functions (e.g. `read_parquet`, `read_json`) that leverage an `engine` keyword to effectively utilize different library backends for IO. However, the specific usage of `engine=` is inconsistent between the various IO functions, and does **not** necessarily correspond to the use of a distinct dataframe (or array) backend library. In fact, the "pandas" backend already supports multiple engine options in `read_parquet`, and so the concept of an "engine" is already a bit different from that of a DataFrame "backend". Therefore, it may be a significant challenge to design a general mapping between `engine` options and registered backends.
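For contrast, the rejected `engine=` alternative can be sketched as follows: every IO function would resolve a default engine from config and branch on it itself. The config dictionary and engine functions below are hypothetical stand-ins:

```python
_config = {"dataframe.backend": "pandas"}  # stand-in for dask.config

# One entry per supported engine; stand-ins for pd.read_json, cudf.read_json
_ENGINES = {
    "pandas": lambda path: f"pandas-read:{path}",
    "cudf": lambda path: f"cudf-read:{path}",
}


def read_json(path, engine=None):
    # Every IO function would need to repeat this selection logic,
    # which is the main drawback noted above.
    engine = engine or _config["dataframe.backend"]
    return _ENGINES[engine](path)
```

Note how the selection logic lives inside each function rather than in a shared dispatch object, so it would need to be duplicated (and kept consistent) across every IO function.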
The alternative to the entrypoint-registration process proposed here is to follow the approach currently employed for `dask.utils.Dispatch`, where the user is expected to explicitly import the external code to ensure the alternative backend is properly registered. Otherwise, the backend definition would need to exist within the Dask source code itself.
Yeah, I think the entrypoints approach is much clearer for the user. There was some work a while ago that was trying to use entrypoints for some of our existing dispatches: dask/dask#7688
@jsignell - I updated the document to include some open questions from your initial review. Do you think we should iterate on the document in this PR, or should we merge this and use follow-up PRs? [EDIT: I'm actually planning to revise a few important details here - So, let's hold off on merging this]

I think we should merge this whenever you are happy with it.
This PR depends on dask/dask#9475 (**Now Merged**)

After dask#9475, external libraries are now able to implement (and expose) their own `DataFrameBackendEntrypoint` definitions to specify custom creation functions for DataFrame collections. This PR introduces the `CudfBackendEntrypoint` class to create `dask_cudf.DataFrame` collections using the `dask.dataframe` API. By installing `dask_cudf` with this entrypoint definition in place, you get the following behavior in `dask.dataframe`:

```python
import dask.dataframe as dd
import dask

# Tell Dask that you want to create DataFrame collections
# with the "cudf" backend (for supported creation functions).
# This can also be used in a context, or set in a yaml file
dask.config.set({"dataframe.backend": "cudf"})

ddf = dd.from_dict({"a": range(10)}, npartitions=2)
type(ddf)  # dask_cudf.core.DataFrame
```

Note that the code snippet above does not require an explicit import of `cudf` or `dask_cudf`. The following creation functions will support backend dispatching after dask#9475:

- `from_dict`
- `read_parquet`
- `read_json`
- `read_orc`
- `read_csv`
- `read_hdf`

See also: dask/design-docs#1

Authors:
- Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
- GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #11920
What should we do with this PR? Merge it in or close it?
Oops - Forgot we never merged this. I'll update the text to be consistent with the code in
Update: This should be fine to merge. There may be minor typos, but the general information should be up-to-date.
Merging! Thanks @rjzamora.
Initial design-document draft for a new backend-library dispatching system for Dask-Array and Dask-DataFrame.