Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Allow for arbitrary skips in Parquet Dictionary Decoding #19649

Merged
merged 11 commits into from
Nov 6, 2024
18 changes: 17 additions & 1 deletion crates/polars-arrow/src/bitmap/bitmask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::simd::{LaneCount, Mask, MaskElement, SupportedLaneCount};
use polars_utils::slice::load_padded_le_u64;

use super::iterator::FastU56BitmapIter;
use super::utils::{count_zeros, BitmapIter};
use super::utils::{count_zeros, fmt, BitmapIter};
use crate::bitmap::Bitmap;

/// Returns the nth set bit in w, if n+1 bits are set. The indexing is
Expand Down Expand Up @@ -79,6 +79,15 @@ pub struct BitMask<'a> {
len: usize,
}

impl std::fmt::Debug for BitMask<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { bytes, offset, len } = self;
let offset_num_bytes = offset / 8;
let offset_in_byte = offset % 8;
fmt(&bytes[offset_num_bytes..], offset_in_byte, *len, f)
}
}

impl<'a> BitMask<'a> {
pub fn from_bitmap(bitmap: &'a Bitmap) -> Self {
let (bytes, offset, len) = bitmap.as_slice();
Expand All @@ -92,6 +101,13 @@ impl<'a> BitMask<'a> {
self.len
}

#[inline]
pub fn advance_by(&mut self, idx: usize) {
assert!(idx <= self.len);
self.offset += idx;
self.len -= idx;
}

#[inline]
pub fn split_at(&self, idx: usize) -> (Self, Self) {
assert!(idx <= self.len);
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowDataType, PhysicalType};

use super::utils::dict_encoded::{append_validity, constrain_page_validity};
use super::dictionary_encoded::{append_validity, constrain_page_validity};
use super::utils::{
dict_indices_decoder, filter_from_range, freeze_validity, unspecialized_decode,
};
use super::Filter;
use super::{dictionary_encoded, Filter};
use crate::parquet::encoding::{delta_byte_array, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{split_buffer, DataPage, DictPage};
Expand Down Expand Up @@ -521,7 +521,7 @@ impl utils::Decoder for BinViewDecoder {

let start_length = decoded.0.views().len();

utils::dict_encoded::decode_dict(
dictionary_encoded::decode_dict(
indexes.clone(),
dict,
state.is_optional,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::ArrowDataType;
use polars_compute::filter::filter_boolean_kernel;

use super::utils::dict_encoded::{append_validity, constrain_page_validity};
use super::dictionary_encoded::{append_validity, constrain_page_validity};
use super::utils::{
self, decode_hybrid_rle_into_bitmap, filter_from_range, freeze_validity, Decoder, ExactSize,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
use arrow::bitmap::bitmask::BitMask;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::types::{AlignedBytes, NativeType};
use polars_compute::filter::filter_boolean_kernel;

use super::ParquetError;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::error::ParquetResult;
use crate::read::Filter;

mod optional;
mod optional_masked_dense;
mod required;
mod required_masked_dense;

pub fn decode_dict<T: NativeType>(
values: HybridRleDecoder<'_>,
dict: &[T],
is_optional: bool,
page_validity: Option<&Bitmap>,
filter: Option<Filter>,
validity: &mut MutableBitmap,
target: &mut Vec<T>,
) -> ParquetResult<()> {
decode_dict_dispatch(
values,
bytemuck::cast_slice(dict),
is_optional,
page_validity,
filter,
validity,
<T::AlignedBytes as AlignedBytes>::cast_vec_ref_mut(target),
)
}

#[inline(never)]
pub fn decode_dict_dispatch<B: AlignedBytes>(
mut values: HybridRleDecoder<'_>,
dict: &[B],
is_optional: bool,
page_validity: Option<&Bitmap>,
filter: Option<Filter>,
validity: &mut MutableBitmap,
target: &mut Vec<B>,
) -> ParquetResult<()> {
if cfg!(debug_assertions) && is_optional {
assert_eq!(target.len(), validity.len());
}

if is_optional {
append_validity(page_validity, filter.as_ref(), validity, values.len());
}

let page_validity = constrain_page_validity(values.len(), page_validity, filter.as_ref());

match (filter, page_validity) {
(None, None) => required::decode(values, dict, target, 0),
(Some(Filter::Range(rng)), None) => {
values.limit_to(rng.end);
required::decode(values, dict, target, rng.start)
},
(None, Some(page_validity)) => optional::decode(values, dict, page_validity, target, 0),
(Some(Filter::Range(rng)), Some(page_validity)) => {
optional::decode(values, dict, page_validity, target, rng.start)
},
(Some(Filter::Mask(filter)), None) => {
required_masked_dense::decode(values, dict, filter, target)
},
(Some(Filter::Mask(filter)), Some(page_validity)) => {
optional_masked_dense::decode(values, dict, filter, page_validity, target)
},
}?;

if cfg!(debug_assertions) && is_optional {
assert_eq!(target.len(), validity.len());
}

Ok(())
}

pub(crate) fn append_validity(
page_validity: Option<&Bitmap>,
filter: Option<&Filter>,
validity: &mut MutableBitmap,
values_len: usize,
) {
match (page_validity, filter) {
(None, None) => validity.extend_constant(values_len, true),
(None, Some(f)) => validity.extend_constant(f.num_rows(), true),
(Some(page_validity), None) => validity.extend_from_bitmap(page_validity),
(Some(page_validity), Some(Filter::Range(rng))) => {
let page_validity = page_validity.clone();
validity.extend_from_bitmap(&page_validity.clone().sliced(rng.start, rng.len()))
},
(Some(page_validity), Some(Filter::Mask(mask))) => {
validity.extend_from_bitmap(&filter_boolean_kernel(page_validity, mask))
},
}
}

pub(crate) fn constrain_page_validity(
values_len: usize,
page_validity: Option<&Bitmap>,
filter: Option<&Filter>,
) -> Option<Bitmap> {
let num_unfiltered_rows = match (filter.as_ref(), page_validity) {
(None, None) => values_len,
(None, Some(pv)) => {
debug_assert!(pv.len() >= values_len);
pv.len()
},
(Some(f), v) => {
if cfg!(debug_assertions) {
if let Some(v) = v {
assert!(v.len() >= f.max_offset());
}
}

f.max_offset()
},
};

page_validity.map(|pv| {
if pv.len() > num_unfiltered_rows {
pv.clone().sliced(0, num_unfiltered_rows)
} else {
pv.clone()
}
})
}

#[cold]
fn oob_dict_idx() -> ParquetError {
ParquetError::oos("Dictionary Index is out-of-bounds")
}

#[cold]
fn no_more_bitpacked_values() -> ParquetError {
ParquetError::oos("Bitpacked Hybrid-RLE ran out before all values were served")
}

#[inline(always)]
fn verify_dict_indices(indices: &[u32; 32], dict_size: usize) -> ParquetResult<()> {
let mut is_valid = true;
for &idx in indices {
is_valid &= (idx as usize) < dict_size;
}

if is_valid {
return Ok(());
}

Err(oob_dict_idx())
}

#[inline(always)]
fn verify_dict_indices_slice(indices: &[u32], dict_size: usize) -> ParquetResult<()> {
let mut is_valid = true;
for &idx in indices {
is_valid &= (idx as usize) < dict_size;
}

if is_valid {
return Ok(());
}

Err(oob_dict_idx())
}

/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include
/// more than `num_values_to_skip` values.
#[inline(always)]
fn required_skip_whole_chunks(
values: &mut HybridRleDecoder<'_>,
num_values_to_skip: &mut usize,
) -> ParquetResult<()> {
if *num_values_to_skip == 0 {
return Ok(());
}

loop {
let mut values_clone = values.clone();
let Some(chunk_len) = values_clone.next_chunk_length()? else {
break;
};
if *num_values_to_skip < chunk_len {
break;
}
*values = values_clone;
*num_values_to_skip -= chunk_len;
}

Ok(())
}

/// Skip over entire chunks in a [`HybridRleDecoder`] as long as all skipped chunks do not include
/// more than `num_values_to_skip` values.
#[inline(always)]
fn optional_skip_whole_chunks(
values: &mut HybridRleDecoder<'_>,
validity: &mut BitMask<'_>,
num_rows_to_skip: &mut usize,
num_values_to_skip: &mut usize,
) -> ParquetResult<()> {
if *num_values_to_skip == 0 {
return Ok(());
}

let mut total_num_skipped_values = 0;

loop {
let mut values_clone = values.clone();
let Some(chunk_len) = values_clone.next_chunk_length()? else {
break;
};
if *num_values_to_skip < chunk_len {
break;
}
*values = values_clone;
*num_values_to_skip -= chunk_len;
total_num_skipped_values += chunk_len;
}

if total_num_skipped_values > 0 {
let offset = validity
.nth_set_bit_idx(total_num_skipped_values - 1, 0)
.map_or(validity.len(), |v| v + 1);
*num_rows_to_skip -= offset;
validity.advance_by(offset);
}

Ok(())
}
Loading