Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RunEndBuffer (#1799) #3817

Merged
merged 9 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ mod tests {
assert_eq!(array.null_count(), 0);
assert_eq!(array.values().len(), 1);
assert_eq!(array.values().null_count(), 1);
assert_eq!(array.run_ends().len(), 4);
assert_eq!(array.run_ends().values(), &[4]);

let idx = array.get_physical_indices(&[0, 1, 2, 3]).unwrap();
Expand Down
129 changes: 44 additions & 85 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::any::Any;

use arrow_buffer::buffer::RunEndBuffer;
use arrow_buffer::ArrowNativeType;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType, Field};
Expand Down Expand Up @@ -62,7 +63,7 @@ use crate::{

pub struct RunArray<R: RunEndIndexType> {
data: ArrayData,
run_ends: PrimitiveArray<R>,
run_ends: RunEndBuffer<R::Native>,
values: ArrayRef,
}

Expand Down Expand Up @@ -110,11 +111,8 @@ impl<R: RunEndIndexType> RunArray<R> {
Ok(array_data.into())
}

/// Returns a reference to run_ends array
///
/// Note: any slicing of this [`RunArray`] array is not applied to the returned array
/// and must be handled separately
pub fn run_ends(&self) -> &PrimitiveArray<R> {
/// Returns a reference to [`RunEndBuffer`]
pub fn run_ends(&self) -> &RunEndBuffer<R::Native> {
&self.run_ends
}

Expand All @@ -128,19 +126,12 @@ impl<R: RunEndIndexType> RunArray<R> {

/// Returns the physical index at which the array slice starts.
pub fn get_start_physical_index(&self) -> usize {
if self.offset() == 0 {
return 0;
}
self.get_zero_offset_physical_index(self.offset()).unwrap()
self.run_ends.get_start_physical_index()
}

/// Returns the physical index at which the array slice ends.
pub fn get_end_physical_index(&self) -> usize {
if self.offset() + self.len() == Self::logical_len(&self.run_ends) {
return self.run_ends.len() - 1;
}
self.get_zero_offset_physical_index(self.offset() + self.len() - 1)
.unwrap()
self.run_ends.get_end_physical_index()
}

/// Downcast this [`RunArray`] to a [`TypedRunArray`]
Expand All @@ -164,47 +155,12 @@ impl<R: RunEndIndexType> RunArray<R> {
})
}

/// Returns index to the physical array for the given index to the logical array.
/// The function does not adjust the input logical index based on `ArrayData::offset`.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_zero_offset_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= Self::logical_len(&self.run_ends) {
return None;
}
let mut st: usize = 0;
let mut en: usize = self.run_ends.len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
< unsafe {
// Safety:
// The value of mid will always be between 1 and len - 1,
// where len is length of run ends array.
// This is based on the fact that `st` starts with 0 and
// `en` starts with len. The condition `st + 1 < en` ensures
// `st` and `en` differ by at least two. So the value of `mid`
// will never be either `st` or `en`
self.run_ends.value_unchecked(mid - 1).as_usize()
}
{
en = mid
} else {
st = mid
}
}
Some(st)
}

/// Returns index to the physical array for the given index to the logical array.
/// This function adjusts the input logical index based on `ArrayData::offset`
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
return None;
}
self.get_zero_offset_physical_index(logical_index + self.offset())
self.run_ends.get_physical_index(logical_index)
}

/// Returns the physical indices of the input logical indices. Returns error if any of the logical
Expand All @@ -222,6 +178,9 @@ impl<R: RunEndIndexType> RunArray<R> {
where
I: ArrowNativeType,
{
let len = self.run_ends().len();
let offset = self.run_ends().offset();

let indices_len = logical_indices.len();

if indices_len == 0 {
Expand All @@ -243,7 +202,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// Return early if any of the logical indices cannot be converted to a physical index.
let largest_logical_index =
logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= self.len() {
if largest_logical_index >= len {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
)));
Expand All @@ -259,7 +218,7 @@ impl<R: RunEndIndexType> RunArray<R> {
self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - self.offset();
let run_end_value = run_end.as_usize() - offset;

// All the `logical_indices` that are less than the current run end index
// belong to the current physical index.
Expand Down Expand Up @@ -295,7 +254,15 @@ impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
}
}

let run_ends = PrimitiveArray::<R>::from(data.child_data()[0].clone());
// Safety
// ArrayData is valid
let child = &data.child_data()[0];
assert_eq!(child.data_type(), &R::DATA_TYPE, "Incorrect run ends type");
let run_ends = unsafe {
let scalar = child.buffers()[0].clone().into();
RunEndBuffer::new_unchecked(scalar, data.offset(), data.len())
};

let values = make_array(data.child_data()[1].clone());
Self {
data,
Expand Down Expand Up @@ -330,7 +297,8 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
writeln!(
f,
"RunArray {{run_ends: {:?}, values: {:?}}}",
self.run_ends, self.values
self.run_ends.values(),
self.values
)
}
}
Expand All @@ -347,7 +315,7 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
/// .map(|&x| if x == "b" { None } else { Some(x) })
/// .collect();
/// assert_eq!(
/// "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 5,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
/// "RunArray {run_ends: [2, 3, 5], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is certainly a nicer API

