perf: Several large parquet optimizations (#18437)
coastalwhite authored Aug 28, 2024
1 parent a5b441a commit f6ef516
Showing 18 changed files with 666 additions and 403 deletions.
16 changes: 16 additions & 0 deletions crates/polars-arrow/src/bitmap/bitmap_ops.rs
@@ -300,6 +300,22 @@ pub fn intersects_with_mut(lhs: &MutableBitmap, rhs: &MutableBitmap) -> bool {
)
}

pub fn num_edges(lhs: &Bitmap) -> usize {
if lhs.is_empty() {
return 0;
}

// @TODO: It is probably quite inefficient to do it like this because now either one is not
// aligned. Maybe we can implement a smarter way to do this.
binary_fold(
&unsafe { lhs.clone().sliced_unchecked(0, lhs.len() - 1) },
&unsafe { lhs.clone().sliced_unchecked(1, lhs.len() - 1) },
|l, r| (l ^ r).count_ones() as usize,
0,
|acc, v| acc + v,
)
}

/// Compute `out[i] = if selector[i] { truthy[i] } else { falsy }`.
pub fn select_constant(selector: &Bitmap, truthy: &Bitmap, falsy: bool) -> Bitmap {
let falsy_mask: u64 = if falsy {
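To make the new helper concrete, here is a small, hedged usage sketch (an addition for this write-up, not part of the commit). It assumes `Bitmap` and its `From<impl AsRef<[bool]>>` impl are reachable at `polars_arrow::bitmap::Bitmap`, as suggested by the files touched in this diff.

```rust
// Hedged sketch: what `num_edges` counts on a small bitmap.
use polars_arrow::bitmap::Bitmap;

fn main() {
    // 1 1 0 0 1 0 1 1 -> transitions at 1->2, 3->4, 4->5, 5->6: four edges.
    let mask = Bitmap::from([true, true, false, false, true, false, true, true]);
    assert_eq!(mask.num_edges(), 4);

    // A constant bitmap has no edges; an empty bitmap is defined to have 0.
    assert_eq!(Bitmap::from([true; 8]).num_edges(), 0);
    assert_eq!(Bitmap::new().num_edges(), 0);
}
```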
5 changes: 5 additions & 0 deletions crates/polars-arrow/src/bitmap/immutable.rs
@@ -555,6 +555,11 @@ impl Bitmap {
pub fn select_constant(&self, truthy: &Self, falsy: bool) -> Self {
super::bitmap_ops::select_constant(self, truthy, falsy)
}

/// Calculates the number of edges from `0 -> 1` and `1 -> 0`.
pub fn num_edges(&self) -> usize {
super::bitmap_ops::num_edges(self)
}
}

impl<P: AsRef<[bool]>> From<P> for Bitmap {
16 changes: 16 additions & 0 deletions crates/polars-arrow/src/datatypes/mod.rs
@@ -545,6 +545,22 @@ impl ArrowDataType {
}
}

pub fn is_nested(&self) -> bool {
use ArrowDataType as D;

matches!(
self,
D::List(_)
| D::LargeList(_)
| D::FixedSizeList(_, _)
| D::Struct(_)
| D::Union(_, _, _)
| D::Map(_, _)
| D::Dictionary(_, _, _)
| D::Extension(_, _, _)
)
}

pub fn is_view(&self) -> bool {
matches!(self, ArrowDataType::Utf8View | ArrowDataType::BinaryView)
}
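A brief, hedged illustration of the new predicate (illustrative only, not part of the commit); it sticks to variants that already appear in this file's other helpers.

```rust
// Hedged sketch: `is_nested` is true for types whose values contain other
// arrays (structs, lists, maps, ...) and false for flat/primitive types.
use polars_arrow::datatypes::ArrowDataType;

fn main() {
    assert!(ArrowDataType::Struct(vec![]).is_nested());
    assert!(!ArrowDataType::Int64.is_nested());
    assert!(!ArrowDataType::Utf8View.is_nested());
}
```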
131 changes: 91 additions & 40 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::collections::VecDeque;
use std::ops::{Deref, Range};

use arrow::array::BooleanArray;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
@@ -313,6 +314,20 @@ fn rg_to_dfs_prefiltered(
debug_assert_eq!(live_idx_to_col_idx.len(), num_live_columns);
debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns);

enum MaskSetting {
Auto,
Pre,
Post,
}

let mask_setting =
std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(MaskSetting::Auto, |v| match &v[..] {
"auto" => MaskSetting::Auto,
"pre" => MaskSetting::Pre,
"post" => MaskSetting::Post,
_ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."),
});

POOL.install(|| {
// Set partitioned fields to prevent quadratic behavior.
// Ensure all row groups are partitioned.
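As a usage note (an assumption about how the override is meant to be used, inferred from the parsing above rather than stated in the commit), the strategy can be pinned before the reader runs:

```rust
fn main() {
    // Hedged sketch: force post-filtering, e.g. while benchmarking.
    // Accepted values mirror the match above ("auto", "pre", "post");
    // anything else makes the reader panic.
    std::env::set_var("POLARS_PQ_PREFILTERED_MASK", "post");

    // ... run the Parquet scan afterwards (omitted here).
}
```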
@@ -394,38 +409,34 @@ fn rg_to_dfs_prefiltered(
return Ok(dfs.into_iter().map(|(_, df)| df).collect());
}

// @TODO: Incorporate this once we know how we can properly use it. The problem here is that
// different columns really have a different cost when it comes to collecting them. We
// would need a cost model to properly estimate this.
//
// // For bitmasks that are seemingly random (i.e. not clustered or biased towards 0 or 1),
// // filtering with a bitmask in the Parquet reader is actually around 1.5 - 2.2 times slower
// // than collecting everything and filtering afterwards. This is because stopping and
// // starting decoding is not free.
// //
// // To combat this we try to detect here how biased our data is. We do this with a bithack
// // that estimates the number of switches from 0 to 1 and from 1 to 0. This can be SIMD-ed
// // very well and gives us quite a good estimate of how random our bitmask is. Then, we select
// // the filter if the bitmask is not that random.
// let do_filter_rg = dfs
// .par_iter()
// .map(|(mask, _)| {
// let iter = mask.fast_iter_u64();
//
// // The iter is TrustedLen so the size_hint is exact.
// let num_items = iter.size_hint().0;
// let num_switches = iter
// .map(|v| (v ^ v.rotate_right(1)).count_ones() as u64)
// .sum::<u64>();
//
// // We ignore the iter remainder since we only really care about the average.
// let avg_num_switches_per_element = num_switches / num_items as u64;
//
// // We select the filter if the average number of switches per 64 elements is less
// // than or equal to 2.
// avg_num_switches_per_element <= 2
// })
// .collect::<Vec<_>>();
let rg_prefilter_costs = matches!(mask_setting, MaskSetting::Auto)
.then(|| {
dfs.par_iter()
.map(|(mask, _)| {
let num_edges = mask.num_edges() as f64;
let rg_len = mask.len() as f64;

// @GB: I did quite some analysis on this.
//
// Pre-filtering and post-filtering can each be faster in certain scenarios.
//
// - Pre-filtering is faster when there is some amount of clustering or
// sorting involved or if the number of values selected is small.
// - Post-filtering is faster when the predicate selects somewhat random
// elements throughout the row group.
//
// The following is a heuristic value to try and estimate which one is
// faster. Essentially, it counts how many times decoding needs to switch
// between skipping items and collecting items and compares that against
// the total number of rows in the row group.
//
// Closer to 0: post-filtering is probably better.
// Closer to 1: pre-filtering is probably better.
(num_edges / rg_len).clamp(0.0, 1.0)
})
.collect::<Vec<_>>()
})
.unwrap_or_default();

let mut rg_columns = (0..dfs.len() * num_dead_columns)
.into_par_iter()
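To make the heuristic above tangible, here is a hedged, standalone sketch (not part of the commit) that reproduces the cost formula on two synthetic masks, using only the `Bitmap` APIs added earlier in this diff.

```rust
// Hedged sketch of the prefilter cost: edges per row, clamped to [0, 1].
use polars_arrow::bitmap::Bitmap;

fn prefilter_cost(mask: &Bitmap) -> f64 {
    (mask.num_edges() as f64 / mask.len() as f64).clamp(0.0, 1.0)
}

fn main() {
    // One contiguous block of 512 selected rows: only 2 edges -> cost ~0.0005.
    let clustered: Vec<bool> = (0..4096).map(|i| (1024..1536).contains(&i)).collect();
    let clustered = Bitmap::from(clustered);

    // Every other row selected: an edge at almost every position -> cost ~1.0.
    let alternating: Vec<bool> = (0..4096).map(|i| i % 2 == 0).collect();
    let alternating = Bitmap::from(alternating);

    assert!(prefilter_cost(&clustered) < 0.02);  // pre-filtering territory
    assert!(prefilter_cost(&alternating) > 0.9); // post-filtering territory
}
```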
@@ -444,27 +455,67 @@
}
let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap();

column_idx_to_series(
col_idx,
field_md.as_slice(),
Some(Filter::new_masked(mask.clone())),
schema,
store,
)
let pre = || {
column_idx_to_series(
col_idx,
field_md.as_slice(),
Some(Filter::new_masked(mask.clone())),
schema,
store,
)
};
let post = || {
let array =
column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)?;

debug_assert_eq!(array.len(), mask.len());

let mask_arr = BooleanArray::new(ArrowDataType::Boolean, mask.clone(), None);
let mask_arr = BooleanChunked::from(mask_arr);
array.filter(&mask_arr)
};

let array = match mask_setting {
MaskSetting::Auto => {
// Prefiltering is more expensive for nested types, so we use a stricter
// (lower) cut-off for them.
let is_nested = schema.fields[col_idx].data_type.is_nested();
let prefilter_cost = rg_prefilter_costs[i / num_dead_columns];

// We empirically selected these numbers.
let do_prefilter = (is_nested && prefilter_cost <= 0.01)
|| (!is_nested && prefilter_cost <= 0.02);

if do_prefilter {
pre()?
} else {
post()?
}
},
MaskSetting::Pre => pre()?,
MaskSetting::Post => post()?,
};

debug_assert_eq!(array.len(), mask.set_bits());

Ok(array)
})
.collect::<PolarsResult<Vec<_>>>()?;

let Some(df) = dfs.first().map(|(_, df)| df) else {
return Ok(Vec::new());
};
let rearranged_schema = df.schema();
let mut rearranged_schema = df.schema();
rearranged_schema.merge(Schema::from(schema));

rg_columns
.par_chunks_exact_mut(num_dead_columns)
.zip(dfs)
.map(|(rg_cols, (_, mut df))| {
let rg_cols = rg_cols.iter_mut().map(std::mem::take).collect::<Vec<_>>();

debug_assert!(rg_cols.iter().all(|v| v.len() == df.height()));

// We first add the columns with the live columns at the start. Then, we do a
// projection that puts the columns in the right spot.
df._add_columns(rg_cols, &rearranged_schema)?;
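For completeness, a hedged sketch of the post-filtering idea in isolation (illustrative only; the commit applies this to decoded Parquet columns rather than the hand-built `Series` used here):

```rust
// Hedged sketch: decode (here: construct) a full column, then apply the
// predicate mask afterwards with polars_core's filter.
use polars_core::prelude::*;

fn main() -> PolarsResult<()> {
    let decoded = Series::new("x", &[1i64, 2, 3, 4, 5]);
    let mask = BooleanChunked::new("mask", &[true, false, false, true, true]);

    let filtered = decoded.filter(&mask)?;
    assert_eq!(filtered.len(), 3); // matches mask.set_bits()

    Ok(())
}
```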