From 60c3e6cd61aa1a0181f7a60bef8ed240515f37de Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 15 Jun 2024 13:56:22 -0700 Subject: [PATCH] fix: Adjust FFI_ArrowArray offset based on the offset of offset buffer --- arrow-array/src/ffi.rs | 56 +++++++++++++++++++++++++ arrow-buffer/src/buffer/immutable.rs | 5 +++ arrow-data/src/ffi.rs | 63 ++++++++++++++++++++++++++-- 3 files changed, 120 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/ffi.rs b/arrow-array/src/ffi.rs index 088a0a6ab58a..ebc5427b8951 100644 --- a/arrow-array/src/ffi.rs +++ b/arrow-array/src/ffi.rs @@ -1225,6 +1225,7 @@ mod tests_from_ffi { use std::sync::Arc; use arrow_buffer::{bit_util, buffer::Buffer, MutableBuffer, OffsetBuffer}; + use arrow_data::transform::MutableArrayData; use arrow_data::ArrayData; use arrow_schema::{DataType, Field}; @@ -1234,6 +1235,7 @@ mod tests_from_ffi { Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array, }, ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}, + make_array, ArrayRef, }; use super::{ImportedArrowArray, Result}; @@ -1458,4 +1460,58 @@ mod tests_from_ffi { test_round_trip(&imported_array.consume()?) } + + fn export_string(array: StringArray) -> StringArray { + let data = array.into_data(); + + let array = FFI_ArrowArray::new(&data); + let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap(); + + let array = unsafe { from_ffi(array, &schema) }.unwrap(); + StringArray::from(array) + } + + fn extend_array(array: &dyn Array) -> ArrayRef { + let len = array.len(); + let data = array.to_data(); + + let mut mutable = MutableArrayData::new(vec![&data], false, len); + mutable.extend(0, 0, len); + make_array(mutable.freeze()) + } + + #[test] + fn test_extend_imported_string_slice() { + let mut strings = vec![]; + + for i in 0..1000 { + strings.push(format!("string: {}", i)); + } + + let string_array = StringArray::from(strings); + + let imported = export_string(string_array.clone()); + assert_eq!(imported.len(), 1000); + assert_eq!(imported.value(0), "string: 0"); + assert_eq!(imported.value(499), "string: 499"); + + let copied = extend_array(&imported); + assert_eq!( + copied.as_any().downcast_ref::().unwrap(), + &imported + ); + + let slice = string_array.slice(500, 500); + + let imported = export_string(slice); + assert_eq!(imported.len(), 500); + assert_eq!(imported.value(0), "string: 500"); + assert_eq!(imported.value(499), "string: 999"); + + let copied = extend_array(&imported); + assert_eq!( + copied.as_any().downcast_ref::().unwrap(), + &imported + ); + } } diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index f26cde05b7ab..8f2fe226e107 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -71,6 +71,11 @@ impl Buffer { } } + /// Returns the internal byte buffer. + pub fn get_bytes(&self) -> Arc { + self.data.clone() + } + /// Create a [`Buffer`] from the provided [`Vec`] without copying #[inline] pub fn from_vec(vec: Vec) -> Self { diff --git a/arrow-data/src/ffi.rs b/arrow-data/src/ffi.rs index 589f7dac6d19..88a76adfe75c 100644 --- a/arrow-data/src/ffi.rs +++ b/arrow-data/src/ffi.rs @@ -131,6 +131,45 @@ impl FFI_ArrowArray { data.buffers().iter().map(|b| Some(b.clone())).collect() }; + // Handle buffer offset for offset buffer. + let offset_offset = match data.data_type() { + DataType::Utf8 | DataType::Binary => { + // Offset buffer is possible a slice of the buffer. + // If we use slice pointer as exported buffer pointer, it will cause + // the consumer to calculate incorrect length of data buffer (buffer 1). + // We need to get the offset of the offset buffer and fill it in + // the `FFI_ArrowArray` offset field. + Some(unsafe { + let b = &data.buffers()[0]; + b.as_ptr().offset_from(b.get_bytes().ptr().as_ptr()) as usize + / std::mem::size_of::() + }) + } + DataType::LargeUtf8 | DataType::LargeBinary => { + // Offset buffer is possible a slice of the buffer. + // If we use slice pointer as exported buffer pointer, it will cause + // the consumer to calculate incorrect length of data buffer (buffer 1). + // We need to get the offset of the offset buffer and fill it in + // the `FFI_ArrowArray` offset field. + Some(unsafe { + let b = &data.buffers()[0]; + b.as_ptr().offset_from(b.get_bytes().ptr().as_ptr()) as usize + / std::mem::size_of::() + }) + } + _ => None, + }; + + let offset = if let Some(offset) = offset_offset { + if data.offset() != 0 { + // TODO: Adjust for data offset + panic!("Offset buffer should not have offset"); + } + offset + } else { + data.offset() + }; + // `n_buffers` is the number of buffers by the spec. let n_buffers = { data_layout.buffers.len() + { @@ -143,9 +182,25 @@ impl FFI_ArrowArray { let buffers_ptr = buffers .iter() - .flat_map(|maybe_buffer| match maybe_buffer { - // note that `raw_data` takes into account the buffer's offset - Some(b) => Some(b.as_ptr() as *const c_void), + .enumerate() + .flat_map(|(buffer_idx, maybe_buffer)| match maybe_buffer { + Some(b) => { + match (data.data_type(), buffer_idx) { + ( + DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Binary + | DataType::LargeBinary, + 1, + ) => { + // For offset buffer, take original pointer without offset. + // Buffer offset should be handled by `FFI_ArrowArray` offset field. + Some(b.get_bytes().ptr().as_ptr() as *const c_void) + } + // For other buffers, note that `raw_data` takes into account the buffer's offset + _ => Some(b.as_ptr() as *const c_void), + } + } // This is for null buffer. We only put a null pointer for // null buffer if by spec it can contain null mask. None if data_layout.can_contain_null_mask => Some(std::ptr::null()), @@ -186,7 +241,7 @@ impl FFI_ArrowArray { Self { length: data.len() as i64, null_count: null_count as i64, - offset: data.offset() as i64, + offset: offset as i64, n_buffers, n_children, buffers: private_data.buffers_ptr.as_mut_ptr(),