Skip to content

Commit

Permalink
Implement arrow-row encoding/decoding for view types (#5922)
Browse files Browse the repository at this point in the history
* implement arrow-row encoding/decoding for view types

* add doc comments, better error msg, more test coverage

* ensure no performance regression

* update perf

* fix bug

* make fmt happy

* Update arrow-array/src/array/byte_view_array.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* update

* update comments

* move cmp around

* move things around and remove inline hint

* Update arrow-array/src/array/byte_view_array.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update arrow-ord/src/cmp.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* return error instead of panic

* remove unnecessary func

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 24, 2024
1 parent 063ac13 commit 0c3a24d
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 120 deletions.
54 changes: 51 additions & 3 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
// under the License.

use crate::array::print_long_array;
use crate::builder::GenericByteViewBuilder;
use crate::builder::{ArrayBuilder, GenericByteViewBuilder};
use crate::iterator::ArrayIter;
use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{Array, ArrayAccessor, ArrayRef, Scalar};
use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use crate::{Array, ArrayAccessor, ArrayRef, GenericByteArray, OffsetSizeTrait, Scalar};
use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
use num::ToPrimitive;
use std::any::Any;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use super::ByteArrayType;

/// [Variable-size Binary View Layout]: An array of variable length bytes view arrays.
///
/// Different than [`crate::GenericByteArray`] as it stores both an offset and length
Expand Down Expand Up @@ -429,6 +432,51 @@ impl<T: ByteViewType + ?Sized> From<ArrayData> for GenericByteViewArray<T> {
}
}

/// Builds a [`GenericByteViewArray`] from a borrowed [`GenericByteArray`].
///
/// If the last offset of the input is below `u32::MAX`, the input's values buffer is
/// registered with the builder as a single block and every non-null element becomes a view
/// pointing into that block. Otherwise the array is rebuilt element by element from the
/// input's iterator.
impl<FROM, V> From<&GenericByteArray<FROM>> for GenericByteViewArray<V>
where
    FROM: ByteArrayType,
    FROM::Offset: OffsetSizeTrait + ToPrimitive,
    V: ByteViewType<Native = FROM::Native>,
{
    fn from(byte_array: &GenericByteArray<FROM>) -> Self {
        let offsets = byte_array.offsets();

        // Views are appended with `u32` offsets and lengths, so the buffer can only be shared
        // when the final offset stays below `u32::MAX`. An empty offsets slice trivially fits.
        let fits_in_u32 = offsets
            .last()
            .map_or(true, |last| last.as_usize() < u32::MAX as usize);

        if !fits_in_u32 {
            // TODO: the first u32::MAX can still be reused
            return GenericByteViewArray::<V>::from_iter(byte_array.iter());
        }

        let expected_len = byte_array.len();
        let mut builder = GenericByteViewBuilder::<V>::with_capacity(expected_len);
        let block = builder.append_block(byte_array.values().clone());

        for (row, bounds) in offsets.windows(2).enumerate() {
            let start = bounds[0].as_usize();
            let stop = bounds[1].as_usize();
            let length = stop - start;

            if byte_array.is_null(row) {
                builder.append_null();
                continue;
            }

            // SAFETY: the input was a valid array, so its values are valid UTF-8 (if it is a
            // string array) and all of its offsets are valid. The check above keeps every
            // offset below `u32::MAX`.
            unsafe {
                builder.append_view_unchecked(block, start as u32, length as u32);
            }
        }

        assert_eq!(builder.len(), expected_len);
        builder.finish()
    }
}

