diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs
index e7af707344..9f23b093a8 100644
--- a/rust/src/writer/utils.rs
+++ b/rust/src/writer/utils.rs
@@ -7,7 +7,7 @@ use std::sync::Arc;
 
 use crate::writer::DeltaWriterError;
 use crate::DeltaTableError;
-use arrow::array::{as_primitive_array, Array};
+use arrow::array::{as_boolean_array, as_primitive_array, Array};
 use arrow::datatypes::{
     DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema,
     SchemaRef as ArrowSchemaRef, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
@@ -137,6 +137,7 @@ pub(crate) fn stringified_partition_value(
         DataType::UInt16 => as_primitive_array::<UInt16Type>(arr).value(0).to_string(),
         DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
         DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
+        DataType::Boolean => as_boolean_array(arr).value(0).to_string(),
         DataType::Utf8 => {
             let data = arrow::array::as_string_array(arr);
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 4529c7a5f9..095427004e 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -5,7 +5,10 @@ use std::path::PathBuf;
 use std::sync::Arc;
 
 use arrow::array::*;
-use arrow::datatypes::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
+use arrow::compute::is_null;
+use arrow::datatypes::{
+    DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit,
+};
 use arrow::record_batch::RecordBatch;
 use common::datafusion::context_with_delta_table_factory;
 use datafusion::assert_batches_sorted_eq;
@@ -15,11 +18,13 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::{common::collect, file_format::ParquetExec, metrics::Label};
 use datafusion::physical_plan::{visit_execution_plan, ExecutionPlan, ExecutionPlanVisitor};
 use datafusion_common::scalar::ScalarValue;
+use datafusion_common::ScalarValue::*;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Expr;
 use deltalake::action::SaveMode;
 use deltalake::{operations::DeltaOps, DeltaTable, Schema};
+use parquet::data_type::Decimal;
 
 mod common;
 
@@ -57,7 +62,8 @@ impl ExecutionPlanVisitor for ExecutionMetricsCollector {
 async fn prepare_table(
     batches: Vec<RecordBatch>,
     save_mode: SaveMode,
-) -> (tempfile::TempDir, Arc<DeltaTable>) {
+    partitions: Vec<String>,
+) -> (tempfile::TempDir, DeltaTable) {
     let table_dir = tempfile::tempdir().unwrap();
     let table_path = table_dir.path();
     let table_uri = table_path.to_str().unwrap().to_string();
@@ -69,6 +75,7 @@ async fn prepare_table(
         .create()
         .with_save_mode(SaveMode::Ignore)
         .with_columns(table_schema.get_fields().clone())
+        .with_partition_columns(partitions)
         .await
         .unwrap();
 
@@ -80,7 +87,7 @@ async fn prepare_table(
             .unwrap();
     }
 
-    (table_dir, Arc::new(table))
+    (table_dir, table)
 }
 
 #[tokio::test]
@@ -245,6 +252,99 @@ async fn get_scan_metrics(
     return Ok(metrics);
 }
 
+fn create_all_types_batch(not_null_rows: usize, null_rows: usize, offset: usize) -> RecordBatch {
+    let mut decimal_builder = Decimal128Builder::with_capacity(not_null_rows + null_rows);
+    for x in 0..not_null_rows {
+        decimal_builder.append_value(((x + offset) * 100) as i128);
+    }
+    decimal_builder.append_nulls(null_rows);
+    let decimal = decimal_builder
+        .finish()
+        .with_precision_and_scale(10, 2)
+        .unwrap();
+
+    let data: Vec<ArrayRef> = vec![
+        Arc::new(StringArray::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset).to_string()))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Int64Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as i64))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Int32Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as i32))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Int16Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as i16))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Int8Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as i8))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Float64Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as f64))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Float32Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as f32))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(BooleanArray::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) % 2 == 0))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(BinaryArray::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset).to_string().as_bytes().to_owned()))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(decimal),
+        // Values are in seconds, stored as microseconds
+        Arc::new(TimestampMicrosecondArray::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some(((x + offset) * 1_000_000) as i64))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+        Arc::new(Date32Array::from_iter(
+            (0..not_null_rows)
+                .map(|x| Some((x + offset) as i32))
+                .chain((0..null_rows).map(|_| None)),
+        )),
+    ];
+
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("utf8", ArrowDataType::Utf8, true),
+        ArrowField::new("int64", ArrowDataType::Int64, true),
+        ArrowField::new("int32", ArrowDataType::Int32, true),
+        ArrowField::new("int16", ArrowDataType::Int16, true),
+        ArrowField::new("int8", ArrowDataType::Int8, true),
+        ArrowField::new("float64", ArrowDataType::Float64, true),
+        ArrowField::new("float32", ArrowDataType::Float32, true),
+        ArrowField::new("boolean", ArrowDataType::Boolean, true),
+        ArrowField::new("binary", ArrowDataType::Binary, true),
+        ArrowField::new("decimal", ArrowDataType::Decimal128(10, 2), true),
+        ArrowField::new(
+            "timestamp",
+            ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        ),
+        ArrowField::new("date", ArrowDataType::Date32, true),
+    ]));
+
+    RecordBatch::try_new(schema, data).unwrap()
+}
+
 #[tokio::test]
 async fn test_files_scanned() -> Result<()> {
     // Validate that datafusion prunes files based on file statistics
@@ -253,37 +353,239 @@ async fn test_files_scanned() -> Result<()> {
     let ctx = SessionContext::new();
     let state = ctx.state();
 
-    let arrow_schema = Arc::new(ArrowSchema::new(vec![
-        ArrowField::new("id", ArrowDataType::Int32, true),
-        ArrowField::new("string", ArrowDataType::Utf8, true),
-    ]));
-    let columns_1: Vec<ArrayRef> = vec![
-        Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
-        Arc::new(StringArray::from(vec![Some("hello"), Some("world")])),
-    ];
-    let columns_2: Vec<ArrayRef> = vec![
-        Arc::new(Int32Array::from(vec![Some(10), Some(20)])),
-        Arc::new(StringArray::from(vec![Some("hello"), Some("world")])),
-    ];
-    let batches = vec![
-        RecordBatch::try_new(arrow_schema.clone(), columns_1)?,
-        RecordBatch::try_new(arrow_schema.clone(), columns_2)?,
-    ];
-    let (_temp_dir, table) = prepare_table(batches, SaveMode::Append).await;
-    assert_eq!(table.version(), 2);
+    async fn append_to_table(table: DeltaTable, batch: RecordBatch) -> DeltaTable {
+        DeltaOps(table)
+            .write(vec![batch])
+            .with_save_mode(SaveMode::Append)
+            .await
+            .unwrap()
+    }
+
+    fn to_binary(s: &str) -> Vec<u8> {
+        s.as_bytes().to_owned()
+    }
+
+    let batch = create_all_types_batch(3, 0, 0);
+    let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, vec![]).await;
+
+    let batch = create_all_types_batch(3, 0, 4);
+    let table = append_to_table(table, batch).await;
+
+    let batch = create_all_types_batch(3, 0, 7);
+    let table = append_to_table(table, batch).await;
 
     let metrics = get_scan_metrics(&table, &state, &[]).await?;
-    assert!(metrics.num_scanned_files() == 2);
+    assert!(metrics.num_scanned_files() == 3);
+
+    // (Column name, value from file 1, value from file 2, value from file 3, non-existent value)
+    let tests = [
+        ("utf8", lit("1"), lit("5"), lit("8"), lit("3")),
+        (
+            "int64",
+            lit(1 as i64),
+            lit(5 as i64),
+            lit(8 as i64),
+            lit(3 as i64),
+        ),
+        (
+            "int32",
+            lit(1 as i32),
+            lit(5 as i32),
+            lit(8 as i32),
+            lit(3 as i32),
+        ),
+        (
+            "int16",
+            lit(1 as i16),
+            lit(5 as i16),
+            lit(8 as i16),
+            lit(3 as i16),
+        ),
+        (
+            "int8",
+            lit(1 as i8),
+            lit(5 as i8),
+            lit(8 as i8),
+            lit(3 as i8),
+        ),
+        (
+            "float64",
+            lit(1 as f64),
+            lit(5 as f64),
+            lit(8 as f64),
+            lit(3 as f64),
+        ),
+        (
+            "float32",
+            lit(1 as f32),
+            lit(5 as f32),
+            lit(8 as f32),
+            lit(3 as f32),
+        ),
+        // TODO: I think decimal statistics are being written to the log incorrectly: the
+        // underlying i128 is written, not the proper string representation as specified by
+        // the precision and scale.
+        (
+            "decimal",
+            lit(Decimal128(Some(100), 10, 2)),
+            lit(Decimal128(Some(500), 10, 2)),
+            lit(Decimal128(Some(800), 10, 2)),
+            lit(Decimal128(Some(300), 10, 2)),
+        ),
+        (
+            "timestamp",
+            lit(ScalarValue::TimestampMicrosecond(Some(1 * 1_000_000), None)),
+            lit(ScalarValue::TimestampMicrosecond(Some(5 * 1_000_000), None)),
+            lit(ScalarValue::TimestampMicrosecond(Some(8 * 1_000_000), None)),
+            lit(ScalarValue::TimestampMicrosecond(Some(3 * 1_000_000), None)),
+        ),
+        // TODO: The writer does not write complete statistics for date columns
+        (
+            "date",
+            lit(ScalarValue::Date32(Some(1))),
+            lit(ScalarValue::Date32(Some(5))),
+            lit(ScalarValue::Date32(Some(8))),
+            lit(ScalarValue::Date32(Some(3))),
+        ),
+        // TODO: The writer does not write complete statistics for binary columns
+        (
+            "binary",
+            lit(to_binary("1")),
+            lit(to_binary("5")),
+            lit(to_binary("8")),
+            lit(to_binary("3")),
+        ),
+    ];
+
+    for test in &tests {
+        let test = test.to_owned();
+        // TODO: The following types don't have proper stats written yet (see TODOs above).
+        if test.0 == "decimal" || test.0 == "date" || test.0 == "binary" {
+            continue;
+        }
+        println!("Test Column: {}", test.0);
+
+        // Equality
+        let e = col(test.0).eq(test.1.clone());
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+
+        // Value does not exist
+        let e = col(test.0).eq(test.4);
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 0);
+
+        // Conjunction
+        let e = col(test.0).gt(test.1.clone()).and(col(test.0).lt(test.2));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+
+        // Disjunction
+        let e = col(test.0).lt(test.1).or(col(test.0).gt(test.3));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+    }
+
-    let e = col("id").gt(lit(5));
+    // Validate Boolean type
+    let batch = create_all_types_batch(1, 0, 0);
+    let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, vec![]).await;
+    let batch = create_all_types_batch(1, 0, 1);
+    let table = append_to_table(table, batch).await;
+
+    let e = col("boolean").eq(lit(true));
     let metrics = get_scan_metrics(&table, &state, &[e]).await?;
-    assert!(metrics.num_scanned_files() == 1);
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    let e = col("boolean").eq(lit(false));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    // Ensure that tables with stats and partition columns can be pruned
+    for test in tests {
+        // TODO: Float, timestamp, decimal, date, and binary partitions are not supported by the writer
+        if test.0 == "float32"
+            || test.0 == "float64"
+            || test.0 == "timestamp"
+            || test.0 == "decimal"
+            || test.0 == "date"
+            || test.0 == "binary"
+        {
+            continue;
+        }
+
+        println!("test {}", test.0);
+
+        let partitions = vec![test.0.to_owned()];
+        let batch = create_all_types_batch(3, 0, 0);
+        let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;
+
+        let batch = create_all_types_batch(3, 0, 4);
+        let table = append_to_table(table, batch).await;
+
+        let batch = create_all_types_batch(3, 0, 7);
+        let table = append_to_table(table, batch).await;
+
+        // Equality
+        let e = col(test.0).eq(test.1.clone());
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+
+        // Value does not exist
+        let e = col(test.0).eq(test.4);
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 0);
+
+        // Conjunction
+        let e = col(test.0).gt(test.1.clone()).and(col(test.0).lt(test.2));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+
+        // Disjunction
+        let e = col(test.0).lt(test.1).or(col(test.0).gt(test.3));
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+
+        // Validate null pruning
+        let batch = create_all_types_batch(5, 2, 0);
+        let partitions = vec![test.0.to_owned()];
+        let (_tmp, table) = prepare_table(vec![batch], SaveMode::Overwrite, partitions).await;
+
+        let e = col(test.0).is_null();
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+
+        /* Logically we should be able to prune the null partition, but DataFusion's current
+           implementation prevents this. */
+        /*
+        let e = col(test.0).is_not_null();
+        let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+        if test.0 == "boolean" {
+            assert_eq!(metrics.num_scanned_files(), 2);
+        } else {
+            assert_eq!(metrics.num_scanned_files(), 5);
+        }
+        */
+    }
+
+    // Validate Boolean partition
+    let batch = create_all_types_batch(1, 0, 0);
+    let (_tmp, table) =
+        prepare_table(vec![batch], SaveMode::Overwrite, vec!["boolean".to_owned()]).await;
+    let batch = create_all_types_batch(1, 0, 1);
+    let table = append_to_table(table, batch).await;
+
+    let e = col("boolean").eq(lit(true));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
+
+    let e = col("boolean").eq(lit(false));
+    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
+    assert_eq!(metrics.num_scanned_files(), 1);
 
     // Ensure that tables without stats and partition columns can be pruned for just partitions
     let table = deltalake::open_table("./tests/data/delta-0.8.0-null-partition").await?;
 
     /*
-    // Logically this should prune... Might require an update on datafusion
+    // Logically this should prune. See above
     let e = col("k").eq(lit("A")).and(col("k").is_not_null());
     let metrics = get_scan_metrics(&table, &state, &[e]).await.unwrap();
     println!("{:?}", metrics);
@@ -304,25 +606,6 @@ async fn test_files_scanned() -> Result<()> {
     let metrics = get_scan_metrics(&table, &state, &[e]).await?;
     assert!(metrics.num_scanned_files() == 2);
 
-    // Ensure that tables with stats and partition columns can be pruned
-    let table = deltalake::open_table("./tests/data/delta-2.2.0-partitioned-types").await?;
-
-    let e = col("c1").eq(lit(1));
-    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
-    assert!(metrics.num_scanned_files() == 0);
-
-    let e = col("c1").eq(lit(4));
-    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
-    assert!(metrics.num_scanned_files() == 1);
-
-    let e = col("c3").eq(lit(4));
-    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
-    assert!(metrics.num_scanned_files() == 1);
-
-    let e = col("c3").eq(lit(0));
-    let metrics = get_scan_metrics(&table, &state, &[e]).await?;
-    assert!(metrics.num_scanned_files() == 0);
-
     Ok(())
 }
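
Note on the writer change: the new `DataType::Boolean` arm in `stringified_partition_value`
mirrors the existing primitive arms, downcasting the single-element partition column and
stringifying its first value. A minimal standalone sketch of that behavior follows; the
`main` wrapper and the sample value are illustrative, not part of the patch:

    use arrow::array::{as_boolean_array, ArrayRef, BooleanArray};
    use std::sync::Arc;

    fn main() {
        // A one-element boolean column, as stringified_partition_value receives it.
        let arr: ArrayRef = Arc::new(BooleanArray::from(vec![true]));
        // The new match arm: downcast with as_boolean_array, stringify value 0.
        let partition_value = as_boolean_array(&arr).value(0).to_string();
        // This string is what the writer records as the partition value, which is
        // why col("boolean").eq(lit(true)) in the tests above prunes to one file.
        assert_eq!(partition_value, "true");
    }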