From 81b2866161386a65391d5a3f838501c00adae686 Mon Sep 17 00:00:00 2001
From: Joris Van den Bossche
Date: Mon, 15 Apr 2024 10:22:15 +0200
Subject: [PATCH] GH-38010: [Python] Construct pyarrow.Field and ChunkedArray
 through Arrow PyCapsule Protocol (#40818)

### Rationale for this change

See https://github.com/apache/arrow/issues/38010#issuecomment-2010601912 for
more context. Right now, for _consuming_ ArrowSchema-compatible objects that
implement the PyCapsule interface, we only have the private
`_import_from_c_capsule` (on Schema, Field, DataType), and we check for the
protocol in the public `pa.schema(..)`. But that means you can currently only
consume objects that represent the schema of a batch (struct type), not the
schemas of individual arrays.

### What changes are included in this PR?

Expand the `pa.field(..)` and `pa.chunked_array(..)` constructors to accept
objects implementing the corresponding protocol methods.

### Are these changes tested?

Yes, tests for the new constructor behavior are included in this PR.

* GitHub Issue: #38010

Authored-by: Joris Van den Bossche
Signed-off-by: Joris Van den Bossche
---
 docs/source/python/extending_types.rst | 29 ++++++++++++++++--
 python/pyarrow/table.pxi               | 37 ++++++++++++++++++++---
 python/pyarrow/tests/test_array.py     |  2 +-
 python/pyarrow/tests/test_cffi.py      | 12 ++++++--
 python/pyarrow/tests/test_table.py     | 41 ++++++++++++++++++++++++++
 python/pyarrow/tests/test_types.py     | 22 ++++++++++++++
 python/pyarrow/types.pxi               | 18 ++++++++++-
 7 files changed, 151 insertions(+), 10 deletions(-)

diff --git a/docs/source/python/extending_types.rst b/docs/source/python/extending_types.rst
index b7261005e66ee..8df0ef0b1fe99 100644
--- a/docs/source/python/extending_types.rst
+++ b/docs/source/python/extending_types.rst
@@ -37,14 +37,14 @@ under the hood, you can implement the following methods on those objects:
 
 - ``__arrow_c_schema__`` for schema or type-like objects.
 - ``__arrow_c_array__`` for arrays and record batches (contiguous tables).
-- ``__arrow_c_stream__`` for chunked tables or streams of data.
+- ``__arrow_c_stream__`` for chunked arrays, tables and streams of data.
 
 Those methods return `PyCapsule <https://docs.python.org/3/c-api/capsule.html>`__
 objects, and more details on the exact semantics can be found in the
 :ref:`specification <arrow-pycapsule-interface>`.
 
 When your data structures have those methods defined, the PyArrow constructors
-(such as :func:`pyarrow.array` or :func:`pyarrow.table`) will recognize those objects as
+(see below) will recognize those objects as
 supporting this protocol, and convert them to PyArrow data structures zero-copy.
 And the same can be true for any other library supporting this protocol on
 ingesting data.
 
 Similarly, any application or library can add
 support for this protocol by checking for the presence of those methods, and
 therefore accept any Arrow data (instead of hardcoding support for a specific
 Arrow producer such as PyArrow).
 
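+As a minimal sketch of such a producer (the ``FieldWrapper`` name is
+illustrative; it mirrors the wrapper classes in this patch's tests), an
+object only needs to expose the appropriate method::
+
+    import pyarrow as pa
+
+    class FieldWrapper:
+        """Expose an ArrowSchema through the Arrow PyCapsule Protocol."""
+
+        def __init__(self, field):
+            self.field = field
+
+        def __arrow_c_schema__(self):
+            # Delegate the capsule export to the wrapped pyarrow.Field.
+            return self.field.__arrow_c_schema__()
+
+    field = pa.field("x", pa.int32())
+    # Any consumer that checks for __arrow_c_schema__ can ingest this object.
+    assert pa.field(FieldWrapper(field)) == field
+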
+For consuming data through this protocol with PyArrow, the following
+constructors can be used to create the various PyArrow objects:
+
++----------------------------+-----------------------------------------------+--------------------+
+| Result class               | PyArrow constructor                           | Supported protocol |
++============================+===============================================+====================+
+| :class:`Array`             | :func:`pyarrow.array`                         | array              |
++----------------------------+-----------------------------------------------+--------------------+
+| :class:`ChunkedArray`      | :func:`pyarrow.chunked_array`                 | array, stream      |
++----------------------------+-----------------------------------------------+--------------------+
+| :class:`RecordBatch`       | :func:`pyarrow.record_batch`                  | array              |
++----------------------------+-----------------------------------------------+--------------------+
+| :class:`Table`             | :func:`pyarrow.table`                         | array, stream      |
++----------------------------+-----------------------------------------------+--------------------+
+| :class:`RecordBatchReader` | :meth:`pyarrow.RecordBatchReader.from_stream` | stream             |
++----------------------------+-----------------------------------------------+--------------------+
+| :class:`Field`             | :func:`pyarrow.field`                         | schema             |
++----------------------------+-----------------------------------------------+--------------------+
+| :class:`Schema`            | :func:`pyarrow.schema`                        | schema             |
++----------------------------+-----------------------------------------------+--------------------+
+
+A :class:`DataType` can be created by consuming the schema-compatible object
+using :func:`pyarrow.field` and then accessing the ``.type`` of the resulting
+Field.
+
 .. _arrow_array_protocol:
 
 Controlling conversion to pyarrow.Array with the ``__arrow_array__`` protocol
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 6b3c7d0b56266..d31ea0a5fa1e9 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -1344,17 +1344,28 @@ cdef class ChunkedArray(_PandasConvertible):
             A capsule containing a C ArrowArrayStream struct.
         """
         cdef:
+            ChunkedArray chunked
             ArrowArrayStream* c_stream = NULL
 
         if requested_schema is not None:
-            out_type = DataType._import_from_c_capsule(requested_schema)
-            if self.type != out_type:
-                raise NotImplementedError("Casting to requested_schema")
+            target_type = DataType._import_from_c_capsule(requested_schema)
+
+            if target_type != self.type:
+                try:
+                    chunked = self.cast(target_type, safe=True)
+                except ArrowInvalid as e:
+                    raise ValueError(
+                        f"Could not cast {self.type} to requested type {target_type}: {e}"
+                    )
+            else:
+                chunked = self
+        else:
+            chunked = self
 
         stream_capsule = alloc_c_stream(&c_stream)
 
         with nogil:
-            check_status(ExportChunkedArray(self.sp_chunked_array, c_stream))
+            check_status(ExportChunkedArray(chunked.sp_chunked_array, c_stream))
 
         return stream_capsule
 
@@ -1397,6 +1408,9 @@ def chunked_array(arrays, type=None):
     ----------
     arrays : Array, list of Array, or array-like
         Must all be the same data type. Can be empty only if type also passed.
+        Any Arrow-compatible array that implements the Arrow PyCapsule Protocol
+        (has an ``__arrow_c_array__`` or ``__arrow_c_stream__`` method) can be
+        passed as well.
     type : DataType or string coercible to DataType
 
     Returns
@@ -1437,6 +1451,21 @@
 
     if isinstance(arrays, Array):
         arrays = [arrays]
+    elif hasattr(arrays, "__arrow_c_stream__"):
+        if type is not None:
+            requested_type = type.__arrow_c_schema__()
+        else:
+            requested_type = None
+        capsule = arrays.__arrow_c_stream__(requested_type)
+        result = ChunkedArray._import_from_c_capsule(capsule)
+        if type is not None and result.type != type:
+            # __arrow_c_stream__ coerces the schema with best effort, so we
+            # might need to cast if the producer couldn't cast to the exact schema.
+            result = result.cast(type)
+        return result
+    elif hasattr(arrays, "__arrow_c_array__"):
+        arr = array(arrays, type=type)
+        arrays = [arr]
 
     for x in arrays:
         arr = x if isinstance(x, Array) else array(x, type=type)
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index 472a6c5dce750..8bcb28c0d41b9 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -3382,7 +3382,7 @@ def __arrow_c_array__(self, requested_schema=None):
     result = pa.array(arr)
     assert result == arr.data
 
-    # Will case to requested type
+    # Will cast to requested type
     result = pa.array(arr, type=pa.int32())
     assert result == pa.array([1, 2, 3], type=pa.int32())
 
diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py
index f8b2ea15d31ad..5bf41c3c14b6e 100644
--- a/python/pyarrow/tests/test_cffi.py
+++ b/python/pyarrow/tests/test_cffi.py
@@ -692,8 +692,16 @@ def test_roundtrip_chunked_array_capsule_requested_schema():
     imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
     assert imported_chunked == chunked
 
-    # Casting to something else should error
+    # Casting to something else should error if not possible
     requested_type = pa.binary()
     requested_capsule = requested_type.__arrow_c_schema__()
-    with pytest.raises(NotImplementedError):
+    capsule = chunked.__arrow_c_stream__(requested_capsule)
+    imported_chunked = pa.ChunkedArray._import_from_c_capsule(capsule)
+    assert imported_chunked == chunked.cast(pa.binary())
+
+    requested_type = pa.int64()
+    requested_capsule = requested_type.__arrow_c_schema__()
+    with pytest.raises(
+        ValueError, match="Could not cast string to requested type int64"
+    ):
         chunked.__arrow_c_stream__(requested_capsule)
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 31d34058b61ef..539da0e685381 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -493,6 +493,47 @@ def test_recordbatch_dunder_init():
         pa.RecordBatch()
 
 
+def test_chunked_array_c_array_interface():
+    class ArrayWrapper:
+        def __init__(self, array):
+            self.array = array
+
+        def __arrow_c_array__(self, requested_schema=None):
+            return self.array.__arrow_c_array__(requested_schema)
+
+    data = pa.array([1, 2, 3], pa.int64())
+    chunked = pa.chunked_array([data])
+    wrapper = ArrayWrapper(data)
+
+    # Can roundtrip through the wrapper.
+    result = pa.chunked_array(wrapper)
+    assert result == chunked
+
+    # Can also import with a type that the implementer can cast to.
+    result = pa.chunked_array(wrapper, type=pa.int16())
+    assert result == chunked.cast(pa.int16())
+
+
+def test_chunked_array_c_stream_interface():
+    class ChunkedArrayWrapper:
+        def __init__(self, chunked):
+            self.chunked = chunked
+
+        def __arrow_c_stream__(self, requested_schema=None):
+            return self.chunked.__arrow_c_stream__(requested_schema)
+
+    data = pa.chunked_array([[1, 2, 3], [4, None, 6]])
+    wrapper = ChunkedArrayWrapper(data)
+
+    # Can roundtrip through the wrapper.
+    result = pa.chunked_array(wrapper)
+    assert result == data
+
+    # Can also import with a type that the implementer can cast to.
+    result = pa.chunked_array(wrapper, type=pa.int16())
+    assert result == data.cast(pa.int16())
+
+
 def test_recordbatch_c_array_interface():
     class BatchWrapper:
         def __init__(self, batch):
diff --git a/python/pyarrow/tests/test_types.py b/python/pyarrow/tests/test_types.py
index 21b3829803487..4f66a6f41672d 100644
--- a/python/pyarrow/tests/test_types.py
+++ b/python/pyarrow/tests/test_types.py
@@ -1335,3 +1335,25 @@ def __arrow_c_schema__(self):
 
     wrapped_schema = Wrapper(schema)
     assert pa.schema(wrapped_schema) == schema
+
+
+def test_field_import_c_schema_interface():
+    class Wrapper:
+        def __init__(self, field):
+            self.field = field
+
+        def __arrow_c_schema__(self):
+            return self.field.__arrow_c_schema__()
+
+    field = pa.field("field_name", pa.int32(), metadata={"key": "value"})
+    wrapped_field = Wrapper(field)
+
+    assert pa.field(wrapped_field) == field
+
+    with pytest.raises(ValueError, match="cannot specify 'type'"):
+        pa.field(wrapped_field, type=pa.int64())
+
+    # override nullable or metadata
+    assert pa.field(wrapped_field, nullable=False).nullable is False
+    result = pa.field(wrapped_field, metadata={"other": "meta"})
+    assert result.metadata == {b"other": b"meta"}
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 6cbad8eeb653c..018099ae7e659 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -3462,7 +3462,7 @@ cdef DataType primitive_type(Type type):
 # Type factory functions
 
 
-def field(name, type, bint nullable=True, metadata=None):
+def field(name, type=None, nullable=None, metadata=None):
     """
     Create a pyarrow.Field instance.
 
@@ -3470,6 +3472,8 @@
     ----------
     name : str or bytes
         Name of the field.
+        Alternatively, you can also pass an object that implements the Arrow
+        PyCapsule Protocol for schemas (has an ``__arrow_c_schema__`` method).
     type : pyarrow.DataType
         Arrow datatype of the field.
     nullable : bool, default True
@@ -3504,11 +3506,25 @@
     >>> pa.struct([field])
     StructType(struct<key: int32>)
     """
+    if hasattr(name, "__arrow_c_schema__"):
+        if type is not None:
+            raise ValueError(
+                "cannot specify 'type' when creating a Field from an ArrowSchema"
+            )
+        field = Field._import_from_c_capsule(name.__arrow_c_schema__())
+        if metadata is not None:
+            field = field.with_metadata(metadata)
+        if nullable is not None:
+            field = field.with_nullable(nullable)
+        return field
+
     cdef:
         Field result = Field.__new__(Field)
         DataType _type = ensure_type(type, allow_none=False)
         shared_ptr[const CKeyValueMetadata] c_meta
 
+    nullable = True if nullable is None else nullable
+
     metadata = ensure_metadata(metadata, allow_none=True)
     c_meta = pyarrow_unwrap_metadata(metadata)
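
As a quick usage sketch of the behavior this patch enables (the `StreamWrapper` name is illustrative; it mirrors the wrapper classes used in the tests above):

```python
import pyarrow as pa


class StreamWrapper:
    """Stands in for any third-party object implementing __arrow_c_stream__."""

    def __init__(self, chunked):
        self.chunked = chunked

    def __arrow_c_stream__(self, requested_schema=None):
        return self.chunked.__arrow_c_stream__(requested_schema)


data = pa.chunked_array([[1, 2, 3], [4, None, 6]])

# pa.chunked_array(..) now consumes stream-protocol objects directly ...
result = pa.chunked_array(StreamWrapper(data))
assert result == data

# ... and casts when an exact type is requested but not produced as-is.
result = pa.chunked_array(StreamWrapper(data), type=pa.int16())
assert result == data.cast(pa.int16())
```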