GH-33986: [Python] Add a minimal protocol for datasets #35568
Conversation
Great work @wjones127. I've left some comments.
```python
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
```
```diff
-    def scanner(self, columns: Optional[List[str]] = None,
+    def scanner(self, columns: Optional[Tuple[str, ...]] = None,
```
Nit, I prefer to use tuples over lists because:
- They are immutable
- And therefore also hashable:

```python
>>> hash((1, 2, 3))
529344067295497451
>>> hash([1, 2, 3])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'
```
I don't disagree in principle, but I'm also trying to keep this somewhat compatible. Though maybe we can loosen it to `Sequence[str]`?

Actually, it currently supports `List[str] | Dict[str, Expression]`. Do we want to support the dictionary generally? Or keep the protocol more narrow than that?
arrow/python/pyarrow/_dataset.pyx
Lines 3182 to 3201 in af38263
```python
if columns is not None:
    if isinstance(columns, dict):
        for expr in columns.values():
            if not isinstance(expr, Expression):
                raise TypeError(
                    "Expected an Expression for a 'column' dictionary "
                    "value, got {} instead".format(type(expr))
                )
            c_exprs.push_back((<Expression> expr).unwrap())
        check_status(
            builder.Project(c_exprs, [tobytes(c) for c in columns.keys()])
        )
    elif isinstance(columns, list):
        check_status(builder.ProjectColumns([tobytes(c) for c in columns]))
    else:
        raise ValueError(
            "Expected a list or a dict for 'columns', "
            "got {} instead.".format(type(columns))
        )
```
The difference between `Sequence[str]` and `Dict[str, Expression]` is significant. The former only allows you to pick which columns to load. The latter introduces the concept of projection expressions, which is big.
Related to #35568 (comment), we might want to only allow `Sequence[str]` here so we only support column selection and reordering. That way we don't need to require consumers to perform projections. I don't think any existing consumer relies on this.
python/pyarrow/dataset/protocol.py
Outdated
```python
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
```
In Iceberg we also have a case-sensitive option for the column names. Might be something to consider.
python/pyarrow/dataset/protocol.py
Outdated
```python
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
        ...
```
Many table formats support time-traveling. In Iceberg we call this a snapshot: the state of a table at some point in time, including the set of all data files. I think it would be good to add this as well (or through the `kwargs`).
Yes, I think that's something that would go into kwargs.
python/pyarrow/dataset/protocol.py
Outdated
```python
        ...


class Fragment(Scannable):
```
I would not make the Fragment scannable, and I would call the schema `physical_schema`.
In Iceberg it would be very unlikely that you would apply expressions on a fragment, except for applying residual filtering (see the Google doc), but that should not be exposed in the API.
The purpose of making the fragment scannable is allowing them to be used in a distributed scan. So the consumer would create the dataset and get the set of fragments that match the query. Then it would serialize each fragment, distribute them amongst its worker threads / processes, and then scan each fragment when it can.
This is currently how DataFusion (across threads) and Dask (across processes, I think) scan. See the "Consumers" section of https://docs.google.com/document/d/1r56nt5Un2E7yPrZO9YPknBN4EDtptpx-tqOZReHvq1U/edit#
Do you think that would not be viable for Iceberg?
Perhaps "fragment" isn't the right word here. If this is "something that can be scanned" and it can maybe be scanned in parts then we could go back to some old wording we used to have which is "scan tasks".
At the least, we should probably be clear that a fragment is not necessarily a file.
I do think it's useful for parallelization. However, I agree with @Fokko, the fragments should not be scannable in the same way the dataset is. It doesn't make sense to provide them with separate filters. They should essentially have `to_reader` and nothing else.

This does bring up concerns regarding threading and resource contention. E.g. if DataFusion were using pyarrow as a data source and spreading the work across many fragments, then it probably wouldn't want those individual fragment scanners using their own threads. Perhaps we can add `use_threads`? It's a rather coarse control but may be sufficient.
Coming back to this.
> I would not make the Fragment scannable
How would you pass a projection then in the distributed case?
I was imagining this sequence:
```python
dataset = ...
projection = ...
filter = ...
fragments = dataset.get_fragments(filter)
for fragment in fragments:
    reader = fragment.scanner(filter, projection).to_reader()
    # use reader...
```
> call the schema `physical_schema`
You're right that fragments currently don't expose the schema. I'll remove that from fragments then. The dataset schema can always be serialized and distributed separately.
> Except for applying residual filtering, see Google doc, but that should not be exposed in the API.
What is "residual filtering"? Why should it not be exposed? My expectation is that if the consumer passed a filter `x < 2`, then the scanner should never pass any rows where `x >= 2`. Filters should always be fully applied.
Let's consider the following expression: `ts > 2022-05-04T10:00:00`, and assume that the table is partitioned by `days(ts)`:

```
Partitions:
Days:  | 2022-05-03 | 2022-05-04 | 2022-05-05 | ...   <-- ts_day
                    |<----- predicate matches -----
       | CANNOT MATCH | MIGHT MATCH | MUST MATCH |
       |<--------------- INCLUSIVE --------
                    |<---- STRICT -------
```

Then there are three situations:
- Cannot match: no need to read
- Might match: read and check the original predicate (residual)
- Must match: read and assume it matches the original predicate

This way you can avoid applying a filter that isn't needed, because we know from the partition information that it is always true.
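The three-way classification above can be sketched like this (a toy illustration of the day-partition example; not Iceberg's actual evaluator):

```python
from datetime import date, datetime

# Predicate: ts > 2022-05-04T10:00:00, table partitioned by days(ts).
CUTOFF = datetime(2022, 5, 4, 10, 0, 0)

def classify(partition_day: date) -> str:
    day_start = datetime.combine(partition_day, datetime.min.time())
    day_end = day_start.replace(hour=23, minute=59, second=59)
    if day_end <= CUTOFF:
        return "cannot match"   # no need to read
    if day_start > CUTOFF:
        return "must match"     # read, skip re-checking the predicate
    return "might match"        # read and apply the residual predicate
```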
Okay. I think the expectation is that residual filtering needs to be done in this API. The way this is handled in PyArrow's implementation is that each fragment has a "guarantee". So the fragments look like:

```
<fragment 1 guarantee="date(ts) = '2022-05-03'">
<fragment 2 guarantee="date(ts) = '2022-05-04'">
<fragment 3 guarantee="date(ts) = '2022-05-05'">
```

When the consumer calls `fragments = dataset.get_fragments(filter)`, fragment 1 is eliminated, because it cannot match. All the fragments that might match are returned.

The consumer then passes the same filter to the scan of each fragment. When it is passed to fragment 2, the filter is executed because there are some rows it might still filter out. When it is passed to fragment 3, the filter is simplified to `true`, since we know all rows already satisfy it based on the guarantee `date(ts) = '2022-05-05'`.
In the end, the rows produced by the dataset scanner must not have any rows that would be removed by the filter.
Does that API make sense now?
Yes, it does, thanks for the elaborate answer. Much appreciated.
My understanding is that this is still your goal:
> I'm basically thinking we have table formats with Python libraries: Delta Lake, Iceberg, and Lance. And we have single-node query engines such as DuckDB, Polars, and Datafusion. It would be cool if we could pass any of the table formats into any of the query engines, all with one protocol.

I'm not sure the API you are defining helps you with that goal. I think what is missing is the API used to create the dataset. What you've proposed here isn't flexible enough. For example, if I'm trying to convert a "named table request" (e.g. give me all rows from table "widgets" with filter "xyz" at time point Y) into a "scan request" (e.g. what pyarrow datasets can read) then I want something like...
```python
dataset_builder = DatasetBuilder()
dataset_builder
    .files(["A", "B"])
    # don't need to apply filter(xyz) here because I know it doesn't apply to these files
    .columns(["foo", "bar", "renamed", "missing"])
    # Need to delete some indices from these files
    .delete_indices([1, 2, 50])
dataset_builder
    .files(["C"])
    .filter("xyz")
    .columns(["foo", "bar"])
    # Need to do some evolution to handle a missing column and a renamed column
    .insert_virtual_columns({"missing": "some_default_value", "renamed": field_ref("old_name")})
```
In other words, for a table format to use a query engine, it's not enough to pass a single query (e.g. filter / columns / whatever). We need to pass a query per file.
python/pyarrow/dataset/protocol.py
Outdated
```python
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
```
You could potentially consider using Substrait's extended expression instead of pyarrow expressions. This could remove the pyarrow dependency entirely and frame this in terms of Substrait and the C stream interface.
For those that want to use pyarrow expressions then #34834 should make that pretty straightforward.
I'm an obvious Arrow C interface junkie, but I quite like the idea of this. It would also mean that duckdb, datafusion, polars etc. would be able to implement the protocol once without having to reinvent a way to represent a syntax tree in each language binding.
I'm +1 for adding Substrait as the preferred expression mechanism and deprecating PyArrow expressions. But for now there's already support for the PyArrow expressions, and once that PR is merged, there will be a nice migration path in the interim.
Is it absolutely critical right now though? (Do DuckDB et al support PyArrow expressions?)
It would be better to omit a feature that we think we will replace than try to standardize it in the interim.
python/pyarrow/dataset/protocol.py
Outdated
```python
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
```
Must this filter be satisfied? Or is it best effort?
If it must be satisfied then how will you handle UDFs and other strange functions in the filter?
That's an interesting caveat. I didn't expect that engines would try to pass down UDFs or more complex functions. I suppose we might want to define which functions are allowed in the filter? (Obviously some datasets might support more than that)
Maybe a list like:
- Equality / inequalities
- is_null / not_null
- cast
- isin
- not
- and / or
- between
That covers everything that is pushed down by
- DuckDB: https://github.com/duckdb/duckdb/blob/12638bd15623508cdc7fb3f670b291b51f92ee43/tools/pythonpkg/src/arrow/arrow_array_stream.cpp#L236
- DataFusion: https://github.com/apache/arrow-datafusion-python/blob/main/src/pyarrow_filter_expression.rs
- Polars: https://github.com/pola-rs/polars/blob/main/polars/polars-lazy/polars-plan/src/logical_plan/pyarrow.rs
This seems workable. There is some prior art from the row-based world with the term sargable which seems to be pretty close to your list.
Also, just to clarify, does this mean the following interpretation is correct?
Yes, the filter must be satisfied and the dataset should return an error if it cannot fully satisfy the filter.
python/pyarrow/dataset/protocol.py
Outdated
```python
    @abstractmethod
    def to_table(self) -> Table:
        ...

    @abstractmethod
    def to_batches(self) -> Iterator[RecordBatch]:
        ...
```
If you have `to_reader` then you could create a default implementation for these two methods. Is there any particular reason to include them? In other words, do you expect an implementation to be able to somehow support `to_table` more efficiently than `to_reader().read_all()`?
@westonpace you are correct that this doesn't define how such dataset classes are built. That's left to the consumer, who will write their own classes that conform to this API. However, I do like your idea for a dataset builder. I think it might be worth asking the PyIceberg developers whether something like that would work well for them. (I think Delta Lake and Lance will likely go the route of implementing their own classes in Rust.) I've noted this idea in https://docs.google.com/document/d/1-uVkSZeaBtOALVbqMOPeyV3s2UND7Wl-IGEZ-P-gMXQ/edit#heading=h.31rf5m1tlipg
That would seem an essential API if this protocol was meant to be used by "table formats" to prepare "queries simple enough for query engines to understand". So perhaps I am misunderstanding. Is this protocol meant to be used by "query engines" to "query a table format library as if it were a dataset"?
for HIVE partitions that don't match that date.
* Filters passed down should be fully executed. While other systems have scanners
  that are "best-effort", only executing the parts of the filter that they can, PyArrow
  datasets should always remove all rows that don't match the filter.
It would be nice if we can control this. In Iceberg we know if there are any residuals in the file.
metadata about fragments that aren't relevant to queries. For example, if you
have a dataset that uses Hive-style partitioning for a column ``date`` and the
user passes a filter for ``date=2023-01-01``, then you can skip listing directories
for HIVE partitions that don't match that date.
Is this relevant to the protocol? For context: in Iceberg you can just pass in an expression, and it automatically checks if the partitioning can be used to prune any data. Iceberg is lazy, so multiple partitioning strategies can co-exist because partitioning can change over time (you never know how the partitions will evolve, so this should be flexible :).
The Hive-style partitioning was just an example. I think for Iceberg the equivalent statement would be something like: an Iceberg dataset can skip loading manifest files that are irrelevant to the query, instead of loading every manifest file referenced in the manifest lists. Does that make sense? The overall idea is that the API allows you to treat the metadata itself as "big data" that you wouldn't want to scan in its entirety.
My assumption was that it is the producers who implement the classes that conform to this API? How are the consumer and producer supposed to interact with this protocol? Taking duckdb as an example, the user can currently manually create a pyarrow object, and then query it automatically using duckdb:

```python
import pyarrow.dataset as ds

pyarrow_dataset = ds.dataset(...)
duckdb.sql("SELECT * FROM pyarrow_dataset WHERE ..")
```

(and under the hood, duckdb converts ...)

Is the idea that something similar would then work for any object supporting this protocol? (in the assumption that duckdb relaxes its check for a pyarrow object to any object conforming to the protocol) For example with delta-lake:

```python
from deltalake import DeltaTable

delta_table = DeltaTable("..")
duckdb.sql("SELECT * FROM delta_table WHERE ..")
```

But if this is the intended usage, I don't understand what the "builder API" (#35568 (review)) would be meant for?
@westonpace Why is that not sufficient? I think it is up to the table format to translate the single query into a query per file (and execute this)?
Force-pushed from c6d50f4 to 280bfdb
I've updated the docs to be more descriptive of the intended uses. There are also some tests to make sure the protocol aligns with what is implemented by …
So is the assumption here that the producer and the consumer (in your diagram) are the same library? E.g. both are pyarrow (pyarrow has code for producing datasets and for scanning datasets)? Or is the goal to be able to produce datasets with one library and consume them with a different library?
@jorisvandenbossche in other words, the table format would create a fragment or dataset for each file (or set of files with a common schema) and then create a unique query for each of those. That would work. I was thinking something much more complex.
Different libraries. Producers are libraries like …. You could say the status quo is that the consumer can be any library, but the producer is assumed to be pyarrow. This protocol helps open up the producer side to be other libraries.
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.
```
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.

Producers are scanner implementations. For example, table formats like Delta
Lake and Iceberg might provide their own dataset implementations. Consumers
are typically query engines, such as DuckDB, DataFusion, Polars, and Dask.
Providing a common API avoids a situation where supporting ``N`` dataset
formats in ``M`` query engines requires ``N * M`` integrations.
```
python/pyarrow/dataset/protocol.py
Outdated
```
- isin
- between
- negation (not)
```
How about adding support for NaN values? `is_nan` / `not_nan`
I needed to try this, and it works really well! Thanks for the great work here @wjones127

My messy code can be found here: apache/iceberg#7846. Only look at it if you're interested; it is very early, and a lot is still missing. Just to check how it fits the structure.
One concern I have is where to implement the schema projection. Currently, in PyIceberg we produce a `pa.Table` for each of the Parquet files. In the following example, we have an Iceberg table with two columns. There was only one column initially, the first Parquet file was written, and then the name column got added, and another file was written. Iceberg is lazy, so when reading the first file, it will inject a null column. This is valid:

```
Schema:
- idx: int32, required=True, field-id=1
- name: string, required=False, field-id=2

00001-data.parquet
  idx: int32, field-id=1
00002-data.parquet
  idx: int32, field-id=1
  name: string, field-id=2
```
Currently in PyIceberg we materialize each of the fragments to a table; for each of the tables, we convert it to the requested schema, and then we concatenate all the tables. There are a couple of situations, such as field widening. This is valid:

```
Schema:
- idx: int32, required=True, field-id=1

00001-data.parquet
  idx: int32, field-id=1
00002-data.parquet
  idx: int64, field-id=1
```
Or renaming, where we'll rename the column of the table before concatenating them. This is valid:

```
Schema:
- idx: int64, required=True, field-id=1

00001-data.parquet
  idx: int64, field-id=1
00002-data.parquet
  index: int64, field-id=1
```
With Iceberg spec v3, initial-defaults are also being added so we can add new required columns. This is valid:

```
Schema:
- idx: int64, required=True, field-id=1
- name: string, required=True, field-id=2, initial-default=unknown

00001-data.parquet
  idx: int64, field-id=1
00002-data.parquet
  index: int64, field-id=1
  name: string, field-id=2
```
And this way when reading the data, it will inject `unknown` if the column is not in the physical fragment. Exciting stuff!

As said, currently we iterate both the physical Arrow schema and the requested Iceberg schema. This is where missing columns are being added with null values, and types are being casted. My question is: would it be appropriate to do this on the `RecordBatch`? This way the Fragment would already materialize the records, and then it is possible to convert them into the requested schema. Going to a table, and back to batches, seems wasteful to me.
Another thing that's not possible today in the current structure is positional deletes, but this is solvable by adding `__row_index` as suggested in #35301.
```python
        ...

    @abstractmethod
    def head(self, num_rows: int) -> Table:
```
For head and count_rows, should we say that not all producers will support them and that they may raise NotImplementedError?
Yeah that's reasonable. They aren't that important to the protocol's goals; I almost considered removing them.
I would leave them in. In Iceberg we can do a count without touching the actual data files (if you don't use a filter).
Leaving them in.
python/pyarrow/dataset/protocol.py
Outdated
```
columns : List[str], optional
    Names of columns to include in the scan. If None, all columns are
    included.
filter : Expression, optional
```
Maybe we should omit filter to begin with, given that we'd be trying to codify semantics for a mini expression language
I think given this piece is already in use and important, I'd like to include it. It's definitely funky. There is desire from the community to move away from PyArrow expressions and toward Substrait or something else, so I expect we'll deprecate this eventually.
```
.. specific language governing permissions and limitations
.. under the License.

Extending PyArrow Datasets
```
Should we say that this is currently experimental, and list the things that we know are on the roadmap?
```
PyArrow provides a core protocol for datasets, so third-party libraries can both
produce and consume classes that conform to a useful subset of the PyArrow dataset
API. This subset provides enough functionality to provide predicate and filter
```
"predicate" and "filter" mean the same thing. Do you mean "projection and filter"?
```python
dataset = producer_library.get_dataset(...)
df = consumer_library.read_dataset(dataset)
df.filter("x > 0").select("y")
```
I'm confused about `select()` here. The Dataset API does not currently expose a `select()` method, but it allows column projection by allowing users to pass column names or expressions to other methods or to the scanner. Do you propose adding a `select()` method to the Dataset API?
This is meant to be a sketch of what usage looks like from the user's perspective. `df` is a DataFrame object produced by an imaginary consumer library, and `filter` and `select` are methods on this imaginary library.
The idea is the user gets a dataset from the producer, and passes it to the consumer. Then the user gets to write their queries with their favorite query engine, and under the hood the datasets API is used to do projection and predicate pushdown.
Perhaps I should explain that it's imaginary? Or I could show a concrete usage with DuckDB and then Polars?
I've switched this to a concrete example with Delta Lake and DuckDB.
Apologies that it took me so long but I think the purpose of this protocol has finally clicked for me. I've got a few more thoughts but, in general, I like the idea! I appreciate you putting up with my confusion :)
```python
@runtime_checkable
class Fragment(Scannable, Protocol):
```
There are some things I would like to have here, as a user, but I understand we are just getting started and trying to be minimal. So take these as suggestions:

- `__repr__` <-- converting a fragment to a string is very useful for debugging
- `estimated_cost` <-- I get why this one isn't there, but a fragment might be 5 rows or it might be 5 million rows, and that could be valuable for figuring out how to distribute a dataset workload. Still, there is no universal way of estimating cost, so perhaps we can leave this for an extension.
```
batch_size : int, optional
    The number of rows to include in each batch. If None, the default
    value is used. The default value is implementation specific.
```
Maybe expand that the default value is not only implementation specific but might not even be consistent between batches?

Also, is this a min/max or a maximum only? In other words, if `batch_size` is `1_000_000` and the source is a parquet file with 10 row groups of `100_000` rows, does the scanner need to accumulate the rows or is it acceptable to return smaller batches?
This is a good detail to think about. I don't think we should require it to be exact; for example, if the reader can't read in exactly that batch size I don't think it should error. But I do think readers should make a best effort to get as close to the batch size as possible, even if that means splitting row groups into chunks, for example.
Though you might have more informed opinions here; what do you think is reasonable to expect here?
This parameter has lost importance in Arrow C++ datasets. It used to be an important tuning parameter that affected the size of the batches used internally by the C++ implementation. However, it didn't make sense for the user to pick the correct value (there are multiple batch sizes in the C++, and the right value might even depend on the schema and be quite difficult to calculate).

I think it still has value, especially as a "max batch size". The user needs some way to say "don't give me 20GB of data all at once".

So I think it needs to be a hard upper limit, but it can be a soft lower limit. We could either call it `max_batch_size` (and ignore it as a lower limit entirely) or `preferred_batch_size` (and explain that only the upper limit is strictly enforced). I don't think using this as an upper limit is overly burdensome, as slicing tables/batches should be pretty easy and lightweight. The reverse (concatenating batches) is more complicated and expensive.
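The "hard upper limit, soft lower limit" semantics described above can be sketched in a few lines. Here plain Python lists stand in for record batches, and `enforce_max_batch_size` is a hypothetical helper, not part of the PR:

```python
from typing import Iterable, Iterator, List


def enforce_max_batch_size(batches: Iterable[List[int]],
                           max_batch_size: int) -> Iterator[List[int]]:
    """Slice oversized batches; pass smaller ones through unchanged.

    The upper limit is hard (no yielded batch exceeds max_batch_size),
    but the lower limit is soft: small batches are not concatenated,
    since slicing is cheap while concatenation is not.
    """
    for batch in batches:
        for start in range(0, len(batch), max_batch_size):
            yield batch[start:start + max_batch_size]


chunks = list(enforce_max_batch_size([[1, 2, 3, 4, 5], [6]], max_batch_size=2))
print(chunks)  # [[1, 2], [3, 4], [5], [6]]
```

The oversized batch is split, while the already-small batch is passed through as-is rather than being padded or merged.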
python/pyarrow/dataset/protocol.py
Whether to use multiple threads to read the rows. It is expected
that consumers reading a whole dataset in one scanner will keep this
as True, while consumers reading a single fragment per worker will
typically set this to False.
Maybe instead of "It is expected" we could use softer wording like "Often" or "Typically"? I just think there are corner cases for each. For example, you might not want to use threads because you expect to run multiple queries concurrently, or you just want to cut down on resource usage.
    included.
filter : Expression, optional
    Filter expression to apply to the scan. If None, no filter is applied.
batch_size : int, optional
Should `batch_size` be part of `to_reader` instead of `scanner`?
I'm open to that. But I don't want to add that to the protocol without also implementing it in PyArrow Datasets. So if we think this is important, I'll remove this for now.
Producers are responsible for outputting a class that conforms to the protocol.

Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.
This is "ok" but the definition of producer and consumer here are reversed from what they are in Substrait which confused me for a while. Maybe we can go with "Data producer" and "Data consumer"?
Ha it finally clicked why I myself find these terms confusing 🤣. It feels backwards! I'll think of new names.
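A hypothetical sketch of the data-consumer side of the "series of tasks" mode, with toy classes standing in for the protocol (none of these names come from the PR; real fragments would yield record batches, not lists):

```python
from concurrent.futures import ThreadPoolExecutor


# Minimal stand-in for a dataset: get_fragments() yields independently
# scannable pieces, mirroring the protocol's distributed-tasks mode.
class ToyDataset:
    def __init__(self, fragments):
        self._fragments = fragments

    def get_fragments(self):
        return iter(self._fragments)


class ToyFragment:
    def __init__(self, rows):
        self._rows = rows

    def scanner(self, columns=None, filter=None):
        # Stands in for a stream of record batches from one fragment.
        return list(self._rows)


def scan_fragment(fragment):
    # Each worker scans exactly one fragment; per the review discussion,
    # such a worker would typically pass use_threads=False.
    return fragment.scanner()


dataset = ToyDataset([ToyFragment([1, 2]), ToyFragment([3])])
with ThreadPoolExecutor(max_workers=2) as pool:
    results = list(pool.map(scan_fragment, dataset.get_fragments()))
print(results)  # [[1, 2], [3]]
```

`pool.map` preserves fragment order, so a consumer that cares about ordering can still reassemble the stream deterministically.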
e995cdd to be648e2
Rationale for this change
I think we have an opportunity to specify which methods on dataset can be considered a stable protocol. This will allow other libraries to produce their own classes exposing this interface. And it will make clear to consumers (such as DuckDB, DataFusion, and Polars) which methods are okay to assume exist.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?