refinements
wjones127 committed Jun 12, 2023
1 parent 280bfdb commit e995cdd
Showing 2 changed files with 26 additions and 17 deletions.
35 changes: 19 additions & 16 deletions docs/source/python/integration/dataset.rst
@@ -15,21 +15,17 @@
.. specific language governing permissions and limitations
.. under the License.
.. currentmodule:: pyarrow.dataset

Extending PyArrow Datasets
==========================

PyArrow provides a core protocol for datasets, so third-party libraries can both
-produce and consume PyArrow datasets.
-
-The idea is that any library can have a method that returns their dataset as a
-PyArrow dataset. Then, any query engine can consume that dataset and push down
-filters and projections.
+produce and consume classes that conform to a useful subset of the PyArrow dataset
+API. This subset provides enough functionality for predicate and projection
+pushdown. The subset of the API is contained in ``pyarrow.dataset.protocol``.

.. image:: pyarrow_dataset_protocol.svg
:alt: A diagram showing the workflow for using the PyArrow Dataset protocol.
-There are two flows shown, one for streams and one for tasks. The stream
+There are two flows shown, one for stream and one for tasks. The stream
case shows a linear flow from a producer class, to a dataset, to a
scanner, and finally to a RecordBatchReader. The tasks case shows a
similar diagram, except the dataset is split into fragments, which are
@@ -42,7 +38,7 @@ Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.

-From the perspective of a user, this looks something like
+From the perspective of a user, the code looks like:

.. code-block:: python
@@ -68,7 +64,10 @@ When implementing the dataset, consider the following:

* Filters passed down should be fully executed. While other systems have scanners
that are "best-effort", only executing the parts of the filter that it can, PyArrow
-datasets should always remove all rows that don't match the filter.
+datasets should always remove all rows that don't match the filter. If the
+implementation cannot execute the filter, it should raise an exception (see the
+sketch below). Only a limited set of expressions is allowed in these filters
+for the general protocol; see the docstrings for ``Scannable`` below for details.
* The API does not require that a dataset has metadata about all fragments
loaded into memory. Indeed, to scale to very large Datasets, don't eagerly
load all the fragment metadata into memory. Instead, load fragment metadata
@@ -90,18 +89,17 @@ options and methods beyond those, but they should not be relied upon without
checking for specific classes.
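
As a minimal sketch of the filter requirement above: a producer backed by an
in-memory table can simply execute the filter in full before handing data to
the consumer. The class name and structure are illustrative only, not
something the protocol prescribes.

.. code-block:: python

    import pyarrow as pa

    class MyScanner:
        """Illustrative producer-side scanner over an in-memory table."""

        def __init__(self, table: pa.Table, columns=None, filter=None):
            self._table = table
            self._columns = columns
            self._filter = filter

        def to_reader(self) -> pa.RecordBatchReader:
            table = self._table
            if self._filter is not None:
                # Fully execute the filter: rows that don't match must never
                # reach the consumer. A producer that cannot execute a given
                # filter should raise (for example, NotImplementedError)
                # rather than silently return unfiltered rows.
                table = table.filter(self._filter)
            if self._columns is not None:
                table = table.select(self._columns)
            return table.to_reader()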

There are two general patterns for consuming PyArrow datasets: reading a single
-stream or reading a stream per fragment.
+stream or creating a scan task per fragment.

-If you have a streaming execution model, you can recieve a single stream
+If you have a streaming execution model, you can receive a single stream
of data by calling ``dataset.scanner(filter=..., columns=...).to_reader()``.
This will return a RecordBatchReader, which can be exported over the
:ref:`C Stream Interface <c-stream-interface>`. The record batches yielded
from the stream can then be passed to worker threads for parallelism.
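
For example, a consumer following this single-stream pattern might look
roughly like the sketch below; ``my_library.get_dataset()``, the column names,
and ``process()`` are placeholders rather than part of the protocol.

.. code-block:: python

    import pyarrow.dataset as ds

    # Placeholder: any producer that returns a protocol-compliant dataset.
    dataset = my_library.get_dataset()

    # Push the filter and the column selection down to the producer.
    scanner = dataset.scanner(
        columns=["year", "value"],
        filter=ds.field("year") >= 2020,
    )

    # Consume the data as a single stream of record batches.
    reader = scanner.to_reader()
    for batch in reader:
        process(batch)  # e.g. hand each batch to worker threads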

-If you are using a partition-based or distributed model, you can split the
-dataset into fragments and then distribute those fragments into tasks that
-create their own scanners and readers. In this case, the code looks more
-like:
+If you are using a task-based model, you can split the dataset into fragments
+and then distribute those fragments into tasks that create their own scanners
+and readers. In this case, the code looks more like:

.. code-block:: python
@@ -117,6 +115,11 @@ distributed system.
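
A rough sketch of this task-based pattern, reusing the placeholder producer
from the previous sketch; the fragment methods shown (``get_fragments``,
``Fragment.scanner``) follow the existing ``pyarrow.dataset`` API and are an
assumption about the producer, not something defined on this page.

.. code-block:: python

    import pyarrow.dataset as ds

    dataset = my_library.get_dataset()  # placeholder producer
    predicate = ds.field("year") >= 2020

    def scan_task(fragment):
        # Each task builds its own scanner and reader from one fragment.
        reader = fragment.scanner(
            columns=["year", "value"],
            filter=predicate,
        ).to_reader()
        for batch in reader:
            process(batch)  # placeholder per-batch work

    # The fragments (and the tasks wrapping them) can be handed to a thread
    # pool, a task queue, or a distributed scheduler; here they simply run
    # in a loop.
    for fragment in dataset.get_fragments(filter=predicate):
        scan_task(fragment)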

If your engine supports predicate (filter) and projection (column) pushdown,
you can pass those down to the dataset by passing them to the ``scanner``.
+Column pushdown is limited to selecting a subset of columns from the schema.
+Some implementations, including PyArrow, may also support projecting and
+renaming columns, but this is not part of the protocol. Predicate pushdown
+is limited to a subset of expressions. See the docstrings for ``Scannable``
+for the allowed expressions.
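
For example, using the expression builders in ``pyarrow.dataset``; here
``dataset`` is the placeholder producer from the earlier sketches, and exactly
which expressions a given producer accepts is defined by the ``Scannable``
docstrings.

.. code-block:: python

    import pyarrow.dataset as ds

    # Column pushdown: a plain subset of the dataset schema's column names.
    columns = ["year", "region", "value"]

    # Predicate pushdown: simple comparisons on fields, combined with & and |.
    predicate = (ds.field("year") >= 2020) & ds.field("region").isin(["EU", "US"])

    scanner = dataset.scanner(columns=columns, filter=predicate)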


The protocol
8 changes: 7 additions & 1 deletion python/pyarrow/dataset/protocol.py
@@ -28,7 +28,13 @@
https://arrow.apache.org/docs/python/integration/dataset.html
"""
from abc import abstractmethod, abstractproperty
-from typing import Iterator, List, Optional, Protocol, runtime_checkable
+import sys  # required for the version check below
+from typing import Iterator, List, Optional
+
+# TODO: remove once we drop support for Python 3.7
+if sys.version_info >= (3, 8):
+    from typing import Protocol, runtime_checkable
+else:
+    from typing_extensions import Protocol, runtime_checkable

from pyarrow.dataset import Expression
from pyarrow import Table, RecordBatchReader, Schema
