diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3583a3213ccbc..562193c3816f2 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3664,10 +3664,13 @@ cdef class Scanner(_Weakrefable):
 
         Parameters
         ----------
-        source : Iterator
-            The iterator of Batches.
+        source : Iterator or Arrow-compatible stream object
+            The iterator of Batches. This can be a pyarrow RecordBatchReader,
+            any object that implements the Arrow PyCapsule Protocol for
+            streams, or an actual Python iterator of RecordBatches.
         schema : Schema
-            The schema of the batches.
+            The schema of the batches (required when passing a Python
+            iterator).
         columns : list[str] or dict[str, Expression], default None
             The columns to project. This can be a list of column names to
             include (order and duplicates will be preserved), or a dictionary
@@ -3723,6 +3726,12 @@
                 raise ValueError('Cannot specify a schema when providing '
                                  'a RecordBatchReader')
             reader = source
+        elif hasattr(source, "__arrow_c_stream__"):
+            if schema:
+                raise ValueError(
+                    'Cannot specify a schema when providing an object '
+                    'implementing the Arrow PyCapsule Protocol')
+            reader = pa.ipc.RecordBatchReader.from_stream(source)
         elif _is_iterable(source):
             if schema is None:
                 raise ValueError('Must provide schema to construct scanner '
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 1efbfe1665a75..c61e13ee75801 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -964,7 +964,11 @@ def file_visitor(written_file):
     elif isinstance(data, (pa.RecordBatch, pa.Table)):
         schema = schema or data.schema
         data = InMemoryDataset(data, schema=schema)
-    elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data):
+    elif (
+        isinstance(data, pa.ipc.RecordBatchReader)
+        or hasattr(data, "__arrow_c_stream__")
+        or _is_iterable(data)
+    ):
         data = Scanner.from_batches(data, schema=schema)
         schema = None
     elif not isinstance(data, (Dataset, Scanner)):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 3b0284bcb74a6..a19a9ba9ab51c 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -62,6 +62,14 @@
 pytestmark = pytest.mark.dataset
 
 
+class TableStreamWrapper:
+    def __init__(self, table):
+        self.table = table
+
+    def __arrow_c_stream__(self, requested_schema=None):
+        return self.table.__arrow_c_stream__(requested_schema)
+
+
 def _generate_data(n):
     import datetime
     import itertools
@@ -2472,6 +2480,7 @@ def test_scan_iterator(use_threads):
     for factory, schema in (
             (lambda: pa.RecordBatchReader.from_batches(
                 batch.schema, [batch]), None),
+            (lambda: TableStreamWrapper(table), None),
             (lambda: (batch for _ in range(1)), batch.schema),
     ):
         # Scanning the fragment consumes the underlying iterator
@@ -4615,6 +4624,13 @@ def test_write_iterable(tempdir):
     result = ds.dataset(base_dir, format="ipc").to_table()
     assert result.equals(table)
 
+    base_dir = tempdir / 'inmemory_pycapsule'
+    stream = TableStreamWrapper(table)
+    ds.write_dataset(stream, base_dir,
+                     basename_template='dat_{i}.arrow', format="feather")
+    result = ds.dataset(base_dir, format="ipc").to_table()
+    assert result.equals(table)
+
 
 def test_write_scanner(tempdir, dataset_reader):
     table = pa.table([
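
For reference, below is a minimal sketch of how the new code path could be exercised once this patch is applied. It mirrors the `TableStreamWrapper` helper added in the test file; the sample table and the output directory are illustrative only and not part of the change itself.

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds


class TableStreamWrapper:
    """Any object exposing __arrow_c_stream__ (the Arrow PyCapsule stream
    protocol); here it simply delegates to a pyarrow Table."""

    def __init__(self, table):
        self.table = table

    def __arrow_c_stream__(self, requested_schema=None):
        return self.table.__arrow_c_stream__(requested_schema)


table = pa.table({"x": list(range(5))})

# Scanner.from_batches now accepts the wrapper directly; no schema argument
# is needed (or allowed), since the schema comes from the stream itself.
scanner = ds.Scanner.from_batches(TableStreamWrapper(table))
assert scanner.to_table().equals(table)

# write_dataset likewise accepts any PyCapsule stream object as its source.
out_dir = tempfile.mkdtemp()  # illustrative output location
ds.write_dataset(TableStreamWrapper(table), out_dir,
                 format="feather", basename_template="dat_{i}.arrow")
assert ds.dataset(out_dir, format="ipc").to_table().equals(table)
```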