Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support StringViewArray interop with python: fix lingering C Data Interface issues for *ViewArray #6368

Merged
merged 8 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ on:
- arrow/**

jobs:

integration:
name: Archery test With other arrows
runs-on: ubuntu-latest
Expand Down Expand Up @@ -118,9 +117,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [ stable ]
# PyArrow 13 was the last version prior to introduction to Arrow PyCapsules
pyarrow: [ "13", "14" ]
rust: [stable]
# PyArrow 15 was the first version to introduce StringView/BinaryView support
pyarrow: ["15", "16", "17"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
168 changes: 154 additions & 14 deletions arrow-array/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
"The datatype \"{data_type:?}\" expects 3 buffers, but requested {i}. Please verify that the C data interface is correctly implemented."
)))
}
// Variable-sized views: have 3 or more buffers.
// Buffer 1 is the u128 views buffer
// Buffers 2...N-1 are u8 byte buffers
(DataType::Utf8View, 1) | (DataType::BinaryView,1) => u128::BITS as _,
(DataType::Utf8View, _) | (DataType::BinaryView, _) => {
u8::BITS as _
}
// type ids. UnionArray doesn't have null bitmap so buffer index begins with 0.
(DataType::Union(_, _), 0) => i8::BITS as _,
// Only DenseUnion has 2nd buffer
Expand Down Expand Up @@ -300,7 +307,7 @@ impl<'a> ImportedArrowArray<'a> {
};

let data_layout = layout(&self.data_type);
let buffers = self.buffers(data_layout.can_contain_null_mask)?;
let buffers = self.buffers(data_layout.can_contain_null_mask, data_layout.variadic)?;

let null_bit_buffer = if data_layout.can_contain_null_mask {
self.null_bit_buffer()
Expand Down Expand Up @@ -373,13 +380,30 @@ impl<'a> ImportedArrowArray<'a> {

/// returns all buffers, as organized by Rust (i.e. null buffer is skipped if it's present
/// in the spec of the type)
fn buffers(&self, can_contain_null_mask: bool) -> Result<Vec<Buffer>> {
fn buffers(&self, can_contain_null_mask: bool, variadic: bool) -> Result<Vec<Buffer>> {
a10y marked this conversation as resolved.
Show resolved Hide resolved
// + 1: skip null buffer
let buffer_begin = can_contain_null_mask as usize;
(buffer_begin..self.array.num_buffers())
.map(|index| {
let len = self.buffer_len(index, &self.data_type)?;
let buffer_end = self.array.num_buffers() - usize::from(variadic);

let variadic_buffer_lens = if variadic {
// Each views array has 1 (optional) null buffer, 1 views buffer, 1 lengths buffer.
// Rest are variadic.
let num_variadic_buffers =
self.array.num_buffers() - (2 + usize::from(can_contain_null_mask));
if num_variadic_buffers == 0 {
&[]
} else {
let lengths = self.array.buffer(self.array.num_buffers() - 1);
// SAFETY: if lengths is non-null, then it must be valid for reads of up to num_variadic_buffers values.
unsafe { std::slice::from_raw_parts(lengths.cast::<i64>(), num_variadic_buffers) }
}
} else {
&[]
};

(buffer_begin..buffer_end)
.map(|index| {
let len = self.buffer_len(index, variadic_buffer_lens, &self.data_type)?;
match unsafe { create_buffer(self.owner.clone(), self.array, index, len) } {
Some(buf) => Ok(buf),
None if len == 0 => {
Expand All @@ -399,7 +423,12 @@ impl<'a> ImportedArrowArray<'a> {
/// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`.
/// for variable-sized buffers, such as the second buffer of a stringArray, we need
/// to fetch offset buffer's len to build the second buffer.
fn buffer_len(&self, i: usize, dt: &DataType) -> Result<usize> {
fn buffer_len(
&self,
i: usize,
variadic_buffer_lengths: &[i64],
dt: &DataType,
) -> Result<usize> {
// Special handling for dictionary type as we only care about the key type in the case.
let data_type = match dt {
DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(),
Expand Down Expand Up @@ -430,7 +459,7 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets.
#[allow(clippy::cast_ptr_alignment)]
Expand All @@ -444,14 +473,24 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i64`, as Large uses `i64` offsets.
#[allow(clippy::cast_ptr_alignment)]
let offset_buffer = self.array.buffer(1) as *const i64;
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i64>() - 1) }) as usize
}
// View types: these have variadic buffers.
// Buffer 1 is the views buffer, which stores one u128 view per element of the array.
// Buffers 2..N-1 are the buffers holding the byte data. Their lengths are variable.
// Buffer N is of length (N - 2) and stores i64 containing the lengths of buffers 2..N-1
(DataType::Utf8View, 1) | (DataType::BinaryView, 1) => {
std::mem::size_of::<u128>() * length
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) => {
variadic_buffer_lengths[i - 2] as usize
}
// buffer len of primitive types
_ => {
let bits = bit_width(data_type, i)?;
Expand Down Expand Up @@ -1229,18 +1268,18 @@ mod tests_from_ffi {
use arrow_data::ArrayData;
use arrow_schema::{DataType, Field};

use crate::types::Int32Type;
use super::{ImportedArrowArray, Result};
use crate::builder::GenericByteViewBuilder;
use crate::types::{BinaryViewType, ByteViewType, Int32Type, StringViewType};
use crate::{
array::{
Array, BooleanArray, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array,
},
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
make_array, ArrayRef, ListArray,
make_array, ArrayRef, GenericByteViewArray, ListArray,
};

use super::{ImportedArrowArray, Result};

fn test_round_trip(expected: &ArrayData) -> Result<()> {
// here we export the array
let array = FFI_ArrowArray::new(expected);
Expand Down Expand Up @@ -1453,8 +1492,8 @@ mod tests_from_ffi {
owner: &array,
};

let offset_buf_len = imported_array.buffer_len(1, &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &imported_array.data_type)?;
let offset_buf_len = imported_array.buffer_len(1, &[], &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &[], &imported_array.data_type)?;

assert_eq!(offset_buf_len, 4);
assert_eq!(data_buf_len, 0);
Expand All @@ -1472,6 +1511,18 @@ mod tests_from_ffi {
StringArray::from(array)
}

/// Exports `array` through the C Data Interface and immediately imports it
/// back, returning the reconstructed view array for comparison in tests.
fn roundtrip_byte_view_array<T: ByteViewType>(
    array: GenericByteViewArray<T>,
) -> GenericByteViewArray<T> {
    let data = array.into_data();

    // Export both halves of the C Data Interface pair: the schema describes
    // the type, the array carries the buffers.
    let ffi_schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap();
    let ffi_array = FFI_ArrowArray::new(&data);

    // SAFETY: both FFI structs were just produced from valid ArrayData above.
    let imported = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap();
    GenericByteViewArray::<T>::from(imported)
}

fn extend_array(array: &dyn Array) -> ArrayRef {
let len = array.len();
let data = array.to_data();
Expand Down Expand Up @@ -1551,4 +1602,93 @@ mod tests_from_ffi {
&imported
);
}

/// Helper trait to allow us to use easily strings as either BinaryViewType::Native or
/// StringViewType::Native scalars.
trait NativeFromStr {
    /// Reinterprets a `&str` as a borrowed value of the implementing
    /// native type (`str` itself, or its underlying `[u8]` bytes).
    fn from_str(value: &str) -> &Self;
}

impl NativeFromStr for str {
    fn from_str(value: &str) -> &Self {
        // `str` is already the native string type: identity conversion.
        value
    }
}

impl NativeFromStr for [u8] {
    fn from_str(value: &str) -> &Self {
        // Expose the string's UTF-8 bytes as the binary-view native type.
        value.as_bytes()
    }
}

/// Round-trips StringView and BinaryView arrays through the C Data Interface
/// and verifies that the imported arrays survive both the FFI trip and a
/// subsequent in-memory copy (`extend_array`).
///
/// Covers: empty arrays, fully-inlined views (no variadic data buffers), and
/// mixed inlined/non-inlined views with one and two variadic buffers — the
/// cases that exercise the variadic-buffer-lengths handling in `ffi.rs`.
#[test]
fn test_round_trip_byte_view() {
    fn test_case<T>()
    where
        T: ByteViewType,
        T::Native: NativeFromStr,
    {
        // Round-trips `$array` over FFI, then copies the imported array and
        // asserts the copy equals the import. A macro (not a fn) so each
        // invocation can consume its array by value.
        macro_rules! run_test_case {
            ($array:expr) => {{
                // round-trip through C Data Interface
                let len = $array.len();
                let imported = roundtrip_byte_view_array($array);
                assert_eq!(imported.len(), len);

                let copied = extend_array(&imported);
                assert_eq!(
                    copied
                        .as_any()
                        .downcast_ref::<GenericByteViewArray<T>>()
                        .unwrap(),
                    &imported
                );
            }};
        }

        // Empty test case.
        let empty = GenericByteViewBuilder::<T>::new().finish();
        run_test_case!(empty);

        // All inlined strings test case: short values are stored directly in
        // the views buffer, so there are no variadic data buffers.
        let mut all_inlined = GenericByteViewBuilder::<T>::new();
        all_inlined.append_value(T::Native::from_str("inlined1"));
        all_inlined.append_value(T::Native::from_str("inlined2"));
        all_inlined.append_value(T::Native::from_str("inlined3"));
        let all_inlined = all_inlined.finish();
        assert_eq!(all_inlined.data_buffers().len(), 0);
        run_test_case!(all_inlined);

        // some inlined + non-inlined, 1 variadic buffer.
        let mixed_one_variadic = {
            let mut builder = GenericByteViewBuilder::<T>::new();
            builder.append_value(T::Native::from_str("inlined"));
            let block_id =
                builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes()));
            builder.try_append_view(block_id, 0, 25).unwrap();
            builder.finish()
        };
        assert_eq!(mixed_one_variadic.data_buffers().len(), 1);
        run_test_case!(mixed_one_variadic);

        // inlined + non-inlined, 2 variadic buffers.
        let mixed_two_variadic = {
            let mut builder = GenericByteViewBuilder::<T>::new();
            builder.append_value(T::Native::from_str("inlined"));
            let block_id =
                builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes()));
            builder.try_append_view(block_id, 0, 25).unwrap();

            let block_id = builder
                .append_block(Buffer::from("another-non-inlined-string-buffer".as_bytes()));
            builder.try_append_view(block_id, 0, 33).unwrap();
            builder.finish()
        };
        assert_eq!(mixed_two_variadic.data_buffers().len(), 2);
        run_test_case!(mixed_two_variadic);
    }

    test_case::<StringViewType>();
    test_case::<BinaryViewType>();
}
}
7 changes: 5 additions & 2 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl Buffer {
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: offset={} length={}",
a10y marked this conversation as resolved.
Show resolved Hide resolved
offset,
self.length
);
self.length -= offset;
// Safety:
Expand All @@ -221,7 +223,8 @@ impl Buffer {
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset.saturating_add(length) <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}",
self.length
);
// Safety:
// offset + length <= self.length
Expand Down
20 changes: 16 additions & 4 deletions arrow-data/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::bit_mask::set_bits;
use crate::{layout, ArrayData};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer};
use arrow_schema::DataType;
use std::ffi::c_void;

Expand Down Expand Up @@ -121,7 +121,7 @@ impl FFI_ArrowArray {
pub fn new(data: &ArrayData) -> Self {
let data_layout = layout(data.data_type());

let buffers = if data_layout.can_contain_null_mask {
let mut buffers = if data_layout.can_contain_null_mask {
// * insert the null buffer at the start
// * make all others `Option<Buffer>`.
std::iter::once(align_nulls(data.offset(), data.nulls()))
Expand All @@ -132,7 +132,7 @@ impl FFI_ArrowArray {
};

// `n_buffers` is the number of buffers by the spec.
let n_buffers = {
let mut n_buffers = {
data_layout.buffers.len() + {
// If the layout has a null buffer by Arrow spec.
// Note that even the array doesn't have a null buffer because it has
Expand All @@ -141,10 +141,22 @@ impl FFI_ArrowArray {
}
} as i64;

if data_layout.variadic {
// Save the lengths of all variadic buffers into a new buffer.
// The first buffer is `views`, and the rest are variadic.
let mut data_buffers_lengths = Vec::new();
for buffer in data.buffers().iter().skip(1) {
data_buffers_lengths.push(buffer.len() as i64);
n_buffers += 1;
}

buffers.push(Some(ScalarBuffer::from(data_buffers_lengths).into_inner()));
n_buffers += 1;
}

let buffers_ptr = buffers
.iter()
.flat_map(|maybe_buffer| match maybe_buffer {
// note that `raw_data` takes into account the buffer's offset
Some(b) => Some(b.as_ptr() as *const c_void),
// This is for null buffer. We only put a null pointer for
// null buffer if by spec it can contain null mask.
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl FromPyArrow for RecordBatch {
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
Expand Down
Loading
Loading