fix: Adjust FFI_ArrowArray offset based on the offset of offset buffer
viirya committed Jun 15, 2024
1 parent 087f34b commit eb51190
Showing 3 changed files with 120 additions and 4 deletions.
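As context (not part of the commit), here is a condensed sketch of the scenario this fix addresses, mirroring the new test added below: a sliced `StringArray` is exported through the C Data Interface and imported back.

```rust
use arrow_array::{
    ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
    Array, StringArray,
};

fn main() {
    let strings: Vec<String> = (0..1000).map(|i| format!("string: {}", i)).collect();
    let array = StringArray::from(strings);

    // Slicing shifts the offsets buffer's pointer within the shared allocation.
    let slice = array.slice(500, 500);
    let data = slice.into_data();

    // Export the slice over the C Data Interface and import it back.
    let ffi_array = FFI_ArrowArray::new(&data);
    let ffi_schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap();
    let imported = StringArray::from(unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap());

    assert_eq!(imported.len(), 500);
    assert_eq!(imported.value(0), "string: 500");
    assert_eq!(imported.value(499), "string: 999");
}
```

Before this change, the sliced offsets pointer was exported directly, which let a consumer derive the wrong length for the data buffer (buffer 1); the commit exports the base offsets pointer and reports the slice through the `FFI_ArrowArray` offset field instead.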
56 changes: 56 additions & 0 deletions arrow-array/src/ffi.rs
@@ -1225,6 +1225,7 @@ mod tests_from_ffi {
use std::sync::Arc;

use arrow_buffer::{bit_util, buffer::Buffer, MutableBuffer, OffsetBuffer};
use arrow_data::transform::MutableArrayData;
use arrow_data::ArrayData;
use arrow_schema::{DataType, Field};

@@ -1234,6 +1235,7 @@ mod tests_from_ffi {
Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array,
},
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
make_array, ArrayRef,
};

use super::{ImportedArrowArray, Result};
@@ -1458,4 +1460,58 @@ mod tests_from_ffi {

test_round_trip(&imported_array.consume()?)
}

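// Round-trips the array through the C Data Interface: export to FFI structs, then import back.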
fn export_string(array: StringArray) -> StringArray {
let data = array.into_data();

let array = FFI_ArrowArray::new(&data);
let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap();

let array = unsafe { from_ffi(array, &schema) }.unwrap();
StringArray::from(array)
}

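// Copies the array into a freshly allocated array via `MutableArrayData`.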
fn extend_array(array: &dyn Array) -> ArrayRef {
let len = array.len();
let data = array.to_data();

let mut mutable = MutableArrayData::new(vec![&data], false, len);
mutable.extend(0, 0, len);
make_array(mutable.freeze())
}

#[test]
fn test_extend_imported_string_slice() {
let mut strings = vec![];

for i in 0..1000 {
strings.push(format!("string: {}", i));
}

let string_array = StringArray::from(strings);

let imported = export_string(string_array.clone());
assert_eq!(imported.len(), 1000);
assert_eq!(imported.value(0), "string: 0");
assert_eq!(imported.value(499), "string: 499");

let copied = extend_array(&imported);
assert_eq!(
copied.as_any().downcast_ref::<StringArray>().unwrap(),
&imported
);

let slice = string_array.slice(500, 500);

let imported = export_string(slice);
assert_eq!(imported.len(), 500);
assert_eq!(imported.value(0), "string: 500");
assert_eq!(imported.value(499), "string: 999");

let copied = extend_array(&imported);
assert_eq!(
copied.as_any().downcast_ref::<StringArray>().unwrap(),
&imported
);
}
}
5 changes: 5 additions & 0 deletions arrow-buffer/src/buffer/immutable.rs
@@ -71,6 +71,11 @@ impl Buffer {
}
}

/// Returns the internal [`Bytes`] backing this buffer; slices of a `Buffer` share the same underlying [`Bytes`].
pub fn get_bytes(&self) -> Arc<Bytes> {
self.data.clone()
}

/// Create a [`Buffer`] from the provided [`Vec`] without copying
#[inline]
pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
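A minimal usage sketch (not part of the commit) of the new `get_bytes` accessor, showing how a sliced `Buffer`'s pointer relates to its backing allocation. This byte-offset-to-element-offset arithmetic is what the FFI export in the next file performs for offsets buffers.

```rust
use arrow_buffer::Buffer;

fn main() {
    // 100 i32 values = 400 bytes in one allocation.
    let buffer = Buffer::from_vec::<i32>((0..100).collect());

    // Slice off the first 10 values (40 bytes).
    let sliced = buffer.slice(40);

    // `as_ptr` points into the slice; `get_bytes` exposes the start of the
    // shared allocation. The difference, divided by the element width, is the
    // value the export places in the `FFI_ArrowArray` offset field.
    let byte_offset =
        unsafe { sliced.as_ptr().offset_from(sliced.get_bytes().ptr().as_ptr()) } as usize;
    assert_eq!(byte_offset, 40);
    assert_eq!(byte_offset / std::mem::size_of::<i32>(), 10);
}
```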
63 changes: 59 additions & 4 deletions arrow-data/src/ffi.rs
@@ -131,6 +131,45 @@ impl FFI_ArrowArray {
data.buffers().iter().map(|b| Some(b.clone())).collect()
};

// Handle the buffer offset for the offsets buffer of variable-length types.
let offset_offset = match data.data_type() {
DataType::Utf8 | DataType::Binary => {
// The offsets buffer may itself be a slice of a larger allocation.
// Exporting the sliced pointer as the buffer pointer would make the
// consumer compute an incorrect length for the data buffer (buffer 1),
// since that length is derived from the offsets (a consumer-side sketch
// of that calculation follows this file's diff). Instead, compute how
// many elements into the underlying allocation the offsets buffer starts
// and report that through the `FFI_ArrowArray` offset field.
Some(unsafe {
let b = &data.buffers()[0];
b.as_ptr().offset_from(b.get_bytes().ptr().as_ptr()) as usize
/ std::mem::size_of::<i32>()
})
}
DataType::LargeUtf8 | DataType::LargeBinary => {
// Same as above, but with 64-bit offsets.
Some(unsafe {
let b = &data.buffers()[0];
b.as_ptr().offset_from(b.get_bytes().ptr().as_ptr()) as usize
/ std::mem::size_of::<i64>()
})
}
_ => None,
};

let offset = if let Some(offset) = offset_offset {
if data.offset() != 0 {
// TODO: Adjust for data offset
panic!("The ArrayData of a slice offset buffer should not have offset");
}
offset
} else {
data.offset()
};

// `n_buffers` is the number of buffers by the spec.
let n_buffers = {
data_layout.buffers.len() + {
@@ -143,9 +182,25 @@ impl FFI_ArrowArray {

let buffers_ptr = buffers
.iter()
- .flat_map(|maybe_buffer| match maybe_buffer {
- // note that `raw_data` takes into account the buffer's offset
- Some(b) => Some(b.as_ptr() as *const c_void),
+ .enumerate()
+ .flat_map(|(buffer_idx, maybe_buffer)| match maybe_buffer {
+ Some(b) => {
+ match (data.data_type(), buffer_idx) {
+ (
+ DataType::Utf8
+ | DataType::LargeUtf8
+ | DataType::Binary
+ | DataType::LargeBinary,
+ 1,
+ ) => {
+ // For the offsets buffer, export the original (unsliced) pointer;
+ // the slice is reported through the `FFI_ArrowArray` offset field.
+ Some(b.get_bytes().ptr().as_ptr() as *const c_void)
+ }
+ // For other buffers, `as_ptr` already takes the buffer's offset into account.
+ _ => Some(b.as_ptr() as *const c_void),
+ }
+ }
// This is for null buffer. We only put a null pointer for
// null buffer if by spec it can contain null mask.
None if data_layout.can_contain_null_mask => Some(std::ptr::null()),
@@ -186,7 +241,7 @@ impl FFI_ArrowArray {
Self {
length: data.len() as i64,
null_count: null_count as i64,
- offset: data.offset() as i64,
+ offset: offset as i64,
n_buffers,
n_children,
buffers: private_data.buffers_ptr.as_mut_ptr(),
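Why the exported pointer and offset field must agree: a hedged, stand-alone sketch (not arrow-rs code) of the consumer-side arithmetic for a `Utf8` array, where the data buffer (buffer 1) length is read from the offsets buffer at position `offset + length`.

```rust
// Illustrative only: how a C Data Interface consumer derives the data buffer
// length for a variable-length (Utf8/Binary) array from the exported offsets
// pointer plus the `FFI_ArrowArray` length and offset fields.
fn data_buffer_len(offsets: &[i32], ffi_offset: usize, length: usize) -> usize {
    offsets[ffi_offset + length] as usize
}

fn main() {
    // 1000 strings of 10 bytes each: offsets = [0, 10, 20, ..., 10_000].
    let offsets: Vec<i32> = (0..=1000).map(|i| i * 10).collect();

    // A 500-element slice starting at element 500, exported with the base
    // offsets pointer and offset = 500, yields the correct 10_000 bytes.
    assert_eq!(data_buffer_len(&offsets, 500, 500), 10_000);
}
```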
