From 3e6a537829e43f3cc1937ecafac6f2c085b6363e Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Fri, 14 Jun 2024 17:47:19 -0400
Subject: [PATCH 1/2] Update ListingTable to use StatisticsConverter

---
 datafusion/common/src/stats.rs                |  18 ++
 .../src/datasource/file_format/parquet.rs     | 272 +++++++++---------
 .../physical_plan/parquet/statistics.rs       |   9 +-
 datafusion/core/src/datasource/statistics.rs  |  48 +---
 .../sqllogictest/test_files/explain.slt       |  18 ++
 5 files changed, 178 insertions(+), 187 deletions(-)

diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs
index 6cefef8d0eb5..52768a556c80 100644
--- a/datafusion/common/src/stats.rs
+++ b/datafusion/common/src/stats.rs
@@ -344,6 +344,24 @@ impl ColumnStatistics {
             distinct_count: Precision::Absent,
         }
     }
+
+    /// Set the null count for the column
+    pub fn with_null_count(mut self, null_count: Precision<usize>) -> Self {
+        self.null_count = null_count;
+        self
+    }
+
+    /// Set the max value for the column
+    pub fn with_max_value(mut self, max_value: Precision<ScalarValue>) -> Self {
+        self.max_value = max_value;
+        self
+    }
+
+    /// Set the min value for the column
+    pub fn with_min_value(mut self, min_value: Precision<ScalarValue>) -> Self {
+        self.min_value = min_value;
+        self
+    }
 }
 
 #[cfg(test)]
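
The three builders above follow the consuming-builder pattern: each takes `self` by value and returns it, so they chain directly off `ColumnStatistics::new_unknown()`. A minimal sketch of how a caller might combine them (hypothetical values; only `datafusion_common` is assumed as a dependency):

    use datafusion_common::stats::Precision;
    use datafusion_common::{ColumnStatistics, ScalarValue};

    fn example_stats() -> ColumnStatistics {
        // Start from all-unknown statistics, then fill in what is known:
        // exactly 2 nulls, values between 1 and 42.
        ColumnStatistics::new_unknown()
            .with_null_count(Precision::Exact(2))
            .with_min_value(Precision::Exact(ScalarValue::from(1i64)))
            .with_max_value(Precision::Exact(ScalarValue::from(42i64)))
        // distinct_count remains Precision::Absent
    }
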
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 572904254fd7..bde7c5d3f260 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,6 +17,9 @@
 
 //! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
+use arrow_array::cast::AsArray;
+use arrow_array::types::UInt64Type;
+use arrow_array::{Array, ArrayRef, UInt64Array};
 use std::any::Any;
 use std::fmt;
 use std::fmt::Debug;
@@ -25,16 +28,13 @@ use std::sync::Arc;
 
 use super::write::demux::start_demuxer_task;
 use super::write::{create_writer, SharedBuffer};
 use super::{FileFormat, FileScanConfig};
-use crate::arrow::array::{
-    BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
-};
-use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
+use crate::arrow::array::RecordBatch;
+use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
 use crate::datasource::schema_adapter::{
     DefaultSchemaAdapterFactory, SchemaAdapterFactory,
 };
-use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
@@ -48,7 +48,8 @@ use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
+    exec_err, internal_datafusion_err, not_impl_err, ColumnStatistics, DataFusionError,
+    ScalarValue,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
@@ -68,14 +69,15 @@ use parquet::arrow::{
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
-use parquet::file::statistics::Statistics as ParquetStatistics;
 use parquet::file::writer::SerializedFileWriter;
 use parquet::format::FileMetaData;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::task::JoinSet;
 
-use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
+use crate::datasource::physical_plan::parquet::{
+    ParquetExecBuilder, StatisticsConverter,
+};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
 use object_store::path::Path;
@@ -295,86 +297,6 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
-    stat: &ParquetStatistics,
-) {
-    if !stat.has_min_max_set() {
-        max_values[i] = None;
-        min_values[i] = None;
-        return;
-    }
-    match stat {
-        ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
-            if let Some(max_value) = &mut max_values[i] {
-                max_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
-                    .unwrap_or_else(|_| max_values[i] = None);
-            }
-            if let Some(min_value) = &mut min_values[i] {
-                min_value
-                    .update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
-                    .unwrap_or_else(|_| min_values[i] = None);
-            }
-        }
-        _ => {
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-    }
-}
-
 /// Fetches parquet metadata from ObjectStore for given object
 ///
 /// This component is a subject to **change** in near future and is exposed for low level integrations
@@ -482,73 +404,139 @@ pub async fn statistics_from_parquet_meta(
         file_metadata.key_value_metadata(),
     )?;
 
-    let num_fields = table_schema.fields().len();
-    let fields = table_schema.fields();
-
     let mut num_rows = 0;
     let mut total_byte_size = 0;
-    let mut null_counts = vec![Precision::Exact(0); num_fields];
-    let mut has_statistics = false;
+
+    for row_group_meta in metadata.row_groups() {
+        num_rows += row_group_meta.num_rows();
+        total_byte_size += row_group_meta.total_byte_size();
+    }
 
     let schema_adapter =
         DefaultSchemaAdapterFactory::default().create(table_schema.clone());
 
-    let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
+    // statistics for each of the table's columns (may be different from the
+    // file schema)
+    let mut column_statistics = vec![];
+
+    for (table_idx, field) in table_schema.fields().iter().enumerate() {
+        let Some(file_idx) = schema_adapter.map_column_index(table_idx, &file_schema)
+        else {
+            // table columns missing from the file schema are treated as all null
+            let null_count = Precision::Exact(num_rows as usize);
+            let null_value = ScalarValue::try_from(field.data_type())?;
+            let stats = ColumnStatistics::new_unknown()
+                .with_null_count(null_count)
+                .with_max_value(Precision::Exact(null_value.clone()))
+                .with_min_value(Precision::Exact(null_value));
+            column_statistics.push(stats);
+            continue;
+        };
 
-    for row_group_meta in metadata.row_groups() {
-        num_rows += row_group_meta.num_rows();
-        total_byte_size += row_group_meta.total_byte_size();
+        let file_field = file_schema.field(file_idx);
+        let Some(converter) = StatisticsConverter::try_new(
+            file_field.name(),
+            &file_schema,
+            file_metadata.schema_descr(),
+        )
+        .ok() else {
+            // problem extracting statistics, bail out
+            column_statistics.push(ColumnStatistics::new_unknown());
+            continue;
+        };
 
-        let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
+        let null_counts = converter.row_group_null_counts(metadata.row_groups())?;
+        let null_count = accumulate_null_counts(null_counts.as_primitive::<UInt64Type>());
 
-        for (i, column) in row_group_meta.columns().iter().enumerate() {
-            if let Some(stat) = column.statistics() {
-                has_statistics = true;
-                column_stats.insert(i, (stat.null_count(), stat));
-            }
-        }
+        let maxes = converter.row_group_maxes(metadata.row_groups())?;
+        let max_value =
+            accumulate_column(MaxAccumulator::try_new(field.data_type()).ok(), maxes);
 
-        if has_statistics {
-            for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
-                if let Some(file_idx) =
-                    schema_adapter.map_column_index(table_idx, &file_schema)
-                {
-                    if let Some((null_count, stats)) = column_stats.get(&file_idx) {
-                        *null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
-                        summarize_min_max(
-                            &mut max_values,
-                            &mut min_values,
-                            fields,
-                            table_idx,
-                            stats,
-                        )
-                    } else {
-                        // If none statistics of current column exists, set the Max/Min Accumulator to None.
-                        max_values[table_idx] = None;
-                        min_values[table_idx] = None;
-                    }
-                } else {
-                    *null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
-                }
-            }
-        }
-    }
+        let mins = converter.row_group_mins(metadata.row_groups())?;
+        let min_value =
+            accumulate_column(MinAccumulator::try_new(field.data_type()).ok(), mins);
 
-    let column_stats = if has_statistics {
-        get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
-    } else {
-        Statistics::unknown_column(&table_schema)
-    };
+        column_statistics.push(ColumnStatistics {
+            null_count,
+            max_value,
+            min_value,
+            distinct_count: Precision::Absent,
+        });
+    }
 
     let statistics = Statistics {
         num_rows: Precision::Exact(num_rows as usize),
         total_byte_size: Precision::Exact(total_byte_size as usize),
-        column_statistics: column_stats,
+        column_statistics,
     };
 
     Ok(statistics)
 }
 
+/// Summarizes the UInt64Array to a single usize
+///
+/// Nulls in `null_counts` are treated as missing values
+///
+/// # Returns
+///
+/// * Precision::Absent if there are any errors or there are no known statistics
+/// * Precision::Inexact if there are any null (unknown) null counts
+fn accumulate_null_counts(null_counts: &UInt64Array) -> Precision<usize> {
+    let total_null_count: usize = null_counts
+        .iter()
+        .filter_map(|v| v.map(|v| v as usize))
+        .sum();
+
+    let num_unknown_null_counts = null_counts.null_count();
+    if num_unknown_null_counts == 0 {
+        // had all null counts (common case)
+        Precision::Exact(total_null_count)
+    } else if num_unknown_null_counts == null_counts.len() {
+        // didn't know any null counts
+        Precision::Absent
+    } else {
+        // if any of the counts were null, don't know the true null count
+        Precision::Inexact(total_null_count)
+    }
+}
+
+/// Summarizes the column to a single value using the accumulator
+///
+/// Nulls in `column` are treated as missing values (not actual null values in
+/// the parquet file)
+///
+/// # Returns
+///
+/// * Precision::Absent if there are any errors or there are no known statistics
+/// * Precision::Inexact if there are any nulls
+fn accumulate_column(
+    accumulator: Option<impl Accumulator>,
+    column: ArrayRef,
+) -> Precision<ScalarValue> {
+    // is_nullable returns false if the array is guaranteed to not contain any
+    // nulls. If there are nulls in the column, that means some of the row
+    // group statistics were not known
+    let nullable = column.is_nullable();
+
+    let scalar = accumulator.and_then(|mut acc| {
+        acc.update_batch(&[column]).ok()?;
+        acc.evaluate().ok()
+    });
+
+    let Some(scalar) = scalar else {
+        return Precision::Absent;
+    };
+
+    // if the scalar itself is null, it means we didn't have any known stats
+    if scalar.is_null() {
+        Precision::Absent
+    } else if nullable {
+        Precision::Inexact(scalar)
+    } else {
+        Precision::Exact(scalar)
+    }
+}
+
 /// Implements [`DataSink`] for writing to a parquet file.
 pub struct ParquetSink {
     /// Config options for writing data
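
`accumulate_null_counts` above turns a per-row-group array of null counts into one `Precision<usize>`: Exact when every row group reported a count, Absent when none did, Inexact when only some did. The same three-way decision, reimplemented as a standalone sketch against the arrow crate (the `Precision` enum here is local to the example, not DataFusion's):

    use arrow_array::{Array, UInt64Array};

    #[derive(Debug, PartialEq)]
    enum Precision {
        Exact(usize),
        Inexact(usize),
        Absent,
    }

    fn summarize(null_counts: &UInt64Array) -> Precision {
        // Sum the known counts; nulls in the input mean "count unknown"
        let total: usize = null_counts.iter().flatten().map(|v| v as usize).sum();
        match null_counts.null_count() {
            0 => Precision::Exact(total),
            n if n == null_counts.len() => Precision::Absent,
            _ => Precision::Inexact(total),
        }
    }

    fn main() {
        // All three row groups reported a null count: the total is exact
        let all = UInt64Array::from(vec![Some(1), Some(0), Some(2)]);
        assert_eq!(summarize(&all), Precision::Exact(3));
        // One row group lacks statistics: the total is only a lower bound
        let some = UInt64Array::from(vec![Some(1), None, Some(2)]);
        assert_eq!(summarize(&some), Precision::Inexact(3));
        // No statistics at all
        let none = UInt64Array::from(vec![None, None]);
        assert_eq!(summarize(&none), Precision::Absent);
    }
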
@@ -1126,7 +1114,8 @@ mod tests {
     use crate::physical_plan::metrics::MetricValue;
     use crate::prelude::{SessionConfig, SessionContext};
     use arrow::array::{Array, ArrayRef, StringArray};
-    use arrow_schema::Field;
+    use arrow_array::Int64Array;
+    use arrow_schema::{DataType, Field};
     use async_trait::async_trait;
     use datafusion_common::cast::{
         as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
@@ -1449,8 +1438,15 @@
         // column c1
         let c1_stats = &stats.column_statistics[0];
         assert_eq!(c1_stats.null_count, Precision::Exact(1));
-        assert_eq!(c1_stats.max_value, Precision::Absent);
-        assert_eq!(c1_stats.min_value, Precision::Absent);
+        // Note that in ASCII, lower case is greater than upper case
+        assert_eq!(
+            c1_stats.max_value,
+            Precision::Exact(ScalarValue::from("bar"))
+        );
+        assert_eq!(
+            c1_stats.min_value,
+            Precision::Exact(ScalarValue::from("Foo"))
+        );
         // column c2: missing from the file so the table treats all 3 rows as null
         let c2_stats = &stats.column_statistics[1];
         assert_eq!(c2_stats.null_count, Precision::Exact(3));
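
The new assertions pass because `accumulate_column` folds the per-row-group min/max arrays down to one scalar with a `MinAccumulator`/`MaxAccumulator`, skipping row groups whose statistics are unknown. A standalone sketch of that folding for strings, mirroring (not calling) the patch's helper, also shows why "bar" can be the maximum while "Foo" is the minimum:

    use arrow_array::StringArray;

    /// Fold per-row-group maxima into a file-level maximum; null entries
    /// (row groups without statistics) are skipped.
    fn fold_max(per_row_group_maxes: &StringArray) -> Option<String> {
        per_row_group_maxes
            .iter()
            .flatten()
            .max() // byte-wise: lower-case ASCII sorts after upper-case
            .map(str::to_string)
    }

    fn main() {
        let maxes = StringArray::from(vec![Some("Foo"), Some("bar"), None]);
        assert_eq!(fold_max(&maxes).as_deref(), Some("bar"));
    }
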
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
index 327a516f1af1..bc1763b400a0 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
@@ -843,15 +843,18 @@ impl<'a> StatisticsConverter<'a> {
 
     /// Extract the null counts from row group statistics in [`RowGroupMetaData`]
     ///
+    /// The returned array is a [`UInt64Array`]
+    ///
     /// See docs on [`Self::row_group_mins`] for details
     pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<ArrayRef>
     where
         I: IntoIterator<Item = &'a RowGroupMetaData>,
     {
-        let data_type = self.arrow_field.data_type();
-
         let Some(parquet_index) = self.parquet_index else {
-            return Ok(self.make_null_array(data_type, metadatas));
+            let num_row_groups = metadatas.into_iter().count();
+            return Ok(Arc::new(UInt64Array::from_iter(
+                std::iter::repeat(None).take(num_row_groups),
+            )));
         };
 
         let null_counts = metadatas
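
When the converter was built for a column with no physical Parquet column (`parquet_index` is `None`), the early return above produces one null entry per row group, meaning "null count unknown for every group"; `accumulate_null_counts` then collapses that into `Precision::Absent`. The construction is plain arrow (standalone sketch, not patch code):

    use arrow_array::{Array, UInt64Array};

    fn all_unknown_null_counts(num_row_groups: usize) -> UInt64Array {
        // One null entry per row group: no group reported a null count
        UInt64Array::from_iter(std::iter::repeat(None).take(num_row_groups))
    }

    fn main() {
        let counts = all_unknown_null_counts(3);
        assert_eq!(counts.len(), 3);
        assert_eq!(counts.null_count(), 3);
    }
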
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index c67227f966a2..b022345d168c 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -16,10 +16,9 @@
 // under the License.
 
 use super::listing::PartitionedFile;
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use crate::physical_plan::{ColumnStatistics, Statistics};
 use datafusion_common::stats::Precision;
 use datafusion_common::ScalarValue;
 
@@ -149,23 +148,6 @@ pub async fn get_statistics_with_limit(
 
     Ok((result_files, statistics))
 }
-
-pub(crate) fn create_max_min_accs(
-    schema: &Schema,
-) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-        .collect();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-        .collect();
-    (max_values, min_values)
-}
-
 fn add_row_stats(
     file_num_rows: Precision<usize>,
     num_rows: Precision<usize>,
@@ -192,32 +174,6 @@ pub(crate) fn get_col_stats_vec(
         .collect()
 }
 
-pub(crate) fn get_col_stats(
-    schema: &Schema,
-    null_counts: Vec<Precision<usize>>,
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
-        .map(|i| {
-            let max_value = match max_values.get_mut(i).unwrap() {
-                Some(max_value) => max_value.evaluate().ok(),
-                None => None,
-            };
-            let min_value = match min_values.get_mut(i).unwrap() {
-                Some(min_value) => min_value.evaluate().ok(),
-                None => None,
-            };
-            ColumnStatistics {
-                null_count: null_counts[i].clone(),
-                max_value: max_value.map(Precision::Exact).unwrap_or(Precision::Absent),
-                min_value: min_value.map(Precision::Exact).unwrap_or(Precision::Absent),
-                distinct_count: Precision::Absent,
-            }
-        })
-        .collect()
-}
-
 /// If the given value is numerically greater than the original maximum value,
 /// return the new maximum value with appropriate exactness information.
 fn set_max_if_greater(
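
The follow-up patch below resolves a logical conflict: by the time this series landed, `row_group_null_counts` had apparently been changed to return a typed `UInt64Array` rather than a dynamically typed `ArrayRef`, so the `as_primitive::<UInt64Type>()` downcast (and its two supporting imports) could be dropped. For reference, the downcast being removed is the standard arrow pattern (illustrative sketch, not patch code):

    use arrow_array::cast::AsArray;
    use arrow_array::types::UInt64Type;
    use arrow_array::{Array, ArrayRef, UInt64Array};
    use std::sync::Arc;

    fn main() {
        // A dynamically typed ArrayRef must be downcast before typed access
        let dynamic: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let typed: &UInt64Array = dynamic.as_primitive::<UInt64Type>();
        assert_eq!(typed.len(), 3);
        assert_eq!(typed.value(0), 1);
    }
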
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 3c5f8c7f7ad6..a12ae4af61ab 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -321,6 +321,24 @@ physical_plan
 02)--ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 
+# Create a new parquet file (with row group statistics) and show that the min/max values appear in the plan
+query I
+COPY (values (1, 'foo'), (2, 'bar'), (3, null)) TO 'test_files/scratch/explain/stats.parquet';
+----
+3
+
+statement ok
+CREATE EXTERNAL TABLE stats STORED AS PARQUET LOCATION 'test_files/scratch/explain/stats.parquet';
+
+## Expect to see value statistics in the plan
+query TT
+EXPLAIN SELECT * from stats;
+----
+physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/explain/stats.parquet]]}, projection=[column1, column2], statistics=[Rows=Exact(3), Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(3)) Null=Exact(0)),(Col[1]: Min=Exact(Utf8("bar")) Max=Exact(Utf8("foo")) Null=Exact(1))]]
+
+statement ok
+DROP TABLE stats;
+
 statement ok
 set datafusion.explain.show_statistics = false;

From da01b95da42601c32fbb6d0b665c8498f4e00995 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 17 Jun 2024 15:30:33 -0400
Subject: [PATCH 2/2] Fixup logical conflict

---
 datafusion/core/src/datasource/file_format/parquet.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index bde7c5d3f260..77de200af9d5 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,8 +17,6 @@
 
 //! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions
 
-use arrow_array::cast::AsArray;
-use arrow_array::types::UInt64Type;
 use arrow_array::{Array, ArrayRef, UInt64Array};
 use std::any::Any;
 use std::fmt;
@@ -446,7 +444,7 @@ pub async fn statistics_from_parquet_meta(
     };
 
     let null_counts = converter.row_group_null_counts(metadata.row_groups())?;
-    let null_count = accumulate_null_counts(null_counts.as_primitive::<UInt64Type>());
+    let null_count = accumulate_null_counts(&null_counts);
 
     let maxes = converter.row_group_maxes(metadata.row_groups())?;
     let max_value =
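
End to end, the two patches mean ListingTable file statistics now flow through `StatisticsConverter`. A rough sketch of the resulting call shape (hedged: it assumes `fetch_parquet_metadata` and `statistics_from_parquet_meta` remain public with the signatures used in this patch):

    use arrow_schema::SchemaRef;
    use datafusion::datasource::file_format::parquet::{
        fetch_parquet_metadata, statistics_from_parquet_meta,
    };
    use datafusion::error::Result;
    use datafusion::physical_plan::Statistics;
    use object_store::{ObjectMeta, ObjectStore};

    /// Derive table-level statistics for one parquet file, the way
    /// ListingTable does after this change (size_hint left as None).
    async fn file_statistics(
        store: &dyn ObjectStore,
        file: &ObjectMeta,
        table_schema: SchemaRef,
    ) -> Result<Statistics> {
        let metadata = fetch_parquet_metadata(store, file, None).await?;
        statistics_from_parquet_meta(&metadata, table_schema).await
    }
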