Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: support the Arrow PyCapsule Interface on pandas.DataFrame (export) #56587

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pandas/compat/_optional.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ def import_optional_dependency(
The imported module, when found and the version is correct.
None is returned when the package is not found and `errors`
is False, or when the package's version is too old and `errors`
is ``'warn'``.
is ``'warn'`` or ``'ignore'``.
"""

assert errors in {"warn", "raise", "ignore"}

package_name = INSTALL_MAPPING.get(name)
Expand Down Expand Up @@ -190,5 +189,7 @@ def import_optional_dependency(
return None
elif errors == "raise":
raise ImportError(msg)
else:
return None

return module
27 changes: 27 additions & 0 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,33 @@ def __dataframe_consortium_standard__(
)
return convert_to_standard_compliant_dataframe(self, api_version=api_version)

def __arrow_c_stream__(self, requested_schema=None):
"""
Export the pandas DataFrame as an Arrow C stream PyCapsule.

This relies on pyarrow to convert the pandas DataFrame to the Arrow
format (and follows the default behaviour of ``pyarrow.Table.from_pandas``
in its handling of the index, i.e. store the index as a column except
for RangeIndex).
This conversion is not necessarily zero-copy.

Parameters
----------
requested_schema : PyCapsule, default None
The schema to which the dataframe should be cast, passed as a
PyCapsule containing a C ArrowSchema representation of the
requested schema.

Returns
-------
PyCapsule
"""
pa = import_optional_dependency("pyarrow", min_version="14.0.0")
if requested_schema is not None:
requested_schema = pa.Schema._import_from_c_capsule(requested_schema)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Will _import_from_c_capsule become public in the future?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you use pa.schema() directly instead of pa.Schema._import_from_c_capsule?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it will necessarily become public in the current form (but the _import_from_c version has been used in many other external projects, so we won't just change those methods in pyarrow)

Can't you use pa.schema() directly instead of pa.Schema._import_from_c_capsule?

Not directly, because we get a capsule here (we are inside the low-level dunder here), and pa.schema() doesn't accept capsules, only objects implementing __arrow_c_schema__. Of course we could have a small wrapper object that has the dunder method and returns the capsule, if we want to avoid using the _import_from_c_capsule.

I brought this up in the past on the pyarrow side whether we need an "official" way to import capsules, in the last paragraph in apache/arrow#38010, but we should maybe discuss that a bit more (or whether we just "bless" the _import_from_c_capsule as the official way to do this)

table = pa.Table.from_pandas(self, schema=requested_schema)
return table.__arrow_c_stream__()

# ----------------------------------------------------------------------

@property
Expand Down
45 changes: 45 additions & 0 deletions pandas/tests/frame/test_arrow_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import ctypes

import pytest

import pandas.util._test_decorators as td

import pandas as pd

pa = pytest.importorskip("pyarrow")


@td.skip_if_no("pyarrow", min_version="14.0")
def test_dataframe_arrow_interface():
df = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})

capsule = df.__arrow_c_stream__()
assert (
Copy link
Member

@WillAyd WillAyd Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use ctypes to test this a little more deeply? I had in mind something like this:

import pyarrow as pa
import ctypes

tbl = pa.Table.from_pydict({"col": [1, 2, 3]})
stream = tbl.__arrow_c_stream__()

class ArrowSchema(ctypes.Structure):
    pass

ArrowSchema._fields_ = [
    ("format", ctypes.POINTER(ctypes.c_char)),
    ("name", ctypes.POINTER(ctypes.c_char)),
    ("metadata", ctypes.POINTER(ctypes.c_char)),
    ("flags", ctypes.c_int, 8),
    ("n_children", ctypes.c_int, 8),
    ("children", ctypes.POINTER(ctypes.POINTER(ArrowSchema))),
    ("dictionary", ctypes.POINTER(ArrowSchema)),
    # NB there are more members
    # not sure how to define release callback, but probably not important
]

ctypes.pythonapi.PyCapsule_GetName.restype = ctypes.c_char_p
ctypes.pythonapi.PyCapsule_GetName.argtypes = [ctypes.py_object]
nm = ctypes.pythonapi.PyCapsule_GetName(stream)
#assert nm == b"array_schema"  # TODO: this actually returns arrow_array_stream

capsule_name = ctypes.create_string_buffer("arrow_array_stream".encode())
ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]

# TODO: not sure why the below isn't working?
#void_ptr = ctypes.pythonapi.PyCapsule_GetPointer(
#    stream,
#    capsule_name
#)
#obj = ctypes.cast(void_ptr, ctypes.POINTER(ArrowSchema))[0]
#assert obj.n_children = 1

I commented out things that weren't working. I'm a little less sure of the last section what is going on, but at the very least there is a problem with the capsule name as it returns b"arrow_array_stream" yet the documentation says it should be "arrow_schema"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore what I said before - I mistakenly didn't realize this was returning a stream. This all looks good to me - I think the ctypes would get a little too wonky to deal with. Here's something I stubbed out but I'm not sure how ctypes would sanely deal with struct members that are function points. Probably too much detail for us to get into on our end

import pyarrow as pa
import ctypes

tbl = pa.Table.from_pydict({"col": [1, 2, 3]})
stream = tbl.__arrow_c_stream__()

class ArrowSchema(ctypes.Structure):
    pass

class ArrowArray(ctypes.Structure):
    pass

class ArrowArrayStream(ctypes.Structure):
    pass


schema_release_func = ctypes.CFUNCTYPE(None, ctypes.POINTER(ArrowSchema))
ArrowSchema._fields_ = [
    ("format", ctypes.POINTER(ctypes.c_char)),
    ("name", ctypes.POINTER(ctypes.c_char)),
    ("metadata", ctypes.POINTER(ctypes.c_char)),
    ("flags", ctypes.c_int, 8),
    ("n_children", ctypes.c_int, 8),
    ("children", ctypes.POINTER(ctypes.POINTER(ArrowSchema))),
    ("dictionary", ctypes.POINTER(ArrowSchema)),
    ("release", schema_release_func),
]

array_release_func = ctypes.CFUNCTYPE(None, ctypes.POINTER(ArrowArray))
ArrowArray._fields_ = [
    ("length", ctypes.c_int, 8),
    ("null_count", ctypes.c_int, 8),
    ("offset", ctypes.c_int, 8),
    ("n_buffers", ctypes.c_int, 8),
    ("n_children", ctypes.c_int, 8),
    ("buffers", ctypes.POINTER(ctypes.c_void_p)),
    ("children", ctypes.POINTER(ctypes.POINTER(ArrowArray))),
    ("dictionary", ctypes.POINTER(ctypes.POINTER(ArrowArray))),
    ("release", array_release_func),
]

get_schema_func = ctypes.CFUNCTYPE(int, ctypes.POINTER(ArrowArrayStream), ctypes.POINTER(ArrowSchema))
get_next_func = ctypes.CFUNCTYPE(int, ctypes.POINTER(ArrowArrayStream), ctypes.POINTER(ArrowArray))
get_last_error_func = ctypes.CFUNCTYPE(bytes, ctypes.POINTER(ArrowArrayStream))
stream_release_func = ctypes.CFUNCTYPE(None, ctypes.POINTER(ArrowArrayStream))
ArrowArrayStream._fields_ = [
    ("get_schema", get_schema_func),
    ("get_next", get_next_func),
    ("get_last_error", get_last_error_func),
    ("release", stream_release_func),
]


ctypes.pythonapi.PyCapsule_GetName.restype = ctypes.c_char_p
ctypes.pythonapi.PyCapsule_GetName.argtypes = [ctypes.py_object]
nm = ctypes.pythonapi.PyCapsule_GetName(stream)
assert nm == "arrow_array_stream"

capsule_name = ctypes.create_string_buffer("arrow_array_stream".encode())
ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [ctypes.py_object, ctypes.c_char_p]

void_ptr = ctypes.pythonapi.PyCapsule_GetPointer(
    stream,
    capsule_name
)
stream_obj = ctypes.cast(void_ptr, ctypes.POINTER(ArrowArrayStream))[0]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also think that, because we use pyarrow here, such detailed testing isn't necessary here. We can assume that the struct's content is thoroughly tested on the Arrow side, and we mostly need to test we return the correct capsule (and there is already a test that checks the capsule name with ctypes.pythonapi.PyCapsule_IsValid).

If at some point we would implement our own version of the C Data Interface, then for sure it would need a lot more testing.

ctypes.pythonapi.PyCapsule_IsValid(
ctypes.py_object(capsule), b"arrow_array_stream"
)
== 1
)

table = pa.table(df)
expected = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
assert table.equals(expected)

schema = pa.schema([("a", pa.int8()), ("b", pa.string())])
table = pa.table(df, schema=schema)
expected = expected.cast(schema)
assert table.equals(expected)


@td.skip_if_no("pyarrow", min_version="15.0")
def test_dataframe_to_arrow():
    # A pandas DataFrame should be directly consumable through the Arrow
    # PyCapsule interface; RecordBatchReader.from_stream requires pyarrow >= 15.
    df = pd.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})

    # from_stream returns a RecordBatchReader, which has no .equals();
    # materialize it into a Table with read_all() before comparing.
    table = pa.RecordBatchReader.from_stream(df).read_all()
    expected = pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]})
    assert table.equals(expected)

    # Passing a schema should request a cast of the exported data.
    schema = pa.schema([("a", pa.int8()), ("b", pa.string())])
    table = pa.RecordBatchReader.from_stream(df, schema=schema).read_all()
    expected = expected.cast(schema)
    assert table.equals(expected)
14 changes: 14 additions & 0 deletions pandas/tests/test_optional_dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ def test_bad_version(monkeypatch):
result = import_optional_dependency("fakemodule")
assert result is module

with pytest.raises(ImportError, match="Pandas requires version '1.1.0'"):
import_optional_dependency("fakemodule", min_version="1.1.0")

with tm.assert_produces_warning(UserWarning):
result = import_optional_dependency(
"fakemodule", errors="warn", min_version="1.1.0"
)
assert result is None

result = import_optional_dependency(
"fakemodule", errors="ignore", min_version="1.1.0"
)
assert result is None


def test_submodule(monkeypatch):
# Create a fake module with a submodule
Expand Down