
GH-33986: [Python] Add a minimal protocol for datasets #35568

Draft
wants to merge 5 commits into base: main

Conversation

@wjones127 (Member) commented May 12, 2023

Rationale for this change

I think we have an opportunity to specify which methods on dataset can be considered a stable protocol. This will allow other libraries to produce their own classes exposing this interface. And it will make clear to consumers (such as DuckDB, DataFusion, and Polars) which methods are okay to assume exist.
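As a rough sketch of what this could enable (names here are illustrative, not the PR's actual definitions), a consumer could accept anything that structurally matches a runtime-checkable protocol instead of requiring a concrete pyarrow.dataset.Dataset:

from typing import Optional, Protocol, runtime_checkable

import pyarrow as pa


@runtime_checkable
class SupportsScanner(Protocol):
    # Sketch of a dataset-like object; not the exact protocol in this PR.
    def scanner(self, columns: Optional[list] = None, filter=None, **kwargs): ...


def read_any_dataset(obj) -> pa.RecordBatchReader:
    # The consumer checks for the protocol rather than a concrete class.
    if not isinstance(obj, SupportsScanner):
        raise TypeError("object does not implement the dataset protocol")
    # Assumes the returned scanner exposes to_reader(), as discussed below.
    return obj.scanner().to_reader()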

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions

⚠️ GitHub issue #33986 has been automatically assigned in GitHub to PR creator.

@Fokko (Contributor) left a comment

Great work @wjones127. I've left some comments


class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
Contributor

Suggested change
def scanner(self, columns: Optional[List[str]] = None,
def scanner(self, columns: Optional[Tuple[str, ...]] = None,

Nit, I prefer to use tuples over lists because:

  • They are immutable
  • And therefore also hashable:
>>> hash((1,2,3))
529344067295497451
>>> hash([1,2,3])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'

Member Author

I don't disagree in principle, but I'm also trying to keep this somewhat compatible. Though maybe we can loosen it to Sequence[str]?

Actually, it currently supports List[str] | Dict[str, Expression]. Do we want to support the dictionary generally, or keep the protocol narrower than that?

if columns is not None:
    if isinstance(columns, dict):
        for expr in columns.values():
            if not isinstance(expr, Expression):
                raise TypeError(
                    "Expected an Expression for a 'column' dictionary "
                    "value, got {} instead".format(type(expr))
                )
            c_exprs.push_back((<Expression> expr).unwrap())
        check_status(
            builder.Project(c_exprs, [tobytes(c) for c in columns.keys()])
        )
    elif isinstance(columns, list):
        check_status(builder.ProjectColumns([tobytes(c) for c in columns]))
    else:
        raise ValueError(
            "Expected a list or a dict for 'columns', "
            "got {} instead.".format(type(columns))
        )

Member

The difference between Sequence[str] and Dict[str, Expression] is significant. The former only lets you pick which columns to load. The latter introduces the concept of projection expressions, which is a big addition.
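For illustration, with the current pyarrow.dataset API (hypothetical path and column names), the two look like:

import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet")  # hypothetical path

# Plain column selection: only choose which columns to load.
scanner = dataset.scanner(columns=["x", "y"])

# Projection expressions: compute new columns during the scan.
scanner = dataset.scanner(columns={"x_doubled": pc.field("x") * 2,
                                   "y": pc.field("y")})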

Member Author

Related to #35568 (comment), we might want to only allow Sequence[str] here so we only support column selection and reordering. That way we don't need to require consumers to perform projections. I don't think any existing consumer relies on this.

class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
Contributor

In Iceberg we also have a case-sensitive option for the column names. Might be something to consider.

    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
        ...
Contributor

Many table formats support time-traveling. In Iceberg we call this a snapshot: the state of a table at some point in time, including the set of all data files. I think it would be good to add this as well (or through the kwargs).

Member Author

Yes, I think that's something that would go into kwargs.

...


class Fragment(Scannable):
Contributor

I would not make the Fragment scannable, and I would call the schema physical_schema.

In Iceberg it would be very unlikely that you would apply expressions on a fragment. Except for applying residual filtering, see Google doc, but that should not be exposed in the API.

Member Author

The purpose of making the fragment scannable is to allow fragments to be used in a distributed scan. The consumer would create the dataset and get the set of fragments that match the query. Then it would serialize each fragment, distribute them amongst its worker threads / processes, and scan each fragment when it can.

This is currently how DataFusion (across threads) and Dask (across processes, I think) scan. See the "Consumers" section of https://docs.google.com/document/d/1r56nt5Un2E7yPrZO9YPknBN4EDtptpx-tqOZReHvq1U/edit#

Do you think that would not be viable for Iceberg?

Member

Perhaps "fragment" isn't the right word here. If this is "something that can be scanned" and it can maybe be scanned in parts then we could go back to some old wording we used to have which is "scan tasks".

At the least, we should probably be clear that a fragment is not necessarily a file.

I do think it's useful for parallelization. However, I agree with @Fokko: the fragments should not be scannable in the same way the dataset is. It doesn't make sense to provide them with separate filters. They should essentially have to_reader and nothing else.

This does bring up concerns regarding threading and resource contention. E.g. if datafusion was using pyarrow as a data source and spreading the work across many fragments then it probably wouldn't want those individual fragment scanners using their own threads.

Perhaps we can add use_threads? It's a rather coarse control but may be sufficient.

Member Author

Coming back to this.

I would not make the Fragment scannable,

How would you pass a projection then in the distributed case?

I was imagining this sequence:

dataset = ...
projection = ...
filter = ...

fragments = dataset.get_fragments(filter)
for fragment in fragments:
    reader = fragment.scanner(columns=projection, filter=filter).to_reader()
    # use reader...

call the schema [physical_schema]

You're right that fragments currently don't expose the schema. I'll remove that from fragments then. The dataset schema can always be serialized and distributed separately.

Except for applying residual filtering, see Google doc, but that should not be exposed in the API.

What is "residual filtering"? Why should it not be exposed? My expectation is that if the consumer passed a filter x < 2 then the scanner should never pass any rows where x >= 2. Filters should always be fully applied.

Contributor

Let's consider the following expression: ts > 2022-05-04T10:00:00, and assume that the table is partitioned by days(ts):

Partitions:
Days: |  2022-05-03  |  2022-05-04  | 2022-05-05  | ...  <-- ts_day
                          |<----- predicate matches -----

      | CANNOT MATCH | MIGHT MATCH  | MUST MATCH  |
                     |<--------------- INCLUSIVE --------
                                    |<---- STRICT -------

Then there are three situations:

Cannot match: no need to read
Might match: read and check the original predicate (residual)
Must match: read and assume it matches the original predicate

This way you can avoid applying a filter that isn't needed, because we know from the partition information that it is always true.

Member Author

Okay. I think the expectation is that residual filtering needs to be done in this API. The way this is handled in PyArrow's implementation is that each fragment has a "guarantee". So the fragments look like:

<fragment 1 guarantee='date(ts) = "2022-05-03"'>
<fragment 2 guarantee='date(ts) = "2022-05-04"'>
<fragment 3 guarantee='date(ts) = "2022-05-05"'>

When the consumer calls fragments = dataset.get_fragments(filter), fragment 1 is eliminated, because it cannot match. All the fragments that might match are returned.

The consumer then passes the same filter to the scan of each fragment. When it is passed to fragment 2, the filter is executed because there are some rows it might still filter out. When it is passed to fragment 3, the filter is simplified to true, since we know all rows already satisfy it based on the guarantee date(ts) = "2022-05-05".

In the end, the rows produced by the dataset scanner must not have any rows that would be removed by the filter.

Does that API make sense now?
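For illustration, a rough consumer-side sketch of that flow using today's pyarrow.dataset API (the path, partitioning, and column names are made up):

import pyarrow.compute as pc
import pyarrow.dataset as ds

dataset = ds.dataset("data/", format="parquet", partitioning="hive")  # hypothetical
filt = pc.field("x") < 2

# Fragments that cannot match the filter are pruned here...
for fragment in dataset.get_fragments(filter=filt):
    # ...and the same filter is passed again so each fragment applies any
    # residual; fragment.partition_expression is the "guarantee" above.
    reader = fragment.scanner(filter=filt, columns=["x", "y"]).to_reader()
    for batch in reader:
        pass  # consume batches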

Contributor

Yes, it does, thanks for the elaborate answer. Much appreciated.

@westonpace (Member) left a comment

My understanding is that this is still your goal:

I'm basically thinking we have table formats with Python libraries: Delta Lake, Iceberg, and Lance. And we have single-node query engines such as DuckDB, Polars, and Datafusion. It would be cool if we could pass any of the table formats into any of the query engines, all with one protocol.

I'm not sure the API you are defining helps you with that goal. I think what is missing is the API used to create the dataset. What you've proposed here isn't flexible enough. For example, if I'm trying to convert a "named table request" (e.g. give me all rows from table "widgets" with filter "xyz" at time point Y) into a "scan request" (e.g. what pyarrow datasets can read) then I want something like...

dataset_builder = DatasetBuilder()
dataset_builder
  .files(["A", "B"])
  # don't need to apply filter(xyz) here because I know it doesn't apply to these files
  .columns(["foo", "bar", "renamed", "missing"])
  # Need to delete some indices from these files
  .delete_indices([1, 2, 50])
dataset_builder
  .files(["C"])
  .filter("xyz")
  .columns(["foo", "bar"])
  # Need to do some evolution to handle a missing column and a renamed column
  .insert_virtual_columns({"missing": "some_default_value", "renamed": field_ref("old_name")})

In other words, for a table format to use a query engine, it's not enough to pass a single query (e.g. filter / columns / whatever). We need to pass a query per file.

class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
Member

You could potentially consider using Substrait's extended expression instead of pyarrow expressions. This could remove the pyarrow dependency entirely and frame this in terms of Substrait and the C stream interface.

For those that want to use pyarrow expressions then #34834 should make that pretty straightforward.

Member

I'm an obvious Arrow C interface junkie, but I quite like the idea of this. It would also mean that duckdb, datafusion, polars etc. would be able to implement the protocol once without having to reinvent a way to represent a syntax tree in each language binding.

Member Author

I'm +1 for adding Substrait as the preferred expression mechanism and deprecating PyArrow expressions. But for now there's already support for the PyArrow expressions, and once that PR is merged, there will be a nice migration path in the interim.

Member

Is it absolutely critical right now though? (Do DuckDB et al support PyArrow expressions?)

It would be better to omit a feature that we think we will replace than try to standardize it in the interim.

class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None, **kwargs) -> Scanner:
Member

Must this filter be satisfied? Or is it best effort?

If it must be satisfied then how will you handle UDFs and other strange functions in the filter?

Member Author

That's an interesting caveat. I didn't expect that engines would try to pass down UDFs or more complex functions. I suppose we might want to define which functions are allowed in the filter? (Obviously some datasets might support more than that)

Maybe a list like:

  • Equality / inequalities
  • is_null / not_null
  • cast
  • isin
  • not
  • and / or
  • between

That covers everything that is pushed down by
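For illustration, a filter built only from the operations in that list, written as pyarrow.compute expressions (field names invented):

import pyarrow.compute as pc

filt = (
    (pc.field("x") >= 0)                      # inequality
    & pc.field("category").isin(["a", "b"])   # isin
    & ~pc.field("y").is_null()                # not + is_null
)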

Member

This seems workable. There is some prior art from the row-based world with the term sargable which seems to be pretty close to your list.

@westonpace (Member) commented May 18, 2023

Also, just to clarify, does this mean the following interpretation is correct?

Yes, the filter must be satisfied and the dataset should return an error if it cannot fully satisfy the filter.



Comment on lines 46 to 81
    @abstractmethod
    def to_table(self) -> Table:
        ...

    @abstractmethod
    def to_batches(self) -> Iterator[RecordBatch]:
        ...
Member

If you have to_reader then you could create a default implementation for these two methods. Is there any particular reason to include them? In other words, do you expect an implementation to be able to somehow support to_table more efficiently than to_reader().read_all()?
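A sketch of that idea (not the protocol as written): if to_reader were the one required method, the other two could be derived from it.

from typing import Iterator

import pyarrow as pa


class ScannerDefaults:
    # Sketch: derive to_table/to_batches from to_reader.
    def to_reader(self) -> pa.RecordBatchReader:
        raise NotImplementedError  # the one method an implementation must provide

    def to_table(self) -> pa.Table:
        return self.to_reader().read_all()

    def to_batches(self) -> Iterator[pa.RecordBatch]:
        return iter(self.to_reader())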

@wjones127 (Member Author)

I'm not sure the API you are defining helps you with that goal. I think what is missing is the API used to create the dataset. What you've proposed here isn't flexible enough. For example, if I'm trying to convert a "named table request" (e.g. give me all rows from table "widgets" with filter "xyz" at time point Y) into a "scan request" (e.g. what pyarrow datasets can read) then I want something like...

@westonpace you are correct that this doesn't define how such dataset classes are built. That's left to the consumer, who will write their own classes that conform to this API.

However, I do like your idea for a dataset builder. I think it might be worth asking the PyIceberg developers whether something like that would work well for them. (I think Delta Lake and Lance will likely go the route of implementing their own classes in Rust.) I've noted this idea in https://docs.google.com/document/d/1-uVkSZeaBtOALVbqMOPeyV3s2UND7Wl-IGEZ-P-gMXQ/edit#heading=h.31rf5m1tlipg

@westonpace (Member)

@westonpace you are correct that this doesn't define how such dataset classes are built. That's left to the consumer, who will write their own classes that conform to this API.

That would seem an essential API if this protocol was meant to be used by "table formats" to prepare "queries simple enough for query engines to understand". So perhaps I am misunderstanding.

Is this protocol meant to be used by "query engines" to "query a table format library as if it were a dataset"?

for Hive partitions that don't match that date.
* Filters passed down should be fully executed. While other systems have scanners
that are "best-effort", only executing the parts of the filter that they can, PyArrow
datasets should always remove all rows that don't match the filter.
Contributor

It would be nice if we could control this. In Iceberg we know if there are any residuals in the file.

metadata about fragments that aren't relevant to queries. For example, if you
have a dataset that uses Hive-style partitioning for a column ``date`` and the
user passes a filter for ``date=2023-01-01``, then you can skip listing directories
for Hive partitions that don't match that date.
Contributor

Is this relevant to the protocol? For context: in Iceberg you can just pass in an expression, and it automatically checks whether the partitioning can be used to prune any data. Iceberg is lazy, so multiple partitioning strategies can co-exist because partitioning can change over time (you never know how the partitions evolve, so this should be flexible :).

Member Author

The Hive-style partitioning was just an example. I think for Iceberg the equivalent statement would be something like: an Iceberg dataset can skip loading manifest files that are irrelevant to the query, instead of loading every manifest file referenced in the manifest lists. Does that make sense? The overall idea is that the API allows you to treat the metadata itself as "big data" that you wouldn't want to scan in its entirety.

@jorisvandenbossche (Member) commented Jun 7, 2023

@westonpace you are correct that this doesn't define how such dataset classes are built. That's left to the consumer, who will write their own classes that conform to this API.

That would seem an essential API if this protocol was meant to be used by "table formats" to prepare "queries simple enough for query engines to understand". So perhaps I am misunderstanding.

Is this protocol meant to be used by "query engines" to "query a table format library as if it were a dataset"?

My assumption was that it is the producers that implement the classes that conform to this API?

How are the consumer and producer supposed to interact with this protocol?

Taking duckdb as an example, the user can currently manually create a pyarrow object and then query it using duckdb:

import duckdb
import pyarrow.dataset as ds

pyarrow_dataset = ds.dataset(...)
duckdb.sql("SELECT * FROM pyarrow_dataset WHERE ..")

(and under the hood, duckdb converts pyarrow_dataset to a scanner / stream of batches passing the appropriate filter / projection based on the actual query)

Is the idea that something similar would then work for any object supporting this protocol (assuming duckdb relaxes its check for a pyarrow object to any object conforming to the protocol)? For example with delta-lake:

from deltalake import DeltaTable

delta_table = DeltaTable("..")
duckdb.sql("SELECT * FROM delta_table WHERE ..")

But if this is the intended usage, I don't understand what the "builder API" (#35568 (review)) would be meant for?

In other words, for a table format to use a query engine, it's not enough to pass a single query (e.g. filter / columns / whatever). We need to pass a query per file.

@westonpace Why is that not sufficient? I think it is up to the table format to translate the single query into a query per file (and execute this)?

@wjones127 wjones127 force-pushed the GH-33986-dataset-protocol-python branch from c6d50f4 to 280bfdb Compare June 12, 2023 01:31
@wjones127 wjones127 changed the title GH-33986: [Python] Sketch out a minimal protocol interface for datasets GH-33986: [Python] Add a minimal protocol for datasets Jun 12, 2023
@wjones127 (Member Author)

I've updated the docs to be more descriptive of the intended uses.

There are also some tests to make sure the protocol aligns with what is implemented by pyarrow.dataset.Dataset and related classes.

@westonpace (Member)

So is the assumption here that the producer and the consumer (in your diagram) are the same library? E.g. both are pyarrow (pyarrow has code for producing datasets and for scanning datasets)? Or is the goal to be able to produce datasets with one library and consume them with a different library?

@westonpace (Member)

Why is that not sufficient? I think it is up to the table format to translate the single query into a query per file (and execute this)?

@jorisvandenbossche in other words, the table format would create a fragment or dataset for each file (or set of files with a common schema) and then create a unique query for each of those. That would work. I was thinking something much more complex.

@wjones127 (Member Author)

So is the assumption here that the producer and the consumer (in your diagram) are the same library? E.g. both are pyarrow (pyarrow has code for producing datasets and for scanning datasets)? Or is the goal to be able to produce datasets with one library and consume them with a different library?

Different libraries. Producers are libraries like lance, deltalake, and pyiceberg. Consumers are libraries like duckdb, polars, datafusion and dask.

You could say the status quo is that the consumer can be any library, but the producer is assumed to be pyarrow. This protocol helps open up the producer side to be other libraries.

Comment on lines +24 to +29
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.

@wjones127 (Member Author) commented Jun 14, 2023

Suggested change
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.
Producers are scanner implementations. For example, table formats like Delta
Lake and Iceberg might provide their own dataset implementations. Consumers
are typically query engines, such as DuckDB, DataFusion, Polars, and Dask.
Providing a common API avoids a situation where supporting ``N`` dataset
formats in ``M`` query engines requires ``N * M`` integrations.

- isin
- between
- negation (not)

@Fokko (Contributor) commented Jun 14, 2023

How about adding support for nan-values? is_nan / not_nan

@Fokko (Contributor) left a comment

I needed to try this, and it works really well! Thanks for the great work here @wjones127


My messy code can be found here: apache/iceberg#7846. Only look at it if you're interested; it is very early, and a lot is still missing. It was just to check how it fits the structure.

One concern I have is where to implement the schema projection. Currently, in PyIceberg we produce a pa.Table for each of the Parquet files. In the following example, we have an Iceberg table with two columns. There was only one column initially, the first Parquet file was written, and then the name column got added, and another file was written. Iceberg is lazy, so when reading the first file, it will inject a null column:

This is valid:

Schema:
- idx: int32, required=True, field-id=1
- name: string, required=False, field-id=2

00001-data.parquet
idx: int32, field-id=1

00002-data.parquet
idx: int32, field-id=1
name: string, field-id=2

Currently in PyIceberg we materialize each of the fragments to a table; for each of the tables, we convert it to the requested schema, and then we concatenate all the tables. There are a couple of situations to handle, such as field widening:

This is valid:

Schema:
- idx: int32, required=True, field-id=1

00001-data.parquet
idx: int32, field-id=1

00002-data.parquet
idx: int64, field-id=1

Or renaming, where we'll rename the column of the table before concatenating them:

This is valid:

Schema:
- idx: int64, required=True, field-id=1

00001-data.parquet
idx: int64, field-id=1

00002-data.parquet
index: int64, field-id=1

With Iceberg spec v3, initial-defaults are also being added, so we can add new required columns:

This is valid:

Schema:
- idx: int64, required=True, field-id=1
- name: string, required=True, field-id=2, initial-default=unknown

00001-data.parquet
idx: int64, field-id=1

00002-data.parquet
index: int64, field-id=1
name: string, field-id=2

And this way, when reading the data, it will inject unknown if the column is not in the physical fragment. Exciting stuff!

As mentioned, currently we iterate over both the physical Arrow schema and the requested Iceberg schema. This is where missing columns are added with null values, and types are cast. My question is, would it be appropriate to do this on the RecordBatch? This way the Fragment would already materialize the records, and then it is possible to convert them into the requested schema. Going to a table and back to batches seems wasteful to me.
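A rough sketch of doing that per RecordBatch — matching by name, filling missing columns with nulls, and casting for widening; real Iceberg resolution is by field-id, so this is illustrative only:

import pyarrow as pa


def evolve_batch(batch: pa.RecordBatch, target: pa.Schema) -> pa.RecordBatch:
    # Illustrative only: adapt a physical batch to the requested schema.
    arrays = []
    for field in target:
        idx = batch.schema.get_field_index(field.name)
        if idx != -1:
            # Type widening (e.g. int32 -> int64) via cast.
            arrays.append(batch.column(idx).cast(field.type))
        else:
            # Column missing from the physical file: inject nulls
            # (or an initial-default, in Iceberg v3 terms).
            arrays.append(pa.nulls(batch.num_rows, type=field.type))
    return pa.RecordBatch.from_arrays(arrays, schema=target)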

Another thing that's not possible today in the current structure is positional deletes, but this is solvable by adding __row_index as suggested in #35301.

    ...

    @abstractmethod
    def head(self, num_rows: int) -> Table:
Member

For head and count_rows, should we say that not all producers will support them and that they may raise NotImplementedError?

Member Author

Yeah that's reasonable. They aren't that important to the protocol's goals; I almost considered removing them.

Contributor

I would leave them in. In Iceberg we can do a count without touching the actual data files (if you don't use a filter).

Member Author

Leaving them in.
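A hedged sketch of what that could look like for a metadata-backed producer (class and attribute names are hypothetical):

import pyarrow as pa


class MetadataBackedDataset:
    # Hypothetical producer that can count rows from metadata alone.
    def __init__(self, row_count_from_metadata: int):
        self._row_count = row_count_from_metadata

    def count_rows(self, filter=None, **kwargs) -> int:
        if filter is not None:
            # Counting with a filter would require scanning the data files.
            raise NotImplementedError("count_rows with a filter is not supported")
        return self._row_count

    def head(self, num_rows: int) -> pa.Table:
        raise NotImplementedError("head is not supported by this producer")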

columns : List[str], optional
Names of columns to include in the scan. If None, all columns are
included.
filter : Expression, optional
Member

Maybe we should omit filter to begin with, given that we'd be trying to codify semantics for a mini expression language

Member Author

I think given this piece is already in use and important, I'd like to include it. It's definitely funky. There is desire from the community to move away from PyArrow expressions and toward Substrait or something else, so I expect we'll deprecate this eventually.

.. specific language governing permissions and limitations
.. under the License.

Extending PyArrow Datasets
Member

Should we say that this is currently experimental, and list the things that we know are on the roadmap?


PyArrow provides a core protocol for datasets, so third-party libraries can both
produce and consume classes that conform to useful subset of the PyArrow dataset
API. This subset provides enough functionality to provide predicate and filter
Member

"predicate" and "filter" mean the same thing. Do you mean "projection and filter"?


dataset = producer_library.get_dataset(...)
df = consumer_library.read_dataset(dataset)
df.filter("x > 0").select("y")
Member

I'm confused about select() here. The Dataset API does not currently expose a select() method, but it allows column projection by allowing users to pass column names or expressions to other methods or to the scanner. Do you propose adding a select() method to the Dataset API?

Member Author

This is meant to be a sketch of what usage looks like from the user's perspective. df is a DataFrame object produced by an imaginary consumer library, and filter and select are methods of that imaginary library.

The idea is the user gets a dataset from the producer, and passes it to the consumer. Then the user gets to write their queries with their favorite query engine, and under the hood the datasets API is used to do projection and predicate pushdown.

Member Author

Perhaps I should explain that it's imaginary? Or I could show a concrete usage with DuckDB and then Polars?

Member Author

I've switched this to a concrete example with Delta Lake and DuckDB.

@westonpace (Member) left a comment

Apologies that it took me so long but I think the purpose of this protocol has finally clicked for me. I've got a few more thoughts but, in general, I like the idea! I appreciate you putting up with my confusion :)



@runtime_checkable
class Fragment(Scannable, Protocol):
Member

There are some things I would like to have here, as a user, but I understand we are just getting started and trying to be minimal. So take these as suggestions:

__repr__ <-- converting a fragment to string is very useful for debugging

estimated_cost <-- I get why this one isn't there but a fragment might be 5 rows or it might be 5 million rows, and that could be valuable for figuring out how to distribute a dataset workload. Still, there is no universal way of estimating cost, so perhaps we can leave this for an extension.

Comment on lines +111 to +113
batch_size : int, optional
The number of rows to include in each batch. If None, the default
value is used. The default value is implementation specific.
Member

Maybe expand that the default value is not only implementation specific but might not even be consistent between batches?

Also, is this a min/max or is it a maximum only? In other words, if batch_size is 1_000_000 and the source is a parquet file with 10 row groups of 100_000 rows, does the scanner need to accumulate the rows, or is it acceptable to return smaller batches?

Member Author

This is a good detail to think about. I don't think we should require it to be exact; for example, if the reader can't read in exactly that batch size I don't think it should error. But I do think readers should make their best effort to be as close to the batch size as possible, even if that means splitting row groups into chunks, for example.

Though you might have more informed opinions here; what do you think is reasonable to expect here?

Member

This parameter has lost importance in arrow-c++ datasets. It used to be an important tuning parameter that affected the size of the batches used internally by the C++ implementation. However, it didn't make sense for the user to pick the correct value (and there are multiple batch sizes in the C++ and the right value might even depend on the schema and be quite difficult to calculate).

I think it still has value, especially "max batch size". The user needs some way to say "don't give me 20GB of data all at once".

So I think it needs to be a hard upper limit but it can be a soft lower limit. We could either call it max_batch_size (and ignore it as an upper limit entirely) or preferred_batch_size (and explain that only the upper limit is strictly enforced). I don't think using this as an upper limit is overly burdensome as slicing tables/batches should be pretty easy and lightweight. The reverse (concatenating batches) is more complicated and expensive.
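A small sketch of enforcing batch_size as a hard upper bound by slicing oversized batches (the helper name is made up):

from typing import Iterable, Iterator

import pyarrow as pa


def cap_batch_size(batches: Iterable[pa.RecordBatch],
                   max_batch_size: int) -> Iterator[pa.RecordBatch]:
    # Slice any oversized batch; smaller batches pass through unchanged.
    for batch in batches:
        if batch.num_rows <= max_batch_size:
            yield batch
            continue
        offset = 0
        while offset < batch.num_rows:
            yield batch.slice(offset, max_batch_size)
            offset += max_batch_size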

Comment on lines 115 to 118
Whether to use multiple threads to read the rows. It is expected
that consumers reading a whole dataset in one scanner will keep this
as True, while consumers reading a single fragment per worker will
typically set this to False.
Member

Maybe instead of It is expected we could use softer wording like "Often" or "Typically"? I just think there are corner cases for each. For example, you might not want to use threads because you expect to run multiple queries concurrently or you just want to cut down on the resource usage.

included.
filter : Expression, optional
Filter expression to apply to the scan. If None, no filter is applied.
batch_size : int, optional
Member

Should batch_size be part of to_reader instead of scanner?

Member Author

I'm open to that. But I don't want to add that to the protocol without also implementing it in PyArrow Datasets. So if we think this is important, I'll remove this for now.

Comment on lines +35 to +43
Producers are responsible for outputting a class that conforms to the protocol.

Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.
Member

This is "ok" but the definition of producer and consumer here are reversed from what they are in Substrait which confused me for a while. Maybe we can go with "Data producer" and "Data consumer"?

Member Author

Ha it finally clicked why I myself find these terms confusing 🤣. It feels backwards! I'll think of new names.

@wjones127 wjones127 force-pushed the GH-33986-dataset-protocol-python branch from e995cdd to be648e2 Compare July 3, 2023 21:34
Successfully merging this pull request may close these issues:

[Python][Rust] Create extension point in python for Dataset/Scanner