Prune pages are all null in ParquetExec by row_counts and fix NOT NULL prune #10051

Merged: 10 commits, Apr 14, 2024
@@ -27,6 +27,7 @@ use arrow_schema::Schema;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
use itertools::Itertools;
use log::{debug, trace};
use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
use parquet::{
@@ -314,6 +315,7 @@ fn prune_pages_in_one_row_group(
col_page_indexes,
col_offset_indexes,
target_type: &target_type,
num_rows_in_row_group: group.num_rows(),
};

match predicate.prune(&pruning_stats) {
@@ -385,6 +387,7 @@ struct PagesPruningStatistics<'a> {
// target_type is the logical type in the schema: e.g. 'DECIMAL' is the logical type, but the
// actual physical type in the parquet file may be `INT32`, `INT64`, or `FIXED_LEN_BYTE_ARRAY`
target_type: &'a Option<DataType>,
num_rows_in_row_group: i64,
}

// Extract the min or max value by calling `func` on the page index
@@ -548,7 +551,20 @@ impl<'a> PruningStatistics for PagesPruningStatistics<'a> {
}

fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
        None
        // see https://github.com/apache/arrow-rs/blob/91f0b1771308609ca27db0fb1d2d49571b3980d8/parquet/src/file/metadata.rs#L979-L982
        let mut first_row_index = self
            .col_offset_indexes
            .iter()
            .map(|i| i.first_row_index)
            .collect_vec();
        first_row_index.push(self.num_rows_in_row_group);

        let row_count_per_page: Vec<_> = first_row_index
            .windows(2)
            .map(|window| Some(window[1] - window[0]))
            .collect();

        Some(Arc::new(Int64Array::from_iter(row_count_per_page)))
    }

Contributor:

I think you can do this same calculation without allocating intermediate `Vec`s -- something like this:

        let row_count_per_page = self
            .col_offset_indexes
            .windows(2)
            .map(|location| Some(location[1].first_row_index - location[0].first_row_index));

        Some(Arc::new(Int64Array::from_iter(row_count_per_page)))

🤔 the name col_offset_indexes is somewhat confusing to me as they are PageLocations --

    col_offset_indexes: &'a Vec<PageLocation>,

maybe we could rename that field to page_locations 🤔

Member Author (@Ted-Jiang, Apr 13, 2024):

@alamb thanks for your always conscientious review and kind suggestions 👍

Avoided the allocation in b2d3641.

In my mind page_locations stores the offsets in the column chunk, so I think they mean the same thing 🤣
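A hedged aside on the arithmetic above: per-page row counts fall out of consecutive `first_row_index` values once the row group's total row count is appended as a sentinel. A minimal, self-contained sketch (hypothetical helper name, plain `i64`s standing in for `PageLocation`s):

    // Each page's row count = the next page's first row index minus its own;
    // the row group's total row count bounds the final page.
    fn row_counts_per_page(first_row_indexes: &[i64], num_rows_in_row_group: i64) -> Vec<i64> {
        let mut bounds = first_row_indexes.to_vec();
        bounds.push(num_rows_in_row_group); // sentinel bound for the last page
        bounds.windows(2).map(|w| w[1] - w[0]).collect()
    }

    fn main() {
        // pages starting at rows 0 and 5 in a 9-row row group have 5 and 4 rows
        assert_eq!(row_counts_per_page(&[0, 5], 9), vec![5, 4]);
    }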

fn contained(
47 changes: 31 additions & 16 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -335,7 +335,7 @@ pub trait PruningStatistics {
/// `x < 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_max < 5 END`
/// `x = 5 AND y = 10` | `CASE WHEN x_null_count = x_row_count THEN false ELSE x_min <= 5 AND 5 <= x_max END AND CASE WHEN y_null_count = y_row_count THEN false ELSE y_min <= 10 AND 10 <= y_max END`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count = 0`
/// `x IS NOT NULL` | `!(x_null_count = row_count)`
/// `CAST(x as int) = 5` | `CASE WHEN x_null_count = x_row_count THEN false ELSE CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int) END`
///
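To make the `x IS NOT NULL` rewrite above concrete: a minimal sketch (hypothetical helpers, not DataFusion's API) contrasting the previous `x_null_count = 0` rule with the new `!(x_null_count = x_row_count)` rule. A container may only be pruned when the rewritten predicate is definitely false for every row it contains:

    // Returns true when the container must be kept (it may hold matching rows).
    fn old_keep(null_count: u64) -> bool {
        null_count == 0 // old rewrite: false for any page mixing nulls and values
    }

    fn new_keep(null_count: u64, row_count: u64) -> bool {
        !(null_count == row_count) // new rewrite: false only for all-null pages
    }

    fn main() {
        // A 10-row page with 3 nulls still holds 7 non-null rows and must be kept:
        assert!(!old_keep(3)); // the old rule would have pruned it incorrectly
        assert!(new_keep(3, 10)); // the new rule keeps it
        assert!(!new_keep(10, 10)); // an all-null page is still safely pruned
    }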
/// ## Predicate Evaluation
@@ -1241,9 +1241,11 @@ fn build_single_column_expr(
/// if the column may contain null, and false if definitely does not
/// contain null.
/// If `with_not` is set to true, the expression is `IS NOT NULL`:
/// because DataFusion uses a `false` pruning-expression result to prune a unit (row group, page, ...),
/// given an expression reference to `expr`, if `expr` is a column expression,
/// returns a pruning expression in terms of IsNotNull that will evaluate to true
/// if the column may contain any non-null values, and false if the column definitely
/// contains only null values.
fn build_is_null_column_expr(
expr: &Arc<dyn PhysicalExpr>,
schema: &Schema,
@@ -1254,26 +1256,39 @@
let field = schema.field_with_name(col.name()).ok()?;

let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
        required_columns
            .null_count_column_expr(col, expr, null_count_field)
            .map(|null_count_column_expr| {
                if with_not {
                    // IsNotNull(column) => null_count = 0
                    Arc::new(phys_expr::BinaryExpr::new(
                        null_count_column_expr,
                        Operator::Eq,
                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
                    )) as _
                } else {
        if with_not {
            if let Ok(row_count_expr) =
                required_columns.row_count_column_expr(col, expr, null_count_field)
            {
                required_columns
                    .null_count_column_expr(col, expr, null_count_field)
                    .map(|null_count_column_expr| {
                        // IsNotNull(column) => null_count == row_count
                        // but pruning treats `false` as "prune this unit", so negate the comparison
                        let equal_expr = Arc::new(phys_expr::BinaryExpr::new(
                            null_count_column_expr,
                            Operator::Eq,
                            row_count_expr,
                        ));
                        Arc::new(phys_expr::NotExpr::new(equal_expr)) as _
                    })
                    .ok()
            } else {
                return None;
            }
        } else {
            required_columns
                .null_count_column_expr(col, expr, null_count_field)
                .map(|null_count_column_expr| {
                    // IsNull(column) => null_count > 0
                    Arc::new(phys_expr::BinaryExpr::new(
                        null_count_column_expr,
                        Operator::Gt,
                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
                    )) as _
                })
                .ok()
        }
} else {
None
}
65 changes: 56 additions & 9 deletions datafusion/core/tests/parquet/mod.rs
@@ -28,7 +28,7 @@ use arrow::{
record_batch::RecordBatch,
util::pretty::pretty_format_batches,
};
use arrow_array::new_null_array;
use arrow_array::make_array;
use chrono::{Datelike, Duration, TimeDelta};
use datafusion::{
datasource::{physical_plan::ParquetExec, provider_as_source, TableProvider},
@@ -77,6 +77,7 @@ enum Scenario {
ByteArray,
PeriodsInColumnNames,
WithNullValues,
WithNullValuesPageLevel,
}

enum Unit {
@@ -632,22 +633,60 @@ fn make_names_batch(name: &str, service_name_values: Vec<&str>) -> RecordBatch {
RecordBatch::try_new(schema, vec![Arc::new(name), Arc::new(service_name)]).unwrap()
}

/// Return record batch with i8, i16, i32, and i64 sequences with all Null values
fn make_all_null_values() -> RecordBatch {
/// Return record batch with i8, i16, i32, and i64 sequences with Null values
/// here 5 rows in each page when using Unit::Page
fn make_int_batches_with_null(
    null_values: usize,
    no_null_values_start: usize,
    no_null_values_end: usize,
) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("i8", DataType::Int8, true),
        Field::new("i16", DataType::Int16, true),
        Field::new("i32", DataType::Int32, true),
        Field::new("i64", DataType::Int64, true),
    ]));

    let v8: Vec<i8> = (no_null_values_start as _..no_null_values_end as _).collect();
    let v16: Vec<i16> = (no_null_values_start as _..no_null_values_end as _).collect();
    let v32: Vec<i32> = (no_null_values_start as _..no_null_values_end as _).collect();
    let v64: Vec<i64> = (no_null_values_start as _..no_null_values_end as _).collect();

    RecordBatch::try_new(
        schema,
        vec![
            new_null_array(&DataType::Int8, 5),
            new_null_array(&DataType::Int16, 5),
            new_null_array(&DataType::Int32, 5),
            new_null_array(&DataType::Int64, 5),
            make_array(
                Int8Array::from_iter(
                    v8.into_iter()
                        .map(Some)
                        .chain(std::iter::repeat(None).take(null_values)),
                )
                .to_data(),
            ),
            make_array(
                Int16Array::from_iter(
                    v16.into_iter()
                        .map(Some)
                        .chain(std::iter::repeat(None).take(null_values)),
                )
                .to_data(),
            ),
            make_array(
                Int32Array::from_iter(
                    v32.into_iter()
                        .map(Some)
                        .chain(std::iter::repeat(None).take(null_values)),
                )
                .to_data(),
            ),
            make_array(
                Int64Array::from_iter(
                    v64.into_iter()
                        .map(Some)
                        .chain(std::iter::repeat(None).take(null_values)),
                )
                .to_data(),
            ),
        ],
    )
    .unwrap()

Contributor (on the doc comment):

I think it would help to explain the shape of this data a bit more to help readers. Something like the following (though note it would change if you change the arguments as I suggest below):

Suggested change:

    /// Return record batch with i8, i16, i32, and i64 sequences with Null values.
    ///
    /// The first `null_values` values are null and the following
    /// `no_null_values_end - no_null_values_start` values are non null.

Contributor (on lines +640 to +641, the parameters):

I think you might be able to simplify this method by sending in 2 parameters. Right now it looks like it interleaves nulls arbitrarily but really the nulls are always at the start and non nulls are at the end:

    num_nulls: usize,
    non_nulls: usize
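For illustration, a sketch of the two-parameter simplification suggested above (hypothetical, not the merged code); since the non-null run and the null run are each contiguous, two counts fully describe one column:

    // `non_nulls` ascending values followed by `num_nulls` nulls, mirroring
    // the layout produced by make_int_batches_with_null
    fn column_values(num_nulls: usize, non_nulls: usize) -> Vec<Option<i64>> {
        (0..non_nulls as i64)
            .map(Some)
            .chain(std::iter::repeat(None).take(num_nulls))
            .collect()
    }

    fn main() {
        assert_eq!(
            column_values(2, 3),
            vec![Some(0), Some(1), Some(2), None, None]
        );
    }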
@@ -824,9 +863,17 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
}
Scenario::WithNullValues => {
            vec![
                make_all_null_values(),
                make_int_batches_with_null(5, 0, 0),
                make_int_batches(1, 6),
                make_all_null_values(),
                make_int_batches_with_null(5, 0, 0),
            ]
        }
        Scenario::WithNullValuesPageLevel => {
            vec![
                make_int_batches_with_null(5, 1, 6),
                make_int_batches(1, 11),
                make_int_batches_with_null(1, 1, 10),
                make_int_batches_with_null(5, 1, 6),
            ]
        }
    }

Contributor (on Scenario::WithNullValues):

Could you please also add the "some null/some not null" null values case to this scenario as well?

    make_int_batches_with_null(5, 1, 6),

Contributor (on Scenario::WithNullValuesPageLevel):

I think this scenario should have at least one batch of all nulls (e.g. make_int_batches_with_null(5, 0, 0))

Member Author:

I think that would be pruned at the row group -> column level.
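As a quick sanity check (assuming, per the review discussion, that each batch above becomes one row group written with 5 rows per page), the WithNullValuesPageLevel batches add up to the 40-row, 8-page layout described in the page-level test below:

    fn main() {
        // (non-null values, nulls) per batch, matching the vec! above
        let batches: [(usize, usize); 4] = [(5, 5), (10, 0), (9, 1), (5, 5)];
        let total_rows: usize = batches.iter().map(|(v, n)| v + n).sum();
        assert_eq!(total_rows, 40); // matches "total 40 rows" in the test comment
        assert_eq!(total_rows / 5, 8); // 8 pages of 5 rows each
    }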
49 changes: 49 additions & 0 deletions datafusion/core/tests/parquet/page_pruning.rs
@@ -871,6 +871,55 @@ async fn without_pushdown_filter() {
assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
}

#[tokio::test]
// Data layout like this:
// row_group1: page1(1~5), page2(All Null)
// row_group2: page1(1~5), page2(6~10)
// row_group3: page1(1~5), page2(6~9 + Null)
// row_group4: page1(1~5), page2(All Null)
// total 40 rows
async fn test_pages_with_null_values() {
    test_prune(
        Scenario::WithNullValuesPageLevel,
        "SELECT * FROM t where i8 <= 6",
        Some(0),
        // expect to prune two pages, which contain 10 rows
        Some(10),
        22,
    )
    .await;

    test_prune(
        Scenario::WithNullValuesPageLevel,
        "SELECT * FROM t where \"i16\" is not null",
        Some(0),
        // expect to prune two pages, which contain 10 rows
        Some(10),
        29,
    )
    .await;

    test_prune(
        Scenario::WithNullValuesPageLevel,
        "SELECT * FROM t where \"i32\" is null",
        Some(0),
        // expect to prune 5 pages, which contain 25 rows
        Some(25),
        11,
    )
    .await;

    test_prune(
        Scenario::WithNullValuesPageLevel,
        "SELECT * FROM t where \"i64\" > 6",
        Some(0),
        // expect to prune 6 pages, which contain 30 rows
        Some(30),
        7,
    )
    .await;
}

Contributor (on the first test_prune call):

It took me a while to convince myself that this was actually setting up the scenario as described. I eventually found it here:

https://github.com/alamb/arrow-datafusion/blob/dee926519030301f052dc2c3196e4fbef0da4c47/datafusion/core/tests/parquet/mod.rs#L916-L920

I wonder if it would be possible to add some better comments to test_prune mentioning that the created parquet files have 5 rows per page.

Member Author (@Ted-Jiang, Apr 13, 2024):

I will make another PR to improve these tests.
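To tie the expected counts back to the layout comment above, a small model (assumed: 8 pages of 5 rows each, as laid out in the scenario) that re-derives the pruned-row numbers. By null counts alone, a value predicate like `i8 <= 6` and `is not null` can only prune all-null pages, while `is null` can prune pages containing no nulls:

    struct Page { rows: i64, nulls: i64 }

    fn main() {
        let pages = [
            Page { rows: 5, nulls: 0 }, // row_group1 page1: 1~5
            Page { rows: 5, nulls: 5 }, // row_group1 page2: all null
            Page { rows: 5, nulls: 0 }, // row_group2 page1: 1~5
            Page { rows: 5, nulls: 0 }, // row_group2 page2: 6~10
            Page { rows: 5, nulls: 0 }, // row_group3 page1: 1~5
            Page { rows: 5, nulls: 1 }, // row_group3 page2: 6~9 + null
            Page { rows: 5, nulls: 0 }, // row_group4 page1: 1~5
            Page { rows: 5, nulls: 5 }, // row_group4 page2: all null
        ];
        let all_null_rows: i64 =
            pages.iter().filter(|p| p.nulls == p.rows).map(|p| p.rows).sum();
        let no_null_rows: i64 =
            pages.iter().filter(|p| p.nulls == 0).map(|p| p.rows).sum();
        assert_eq!(all_null_rows, 10); // two pages pruned for `is not null`
        assert_eq!(no_null_rows, 25); // five pages pruned for `is null`
    }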

fn cast_count_metric(metric: MetricValue) -> Option<usize> {
match metric {
MetricValue::Count { count, .. } => Some(count.value()),
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/row_group_pruning.rs
@@ -1296,7 +1296,7 @@ async fn test_row_group_with_null_values() {
.test_row_group_prune()
.await;

// After pruning, only row group 2should be selected
// After pruning, only row group 2 should be selected
RowGroupPruningTest::new()
.with_scenario(Scenario::WithNullValues)
.with_query("SELECT * FROM t WHERE \"i16\" is Not Null")