Skip to content

Commit

Permalink
added ArrayConverter trait, implemented decoder iterators for plain e…
Browse files Browse the repository at this point in the history
…ncoding, string array test now passes
  • Loading branch information
yordan-pavlov committed Apr 25, 2021
1 parent 3a820c5 commit dc93466
Show file tree
Hide file tree
Showing 3 changed files with 242 additions and 64 deletions.
195 changes: 131 additions & 64 deletions rust/parquet/src/arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{any::Any, collections::VecDeque};
use std::{rc::Rc, cell::RefCell};
use arrow::{array::{ArrayRef, Int16Array}, buffer::{MutableBuffer, Buffer}, datatypes::{DataType as ArrowType}};
use arrow::{array::{ArrayRef, Int16Array}, buffer::MutableBuffer, datatypes::{DataType as ArrowType}};
use crate::{column::page::{Page, PageIterator}, memory::{ByteBufferPtr, BufferPtr}, schema::types::{ColumnDescPtr, ColumnDescriptor}};
use crate::arrow::schema::parquet_to_arrow_field;
use crate::errors::{ParquetError, Result};
Expand Down Expand Up @@ -245,45 +245,65 @@ impl<T: Clone> Splittable for Result<BufferPtr<T>> {
}
}

impl Splittable for Result<(usize, ByteBufferPtr)> {
type BufferType = Result<(usize, ByteBufferPtr)>;
type OutputType = Result<(usize, ByteBufferPtr)>;
use crate::encodings::decoding::ValueByteChunk;

impl Splittable for Result<ValueByteChunk> {
type BufferType = Result<ValueByteChunk>;
type OutputType = Result<ValueByteChunk>;

#[inline]
fn len(&self) -> usize {
match self {
Ok(x) => x.0,
Ok(x) => x.value_count,
_ => 0
}
}

#[inline]
fn split(self, len: usize) -> (Self::BufferType, Self::BufferType) {
let (value_count, mut byte_buffer) = if let Ok(item) = self {
let mut value_byte_chunk = if let Ok(item) = self {
item
}
else {
// this shouldn't happen as len() returns 0 for error
// so it should always be fully consumed and never split
return (self.clone(), self);
};
let value_size = byte_buffer.len() / value_count;
(Ok((len, byte_buffer.split_to( len * value_size))), Ok((value_count - len, byte_buffer)))
let value_bit_len = value_byte_chunk.value_bit_len;
let bit_len = len * value_bit_len;
// TODO: use bit_offset when splitting bit / bool values
assert!(bit_len % 8 == 0, "value byte buffer can only be split into whole bytes");
let byte_len = bit_len / 8;

let split_value_chunk = ValueByteChunk::new(
value_byte_chunk.data.split_to(byte_len),
len,
value_bit_len,
);
value_byte_chunk.value_count -= len;
(Ok(split_value_chunk), Ok(value_byte_chunk))
}
}

struct ArrowArrayReader<'a> {
type LevelBufferPtr = BufferPtr<i16>;

/// Converts raw, already-decoded value byte chunks from a parquet column
/// into an arrow `ArrayData` (no-null / all-values; null insertion is
/// handled separately by the caller using def levels).
trait ArrayConverter {
    /// Consumes the decoded chunks and produces the value-only `ArrayData`.
    fn convert_value_chunks(&self, value_byte_chunks: Vec<ValueByteChunk>) -> Result<arrow::array::ArrayData>;
}

struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
column_desc: ColumnDescPtr,
data_type: ArrowType,
def_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result<BufferPtr<i16>>>,
rep_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result<BufferPtr<i16>>>,
value_iter_factory: SplittableBatchingIteratorFactory<'a, Result<(usize, ByteBufferPtr)>>,
def_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result<LevelBufferPtr>>,
rep_level_iter_factory: SplittableBatchingIteratorFactory<'a, Result<LevelBufferPtr>>,
value_iter_factory: SplittableBatchingIteratorFactory<'a, Result<ValueByteChunk>>,
last_def_levels: Option<Int16Array>,
last_rep_levels: Option<Int16Array>,
array_converter: C,
}

impl<'a> ArrowArrayReader<'a> {
fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, arrow_type: Option<ArrowType>) -> Result<Self> {
impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
fn try_new<P: PageIterator + 'a>(column_chunk_iterator: P, column_desc: ColumnDescPtr, array_converter: C, arrow_type: Option<ArrowType>) -> Result<Self> {
let data_type = match arrow_type {
Some(t) => t,
None => parquet_to_arrow_field(column_desc.as_ref())?
Expand Down Expand Up @@ -332,6 +352,7 @@ impl<'a> ArrowArrayReader<'a> {
value_iter_factory: SplittableBatchingIteratorFactory::new(value_iter),
last_def_levels: None,
last_rep_levels: None,
array_converter,
})
}

Expand All @@ -345,14 +366,14 @@ impl<'a> ArrowArrayReader<'a> {
column_desc.max_rep_level() > 0
}

fn map_page_error(err: ParquetError) -> (Box<dyn Iterator<Item = Result<(usize, ByteBufferPtr)>>>, Box<dyn Iterator<Item = Result<BufferPtr<i16>>>>, Box<dyn Iterator<Item = Result<BufferPtr<i16>>>>)
// Fans a single page-level error out into the three iterator slots
// (values, def levels, rep levels) so each downstream consumer
// surfaces the same error exactly once.
fn map_page_error(err: ParquetError) -> (Box<dyn Iterator<Item = Result<ValueByteChunk>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>)
{
    // the error is cloned for the first two iterators, moved into the last
    (Box::new(std::iter::once(Err(err.clone()))), Box::new(std::iter::once(Err(err.clone()))), Box::new(std::iter::once(Err(err))))
}

// Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, Iterator<RepLevels>)>
// this method could fail, e.g. if the page encoding is not supported
fn map_page(page: Page, column_desc: &ColumnDescriptor) -> Result<(Box<dyn Iterator<Item = Result<(usize, ByteBufferPtr)>>>, Box<dyn Iterator<Item = Result<BufferPtr<i16>>>>, Box<dyn Iterator<Item = Result<BufferPtr<i16>>>>)>
fn map_page(page: Page, column_desc: &ColumnDescriptor) -> Result<(Box<dyn Iterator<Item = Result<ValueByteChunk>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>, Box<dyn Iterator<Item = Result<LevelBufferPtr>>>)>
{
use crate::encodings::levels::LevelDecoder;
// process page (V1, V2, Dictionary)
Expand All @@ -370,7 +391,7 @@ impl<'a> ArrowArrayReader<'a> {
} => {
let mut offset = 0;
// create rep level decoder iterator
let rep_level_iter: Box<dyn Iterator<Item = Result<BufferPtr<i16>>>> = if Self::rep_levels_available(&column_desc) {
let rep_level_iter: Box<dyn Iterator<Item = Result<LevelBufferPtr>>> = if Self::rep_levels_available(&column_desc) {
let rep_levels_byte_len = rep_levels_byte_len as usize;
let mut rep_decoder =
LevelDecoder::v2(column_desc.max_rep_level());
Expand All @@ -387,10 +408,10 @@ impl<'a> ArrowArrayReader<'a> {
Box::new(std::iter::once(Err(ParquetError::General(format!("rep levels are not available")))))
};
// create def level decoder iterator
let def_level_iter: Box<dyn Iterator<Item = Result<BufferPtr<i16>>>> = if Self::def_levels_available(&column_desc) {
let def_level_iter: Box<dyn Iterator<Item = Result<LevelBufferPtr>>> = if Self::def_levels_available(&column_desc) {
let def_levels_byte_len = def_levels_byte_len as usize;
let mut def_decoder =
LevelDecoder::v2(column_desc.max_rep_level());
LevelDecoder::v2(column_desc.max_def_level());
def_decoder.set_data_range(
num_values as usize,
&buf,
Expand All @@ -405,10 +426,12 @@ impl<'a> ArrowArrayReader<'a> {
};

// create value decoder iterator
let value_iter = vec![Ok((0, ByteBufferPtr::new(Vec::<u8>::new())))];

let values_buffer = buf.start_from(offset);
let value_iter = Self::get_values_decoder_iter(
values_buffer, num_values as usize, encoding, column_desc
)?;
Ok((
Box::new(value_iter.into_iter()),
value_iter,
def_level_iter,
rep_level_iter
))
Expand All @@ -419,35 +442,48 @@ impl<'a> ArrowArrayReader<'a> {
}
}

fn convert_value_buffers(value_buffers: Vec<(usize, ByteBufferPtr)>) -> Result<arrow::array::ArrayData> {
use arrow::datatypes::ArrowNativeType;
let data_len = value_buffers.len();
let offset_size = std::mem::size_of::<i32>();
let mut offsets = MutableBuffer::new((data_len + 1) * offset_size);
let values_byte_len = value_buffers.iter().map(|(_len, bytes)| bytes.len()).sum();
let mut values = MutableBuffer::new(values_byte_len);

let mut length_so_far = i32::default();
offsets.push(length_so_far);
fn get_values_decoder_iter(values_buffer: ByteBufferPtr, num_values: usize, mut encoding: crate::basic::Encoding, column_desc: &ColumnDescriptor) -> Result<Box<dyn Iterator<Item = Result<ValueByteChunk>>>> {
use crate::basic::Encoding;
if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}

for (value_len, value_bytes) in value_buffers {
debug_assert_eq!(
value_len, 1,
"offset length value buffers can only contain bytes for a single value"
);
length_so_far += <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
offsets.push(length_so_far);
values.extend_from_slice(value_bytes.data());
match encoding {
Encoding::PLAIN => Self::get_plain_decoder_iterator(values_buffer, num_values, column_desc),
// Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
// return Err(general_err!(
// "Cannot initialize this encoding through this function"
// ));
// }
// Encoding::RLE => Box::new(RleValueDecoder::new()),
// Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
// Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
// Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
e => return Err(nyi_err!("Encoding {} is not supported", e)),
}
let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
.len(data_len)
.add_buffer(offsets.into())
.add_buffer(values.into())
.build();
Ok(array_data)
}

fn build_level_array(level_buffers: Vec<BufferPtr<i16>>) -> Int16Array {
/// Builds a PLAIN-encoding decoder iterator for the column's physical type.
///
/// Variable-length values (`BYTE_ARRAY`) carry a per-value length prefix and
/// therefore need a dedicated decoder; every other parquet physical type has
/// a fixed bit width, which is passed to the fixed-length decoder.
fn get_plain_decoder_iterator(values_buffer: ByteBufferPtr, num_values: usize, column_desc: &ColumnDescriptor) -> Result<Box<dyn Iterator<Item = Result<ValueByteChunk>>>> {
    use crate::basic::Type as PhysicalType;
    use crate::encodings::decoding::{FixedLenPlainDecoder, VariableLenPlainDecoder};

    // parquet only supports a limited number of physical types;
    // later converters cast to a more specific arrow / logical type if necessary
    let bits_per_value: usize = match column_desc.physical_type() {
        PhysicalType::BYTE_ARRAY => {
            // variable-length: return early with the dedicated decoder
            return Ok(Box::new(VariableLenPlainDecoder::new(values_buffer, num_values)));
        }
        PhysicalType::BOOLEAN => 1,
        PhysicalType::INT32 | PhysicalType::FLOAT => 32,
        PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
        PhysicalType::INT96 => 96,
        // fixed-size binary: width comes from the schema, in bytes
        PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
    };

    Ok(Box::new(FixedLenPlainDecoder::new(values_buffer, num_values, bits_per_value)))
}

fn build_level_array(level_buffers: Vec<LevelBufferPtr>) -> Int16Array {
let value_count = level_buffers.iter().map(|levels| levels.len()).sum();
let values_byte_len = value_count * std::mem::size_of::<i16>();
let mut value_buffer = MutableBuffer::new(values_byte_len);
Expand All @@ -462,7 +498,7 @@ impl<'a> ArrowArrayReader<'a> {
}
}

impl ArrayReader for ArrowArrayReader<'static> {
impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -475,7 +511,7 @@ impl ArrayReader for ArrowArrayReader<'static> {
if Self::rep_levels_available(&self.column_desc) {
// read rep levels if available
let rep_level_iter = self.rep_level_iter_factory.get_batch_iter(batch_size);
let rep_level_buffers: Vec<BufferPtr<i16>> = rep_level_iter.collect::<Result<_>>()?;
let rep_level_buffers: Vec<LevelBufferPtr> = rep_level_iter.collect::<Result<_>>()?;
let rep_level_array = Self::build_level_array(rep_level_buffers);
self.last_rep_levels = Some(rep_level_array);
}
Expand All @@ -489,7 +525,7 @@ impl ArrayReader for ArrowArrayReader<'static> {
// if def levels are available - they determine how many values will be read
let def_level_iter = self.def_level_iter_factory.get_batch_iter(batch_size);
// decode def levels, return first error if any
let def_level_buffers: Vec<BufferPtr<i16>> = def_level_iter.collect::<Result<_>>()?;
let def_level_buffers: Vec<LevelBufferPtr> = def_level_iter.collect::<Result<_>>()?;
let def_level_array = Self::build_level_array(def_level_buffers);
let def_level_count = def_level_array.len();
// use eq_scalar to efficiently build null bitmap array from def levels
Expand All @@ -505,9 +541,9 @@ impl ArrayReader for ArrowArrayReader<'static> {
// collect and unwrap values into Vec, return first error if any
// this will separate reading and decoding values from creating an arrow array
// extra memory is allocated for the Vec, but the values still point to the page buffer
let values: Vec<(usize, ByteBufferPtr)> = value_iter.collect::<Result<_>>()?;
let values: Vec<ValueByteChunk> = value_iter.collect::<Result<_>>()?;
// converter only creates a no-null / all value array data
let mut value_array_data = Self::convert_value_buffers(values)?;
let mut value_array_data = self.array_converter.convert_value_chunks(values)?;

if let Some(null_bitmap_array) = null_bitmap_array {
// Only if def levels are available - insert null values efficiently using MutableArrayData.
Expand Down Expand Up @@ -552,6 +588,45 @@ impl ArrayReader for ArrowArrayReader<'static> {
}
}

/// `ArrayConverter` implementation that assembles a UTF-8 string array
/// from variable-length value byte chunks (one chunk per string value).
struct StringArrayConverter {}

impl StringArrayConverter {
    /// Creates a new, stateless converter.
    fn new() -> Self {
        Self {}
    }
}

impl ArrayConverter for StringArrayConverter {
    /// Builds a `Utf8` `ArrayData` from the given chunks by concatenating
    /// their bytes into a values buffer and recording a cumulative i32
    /// offset per value. Each chunk must hold exactly one value's bytes.
    fn convert_value_chunks(&self, value_byte_chunks: Vec<ValueByteChunk>) -> Result<arrow::array::ArrayData> {
        use arrow::datatypes::ArrowNativeType;

        let value_count = value_byte_chunks.len();
        let offset_size = std::mem::size_of::<i32>();
        // one extra slot for the leading zero offset
        let mut offsets = MutableBuffer::new((value_count + 1) * offset_size);
        let total_byte_len: usize = value_byte_chunks.iter().map(|chunk| chunk.data.len()).sum();
        let mut values = MutableBuffer::new(total_byte_len);

        let mut cumulative_len = i32::default();
        offsets.push(cumulative_len);

        for value_chunk in value_byte_chunks {
            debug_assert_eq!(
                value_chunk.value_count, 1,
                "offset length value buffers can only contain bytes for a single value"
            );
            let value_bytes = value_chunk.data;
            // unwrap: from_usize only fails if a single batch exceeds i32::MAX bytes
            cumulative_len += <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
            offsets.push(cumulative_len);
            values.extend_from_slice(value_bytes.data());
        }

        Ok(arrow::array::ArrayData::builder(ArrowType::Utf8)
            .len(value_count)
            .add_buffer(offsets.into())
            .add_buffer(values.into())
            .build())
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -631,18 +706,10 @@ mod tests {
}

let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);

// let converter = Utf8Converter::new(Utf8ArrayConverter {});
// let mut array_reader =
// ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
// Box::new(page_iterator),
// column_desc,
// converter,
// None,
// )
// .unwrap();

let mut array_reader = ArrowArrayReader::try_new(page_iterator, column_desc, None).unwrap();
let converter = StringArrayConverter::new();
let mut array_reader = ArrowArrayReader::try_new(
page_iterator, column_desc, converter, None
).unwrap();

let mut accu_len: usize = 0;

Expand Down
Loading

0 comments on commit dc93466

Please sign in to comment.