Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Efficient deserialization for Buffer<u8> #103

Merged
merged 7 commits into from
Mar 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions arrow2_convert/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,50 @@ use arrow2_convert::{
};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};

// Arrow stores U8 arrays as `arrow2::array::BinaryArray`
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
#[arrow_field(transparent)]
pub struct BufStruct(Buffer<u16>);
pub struct BufU8Struct(Buffer<u8>);

// Arrow stores other arrows as `arrow2::array::ListArray`
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
#[arrow_field(transparent)]
pub struct VecStruct(Vec<u16>);
pub struct BufU32Struct(Buffer<u32>);

// Arrow stores U8 arrows as `arrow2::array::BinaryArray`
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
#[arrow_field(transparent)]
pub struct VecU8Struct(Vec<u8>);

// Arrow stores other arrows as `arrow2::array::ListArray`
#[derive(ArrowField, ArrowSerialize, ArrowDeserialize)]
#[arrow_field(transparent)]
pub struct VecU32Struct(Vec<u32>);

pub fn bench_buffer_serialize(c: &mut Criterion) {
let mut group = c.benchmark_group("serialize");
for size in [1, 10, 100, 1000, 10000].iter() {
group.throughput(Throughput::Elements(*size as u64));
group.bench_with_input(BenchmarkId::new("Buffer", size), size, |b, &size| {
let data = [BufStruct((0..size as u16).into_iter().collect())];
group.bench_with_input(BenchmarkId::new("BufferU8", size), size, |b, &size| {
let data = [BufU8Struct((0..size as u8).into_iter().collect())];
b.iter(|| {
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
});
});
group.bench_with_input(BenchmarkId::new("Vec", size), size, |b, &size| {
let data = [VecStruct((0..size as u16).into_iter().collect())];
group.bench_with_input(BenchmarkId::new("VecU8", size), size, |b, &size| {
let data = [VecU8Struct((0..size as u8).into_iter().collect())];
b.iter(|| {
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
});
});
group.bench_with_input(BenchmarkId::new("BufferU32", size), size, |b, &size| {
let data = [BufU32Struct((0..size as u32).into_iter().collect())];
b.iter(|| {
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
});
});
group.bench_with_input(BenchmarkId::new("VecU32", size), size, |b, &size| {
let data = [VecU32Struct((0..size as u32).into_iter().collect())];
b.iter(|| {
let _: Box<dyn Array> = TryIntoArrow::try_into_arrow(black_box(&data)).unwrap();
});
Expand All @@ -35,27 +59,53 @@ pub fn bench_buffer_deserialize(c: &mut Criterion) {
let mut group = c.benchmark_group("deserialize");
for size in [1, 10, 100, 1000, 10000].iter() {
group.throughput(Throughput::Elements(*size as u64));
group.bench_with_input(BenchmarkId::new("Buffer", size), size, |b, &size| {
let data: Box<dyn Array> = [BufStruct((0..size as u16).into_iter().collect())]
group.bench_with_input(BenchmarkId::new("BufferU8", size), size, |b, &size| {
let data: Box<dyn Array> = [BufU8Struct((0..size as u8).into_iter().collect())]
.try_into_arrow()
.unwrap();
b.iter_batched(
|| data.clone(),
|data| {
let _: Vec<BufU8Struct> =
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
},
criterion::BatchSize::SmallInput,
)
});
group.bench_with_input(BenchmarkId::new("VecU8", size), size, |b, &size| {
let data: Box<dyn Array> = [VecU8Struct((0..size as u8).into_iter().collect())]
.try_into_arrow()
.unwrap();
b.iter_batched(
|| data.clone(),
|data| {
let _: Vec<VecU8Struct> =
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
},
criterion::BatchSize::SmallInput,
);
});
group.bench_with_input(BenchmarkId::new("BufferU32", size), size, |b, &size| {
let data: Box<dyn Array> = [BufU32Struct((0..size as u32).into_iter().collect())]
.try_into_arrow()
.unwrap();
b.iter_batched(
|| data.clone(),
|data| {
let _: Vec<BufStruct> =
let _: Vec<BufU32Struct> =
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
},
criterion::BatchSize::SmallInput,
)
});
group.bench_with_input(BenchmarkId::new("Vec", size), size, |b, &size| {
let data: Box<dyn Array> = [VecStruct((0..size as u16).into_iter().collect())]
group.bench_with_input(BenchmarkId::new("VecU32", size), size, |b, &size| {
let data: Box<dyn Array> = [VecU32Struct((0..size as u32).into_iter().collect())]
.try_into_arrow()
.unwrap();
b.iter_batched(
|| data.clone(),
|data| {
let _: Vec<VecStruct> =
let _: Vec<VecU32Struct> =
TryIntoCollection::try_into_collection(black_box(data)).unwrap();
},
criterion::BatchSize::SmallInput,
Expand Down
58 changes: 58 additions & 0 deletions arrow2_convert/src/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,64 @@ impl ArrowDeserialize for NaiveDate {
}
}

/// Iterator for for [`BufferBinaryArray`]
pub struct BufferBinaryArrayIter<'a> {
index: usize,
array: &'a BinaryArray<i32>,
}

impl<'a> Iterator for BufferBinaryArrayIter<'a> {
type Item = Option<Buffer<u8>>;

fn next(&mut self) -> Option<Self::Item> {
if self.index >= self.array.len() {
None
} else {
if let Some(validity) = self.array.validity() {
if !validity.get_bit(self.index) {
self.index += 1;
return Some(None);
}
}
let (start, end) = self.array.offsets().start_end(self.index);
self.index += 1;
Some(Some(self.array.values().clone().slice(start, end - start)))
}
}
}

/// Internal `ArrowArray` helper to iterate over a `BinaryArray` while exposing Buffer slices
pub struct BufferBinaryArray;

impl<'a> IntoIterator for &'a BufferBinaryArray {
type Item = Option<Buffer<u8>>;

type IntoIter = BufferBinaryArrayIter<'a>;

fn into_iter(self) -> Self::IntoIter {
unimplemented!("Use iter_from_array_ref");
}
}

impl ArrowArray for BufferBinaryArray {
type BaseArrayType = BinaryArray<i32>;
#[inline]
fn iter_from_array_ref(a: &dyn Array) -> <&Self as IntoIterator>::IntoIter {
let b = a.as_any().downcast_ref::<Self::BaseArrayType>().unwrap();

BufferBinaryArrayIter { index: 0, array: b }
}
}

impl ArrowDeserialize for Buffer<u8> {
type ArrayType = BufferBinaryArray;

#[inline]
fn arrow_deserialize(v: Option<Buffer<u8>>) -> Option<Self> {
v
}
}

impl ArrowDeserialize for Vec<u8> {
type ArrayType = BinaryArray<i32>;

Expand Down
4 changes: 3 additions & 1 deletion arrow2_convert/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,17 @@ arrow_enable_vec_for_type!(bool);
arrow_enable_vec_for_type!(NaiveDateTime);
arrow_enable_vec_for_type!(NaiveDate);
arrow_enable_vec_for_type!(Vec<u8>);
arrow_enable_vec_for_type!(Buffer<u8>);
arrow_enable_vec_for_type!(LargeBinary);
impl<const SIZE: usize> ArrowEnableVecForType for FixedSizeBinary<SIZE> {}
impl<const PRECISION: usize, const SCALE: usize> ArrowEnableVecForType for I128<PRECISION, SCALE> {}

// Blanket implementation for Vec<Option<T>> if vectors are enabled for T
impl<T> ArrowEnableVecForType for Option<T> where T: ArrowField + ArrowEnableVecForType {}

// Blanket implementation for Vec<Vec<T>> if vectors are enabled for T
// Blanket implementation for Vec<Vec<T>> and Vec<Buffer<T>> if vectors or buffers are enabled for T
impl<T> ArrowEnableVecForType for Vec<T> where T: ArrowField + ArrowEnableVecForType {}
impl<T> ArrowEnableVecForType for Buffer<T> where T: ArrowField + ArrowEnableVecForType {}
impl<T> ArrowEnableVecForType for LargeVec<T> where T: ArrowField + ArrowEnableVecForType {}
impl<T, const SIZE: usize> ArrowEnableVecForType for FixedSizeVec<T, SIZE> where
T: ArrowField + ArrowEnableVecForType
Expand Down
23 changes: 22 additions & 1 deletion arrow2_convert/tests/test_deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,32 @@ fn test_deserialize_large_types_schema_mismatch_error() {
}

#[test]
fn test_deserialize_buffer() {
fn test_deserialize_buffer_u16() {
let original_array = [Buffer::from_iter(0u16..5), Buffer::from_iter(7..9)];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let iter = arrow_array_deserialize_iterator::<Buffer<u16>>(b.as_ref()).unwrap();
for (i, k) in iter.zip(original_array.iter()) {
assert_eq!(&i, k);
}
}

#[test]
fn test_deserialize_buffer_u8() {
let original_array = [Buffer::from_iter(0u8..5), Buffer::from_iter(7..9)];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let iter = arrow_array_deserialize_iterator::<Buffer<u8>>(b.as_ref()).unwrap();
for (i, k) in iter.zip(original_array.iter()) {
assert_eq!(&i, k);
}

let original_array = [
Some(Buffer::from_iter(0u8..5)),
None,
Some(Buffer::from_iter(7..9)),
];
let b: Box<dyn Array> = original_array.try_into_arrow().unwrap();
let iter = arrow_array_deserialize_iterator::<Option<Buffer<u8>>>(b.as_ref()).unwrap();
for (i, k) in iter.zip(original_array.iter()) {
assert_eq!(&i, k);
}
}