Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SlicesIterator for ArrayData Equality #3198

Merged
merged 4 commits into from
Nov 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 52 additions & 20 deletions arrow-data/src/equal/fixed_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::data::{contains_nulls, ArrayData};
use crate::contains_nulls;
use crate::data::ArrayData;
use crate::equal::primitive::NULL_SLICES_SELECTIVITY_THRESHOLD;
use crate::slices_iterator::SlicesIterator;
use arrow_buffer::bit_util::get_bit;
use arrow_schema::DataType;

Expand Down Expand Up @@ -47,26 +50,55 @@ pub(super) fn fixed_binary_equal(
size * len,
)
} else {
// get a ref of the null buffer bytes, to use in testing for nullness
let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
// with nulls, we need to compare item by item whenever it is not null
(0..len).all(|i| {
let lhs_pos = lhs_start + i;
let rhs_pos = rhs_start + i;
let selectivity_frac = lhs.null_count() as f64 / lhs.len() as f64;

let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());
if selectivity_frac >= NULL_SLICES_SELECTIVITY_THRESHOLD {
// get a ref of the null buffer bytes, to use in testing for nullness
let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
// with nulls, we need to compare item by item whenever it is not null
(0..len).all(|i| {
let lhs_pos = lhs_start + i;
let rhs_pos = rhs_start + i;

lhs_is_null
|| (lhs_is_null == rhs_is_null)
&& equal_len(
lhs_values,
rhs_values,
lhs_pos * size,
rhs_pos * size,
size, // 1 * size since we are comparing a single entry
)
})
let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());

lhs_is_null
|| (lhs_is_null == rhs_is_null)
&& equal_len(
lhs_values,
rhs_values,
lhs_pos * size,
rhs_pos * size,
size, // 1 * size since we are comparing a single entry
)
})
} else {
let lhs_slices_iter = SlicesIterator::new_from_buffer(
lhs.null_buffer().as_ref().unwrap(),
lhs_start + lhs.offset(),
len,
);
let rhs_slices_iter = SlicesIterator::new_from_buffer(
rhs.null_buffer().as_ref().unwrap(),
rhs_start + rhs.offset(),
len,
);

lhs_slices_iter.zip(rhs_slices_iter).all(
|((l_start, l_end), (r_start, r_end))| {
l_start == r_start
&& l_end == r_end
&& equal_len(
lhs_values,
rhs_values,
(lhs_start + l_start) * size,
(rhs_start + r_start) * size,
(l_end - l_start) * size,
)
},
)
}
}
}
75 changes: 54 additions & 21 deletions arrow-data/src/equal/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
// specific language governing permissions and limitations
// under the License.

use crate::contains_nulls;
use arrow_buffer::bit_util::get_bit;
use std::mem::size_of;

use crate::data::{contains_nulls, ArrayData};
use arrow_buffer::bit_util::get_bit;
use crate::data::ArrayData;
use crate::slices_iterator::SlicesIterator;

use super::utils::equal_len;

pub(crate) const NULL_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you come up with 0.4, not saying it is a bad choice, just curious

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It came from benchmarking. equal_nulls_512 has 0.5 null density, and this doesn't improve it. So I pick 0.4 as the threshold.


