
[Python][Rust] Create extension point in python for Dataset/Scanner #33986

Open · changhiskhan opened this issue Feb 1, 2023 · 33 comments · May be fixed by #35568

Comments

@changhiskhan

Describe the enhancement requested

As the Arrow ecosystem grows ever richer, desire paths emerge :)

Integrating Arrow-based projects written in Rust works great via the C data interface. But it doesn't allow lazy execution or pushdowns in the same way that pyarrow's Dataset/Scanner do.

My proposal here is to expose Dataset/Scanner Python ABCs such that Rust libraries can extend them via pyo3 + Python, so higher-level tooling (like DuckDB, for example) can query these without having to transfer the whole Table into memory first.

In keeping with the same principles as the C data interface, I think it would be sufficient for this Python interface to be very minimal: Dataset with schema and scanner methods, Scanner with projected_schema and to_reader methods. The to_reader method should return a RecordBatchReader, which would then link pyo3 datasets into the Arrow C data interface.
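For illustration, a minimal sketch of what those ABCs could look like (the exact names and signatures here are illustrative, not a settled design):

from abc import ABC, abstractmethod

import pyarrow as pa


class Scanner(ABC):
    @property
    @abstractmethod
    def projected_schema(self) -> pa.Schema:
        """Schema of the scan output after projection pushdown."""

    @abstractmethod
    def to_reader(self) -> pa.RecordBatchReader:
        """Stream the results; the reader bridges into the Arrow C data interface."""


class Dataset(ABC):
    @property
    @abstractmethod
    def schema(self) -> pa.Schema:
        """Full schema of the dataset."""

    @abstractmethod
    def scanner(self, columns=None, filter=None) -> Scanner:
        """Create a scanner, pushing the projection/filter down where possible."""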

Thanks for your consideration!

Component(s)

Python

@changhiskhan
Author

To be precise, this is already doable today as a workaround. What I'm proposing here is:

  1. formalize this extension point
  2. if possible, make the top-level abstraction pure Python, so subclasses don't need to deal with Cython etc. if coming from Rust

@lidavidm
Member

lidavidm commented Feb 1, 2023

From a quick read, is this about formalizing an extension point per se or actually just an interface? Some way to say, "given a filter/column selection, please try to push these down and give me a C Data Interface stream"

@wjones127
Member

I've had similar challenges with supporting datasets in delta-rs. Another aspect you'll need to think about is supporting Filesystems. In Rust, that means calling into Python functions, which I fear can be sub-optimal because of the GIL. I don't think there's a practical way to directly access the underlying C++ implemented FS unless we made the ABI stable (which I don't see us doing in the foreseeable future).

A route I'm exploring right now is using ADBC as a stable ABI for pushing down scan queries to storage formats and systems. It probably makes more sense for table formats like Delta Lake, which have database-like semantics, than for file formats like Lance (which I assume is the project behind the use case you are discussing).

if possible, make the top-level abstraction pure Python, so subclasses don't need to deal with Cython etc. if coming from Rust

It's harder, but part of me would prefer a stable C ABI, because it would mean the extension could be used in any language, not just Python.

@changhiskhan
Author

From a quick read, is this about formalizing an extension point per se or actually just an interface? Some way to say, "given a filter/column selection, please try to push these down and give me a C Data Interface stream"

You're right. Good question. I think just the interface actually, so this issue is incorrectly named. I.e., Rust packages could implement the same interface that higher-level tooling will call, without necessarily needing to inherit from the Cython classes defining Dataset/Scanner today.

@changhiskhan
Author

I've had similar challenges with supporting datasets in delta-rs. Another aspect you'll need to think about is supporting Filesystems. In Rust, that means calling into Python functions, which I fear can be sub-optimal because of the GIL. I don't think there's a practical way to directly access the underlying C++ implemented FS unless we made the ABI stable (which I don't see us doing in the foreseeable future).

Does object store rs work for this?

A route I'm exploring right now is using ADBC as a stable ABI for pushing down scan queries to storage formats and systems. It probably makes more sense for table formats like Delta Lake, which have database-like semantics, than for file formats like Lance (which I assume is the project behind the use case you are discussing).

if possible, make the top-level abstraction pure Python, so subclasses don't need to deal with Cython etc. if coming from Rust

Yeah, most definitely pure Python.

It's harder, but part of me would prefer a stable C ABI, because it would mean the extension could be used in any language, not just Python.

@lidavidm
Member

lidavidm commented Feb 1, 2023

It sounds like what you basically want is a

struct DataFrameProducer {
    int read(struct DataFrameProducer* self, ??? filters, ??? selection, struct ArrowArrayStream* out);
};

with corresponding wrappers/carriers in Python, Rust, Go, etc. Then this can be fed into DuckDB, Ballista, Acero, etc. and can be produced by ADBC, Acero, DuckDB, etc.

You can have nearly both "pure Python" and "C ABI", I think. A Python-level interface could be transformed by PyArrow into the C ABI and vice versa. Where possible, the Python-level interface should let you 'extract' the underlying C ABI, if it exists, but otherwise we can push the responsibility of the GIL and such into PyArrow (or something like that). (So basically, shove all the non-Python code into PyArrow.)
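As a rough sketch of that layering (all names here are hypothetical), the Python-level carrier could fall back to a plain Python callable when no native producer exists, and expose the C ABI producer when it does:

import pyarrow as pa


class DataFrameProducer:
    """Python-level carrier for the 'filters/selection in, C stream out' idea."""

    def __init__(self, read_func, c_producer=None):
        self._read_func = read_func    # plain Python fallback
        self._c_producer = c_producer  # e.g. a capsule wrapping the C struct, if any

    def read(self, filters=None, columns=None) -> pa.RecordBatchReader:
        # PyArrow could route this to the C ABI when available and otherwise
        # call back into Python, keeping GIL handling away from the consumer.
        return self._read_func(filters, columns)

    def _export_c_producer(self):
        # 'Extract' the underlying C ABI producer if this object wraps one.
        return self._c_producer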

The question is what the filter/selection format should be; ideally it would be language agnostic and implementation agnostic and so Dataset's expressions aren't great there.

@wjones127
Member

The question is what the filter/selection format should be; ideally it would be language agnostic and implementation agnostic and so Dataset's expressions aren't great there.

Substrait recently merged a message type for standalone expression lists. :)

substrait-io/substrait#405

@kou kou changed the title [python][rust]Create extension point in python for Dataset/Scanner [Python][Rust] Create extension point in python for Dataset/Scanner Feb 2, 2023
@westonpace
Member

What if we created a read type in substrait for datasets? It would require encoding file system options in Substrait as well. However, this seems like a valuable feature.

Then the C API / interface is just the typical substrait consumer API of "substrait plan in, record batch reader out"
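For reference, pyarrow already exposes that shape through Acero; a rough illustration (assuming plan_bytes holds a serialized Substrait plan):

import pyarrow.substrait as substrait

# "Substrait plan in, record batch reader out"
reader = substrait.run_query(plan_bytes)
table = reader.read_all()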

@jorisvandenbossche
Member

I have been recently thinking about this as well, triggered by our work to support the DataFrame Interchange protocol in pyarrow (#14804, https://data-apis.org/dataframe-protocol/latest/purpose_and_scope.html).
For some context, this "interchange protocol" is a Python-specific one to allow interchanging dataframe data, typically between different dataframe libraries (e.g. convert a dataframe of library1 into a dataframe of library2 without both libraries having to know about each other). Essentially, this is very similar to our C Data Interface (it specifies how to share the memory, and in the end also has pointers to buffers of a certain size, just like the C struct), but because it is a Python API, it gives you some functionality on top of those raw buffers (or, before you get to the buffers).

As an illustration, using the latest pyarrow (with a pyarrow.Table, so all data is already in memory, but we could add the same to something backed by a not-yet-materialized stream or data on disk):

>>> table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
# accessing this interchange object
>>> interchange_object = table.__dataframe__()
>>> interchange_object
<pyarrow.interchange.dataframe._PyArrowDataFrame at 0x7ffb64f45420>
# this doesn't yet need to materialize all the buffers, but you can inspect metadata
>>> interchange_object.num_columns()
2
>>> interchange_object.column_names()
['a', 'b']
# you can select a subset (i.e. simple projection)
>>> subset = interchange_object.select_columns_by_name(['a'])
>>> subset.num_columns()
1
# only when actually asking for the buffers of one chunk of a column, the data needs
# to be in memory (to pass a pointer to the buffers)
>>> subset.get_column_by_name("a").get_buffers()
{'data': (PyArrowBuffer({'bufsize': 24, 'ptr': 140717840175104, 'device': 'CPU'}),
  (<DtypeKind.INT: 0>, 64, 'l', '=')),
 'validity': None,
 'offsets': None}

To be clear, I don't propose we do something exactly like that (and personally, I think it's also a missed opportunity for the DataFrame Interchange protocol to not use Arrow for the memory layout specification, and not give access to data as arrow data or using the Arrow C Interface).
But it has some nice properties compared to our "raw" C Data Interface. It allows you to first inspect the data (columns, data types) before you actually start materializing/consuming the data chunk by chunk, and allows you to get the data of only a subset of the columns. In contrast, when using the C Stream for a Table or, generically, a RecordBatchReader, you directly get the struct with all buffers for all columns. So if you don't need all columns, you need to make sure to subset the data before accessing the C interface data, but that means you need to know the API of the object you are getting the data from (partly removing the benefit of the generality of the C Interface).

So having some standard Python interface that would give you delayed and queryable (filter/simple projection) access to Arrow C Interface data sounds really interesting, or having this as an additional C ABI (for which we can still provide such a Python interface as well) like David sketched above.

@westonpace
Member

From an implementation perspective I suspect we can satisfy any of these proposed APIs. If we need to come up with a new API then my preference is Substrait, but if the consensus heads in some other direction I'm fine with that too.

@jorisvandenbossche , your proposal seems fine, but I don't see anything in there for filesystems. I think this is for on-disk data more so than purely in-memory data. Though I believe your approach could be adapted to include filesystems.

Does object store rs work for this?

Yes, I would assume that object store rs would be able to satisfy this but I'm not familiar with the capabilities. For example, my idea of how this would work in Substrait would be:

// This would be usable as ReadRel::read_type
message Dataset {

  // This is already defined in ReadRel and is basically a list of files
  // and a format object which defines things like delimiter (for CSV)
  LocalFiles files = 1;
  oneof filesystem {
    LocalFilesystem local = 2;
    S3Filesystem s3 = 3;
    ExtensionFilesystem extension = 4;
  }

  message LocalFilesystem {}
  message S3Filesystem {
    string region = 1;
    string client_id = 2;
    string client_secret = 3; // could be omitted if credentials are negotiated elsewhere
  }
  message ExtensionFilesystem {
    google.protobuf.Any details = 1;
  }

}

The equivalent C interface would just be structifying those messages.

@jorisvandenbossche
Member

What's the reason that the potential interface would need to be aware of filesystems?

@westonpace
Member

I think the goal is to create a C API capable of representing datasets? I think that would require filesystems. But I may be mistaken on either of these points.

@lidavidm
Member

lidavidm commented Feb 3, 2023

Is the goal here dataset specifically, or an API with filters and column selections (of which dataset is an implementation)?

It sounds like what we have is a project that is the latter, but wants to take advantage of the API of the former to get access to things like DuckDB integration

@wjones127
Member

The interface is for scanning the dataset, which is after the filesystems have been passed. So it's a separate concern. Yet it is still relevant to "how do I extend dataset" because your scanning implementation needs to use some filesystem. And that means the user needs to configure and pass one in.

The easiest for users is to take fsspec / PyArrow filesystems as Python interfaces, although performance may be impacted by the GIL. (I have started, but not finished, an implementation of ObjectStore in Rust that wraps fsspec filesystems here.) Or you can allow configuring a native filesystem, but then it's another API users have to learn.

wants to take advantage of the API of the former to get access to things like DuckDB integration

IMO the current DuckDB integration feels a little silly. It manipulates Python objects until it can get a RecordBatchReader and then exports that through the C data interface. The same code is duplicated in the R package, except it manipulates R objects. And nothing is available in other languages. So part of me thinks it would be cleaner to replace that integration with this kind of C API, but that's for the DuckDB devs to decide :)

So there's sort of two questions:

  • How do I extend Dataset from a separate package, particularly if implemented in Rust? This is where the filesystem API / configuration stuff comes in.
  • How do I consume Dataset from a separate package? This is where the DuckDB integration comes in.

It's unclear to me right now whether we just want to create a C API you can use instead of Dataset to solve this, or put a C API on Dataset to solve these. The former is less complicated for sure, but I'm not sure we want to sidestep the Dataset API like that.

@lidavidm
Member

lidavidm commented Feb 3, 2023

I don't think it's either-or, I think we're tangling two concerns up here.

My proposal here is to expose Dataset/Scanner Python ABCs such that Rust libraries can extend them via pyo3 + Python, so higher-level tooling (like DuckDB, for example) can query these without having to transfer the whole Table into memory first.

Dataset is sort of an API standard for this, or at least you can press it into service. But Dataset is also useful in its own right and a meaningful abstraction, so people want to extend it.

If we define a new API, of course Dataset should implement it!

How do I extend Dataset from a separate package

Dataset is already extensible. The real problem is the Python integration, and wheels/packaging questions on top of that. The toplevel proposal sounds like sidestepping that entirely by introducing a separate abstraction layer at the Python level (hence, exposing ABCs in Python).

IMO the current DuckDB integration feels a little silly.

I don't think they had a choice, because there's not really a formal API for what they really want :)

@changhiskhan
Author

The toplevel proposal sounds like sidestepping that entirely by introducing a separate abstraction layer at the Python level (hence, exposing ABCs in Python).

Yup, that's exactly the proposal here.

I don't think they had a choice, because there's not really a formal API for what they really want :)

The main blocker in the current version of DuckDB is its use of static methods such as Scanner.from_dataset. I made a PR to change that to use the instance method dataset.to_scanner. So with the next release it will be possible for Rust packages to disguise themselves as pyarrow datasets to DuckDB.

The issue is that you have to be really careful to override all of the methods in Dataset, or else it'll try to unwrap the non-existent CDataset and crash Python. This is the main motivation for my proposing a pure Python abstraction on top.
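As a sketch of that workaround (every name here is made up, and a real implementation would be backed by a native pyo3 scan rather than an in-memory Table), a third-party object only needs to quack like a Dataset for consumers that stick to schema and scanner()/to_scanner():

import pyarrow as pa


class MyScanner:
    def __init__(self, table: pa.Table, columns=None):
        self._table = table.select(columns) if columns else table

    @property
    def projected_schema(self) -> pa.Schema:
        return self._table.schema

    def to_reader(self) -> pa.RecordBatchReader:
        return self._table.to_reader()


class MyDataset:
    """Quacks like a pyarrow Dataset for consumers that only touch
    schema and scanner()/to_scanner()."""

    def __init__(self, table: pa.Table):
        self._table = table  # stand-in for a lazily scanned native source

    @property
    def schema(self) -> pa.Schema:
        return self._table.schema

    def scanner(self, columns=None, **kwargs):
        # A real implementation would push the projection down into its
        # native scan instead of slicing an in-memory Table.
        return MyScanner(self._table, columns)

    def to_scanner(self, columns=None, **kwargs):
        # Mirror the instance method mentioned above for DuckDB.
        return self.scanner(columns, **kwargs)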

Would y'all be open to accepting a PR for this? or is there a more formal process to propose some details?

@wjones127
Member

Others should chime in, but I think the best way to move forward is if you could open a draft PR with the proposed ABCs. Then we could discuss on the PR. (You could also create a design doc in a Google doc, but I think seeing the actual classes might be more useful in this case.)

@changhiskhan
Author

OK, sounds great. I should have time tomorrow to make the PR, and that should give others time to chime in too. Thanks!

@westonpace
Member

Yes, I will confess that I got a bit lost in all the possibilities so a concrete list of APIs/ABCs would be very helpful :)

@wjones127
Member

I'm suddenly rather interested in seeing this through. I've also had a change of heart and think either what Chang is proposing (an ABC) or what Joris is proposing (a protocol) is the way to go. An ABC seems straightforward, but I'm eager to chat with Joris if he has ideas on why a protocol like the DataFrame protocol makes more sense. (Or maybe both could be combined? The protocol returns something that subclasses the ABC?)

Started thinking about this in a Google doc: Making Arrow dataset into a protocol

Also wrote up another doc to share the perspective of delta-rs/deltalake on PyArrow Datasets: PyArrow Datasets and Python deltalake

@jorisvandenbossche
Member

Thanks for reviving this and the write-up, Will!
(coincidentally, I was finishing up an earlier write up yesterday about a protocol for just the C Data Interface (so without more dataset-like capabilities like predicate/projection pushdown), and while there is potential overlap, opened the separate issue I was writing anyway: #35531)

What is still not clear to me (also after reading the doc) is the exact scope or goal of a potential protocol (where exactly is the extension point?). Some possible ways I could interpret it:

  1. An interface for consuming data from a dataset-like object, without having to be a pyarrow.dataset.Dataset (or Scanner) instance.
    • This is closer to "just" exposing an ArrowArrayStream, but with additional capabilities to inspect the schema and predicate/projection pushdown (before actually consuming the stream)
  2. An interface to describe a dataset source (fragment file paths, filesystem, ..) such that any dataset implementation can read data from a source specified with this interface
    • Would this essentially be like a substrait ReadRel ? (in terms of information that it would need to capture, maybe with additional things like fragment guarantees/statistics).
  3. An extension point (or ABC) specifically for the pyarrow.dataset implementation such that you can define/implement a dataset source without having to extend Arrow Dataset at the C++ level, but still plug it into a pyarrow Dataset/Scanner, such that you can make use of some aspects of the Arrow implementation (for example, if you attach guarantees to your fragments, then you can rely on the Arrow Dataset implementation to handle the predicate pushdown) or use consumers that already have support for pyarrow Datasets. This is basically trying to make Arrow C++ Datasets more easily extensible.

Using the example of deltalake to duckdb, the three options would look like:

  1. The user creates a deltalake.DeltaTable object, which exposes a stream of data through some protocol. This table object is passed to a duckdb method, which no longer checks hardcoded for a pyarrow.Table/Dataset/Scanner, but checks for the generic protocol method being available. From that method, it can basically get the equivalent of a pyarrow Scanner(filter, projection) -> RecordBatchReader -> C Stream (but without hardcoding for the pyarrow APIs). The actual reading of the data itself is fully done by the deltalake implementation.
  2. The user creates a deltalake.DeltaTable object, which is passed to duckdb. Duckdb gets the information from this object about what to read (which files to read from where), but then reads it themselves. The actual reading here is done by a different library than the one that specified the source.
  3. The user lets deltalake create a pyarrow Dataset object on top of a deltalake fragment/scanner implementation. This object is passed to duckdb, and duckdb uses its current pyarrow integration to consume this data. The actual reading of the data is done by both deltalake and pyarrow.dataset (Arrow coordinating things for reading fragments and streaming that data, but the actual reading is done by the deltalake fragment implementation)
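As a purely hypothetical sketch of what the consumer side of option 1 could look like (the dunder name is made up for illustration), the consumer checks for a protocol method instead of isinstance-checking pyarrow classes:

def to_arrow_reader(obj, columns=None, filter=None):
    # Accept any producer implementing the (hypothetical) protocol;
    # the returned reader exposes the Arrow C stream.
    if hasattr(obj, "__arrow_dataset_scanner__"):
        scanner = obj.__arrow_dataset_scanner__(columns=columns, filter=filter)
        return scanner.to_reader()
    raise TypeError("object does not expose a dataset-like protocol")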

What is other people's understanding of what we are discussing / looking for?

I suppose the two questions from #33986 (comment) overlap with this, and essentially ask the same.
But for example, the Google doc currently mentions "filesystems" as one of the challenges, but if it's option 1 that we are interested in, I still don't fully understand how filesystems are involved (the filesystem interaction is fully defined by the producer (deltalake, or the user of deltalake that created the deltalake Table object), and once you are reading data from that source (as duckdb), you don't have to be aware of the filesystem details?)

@lidavidm
Member

My question is still as follows:

Is the goal here dataset specifically, or an API with filters and column selections (of which dataset is an implementation)?

I think we're still confusing the two.

@lidavidm
Member

For Joris's comment: I think we are proposing (3), but I think what people are after is (1). I'm also not sure why filesystems are suddenly in play.

(2) would be interesting, but it sounds like you're effectively https://xkcd.com/927/ -ing Delta Lake and Iceberg.

@jorisvandenbossche
Member

Yes, and to be clear: I think what I would find most interesting is (1).

@wjones127
Member

Yes I agree what we want is (1): "An interface for consuming data from a dataset-like object, without having to be a pyarrow.dataset.Dataset (or Scanner) instance."

I'm basically thinking we have table formats with Python libraries: Delta Lake, Iceberg, and Lance. And we have single-node query engines such as DuckDB, Polars, and Datafusion. It would be cool if we could pass any of the table formats into any of the query engines, all with one protocol. We have a prototype version of this that works well in some ways, but in order to be fully viable needs to be turned into a proper well-defined protocol.
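A rough sketch of the end state, with deltalake as an example producer and duckdb as an example consumer (today this hand-off goes through a pyarrow Dataset; with a formal protocol, any producer exposing the same surface could be dropped in):

import duckdb
from deltalake import DeltaTable

# Producer: a table format exposing a dataset-like object
dataset = DeltaTable("path/to/table").to_pyarrow_dataset()

# Consumer: DuckDB picks up the local `dataset` via its replacement scans
con = duckdb.connect()
result = con.execute("SELECT count(*) FROM dataset").fetch_arrow_table()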

@westonpace
Member

@vibhatha can say more but I believe we have been playing around with Substrait and Iceberg for something similar.

I'm basically thinking we have table formats with Python libraries: Delta Lake, Iceberg, and Lance. And we have single-node query engines such as DuckDB, Polars, and Datafusion. It would be cool if we could pass any of the table formats into any of the query engines, all with one protocol.

  • Start with a query that has named tables (e.g. SELECT foo.x, bar.y FROM foo INNER JOIN bar ON foo.id = bar.id WHERE foo.z > 20)
  • Convert query to Substrait if not there already
  • In some library (e.g. pyiceberg or pysubstrait) Visit the Substrait query to create a new Substrait query
    • For each named table
      • Look up the table in the catalog. Figure out which files to query, how to devolve the filter / selection, what additional filters to add for row level deletes, etc.
      • Create a local_files read that has the correct files, the devolved filter, and the devolved selection
  • Pass the Substrait query on to your engine of choice (e.g. DuckDb, Polars, Datafusion).

Producers only need to know Substrait
Consumers only need to know Substrait
User has to register the middleware piece somewhere.

@wjones127
Member

Thanks, this is all helpful feedback. I'm going to spend some time filling out that document more. I'll ping this thread when it deserves some more attention.

@alippai
Contributor

alippai commented May 10, 2023

Nitpicking here, but in Python terms we are interested in Protocols and not ABCs, right? For a generic lib, a static typing interface sounds better than an abstract class (which is a runtime dependency).

This explains it much better than me: https://jellis18.github.io/post/2022-01-11-abc-vs-protocol/
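For a concrete contrast, a typing.Protocol version of the dataset shape discussed above would let third-party libraries match it structurally, with no runtime dependency on a pyarrow base class (the member names follow the earlier sketches and are illustrative):

from typing import Protocol, runtime_checkable

import pyarrow as pa


@runtime_checkable
class SupportsScan(Protocol):
    """Anything with this shape matches; no inheritance required."""

    @property
    def schema(self) -> pa.Schema: ...

    def scanner(self, columns=None, filter=None): ...


# isinstance(obj, SupportsScan) only checks that the members exist;
# static type checkers verify the full signatures.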

@wjones127
Member

I have updated the document and created a rough sketch. I've also notified some devs from other projects, such as PyIceberg and dask-deltatable, to get more feedback.

Basically, I think the API that we have now for Datasets is actually very good. So doing as Chang originally suggested and just making a typing.Protocol out of it seems like it would be sufficient. I think that's what we want, but I'm honestly not 100% sure of the best way to expose / publish this, so I would welcome feedback on that.

There are some possible extensions of it that could be made in the future, but I don't think they should block us from defining a protocol now.

IMO, this is a good opportunity to define something that will work well enough for now. I don't think it will be something that will last the next 5-10 years. But what we learn from pushing this API to its limits may inform the design of something more robust that includes input from a much wider part of the PyData ecosystem.

@vibhatha
Collaborator

I think @westonpace has already mentioned the high level steps that we have to take.

The key is to find a way to access the table defined in the namedTable and query the required metadata from the APIs provided by the consumers. We should also assume that the expected access is handled by each consumer itself. That is the only concerning part, as far as I understand.

@vibhatha
Collaborator

@westonpace would it make sense to add a to_substrait() function to the Dataset API?

@lidavidm
Member

@wjones127 do you think it's time to start an ML discussion?

I wonder if we could publish a base API in the Nanoarrow Python code, based on PyCapsule, and also a higher-level version in PyArrow, based on the PyArrow objects.
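As a hedged sketch of the capsule-based direction (the __arrow_c_stream__ dunder is what was later standardized as the Arrow PyCapsule interface), here a toy producer simply delegates to pyarrow's own export:

import pyarrow as pa


class StreamProducer:
    """Low-level producer: hands out an ArrowArrayStream capsule, so the
    consumer never needs pyarrow's Python types."""

    def __init__(self, table: pa.Table):
        self._table = table

    def __arrow_c_stream__(self, requested_schema=None):
        # A native (Rust/C) producer would build the capsule itself
        # instead of delegating to pyarrow.
        return self._table.__arrow_c_stream__(requested_schema)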

@wjones127
Member

@lidavidm Thanks for the nudge. ML thread here: https://lists.apache.org/thread/ko0j6pk86p5rt24w6s3m40h68r6lcqrr
