Skip to content

Commit

Permalink
feat + fix: IPC support for run encoded array. (#3662)
Browse files Browse the repository at this point in the history
* Schema.fbs changes, flatbuffer generated code, flatbuffer gen script changes

Add ipc reader, writer and equals

Add/Update  tests

* Add support for non zero offset in run array

* clippy fixes

* format fix

* doc fix

* incorporate pr comments

* fix formatting

* more pr comments

* pr suggestions

---------

Co-authored-by: ask <ask@local>
Co-authored-by: devx <devx@local>
  • Loading branch information
3 people authored Feb 10, 2023
1 parent 07e2063 commit 5b1821e
Show file tree
Hide file tree
Showing 13 changed files with 779 additions and 90 deletions.
170 changes: 143 additions & 27 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ pub struct RunArray<R: RunEndIndexType> {
}

impl<R: RunEndIndexType> RunArray<R> {
// calculates the logical length of the array encoded
// by the given run_ends array.
fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
/// Calculates the logical length of the array encoded
/// by the given run_ends array.
pub fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
let len = run_ends.len();
if len == 0 {
return 0;
Expand Down Expand Up @@ -145,14 +145,15 @@ impl<R: RunEndIndexType> RunArray<R> {
}

/// Returns index to the physical array for the given index to the logical array.
/// The function does not adjust the input logical index based on `ArrayData::offset`.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
pub fn get_zero_offset_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= Self::logical_len(&self.run_ends) {
return None;
}
let mut st: usize = 0;
let mut en: usize = self.run_ends().len();
let mut en: usize = self.run_ends.len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
Expand All @@ -164,7 +165,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// `en` starts with len. The condition `st + 1 < en` ensures
// `st` and `en` differ by at least two, so the value of `mid`
// will never be either `st` or `en`.
self.run_ends().value_unchecked(mid - 1).as_usize()
self.run_ends.value_unchecked(mid - 1).as_usize()
}
{
en = mid
Expand All @@ -175,6 +176,17 @@ impl<R: RunEndIndexType> RunArray<R> {
Some(st)
}

/// Maps an index into the logical array to the matching index in the physical array,
/// taking `ArrayData::offset` into account.
///
/// Returns `None` when `logical_index` is not smaller than `self.len()`. Otherwise the
/// array offset is added to `logical_index` and the lookup is delegated to
/// `get_zero_offset_physical_index`, which performs a binary search on the run_ends array.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
    if logical_index < self.len() {
        // Shift into the un-sliced index space before searching the run ends.
        let absolute_index = logical_index + self.offset();
        self.get_zero_offset_physical_index(absolute_index)
    } else {
        None
    }
}

/// Returns the physical indices of the input logical indices. Returns an error if any of the logical
/// indices cannot be converted to a physical index. The logical indices are sorted and iterated along
/// with run_ends array to find matching physical index. The approach used here was chosen over
Expand All @@ -192,6 +204,10 @@ impl<R: RunEndIndexType> RunArray<R> {
{
let indices_len = logical_indices.len();

if indices_len == 0 {
return Ok(vec![]);
}

// `ordered_indices` store index into `logical_indices` and can be used
// to iterate `logical_indices` in sorted order.
let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
Expand All @@ -204,12 +220,30 @@ impl<R: RunEndIndexType> RunArray<R> {
.unwrap()
});

// Return early if not all of the logical indices can be converted to physical indices.
let largest_logical_index =
logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= self.len() {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
)));
}

// Skip some physical indices based on offset.
let skip_value = if self.offset() > 0 {
self.get_zero_offset_physical_index(self.offset()).unwrap()
} else {
0
};

let mut physical_indices = vec![0; indices_len];