pub(super) fn primitive_equal<T>(
lhs: &ArrayData,
rhs: &ArrayData,
Expand All @@ -45,25 +49,54 @@ pub(super) fn primitive_equal<T>(
len * byte_width,
)
} else {
// get a ref of the null buffer bytes, to use in testing for nullness
let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
// with nulls, we need to compare item by item whenever it is not null
(0..len).all(|i| {
let lhs_pos = lhs_start + i;
let rhs_pos = rhs_start + i;
let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());
let selectivity_frac = lhs.null_count() as f64 / lhs.len() as f64;

if selectivity_frac >= NULL_SLICES_SELECTIVITY_THRESHOLD {
// get a ref of the null buffer bytes, to use in testing for nullness
let lhs_null_bytes = lhs.null_buffer().as_ref().unwrap().as_slice();
let rhs_null_bytes = rhs.null_buffer().as_ref().unwrap().as_slice();
// with nulls, we need to compare item by item whenever it is not null
(0..len).all(|i| {
let lhs_pos = lhs_start + i;
let rhs_pos = rhs_start + i;
let lhs_is_null = !get_bit(lhs_null_bytes, lhs_pos + lhs.offset());
let rhs_is_null = !get_bit(rhs_null_bytes, rhs_pos + rhs.offset());

lhs_is_null
|| (lhs_is_null == rhs_is_null)
&& equal_len(
lhs_values,
rhs_values,
lhs_pos * byte_width,
rhs_pos * byte_width,
byte_width, // 1 * byte_width since we are comparing a single entry
)
})
} else {
let lhs_slices_iter = SlicesIterator::new_from_buffer(
lhs.null_buffer().as_ref().unwrap(),
lhs_start + lhs.offset(),
len,
);
let rhs_slices_iter = SlicesIterator::new_from_buffer(
rhs.null_buffer().as_ref().unwrap(),
rhs_start + rhs.offset(),
len,
);

lhs_is_null
|| (lhs_is_null == rhs_is_null)
&& equal_len(
lhs_values,
rhs_values,
lhs_pos * byte_width,
rhs_pos * byte_width,
byte_width, // 1 * byte_width since we are comparing a single entry
)
})
lhs_slices_iter.zip(rhs_slices_iter).all(
|((l_start, l_end), (r_start, r_end))| {
l_start == r_start
&& l_end == r_end
&& equal_len(
lhs_values,
rhs_values,
(lhs_start + l_start) * byte_width,
(rhs_start + r_start) * byte_width,
(l_end - l_start) * byte_width,
)
},
)
}
}
}
1 change: 1 addition & 0 deletions arrow-data/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ pub mod transform;
pub mod bit_iterator;
pub mod bit_mask;
pub mod decimal;
pub mod slices_iterator;
40 changes: 40 additions & 0 deletions arrow-data/src/slices_iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::bit_iterator::BitSliceIterator;
use arrow_buffer::Buffer;

/// An iterator of `(usize, usize)` each representing an interval
/// `[start, end)` whose slots of a bitmap [Buffer] are true. Each
/// interval corresponds to a contiguous region of memory to be
/// "taken" from an array to be filtered.
///
/// ## Notes:
///
/// Only performant for filters that copy across long contiguous runs
#[derive(Debug)]
pub struct SlicesIterator<'a>(BitSliceIterator<'a>);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving this is a breaking change, as it is exposed publicly - https://docs.rs/arrow-select/latest/arrow_select/filter/struct.SlicesIterator.html


impl<'a> SlicesIterator<'a> {
pub fn new_from_buffer(values: &'a Buffer, offset: usize, len: usize) -> Self {
Self(BitSliceIterator::new(values, offset, len))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point why not just use BitSliceIterator?

}
}

impl<'a> Iterator for SlicesIterator<'a> {
type Item = (usize, usize);

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
}
60 changes: 19 additions & 41 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use arrow_array::types::ByteArrayType;
use arrow_array::*;
use arrow_buffer::bit_util;
use arrow_buffer::{buffer::buffer_bin_and, Buffer, MutableBuffer};
use arrow_data::bit_iterator::{BitIndexIterator, BitSliceIterator};
use arrow_data::bit_iterator::BitIndexIterator;
use arrow_data::slices_iterator::SlicesIterator;
use arrow_data::transform::MutableArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::*;
Expand All @@ -38,35 +39,12 @@ use arrow_schema::*;
///
const FILTER_SLICES_SELECTIVITY_THRESHOLD: f64 = 0.8;

/// An iterator of `(usize, usize)` each representing an interval
/// `[start, end)` whose slots of a [BooleanArray] are true. Each
/// interval corresponds to a contiguous region of memory to be
/// "taken" from an array to be filtered.
///
/// ## Notes:
///
/// 1. Ignores the validity bitmap (ignores nulls)
///
/// 2. Only performant for filters that copy across long contiguous runs
#[derive(Debug)]
pub struct SlicesIterator<'a>(BitSliceIterator<'a>);

impl<'a> SlicesIterator<'a> {
pub fn new(filter: &'a BooleanArray) -> Self {
let values = &filter.data_ref().buffers()[0];
let len = filter.len();
let offset = filter.offset();

Self(BitSliceIterator::new(values, offset, len))
}
}