/// format!("{:?}", array)
/// );
/// ```
Expand All @@ -374,7 +342,7 @@ impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> for RunArray<T> {
/// let test = vec!["a", "a", "b", "c"];
/// let array: RunArray<Int16Type> = test.into_iter().collect();
/// assert_eq!(
/// "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
/// "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
/// format!("{:?}", array)
/// );
/// ```
Expand All @@ -401,7 +369,7 @@ impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray<T> {
///
/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int16RunArray = RunArray<Int16Type>;
Expand All @@ -416,7 +384,7 @@ pub type Int16RunArray = RunArray<Int16Type>;
///
/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5]));
/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int32RunArray = RunArray<Int32Type>;
Expand All @@ -431,7 +399,7 @@ pub type Int32RunArray = RunArray<Int32Type>;
///
/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5]));
/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int64RunArray = RunArray<Int64Type>;
Expand Down Expand Up @@ -480,7 +448,7 @@ impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> {

impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
/// Returns the run_ends of this [`TypedRunArray`]
pub fn run_ends(&self) -> &'a PrimitiveArray<R> {
pub fn run_ends(&self) -> &'a RunEndBuffer<R::Native> {
self.run_array.run_ends()
}

Expand Down Expand Up @@ -563,7 +531,7 @@ mod tests {
use crate::builder::PrimitiveRunBuilder;
use crate::cast::as_primitive_array;
use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
use crate::{Array, Int16Array, Int32Array, StringArray};
use crate::{Array, Int32Array, StringArray};

fn build_input_array(size: usize) -> Vec<Option<i32>> {
// The input array is created by shuffling and repeating
Expand Down Expand Up @@ -643,9 +611,10 @@ mod tests {
]);

// Construct a run_ends array:
let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values([
4_i16, 6, 7, 9, 13, 18, 20, 22,
]);
let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22];
let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values(
run_ends_values.iter().copied(),
);

// Construct a run ends encoded array from the above two
let ree_array =
Expand All @@ -659,8 +628,7 @@ mod tests {
assert_eq!(&DataType::Int8, values.data_type());

let run_ends = ree_array.run_ends();
assert_eq!(&run_ends_data.into_data(), run_ends.data());
assert_eq!(&DataType::Int16, run_ends.data_type());
assert_eq!(run_ends.values(), &run_ends_values);
}

#[test]
Expand All @@ -671,7 +639,7 @@ mod tests {
builder.append_value(22345678);
let array = builder.finish();
assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 1,\n 2,\n 3,\n], values: PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n 22345678,\n]}\n",
"RunArray {run_ends: [1, 2, 3], values: PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n 22345678,\n]}\n",
format!("{array:?}")
);

Expand All @@ -685,7 +653,7 @@ mod tests {
assert_eq!(array.null_count(), 0);

assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 20,\n], values: PrimitiveArray<UInt32>\n[\n 1,\n]}\n",
"RunArray {run_ends: [20], values: PrimitiveArray<UInt32>\n[\n 1,\n]}\n",
format!("{array:?}")
);
}
Expand All @@ -698,7 +666,7 @@ mod tests {
.map(|&x| if x == "b" { None } else { Some(x) })
.collect();
assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
"RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
format!("{array:?}")
);

Expand All @@ -707,7 +675,7 @@ mod tests {

let array: RunArray<Int16Type> = test.into_iter().collect();
assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
"RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
format!("{array:?}")
);
}
Expand All @@ -721,8 +689,6 @@ mod tests {
assert_eq!(array.null_count(), 0);

let run_ends = array.run_ends();
assert_eq!(&DataType::Int16, run_ends.data_type());
assert_eq!(0, run_ends.null_count());
assert_eq!(&[1, 2, 3, 4], run_ends.values());
}

Expand All @@ -735,9 +701,6 @@ mod tests {
assert_eq!(array.null_count(), 0);

let run_ends = array.run_ends();
assert_eq!(&DataType::Int32, run_ends.data_type());
assert_eq!(0, run_ends.null_count());
assert_eq!(5, run_ends.len());
assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());

let values_data = array.values();
Expand All @@ -754,7 +717,7 @@ mod tests {
assert_eq!(array.null_count(), 0);

let run_ends = array.run_ends();
assert_eq!(1, run_ends.len());
assert_eq!(3, run_ends.len());
assert_eq!(&[3], run_ends.values());

let values_data = array.values();
Expand All @@ -770,16 +733,14 @@ mod tests {
[Some(1), Some(2), Some(3), Some(4)].into_iter().collect();

let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
assert_eq!(array.run_ends().data_type(), &DataType::Int32);
assert_eq!(array.values().data_type(), &DataType::Utf8);

assert_eq!(array.null_count(), 0);
assert_eq!(array.len(), 4);
assert_eq!(array.run_ends.null_count(), 0);
assert_eq!(array.values().null_count(), 1);

assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int32>\n[\n 1,\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n",
"RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n",
format!("{array:?}")
);
}
Expand All @@ -788,15 +749,15 @@ mod tests {
fn test_run_array_int16_type_definition() {
let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
assert_eq!(array.run_ends().values(), &[2, 3, 5]);
assert_eq!(array.values(), &values);
}

#[test]
fn test_run_array_empty_string() {
let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect();
let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", "c"]));
assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5]));
assert_eq!(array.run_ends().values(), &[2, 4, 5]);
assert_eq!(array.values(), &values);
}

Expand Down Expand Up @@ -849,9 +810,7 @@ mod tests {
}

#[test]
#[should_panic(
expected = "PrimitiveArray expected ArrayData with type Int64 got Int32"
)]
#[should_panic(expected = "Incorrect run ends type")]
fn test_run_array_run_ends_data_type_mismatch() {
let a = RunArray::<Int32Type>::from_iter(["32"]);
let _ = RunArray::<Int64Type>::from(a.into_data());
Expand Down
Loading