Skip to content

Commit

Permalink
Prune pages are all null in ParquetExec by row_counts and fix NOT NUL…
Browse files Browse the repository at this point in the history
…L prune (apache#10051)

* Prune pages are all null in ParquetExec by row_counts
and fix NOT NULL prune

* fix clippy

* Update datafusion/core/src/physical_optimizer/pruning.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/tests/parquet/page_pruning.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/tests/parquet/page_pruning.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/tests/parquet/page_pruning.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/tests/parquet/page_pruning.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* remove allocate vec

* better way avoid allocate vec

* simply expr

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
Ted-Jiang and alamb authored Apr 14, 2024
1 parent d698d9d commit 671cef8
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ fn prune_pages_in_one_row_group(
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
num_rows_in_row_group: group.num_rows(),
};

match predicate.prune(&pruning_stats) {
Expand Down Expand Up @@ -385,6 +386,7 @@ struct PagesPruningStatistics<'a> {
// target_type means the logical type in schema: like 'DECIMAL' is the logical type, but the
// real physical type in parquet file may be `INT32, INT64, FIXED_LEN_BYTE_ARRAY`
target_type: &'a Option<DataType>,
num_rows_in_row_group: i64,
}

// Extract the min or max value calling `func` from page idex
Expand Down Expand Up @@ -548,7 +550,19 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
}

fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
None
// see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982

let row_count_per_page = self.col_offset_indexes.windows(2).map(|location| {
Some(location[1].first_row_index - location[0].first_row_index)
});

// append the last page row count
let row_count_per_page = row_count_per_page.chain(std::iter::once(Some(
self.num_rows_in_row_group
- self.col_offset_indexes.last().unwrap().first_row_index,
)));

Some(Arc::new(Int64Array::from_iter(row_count_per_page)))
}

fn contained(
Expand Down
49 changes: 30 additions & 19 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ pub trait PruningStatistics {
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
/// `x IS NOT NULL` | `x_null_count != row_count`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
/// ## Predicate Evaluation
Expand Down Expand Up @@ -1240,10 +1240,10 @@ fn build_single_column_expr(
/// returns a pruning expression in terms of IsNull that will evaluate to true
/// if the column may contain null, and false if definitely does not
/// contain null.
/// If set `with_not` to true: which means is not null
/// Given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNotNull that will evaluate to true
/// if the column not contain any null, and false if definitely contain null.
/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
/// at least one NULL value. In this case we can know that `IS NOT NULL` can not be true and
/// thus can prune the row group / value
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
Expand All @@ -1254,26 +1254,37 @@ fn build_is_null_column_expr(
let field = schema.field_with_name(col.name()).ok()?;

let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
if with_not {
// IsNotNull(column) => null_count = 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Eq,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
} else {
if with_not {
if let Ok(row_count_expr) =
required_columns.row_count_column_expr(col, expr, null_count_field)
{
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNotNull(column) => null_count != row_count
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::NotEq,
row_count_expr,
)) as _
})
.ok()
} else {
None
}
} else {
required_columns
.null_count_column_expr(col, expr, null_count_field)
.map(|null_count_column_expr| {
// IsNull(column) => null_count > 0
Arc::new(phys_expr::BinaryExpr::new(
null_count_column_expr,
Operator::Gt,
Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
)) as _
}
})
.ok()
})
.ok()
}
} else {
None
}
Expand Down
65 changes: 56 additions & 9 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use arrow_array::new_null_array;
use arrow_array::make_array;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
Expand Down Expand Up @@ -77,6 +77,7 @@ enum Scenario {
ByteArray,
PeriodsInColumnNames,
WithNullValues,
WithNullValuesPageLevel,
}

enum Unit {
Expand Down Expand Up @@ -632,22 +633,60 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
}

/// Return record batch with i8, i16, i32, and i64 sequences with all Null values
fn make_all_null_values() -> RecordBatch {
/// Return record batch with i8, i16, i32, and i64 sequences with Null values
/// here 5 rows in page when using Unit::Page
fn make_int_batches_with_null(
null_values: usize,
no_null_values_start: usize,
no_null_values_end: usize,
) -> RecordBatch {
let schema = Arc::new(Schema::new(vec![
Field::new("i8", DataType::Int8, true),
Field::new("i16", DataType::Int16, true),
Field::new("i32", DataType::Int32, true),
Field::new("i64", DataType::Int64, true),
]));

let v8: Vec<i8> = (no_null_values_start as _..no_null_values_end as _).collect();
let v16: Vec<i16> = (no_null_values_start as _..no_null_values_end as _).collect();
let v32: Vec<i32> = (no_null_values_start as _..no_null_values_end as _).collect();
let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();

RecordBatch::try_new(
schema,
vec![
new_null_array(&DataType::Int8, 5),
new_null_array(&DataType::Int16, 5),
new_null_array(&DataType::Int32, 5),
new_null_array(&DataType::Int64, 5),
make_array(
Int8Array::from_iter(
v8.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
make_array(
Int16Array::from_iter(
v16.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
make_array(
Int32Array::from_iter(
v32.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
make_array(
Int64Array::from_iter(
v64.into_iter()
.map(Some)
.chain(std::iter::repeat(None).take(null_values)),
)
.to_data(),
),
],
)
.unwrap()
Expand Down Expand Up @@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
}
Scenario::WithNullValues => {
vec![
make_all_null_values(),
make_int_batches_with_null(5, 0, 0),
make_int_batches(1, 6),
make_all_null_values(),
make_int_batches_with_null(5, 0, 0),
]
}
Scenario::WithNullValuesPageLevel => {
vec![
make_int_batches_with_null(5, 1, 6),
make_int_batches(1, 11),
make_int_batches_with_null(1, 1, 10),
make_int_batches_with_null(5, 1, 6),
]
}
}
Expand Down
51 changes: 51 additions & 0 deletions datafusion/core/tests/parquet/page_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,57 @@ async fn without_pushdown_filter() {
assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
}

#[tokio::test]
// Data layout like this:
// row_group1: page1(1~5), page2(All Null)
// row_group2: page1(1~5), page2(6~10)
// row_group3: page1(1~5), page2(6~9 + Null)
// row_group4: page1(1~5), page2(All Null)
// total 40 rows
async fn test_pages_with_null_values() {
test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where i8 <= 6",
Some(0),
// expect prune pages with all null or pages that have only values greater than 6
// (row_group1, page2), (row_group4, page2)
Some(10),
22,
)
.await;

test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where \"i16\" is not null",
Some(0),
// expect prune (row_group1, page2) and (row_group4, page2) = 10 rows
Some(10),
29,
)
.await;

test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where \"i32\" is null",
Some(0),
// expect prune (row_group1, page1), (row_group2, page1+2), (row_group3, page1), (row_group3, page1) = 25 rows
Some(25),
11,
)
.await;

test_prune(
Scenario::WithNullValuesPageLevel,
"SELECT * FROM t where \"i64\" > 6",
Some(0),
// expect to prune pages where i is all null, or where always <= 5
// (row_group1, page1+2), (row_group2, page1), (row_group3, page1) (row_group4, page1+2) = 30 rows
Some(30),
7,
)
.await;
}

fn cast_count_metric(metric: MetricValue) -> Option<usize> {
match metric {
MetricValue::Count { count, .. } => Some(count.value()),
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() {
.test_row_group_prune()
.await;

// After pruning, only row group 2should be selected
// After pruning, only row group 2 should be selected
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")
Expand Down

0 comments on commit 671cef8

Please sign in to comment.