
[Python] Define a Dataset protocol based on Substrait and C Data Interface #37504

Open

wjones127 opened this issue Sep 1, 2023 · 8 comments

@wjones127 (Member) commented Sep 1, 2023

Describe the enhancement requested

Based on discussion in the 2023-08-30 Arrow community meeting. This is a continuation of #35568 and #33986.

We'd like to have a protocol for sharing unmaterialized datasets that:

  1. Can be consumed as one or more streams of Arrow data
  2. Can have projections and filters pushed down to the scanner

This would provide an extensible connection between scanners and query engines. Data formats might include Iceberg, Delta Lake, Lance, and PyArrow datasets (Parquet, JSON, CSV). Query engines could include DuckDB, DataFusion, Polars, PyVelox, PySpark, Ray, and Dask. Such a connection would let end users employ their preferred query engine to load any supported dataset. From their perspective, usage might look like:

from deltalake import DeltaTable
table = DeltaTable("path/to/table")

import duckdb
duckdb.sql("SELECT y FROM table WHERE x > 3")

The protocol is largely invisible to the user. Behind the scenes, duckdb would call __arrow_scanner__() on table to get a scannable object. It would then push down the column selection ['y'] and the filter x > 3 to the scanner, and use the resulting data stream as input to the query.
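
For illustration, the consumer-side flow might look roughly like the following. This is a hedged sketch using the method names proposed below; to_substrait is a hypothetical helper that encodes the filter as a Substrait expression.

# Hypothetical consumer-side flow inside the query engine.
scanner = table.__arrow_scanner__()

schema_capsule = scanner.get_schema()      # ArrowSchema PyCapsule
stream_capsule = scanner.get_stream(
    columns=["y"],                         # projection pushdown
    filter=to_substrait("x > 3"),          # hypothetical Substrait encoder
)
# The engine then imports the stream via the C Data Interface and feeds
# it into query execution.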

Shape of the protocol

The overall shape would look roughly like:

from abc import ABC, abstractmethod


class AbstractArrowScannable(ABC):
    @abstractmethod
    def __arrow_scanner__(self) -> "AbstractArrowScanner":
        ...


class AbstractArrowScanner(ABC):
    @abstractmethod
    def get_schema(self) -> capsule[ArrowSchema]:
        ...

    @abstractmethod
    def get_stream(
        self,
        columns: list[str],
        filter: SubstraitExpression,
    ) -> capsule[ArrowArrayStream]:
        ...

    @abstractmethod
    def get_partitions(self, filter: SubstraitExpression) -> list["AbstractArrowScanner"]:
        ...

Data and schema are returned as C Data Interface objects (see: #35531). Predicates are passed as Substrait extended expressions.
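
For reference, PyArrow already exposes a helper for producing Substrait extended expressions. A minimal sketch of serializing the x > 3 filter, assuming a recent PyArrow (14+) built with Substrait support:

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.substrait as pas

schema = pa.schema([("x", pa.int64()), ("y", pa.string())])

# Encode `x > 3` as a serialized Substrait ExtendedExpression message,
# bound to the dataset's schema.
filter_buf = pas.serialize_expressions(
    [pc.field("x") > pc.scalar(3)], ["filter"], schema
)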

Component(s)

Python

@paleolimbot (Member)

Is schema negotiation outside the scope of this protocol? If get_schema() contains a Utf8View, for example, is it the consumer's responsibility to do the cast, or can the consumer pass a schema with Utf8View columns as Utf8 to get_stream() (or another method)?

@wjones127 (Member, Author)

> Is schema negotiation outside the scope of this protocol?

I think we can include that. I'd like to design that as part of the PyCapsule API first, so we match the semantics there.
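
For comparison, the PyCapsule interface's __arrow_c_stream__ takes an optional requested_schema capsule; get_stream could plausibly mirror that. A sketch, not a settled design:

def get_stream(
    self,
    columns: list[str],
    filter: SubstraitExpression,
    requested_schema=None,  # ArrowSchema PyCapsule, as in __arrow_c_stream__
) -> capsule[ArrowArrayStream]:
    # A producer could cast to the requested types (e.g. Utf8View -> Utf8)
    # when possible, and raise if the request can't be honored.
    ...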

@wjones127 (Member, Author)

Haven't had time to work on this, but I wanted to note that a current pain point for users of the dataset API is the lack of table statistics the caller can access, which leads to bad join orders. Some mentions of this here:

https://twitter.com/mim_djo/status/1740542585410814393
delta-io/delta-rs#1838

@pitrou (Member) commented Feb 27, 2024

Are we sure a blocking API like this would be palatable for existing execution engines such as Acero, DuckDB...?

Of course, at worst the various method/function calls can be offloaded to a dedicated thread pool.
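
As a rough illustration of that offloading idea (scanner and filter_expr are assumed to exist; this is not part of the proposed protocol):

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

# Run the potentially slow, blocking call off the engine's main thread.
partitions_future = executor.submit(scanner.get_partitions, filter_expr)
# ... the engine continues planning, then collects the result when needed:
partitions = partitions_future.result()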

@wjones127 (Member, Author)

> Are we sure a blocking API like this would be palatable?

Are you referring to the fact they would have to acquire the GIL to call these methods? Or something else?

Ideally all these methods are brief.

I haven't discussed this in depth with implementers of query engines, though; I'd be curious to hear their thoughts.

@pitrou (Member) commented Feb 27, 2024

> > Are we sure a blocking API like this would be palatable?
>
> Are you referring to the fact they would have to acquire the GIL to call these methods? Or something else?

No, to the fact that these functions are synchronous.

> Ideally all these methods are brief.

I'm not sure. get_partitions will typically have to walk a filesystem, which can take a while, especially on large datasets or remote filesystems.

@paleolimbot (Member)

Perhaps get_partitions(...) -> Iterable[AbstractArrowScanner] would do it? Not sure if anybody is interested in asyncio for this, but an async iterator might work.

@pitrou (Member) commented Feb 27, 2024

An Iterable would probably be better, indeed. It would not solve the async use case directly, but it would at least allow producing results without blocking on the entire filesystem walk.
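
A generator would satisfy that shape naturally. A minimal sketch, where MyScanner and _list_fragments are hypothetical:

from typing import Iterable

class MyScanner:
    def __init__(self, fragment):
        self._fragment = fragment

    def get_partitions(self, filter) -> Iterable["MyScanner"]:
        # Yield scanners one at a time as the listing proceeds, so a consumer
        # can begin scanning before the full filesystem walk completes.
        for fragment in self._list_fragments(filter):  # hypothetical helper
            yield MyScanner(fragment)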
