diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs index a82c5d97a2b7..8df4925fc566 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs @@ -234,6 +234,10 @@ impl PruningStatistics for BloomFilterStatistics { ScalarValue::Int32(Some(v)) => sbbf.check(v), ScalarValue::Int16(Some(v)) => sbbf.check(v), ScalarValue::Int8(Some(v)) => sbbf.check(v), + ScalarValue::UInt64(Some(v)) => sbbf.check(v), + ScalarValue::UInt32(Some(v)) => sbbf.check(v), + ScalarValue::UInt16(Some(v)) => sbbf.check(v), + ScalarValue::UInt8(Some(v)) => sbbf.check(v), ScalarValue::Decimal128(Some(v), p, s) => match parquet_type { Type::INT32 => { //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42 diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index 1da86a0363a5..b4415d638ada 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -66,6 +66,7 @@ enum Scenario { Int, Int32Range, UInt, + UInt32Range, Float64, Decimal, DecimalBloomFilterInt32, @@ -455,6 +456,13 @@ fn make_int32_range(start: i32, end: i32) -> RecordBatch { RecordBatch::try_new(schema, vec![array.clone()]).unwrap() } +fn make_uint32_range(start: u32, end: u32) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new("u", DataType::UInt32, true)])); + let v = vec![start, end]; + let array = Arc::new(UInt32Array::from(v)) as ArrayRef; + RecordBatch::try_new(schema, vec![array.clone()]).unwrap() +} + /// Return record batch with f64 vector /// /// Columns are named @@ -659,6 +667,9 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_uint_batches(250, 255), ] } + Scenario::UInt32Range => { + vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)] + } Scenario::Float64 => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs index b70102f78a96..b7b434d1c3d3 100644 --- a/datafusion/core/tests/parquet/row_group_pruning.rs +++ b/datafusion/core/tests/parquet/row_group_pruning.rs @@ -339,7 +339,7 @@ macro_rules! int_tests { async fn []() { RowGroupPruningTest::new() .with_scenario(Scenario::Int) - .with_query(&format!("SELECT * FROM t where i{} = 1", $bits)) + .with_query(&format!("SELECT * FROM t where abs(i{}) = 1 and i{} = 1", $bits, $bits)) .with_expected_errors(Some(0)) .with_matched_by_stats(Some(1)) .with_pruned_by_stats(Some(3)) @@ -452,6 +452,144 @@ int_tests!(16, correct_bloom_filters: false); int_tests!(32, correct_bloom_filters: true); int_tests!(64, correct_bloom_filters: true); +// $bits: number of bits of the integer to test (8, 16, 32, 64) +// $correct_bloom_filters: if false, replicates the +// https://github.com/apache/arrow-datafusion/issues/9779 bug so that tests pass +// if and only if Bloom filters on UInt8 and UInt16 columns are still buggy. +macro_rules! uint_tests { + ($bits:expr, correct_bloom_filters: $correct_bloom_filters:expr) => { + paste::item! { + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} < 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(3)) + .with_pruned_by_stats(Some(1)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(11) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where power(u{}, 2) = 36 and u{} = 6", $bits, $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where power(u{}, 2) = 25", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{}+1 = 6", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(2) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where in (1)" + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} in (6)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(3)) + .with_matched_by_bloom_filter(Some(if $correct_bloom_filters { 1 } else { 0 })) + .with_pruned_by_bloom_filter(Some(if $correct_bloom_filters { 0 } else { 1 })) + .with_expected_rows(if $correct_bloom_filters { 1 } else { 0 }) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where in (1000)", prune all + // test whether statistics works + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} in (100)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(0)) + .with_pruned_by_stats(Some(4)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(0) + .test_row_group_prune() + .await; + } + + #[tokio::test] + async fn []() { + // result of sql "SELECT * FROM t where not in (1)" prune nothing + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt) + .with_query(&format!("SELECT * FROM t where u{} not in (6)", $bits)) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(4)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(4)) + .with_pruned_by_bloom_filter(Some(0)) + .with_expected_rows(19) + .test_row_group_prune() + .await; + } + } + }; +} + +uint_tests!(8, correct_bloom_filters: false); +uint_tests!(16, correct_bloom_filters: false); +uint_tests!(32, correct_bloom_filters: true); +uint_tests!(64, correct_bloom_filters: true); + #[tokio::test] async fn prune_int32_eq_large_in_list() { // result of sql "SELECT * FROM t where i in (2050...2582)", prune all @@ -474,6 +612,28 @@ async fn prune_int32_eq_large_in_list() { .await; } +#[tokio::test] +async fn prune_uint32_eq_large_in_list() { + // result of sql "SELECT * FROM t where i in (2050...2582)", prune all + RowGroupPruningTest::new() + .with_scenario(Scenario::UInt32Range) + .with_query( + format!( + "SELECT * FROM t where u in ({})", + (200050..200082).join(",") + ) + .as_str(), + ) + .with_expected_errors(Some(0)) + .with_matched_by_stats(Some(1)) + .with_pruned_by_stats(Some(0)) + .with_matched_by_bloom_filter(Some(0)) + .with_pruned_by_bloom_filter(Some(1)) + .with_expected_rows(0) + .test_row_group_prune() + .await; +} + #[tokio::test] async fn prune_f64_lt() { RowGroupPruningTest::new()