Skip to content

Commit

Permalink
Add tests for row group pruning on strings
Browse files Browse the repository at this point in the history
  • Loading branch information
progval committed Mar 18, 2024
1 parent e53eb03 commit ed73847
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 3 deletions.
112 changes: 109 additions & 3 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
use arrow::array::Decimal128Array;
use arrow::{
array::{
Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray,
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
Array, ArrayRef, BinaryArray, Date32Array, Date64Array, FixedSizeBinaryArray,
Float64Array, Int32Array, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
Expand Down Expand Up @@ -70,6 +70,7 @@ enum Scenario {
DecimalBloomFilterInt64,
DecimalLargePrecision,
DecimalLargePrecisionBloomFilter,
ByteArray,
PeriodsInColumnNames,
}

Expand Down Expand Up @@ -506,6 +507,51 @@ fn make_date_batch(offset: Duration) -> RecordBatch {
.unwrap()
}

/// returns a batch with two columns (note "service.name" is the name
/// of the column. It is *not* a table named service.name
///
/// name | service.name
fn make_bytearray_batch(
name: &str,
string_values: Vec<&str>,
binary_values: Vec<&[u8]>,
fixedsize_values: Vec<&[u8; 3]>,
) -> RecordBatch {
let num_rows = string_values.len();
let name: StringArray = std::iter::repeat(Some(name)).take(num_rows).collect();
let service_string: StringArray = string_values.iter().map(Some).collect();
let service_binary: BinaryArray = binary_values.iter().map(Some).collect();
let service_fixedsize: FixedSizeBinaryArray = fixedsize_values
.iter()
.map(|value| Some(value.as_slice()))
.collect::<Vec<_>>()
.into();

let schema = Schema::new(vec![
Field::new("name", name.data_type().clone(), true),
// note the column name has a period in it!
Field::new("service_string", service_string.data_type().clone(), true),
Field::new("service_binary", service_binary.data_type().clone(), true),
Field::new(
"service_fixedsize",
service_fixedsize.data_type().clone(),
true,
),
]);
let schema = Arc::new(schema);

RecordBatch::try_new(
schema,
vec![
Arc::new(name),
Arc::new(service_string),
Arc::new(service_binary),
Arc::new(service_fixedsize),
],
)
.unwrap()
}

/// returns a batch with two columns (note "service.name" is the name
/// of the column. It is *not* a table named service.name
///
Expand Down Expand Up @@ -604,6 +650,66 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_decimal_batch(vec![100000, 200000, 300000, 400000, 600000], 38, 5),
]
}
Scenario::ByteArray => {
// frontends first, then backends. All in order, except frontends 4 and 7
// are swapped to cause a statistics false positive on the 'fixed size' column.
vec![
make_bytearray_batch(
"all frontends",
vec![
"frontend one",
"frontend two",
"frontend three",
"frontend seven",
"frontend five",
],
vec![
b"frontend one",
b"frontend two",
b"frontend three",
b"frontend seven",
b"frontend five",
],
vec![b"fe1", b"fe2", b"fe3", b"fe7", b"fe5"],
),
make_bytearray_batch(
"mixed",
vec![
"frontend six",
"frontend four",
"backend one",
"backend two",
"backend three",
],
vec![
b"frontend six",
b"frontend four",
b"backend one",
b"backend two",
b"backend three",
],
vec![b"fe6", b"fe4", b"be1", b"be2", b"be3"],
),
make_bytearray_batch(
"all backends",
vec![
"backend four",
"backend five",
"backend six",
"backend seven",
"backend eight",
],
vec![
b"backend four",
b"backend five",
b"backend six",
b"backend seven",
b"backend eight",
],
vec![b"be4", b"be5", b"be6", b"be7", b"be8"],
),
]
}
Scenario::PeriodsInColumnNames => {
vec![
// all frontend
Expand Down
102 changes: 102 additions & 0 deletions datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,108 @@ async fn prune_decimal_in_list() {
.await;
}

#[tokio::test]
async fn prune_string_eq_match() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string = 'backend one'",
)
.with_expected_errors(Some(0))
// false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three'
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(1))
.with_pruned_by_bloom_filter(Some(1))
.with_expected_rows(1)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_string_eq_no_match() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string = 'backend nine'",
)
.with_expected_errors(Some(0))
// false positive on 'all backends' batch: 'backend five' < 'backend one' < 'backend three'
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(1))
.with_expected_rows(0)
.test_row_group_prune()
.await;

RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string = 'frontend nine'",
)
.with_expected_errors(Some(0))
// false positive on 'all frontends' batch: 'frontend five' < 'frontend nine' < 'frontend two'
// false positive on 'mixed' batch: 'backend one' < 'frontend nine' < 'frontend six'
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(2))
.with_expected_rows(0)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_string_neq() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string != 'backend one'",
)
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(3))
.with_pruned_by_stats(Some(0))
.with_matched_by_bloom_filter(Some(3))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(14)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_string_lt() {
RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string < 'backend one'",
)
.with_expected_errors(Some(0))
// matches 'all backends' only
.with_matched_by_stats(Some(1))
.with_pruned_by_stats(Some(2))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
.with_expected_rows(3)
.test_row_group_prune()
.await;

RowGroupPruningTest::new()
.with_scenario(Scenario::ByteArray)
.with_query(
"SELECT name, service_string FROM t WHERE service_string < 'backend zero'",
)
.with_expected_errors(Some(0))
.with_matched_by_stats(Some(2))
.with_pruned_by_stats(Some(1))
.with_matched_by_bloom_filter(Some(0))
.with_pruned_by_bloom_filter(Some(0))
// all backends from 'mixed' and 'all backends'
.with_expected_rows(8)
.test_row_group_prune()
.await;
}

#[tokio::test]
async fn prune_periods_in_column_names() {
// There are three row groups for "service.name", each with 5 rows = 15 rows total
Expand Down

0 comments on commit ed73847

Please sign in to comment.