Skip to content

Commit

Permalink
fix(rust): properly read/write fixed-sized lists from/to parquet files
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jun 13, 2024
1 parent 9a22c10 commit a61dcb4
Show file tree
Hide file tree
Showing 111 changed files with 3,201 additions and 2,378 deletions.
1 change: 1 addition & 0 deletions crates/polars-arrow/src/array/fixed_size_list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl FixedSizeListArray {
}

/// Creates a new [`FixedSizeListArray`].
///
/// Equivalent to `Self::try_new(...).unwrap()`: it delegates all validation
/// to [`Self::try_new`] and panics where that constructor would return an
/// error. `#[track_caller]` attributes the panic to the caller's location.
#[track_caller]
pub fn new(data_type: ArrowDataType, values: Box<dyn Array>, validity: Option<Bitmap>) -> Self {
    let array = Self::try_new(data_type, values, validity);
    array.unwrap()
}
Expand Down
18 changes: 18 additions & 0 deletions crates/polars-arrow/src/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,15 @@ impl<O: Offset> Offsets<O> {
}
}

/// Returns the length of the slot at position `index`, i.e. the distance
/// between its start and end offsets.
///
/// # Panics
/// This function panics iff `index >= self.len()`.
#[inline]
pub fn length_at(&self, index: usize) -> usize {
    let range = self.start_end(index);
    range.1 - range.0
}

/// Returns a range (start, end) corresponding to the position `index`
/// # Panic
/// This function panics iff `index >= self.len()`
Expand Down Expand Up @@ -434,6 +443,15 @@ impl<O: Offset> OffsetsBuffer<O> {
}
}

/// Returns the length of the slot at position `index`, i.e. the distance
/// between its start and end offsets.
///
/// # Panics
/// This function panics iff `index >= self.len()`.
#[inline]
pub fn length_at(&self, index: usize) -> usize {
    let range = self.start_end(index);
    range.1 - range.0
}

