Skip to content

Commit

Permalink
Implement faster arrow array reader (#384)
Browse files Browse the repository at this point in the history
* implement ArrowArrayReader

* change StringArrayConverter to use push_unchecked for offsets

* add ASF license header to new files

* fix clippy issues

* cleanup arrow_array_reader benches

* cleanup arrow_array_reader

* change util module to limit public exports from test_common sub-module

* fix rustfmt issues
  • Loading branch information
yordan-pavlov authored Jun 10, 2021
1 parent 0c00776 commit 71e9d78
Show file tree
Hide file tree
Showing 20 changed files with 2,498 additions and 77 deletions.
5 changes: 5 additions & 0 deletions arrow/src/array/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ impl ArrayDataBuilder {
self
}

pub fn null_count(mut self, null_count: usize) -> Self {
self.null_count = Some(null_count);
self
}

pub fn null_bit_buffer(mut self, buf: Buffer) -> Self {
self.null_bit_buffer = Some(buf);
self
Expand Down
66 changes: 49 additions & 17 deletions arrow/src/array/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::mem;

use super::{
data::{into_buffers, new_buffers},
ArrayData,
ArrayData, ArrayDataBuilder,
};
use crate::array::StringOffsetSizeTrait;

Expand Down Expand Up @@ -63,7 +63,7 @@ struct _MutableArrayData<'a> {
}

impl<'a> _MutableArrayData<'a> {
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2);

let child_data = match self.data_type {
Expand All @@ -76,19 +76,19 @@ impl<'a> _MutableArrayData<'a> {
child_data
}
};
ArrayData::new(
self.data_type,
self.len,
Some(self.null_count),
if self.null_count > 0 {
Some(self.null_buffer.into())
} else {
None
},
0,
buffers,
child_data,
)

let mut array_data_builder = ArrayDataBuilder::new(self.data_type)
.offset(0)
.len(self.len)
.null_count(self.null_count)
.buffers(buffers)
.child_data(child_data);
if self.null_count > 0 {
array_data_builder =
array_data_builder.null_bit_buffer(self.null_buffer.into());
}

array_data_builder
}
}

Expand Down Expand Up @@ -552,8 +552,13 @@ impl<'a> MutableArrayData<'a> {
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();

let null_bytes = bit_util::ceil(array_capacity, 8);
let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);
let null_buffer = if use_nulls {
let null_bytes = bit_util::ceil(array_capacity, 8);
MutableBuffer::from_len_zeroed(null_bytes)
} else {
// create 0 capacity mutable buffer with the intention that it won't be used
MutableBuffer::with_capacity(0)
};

