From be648e2f601a2ea00f6178acfff962860eb008c3 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Mon, 3 Jul 2023 15:33:59 -0600
Subject: [PATCH] remove filters for now

---
 docs/source/python/integration/dataset.rst | 56 +++++++++-------------
 python/pyarrow/dataset/protocol.py         | 50 ++++++-------------
 2 files changed, 37 insertions(+), 69 deletions(-)

diff --git a/docs/source/python/integration/dataset.rst b/docs/source/python/integration/dataset.rst
index 849ae6d927262..4d12b89896a10 100644
--- a/docs/source/python/integration/dataset.rst
+++ b/docs/source/python/integration/dataset.rst
@@ -18,9 +18,13 @@
 Extending PyArrow Datasets
 ==========================
 
+.. warning::
+
+   This protocol is currently experimental.
+
 PyArrow provides a core protocol for datasets, so third-party libraries can both
 produce and consume classes that conform to a useful subset of the PyArrow dataset
-API. This subset provides enough functionality to provide predicate and filter
+API. This subset provides enough functionality to support projection
 pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.
 
 .. image:: pyarrow_dataset_protocol.svg
@@ -38,18 +42,24 @@ Consumers are responsible for calling methods on the protocol to get the data
 out of the dataset. The protocol supports getting data as a single stream or
 as a series of tasks which may be distributed.
 
-From the perspective of a user, the code looks like:
+As an example, this is what the code looks like from the user's perspective to
+retrieve a Delta Lake table as a dataset and query it in DuckDB:
 
 .. code-block:: python
+    :emphasize-lines: 2,6
 
-    dataset = producer_library.get_dataset(...)
-    df = consumer_library.read_dataset(dataset)
-    df.filter("x > 0").select("y")
+    from deltalake import DeltaTable
+    table = DeltaTable("path/to/table")
+    dataset = table.to_pyarrow_dataset()
+
+    import duckdb
+    df = duckdb.arrow(dataset)
+    df.project("y")
 
-Here, the consumer would pass the filter ``x > 0`` and the projection of ``y`` down
-to the producer through the dataset protocol. Thus, the user gets to enjoy the
-performance benefits of pushing down filters and projections while being able
-to specify those in their preferred query engine.
+Here, DuckDB would pass the projection of ``y`` down to the producer through
+the dataset protocol. The ``deltalake`` scanner would then read only the
+column ``y``. Thus, the user gets the performance benefits of pushing down
+projections while being able to specify them in their preferred query engine.
 
 Dataset Producers
 -----------------
@@ -60,24 +70,6 @@
 produce a PyArrow-compatible dataset. Your dataset could be backed by the classes
 implemented in PyArrow or you could implement your own classes. Either way, you
 should implement the protocol below.
 
-When implementing the dataset, consider the following:
-
-* Filters passed down should be fully executed. While other systems have scanners
-  that are "best-effort", only executing the parts of the filter that it can, PyArrow
-  datasets should always remove all rows that don't match the filter. If the
-  implementation cannot execute the filter, it should raise an exception. A
-  limited set of expressions are allowed in these filters for the general
-  protocol. See the docstrings for ``Scannable`` below for details.
-* The API does not require that a dataset has metadata about all fragments
-  loaded into memory. Indeed, to scale to very large Datasets, don't eagerly
-  load all the fragment metadata into memory. Instead, load fragment metadata
-  once a filter is passed. This allows you to skip loading metadata about
-  fragments that aren't relevant to queries. For example, if you have a dataset
-  that uses Hive-style paritioning for a column ``date`` and the user passes a
-  filter for ``date=2023-01-01``, then you can skip listing directory for HIVE
-  partitions that don't match that date.
-
-
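+As a rough sketch, a producer backed by a list of in-memory ``pyarrow.Table``
+objects could implement the protocol along these lines (the ``My*`` names are
+purely illustrative):
+
+.. code-block:: python
+
+    import pyarrow as pa
+
+    class MyScanner:
+        """Scans a single in-memory table."""
+
+        def __init__(self, table, columns=None):
+            self._table = table if columns is None else table.select(columns)
+
+        def count_rows(self):
+            return self._table.num_rows
+
+        def head(self, num_rows):
+            return self._table.slice(0, num_rows)
+
+        def to_reader(self):
+            return pa.RecordBatchReader.from_batches(
+                self._table.schema, self._table.to_batches())
+
+    class MyFragment:
+        """One unit of work, backed by a single table."""
+
+        def __init__(self, table):
+            self._table = table
+
+        # batch_size and use_threads are accepted but ignored in this sketch.
+        def scanner(self, columns=None, batch_size=None, use_threads=True,
+                    **kwargs):
+            return MyScanner(self._table, columns)
+
+    class MyDataset:
+        """A dataset made up of several in-memory tables."""
+
+        def __init__(self, tables):
+            self._tables = tables
+
+        @property
+        def schema(self):
+            return self._tables[0].schema
+
+        def get_fragments(self, **kwargs):
+            return (MyFragment(table) for table in self._tables)
+
+        def scanner(self, columns=None, batch_size=None, use_threads=True,
+                    **kwargs):
+            return MyScanner(pa.concat_tables(self._tables), columns)
+
+A real producer would typically map each fragment to a file or partition and
+read its data lazily rather than holding tables in memory.
+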
 Dataset Consumers
 -----------------
@@ -92,7 +84,7 @@ There are two general patterns for consuming PyArrow datasets: reading a single
 stream or creating a scan task per fragment.
 
 If you have a streaming execution model, you can receive a single stream
-of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``.
+of data by calling ``dataset.scanner(columns=...).to_reader()``.
 This will return a RecordBatchReader, which can be exported over the
 :ref:`C Stream Interface `. The record batches yield
 from the stream can then be passed to worker threads for parallelism.
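+
+For example, a streaming consumer might look like the following sketch, where
+``dataset`` is any object implementing the protocol and ``y`` is assumed to be
+a column in its schema:
+
+.. code-block:: python
+
+    # Only the requested column is read; the projection is pushed down to
+    # the producer.
+    reader = dataset.scanner(columns=["y"]).to_reader()
+    for batch in reader:
+        ...  # hand each pyarrow.RecordBatch off to a worker
+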
@@ -103,7 +95,7 @@ and readers. In this case, the code looks more like:
 
 .. code-block:: python
 
-    fragments = list(dataset.get_fragments(filter=..., columns=...))
+    fragments = list(dataset.get_fragments())
 
     def scan_partition(i):
         fragment = fragments[i]
@@ -113,13 +105,11 @@
 Fragments are pickleable, so they can be passed to remote workers in a
 distributed system.
 
-If your engine supports predicate (filter) and projection (column) pushdown,
+If your engine supports projection (column) pushdown,
 you can pass those down to the dataset by passing them to the ``scanner``.
 Column pushdown is limited to selecting a subset of columns from the schema.
 Some implementations, including PyArrow may also support projecting and
-renaming columns, but this is not part of the protocol. Predicate pushdown
-is limited to a subset of expressions. See the docstrings for ``Scannable``
-for the allowed expressions.
+renaming columns, but this is not part of the protocol.
 
 
 The protocol
diff --git a/python/pyarrow/dataset/protocol.py b/python/pyarrow/dataset/protocol.py
index 1f6657f916a6e..d8696507ba030 100644
--- a/python/pyarrow/dataset/protocol.py
+++ b/python/pyarrow/dataset/protocol.py
@@ -23,11 +23,13 @@
 that implement these protocols, rather than requiring the specific PyArrow
 classes.
 
+The pyarrow.dataset.Dataset class itself implements this protocol.
+
 See Extending PyArrow Datasets for more information:
 https://arrow.apache.org/docs/python/integration/dataset.html
 """
 
-from abc import abstractmethod, abstractproperty
+from abc import abstractmethod
 from typing import Iterator, List, Optional
 
 # TODO: remove once we drop support for Python 3.7
@@ -50,21 +52,21 @@ class Scanner(Protocol):
     @abstractmethod
     def count_rows(self) -> int:
         """
-        Count the number of rows in this dataset.
+        Count the number of rows in this dataset or fragment.
 
         Implementors may provide optimized code paths that compute this from
         metadata.
 
         Returns
         -------
         int
-            The number of rows in the dataset.
+            The number of rows in the dataset or fragment.
         """
         ...
 
     @abstractmethod
     def head(self, num_rows: int) -> Table:
         """
-        Get the first ``num_rows`` rows of the dataset.
+        Get the first ``num_rows`` rows of the dataset or fragment.
 
         Parameters
         ----------
@@ -74,7 +76,7 @@ def head(self, num_rows: int) -> Table:
         num_rows : int
             The number of rows to return.
 
         Returns
         -------
         Table
-            A table containing the first ``num_rows`` rows of the dataset.
+            A table containing the first ``num_rows`` rows of the dataset or fragment.
         """
         ...
@@ -96,7 +98,7 @@ def to_reader(self) -> RecordBatchReader:
 class Scannable(Protocol):
     @abstractmethod
     def scanner(self, columns: Optional[List[str]] = None,
-                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                batch_size: Optional[int] = None,
                 use_threads: bool = True, **kwargs) -> Scanner:
         """Create a scanner for this dataset.
 
@@ -106,33 +108,14 @@ def scanner(self, columns: Optional[List[str]] = None,
         columns : List[str], optional
             Names of columns to include in the scan. If None, all columns are
             included.
-        filter : Expression, optional
-            Filter expression to apply to the scan. If None, no filter is applied.
         batch_size : int, optional
             The number of rows to include in each batch. If None, the default
             value is used. The default value is implementation specific.
         use_threads : bool, default True
-            Whether to use multiple threads to read the rows. It is expected
-            that consumers reading a whole dataset in one scanner will keep this
+            Whether to use multiple threads to read the rows. Often consumers
+            reading a whole dataset in one scanner will keep this
            as True, while consumers reading a single fragment per worker will
-            typically set this to False.
-
-        Notes
-        -----
-        The filters must be fully satisfied. If the dataset cannot satisfy the
-        filter, it should raise an error.
-
-        Only the following expressions are allowed in the filter:
-         - Equality / inequalities (==, !=, <, >, <=, >=)
-         - Conjunctions (and, or)
-         - Field references (e.g. "a" or "a.b.c")
-         - Literals (e.g. 1, 1.0, "a", True)
-         - cast
-         - is_null / not_null
-         - isin
-         - between
-         - negation (not)
-
+            set this to False.
         """
         ...
 
@@ -151,24 +134,19 @@ class Fragment(Scannable, Protocol):
 class Dataset(Scannable, Protocol):
     @abstractmethod
     def get_fragments(
-        self,
-        filter: Optional[Expression] = None, **kwargs
+        self, **kwargs
     ) -> Iterator[Fragment]:
         """Get the fragments of this dataset.
 
         Parameters
         ----------
-        filter : Expression, optional
-            Filter expression to use to prune which fragments are selected.
-            See Scannable.scanner for details on allowed filters. The filter is
-            just used to prune which fragments are selected. It does not need to
-            save the filter to apply to the scan. That is handled by the scanner.
         **kwargs : dict
             Additional arguments to pass to underlying implementation.
         """
         ...
 
-    @abstractproperty
+    @property
+    @abstractmethod
     def schema(self) -> Schema:
         """
         Get the schema of this dataset.