impl<'a> Iterator for SlicesIterator<'a> {
type Item = (usize, usize);
pub fn build_slices_iterator(filter: &BooleanArray) -> SlicesIterator {
let values = &filter.data_ref().buffers()[0];
let len = filter.len();
let offset = filter.offset();

fn next(&mut self) -> Option<Self::Item> {
self.0.next()
}
SlicesIterator::new_from_buffer(values, offset, len)
}

/// An iterator of `usize` whose index in [`BooleanArray`] is true
Expand Down Expand Up @@ -130,7 +108,7 @@ pub type Filter<'a> = Box<dyn Fn(&ArrayData) -> ArrayData + 'a>;
#[deprecated]
#[allow(deprecated)]
pub fn build_filter(filter: &BooleanArray) -> Result<Filter, ArrowError> {
let iter = SlicesIterator::new(filter);
let iter = build_slices_iterator(filter);
let filter_count = filter_count(filter);
let chunks = iter.collect::<Vec<_>>();

Expand Down Expand Up @@ -243,7 +221,7 @@ impl FilterBuilder {
pub fn optimize(mut self) -> Self {
match self.strategy {
IterationStrategy::SlicesIterator => {
let slices = SlicesIterator::new(&self.filter).collect();
let slices = build_slices_iterator(&self.filter).collect();
self.strategy = IterationStrategy::Slices(slices)
}
IterationStrategy::IndexIterator => {
Expand Down Expand Up @@ -384,7 +362,7 @@ fn filter_array(
.for_each(|(start, end)| mutable.extend(0, *start, *end));
}
_ => {
let iter = SlicesIterator::new(&predicate.filter);
let iter = build_slices_iterator(&predicate.filter);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
}
}
Expand Down Expand Up @@ -445,7 +423,7 @@ fn filter_bits(buffer: &Buffer, offset: usize, predicate: &FilterPredicate) -> B
IterationStrategy::SlicesIterator => {
let mut builder =
BooleanBufferBuilder::new(bit_util::ceil(predicate.count, 8));
for (start, end) in SlicesIterator::new(&predicate.filter) {
for (start, end) in build_slices_iterator(&predicate.filter) {
builder.append_packed_range(start + offset..end + offset, src)
}
builder.finish()
Expand Down Expand Up @@ -501,7 +479,7 @@ where
IterationStrategy::SlicesIterator => {
let mut buffer =
MutableBuffer::with_capacity(predicate.count * T::get_byte_width());
for (start, end) in SlicesIterator::new(&predicate.filter) {
for (start, end) in build_slices_iterator(&predicate.filter) {
buffer.extend_from_slice(&values[start..end]);
}
buffer
Expand Down Expand Up @@ -640,7 +618,7 @@ where

match &predicate.strategy {
IterationStrategy::SlicesIterator => {
filter.extend_slices(SlicesIterator::new(&predicate.filter))
filter.extend_slices(build_slices_iterator(&predicate.filter))
}
IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()),
IterationStrategy::IndexIterator => {
Expand Down Expand Up @@ -973,7 +951,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);
let filter_count = filter_count(&filter);

let iter = SlicesIterator::new(&filter);
let iter = build_slices_iterator(&filter);
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(1, 2)]);
Expand All @@ -986,7 +964,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);
let filter_count = filter_count(&filter);

let iter = SlicesIterator::new(&filter);
let iter = build_slices_iterator(&filter);
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(0, 1), (2, 64)]);
Expand All @@ -999,7 +977,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);
let filter_count = filter_count(&filter);

let iter = SlicesIterator::new(&filter);
let iter = build_slices_iterator(&filter);
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
Expand Down Expand Up @@ -1048,7 +1026,7 @@ mod tests {

let bool_array: BooleanArray = bools.map(Some).collect();

let slices: Vec<_> = SlicesIterator::new(&bool_array).collect();
let slices: Vec<_> = build_slices_iterator(&bool_array).collect();
let expected = vec![(0, 10), (40, 60), (77, 81)];
assert_eq!(slices, expected);

Expand All @@ -1059,7 +1037,7 @@ mod tests {
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
let slices: Vec<_> = SlicesIterator::new(sliced_array).collect();
let slices: Vec<_> = build_slices_iterator(sliced_array).collect();
let expected = vec![(0, 3), (33, 53), (70, 71)];
assert_eq!(slices, expected);
}
Expand All @@ -1084,7 +1062,7 @@ mod tests {

let filter = BooleanArray::from(data);

let slice_bits: Vec<_> = SlicesIterator::new(&filter)
let slice_bits: Vec<_> = build_slices_iterator(&filter)
.flat_map(|(start, end)| start..end)
.collect();

Expand Down
3 changes: 3 additions & 0 deletions arrow/benches/equal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ fn add_benchmark(c: &mut Criterion) {
let arr_a_nulls = create_primitive_array::<Float32Type>(512, 0.5);
c.bench_function("equal_nulls_512", |b| b.iter(|| bench_equal(&arr_a_nulls)));

let arr_a = create_primitive_array::<Float32Type>(51200, 0.1);
c.bench_function("equal_51200", |b| b.iter(|| bench_equal(&arr_a)));

let arr_a = create_string_array::<i32>(512, 0.0);
c.bench_function("equal_string_512", |b| b.iter(|| bench_equal(&arr_a)));

Expand Down
Loading