Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete StringViewArray and BinaryViewArray parquet decoder: implement delta byte array and delta length byte array encoding #6004

Merged
merged 9 commits into from
Jul 8, 2024
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
Expand Down
63 changes: 22 additions & 41 deletions parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,6 @@ pub fn make_byte_array_reader(
}
}

/// Returns an [`ArrayReader`] that decodes the provided byte array column into
/// Arrow view types (`Utf8View` / `BinaryView`).
pub fn make_byte_view_array_reader(
    pages: Box<dyn PageIterator>,
    column_desc: ColumnDescPtr,
    arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
    // Use the caller-supplied Arrow type if present, otherwise derive a view
    // type from the Parquet column: string-like columns map to `Utf8View`,
    // everything else to `BinaryView`.
    let data_type = if let Some(t) = arrow_type {
        t
    } else {
        let field = parquet_to_arrow_field(column_desc.as_ref())?;
        match field.data_type() {
            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
            _ => ArrowType::BinaryView,
        }
    };

    // Only the view types are valid targets for this reader.
    if !matches!(data_type, ArrowType::BinaryView | ArrowType::Utf8View) {
        return Err(general_err!(
            "invalid data type for byte array reader read to view type - {}",
            data_type
        ));
    }

    let record_reader = GenericRecordReader::new(column_desc);
    Ok(Box::new(ByteArrayReader::<i32>::new(
        pages,
        data_type,
        record_reader,
    )))
}

/// An [`ArrayReader`] for variable length byte arrays
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
Expand Down Expand Up @@ -472,6 +442,23 @@ impl ByteArrayDecoderDeltaLength {
let mut lengths = vec![0; values];
len_decoder.get(&mut lengths)?;

let mut total_bytes = 0;

for l in lengths.iter() {
if *l < 0 {
return Err(ParquetError::General(
"negative delta length byte array length".to_string(),
));
}
total_bytes += *l as usize;
}

if total_bytes + len_decoder.get_offset() > data.len() {
return Err(ParquetError::General(
"Insufficient delta length byte array bytes".to_string(),
));
}

Ok(Self {
lengths,
data,
Expand All @@ -496,23 +483,17 @@ impl ByteArrayDecoderDeltaLength {
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
output.values.reserve(total_bytes);

if self.data_offset + total_bytes > self.data.len() {
return Err(ParquetError::EOF(
"Insufficient delta length byte array bytes".to_string(),
));
}

let mut start_offset = self.data_offset;
let mut current_offset = self.data_offset;
for length in src_lengths {
let end_offset = start_offset + *length as usize;
let end_offset = current_offset + *length as usize;
output.try_push(
&self.data.as_ref()[start_offset..end_offset],
&self.data.as_ref()[current_offset..end_offset],
self.validate_utf8,
)?;
start_offset = end_offset;
current_offset = end_offset;
}

self.data_offset = start_offset;
self.data_offset = current_offset;
self.length_offset += to_read;

if self.validate_utf8 {
Expand Down
205 changes: 191 additions & 14 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::DictIndexDecoder;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::ArrayRef;
use arrow_array::{builder::make_view, ArrayRef};
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
#[allow(unused)]
pub fn make_byte_view_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -61,7 +62,6 @@ pub fn make_byte_view_array_reader(
}

/// An [`ArrayReader`] for variable length byte arrays
#[allow(unused)]
struct ByteViewArrayReader {
data_type: ArrowType,
pages: Box<dyn PageIterator>,
Expand Down Expand Up @@ -213,6 +213,8 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
/// Decoder of Parquet-encoded byte array data into Arrow view buffers,
/// one variant per supported [`Encoding`].
pub enum ByteViewArrayDecoder {
/// Plain encoded data
Plain(ByteViewArrayDecoderPlain),
/// Dictionary encoded data
Dictionary(ByteViewArrayDecoderDictionary),
/// [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] encoded data
DeltaLength(ByteViewArrayDecoderDeltaLength),
/// [`Encoding::DELTA_BYTE_ARRAY`] encoded data
DeltaByteArray(ByteViewArrayDecoderDelta),
}

impl ByteViewArrayDecoder {
Expand All @@ -235,9 +237,12 @@ impl ByteViewArrayDecoder {
data, num_levels, num_values,
))
}
Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
unimplemented!("stay tuned!")
}
Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
),
Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
),
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
Expand All @@ -263,6 +268,8 @@ impl ByteViewArrayDecoder {
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.read(out, dict, len)
}
ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}

Expand All @@ -275,6 +282,8 @@ impl ByteViewArrayDecoder {
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.skip(dict, len)
}
ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
}
}
}
Expand Down Expand Up @@ -487,6 +496,181 @@ impl ByteViewArrayDecoderDictionary {
}
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDeltaLength {
// All value lengths, eagerly decoded from the delta-packed prefix in `new`
lengths: Vec<i32>,
// The full page buffer; the string bytes follow the encoded lengths
data: Bytes,
// Index into `lengths` of the next value to read/skip
length_offset: usize,
// Byte offset into `data` of the next value's bytes
data_offset: usize,
// Whether decoded values must be validated as UTF-8
validate_utf8: bool,
}

