
Refactor BloomFilter application into PruningPredicate
Rewrite BloomFilterPruningPredicate in terms of PruningPredicate
alamb committed Dec 4, 2023
1 parent 4b4af65 commit 76ee618
Showing 4 changed files with 817 additions and 367 deletions.
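
The heart of the change: instead of walking the expression tree with a bespoke BloomFilterPruningPredicate, the bloom filters are exposed through the PruningStatistics trait and the existing PruningPredicate::prune drives the decision. Below is a minimal, self-contained Rust sketch of just that decision step; the closure stands in for PruningPredicate::prune(&stats), and the one-entry result vector (one container per row group) is an assumption read off the diff, not code from this commit.

fn keep_row_group<E>(
    // stands in for PruningPredicate::prune(&stats) on a single row group
    prune: impl Fn() -> Result<Vec<bool>, E>,
) -> bool {
    match prune() {
        // exactly one container (the row group): true = "rows may match"
        Ok(values) => values[0],
        // on evaluation errors, conservatively keep the row group
        Err(_) => true,
    }
}

fn main() {
    // a bloom filter proved the predicate cannot match: prune the group
    assert!(!keep_row_group(|| Ok::<_, ()>(vec![false])));
    // evaluation failed: keep the group rather than risk wrong results
    assert!(keep_row_group(|| Err::<Vec<bool>, ()>(())));
}
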
241 changes: 86 additions & 155 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -16,29 +16,23 @@
// under the License.

use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result, ScalarValue};
use datafusion_common::{Column, ScalarValue};
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
bloom_filter::Sbbf,
file::metadata::RowGroupMetaData,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::collections::{HashMap, HashSet};

use crate::datasource::listing::FileRange;
use crate::datasource::physical_plan::parquet::statistics::{
max_statistics, min_statistics, parquet_column,
};
use crate::logical_expr::Operator;
use crate::physical_expr::expressions as phys_expr;
use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use crate::physical_plan::PhysicalExpr;

use super::ParquetFileMetrics;

@@ -122,182 +116,118 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
predicate: &PruningPredicate,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
let bf_predicates = match BloomFilterPruningPredicate::try_new(predicate.orig_expr())
{
Ok(predicates) => predicates,
Err(_) => {
return row_groups.to_vec();
}
};
println!(
"prune_row_groups_by_bloom_filters with pruning predicate: {:#?}",
predicate
);
let mut filtered = Vec::with_capacity(groups.len());
for idx in row_groups {
let rg_metadata = &groups[*idx];
// get the bloom filters for all required columns
let mut column_sbbf =
HashMap::with_capacity(bf_predicates.required_columns.len());
for column_name in bf_predicates.required_columns.iter() {
let column_idx = match rg_metadata
let literal_columns = predicate.literal_columns();
let mut column_sbbf = HashMap::with_capacity(literal_columns.len());

for column_name in literal_columns {
// This lookup is very likely incorrect: it will not work for nested
// columns; it should use parquet_column instead (see the sketch after
// this function)
let Some((column_idx, _)) = rg_metadata
.columns()
.iter()
.enumerate()
.find(|(_, column)| column.column_path().string().eq(column_name))
{
Some((column_idx, _)) => column_idx,
None => continue,
.find(|(_, column)| column.column_path().string().eq(&column_name))
else {
continue;
};

let bf = match builder
.get_row_group_column_bloom_filter(*idx, column_idx)
.await
{
Ok(bf) => match bf {
Some(bf) => bf,
None => {
continue;
}
},
Ok(Some(bf)) => bf,
Ok(None) => continue, // no bloom filter for this column
Err(e) => {
log::error!("Error evaluating row group predicate values when using BloomFilterPruningPredicate {e}");
log::error!("Ignoring error reading bloom filter: {e}");
metrics.predicate_evaluation_errors.add(1);
continue;
}
};
column_sbbf.insert(column_name.to_owned(), bf);
column_sbbf.insert(column_name.to_string(), bf);
}
if bf_predicates.prune(&column_sbbf) {

let stats = BloomFilterStatistics { column_sbbf };

// Can this group be pruned?
let prune_result = predicate.prune(&stats);
println!("prune result: {:?}", prune_result);
let prune_group = match prune_result {
Ok(values) => !values[0],
Err(e) => {
log::debug!("Error evaluating row group predicate on bloom filter: {e}");
metrics.predicate_evaluation_errors.add(1);
false
}
};

println!("prune group: {}", prune_group);

if prune_group {
metrics.row_groups_pruned.add(1);
continue;
} else {
filtered.push(*idx);
}
filtered.push(*idx);
}
filtered
}
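
As the inline comment above notes, matching on column_path().string() misses nested columns. Here is a hedged sketch of the suggested fix, routed through the parquet_column helper already imported from the statistics module; the arrow_schema parameter and exact wiring are assumptions for illustration, not part of this commit.

fn find_parquet_leaf(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &Schema,
    column_name: &str,
) -> Option<usize> {
    // parquet_column resolves an Arrow field name (including nested
    // fields) to the index of the matching parquet leaf column
    parquet_column(parquet_schema, arrow_schema, column_name)
        .map(|(column_idx, _field)| column_idx)
}
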

struct BloomFilterPruningPredicate {
/// Actual pruning predicate
predicate_expr: Option<phys_expr::BinaryExpr>,
/// The statistics required to evaluate this predicate
required_columns: Vec<String>,
/// Implements `PruningStatistics` over the bloom filters of a single row group
struct BloomFilterStatistics {
/// bloom filters for the row group's columns, keyed by column name
column_sbbf: HashMap<String, Sbbf>,
}

impl BloomFilterPruningPredicate {
fn try_new(expr: &Arc<dyn PhysicalExpr>) -> Result<Self> {
let binary_expr = expr.as_any().downcast_ref::<phys_expr::BinaryExpr>();
match binary_expr {
Some(binary_expr) => {
let columns = Self::get_predicate_columns(expr);
Ok(Self {
predicate_expr: Some(binary_expr.clone()),
required_columns: columns.into_iter().collect(),
})
}
None => Err(DataFusionError::Execution(
"BloomFilterPruningPredicate only support binary expr".to_string(),
)),
}
}

fn prune(&self, column_sbbf: &HashMap<String, Sbbf>) -> bool {
Self::prune_expr_with_bloom_filter(self.predicate_expr.as_ref(), column_sbbf)
impl PruningStatistics for BloomFilterStatistics {
fn num_containers(&self) -> usize {
// a BloomFilterStatistics instance covers exactly one row group
1
}

/// Return true if the `expr` can be proven not `true`
/// based on the bloom filter.
///
/// Only `BinaryExpr` is checked here, but this also covers `InList`
/// because the optimizer rewrites `InList` into `BinaryExpr`s.
fn prune_expr_with_bloom_filter(
expr: Option<&phys_expr::BinaryExpr>,
column_sbbf: &HashMap<String, Sbbf>,
) -> bool {
let Some(expr) = expr else {
// unsupported predicate
return false;
/// Use bloom filters to determine if we are certain this column cannot contain any of `values`
fn contains(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
println!("Checking column {} for values {:?}", column.name, values);
let sbbf = self.column_sbbf.get(column.name.as_str())?;
println!(" have sbbf: {:?}", sbbf);

// if a check returns true, the column probably contains the value
// if it returns false, the column definitely does NOT contain the value
let known_not_present = values
.iter()
.map(|value| match value {
ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => sbbf.check(v),
ScalarValue::Float64(Some(v)) => sbbf.check(v),
ScalarValue::Float32(Some(v)) => sbbf.check(v),
ScalarValue::Int64(Some(v)) => sbbf.check(v),
ScalarValue::Int32(Some(v)) => sbbf.check(v),
ScalarValue::Int16(Some(v)) => sbbf.check(v),
ScalarValue::Int8(Some(v)) => sbbf.check(v),
_ => true,
})
// We know the row group doesn't contain any of the values if the checks are all
// false
.all(|v| !v);
println!("known_not_present result: {}", known_not_present);

let contains = if known_not_present {
Some(false)
} else {
// The column might contain one of the values
None
};
match expr.op() {
Operator::And | Operator::Or => {
let left = Self::prune_expr_with_bloom_filter(
expr.left().as_any().downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
let right = Self::prune_expr_with_bloom_filter(
expr.right()
.as_any()
.downcast_ref::<phys_expr::BinaryExpr>(),
column_sbbf,
);
match expr.op() {
Operator::And => left || right,
Operator::Or => left && right,
_ => false,
}
}
Operator::Eq => {
if let Some((col, val)) = Self::check_expr_is_col_equal_const(expr) {
if let Some(sbbf) = column_sbbf.get(col.name()) {
match val {
ScalarValue::Utf8(Some(v)) => !sbbf.check(&v.as_str()),
ScalarValue::Boolean(Some(v)) => !sbbf.check(&v),
ScalarValue::Float64(Some(v)) => !sbbf.check(&v),
ScalarValue::Float32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int64(Some(v)) => !sbbf.check(&v),
ScalarValue::Int32(Some(v)) => !sbbf.check(&v),
ScalarValue::Int16(Some(v)) => !sbbf.check(&v),
ScalarValue::Int8(Some(v)) => !sbbf.check(&v),
_ => false,
}
} else {
false
}
} else {
false
}
}
_ => false,
}
}

fn get_predicate_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<String> {
let mut columns = HashSet::new();
expr.apply(&mut |expr| {
if let Some(binary_expr) =
expr.as_any().downcast_ref::<phys_expr::BinaryExpr>()
{
if let Some((column, _)) =
Self::check_expr_is_col_equal_const(binary_expr)
{
columns.insert(column.name().to_string());
}
}
Ok(VisitRecursion::Continue)
})
// no way to fail as only Ok(VisitRecursion::Continue) is returned
.unwrap();

columns
}

fn check_expr_is_col_equal_const(
expr: &phys_expr::BinaryExpr,
) -> Option<(phys_expr::Column, ScalarValue)> {
if Operator::Eq.ne(expr.op()) {
return None;
}

let left_any = expr.left().as_any();
let right_any = expr.right().as_any();
if let (Some(col), Some(literal)) = (
left_any.downcast_ref::<phys_expr::Column>(),
right_any.downcast_ref::<phys_expr::Literal>(),
) {
return Some((col.clone(), literal.value().clone()));
}
if let (Some(literal), Some(col)) = (
left_any.downcast_ref::<phys_expr::Literal>(),
right_any.downcast_ref::<phys_expr::Column>(),
) {
return Some((col.clone(), literal.value().clone()));
}
None
let result = Some(BooleanArray::from(vec![contains]));
println!("result: {:?}", result);
result
}
}
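
The contains implementation above encodes a tri-state contract: a bloom filter check answering false is authoritative ("definitely absent"), while true only means "maybe present", since bloom filters admit false positives. A self-contained sketch of that logic, with illustrative names rather than code from this commit:

fn contains_any(maybe_present: &[bool]) -> Option<bool> {
    if maybe_present.iter().all(|&p| !p) {
        // every queried value is provably absent: safe to prune
        Some(false)
    } else {
        // at least one check said "maybe": unknown, do not prune
        None
    }
}

fn main() {
    assert_eq!(contains_any(&[false, false]), Some(false)); // prune
    assert_eq!(contains_any(&[false, true]), None); // must keep
}
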

@@ -350,6 +280,7 @@ mod tests {
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{config::ConfigOptions, TableReference, ToDFSchema};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{
builder::LogicalTableSource, cast, col, lit, AggregateUDF, Expr, ScalarUDF,
TableSource, WindowUDF,