let mut ordered_index = 0_usize;
for (physical_index, run_end) in self.run_ends.values().iter().enumerate() {
// Get the run end index of current physical index
let run_end_value = run_end.as_usize();
for (physical_index, run_end) in
self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - self.offset();

// All the `logical_indices` that are less than current run end index
// belongs to current physical index.
Expand Down Expand Up @@ -552,6 +586,34 @@ mod tests {
result
}

// Test helper: for every position `i`, asserts that the entry of `logical_array` at
// `logical_indices[i]` agrees with the entry of `physical_array` at `physical_indices[i]`,
// i.e. both are null, or both are valid and hold the same value.
fn compare_logical_and_physical_indices(
    logical_indices: &[u32],
    logical_array: &[Option<i32>],
    physical_indices: &[usize],
    physical_array: &PrimitiveArray<Int32Type>,
) {
    // The two index lists are compared pairwise, so they must line up.
    assert_eq!(logical_indices.len(), physical_indices.len());

    for (logical_ix, &physical_ix) in logical_indices.iter().zip(physical_indices) {
        if let Some(expected) = logical_array[logical_ix.as_usize()] {
            assert!(physical_array.is_valid(physical_ix));
            let actual = physical_array.value(physical_ix);
            assert_eq!(expected, actual);
        } else {
            assert!(physical_array.is_null(physical_ix))
        }
    }
}
#[test]
fn test_run_array() {
// Construct a value array
Expand Down Expand Up @@ -824,23 +886,77 @@ mod tests {
assert_eq!(logical_indices.len(), physical_indices.len());

// check value in logical index in the input_array matches physical index in typed_run_array
logical_indices
.iter()
.map(|f| f.as_usize())
.zip(physical_indices.iter())
.for_each(|(logical_ix, physical_ix)| {
let expected = input_array[logical_ix];
match expected {
Some(val) => {
assert!(physical_values_array.is_valid(*physical_ix));
let actual = physical_values_array.value(*physical_ix);
assert_eq!(val, actual);
}
None => {
assert!(physical_values_array.is_null(*physical_ix))
}
};
});
compare_logical_and_physical_indices(
&logical_indices,
&input_array,
&physical_indices,
physical_values_array,
);
}
}

// Checks `get_physical_indices` on sliced run arrays. For every slice length the run array
// is sliced once from the front (offset 0) and once from the back
// (offset `total_len - slice_len`), and the returned physical indices are validated against
// the equally sliced input.
#[test]
fn test_get_physical_indices_sliced() {
    let total_len = 80;
    let input_array = build_input_array(total_len);

    // Encode the input_array to run array
    let mut builder =
        PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
    builder.extend(input_array.iter().copied());
    let run_array = builder.finish();
    let physical_values_array = as_primitive_array::<Int32Type>(run_array.values());

    // test for all slice lengths.
    for slice_len in 1..=total_len {
        // Every logical index of the slice, listed twice and then shuffled.
        let mut logical_indices: Vec<u32> = (0_u32..(slice_len as u32)).collect();
        logical_indices.extend_from_within(..);
        logical_indices.shuffle(&mut thread_rng());

        // First the slice at the start of the array, then the slice at its end.
        for offset in [0, total_len - slice_len] {
            // slice the input array using which the run array was built.
            let sliced_input_array = &input_array[offset..offset + slice_len];

            // slice the run array
            let sliced_run_array: RunArray<Int16Type> =
                run_array.slice(offset, slice_len).into_data().into();

            // Get physical indices.
            let physical_indices = sliced_run_array
                .get_physical_indices(&logical_indices)
                .unwrap();

            compare_logical_and_physical_indices(
                &logical_indices,
                sliced_input_array,
                &physical_indices,
                physical_values_array,
            );
        }
    }
}
}
13 changes: 7 additions & 6 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1290,9 +1290,9 @@ impl ArrayData {
DataType::RunEndEncoded(run_ends, _values) => {
let run_ends_data = self.child_data()[0].clone();
match run_ends.data_type() {
DataType::Int16 => run_ends_data.check_run_ends::<i16>(self.len()),
DataType::Int32 => run_ends_data.check_run_ends::<i32>(self.len()),
DataType::Int64 => run_ends_data.check_run_ends::<i64>(self.len()),
DataType::Int16 => run_ends_data.check_run_ends::<i16>(),
DataType::Int32 => run_ends_data.check_run_ends::<i32>(),
DataType::Int64 => run_ends_data.check_run_ends::<i64>(),
_ => unreachable!(),
}
}
Expand Down Expand Up @@ -1451,7 +1451,7 @@ impl ArrayData {
}

/// Validates that each value in run_ends array is positive and strictly increasing.
fn check_run_ends<T>(&self, array_len: usize) -> Result<(), ArrowError>
fn check_run_ends<T>(&self) -> Result<(), ArrowError>
where
T: ArrowNativeType + TryInto<i64> + num::Num + std::fmt::Display,
{
Expand All @@ -1478,9 +1478,10 @@ impl ArrayData {
Ok(())
})?;

if prev_value.as_usize() != array_len {
if prev_value.as_usize() < (self.offset + self.len) {
return Err(ArrowError::InvalidArgumentError(format!(
"The length of array does not match the last value in the run_ends array. The last value of run_ends array is {prev_value} and length of array is {array_len}."
"The offset + length of array should be less or equal to last value in the run_ends array. The last value of run_ends array is {prev_value} and offset + length of array is {}.",
self.offset + self.len
)));
}
Ok(())
Expand Down
5 changes: 4 additions & 1 deletion arrow-data/src/equal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod fixed_list;
mod list;
mod null;
mod primitive;
mod run;
mod structure;
mod union;
mod utils;
Expand All @@ -50,6 +51,8 @@ use structure::struct_equal;
use union::union_equal;
use variable_size::variable_sized_equal;

use self::run::run_equal;

/// Compares the values of two [ArrayData] starting at `lhs_start` and `rhs_start` respectively
/// for `len` slots.
#[inline]
Expand Down Expand Up @@ -137,7 +140,7 @@ fn equal_values(
},
DataType::Float16 => primitive_equal::<f16>(lhs, rhs, lhs_start, rhs_start, len),
DataType::Map(_, _) => list_equal::<i32>(lhs, rhs, lhs_start, rhs_start, len),
DataType::RunEndEncoded(_, _) => todo!(),
DataType::RunEndEncoded(_, _) => run_equal(lhs, rhs, lhs_start, rhs_start, len),
}
}