/// Returns a range (start, end) corresponding to the position `index`
/// # Panic
/// This function panics iff `index >= self.len()`
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ pub async fn fetch_metadata(
file_byte_length
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize)
.ok_or_else(|| {
polars_parquet::parquet::error::Error::OutOfSpec(
polars_parquet::parquet::error::ParquetError::OutOfSpec(
"not enough bytes to contain parquet footer".to_string(),
)
})?..file_byte_length,
Expand All @@ -136,13 +136,13 @@ pub async fn fetch_metadata(
let magic = read_n(reader).unwrap();
debug_assert!(reader.is_empty());
if magic != polars_parquet::parquet::PARQUET_MAGIC {
return Err(polars_parquet::parquet::error::Error::OutOfSpec(
return Err(polars_parquet::parquet::error::ParquetError::OutOfSpec(
"incorrect magic in parquet footer".to_string(),
)
.into());
}
footer_byte_size.try_into().map_err(|_| {
polars_parquet::parquet::error::Error::OutOfSpec(
polars_parquet::parquet::error::ParquetError::OutOfSpec(
"negative footer byte length".to_string(),
)
})?
Expand All @@ -154,7 +154,7 @@ pub async fn fetch_metadata(
file_byte_length
.checked_sub(polars_parquet::parquet::FOOTER_SIZE as usize + footer_byte_length)
.ok_or_else(|| {
polars_parquet::parquet::error::Error::OutOfSpec(
polars_parquet::parquet::error::ParquetError::OutOfSpec(
"not enough bytes to contain parquet footer".to_string(),
)
})?..file_byte_length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, Optio
use super::utils::*;
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage};
use crate::read::deserialize::utils::{page_is_filtered, page_is_optional};
use crate::read::ParquetError;

pub(crate) type BinaryDict = BinaryArray<i64>;

Expand All @@ -20,7 +20,7 @@ pub(crate) struct Required<'a> {

impl<'a> Required<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let values = BinaryIter::new(values).take(page.num_values());

Ok(Self { values })
Expand All @@ -39,15 +39,15 @@ pub(crate) struct Delta<'a> {

impl<'a> Delta<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?;

#[allow(clippy::needless_collect)] // we need to consume it to get the values
let lengths = lengths_iter
.by_ref()
.map(|x| x.map(|x| x as usize))
.collect::<Result<Vec<_>, ParquetError>>()?;
.collect::<ParquetResult<Vec<_>>>()?;

let values = lengths_iter.into_values();
Ok(Self {
Expand Down Expand Up @@ -88,7 +88,7 @@ pub(crate) struct DeltaBytes<'a> {

impl<'a> DeltaBytes<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let mut decoder = delta_bitpacked::Decoder::try_new(values)?;
let prefix = (&mut decoder)
.take(page.num_values())
Expand Down Expand Up @@ -329,7 +329,7 @@ pub(crate) fn build_binary_state<'a>(
))
},
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Expand All @@ -343,7 +343,7 @@ pub(crate) fn build_binary_state<'a>(
Ok(BinaryState::FilteredRequired(FilteredRequired::new(page)))
},
(Encoding::Plain, _, true, true) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

Ok(BinaryState::FilteredOptional(
FilteredOptionalPageValidity::try_new(page)?,
Expand Down Expand Up @@ -409,14 +409,14 @@ pub(crate) fn build_nested_state<'a>(
ValuesDictionary::try_new(page, dict).map(BinaryNestedState::OptionalDictionary)
},
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Ok(BinaryNestedState::Optional(values))
},
(Encoding::Plain, _, false, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = BinaryIter::new(values);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct Values<'a>(BitmapIter<'a>);

impl<'a> Values<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

Ok(Self(BitmapIter::new(values, 0, values.len() * 8)))
}
Expand Down Expand Up @@ -54,7 +54,7 @@ struct FilteredRequired<'a> {

impl<'a> FilteredRequired<'a> {
pub fn try_new(page: &'a DataPage) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
// todo: replace this by an iterator over slices, for faster deserialization
let values = BitmapIter::new(values, 0, page.num_values());

Expand Down Expand Up @@ -138,7 +138,7 @@ impl<'a> Decoder<'a> for BooleanDecoder {
},
(Encoding::Rle, true, false) => {
let optional = OptionalPageValidity::try_new(page)?;
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
// For boolean values the length is pre-pended.
let (_len_in_bytes, values) = values.split_at(4);
let iter = hybrid_rle::Decoder::new(values, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {

match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(values))
},
(Encoding::Plain, false, false) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Required(values))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub(super) struct Optional<'a> {

impl<'a> Optional<'a> {
pub(super) fn try_new(page: &'a DataPage, size: usize) -> PolarsResult<Self> {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

let values = values.chunks_exact(size);

Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'a> Decoder<'a> for BinaryDecoder {
FilteredRequired::new(page, self.size),
)),
(Encoding::Plain, _, true, true) => {
let (_, _, values) = split_buffer(page)?;
let values = split_buffer(page)?.values;

Ok(State::FilteredOptional(
FilteredOptionalPageValidity::try_new(page)?,
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn create_list(
nested: &mut NestedState,
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.nested.pop().unwrap().inner();
let (mut offsets, validity) = nested.nested.pop().unwrap().take();
match data_type.to_logical_type() {
ArrowDataType::List(_) => {
offsets.push(values.len() as i64);
Expand Down Expand Up @@ -89,7 +89,7 @@ pub fn create_map(
nested: &mut NestedState,
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.nested.pop().unwrap().inner();
let (mut offsets, validity) = nested.nested.pop().unwrap().take();
match data_type.to_logical_type() {
ArrowDataType::Map(_, _) => {
offsets.push(values.len() as i64);
Expand Down
21 changes: 18 additions & 3 deletions crates/polars-parquet/src/arrow/read/deserialize/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ where
dict_read::<$K, _>(iter, init, type_, data_type, num_rows, chunk_size)
})?
},
ArrowDataType::List(inner)
| ArrowDataType::LargeList(inner)
| ArrowDataType::FixedSizeList(inner, _) => {
ArrowDataType::List(inner) | ArrowDataType::LargeList(inner) => {
init.push(InitNested::List(field.is_nullable));
let iter = columns_to_iter_recursive(
columns,
Expand All @@ -258,6 +256,23 @@ where
});
Box::new(iter) as _
},
ArrowDataType::FixedSizeList(inner, width) => {
init.push(InitNested::FixedSizeList(field.is_nullable, *width));
let iter = columns_to_iter_recursive(
columns,
types,
inner.as_ref().clone(),
init,
num_rows,
chunk_size,
)?;
let iter = iter.map(move |x| {
let (mut nested, array) = x?;
let array = create_list(field.data_type().clone(), &mut nested, array);
Ok((nested, array))
});
Box::new(iter) as _
},
ArrowDataType::Decimal(_, _) => {
init.push(InitNested::Primitive(field.is_nullable));
let type_ = types.pop().unwrap();
Expand Down
Loading

0 comments on commit a61dcb4

Please sign in to comment.