Minor: refactor bloom filter tests to reduce duplication
alamb committed Dec 18, 2023
1 parent d33ca4d commit 08a274a
Showing 1 changed file with 153 additions and 190 deletions.
343 changes: 153 additions & 190 deletions datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -1075,82 +1075,28 @@ mod tests {
create_physical_expr(expr, &df_schema, schema, &execution_props).unwrap()
}

// Note the values in the `String` column are:
// ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
// +-----------+
// | String |
// +-----------+
// | Hello |
// | This is |
// | a |
// | test |
// | How |
// | are you |
// | doing |
// | today |
// | the quick |
// | brown fox |
// | jumps |
// | over |
// | the lazy |
// | dog |
// +-----------+
#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "data_index_bloom_encoding_stats.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate `(String = "Hello_Not_Exists")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#).eq(lit("Hello_Not_Exists"));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert!(pruned_row_groups.is_empty());
BloomFilterTest::new_data_index_bloom_encoding_stats()
.with_expect_all_pruned()
// generate pruning predicate `(String = "Hello_Not_Exists")`
.run(col(r#""String""#).eq(lit("Hello_Not_Exists")))
.await
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "data_index_bloom_encoding_stats.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate `('1' = '1') AND ((String = "Hello_Not_Exists") OR (String = "Hello_Not_Exists2"))`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = lit("1").eq(lit("1")).and(
col(r#""String""#)
.eq(lit("Hello_Not_Exists"))
.or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
);
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert!(pruned_row_groups.is_empty());
BloomFilterTest::new_data_index_bloom_encoding_stats()
.with_expect_all_pruned()
// generate pruning predicate `('1' = '1') AND ((String = "Hello_Not_Exists") OR (String = "Hello_Not_Exists2"))`
.run(
lit("1").eq(lit("1")).and(
col(r#""String""#)
.eq(lit("Hello_Not_Exists"))
.or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
),
)
.await
}

#[tokio::test]
@@ -1186,144 +1132,161 @@ mod tests {

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "data_index_bloom_encoding_stats.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate `(String = "Hello")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#).eq(lit("Hello"));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert_eq!(pruned_row_groups, row_groups);
BloomFilterTest::new_data_index_bloom_encoding_stats()
.with_expect_none_pruned()
// generate pruning predicate `(String = "Hello")`
.run(col(r#""String""#).eq(lit("Hello")))
.await
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "data_index_bloom_encoding_stats.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate `(String = "Hello") OR (String = "the quick")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#)
.eq(lit("Hello"))
.or(col(r#""String""#).eq(lit("the quick")));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert_eq!(pruned_row_groups, row_groups);
BloomFilterTest::new_data_index_bloom_encoding_stats()
.with_expect_none_pruned()
// generate pruning predicate `(String = "Hello") OR (String = "the quick")`
.run(
col(r#""String""#)
.eq(lit("Hello"))
.or(col(r#""String""#).eq(lit("the quick"))),
)
.await
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "data_index_bloom_encoding_stats.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#)
.eq(lit("Hello"))
.or(col(r#""String""#).eq(lit("the quick")))
.or(col(r#""String""#).eq(lit("are you")));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert_eq!(pruned_row_groups, row_groups);
BloomFilterTest::new_data_index_bloom_encoding_stats()
.with_expect_none_pruned()
// generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
.run(
col(r#""String""#)
.eq(lit("Hello"))
.or(col(r#""String""#).eq(lit("the quick")))
.or(col(r#""String""#).eq(lit("are you"))),
)
.await
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "data_index_bloom_encoding_stats.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate `(String != "foo") OR (String != "bar")`
let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
let expr = col(r#""String""#)
.not_eq(lit("foo"))
.or(col(r#""String""#).not_eq(lit("bar")));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert_eq!(pruned_row_groups, row_groups);
BloomFilterTest::new_data_index_bloom_encoding_stats()
.with_expect_none_pruned()
// generate pruning predicate `(String != "foo") OR (String != "bar")`
.run(
col(r#""String""#)
.not_eq(lit("foo"))
.or(col(r#""String""#).not_eq(lit("bar"))),
)
.await
}

#[tokio::test]
async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
// load parquet file
let testdata = datafusion_common::test_util::parquet_test_data();
let file_name = "alltypes_plain.parquet";
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

// generate pruning predicate on a column without a bloom filter
let schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, false)]);
let expr = col(r#""string_col""#).eq(lit("0"));
let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
BloomFilterTest::new_all_types()
.with_expect_none_pruned()
.run(col(r#""string_col""#).eq(lit("0")))
.await
}

let row_groups = vec![0];
let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert_eq!(pruned_row_groups, row_groups);
struct BloomFilterTest {
file_name: String,
schema: Schema,
// which row groups the test should attempt to prune
row_groups: Vec<usize>,
// which row groups are expected to remain after pruning. Must be set,
// otherwise `run()` will panic
post_pruning_row_groups: Option<Vec<usize>>,
}

impl BloomFilterTest {
/// Return a test for data_index_bloom_encoding_stats.parquet
/// Note the values in the `String` column are:
/// ```sql
/// ❯ select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
/// +-----------+
/// | String |
/// +-----------+
/// | Hello |
/// | This is |
/// | a |
/// | test |
/// | How |
/// | are you |
/// | doing |
/// | today |
/// | the quick |
/// | brown fox |
/// | jumps |
/// | over |
/// | the lazy |
/// | dog |
/// +-----------+
/// ```
fn new_data_index_bloom_encoding_stats() -> Self {
Self {
file_name: String::from("data_index_bloom_encoding_stats.parquet"),
schema: Schema::new(vec![Field::new("String", DataType::Utf8, false)]),
row_groups: vec![0],
post_pruning_row_groups: None,
}
}

/// Return a test for alltypes_plain.parquet
fn new_all_types() -> Self {
Self {
file_name: String::from("alltypes_plain.parquet"),
schema: Schema::new(vec![Field::new(
"string_col",
DataType::Utf8,
false,
)]),
row_groups: vec![0],
post_pruning_row_groups: None,
}
}

/// Expect all row groups to be pruned
pub fn with_expect_all_pruned(mut self) -> Self {
self.post_pruning_row_groups = Some(vec![]);
self
}

/// Expect no row groups to be pruned
pub fn with_expect_none_pruned(mut self) -> Self {
self.post_pruning_row_groups = Some(self.row_groups.clone());
self
}

/// Prune this file using the specified expression and check that the expected row groups are left
async fn run(self, expr: Expr) {
let Self {
file_name,
schema,
row_groups,
post_pruning_row_groups,
} = self;

let post_pruning_row_groups =
post_pruning_row_groups.expect("post_pruning_row_groups must be set");

let testdata = datafusion_common::test_util::parquet_test_data();
let path = format!("{testdata}/{file_name}");
let data = bytes::Bytes::from(std::fs::read(path).unwrap());

let expr = logical2physical(&expr, &schema);
let pruning_predicate =
PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();

let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
&file_name,
data,
&pruning_predicate,
&row_groups,
)
.await
.unwrap();
assert_eq!(pruned_row_groups, post_pruning_row_groups);
}
}
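
// A minimal sketch (not part of this commit) of how a future case could
// reuse the builder; the test name below is hypothetical. Since "dog" is
// a value that exists in the `String` column, the bloom filter cannot rule
// out the row group, so none should be pruned:
//
// #[tokio::test]
// async fn test_row_group_bloom_filter_pruning_predicate_existing_value() {
//     BloomFilterTest::new_data_index_bloom_encoding_stats()
//         .with_expect_none_pruned()
//         .run(col(r#""String""#).eq(lit("dog")))
//         .await
// }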

async fn test_row_group_bloom_filter_pruning_predicate(
