Make pruning tests more readable
Blajda committed Mar 10, 2023
1 parent e57d5c2 commit bd6a4d2
Showing 1 changed file with 85 additions and 104 deletions.
189 changes: 85 additions & 104 deletions rust/tests/datafusion_test.rs
@@ -399,6 +399,29 @@ fn create_all_types_batch(not_null_rows: usize, null_rows: usize, offset: usize)
RecordBatch::try_new(schema, data).unwrap()
}

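// Describes one pruning scenario: a column plus a literal found only in the first,
// second, and third data file respectively, and one literal found in none of them.
// e.g. TestCase::new("int32", |v| lit(v as i32)) yields file1_value = lit(1_i32),
// file2_value = lit(5_i32), file3_value = lit(8_i32) and non_existent_value = lit(3_i32).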
struct TestCase {
column: &'static str,
file1_value: Expr,
file2_value: Expr,
file3_value: Expr,
non_existent_value: Expr,
}

impl TestCase {
fn new<F>(column: &'static str, expression_builder: F) -> Self
where
F: Fn(i64) -> Expr,
{
TestCase {
column,
file1_value: expression_builder(1),
file2_value: expression_builder(5),
file3_value: expression_builder(8),
non_existent_value: expression_builder(3),
}
}
}

#[tokio::test]
async fn test_files_scanned() -> Result<()> {
// Validate that datafusion prunes files based on file statistics
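// (Pruning here means DataFusion skips any file whose min/max statistics, as recorded
// in the Delta log, rule out a match for the query predicate.)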
@@ -415,10 +438,6 @@ async fn test_files_scanned() -> Result<()> {
.unwrap()
}

fn to_binary(s: &str) -> Vec<u8> {
s.as_bytes().to_owned()
}

let batch = create_all_types_batch(3, 0, 0);
let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, vec![]).await;

@@ -433,108 +452,66 @@ async fn test_files_scanned() -> Result<()> {

// (Column name, value from file 1, value from file 2, value from file 3, non existent value)
let tests = [
("utf8", lit("1"), lit("5"), lit("8"), lit("3")),
(
"int64",
lit(1 as i64),
lit(5 as i64),
lit(8 as i64),
lit(3 as i64),
),
(
"int32",
lit(1 as i32),
lit(5 as i32),
lit(8 as i32),
lit(3 as i32),
),
(
"int16",
lit(1 as i16),
lit(5 as i16),
lit(8 as i16),
lit(3 as i16),
),
(
"int8",
lit(1 as i8),
lit(5 as i8),
lit(8 as i8),
lit(3 as i8),
),
(
"float64",
lit(1 as f64),
lit(5 as f64),
lit(8 as f64),
lit(3 as f64),
),
(
"float32",
lit(1 as f32),
lit(5 as f32),
lit(8 as f32),
lit(3 as f32),
),
TestCase::new("utf8", |value| lit(value.to_string())),
TestCase::new("int64", |value| lit(value)),
TestCase::new("int32", |value| lit(value as i32)),
TestCase::new("int16", |value| lit(value as i16)),
TestCase::new("int8", |value| lit(value as i8)),
TestCase::new("float64", |value| lit(value as f64)),
TestCase::new("float32", |value| lit(value as f32)),
TestCase::new("timestamp", |value| {
lit(ScalarValue::TimestampMicrosecond(
Some(value * 1_000_000),
None,
))
}),
// TODO: I think decimal statistics are being written to the log incorrectly. The underlying i128 is written,
// not the proper string representation as specified by the precision and scale
(
"decimal",
lit(Decimal128(Some(100), 10, 2)),
lit(Decimal128(Some(500), 10, 2)),
lit(Decimal128(Some(800), 10, 2)),
lit(Decimal128(Some(300), 10, 2)),
),
(
"timestamp",
lit(ScalarValue::TimestampMicrosecond(Some(1 * 1_000_000), None)),
lit(ScalarValue::TimestampMicrosecond(Some(5 * 1_000_000), None)),
lit(ScalarValue::TimestampMicrosecond(Some(8 * 1_000_000), None)),
lit(ScalarValue::TimestampMicrosecond(Some(3 * 1_000_000), None)),
),
TestCase::new("decimal", |value| {
lit(Decimal128(Some((value * 100).into()), 10, 2))
}),
// TODO: The writer does not write complete statistics for date columns
(
"date",
lit(ScalarValue::Date32(Some(1))),
lit(ScalarValue::Date32(Some(5))),
lit(ScalarValue::Date32(Some(8))),
lit(ScalarValue::Date32(Some(3))),
),
TestCase::new("date", |value| lit(ScalarValue::Date32(Some(value as i32)))),
// TODO: The writer does not write complete statistics for binary columns
(
"binary",
lit(to_binary("1")),
lit(to_binary("5")),
lit(to_binary("8")),
lit(to_binary("3")),
),
TestCase::new("binary", |value| lit(value.to_string().as_bytes())),
];

for test in &tests {
let test = test.to_owned();
//TODO: The following types either have proper stats written.
if test.0 == "decimal" || test.0 == "date" || test.0 == "binary" {
let TestCase {
column,
file1_value,
file2_value,
file3_value,
non_existent_value,
} = test.to_owned();
let column = column.to_owned();
//TODO: The following types don't have proper stats written.
if column == "decimal" || column == "date" || column == "binary" {
continue;
}
println!("Test Column: {}", test.0);
println!("Test Column: {}", column);

// Equality
let e = col(test.0).eq(test.1.clone());
let e = col(column).eq(file1_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);

// Value does not exist
let e = col(test.0).eq(test.4);
let e = col(column).eq(non_existent_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);

// Conjunction
let e = col(test.0).gt(test.1.clone()).and(col(test.0).lt(test.2));
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value.clone()));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);

// Disjunction
let e = col(test.0).lt(test.1).or(col(test.0).gt(test.3));
let e = col(column)
.lt(file1_value.clone())
.or(col(column).gt(file3_value.clone()));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);
}
@@ -555,20 +532,26 @@ async fn test_files_scanned() -> Result<()> {

// Ensure that tables with stats and partition columns can be pruned
for test in tests {
let TestCase {
column,
file1_value,
file2_value,
file3_value,
non_existent_value,
} = test;
//TODO: Float, timestamp, decimal, date, binary partitions are not supported by the writer
if test.0 == "float32"
|| test.0 == "float64"
|| test.0 == "timestamp"
|| test.0 == "decimal"
|| test.0 == "date"
|| test.0 == "binary"
if column == "float32"
|| column == "float64"
|| column == "timestamp"
|| column == "decimal"
|| column == "date"
|| column == "binary"
{
continue;
}
println!("test {}", column);

println!("test {}", test.0);

let partitions = vec![test.0.to_owned()];
let partitions = vec![column.to_owned()];
let batch = create_all_types_batch(3, 0, 0);
let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;

@@ -579,43 +562,41 @@ async fn test_files_scanned() -> Result<()> {
let table = append_to_table(table, batch).await;

// Equality
let e = col(test.0).eq(test.1.clone());
let e = col(column).eq(file1_value.clone());
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);

// Value does not exist
let e = col(test.0).eq(test.4);
let e = col(column).eq(non_existent_value);
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 0);

// Conjunction
let e = col(test.0).gt(test.1.clone()).and(col(test.0).lt(test.2));
let e = col(column)
.gt(file1_value.clone())
.and(col(column).lt(file2_value));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);

// Disjunction
let e = col(test.0).lt(test.1).or(col(test.0).gt(test.3));
let e = col(column).lt(file1_value).or(col(column).gt(file3_value));
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 2);

// Validate null pruning
let batch = create_all_types_batch(5, 2, 0);
let partitions = vec![test.0.to_owned()];
let partitions = vec![column.to_owned()];
let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;

let e = col(test.0).is_null();
let e = col(column).is_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
assert_eq!(metrics.num_scanned_files(), 1);

/* logically we should be able to prune the null partition but Datafusion's current implementation prevents this */
/*
let e = col(value.0).is_not_null();
let e = col(column).is_not_null();
let metrics = get_scan_metrics(&table, &state, &[e]).await?;
if value.0 == "boolean" {
assert_eq!(metrics.num_scanned_files(), 2);
} else {
assert_eq!(metrics.num_scanned_files(), 5);
}
assert_eq!(metrics.num_scanned_files(), 5);
*/
}
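
The decimal TODO earlier in this diff says the raw i128 backing a Decimal128 is written to the log rather than the string form implied by the precision and scale. As a rough standalone sketch (not code from this repository; the helper name is illustrative), the string form divides the unscaled value by 10^scale, so the unscaled 100 used for file 1 with scale 2 denotes 1.00:

// Sketch: render a Decimal128's unscaled i128 as the decimal string its scale implies.
fn decimal_to_string(unscaled: i128, scale: u32) -> String {
    let divisor = 10_i128.pow(scale);
    let int_part = unscaled / divisor;
    let frac_part = (unscaled % divisor).unsigned_abs();
    // Values between -1 and 0 are not handled; the test above only uses positive values.
    format!("{}.{:0width$}", int_part, frac_part, width = scale as usize)
}

fn main() {
    assert_eq!(decimal_to_string(100, 2), "1.00"); // file 1 value in the test
    assert_eq!(decimal_to_string(500, 2), "5.00"); // file 2 value
    assert_eq!(decimal_to_string(800, 2), "8.00"); // file 3 value
}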