[Python][Rust] Create extension point in python for Dataset/Scanner #33986
To be precise, this is already doable today as a workaround. What I'm proposing here is:
|
From a quick read, is this about formalizing an extension point per se or actually just an interface? Some way to say, "given a filter/column selection, please try to push these down and give me a C Data Interface stream" |
I've had similar challenges with supporting datasets in delta-rs. Another aspect you'll need to think about is supporting filesystems. In Rust, that means calling into Python functions, which I fear can be sub-optimal because of the GIL. I don't think there's a practical way to directly access the underlying C++-implemented FS unless we made the ABI stable (which I don't see us doing in the foreseeable future). A route I'm exploring right now is using ADBC as a stable ABI for pushing down scan queries to storage formats and systems. It probably makes more sense for table formats like Delta Lake, which have database-like semantics, than for file formats like Lance (which I assume is the project with the use case you are discussing).
It's harder, but part of me would prefer a stable C ABI, because it would mean the extension could be used in any language, not just Python. |
You're right. Good question. I think just the interface, actually, so this issue is incorrectly named. I.e., Rust packages could implement the same interface that higher-level tooling will call, without necessarily needing to inherit from the Cython classes defining Dataset/Scanner today. |
Does object store rs work for this?
Yeah most def pure python
|
It sounds like what you basically want is a

```c
struct DataFrameProducer {
  int read(struct DataFrameProducer* self, ??? filters, ??? selection, struct ArrowArrayStream* out);
};
```

with corresponding wrappers/carriers in Python, Rust, Go, etc. Then this can be fed into DuckDB, Ballista, Acero, etc. and can be produced by ADBC, Acero, DuckDB, etc. You can have nearly both "pure Python" and "C ABI", I think. A Python-level interface could be transformed by PyArrow into the C ABI and vice versa. Where possible, the Python-level interface should let you 'extract' the underlying C ABI, if it exists, but otherwise we can push the responsibility of the GIL and such into PyArrow (or something like that). (So basically, shove all the non-Python code into PyArrow.) The question is what the filter/selection format should be; ideally it would be language-agnostic and implementation-agnostic, so Dataset's expressions aren't great there. |
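To make the wrapper/carrier idea concrete on the Python side, here is a rough sketch; all names here are hypothetical, not an agreed design:

```python
import pyarrow as pa

class DataFrameProducer:
    """Hypothetical Python-level carrier mirroring the C struct above."""

    def read(self, filters=None, selection=None) -> pa.RecordBatchReader:
        # Push the filter / column selection down to the source and return
        # the result as an Arrow C Data Interface stream.
        raise NotImplementedError

    def _underlying_c_producer(self):
        # If this carrier wraps a C-ABI producer, expose it so consumers can
        # bypass Python (and the GIL); otherwise return None and let PyArrow
        # shoulder the bridging work, as described above.
        return None
```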
Substrait recently merged a message type for standalone expression lists. :) |
What if we created a read type in Substrait for datasets? It would require encoding filesystem options in Substrait as well. However, this seems like a valuable feature. Then the C API / interface is just the typical Substrait consumer API of "substrait plan in, record batch reader out".
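For a sense of the shape: PyArrow's Substrait consumer already follows roughly this contract. A minimal sketch of the "plan in, reader out" call, treating the exact entry point and options as illustrative:

```python
import pyarrow as pa
import pyarrow.substrait as substrait

def scan(plan: bytes) -> pa.RecordBatchReader:
    # "Substrait plan in, record batch reader out": hand the serialized plan
    # to a consumer and get back a stream of Arrow record batches.
    return substrait.run_query(plan)
```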
I have been recently thinking about this as well, triggered by our work to support the DataFrame Interchange protocol in pyarrow (#14804, https://data-apis.org/dataframe-protocol/latest/purpose_and_scope.html). As illustration, using latest pyarrow (using a pyarrow.Table, so all data is already in memory, but we could add the same to something backed by a not-yet-materialized stream or data on disk):

```python
>>> table = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})

# accessing this interchange object
>>> interchange_object = table.__dataframe__()
>>> interchange_object
<pyarrow.interchange.dataframe._PyArrowDataFrame at 0x7ffb64f45420>

# this doesn't yet need to materialize all the buffers, but you can inspect metadata
>>> interchange_object.num_columns()
2
>>> interchange_object.column_names()
['a', 'b']

# you can select a subset (i.e. simple projection)
>>> subset = interchange_object.select_columns_by_name(['a'])
>>> subset.num_columns()
1

# only when actually asking for the buffers of one chunk of a column, the data needs
# to be in memory (to pass a pointer to the buffers)
>>> subset.get_column_by_name("a").get_buffers()
{'data': (PyArrowBuffer({'bufsize': 24, 'ptr': 140717840175104, 'device': 'CPU'}),
   (<DtypeKind.INT: 0>, 64, 'l', '=')),
 'validity': None,
 'offsets': None}
```

To be clear, I don't propose we do something exactly like that (and personally, I think it's also a missed opportunity for the DataFrame Interchange protocol to not use Arrow for the memory layout specification, and not give access to data as Arrow data or via the Arrow C Interface). So having some standard Python interface that would give you delayed and queryable (filter / simple projection) access to Arrow C Interface data sounds really interesting, or having this as an additional C ABI (for which we can still provide such a Python interface as well) like David sketched above. |
From an implementation perspective, I suspect we can satisfy any of these proposed APIs. If we need to come up with a new API then my preference is Substrait, but if the consensus heads in some other direction I'm fine with that too. @jorisvandenbossche, your proposal seems fine, but I don't see anything in there for filesystems. I think this is for on-disk data more so than purely in-memory data. Though I believe your approach could be adapted to include filesystems.
Yes, I would assume that object store rs would be able to satisfy this but I'm not familiar with the capabilities. For example, my idea of how this would work in Substrait would be:
The equivalent C interface would just be structifying those messages. |
What's the reason that the potential interface would need to be aware of filesystems? |
I think the goal is to create a C API capable of representing datasets? I think that would require filesystems. But I may be mistaken on either of these points. |
Is the goal here dataset specifically, or an API with filters and column selections (of which dataset is an implementation)? It sounds like what we have is a project that is the latter, but wants to take advantage of the API of the former to get access to things like DuckDB integration |
The interface is for scanning the dataset, which happens after the filesystem has been passed in, so it's a separate concern. Yet it is still relevant to "how do I extend dataset", because your scanning implementation needs to use some filesystem, and that means the user needs to configure and pass one in. The easiest option for users is to take fsspec / PyArrow filesystems as Python interfaces, although performance may be impacted by the GIL. (I have started, but not finished, an implementation of
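For illustration, adapting an fsspec filesystem to the PyArrow FileSystem interface is already possible today, with the GIL caveat noted above. A minimal sketch, using the "memory" filesystem purely as an example:

```python
import fsspec
from pyarrow.fs import FSSpecHandler, PyFileSystem

# Wrap an fsspec filesystem so it can be passed anywhere a PyArrow
# FileSystem is expected; every call routes through Python (and the GIL).
fs = PyFileSystem(FSSpecHandler(fsspec.filesystem("memory")))
print(fs.get_file_info("some/path"))
```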
IMO the current DuckDB integration feels a little silly. It manipulates Python objects until it can get a RecordBatchReader and then exports that through the C data interface. The same code is duplicated in the R package, except it manipulates R objects. And nothing is available in other languages. So part of me thinks it would be cleaner to replace that integration with this kind of C API, but that's for the DuckDB devs to decide :) So there's sort of two questions:
It's unclear to me right now whether we just want to create a C API you can use instead of Dataset to solve this, or make a C API on Dataset to solve these. The former is less complicated for sure, but I'm not sure we want to sidestep the Dataset API like that. |
I don't think it's either-or, I think we're tangling two concerns up here.
Dataset is sort of an API standard for this, or at least you can press it into service. But Dataset is also useful in its own right and a meaningful abstraction, so people want to extend it. If we define a new API, of course Dataset should implement it!
Dataset is already extensible. The real problem is the Python integration, and wheels/packaging questions on top of that. The top-level proposal sounds like sidestepping that entirely by introducing a separate abstraction layer at the Python level (hence, exposing ABCs in Python).
I don't think they had a choice, because there's not really a formal API for what they really want :) |
Yup, that's exactly the proposal here.
The main blocker in the current version of DuckDB is using the static methods in […]. The issue is that you have to be really careful to override all of the methods in Dataset, or else it'll try to unwrap the non-existent CDataset and crash Python. This is the main motivation for me proposing a pure-Python abstraction on top. Would y'all be open to accepting a PR for this? Or is there a more formal process to propose some details? |
Others should chime in, but I think the best way to move forward is if you could open a draft PR with the proposed ABCs. Then we could discuss on the PR. (You could also create a design doc in a Google doc, but I think seeing the actual classes might be more useful in this case.) |
OK, sounds great. I should have time tomorrow to make the PR, and that should give others time to chime in too. Thanks! |
Yes, I will confess that I got a bit lost in all the possibilities so a concrete list of APIs/ABCs would be very helpful :) |
I'm suddenly rather interested in seeing this through. I've also had a change of heart and think either what Chang is proposing (an ABC) or what Joris is proposing (a protocol) is the way to go. ABC seems straightforward, but I'm eager to chat with Joris if he has ideas on why a protocol like the DataFrame protocol makes more sense. (Or maybe both could be combined? The protocol returns something that subclasses the ABC?) I've started thinking about this in a Google doc: Making Arrow dataset into a protocol. I also wrote up another doc to share the perspective of delta-rs/deltalake on PyArrow Datasets: PyArrow Datasets and Python deltalake |
Thanks for reviving this and the write-up, Will! What is still not clear to me (also after reading the doc) is the exact scope or goal of a potential protocol (where exactly is the extension point?). Some possible ways I could interpret it:
Using the example of deltalake to duckdb, the three options would look like:
What is other people's understanding of what we are discussing / are looking for? I suppose the two questions from #33986 (comment) overlap with this, and essentially ask the same. |
My question is still as follows:
I think we're still confusing the two. |
For Joris's comment: I think we are proposing (3), but I think what people are after is (1). I'm also not sure why filesystems are suddenly in play. (2) would be interesting, but it sounds like you're effectively https://xkcd.com/927/ -ing Delta Lake and Iceberg. |
Yes, and to be clear: I think what I would find most interesting is (1). |
Yes, I agree what we want is (1): "An interface for consuming data from a dataset-like object, without having to be a pyarrow.dataset.Dataset (or Scanner) instance." I'm basically thinking: we have table formats with Python libraries (Delta Lake, Iceberg, and Lance), and we have single-node query engines such as DuckDB, Polars, and DataFusion. It would be cool if we could pass any of the table formats into any of the query engines, all with one protocol. We have a prototype version of this that works well in some ways, but in order to be fully viable it needs to be turned into a proper, well-defined protocol; the prototype flow is sketched below. |
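A sketch of that prototype flow, assuming a Delta table with columns `a` and `b` and the deltalake and duckdb Python packages as they existed around this discussion:

```python
import duckdb
from deltalake import DeltaTable

# Expose the table format as a pyarrow dataset, then let the query engine
# push filters and projections down through it.
dataset = DeltaTable("path/to/table").to_pyarrow_dataset()
con = duckdb.connect()
con.register("t", dataset)
print(con.sql("SELECT a FROM t WHERE b > 4").fetchall())
```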
@vibhatha can say more but I believe we have been playing around with Substrait and Iceberg for something similar.
Producers only need to know Substrait |
Thanks, this is all helpful feedback. I'm going to spend some time filling out that document more. I'll ping this thread when it deserves some more attention. |
Nitpicking here, but in Python terms we are interested in a Protocol rather than an ABC. This explains it much better than me: https://jellis18.github.io/post/2022-01-11-abc-vs-protocol/ |
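A minimal sketch of the distinction, reusing the method names from the top-level proposal (illustrative only):

```python
from typing import Protocol
import pyarrow as pa

class Scanner(Protocol):
    # Structural typing: any object with a matching projected_schema and
    # to_reader satisfies this protocol; no inheritance from (or even
    # importing of) this class is required by implementers.
    @property
    def projected_schema(self) -> pa.Schema: ...
    def to_reader(self) -> pa.RecordBatchReader: ...
```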
I have updated the document and created a rough sketch. I've also notified some devs from other projects, such as PyIceberg and dask-deltatable, to get more feedback. Basically, I think the API that we have now for Datasets is actually very good. So doing as Chang originally suggested and just making a protocol out of the existing Dataset/Scanner API seems right. There are some possible extensions of it that could be made in the future, but I don't think they should block us from defining a protocol now. IMO, this is a good opportunity to define something that will work well enough for now. I don't think it will be something that will last the next 5-10 years, but what we learn from pushing this API to its limits may inform the design of something more robust that includes input from a much wider part of the PyData ecosystem. |
I think @westonpace has already mentioned the high-level steps that we have to take. The key is to find a way to access the table defined in the
@westonpace would it make sense to add a |
@wjones127 do you think it's time to start an ML discussion? I wonder if we could publish a base API in the Nanoarrow Python code, based on PyCapsule, and also a higher-level version in PyArrow, based on the PyArrow objects. |
@lidavidm Thanks for the nudge. ML thread here: https://lists.apache.org/thread/ko0j6pk86p5rt24w6s3m40h68r6lcqrr |
Describe the enhancement requested
As the Arrow ecosystem grows ever richer, desire paths emerge :)
Integrating Arrow-based projects written in Rust works great across the C data interface, but it doesn't allow lazy execution or pushdowns in the way that pyarrow's Dataset/Scanner do.
My proposal here is to expose Dataset/Scanner Python ABCs such that Rust libraries can extend them via PyO3 + Python, so that higher-level tooling (DuckDB, for example) can query these without having to transfer the whole Table into memory first.
In keeping with the same principles as the C data interface, I think it would be sufficient for this Python interface to be very minimal: a Dataset with schema and scanner methods, and a Scanner with projected_schema and to_reader methods. to_reader should return a RecordBatchReader, which would then link PyO3 datasets into the Arrow C data interface.
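A sketch of what such minimal ABCs could look like; the exact signatures (for example, the scanner keyword arguments) are assumptions for illustration, not a settled design:

```python
from abc import ABC, abstractmethod
import pyarrow as pa

class Scanner(ABC):
    """Minimal scanner surface: enough to stream Arrow data out lazily."""

    @property
    @abstractmethod
    def projected_schema(self) -> pa.Schema:
        """Schema of the data the reader will produce."""

    @abstractmethod
    def to_reader(self) -> pa.RecordBatchReader:
        """Return a reader that bridges into the Arrow C data interface."""

class Dataset(ABC):
    """Minimal dataset surface: a schema plus a way to build a scanner."""

    @property
    @abstractmethod
    def schema(self) -> pa.Schema:
        """Full schema of the dataset."""

    @abstractmethod
    def scanner(self, columns=None, filter=None) -> Scanner:
        """Build a scanner, pushing down the column selection and filter."""
```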
Thanks for your consideration!
Component(s)
Python