Commit 280bfdb
add docs, diagram and tests
wjones127 committed Jun 12, 2023
1 parent c9a3f1e commit 280bfdb
Showing 5 changed files with 202 additions and 33 deletions.
1 change: 1 addition & 0 deletions docs/source/python/integration.rst
@@ -38,3 +38,4 @@ This allows to easily integrate PyArrow with other languages and technologies.
integration/python_java
integration/extending
integration/cuda
integration/dataset
57 changes: 49 additions & 8 deletions docs/source/python/integration/dataset.rst
@@ -23,6 +23,39 @@ Extending PyArrow Datasets
PyArrow provides a core protocol for datasets, so third-party libraries can both
produce and consume PyArrow datasets.

The idea is that any library can have a method that returns its dataset as a
PyArrow dataset. Then, any query engine can consume that dataset and push down
filters and projections.

.. image:: pyarrow_dataset_protocol.svg
:alt: A diagram showing the workflow for using the PyArrow Dataset protocol.
There are two flows shown, one for streams and one for tasks. The stream
case shows a linear flow from a producer class, to a dataset, to a
scanner, and finally to a RecordBatchReader. The tasks case shows a
similar diagram, except the dataset is split into fragments, which are
then distributed to tasks, which each create their own scanner and
RecordBatchReader.

Producers are responsible for outputting a class that conforms to the protocol.

Consumers are responsible for calling methods on the protocol to get the data
out of the dataset. The protocol supports getting data as a single stream or
as a series of tasks which may be distributed.

From the perspective of a user, this looks something like:

.. code-block:: python

    dataset = producer_library.get_dataset(...)
    df = consumer_library.read_dataset(dataset)
    df.filter("x > 0").select("y")

Here, the consumer would pass the filter ``x > 0`` and the projection of ``y`` down
to the producer through the dataset protocol. Thus, the user gets to enjoy the
performance benefits of pushing down filters and projections while being able
to specify those in their preferred query engine.
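
For example, the consumer might translate that query into a scanner call
roughly like the following (a sketch only; the exact translation is up to the
consumer):

.. code-block:: python

    import pyarrow.compute as pc

    # Read only column "y" and rows where x > 0; both the projection and the
    # filter are pushed down to the producer through the dataset protocol.
    scanner = dataset.scanner(columns=["y"], filter=pc.field("x") > 0)
    reader = scanner.to_reader()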


Dataset Producers
-----------------

@@ -33,15 +66,17 @@ should implement the protocol below.

When implementing the dataset, consider the following:

* Filters passed down should be fully executed. While other systems have
  scanners that are "best-effort", only executing the parts of the filter that
  they can, PyArrow datasets should always remove all rows that don't match
  the filter.
* The API does not require that a dataset has metadata about all fragments
  loaded into memory. Indeed, to scale to very large datasets, don't eagerly
  load all the fragment metadata into memory. Instead, load fragment metadata
  once a filter is passed. This allows you to skip loading metadata about
  fragments that aren't relevant to queries. For example, if you have a
  dataset that uses Hive-style partitioning for a column ``date`` and the user
  passes a filter for ``date=2023-01-01``, then you can skip listing
  directories for Hive partitions that don't match that date.
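
As a rough sketch of what a producer could look like, the following wraps a
single in-memory table as a protocol-compatible dataset. The class names here
are hypothetical, and a real producer would typically back its fragments with
files or partitions rather than one table:

.. code-block:: python

    from typing import Iterator, List, Optional

    import pyarrow as pa


    class TableScanner:
        """Scanner over a single in-memory table."""

        def __init__(self, table: pa.Table):
            self._table = table

        def count_rows(self) -> int:
            return self._table.num_rows

        def head(self, num_rows: int) -> pa.Table:
            return self._table.slice(0, num_rows)

        def to_reader(self) -> pa.RecordBatchReader:
            return pa.RecordBatchReader.from_batches(
                self._table.schema, self._table.to_batches())


    class TableDataset:
        """A dataset whose single fragment is an in-memory table."""

        def __init__(self, table: pa.Table):
            self._table = table

        @property
        def schema(self) -> pa.Schema:
            return self._table.schema

        def get_fragments(self, filter=None, **kwargs) -> Iterator["TableDataset"]:
            # The whole table is the one and only fragment.
            yield self

        def scanner(self, columns: Optional[List[str]] = None,
                    filter=None, batch_size: Optional[int] = None,
                    use_threads: bool = True, **kwargs) -> TableScanner:
            table = self._table
            if filter is not None:
                # Filters must be fully applied, not best-effort.
                table = table.filter(filter)
            if columns is not None:
                table = table.select(columns)
            return TableScanner(table)

A consumer could then call, for example,
``TableDataset(my_table).scanner(columns=["y"]).to_reader()`` without knowing
anything about the producing library.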


Dataset Consumers
@@ -51,7 +86,8 @@ If you are a query engine, you'll want to be able to
consume any PyArrow dataset. To make sure your integration is compatible
with any dataset, you should only call methods that are included in the
protocol. Dataset implementations provided by PyArrow implement additional
options and methods beyond those, but they should not be relied upon without
checking for specific classes.

There are two general patterns for consuming PyArrow datasets: reading a single
stream or reading a stream per fragment.
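
As a rough sketch, assuming ``dataset`` is any object implementing the
protocol, the two patterns look like this (``process`` and ``submit_task``
stand in for whatever the consumer does with the data):

.. code-block:: python

    # Pattern 1: read the whole dataset as a single stream.
    reader = dataset.scanner(use_threads=True).to_reader()
    for batch in reader:
        process(batch)


    # Pattern 2: read one stream per fragment, for example one fragment per
    # worker in a distributed engine. Fragments are pickleable, so they can
    # be shipped to workers.
    def scan_fragment(fragment):
        for batch in fragment.scanner(use_threads=False).to_reader():
            process(batch)

    for fragment in dataset.get_fragments():
        submit_task(scan_fragment, fragment)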
@@ -86,5 +122,10 @@ you can pass those down to the dataset by passing them to the ``scanner``.
The protocol
------------

This module can be imported starting in PyArrow ``13.0.0`` at
``pyarrow.dataset.protocol``. The protocol is defined with ``typing.Protocol``
classes. They can be checked at runtime with ``isinstance`` but can also be
checked statically with Python type checkers like ``mypy``.
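
For example, a consumer can annotate its functions with the protocol classes
so that a type checker verifies it only relies on protocol methods, and can
use ``isinstance`` for a structural check at runtime (a sketch; the
``read_to_table`` helper is hypothetical):

.. code-block:: python

    from pyarrow import Table
    from pyarrow.dataset.protocol import Dataset, Scannable


    def read_to_table(dataset: Dataset) -> Table:
        # mypy will flag any call here that is not part of the protocol.
        return dataset.scanner().to_reader().read_all()


    def is_scannable(obj) -> bool:
        # Runtime structural check, enabled by @runtime_checkable.
        return isinstance(obj, Scannable)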

.. literalinclude:: ../../../../python/pyarrow/dataset/protocol.py
   :language: python
4 changes: 4 additions & 0 deletions docs/source/python/integration/pyarrow_dataset_protocol.svg
144 changes: 119 additions & 25 deletions python/pyarrow/dataset/protocol.py
@@ -22,59 +22,153 @@
Applications and libraries that want to consume datasets should accept datasets
that implement these protocols, rather than requiring the specific
PyArrow classes.

See Extending PyArrow Datasets for more information:
https://arrow.apache.org/docs/python/integration/dataset.html
"""
from abc import abstractmethod, abstractproperty
from typing import Iterator, List, Optional, Protocol, runtime_checkable

from pyarrow.dataset import Expression
from pyarrow import Table, RecordBatchReader, Schema


@runtime_checkable
class Scanner(Protocol):
    """
    A scanner implementation for a dataset.

    This may be a scan of a whole dataset, or a scan of a single fragment.
    """
    @abstractmethod
    def count_rows(self) -> int:
        """
        Count the number of rows in this dataset.

        Implementors may provide optimized code paths that compute this from
        metadata.

        Returns
        -------
        int
            The number of rows in the dataset.
        """
        ...

    @abstractmethod
    def head(self, num_rows: int) -> Table:
        """
        Get the first ``num_rows`` rows of the dataset.

        Parameters
        ----------
        num_rows : int
            The number of rows to return.

        Returns
        -------
        Table
            A table containing the first ``num_rows`` rows of the dataset.
        """
        ...

    @abstractmethod
    def to_reader(self) -> RecordBatchReader:
        """
        Create a Record Batch Reader for this scan.

        This is used to read the data in chunks.

        Returns
        -------
        RecordBatchReader
        """
        ...


@runtime_checkable
class Scannable(Protocol):
    @abstractmethod
    def scanner(self, columns: Optional[List[str]] = None,
                filter: Optional[Expression] = None,
                batch_size: Optional[int] = None,
                use_threads: bool = True, **kwargs) -> Scanner:
        """Create a scanner for this dataset.

        Parameters
        ----------
        columns : List[str], optional
            Names of columns to include in the scan. If None, all columns are
            included.
        filter : Expression, optional
            Filter expression to apply to the scan. If None, no filter is
            applied.
        batch_size : int, optional
            The number of rows to include in each batch. If None, the default
            value is used. The default value is implementation specific.
        use_threads : bool, default True
            Whether to use multiple threads to read the rows. It is expected
            that consumers reading a whole dataset in one scanner will keep
            this as True, while consumers reading a single fragment per worker
            will typically set this to False.

        Notes
        -----
        The filters must be fully satisfied. If the dataset cannot satisfy the
        filter, it should raise an error.

        Only the following expressions are allowed in the filter:

        - Equality / inequalities (==, !=, <, >, <=, >=)
        - Conjunctions (and, or)
        - Field references (e.g. "a" or "a.b.c")
        - Literals (e.g. 1, 1.0, "a", True)
        - cast
        - is_null / not_null
        - isin
        - between
        - negation (not)
        """
        ...


@runtime_checkable
class Fragment(Scannable, Protocol):
    """A fragment of a dataset.

    This might be a partition, a file, a file chunk, etc.

    This class should be pickleable so that it can be used in a distributed
    scan."""
    ...


@runtime_checkable
class Dataset(Scannable, Protocol):
    @abstractmethod
    def get_fragments(
        self,
        filter: Optional[Expression] = None, **kwargs
    ) -> Iterator[Fragment]:
        """Get the fragments of this dataset.

        Parameters
        ----------
        filter : Expression, optional
            Filter expression used to prune which fragments are selected.
            See Scannable.scanner for details on allowed filters. The filter
            is only used to prune fragments; the implementation does not need
            to save the filter and apply it during the scan, as that is
            handled by the scanner.
        **kwargs : dict
            Additional arguments to pass to the underlying implementation.
        """
        ...

    @abstractproperty
    def schema(self) -> Schema:
        """
        Get the schema of this dataset.

        Returns
        -------
        Schema
        """
        ...
29 changes: 29 additions & 0 deletions python/pyarrow/tests/test_dataset_protocol.py
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Test that PyArrow datasets conform to the protocol."""
import pyarrow.dataset.protocol as protocol
import pyarrow.dataset as ds


def test_dataset_protocol():
assert isinstance(ds.Dataset, protocol.Dataset)
assert isinstance(ds.Fragment, protocol.Fragment)

assert isinstance(ds.Dataset, protocol.Scannable)
assert isinstance(ds.Fragment, protocol.Scannable)

assert isinstance(ds.Scanner, protocol.Scanner)
