
feat(python/adbc_driver_manager): export handles and ingest data through python Arrow PyCapsule interface #1346

Merged: 10 commits, Dec 13, 2023
9 changes: 7 additions & 2 deletions python/adbc_driver_manager/adbc_driver_manager/_lib.pxd
@@ -22,10 +22,15 @@ from libc.stdint cimport int32_t, int64_t, uint8_t, uint32_t

cdef extern from "adbc.h" nogil:
# C ABI

ctypedef void (*CArrowSchemaRelease)(void*)
ctypedef void (*CArrowArrayRelease)(void*)

cdef struct CArrowSchema"ArrowSchema":
pass
CArrowSchemaRelease release

cdef struct CArrowArray"ArrowArray":
pass
CArrowArrayRelease release

ctypedef int (*CArrowArrayStreamGetLastError)(void*)
ctypedef int (*CArrowArrayStreamGetNext)(void*, CArrowArray*)
84 changes: 82 additions & 2 deletions python/adbc_driver_manager/adbc_driver_manager/_lib.pyx
@@ -24,10 +24,13 @@ import threading
import typing
from typing import List, Tuple

cimport cpython
import cython
from cpython.bytes cimport PyBytes_FromStringAndSize
from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New
from libc.stdint cimport int32_t, int64_t, uint8_t, uint32_t, uintptr_t
from libc.string cimport memset
from libc.stdlib cimport malloc, free
from libc.string cimport memcpy, memset
from libcpp.vector cimport vector as c_vector

if typing.TYPE_CHECKING:
@@ -304,9 +307,38 @@ cdef class _AdbcHandle:
f"with open {self._child_type}")


cdef void pycapsule_schema_deleter(object capsule) noexcept:
cdef CArrowSchema* allocated = <CArrowSchema*>PyCapsule_GetPointer(
capsule, "arrow_schema"
)
if allocated.release != NULL:
allocated.release(allocated)
free(allocated)


cdef void pycapsule_array_deleter(object capsule) noexcept:
cdef CArrowArray* allocated = <CArrowArray*> PyCapsule_GetPointer(
capsule, "arrow_array"
)
if allocated.release != NULL:
allocated.release(allocated)
free(allocated)


cdef void pycapsule_stream_deleter(object capsule) noexcept:
cdef CArrowArrayStream* allocated = <CArrowArrayStream*> PyCapsule_GetPointer(
capsule, "arrow_array_stream"
)
if allocated.release != NULL:
allocated.release(allocated)
free(allocated)


cdef class ArrowSchemaHandle:
"""
A wrapper for an allocated ArrowSchema.

This object implements the Arrow PyCapsule interface.
"""
cdef:
CArrowSchema schema
@@ -316,23 +348,56 @@
"""The address of the ArrowSchema."""
return <uintptr_t> &self.schema

def __arrow_c_schema__(self) -> object:
"""Consume this object to get a PyCapsule."""
# Reference:
# https://arrow.apache.org/docs/dev/format/CDataInterface/PyCapsuleInterface.html#create-a-pycapsule
cdef CArrowSchema* allocated = <CArrowSchema*> malloc(sizeof(CArrowSchema))
allocated.release = NULL
capsule = PyCapsule_New(
<void*>allocated, "arrow_schema", &pycapsule_schema_deleter,
)
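# "Move" the struct: copy it into the malloc'd block owned by the capsule and
# mark the original as released so this handle cannot be exported twice.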
memcpy(allocated, &self.schema, sizeof(CArrowSchema))
self.schema.release = NULL
return capsule
Comment on lines +353 to +355
Member Author:

We are "moving" the schema here, while in nanoarrow I opted for a hard copy for the schema (using nanoarrow's ArrowSchemaDeepCopy).

But I think the only advantage of a hard copy is that this means you can consume it multiple times? (or in the case of nanoarrow-python, that the nanoarrow Schema object is still valid and inspectable after it has been converted to eg a pyarrow.Schema)
For ADBC, I think the use case will be much more "receive handle and convert it directly once", given that the Handle object itself isn't useful at all (in contrast to nanoarrow.Schema), so moving here is probably fine?

Member:

I think moving makes sense here.
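
A minimal consumer-side sketch of what the move implies (mirroring test_pycapsule further down; the open connection conn and the "foo" table are assumed to exist, and pyarrow >= 14 is assumed for the capsule-aware constructors):

    import pyarrow

    handle = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo")
    schema = pyarrow.schema(handle)  # consumes the handle via __arrow_c_schema__
    pyarrow.schema(handle)           # raises ValueError: Cannot import released ArrowSchema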



cdef class ArrowArrayHandle:
"""
A wrapper for an allocated ArrowArray.

This object implements the Arrow PyCapsule interface.
"""
cdef:
CArrowArray array

@property
def address(self) -> int:
"""The address of the ArrowArray."""
"""
The address of the ArrowArray.
"""
return <uintptr_t> &self.array

def __arrow_c_array__(self, requested_schema=None) -> object:
"""Consume this object to get a PyCapsule."""
if requested_schema is not None:
raise NotImplementedError("requested_schema")

cdef CArrowArray* allocated = <CArrowArray*> malloc(sizeof(CArrowArray))
allocated.release = NULL
capsule = PyCapsule_New(
<void*>allocated, "arrow_array", pycapsule_array_deleter,
)
memcpy(allocated, &self.array, sizeof(CArrowArray))
self.array.release = NULL
return capsule

Member Author:

This is actually not being used at the moment, because I think none of the ADBC APIs are returning an ArrowArray (only ArrowSchema or ArrowArrayStream).
This handle is currently only used internally for ingesting data (bind).

So I could also remove __arrow_c_array__, given it is unused. If we require pyarrow >= 14, I could probably also remove this class entirely, because then we can use the capsule interface for ingesting data.

Member Author:

And realizing now, this implementation is actually also wrong -> it needs to return two capsules, one for the ArrowArray but also one for the ArrowSchema. And this handle only has the array. So I don't think we can add this dunder here.
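
For reference, the two-capsule shape the protocol expects can be seen on the pyarrow side (a sketch assuming pyarrow >= 14; the record batch is illustrative):

    import pyarrow

    batch = pyarrow.record_batch([[1, 2, 3]], names=["ints"])
    # Per the PyCapsule interface, __arrow_c_array__ returns two capsules:
    # an "arrow_schema" capsule plus an "arrow_array" capsule.
    schema_capsule, array_capsule = batch.__arrow_c_array__()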



cdef class ArrowArrayStreamHandle:
"""
A wrapper for an allocated ArrowArrayStream.

This object implements the Arrow PyCapsule interface.
"""
cdef:
CArrowArrayStream stream
@@ -342,6 +407,21 @@ cdef class ArrowArrayStreamHandle:
"""The address of the ArrowArrayStream."""
return <uintptr_t> &self.stream

def __arrow_c_stream__(self, requested_schema=None) -> object:
"""Consume this object to get a PyCapsule."""
if requested_schema is not None:
raise NotImplementedError("requested_schema")

cdef CArrowArrayStream* allocated = \
<CArrowArrayStream*> malloc(sizeof(CArrowArrayStream))
allocated.release = NULL
capsule = PyCapsule_New(
<void*>allocated, "arrow_array_stream", &pycapsule_stream_deleter,
)
memcpy(allocated, &self.stream, sizeof(CArrowArrayStream))
self.stream.release = NULL
return capsule


class GetObjectsDepth(enum.IntEnum):
ALL = ADBC_OBJECT_DEPTH_ALL
1 change: 0 additions & 1 deletion python/adbc_driver_manager/adbc_driver_manager/dbapi.py
@@ -668,7 +668,6 @@ def execute(self, operation: Union[bytes, str], parameters=None) -> None:
self._prepare_execute(operation, parameters)
handle, self._rowcount = self._stmt.execute_query()
self._results = _RowIterator(
# pyarrow.RecordBatchReader._import_from_c(handle.address)
_reader.AdbcRecordBatchReader._import_from_c(handle.address)
)

4 changes: 2 additions & 2 deletions python/adbc_driver_manager/pyproject.toml
@@ -25,8 +25,8 @@ requires-python = ">=3.9"
dynamic = ["version"]

[project.optional-dependencies]
dbapi = ["pandas", "pyarrow>=8.0.0"]
test = ["duckdb", "pandas", "pyarrow>=8.0.0", "pytest"]
dbapi = ["pandas", "pyarrow>=14.0.1"]

Member Author:

Are we OK with bumping this requirement? (I don't know which existing users of the python adbc packages might be affected.)

Member:

I figured we should bump it just because our official guidance is to upgrade.

Contributor:

Not a huge deal now, but if usage grows over time this could be a pain point for pandas.

Member:

We can remove it. I'm just not sure if we can express in the requirements that you need the fix package if you're on < 14.0.1.

Member Author:

I reverted this change to update the minimum requirement. This PR doesn't strictly speaking need it, so let's keep the discussion to bump the minimum requirement separate.

test = ["duckdb", "pandas", "pyarrow>=14.0.1", "pytest"]

[project.urls]
homepage = "https://arrow.apache.org/adbc/"
54 changes: 54 additions & 0 deletions python/adbc_driver_manager/tests/test_lowlevel.py
@@ -390,3 +390,57 @@ def test_child_tracking(sqlite):
RuntimeError, match="Cannot close AdbcDatabase with open AdbcConnection"
):
db.close()


@pytest.mark.sqlite
def test_pycapsule(sqlite):

Member Author:

I further expanded the specific test that David started, but in addition (at least if requiring pyarrow>=14 for testing) I could also update the other tests above to do the import/export using capsules instead of the current calls to _import_from_c, which would then also automatically give some more test coverage.
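
A sketch of what that switch would look like for the stream-returning calls (here handle is assumed to be an ArrowArrayStreamHandle as returned by execute_query(); each variant consumes the handle, so only one would be used):

    import pyarrow

    # current style: import from a raw address
    reader = pyarrow.RecordBatchReader._import_from_c(handle.address)

    # capsule style (pyarrow >= 14), as used in this test
    reader = pyarrow.RecordBatchReader._import_from_c_capsule(handle.__arrow_c_stream__())
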

_, conn = sqlite
handle = conn.get_table_types()
with pyarrow.RecordBatchReader._import_from_c_capsule(handle.__arrow_c_stream__()) as reader:
reader.read_all()

# set up some data
data = pyarrow.record_batch(
[
[1, 2, 3, 4],
["a", "b", "c", "d"],
],
names=["ints", "strs"],
)
with adbc_driver_manager.AdbcStatement(conn) as stmt:
stmt.set_options(**{adbc_driver_manager.INGEST_OPTION_TARGET_TABLE: "foo"})
_bind(stmt, data)
stmt.execute_update()

# importing a schema

handle = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo")
assert data.schema == pyarrow.schema(handle)
# ensure consumed schema was marked as such
with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
pyarrow.schema(handle)

# smoke test for the capsule calling release
capsule = conn.get_table_schema(catalog=None, db_schema=None, table_name="foo").__arrow_c_schema__()
del capsule

# importing a stream

with adbc_driver_manager.AdbcStatement(conn) as stmt:
stmt.set_sql_query("SELECT * FROM foo")
handle, _ = stmt.execute_query()

result = pyarrow.table(handle)
assert result.to_batches()[0] == data

# ensure consumed schema was marked as such
with pytest.raises(ValueError, match="Cannot import released ArrowArrayStream"):
pyarrow.table(handle)

# smoke test for the capsule calling release
with adbc_driver_manager.AdbcStatement(conn) as stmt:
stmt.set_sql_query("SELECT * FROM foo")
capsule = stmt.execute_query()[0].__arrow_c_stream__()
del capsule

# TODO: also need to import from things supporting protocol
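
As a hedged sketch of what "importing from things supporting the protocol" could look like on the producer side, any object exposing __arrow_c_stream__ would do; the class and the hypothetical bind path mentioned in the comments below are illustrative, not part of this PR (pyarrow >= 14 assumed):

    import pyarrow

    class StreamSource:
        # Minimal third-party object exposing the Arrow stream protocol.
        def __init__(self, table):
            self._table = table

        def __arrow_c_stream__(self, requested_schema=None):
            # Delegate export to pyarrow's own implementation.
            return self._table.__arrow_c_stream__(requested_schema)

    source = StreamSource(pyarrow.table({"ints": [1, 2, 3], "strs": ["a", "b", "c"]}))
    # A future bind/ingest path could accept `source` directly; for now this only
    # shows that the capsule round-trips through pyarrow:
    reader = pyarrow.RecordBatchReader._import_from_c_capsule(source.__arrow_c_stream__())
    assert reader.read_all().column("ints").to_pylist() == [1, 2, 3]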