impl<T: ByteViewType + ?Sized> From<GenericByteViewArray<T>> for ArrayData {
fn from(mut array: GenericByteViewArray<T>) -> Self {
let len = array.len();
Expand Down
40 changes: 4 additions & 36 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ pub fn cast_with_options(
let binary = BinaryArray::from(array.as_string::<i32>().clone());
cast_byte_container::<BinaryType, LargeBinaryType>(&binary)
}
Utf8View => cast_byte_to_view::<Utf8Type, StringViewType>(array),
Utf8View => Ok(Arc::new(StringViewArray::from(array.as_string::<i32>()))),
LargeUtf8 => cast_byte_container::<Utf8Type, LargeUtf8Type>(array),
Time32(TimeUnit::Second) => parse_string::<Time32SecondType, i32>(array, cast_options),
Time32(TimeUnit::Millisecond) => {
Expand Down Expand Up @@ -1290,7 +1290,7 @@ pub fn cast_with_options(
LargeBinary => Ok(Arc::new(LargeBinaryArray::from(
array.as_string::<i64>().clone(),
))),
Utf8View => cast_byte_to_view::<LargeUtf8Type, StringViewType>(array),
Utf8View => Ok(Arc::new(StringViewArray::from(array.as_string::<i64>()))),
Time32(TimeUnit::Second) => parse_string::<Time32SecondType, i64>(array, cast_options),
Time32(TimeUnit::Millisecond) => {
parse_string::<Time32MillisecondType, i64>(array, cast_options)
Expand Down Expand Up @@ -1338,7 +1338,7 @@ pub fn cast_with_options(
FixedSizeBinary(size) => {
cast_binary_to_fixed_size_binary::<i32>(array, *size, cast_options)
}
BinaryView => cast_byte_to_view::<BinaryType, BinaryViewType>(array),
BinaryView => Ok(Arc::new(BinaryViewArray::from(array.as_binary::<i32>()))),
_ => Err(ArrowError::CastError(format!(
"Casting from {from_type:?} to {to_type:?} not supported",
))),
Expand All @@ -1353,7 +1353,7 @@ pub fn cast_with_options(
FixedSizeBinary(size) => {
cast_binary_to_fixed_size_binary::<i64>(array, *size, cast_options)
}
BinaryView => cast_byte_to_view::<LargeBinaryType, BinaryViewType>(array),
BinaryView => Ok(Arc::new(BinaryViewArray::from(array.as_binary::<i64>()))),
_ => Err(ArrowError::CastError(format!(
"Casting from {from_type:?} to {to_type:?} not supported",
))),
Expand Down Expand Up @@ -2345,38 +2345,6 @@ where
Ok(Arc::new(GenericByteArray::<TO>::from(array_data)))
}

/// Helper function to cast from one `ByteArrayType` array to `ByteViewType` array.
fn cast_byte_to_view<FROM, V>(array: &dyn Array) -> Result<ArrayRef, ArrowError>
where
FROM: ByteArrayType,
FROM::Offset: OffsetSizeTrait + ToPrimitive,
V: ByteViewType,
{
let byte_array: &GenericByteArray<FROM> = array.as_bytes();
let len = array.len();
let str_values_buf = byte_array.values().clone();
let offsets = byte_array.offsets();

let mut views_builder = GenericByteViewBuilder::<V>::with_capacity(len);
let block = views_builder.append_block(str_values_buf);
for (i, w) in offsets.windows(2).enumerate() {
let offset = w[0].to_u32().unwrap();
let end = w[1].to_u32().unwrap();
let length = end - offset;

if byte_array.is_null(i) {
views_builder.append_null();
} else {
// Safety: the input was a valid array so it valid UTF8 (if string). And
// all offsets were valid and we created the views correctly
unsafe { views_builder.append_view_unchecked(block, offset, length) }
}
}

assert_eq!(views_builder.len(), len);
Ok(Arc::new(views_builder.finish()))
}

/// Helper function to cast from one `ByteViewType` array to `ByteArrayType` array.
fn cast_view_to_byte<FROM, TO>(array: &dyn Array) -> Result<ArrayRef, ArrowError>
where
Expand Down
158 changes: 82 additions & 76 deletions arrow-ord/src/cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,98 +540,32 @@ impl<'a, T: ByteArrayType> ArrayOrd for &'a GenericByteArray<T> {
}
}

/// Comparing two ByteView types are non-trivial.
/// It takes a bit of patience to understand why we don't just compare two &[u8] directly.
///
/// ByteView types give us the following two advantages, and we need to be careful not to lose them:
/// (1) For string/byte smaller than 12 bytes, the entire data is inlined in the view.
/// Meaning that reading one array element requires only one memory access
/// (two memory access required for StringArray, one for offset buffer, the other for value buffer).
///
/// (2) For string/byte larger than 12 bytes, we can still be faster than (for certain operations) StringArray/ByteArray,
/// thanks to the inlined 4 bytes.
/// Consider equality check:
/// If the first four bytes of the two strings are different, we can return false immediately (with just one memory access).
/// If we are unlucky and the first four bytes are the same, we need to fallback to compare two full strings.
impl<'a, T: ByteViewType> ArrayOrd for &'a GenericByteViewArray<T> {
/// Item.0 is the array, Item.1 is the index into the array.
/// Why don't we just store Item.0[Item.1] as the item?
/// - Because if we do so, we materialize the entire string (i.e., make multiple memory accesses), which might be unnecessary.
/// - Most of the time (eq, ord), we only need to look at the first 4 bytes to know the answer,
/// e.g., if the inlined 4 bytes are different, we can directly return unequal without looking at the full string.
/// This is the item type for the GenericByteViewArray::compare
/// Item.0 is the array, Item.1 is the index
type Item = (&'a GenericByteViewArray<T>, usize);

/// # Equality check flow
/// (1) if both string are smaller than 12 bytes, we can directly compare the data inlined to the view.
/// (2) if any of the string is larger than 12 bytes, we need to compare the full string.
/// (2.1) if the inlined 4 bytes are different, we can return false immediately.
/// (2.2) o.w., we need to compare the full string.
///
/// # Safety
/// (1) Indexing. The Self::Item.1 encodes the index value, which is already checked in `value` function,
/// so it is safe to index into the views.
/// (2) Slice data from view. We know the bytes 4-8 are inlined data (per spec), so it is safe to slice from the view.
fn is_eq(l: Self::Item, r: Self::Item) -> bool {
// # Safety
// The index is within bounds as it is checked in value()
let l_view = unsafe { l.0.views().get_unchecked(l.1) };
let l_len = *l_view as u32;

let r_view = unsafe { r.0.views().get_unchecked(r.1) };
let r_len = *r_view as u32;

// This is a fast path for equality check.
// We don't need to look at the actual bytes to determine if they are equal.
if l_len != r_len {
return false;
}

if l_len <= 12 {
let l_data = unsafe { GenericByteViewArray::<T>::inline_value(l_view, l_len as usize) };
let r_data = unsafe { GenericByteViewArray::<T>::inline_value(r_view, r_len as usize) };
l_data == r_data
} else {
let l_inlined_data = unsafe { GenericByteViewArray::<T>::inline_value(l_view, 4) };
let r_inlined_data = unsafe { GenericByteViewArray::<T>::inline_value(r_view, 4) };
if l_inlined_data != r_inlined_data {
return false;
}

let l_full_data: &[u8] = unsafe { l.0.value_unchecked(l.1).as_ref() };
let r_full_data: &[u8] = unsafe { r.0.value_unchecked(r.1).as_ref() };
l_full_data == r_full_data
}
unsafe { compare_byte_view_unchecked(l.0, l.1, r.0, r.1).is_eq() }
}

/// # Ordering check flow
/// (1) if both string are smaller than 12 bytes, we can directly compare the data inlined to the view.
/// (2) if any of the string is larger than 12 bytes, we need to compare the full string.
/// (2.1) if the inlined 4 bytes are different, we can return the result immediately.
/// (2.2) o.w., we need to compare the full string.
///
/// # Safety
/// (1) Indexing. The Self::Item.1 encodes the index value, which is already checked in `value` function,
/// so it is safe to index into the views.
/// (2) Slice data from view. We know the bytes 4-8 are inlined data (per spec), so it is safe to slice from the view.
fn is_lt(l: Self::Item, r: Self::Item) -> bool {
let l_view = l.0.views().get(l.1).unwrap();
let l_len = *l_view as u32;

let r_view = r.0.views().get(r.1).unwrap();
let r_len = *r_view as u32;

if l_len <= 12 && r_len <= 12 {
let l_data = unsafe { GenericByteViewArray::<T>::inline_value(l_view, l_len as usize) };
let r_data = unsafe { GenericByteViewArray::<T>::inline_value(r_view, r_len as usize) };
return l_data < r_data;
}
// one of the string is larger than 12 bytes,
// we then try to compare the inlined data first
let l_inlined_data = unsafe { GenericByteViewArray::<T>::inline_value(l_view, 4) };
let r_inlined_data = unsafe { GenericByteViewArray::<T>::inline_value(r_view, 4) };
if r_inlined_data != l_inlined_data {
return l_inlined_data < r_inlined_data;
}
// unfortunately, we need to compare the full data
let l_full_data: &[u8] = unsafe { l.0.value_unchecked(l.1).as_ref() };
let r_full_data: &[u8] = unsafe { r.0.value_unchecked(r.1).as_ref() };
l_full_data < r_full_data
// # Safety
// The index is within bounds as it is checked in value()
unsafe { compare_byte_view_unchecked(l.0, l.1, r.0, r.1).is_lt() }
}

fn len(&self) -> usize {
Expand Down Expand Up @@ -663,6 +597,78 @@ impl<'a> ArrayOrd for &'a FixedSizeBinaryArray {
}
}

/// Compares two [`GenericByteViewArray`] at index `left_idx` and `right_idx`
///
/// This is the bounds-checked entry point: it validates both indices and then delegates
/// to [`compare_byte_view_unchecked`].
///
/// # Panics
///
/// Panics if `left_idx >= left.len()` or `right_idx >= right.len()`.
pub fn compare_byte_view<T: ByteViewType>(
left: &GenericByteViewArray<T>,
left_idx: usize,
right: &GenericByteViewArray<T>,
right_idx: usize,
) -> std::cmp::Ordering {
// Validate both indices up front so the unchecked comparison below is sound.
assert!(left_idx < left.len());
assert!(right_idx < right.len());
// SAFETY: both indices were verified to be in range by the assertions above.
unsafe { compare_byte_view_unchecked(left, left_idx, right, right_idx) }
}

/// Compares the element at `left_idx` of `left` with the element at `right_idx` of `right`,
/// without bounds-checking either index.
///
/// The comparison deliberately avoids materializing both values as `&[u8]` up front, so
/// that it touches as little memory as possible:
///
/// 1. If both values are at most 12 bytes long, the bytes inlined in the two views are
///    compared directly and no data buffer is read.
/// 2. Otherwise the first 4 bytes inlined in each view are compared; if they differ, that
///    result is returned immediately.
/// 3. Only when those 4 bytes are equal are the full values loaded and compared.
///
/// # Safety
///
/// `left_idx` must be within the bounds of `left` and `right_idx` must be within the bounds
/// of `right`.
pub unsafe fn compare_byte_view_unchecked<T: ByteViewType>(
    left: &GenericByteViewArray<T>,
    left_idx: usize,
    right: &GenericByteViewArray<T>,
    right_idx: usize,
) -> std::cmp::Ordering {
    // SAFETY: the caller guarantees that both indices are in range.
    let lhs_view = unsafe { left.views().get_unchecked(left_idx) };
    let rhs_view = unsafe { right.views().get_unchecked(right_idx) };

    // The value length is read from the low 32 bits of each view.
    let lhs_len = *lhs_view as u32;
    let rhs_len = *rhs_view as u32;

    if lhs_len <= 12 && rhs_len <= 12 {
        // Both values are fully inlined, so the views alone decide the ordering.
        let lhs_inline =
            unsafe { GenericByteViewArray::<T>::inline_value(lhs_view, lhs_len as usize) };
        let rhs_inline =
            unsafe { GenericByteViewArray::<T>::inline_value(rhs_view, rhs_len as usize) };
        return lhs_inline.cmp(rhs_inline);
    }

    // At least one value is longer than 12 bytes: try to decide on the inlined 4 bytes.
    let lhs_prefix = unsafe { GenericByteViewArray::<T>::inline_value(lhs_view, 4) };
    let rhs_prefix = unsafe { GenericByteViewArray::<T>::inline_value(rhs_view, 4) };

    if rhs_prefix != lhs_prefix {
        lhs_prefix.cmp(rhs_prefix)
    } else {
        // The prefixes tie, so the full values have to be loaded and compared.
        let lhs_bytes: &[u8] = unsafe { left.value_unchecked(left_idx).as_ref() };
        let rhs_bytes: &[u8] = unsafe { right.value_unchecked(right_idx).as_ref() };
        lhs_bytes.cmp(rhs_bytes)
    }
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
17 changes: 17 additions & 0 deletions arrow-ord/src/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ fn compare_bytes<T: ByteArrayType>(
})
}

/// Builds a [`DynComparator`] over two byte-view arrays of type `T`.
///
/// Both inputs are downcast with `as_byte_view::<T>()`. The returned comparator owns clones
/// of the two arrays and orders index pairs via `crate::cmp::compare_byte_view`.
fn compare_byte_view<T: ByteViewType>(
    left: &dyn Array,
    right: &dyn Array,
    opts: SortOptions,
) -> DynComparator {
    let lhs = left.as_byte_view::<T>();
    let rhs = right.as_byte_view::<T>();

    // The comparator closure must own its inputs, so capture clones of both arrays.
    let (owned_lhs, owned_rhs) = (lhs.clone(), rhs.clone());
    let order_at =
        move |i: usize, j: usize| crate::cmp::compare_byte_view(&owned_lhs, i, &owned_rhs, j);

    compare(lhs, rhs, opts, order_at)
}

fn compare_dict<K: ArrowDictionaryKeyType>(
left: &dyn Array,
right: &dyn Array,
Expand Down Expand Up @@ -342,8 +357,10 @@ pub fn make_comparator(
(Boolean, Boolean) => Ok(compare_boolean(left, right, opts)),
(Utf8, Utf8) => Ok(compare_bytes::<Utf8Type>(left, right, opts)),
(LargeUtf8, LargeUtf8) => Ok(compare_bytes::<LargeUtf8Type>(left, right, opts)),
(Utf8View, Utf8View) => Ok(compare_byte_view::<StringViewType>(left, right, opts)),
(Binary, Binary) => Ok(compare_bytes::<BinaryType>(left, right, opts)),
(LargeBinary, LargeBinary) => Ok(compare_bytes::<LargeBinaryType>(left, right, opts)),
(BinaryView, BinaryView) => Ok(compare_byte_view::<BinaryViewType>(left, right, opts)),
(FixedSizeBinary(_), FixedSizeBinary(_)) => {
let left = left.as_fixed_size_binary();
let right = right.as_fixed_size_binary();
Expand Down
Loading

0 comments on commit 0c3a24d

Please sign in to comment.