Skip to content

Commit

Permalink
Use upstream RowSelection::intersection in page index pruning (#4340)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Dec 3, 2022
1 parent 27dc295 commit 8db99d2
Showing 1 changed file with 10 additions and 155 deletions.
165 changes: 10 additions & 155 deletions datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use parquet::{
},
format::PageLocation,
};
use std::collections::VecDeque;
use std::sync::Arc;

use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
Expand Down Expand Up @@ -123,7 +122,7 @@ pub(crate) fn build_page_filter(
if let (Some(file_offset_indexes), Some(file_page_indexes)) =
(file_offset_indexes, file_page_indexes)
{
let mut row_selections = VecDeque::with_capacity(page_index_predicates.len());
let mut row_selections = Vec::with_capacity(page_index_predicates.len());
for predicate in page_index_predicates {
// `extract_page_index_push_down_predicates` only return predicate with one col.
// when building `PruningPredicate`, some single column filter like `abs(i) = 1`
Expand Down Expand Up @@ -163,12 +162,11 @@ pub(crate) fn build_page_filter(
}
}
debug!(
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections
.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
"Use filter and page index create RowSelection {:?} from predicate: {:?}",
&selectors,
predicate.predicate_expr(),
);
row_selections.push(selectors.into_iter().flatten().collect::<Vec<_>>());
}
}
let final_selection = combine_multi_col_selection(row_selections);
Expand All @@ -184,7 +182,7 @@ pub(crate) fn build_page_filter(
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection.into()))
Ok(Some(final_selection))
} else {
Ok(None)
}
Expand All @@ -197,93 +195,14 @@ pub(crate) fn build_page_filter(
///
/// The final selection is the intersection of these `RowSelector`s:
/// * `final_selection:[ Skip(0~199), Read(200~249), Skip(250~299)]`
fn combine_multi_col_selection(
row_selections: VecDeque<Vec<RowSelector>>,
) -> Vec<RowSelector> {
fn combine_multi_col_selection(row_selections: Vec<Vec<RowSelector>>) -> RowSelection {
row_selections
.into_iter()
.reduce(intersect_row_selection)
.map(RowSelection::from)
.reduce(|s1, s2| s1.intersection(&s2))
.unwrap()
}

/// combine two `RowSelection` return the intersection
/// For example:
/// self: NNYYYYNNY
/// other: NYNNNNNNY
///
/// returned: NNNNNNNNY
/// set `need_combine` true will combine result: Select(2) + Select(1) + Skip(2) -> Select(3) + Skip(2)
///
/// Move to arrow-rs: https://github.com/apache/arrow-rs/issues/3003
pub(crate) fn intersect_row_selection(
left: Vec<RowSelector>,
right: Vec<RowSelector>,
) -> Vec<RowSelector> {
let mut res = vec![];
let mut l_iter = left.into_iter().peekable();
let mut r_iter = right.into_iter().peekable();

while let (Some(a), Some(b)) = (l_iter.peek_mut(), r_iter.peek_mut()) {
if a.row_count == 0 {
l_iter.next().unwrap();
continue;
}
if b.row_count == 0 {
r_iter.next().unwrap();
continue;
}
match (a.skip, b.skip) {
// Keep both ranges
(false, false) => {
if a.row_count < b.row_count {
res.push(RowSelector::select(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::select(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
// skip at least one
_ => {
if a.row_count < b.row_count {
res.push(RowSelector::skip(a.row_count));
b.row_count -= a.row_count;
l_iter.next().unwrap();
} else {
res.push(RowSelector::skip(b.row_count));
a.row_count -= b.row_count;
r_iter.next().unwrap();
}
}
}
}
if l_iter.peek().is_some() {
res.extend(l_iter);
}
if r_iter.peek().is_some() {
res.extend(r_iter);
}
// combine the adjacent same operators and last zero row count
// TODO: remove when https://github.com/apache/arrow-rs/pull/2994 is released~

let mut pre = res[0];
let mut after_combine = vec![];
for selector in res.iter_mut().skip(1) {
if selector.skip == pre.skip {
pre.row_count += selector.row_count;
} else {
after_combine.push(pre);
pre = *selector;
}
}
if pre.row_count != 0 {
after_combine.push(pre);
}
after_combine
}

// Extract single col pruningPredicate from input predicate for evaluating page Index.
fn extract_page_index_push_down_predicates(
predicate: Option<&PruningPredicate>,
Expand Down Expand Up @@ -539,67 +458,3 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_combine_row_selection() {
// a size equal b size
let a = vec![
RowSelector::select(5),
RowSelector::skip(4),
RowSelector::select(1),
];
let b = vec![
RowSelector::select(8),
RowSelector::skip(1),
RowSelector::select(1),
];

let res = intersect_row_selection(a, b);
assert_eq!(
res,
vec![
RowSelector::select(5),
RowSelector::skip(4),
RowSelector::select(1)
],
);

// a size larger than b size
let a = vec![
RowSelector::select(3),
RowSelector::skip(33),
RowSelector::select(3),
RowSelector::skip(33),
];
let b = vec![RowSelector::select(36), RowSelector::skip(36)];
let res = intersect_row_selection(a, b);
assert_eq!(res, vec![RowSelector::select(3), RowSelector::skip(69)]);

// a size less than b size
let a = vec![RowSelector::select(3), RowSelector::skip(7)];
let b = vec![
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
];
let res = intersect_row_selection(a, b);
assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8)]);

let a = vec![RowSelector::select(3), RowSelector::skip(7)];
let b = vec![
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(2),
];
let res = intersect_row_selection(a, b);
assert_eq!(res, vec![RowSelector::select(2), RowSelector::skip(8),]);
}
}

0 comments on commit 8db99d2

Please sign in to comment.