remove filters for now
wjones127 committed Jul 3, 2023
1 parent 0f8a61c commit be648e2
Showing 2 changed files with 37 additions and 69 deletions.
56 changes: 23 additions & 33 deletions docs/source/python/integration/dataset.rst
@@ -18,9 +18,13 @@
Extending PyArrow Datasets
==========================

+ .. warning::
+
+    This protocol is currently experimental.
+
PyArrow provides a core protocol for datasets, so third-party libraries can both
produce and consume classes that conform to a useful subset of the PyArrow dataset
- API. This subset provides enough functionality to provide predicate and filter
+ API. This subset provides enough functionality to provide projection
pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.

.. image:: pyarrow_dataset_protocol.svg
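
As a rough illustration of what "a useful subset of the PyArrow dataset API" means
in practice, a consumer can be written against the protocol classes rather than
against ``pyarrow.dataset.Dataset`` itself. The following is a minimal sketch;
``peek`` is a hypothetical helper, not part of the protocol:

.. code-block:: python

    from pyarrow import Table
    from pyarrow.dataset.protocol import Scannable

    def peek(source: Scannable, num_rows: int = 5) -> Table:
        # Any producer implementing the protocol works here, not just
        # PyArrow's own dataset classes.
        return source.scanner().head(num_rows)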
@@ -38,18 +42,24 @@
Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.

- From the perspective of a user, the code looks like:
+ As an example, from the perspective of the user, this is what the code looks like
+ to retrieve a Delta Lake table as a dataset and use it in DuckDB:

.. code-block:: python
    :emphasize-lines: 2,6

+    from deltalake import DeltaTable
+    table = DeltaTable("path/to/table")
+    dataset = table.to_pyarrow_dataset()
-    dataset = producer_library.get_dataset(...)
-    df = consumer_library.read_dataset(dataset)
-    df.filter("x > 0").select("y")
+
+    import duckdb
+    df = duckdb.arrow(dataset)
+    df.project("y")

- Here, the consumer would pass the filter ``x > 0`` and the projection of ``y`` down
- to the producer through the dataset protocol. Thus, the user gets to enjoy the
- performance benefits of pushing down filters and projections while being able
- to specify those in their preferred query engine.
+ Here, DuckDB would pass the projection of ``y`` down to the producer
+ through the dataset protocol. The deltalake scanner would then only read the
+ column ``y``. Thus, the user gets to enjoy the performance benefits of pushing
+ down projections while being able to specify those in their preferred query engine.


Dataset Producers
-----------------

@@ -60,24 +70,6 @@
produce a PyArrow-compatible dataset. Your dataset could be backed by the classes
implemented in PyArrow or you could implement your own classes. Either way, you
should implement the protocol below.

- When implementing the dataset, consider the following:
-
- * Filters passed down should be fully executed. While other systems have scanners
-   that are "best-effort", only executing the parts of the filter that it can, PyArrow
-   datasets should always remove all rows that don't match the filter. If the
-   implementation cannot execute the filter, it should raise an exception. A
-   limited set of expressions are allowed in these filters for the general
-   protocol. See the docstrings for ``Scannable`` below for details.
- * The API does not require that a dataset has metadata about all fragments
-   loaded into memory. Indeed, to scale to very large Datasets, don't eagerly
-   load all the fragment metadata into memory. Instead, load fragment metadata
-   once a filter is passed. This allows you to skip loading metadata about
-   fragments that aren't relevant to queries. For example, if you have a dataset
-   that uses Hive-style partitioning for a column ``date`` and the user passes a
-   filter for ``date=2023-01-01``, then you can skip listing directories for Hive
-   partitions that don't match that date.
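
To make the producer side concrete, here is a minimal sketch of what a
protocol-compliant dataset backed by a list of in-memory tables could look like.
The class names are hypothetical and error handling is omitted; a real producer
would more likely be backed by files or a table format:

.. code-block:: python

    from typing import Iterator, List, Optional

    import pyarrow as pa


    class InMemoryScanner:
        """Scanner over a single in-memory table."""

        def __init__(self, table: pa.Table, columns: Optional[List[str]] = None):
            # Column (projection) pushdown: keep only the requested columns.
            self._table = table if columns is None else table.select(columns)

        def count_rows(self) -> int:
            return self._table.num_rows

        def head(self, num_rows: int) -> pa.Table:
            return self._table.slice(0, num_rows)

        def to_reader(self) -> pa.RecordBatchReader:
            return pa.RecordBatchReader.from_batches(
                self._table.schema, self._table.to_batches())


    class InMemoryFragment:
        """One fragment of the dataset; here, just one table."""

        def __init__(self, table: pa.Table):
            self._table = table

        def scanner(self, columns=None, batch_size=None, use_threads=True,
                    **kwargs) -> InMemoryScanner:
            # batch_size and use_threads are accepted but ignored in this sketch.
            return InMemoryScanner(self._table, columns=columns)


    class InMemoryDataset(InMemoryFragment):
        """A dataset made of one fragment per table."""

        def __init__(self, tables: List[pa.Table]):
            # All tables are assumed to share the same schema.
            super().__init__(pa.concat_tables(tables))
            self._tables = tables

        @property
        def schema(self) -> pa.Schema:
            return self._tables[0].schema

        def get_fragments(self, **kwargs) -> Iterator[InMemoryFragment]:
            return (InMemoryFragment(t) for t in self._tables)

A consumer could then accept ``InMemoryDataset([table1, table2])`` anywhere it
accepts a PyArrow dataset, without depending on PyArrow's concrete classes.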


Dataset Consumers
-----------------

@@ -92,7 +84,7 @@
There are two general patterns for consuming PyArrow datasets: reading a single
stream or creating a scan task per fragment.

If you have a streaming execution model, you can receive a single stream
- of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``.
+ of data by calling ``dataset.scanner(columns=...).to_reader()``.
This will return a RecordBatchReader, which can be exported over the
:ref:`C Stream Interface <c-stream-interface>`. The record batches yielded
from the stream can then be passed to worker threads for parallelism.
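
For example, a consumer with a streaming execution model could drive any
protocol-compliant dataset like this (a sketch, assuming ``dataset`` is such an
object):

.. code-block:: python

    import pyarrow as pa

    def count_rows_streaming(dataset, columns=None):
        # The column selection is pushed down to the producer; the data
        # arrives back as a single stream of record batches.
        reader = dataset.scanner(columns=columns).to_reader()
        assert isinstance(reader, pa.RecordBatchReader)
        return sum(batch.num_rows for batch in reader)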
@@ -103,7 +95,7 @@
and readers. In this case, the code looks more like:

.. code-block:: python

-    fragments = list(dataset.get_fragments(filter=..., columns=...))
+    fragments = list(dataset.get_fragments(columns=...))

    def scan_partition(i):
        fragment = fragments[i]
@@ -113,13 +105,11 @@
Fragments are pickleable, so they can be passed to remote workers in a
distributed system.
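
A slightly fuller version of that pattern might look like the following sketch;
the local loop stands in for whatever task scheduler the engine actually uses:

.. code-block:: python

    def scan_by_fragment(dataset, columns=None):
        fragments = list(dataset.get_fragments())

        def scan_partition(i):
            # One task per fragment; threading is left to the engine, so it
            # is disabled for the inner scan.
            scanner = fragments[i].scanner(columns=columns, use_threads=False)
            return scanner.to_reader().read_all()

        # Because fragments are pickleable, scan_partition could be shipped
        # to remote workers instead of being called locally.
        return [scan_partition(i) for i in range(len(fragments))]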

- If your engine supports predicate (filter) and projection (column) pushdown,
+ If your engine supports projection (column) pushdown,
you can pass those down to the dataset by passing them to the ``scanner``.
Column pushdown is limited to selecting a subset of columns from the schema.
Some implementations, including PyArrow, may also support projecting and
- renaming columns, but this is not part of the protocol. Predicate pushdown
- is limited to a subset of expressions. See the docstrings for ``Scannable``
- for the allowed expressions.
+ renaming columns, but this is not part of the protocol.
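
For instance, protocol-level pushdown is just a list of column names, while
richer projections are implementation specific. A short sketch, where
``"data/"`` is a placeholder path for a PyArrow-backed dataset:

.. code-block:: python

    import pyarrow.dataset as ds

    pa_dataset = ds.dataset("data/")  # placeholder path

    # Protocol-level pushdown: select a subset of columns by name.
    scanner = pa_dataset.scanner(columns=["y"])

    # PyArrow's own datasets also accept a mapping that projects and renames
    # columns, but that goes beyond the protocol.
    renamed = pa_dataset.scanner(columns={"y_renamed": ds.field("y")})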


The protocol
------------
50 changes: 14 additions & 36 deletions python/pyarrow/dataset/protocol.py
@@ -23,11 +23,13 @@
that implement these protocols, rather than requiring the specific
PyArrow classes.
The pyarrow.dataset.Dataset class itself implements this protocol.
+ See Extending PyArrow Datasets for more information:
+ https://arrow.apache.org/docs/python/integration/dataset.html
"""
- from abc import abstractmethod, abstractproperty
+ from abc import abstractmethod
from typing import Iterator, List, Optional

# TODO: remove once we drop support for Python 3.7
@@ -50,21 +52,21 @@ class Scanner(Protocol):
    @abstractmethod
    def count_rows(self) -> int:
        """
-        Count the number of rows in this dataset.
+        Count the number of rows in this dataset or fragment.

        Implementors may provide optimized code paths that compute this from metadata.

        Returns
        -------
        int
-            The number of rows in the dataset.
+            The number of rows in the dataset or fragment.
        """
        ...
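    # For illustration (a sketch, not part of the protocol itself): a scanner
    # whose data lives in a Parquet file could satisfy count_rows() from footer
    # metadata alone, without reading any row data. ``self._path`` is a
    # hypothetical attribute of such an implementation:
    #
    #     import pyarrow.parquet as pq
    #
    #     def count_rows(self) -> int:
    #         return pq.ParquetFile(self._path).metadata.num_rows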

    @abstractmethod
    def head(self, num_rows: int) -> Table:
        """
-        Get the first ``num_rows`` rows of the dataset.
+        Get the first ``num_rows`` rows of the dataset or fragment.

        Parameters
        ----------
@@ -74,7 +76,7 @@ def head(self, num_rows: int) -> Table:
        Returns
        -------
        Table
-            A table containing the first ``num_rows`` rows of the dataset.
+            A table containing the first ``num_rows`` rows of the dataset or fragment.
        """
        ...

@@ -96,7 +98,7 @@ def to_reader(self) -> RecordBatchReader:
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
-                filter: Optional[Expression] = None, batch_size: Optional[int] = None,
+                batch_size: Optional[int] = None,
                use_threads: bool = True,
                **kwargs) -> Scanner:
        """Create a scanner for this dataset.
@@ -106,33 +108,14 @@ def scanner(self, columns: Optional[List[str]] = None,
        columns : List[str], optional
            Names of columns to include in the scan. If None, all columns are
            included.
-        filter : Expression, optional
-            Filter expression to apply to the scan. If None, no filter is applied.
        batch_size : int, optional
            The number of rows to include in each batch. If None, the default
            value is used. The default value is implementation specific.
        use_threads : bool, default True
-            Whether to use multiple threads to read the rows. It is expected
-            that consumers reading a whole dataset in one scanner will keep this
+            Whether to use multiple threads to read the rows. Often consumers
+            reading a whole dataset in one scanner will keep this
            as True, while consumers reading a single fragment per worker will
+            typically set this to False.
-        Notes
-        -----
-        The filters must be fully satisfied. If the dataset cannot satisfy the
-        filter, it should raise an error.
-        Only the following expressions are allowed in the filter:
-        - Equality / inequalities (==, !=, <, >, <=, >=)
-        - Conjunctions (and, or)
-        - Field references (e.g. "a" or "a.b.c")
-        - Literals (e.g. 1, 1.0, "a", True)
-        - cast
-        - is_null / not_null
-        - isin
-        - between
-        - negation (not)
-        set this to False.
        """
        ...

@@ -151,24 +134,19 @@ class Fragment(Scannable, Protocol):
class Dataset(Scannable, Protocol):
    @abstractmethod
    def get_fragments(
-        self,
-        filter: Optional[Expression] = None, **kwargs
+        self, **kwargs
    ) -> Iterator[Fragment]:
        """Get the fragments of this dataset.

        Parameters
        ----------
-        filter : Expression, optional
-            Filter expression to use to prune which fragments are selected.
-            See Scannable.scanner for details on allowed filters. The filter is
-            just used to prune which fragments are selected. It does not need to
-            save the filter to apply to the scan. That is handled by the scanner.
        **kwargs : dict
            Additional arguments to pass to underlying implementation.
        """
        ...

-    @abstractproperty
+    @property
+    @abstractmethod
    def schema(self) -> Schema:
        """
        Get the schema of this dataset.