impl ByteViewArrayDecoderDeltaLength {
fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
len_decoder.set_data(data.clone(), 0)?;
let values = len_decoder.values_left();

let mut lengths = vec![0; values];
len_decoder.get(&mut lengths)?;

let mut total_bytes = 0;

for l in lengths.iter() {
if *l < 0 {
return Err(ParquetError::General(
"negative delta length byte array length".to_string(),
));
}
total_bytes += *l as usize;
}

if total_bytes + len_decoder.get_offset() > data.len() {
return Err(ParquetError::General(
"Insufficient delta length byte array bytes".to_string(),
));
}

Ok(Self {
lengths,
data,
validate_utf8,
length_offset: 0,
data_offset: len_decoder.get_offset(),
})
}

fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
let to_read = len.min(self.lengths.len() - self.length_offset);
output.views.reserve(to_read);

let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

let block_id = output.append_block(self.data.clone().into());
alamb marked this conversation as resolved.
Show resolved Hide resolved

let mut current_offset = self.data_offset;
let initial_offset = current_offset;
for length in src_lengths {
// # Safety
// The length is from the delta length decoder, so it is valid
// The start_offset is calculated from the lengths, so it is valid
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
// `start_offset + length` is guaranteed to be within the bounds of `data`, as checked in `new`
unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }

current_offset += *length as usize;
}

// Delta length encoding has continuous strings, we can validate utf8 in one go
if self.validate_utf8 {
check_valid_utf8(&self.data[initial_offset..current_offset])?;
}

self.data_offset = current_offset;
self.length_offset += to_read;

Ok(to_read)
}

fn skip(&mut self, to_skip: usize) -> Result<usize> {
let remain_values = self.lengths.len() - self.length_offset;
let to_skip = remain_values.min(to_skip);

let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

self.data_offset += total_bytes;
self.length_offset += to_skip;
Ok(to_skip)
}
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDelta {
// Underlying prefix/suffix decoder shared with the offset-based readers
decoder: DeltaByteArrayDecoder,
// Whether decoded values must be validated as UTF-8
validate_utf8: bool,
}

impl ByteViewArrayDecoderDelta {
fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
Ok(Self {
decoder: DeltaByteArrayDecoder::new(data)?,
validate_utf8,
})
}

// Unlike other encodings, we need to copy the data.
//
// DeltaByteArray data is stored using shared prefixes/suffixes,
// which results in potentially non-contiguous
// strings, while Arrow encodings require contiguous strings
//
// <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>

fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
output.views.reserve(len.min(self.decoder.remaining()));

// array buffer only have long strings
let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);

let buffer_id = output.buffers.len() as u32;

let read = if !self.validate_utf8 {
self.decoder.read(len, |bytes| {
let offset = array_buffer.len();
let view = make_view(bytes, buffer_id, offset as u32);
if bytes.len() > 12 {
// only copy the data to buffer if the string can not be inlined.
array_buffer.extend_from_slice(bytes);
}

// # Safety
// The buffer_id is the last buffer in the output buffers
// The offset is calculated from the buffer, so it is valid
unsafe {
output.append_raw_view_unchecked(&view);
}
Ok(())
})?
} else {
// utf8 validation buffer has only short strings. These short
// strings are inlined into the views but we copy them into a
// contiguous buffer to accelerate validation.®
let mut utf8_validation_buffer = Vec::with_capacity(4096);

let v = self.decoder.read(len, |bytes| {
let offset = array_buffer.len();
let view = make_view(bytes, buffer_id, offset as u32);
if bytes.len() > 12 {
// only copy the data to buffer if the string can not be inlined.
array_buffer.extend_from_slice(bytes);
} else {
utf8_validation_buffer.extend_from_slice(bytes);
}

// # Safety
// The buffer_id is the last buffer in the output buffers
// The offset is calculated from the buffer, so it is valid
// Utf-8 validation is done later
unsafe {
output.append_raw_view_unchecked(&view);
}
Ok(())
})?;
check_valid_utf8(&array_buffer)?;
check_valid_utf8(&utf8_validation_buffer)?;
v
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we also need to check array_buffer for valid utf8? Maybe I am missing it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the utf8_validation_buffer already have all the data from array_buffer, so we just do one pass validation.

Copy link
Contributor

@alamb alamb Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see -- I was thinking about something like this: XiangpengHao#1 (call validate_utf8 twice by only copy the strings once. What do you think? (Sorry for being overly obsessive)

let actual_block_id = output.append_block(array_buffer.into());
assert_eq!(actual_block_id, buffer_id);
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
Ok(read)
}

fn skip(&mut self, to_skip: usize) -> Result<usize> {
self.decoder.skip(to_skip)
}
}

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
Expand Down Expand Up @@ -525,13 +709,6 @@ mod tests {
.unwrap();

for (encoding, page) in pages {
if encoding != Encoding::PLAIN
&& encoding != Encoding::RLE_DICTIONARY
&& encoding != Encoding::PLAIN_DICTIONARY
{
// skip unsupported encodings for now as they are not yet implemented
continue;
}
let mut output = ViewBuffer::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

Expand Down
Loading
Loading