From 89742b2a7eb7e4f5fcc3215de7bcbe69d4b5227a Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Thu, 9 Mar 2023 23:56:45 -0500
Subject: [PATCH] Implement pruning on partition columns (#1179)

# Description
Exposes partition columns in Datafusion's `PruningStatistics`, which reduces the number of files scanned when the table is queried.

This also resolves another partition issue involving `null` partitions. Previously, `ScalarValue::Null` was used, which would cause an error when the actual data type was obtained from the physical parquet files.

# Related Issue(s)
- closes #1175
---
 rust/src/builder.rs             |   9 +-
 rust/src/delta_datafusion.rs    | 170 ++++++++++----
 rust/src/writer/record_batch.rs |   1 -
 rust/tests/datafusion_test.rs   | 397 ++++++++++++++++++++++++++++----
 4 files changed, 486 insertions(+), 91 deletions(-)

diff --git a/rust/src/builder.rs b/rust/src/builder.rs
index 072403e687..ed2f207ddf 100644
--- a/rust/src/builder.rs
+++ b/rust/src/builder.rs
@@ -37,9 +37,10 @@ impl From for DeltaTableError {
 }
 
 /// possible version specifications for loading a delta table
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
 pub enum DeltaVersion {
     /// load the newest version
+    #[default]
     Newest,
     /// specify the version to load
     Version(DeltaDataTypeVersion),
@@ -47,12 +48,6 @@ pub enum DeltaVersion {
     Timestamp(DateTime),
 }
 
-impl Default for DeltaVersion {
-    fn default() -> Self {
-        DeltaVersion::Newest
-    }
-}
-
 /// Configuration options for delta table
 #[derive(Debug, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 29ec01d121..465347d2a6 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -57,9 +57,9 @@ use object_store::{path::Path, ObjectMeta};
 use url::Url;
 
 use crate::builder::ensure_table_uri;
-use crate::Invariant;
-use crate::{action, open_table, open_table_with_storage_options};
+use crate::{action, open_table, open_table_with_storage_options, SchemaDataType};
 use crate::{schema, DeltaTableBuilder};
+use crate::{DeltaResult, Invariant};
 use crate::{DeltaTable, DeltaTableError};
 
 impl From for DataFusionError {
@@ -236,49 +236,59 @@ impl DeltaTable {
     }
 }
 
+fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option<ArrayRef> {
+    let field = table
+        .get_schema()
+        .ok()
+        .map(|s| s.get_field_with_name(&column.name).ok())??;
+
+    // See issue 1214. 
Binary type does not support natural order which is required for Datafusion to prune + if let SchemaDataType::primitive(t) = &field.get_type() { + if t == "binary" { + return None; + } + } + + let data_type = field.get_type().try_into().ok()?; + let partition_columns = &table.get_metadata().ok()?.partition_columns; + + let values = table.get_state().files().iter().map(|add| { + if partition_columns.contains(&column.name) { + let value = add.partition_values.get(&column.name).unwrap(); + let value = match value { + Some(v) => serde_json::Value::String(v.to_string()), + None => serde_json::Value::Null, + }; + to_correct_scalar_value(&value, &data_type).unwrap_or(ScalarValue::Null) + } else if let Ok(Some(statistics)) = add.get_stats() { + let values = if get_max { + statistics.max_values + } else { + statistics.min_values + }; + + values + .get(&column.name) + .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type)) + .unwrap_or(ScalarValue::Null) + } else { + ScalarValue::Null + } + }); + ScalarValue::iter_to_array(values).ok() +} + impl PruningStatistics for DeltaTable { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { - let field = self - .get_schema() - .ok() - .map(|s| s.get_field_with_name(&column.name).ok())??; - let data_type = field.get_type().try_into().ok()?; - let values = self.get_state().files().iter().map(|add| { - if let Ok(Some(statistics)) = add.get_stats() { - statistics - .min_values - .get(&column.name) - .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type)) - .unwrap_or(ScalarValue::Null) - } else { - ScalarValue::Null - } - }); - ScalarValue::iter_to_array(values).ok() + get_prune_stats(self, column, false) } /// return the maximum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows. fn max_values(&self, column: &Column) -> Option { - let field = self - .get_schema() - .ok() - .map(|s| s.get_field_with_name(&column.name).ok())??; - let data_type = field.get_type().try_into().ok()?; - let values = self.get_state().files().iter().map(|add| { - if let Ok(Some(statistics)) = add.get_stats() { - statistics - .max_values - .get(&column.name) - .and_then(|f| to_correct_scalar_value(f.as_value()?, &data_type)) - .unwrap_or(ScalarValue::Null) - } else { - ScalarValue::Null - } - }); - ScalarValue::iter_to_array(values).ok() + get_prune_stats(self, column, true) } /// return the number of containers (e.g. row groups) being @@ -292,13 +302,29 @@ impl PruningStatistics for DeltaTable { /// /// Note: the returned array must contain `num_containers()` rows. 
fn null_counts(&self, column: &Column) -> Option { + let partition_columns = &self.get_metadata().ok()?.partition_columns; + let values = self.get_state().files().iter().map(|add| { if let Ok(Some(statistics)) = add.get_stats() { - statistics - .null_count - .get(&column.name) - .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64))) - .unwrap_or(ScalarValue::UInt64(None)) + if partition_columns.contains(&column.name) { + let value = add.partition_values.get(&column.name).unwrap(); + match value { + Some(_) => ScalarValue::UInt64(Some(0)), + None => ScalarValue::UInt64(Some(statistics.num_records as u64)), + } + } else { + statistics + .null_count + .get(&column.name) + .map(|f| ScalarValue::UInt64(f.as_value().map(|val| val as u64))) + .unwrap_or(ScalarValue::UInt64(None)) + } + } else if partition_columns.contains(&column.name) { + let value = add.partition_values.get(&column.name).unwrap(); + match value { + Some(_) => ScalarValue::UInt64(Some(0)), + None => ScalarValue::UInt64(None), + } } else { ScalarValue::UInt64(None) } @@ -510,6 +536,64 @@ impl ExecutionPlan for DeltaScan { } } +fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult { + match t { + ArrowDataType::Null => Ok(ScalarValue::Null), + ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)), + ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)), + ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)), + ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)), + ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)), + ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)), + ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)), + ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)), + ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)), + ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)), + ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)), + ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)), + ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)), + ArrowDataType::Binary => Ok(ScalarValue::Binary(None)), + ArrowDataType::FixedSizeBinary(size) => { + Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None)) + } + ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)), + ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)), + ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)), + ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128( + None, + precision.to_owned(), + scale.to_owned(), + )), + ArrowDataType::Timestamp(unit, tz) => { + let tz = tz.to_owned(); + Ok(match unit { + TimeUnit::Second => ScalarValue::TimestampSecond(None, tz), + TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz), + }) + } + //Unsupported types... 
+ ArrowDataType::Float16 + | ArrowDataType::Decimal256(_, _) + | ArrowDataType::Union(_, _, _) + | ArrowDataType::Dictionary(_, _) + | ArrowDataType::LargeList(_) + | ArrowDataType::Struct(_) + | ArrowDataType::List(_) + | ArrowDataType::FixedSizeList(_, _) + | ArrowDataType::Time32(_) + | ArrowDataType::Time64(_) + | ArrowDataType::Duration(_) + | ArrowDataType::Interval(_) + | ArrowDataType::RunEndEncoded(_, _) + | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!( + "Unsupported data type for Delta Lake {}", + t + ))), + } +} + fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> PartitionedFile { let partition_values = schema .fields() @@ -521,7 +605,7 @@ fn partitioned_file_from_action(action: &action::Add, schema: &ArrowSchema) -> P f.data_type(), ) .unwrap_or(ScalarValue::Null), - None => ScalarValue::Null, + None => get_null_of_arrow_type(f.data_type()).unwrap_or(ScalarValue::Null), }) }) .collect::>(); @@ -570,7 +654,7 @@ fn to_correct_scalar_value( match stat_val { serde_json::Value::Array(_) => None, serde_json::Value::Object(_) => None, - serde_json::Value::Null => None, + serde_json::Value::Null => get_null_of_arrow_type(field_dt).ok(), serde_json::Value::String(string_val) => match field_dt { ArrowDataType::Timestamp(_, _) => { let time_nanos = ScalarValue::try_from_string( diff --git a/rust/src/writer/record_batch.rs b/rust/src/writer/record_batch.rs index d5a0c1a320..4e0bd38833 100644 --- a/rust/src/writer/record_batch.rs +++ b/rust/src/writer/record_batch.rs @@ -382,7 +382,6 @@ pub(crate) fn divide_by_partition_values( // get row indices for current partition let idx: UInt32Array = (range.start..range.end) .map(|i| Some(indices.value(i))) - .into_iter() .collect(); let partition_key_iter = sorted_partition_columns.iter().map(|c| { diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs index e3092aee67..274c36031e 100644 --- a/rust/tests/datafusion_test.rs +++ b/rust/tests/datafusion_test.rs @@ -5,17 +5,20 @@ use std::path::PathBuf; use std::sync::Arc; use arrow::array::*; -use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; +use arrow::datatypes::{ + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit, +}; use arrow::record_batch::RecordBatch; use common::datafusion::context_with_delta_table_factory; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::TableProvider; -use datafusion::execution::context::{SessionContext, TaskContext}; +use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{common::collect, file_format::ParquetExec, metrics::Label}; use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor}; use datafusion_common::scalar::ScalarValue; -use datafusion_common::{Column, DataFusionError, Result}; +use datafusion_common::ScalarValue::*; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::Expr; use deltalake::action::SaveMode; @@ -61,7 +64,8 @@ impl ExecutionPlanVisitor for ExecutionMetricsCollector { async fn prepare_table( batches: Vec, save_mode: SaveMode, -) -> (tempfile::TempDir, Arc) { + partitions: Vec, +) -> (tempfile::TempDir, DeltaTable) { let table_dir = tempfile::tempdir().unwrap(); let table_path = table_dir.path(); let table_uri = table_path.to_str().unwrap().to_string(); @@ -73,6 +77,7 @@ async fn prepare_table( 
.create() .with_save_mode(SaveMode::Ignore) .with_columns(table_schema.get_fields().clone()) + .with_partition_columns(partitions) .await .unwrap(); @@ -84,7 +89,7 @@ async fn prepare_table( .unwrap(); } - (table_dir, Arc::new(table)) + (table_dir, table) } #[tokio::test] @@ -284,51 +289,363 @@ async fn test_datafusion_stats() -> Result<()> { Ok(()) } -#[tokio::test] -async fn test_files_scanned() -> Result<()> { - let arrow_schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new("id", ArrowDataType::Int32, true), - ArrowField::new("string", ArrowDataType::Utf8, true), - ])); - let columns_1: Vec = vec![ - Arc::new(Int32Array::from(vec![Some(1), Some(2)])), - Arc::new(StringArray::from(vec![Some("hello"), Some("world")])), - ]; - let columns_2: Vec = vec![ - Arc::new(Int32Array::from(vec![Some(10), Some(20)])), - Arc::new(StringArray::from(vec![Some("hello"), Some("world")])), - ]; - let batches = vec![ - RecordBatch::try_new(arrow_schema.clone(), columns_1)?, - RecordBatch::try_new(arrow_schema.clone(), columns_2)?, +async fn get_scan_metrics( + table: &DeltaTable, + state: &SessionState, + e: &[Expr], +) -> Result { + let mut metrics = ExecutionMetricsCollector::default(); + let scan = table.scan(state, None, e, None).await?; + if scan.output_partitioning().partition_count() > 0 { + let plan = CoalescePartitionsExec::new(scan); + let task_ctx = Arc::new(TaskContext::from(state)); + let _result = collect(plan.execute(0, task_ctx)?).await?; + visit_execution_plan(&plan, &mut metrics).unwrap(); + } + + return Ok(metrics); +} + +fn create_all_types_batch(not_null_rows: usize, null_rows: usize, offset: usize) -> RecordBatch { + let mut decimal_builder = Decimal128Builder::with_capacity(not_null_rows + null_rows); + for x in 0..not_null_rows { + decimal_builder.append_value(((x + offset) * 100) as i128); + } + decimal_builder.append_nulls(null_rows); + let decimal = decimal_builder + .finish() + .with_precision_and_scale(10, 2) + .unwrap(); + + let data: Vec = vec![ + Arc::new(StringArray::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset).to_string())) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Int64Array::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) as i64)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Int32Array::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) as i32)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Int16Array::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) as i16)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Int8Array::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) as i8)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Float64Array::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) as f64)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Float32Array::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) as f32)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(BooleanArray::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset) % 2 == 0)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(BinaryArray::from_iter( + (0..not_null_rows) + .map(|x| Some((x + offset).to_string().as_bytes().to_owned())) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(decimal), + //Convert to seconds + Arc::new(TimestampMicrosecondArray::from_iter( + (0..not_null_rows) + .map(|x| Some(((x + offset) * 1_000_000) as i64)) + .chain((0..null_rows).map(|_| None)), + )), + Arc::new(Date32Array::from_iter( + 
(0..not_null_rows)
+                .map(|x| Some((x + offset) as i32))
+                .chain((0..null_rows).map(|_| None)),
+        )),
     ];
-    let (_temp_dir, table) = prepare_table(batches, SaveMode::Append).await;
-    assert_eq!(table.version(), 2);
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("utf8", ArrowDataType::Utf8, true),
+        ArrowField::new("int64", ArrowDataType::Int64, true),
+        ArrowField::new("int32", ArrowDataType::Int32, true),
+        ArrowField::new("int16", ArrowDataType::Int16, true),
+        ArrowField::new("int8", ArrowDataType::Int8, true),
+        ArrowField::new("float64", ArrowDataType::Float64, true),
+        ArrowField::new("float32", ArrowDataType::Float32, true),
+        ArrowField::new("boolean", ArrowDataType::Boolean, true),
+        ArrowField::new("binary", ArrowDataType::Binary, true),
+        ArrowField::new("decimal", ArrowDataType::Decimal128(10, 2), true),
+        ArrowField::new(
+            "timestamp",
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        ),
+        ArrowField::new("date", ArrowDataType::Date32, true),
+    ]));
+
+    RecordBatch::try_new(schema, data).unwrap()
+}
+
+struct TestCase {
+    column: &'static str,
+    file1_value: Expr,
+    file2_value: Expr,
+    file3_value: Expr,
+    non_existent_value: Expr,
+}
+
+impl TestCase {
+    fn new<F>(column: &'static str, expression_builder: F) -> Self
+    where
+        F: Fn(i64) -> Expr,
+    {
+        TestCase {
+            column,
+            file1_value: expression_builder(1),
+            file2_value: expression_builder(5),
+            file3_value: expression_builder(8),
+            non_existent_value: expression_builder(3),
+        }
+    }
+}
+
+#[tokio::test]
+async fn test_files_scanned() -> Result<()> {
+    // Validate that datafusion prunes files based on file statistics
+    // Do not scan files when we know they do not contain the requested records
+    use datafusion::prelude::*;
     let ctx = SessionContext::new();
-    let plan = table.scan(&ctx.state(), None, &[], None).await?;
-    let plan = CoalescePartitionsExec::new(plan.clone());
+    let state = ctx.state();
-    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
-    let _ = collect(plan.execute(0, task_ctx)?).await?;
+    async fn append_to_table(table: DeltaTable, batch: RecordBatch) -> DeltaTable {
+        DeltaOps(table)
+            .write(vec![batch])
+            .with_save_mode(SaveMode::Append)
+            .await
+            .unwrap()
+    }
-    let mut metrics = ExecutionMetricsCollector::default();
-    visit_execution_plan(&plan, &mut metrics).unwrap();
-    assert!(metrics.num_scanned_files() == 2);
+    let batch = create_all_types_batch(3, 0, 0);
+    let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, vec![]).await;
+
+    let batch = create_all_types_batch(3, 0, 4);
+    let table = append_to_table(table, batch).await;
+
+    let batch = create_all_types_batch(3, 0, 7);
+    let table = append_to_table(table, batch).await;
+
+    let metrics = get_scan_metrics(&table, &state, &[]).await?;
+    assert!(metrics.num_scanned_files() == 3);
+
+    // (Column name, value from file 1, value from file 2, value from file 3, non-existent value)
+    let tests = [
+        TestCase::new("utf8", |value| lit(value.to_string())),
+        TestCase::new("int64", |value| lit(value)),
+        TestCase::new("int32", |value| lit(value as i32)),
+        TestCase::new("int16", |value| lit(value as i16)),
+        TestCase::new("int8", |value| lit(value as i8)),
+        TestCase::new("float64", |value| lit(value as f64)),
+        TestCase::new("float32", |value| lit(value as f32)),
+        TestCase::new("timestamp", |value| {
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(value * 1_000_000),
+                None,
+            ))
+        }),
+        // TODO: I think decimal statistics are being written to the log incorrectly. 
The underlying i128 is written
+        // not the proper string representation as specified by the precision and scale
+        TestCase::new("decimal", |value| {
+            lit(Decimal128(Some((value * 100).into()), 10, 2))
+        }),
+        // TODO: The writer does not write complete statistics for date columns
+        TestCase::new("date", |value| lit(ScalarValue::Date32(Some(value as i32)))),
+        // TODO: The writer does not write complete statistics for binary columns
+        TestCase::new("binary", |value| lit(value.to_string().as_bytes())),
+    ];
-    let filter = Expr::gt(
-        Expr::Column(Column::from_name("id")),
-        Expr::Literal(ScalarValue::Int32(Some(5))),
-    );
+    for test in &tests {
+        let TestCase {
+            column,
+            file1_value,
+            file2_value,
+            file3_value,
+            non_existent_value,
+        } = test.to_owned();
+        let column = column.to_owned();
+        // TODO: The following types don't have proper stats written.
+        // See issue #1208 for decimal type
+        // See issue #1209 for dates
+        // Min and Max are not calculated for binary columns. This matches the Spark writer
+        if column == "decimal" || column == "date" || column == "binary" {
+            continue;
+        }
+        println!("Test Column: {}", column);
+
+        // Equality
+        let e = col(column).eq(file1_value.clone());
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+
+        // Value does not exist
+        let e = col(column).eq(non_existent_value.clone());
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 0);
+
+        // Conjunction
+        let e = col(column)
+            .gt(file1_value.clone())
+            .and(col(column).lt(file2_value.clone()));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+
+        // Disjunction
+        let e = col(column)
+            .lt(file1_value.clone())
+            .or(col(column).gt(file3_value.clone()));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+    }
-    let plan = CoalescePartitionsExec::new(table.scan(&ctx.state(), None, &[filter], None).await?);
-    let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
-    let _result = collect(plan.execute(0, task_ctx)?).await?;
+    // Validate Boolean type
+    let batch = create_all_types_batch(1, 0, 0);
+    let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, vec![]).await;
+    let batch = create_all_types_batch(1, 0, 1);
+    let table = append_to_table(table, batch).await;
+
+    let e = col("boolean").eq(lit(true));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    let e = col("boolean").eq(lit(false));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    // Ensure that tables with stats and partition columns can be pruned
+    for test in tests {
+        let TestCase {
+            column,
+            file1_value,
+            file2_value,
+            file3_value,
+            non_existent_value,
+        } = test;
+        // TODO: Float and decimal partitions are not supported by the writer
+        // binary fails since arrow does not implement a natural order
+        // The current Datafusion pruning implementation does not work for binary columns since they do not have a natural order. See #1214
+        // Timestamp and date are disabled since the hive path contains illegal Windows values. 
see #1215
+        if column == "float32"
+            || column == "float64"
+            || column == "decimal"
+            || column == "binary"
+            || column == "timestamp"
+            || column == "date"
+        {
+            continue;
+        }
+        println!("test {}", column);
+
+        let partitions = vec![column.to_owned()];
+        let batch = create_all_types_batch(3, 0, 0);
+        let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;
+
+        let batch = create_all_types_batch(3, 0, 4);
+        let table = append_to_table(table, batch).await;
+
+        let batch = create_all_types_batch(3, 0, 7);
+        let table = append_to_table(table, batch).await;
+
+        // Equality
+        let e = col(column).eq(file1_value.clone());
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+
+        // Value does not exist
+        let e = col(column).eq(non_existent_value);
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 0);
+
+        // Conjunction
+        let e = col(column)
+            .gt(file1_value.clone())
+            .and(col(column).lt(file2_value));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+
+        // Disjunction
+        let e = col(column).lt(file1_value).or(col(column).gt(file3_value));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+
+        // Validate null pruning
+        let batch = create_all_types_batch(5, 2, 0);
+        let partitions = vec![column.to_owned()];
+        let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;
+
+        let e = col(column).is_null();
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+
+        /* logically we should be able to prune the null partition but Datafusion's current implementation prevents this */
+        /*
+        let e = col(column).is_not_null();
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 5);
+        */
+    }
-    let mut metrics = ExecutionMetricsCollector::default();
-    visit_execution_plan(&plan, &mut metrics).unwrap();
+    // Validate Boolean partition
+    let batch = create_all_types_batch(1, 0, 0);
+    let (_tmp, table) =
+        prepare_table(vec![batch], SaveMode::Overwrite, vec!["boolean".to_owned()]).await;
+    let batch = create_all_types_batch(1, 0, 1);
+    let table = append_to_table(table, batch).await;
+
+    let e = col("boolean").eq(lit(true));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    let e = col("boolean").eq(lit(false));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    // Ensure that tables with partition columns but no stats can still be pruned on the partition values
+    let table = deltalake::open_table("./tests/data/delta-0.8.0-null-partition").await?;
+
+    /*
+    // Logically this should prune. See above
+    let e = col("k").eq(lit("A")).and(col("k").is_not_null());
+    let metrics = get_scan_metrics(&table, &state, &[e]).await.unwrap();
+    println!("{:?}", metrics);
     assert!(metrics.num_scanned_files() == 1);
+    let e = col("k").eq(lit("B"));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert!(metrics.num_scanned_files() == 1);
+    */
+
+    // Check pruning for null partitions
+    let e = col("k").is_null();
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert!(metrics.num_scanned_files() == 1);
+
+    // Check pruning for null partitions. 
Since there are no record count statistics, pruning cannot be done
+    let e = col("k").is_not_null();
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert!(metrics.num_scanned_files() == 2);
+
     Ok(())
 }
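---

For reviewers who want to see the effect end to end, below is a minimal usage sketch (not part of the patch). It assumes a hypothetical local table at `./data/events` partitioned by a string column `year`; the path, table name, and column are illustrative only, and exact method signatures may vary slightly across deltalake/datafusion releases. Because `DeltaTable` implements DataFusion's `TableProvider` and, with this change, reports partition values through `PruningStatistics`, a filter on the partition column lets DataFusion skip files from other partitions instead of opening them.

```rust
use std::sync::Arc;

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical table partitioned by the string column `year`.
    let table = deltalake::open_table("./data/events").await?;

    let ctx = SessionContext::new();
    // DeltaTable implements TableProvider, so it can be registered directly.
    ctx.register_table("events", Arc::new(table))?;

    // With this patch, files whose `year` partition value cannot satisfy the
    // predicate are pruned from the scan instead of being read and filtered.
    let df = ctx
        .sql("SELECT count(*) FROM events WHERE year = '2023'")
        .await?;
    df.show().await?;

    Ok(())
}
```

The same pruning is exercised programmatically in the new `test_files_scanned` test above, which calls `DeltaTable::scan` directly and inspects the `ParquetExec` metrics to count the files actually read.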