Expand Down
84 changes: 84 additions & 0 deletions arrow-data/src/equal/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::data::ArrayData;

use super::equal_range;

/// Compares two run-end encoded [`ArrayData`] by comparing their physical child arrays:
/// the run ends (child 0) and the values (child 1).
///
/// Only a physical, whole-array comparison is implemented. Comparing based on the logical
/// indices (`lhs_start`, `rhs_start`) would be time consuming, because converting a logical
/// index to a physical index cannot be done in constant time.
///
/// Returns `false` if the array lengths, the child lengths, the run ends, or the values
/// differ, and `true` otherwise.
///
/// # Panics
///
/// Panics via `unimplemented!` if either start is non-zero, if either array has a non-zero
/// offset, or if neither array's length equals `len`.
pub(super) fn run_equal(
    lhs: &ArrayData,
    rhs: &ArrayData,
    lhs_start: usize,
    rhs_start: usize,
    len: usize,
) -> bool {
    let starts_at_zero = lhs_start == 0 && rhs_start == 0;
    // NOTE(review): if exactly one array has length `len`, this guard passes and the
    // length check below returns `false` instead of panicking. Confirm that this is
    // intended rather than requiring both lengths to equal `len`.
    let some_side_is_whole = lhs.len() == len || rhs.len() == len;
    let no_offsets = lhs.offset() == 0 && rhs.offset() == 0;
    if !(starts_at_zero && some_side_is_whole && no_offsets) {
        unimplemented!("Logical comparison for run array not supported.")
    }

    if lhs.len() != rhs.len() {
        return false;
    }

    let lhs_children = lhs.child_data();
    let rhs_children = rhs.child_data();
    let (lhs_run_ends, lhs_values) =
        (lhs_children.first().unwrap(), lhs_children.get(1).unwrap());
    let (rhs_run_ends, rhs_values) =
        (rhs_children.first().unwrap(), rhs_children.get(1).unwrap());

    // Children of different physical length can never be equal.
    if lhs_run_ends.len() != rhs_run_ends.len() || lhs_values.len() != rhs_values.len() {
        return false;
    }

    // Both starts are zero here (checked by the guard above), so the child arrays are
    // compared over their full physical length. The values are only compared when the
    // run ends already match.
    equal_range(
        lhs_run_ends,
        rhs_run_ends,
        lhs_start,
        rhs_start,
        lhs_run_ends.len(),
    ) && equal_range(
        lhs_values,
        rhs_values,
        lhs_start,
        rhs_start,
        rhs_values.len(),
    )
}
Loading

0 comments on commit 5b1821e

Please sign in to comment.