Skip to content

Commit

Permalink
GH-39979: [Python] Low-level bindings for exporting/importing the C D…
Browse files Browse the repository at this point in the history
…evice Interface (#39980)

### Rationale for this change

We have low-level methods `_import_from_c`/`_export_to_c` for the C Data Interface, we can add similar methods for the C Device data interface.

Expanding the Arrow PyCapsule protocol (i.e. a better public API for other libraries) is covered by #38325. Because of that, we might not want to keep those low-level methods long term (or at least we need to have the equivalents using capsules), but for testing it's useful to already add those.

### What changes are included in this PR?

Added methods to Array and RecordBatch classes. Currently import only works for CPU devices.

* GitHub Issue: #39979

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
  • Loading branch information
jorisvandenbossche authored Feb 28, 2024
1 parent 19e874f commit 99c5412
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 32 deletions.
8 changes: 8 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1967,6 +1967,14 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}

Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
int64_t device_id) {
if (device_type != ARROW_DEVICE_CPU) {
return Status::NotImplemented("Only importing data on CPU is supported");
}
return default_cpu_memory_manager();
}

Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
std::shared_ptr<DataType> type,
const DeviceMemoryMapper& mapper) {
Expand Down
32 changes: 20 additions & 12 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch,
using DeviceMemoryMapper =
std::function<Result<std::shared_ptr<MemoryManager>>(ArrowDeviceType, int64_t)>;

ARROW_EXPORT
Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
int64_t device_id);

/// \brief EXPERIMENTAL: Import C++ device array from the C data interface.
///
/// The ArrowArray struct has its contents moved (as per the C data interface
Expand All @@ -226,12 +230,13 @@ using DeviceMemoryMapper =
///
/// \param[in,out] array C data interface struct holding the array data
/// \param[in] type type of the imported array
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported array object
ARROW_EXPORT
Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
std::shared_ptr<DataType> type,
const DeviceMemoryMapper& mapper);
Result<std::shared_ptr<Array>> ImportDeviceArray(
struct ArrowDeviceArray* array, std::shared_ptr<DataType> type,
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface.
///
Expand All @@ -242,12 +247,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
///
/// \param[in,out] array C data interface struct holding the array data
/// \param[in,out] type C data interface struct holding the array type
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported array object
ARROW_EXPORT
Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
struct ArrowSchema* type,
const DeviceMemoryMapper& mapper);
Result<std::shared_ptr<Array>> ImportDeviceArray(
struct ArrowDeviceArray* array, struct ArrowSchema* type,
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data
/// interface.
Expand All @@ -259,12 +265,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
///
/// \param[in,out] array C data interface struct holding the record batch data
/// \param[in] schema schema of the imported record batch
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported record batch object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
const DeviceMemoryMapper& mapper);
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema
/// from the C data interface.
Expand All @@ -278,12 +285,13 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
///
/// \param[in,out] array C data interface struct holding the record batch data
/// \param[in,out] schema C data interface struct holding the record batch schema
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported record batch object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, struct ArrowSchema* schema,
const DeviceMemoryMapper& mapper);
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// @}

Expand Down
64 changes: 64 additions & 0 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1778,6 +1778,70 @@ cdef class Array(_PandasConvertible):

return pyarrow_wrap_array(array)

def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowDeviceArray struct, given its pointer.
If a C ArrowSchema struct pointer is also given, the array type
is exported to it at the same time.
Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.
Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceArray(
deref(self.sp_array), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))

@staticmethod
def _import_from_c_device(in_ptr, type):
"""
Import Array from a C ArrowDeviceArray struct, given its pointer
and the imported array type.
Parameters
----------
in_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
type: DataType or int
Either a DataType object, or the raw pointer to a C ArrowSchema
struct.
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr)
)
else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
)
return pyarrow_wrap_array(c_array)

def __dlpack__(self, stream=None):
"""Export a primitive array as a DLPack capsule.
Expand Down
10 changes: 10 additions & 0 deletions python/pyarrow/cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@
// Opaque producer-specific data
void* private_data;
};
typedef int32_t ArrowDeviceType;
struct ArrowDeviceArray {
struct ArrowArray array;
int64_t device_id;
ArrowDeviceType device_type;
void* sync_event;
int64_t reserved[3];
};
"""

# TODO use out-of-line mode for faster import and avoid C parsing
Expand Down
23 changes: 23 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,12 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer(
const int64_t size, CMemoryPool* pool)

cdef cppclass CSyncEvent" arrow::Device::SyncEvent":
pass

cdef cppclass CDevice" arrow::Device":
pass

cdef CMemoryPool* c_default_memory_pool" arrow::default_memory_pool"()
cdef CMemoryPool* c_system_memory_pool" arrow::system_memory_pool"()
cdef CStatus c_jemalloc_memory_pool" arrow::jemalloc_memory_pool"(
Expand Down Expand Up @@ -2902,6 +2908,9 @@ cdef extern from "arrow/c/abi.h":
cdef struct ArrowArrayStream:
void (*release)(ArrowArrayStream*) noexcept nogil

cdef struct ArrowDeviceArray:
pass

cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportType(CDataType&, ArrowSchema* out)
CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*)
Expand Down Expand Up @@ -2934,6 +2943,20 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)

CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, shared_ptr[CDataType])
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, ArrowSchema*)

CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, shared_ptr[CSchema])
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, ArrowSchema*)


cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
Expand Down
62 changes: 62 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3145,6 +3145,68 @@ cdef class RecordBatch(_Tabular):

return pyarrow_wrap_batch(c_batch)

def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowDeviceArray struct, given its pointer.
If a C ArrowSchema struct pointer is also given, the record batch
schema is exported to it at the same time.
Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.
Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceRecordBatch(
deref(self.sp_batch), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr)
)

@staticmethod
def _import_from_c_device(in_ptr, schema):
"""
Import RecordBatch from a C ArrowDeviceArray struct, given its pointer
and the imported schema.
Parameters
----------
in_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
type: Schema or int
Either a Schema object, or the raw pointer to a C ArrowSchema
struct.
This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema))
return pyarrow_wrap_batch(c_batch)


def _reconstruct_record_batch(columns, schema):
"""
Expand Down
Loading

0 comments on commit 99c5412

Please sign in to comment.