let extend_values = match &data_type {
DataType::Dictionary(_, _) => {
Expand Down Expand Up @@ -605,13 +610,40 @@ impl<'a> MutableArrayData<'a> {

/// Extends this [MutableArrayData] with null elements, disregarding the bound arrays
pub fn extend_nulls(&mut self, len: usize) {
// TODO: null_buffer should probably be extended here as well
// otherwise is_valid() could later panic
// add test to confirm
self.data.null_count += len;
(self.extend_nulls)(&mut self.data, len);
self.data.len += len;
}

/// Returns the current length
#[inline]
pub fn len(&self) -> usize {
self.data.len
}

/// Returns true if len is 0
#[inline]
pub fn is_empty(&self) -> bool {
self.data.len == 0
}

/// Returns the current null count
#[inline]
pub fn null_count(&self) -> usize {
self.data.null_count
}

/// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
pub fn freeze(self) -> ArrayData {
self.data.freeze(self.dictionary).build()
}

/// Creates a [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`.
/// This is useful for extending the default behavior of MutableArrayData.
pub fn into_builder(self) -> ArrayDataBuilder {
self.data.freeze(self.dictionary)
}
}
Expand Down
4 changes: 2 additions & 2 deletions arrow/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,10 @@ impl MutableBuffer {
}

/// Extends the buffer with a new item, without checking for sufficient capacity
/// Safety
/// # Safety
/// Caller must ensure that the capacity()-len()>=size_of<T>()
#[inline]
unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
let additional = std::mem::size_of::<T>();
let dst = self.data.as_ptr().add(self.len) as *mut T;
std::ptr::write(dst, item);
Expand Down
32 changes: 17 additions & 15 deletions arrow/src/compute/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ enum State {
/// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be
/// "taken" from an array to be filtered.
#[derive(Debug)]
pub(crate) struct SlicesIterator<'a> {
pub struct SlicesIterator<'a> {
iter: Enumerate<BitChunkIterator<'a>>,
state: State,
filter_count: usize,
filter: &'a BooleanArray,
remainder_mask: u64,
remainder_len: usize,
chunk_len: usize,
Expand All @@ -59,19 +59,14 @@ pub(crate) struct SlicesIterator<'a> {
}

impl<'a> SlicesIterator<'a> {
pub(crate) fn new(filter: &'a BooleanArray) -> Self {
pub fn new(filter: &'a BooleanArray) -> Self {
let values = &filter.data_ref().buffers()[0];

// this operation is performed before iteration
// because it is fast and allows reserving all the needed memory
let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());

let chunks = values.bit_chunks(filter.offset(), filter.len());

Self {
iter: chunks.iter().enumerate(),
state: State::Chunks,
filter_count,
filter,
remainder_len: chunks.remainder_len(),
chunk_len: chunks.chunk_len(),
remainder_mask: chunks.remainder_bits(),
Expand All @@ -83,6 +78,12 @@ impl<'a> SlicesIterator<'a> {
}
}

/// Counts the number of set bits in the filter array.
fn filter_count(&self) -> usize {
let values = self.filter.values();
values.count_set_bits_offset(self.filter.offset(), self.filter.len())
}

#[inline]
fn current_start(&self) -> usize {
self.current_chunk * 64 + self.current_bit
Expand Down Expand Up @@ -193,7 +194,7 @@ impl<'a> Iterator for SlicesIterator<'a> {
/// Therefore, it is considered undefined behavior to pass `filter` with null values.
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
let iter = SlicesIterator::new(filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

Ok(Box::new(move |array: &ArrayData| {
Expand Down Expand Up @@ -253,7 +254,8 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> {
}

let iter = SlicesIterator::new(predicate);
match iter.filter_count {
let filter_count = iter.filter_count();
match filter_count {
0 => {
// return empty
Ok(new_empty_array(array.data_type()))
Expand All @@ -266,7 +268,7 @@ pub fn filter(array: &Array, predicate: &BooleanArray) -> Result<ArrayRef> {
_ => {
// actually filter
let mut mutable =
MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
MutableArrayData::new(vec![array.data_ref()], false, filter_count);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
let data = mutable.freeze();
Ok(make_array(data))
Expand Down Expand Up @@ -599,7 +601,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);

let iter = SlicesIterator::new(&filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(1, 2)]);
Expand All @@ -612,7 +614,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);

let iter = SlicesIterator::new(&filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(0, 1), (2, 64)]);
Expand All @@ -625,7 +627,7 @@ mod tests {
let filter = BooleanArray::from(filter_values);

let iter = SlicesIterator::new(&filter);
let filter_count = iter.filter_count;
let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();

assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
Expand Down
5 changes: 5 additions & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ arrow = { path = "../arrow", version = "5.0.0-SNAPSHOT", optional = true }
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
rand = "0.8"

[dev-dependencies]
criterion = "0.3"
Expand Down Expand Up @@ -76,3 +77,7 @@ required-features = ["cli"]
[[bench]]
name = "arrow_writer"
harness = false

[[bench]]
name = "arrow_array_reader"
harness = false
Loading

0 comments on commit 71e9d78

Please sign in to comment.