Update ListingTable to use StatisticsConverter, remove redundant statistics extraction code #10924

Closed · wants to merge 5 commits

Changes from all commits
18 changes: 18 additions & 0 deletions datafusion/common/src/stats.rs
@@ -344,6 +344,24 @@ impl ColumnStatistics {
distinct_count: Precision::Absent,
}
}

/// Set the null count for the column
[Review comment from the PR author]: These functions make creating ColumnStatistics more ergonomic.

pub fn with_null_count(mut self, null_count: Precision<usize>) -> Self {
self.null_count = null_count;
self
}

/// Set the max value for the column
pub fn with_max_value(mut self, max_value: Precision<ScalarValue>) -> Self {
self.max_value = max_value;
self
}

/// Set the min value for the column
pub fn with_min_value(mut self, min_value: Precision<ScalarValue>) -> Self {
self.min_value = min_value;
self
}
}
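
For illustration, a minimal sketch of how these builders might be used (the values here are hypothetical, not from this PR):

// Describe a column with a known range and no nulls.
let stats = ColumnStatistics::new_unknown()
    .with_null_count(Precision::Exact(0))
    .with_min_value(Precision::Exact(ScalarValue::from(1i64)))
    .with_max_value(Precision::Exact(ScalarValue::from(100i64)));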

#[cfg(test)]
270 changes: 132 additions & 138 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -17,6 +17,7 @@

//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use arrow_array::{Array, ArrayRef, UInt64Array};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
@@ -25,16 +26,13 @@ use std::sync::Arc;
use super::write::demux::start_demuxer_task;
use super::write::{create_writer, SharedBuffer};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch,
};
use crate::arrow::datatypes::{DataType, Fields, Schema, SchemaRef};
use crate::arrow::array::RecordBatch;
use crate::arrow::datatypes::{Fields, Schema, SchemaRef};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
use crate::datasource::physical_plan::{FileGroupDisplay, FileSinkConfig};
use crate::datasource::schema_adapter::{
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
@@ -48,7 +46,8 @@ use datafusion_common::config::TableParquetOptions;
use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{
exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
exec_err, internal_datafusion_err, not_impl_err, ColumnStatistics, DataFusionError,
ScalarValue,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::TaskContext;
@@ -68,14 +67,15 @@ use parquet::arrow::{
use parquet::file::footer::{decode_footer, decode_metadata};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task::JoinSet;

use crate::datasource::physical_plan::parquet::ParquetExecBuilder;
use crate::datasource::physical_plan::parquet::{
ParquetExecBuilder, StatisticsConverter,
};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
use object_store::path::Path;
@@ -295,86 +295,6 @@ impl FileFormat for ParquetFormat {
}
}

fn summarize_min_max(
max_values: &mut [Option<MaxAccumulator>],
min_values: &mut [Option<MinAccumulator>],
fields: &Fields,
i: usize,
stat: &ParquetStatistics,
) {
if !stat.has_min_max_set() {
max_values[i] = None;
min_values[i] = None;
return;
}
match stat {
ParquetStatistics::Boolean(s) if DataType::Boolean == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(BooleanArray::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Int32(s) if DataType::Int32 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Int32Array::from_value(*s.max(), 1))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Int32Array::from_value(*s.min(), 1))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Int64(s) if DataType::Int64 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Int64Array::from_value(*s.max(), 1))])
[Review comment from @alamb, Jun 14, 2024]: Previously, even though update_batch was called, it first created a single-row array.
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Int64Array::from_value(*s.min(), 1))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Float(s) if DataType::Float32 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Float32Array::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Float32Array::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
ParquetStatistics::Double(s) if DataType::Float64 == *fields[i].data_type() => {
if let Some(max_value) = &mut max_values[i] {
max_value
.update_batch(&[Arc::new(Float64Array::from(vec![*s.max()]))])
.unwrap_or_else(|_| max_values[i] = None);
}
if let Some(min_value) = &mut min_values[i] {
min_value
.update_batch(&[Arc::new(Float64Array::from(vec![*s.min()]))])
.unwrap_or_else(|_| min_values[i] = None);
}
}
_ => {
max_values[i] = None;
min_values[i] = None;
}
}
}

/// Fetches parquet metadata from ObjectStore for given object
///
/// This component is subject to **change** in the near future and is exposed for low level integrations
@@ -482,73 +402,139 @@ pub async fn statistics_from_parquet_meta(
file_metadata.key_value_metadata(),
)?;

let num_fields = table_schema.fields().len();
let fields = table_schema.fields();

let mut num_rows = 0;
let mut total_byte_size = 0;
let mut null_counts = vec![Precision::Exact(0); num_fields];
let mut has_statistics = false;

for row_group_meta in metadata.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();
}

let schema_adapter =
DefaultSchemaAdapterFactory::default().create(table_schema.clone());

let (mut max_values, mut min_values) = create_max_min_accs(&table_schema);
// statistics for each of the table's columns (the table schema may differ
// from the file schema)
let mut column_statistics = vec![];

for (table_idx, field) in table_schema.fields().iter().enumerate() {
let Some(file_idx) = schema_adapter.map_column_index(table_idx, &file_schema)
else {
// table columns that are not present in the file are treated as all null
let null_count = Precision::Exact(num_rows as usize);
let null_value = ScalarValue::try_from(field.data_type())?;
let stats = ColumnStatistics::new_unknown()
.with_null_count(null_count)
.with_max_value(Precision::Exact(null_value.clone()))
.with_min_value(Precision::Exact(null_value));
column_statistics.push(stats);
continue;
};

for row_group_meta in metadata.row_groups() {
num_rows += row_group_meta.num_rows();
total_byte_size += row_group_meta.total_byte_size();
let file_field = file_schema.field(file_idx);
let Some(converter) = StatisticsConverter::try_new(
[Review comment from @alamb, Jun 14, 2024]: This code now uses the well-tested StatisticsConverter to extract statistics from the parquet file, with the correct type of array, in a single call.

file_field.name(),
&file_schema,
file_metadata.schema_descr(),
)
.ok() else {
// problem extracting statistics, bail out
column_statistics.push(ColumnStatistics::new_unknown());
continue;
};

let mut column_stats: HashMap<usize, (u64, &ParquetStatistics)> = HashMap::new();
let null_counts = converter.row_group_null_counts(metadata.row_groups())?;
let null_count = accumulate_null_counts(&null_counts);

for (i, column) in row_group_meta.columns().iter().enumerate() {
if let Some(stat) = column.statistics() {
has_statistics = true;
column_stats.insert(i, (stat.null_count(), stat));
}
}
let maxes = converter.row_group_maxes(metadata.row_groups())?;
let max_value =
accumulate_column(MaxAccumulator::try_new(field.data_type()).ok(), maxes);

if has_statistics {
for (table_idx, null_cnt) in null_counts.iter_mut().enumerate() {
if let Some(file_idx) =
schema_adapter.map_column_index(table_idx, &file_schema)
{
if let Some((null_count, stats)) = column_stats.get(&file_idx) {
*null_cnt = null_cnt.add(&Precision::Exact(*null_count as usize));
summarize_min_max(
&mut max_values,
&mut min_values,
fields,
table_idx,
stats,
)
} else {
// If no statistics exist for the current column, set the Max/Min accumulators to None.
max_values[table_idx] = None;
min_values[table_idx] = None;
}
} else {
*null_cnt = null_cnt.add(&Precision::Exact(num_rows as usize));
}
}
}
}
let mins = converter.row_group_mins(metadata.row_groups())?;
let min_value =
accumulate_column(MinAccumulator::try_new(field.data_type()).ok(), mins);

let column_stats = if has_statistics {
get_col_stats(&table_schema, null_counts, &mut max_values, &mut min_values)
} else {
Statistics::unknown_column(&table_schema)
};
column_statistics.push(ColumnStatistics {
null_count,
max_value,
min_value,
distinct_count: Precision::Absent,
});
}

let statistics = Statistics {
num_rows: Precision::Exact(num_rows as usize),
total_byte_size: Precision::Exact(total_byte_size as usize),
column_statistics: column_stats,
column_statistics,
};

Ok(statistics)
}
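
To summarize the pattern described in the review comment above, a hedged sketch of the per-column StatisticsConverter flow (the column name "c1" is hypothetical; file_schema, file_metadata, and metadata are as in the function body):

// Build a converter for one column, then extract each statistic kind for
// all row groups in a single call.
let converter = StatisticsConverter::try_new(
    "c1",
    &file_schema,
    file_metadata.schema_descr(),
)?;
let null_counts = converter.row_group_null_counts(metadata.row_groups())?; // UInt64Array
let maxes = converter.row_group_maxes(metadata.row_groups())?; // one max per row group
let mins = converter.row_group_mins(metadata.row_groups())?; // one min per row group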

/// Summarizes a UInt64Array of per row group null counts into a single Precision<usize>
///
/// Nulls in `null_counts` are treated as missing (unknown) values
///
/// # Returns
///
/// * Precision::Exact if every row group's null count is known
/// * Precision::Inexact if there are any null (unknown) null counts
/// * Precision::Absent if there are any errors or there are no known statistics
fn accumulate_null_counts(null_counts: &UInt64Array) -> Precision<usize> {
let total_null_count: usize = null_counts
.iter()
.filter_map(|v| v.map(|v| v as usize))
.sum();

let num_unknown_null_counts = null_counts.null_count();
if num_unknown_null_counts == 0 {
// all null counts were known (common case)
Precision::Exact(total_null_count)
} else if num_unknown_null_counts == null_counts.len() {
// no null counts were known
Precision::Absent
} else {
// some counts were unknown, so the exact total null count is not known
Precision::Inexact(total_null_count)
}
}
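
To make the three outcomes concrete, a sketch of the intended semantics (hypothetical arrays, written as they might appear in a test):

// All row-group null counts known: the total is exact.
let all_known = UInt64Array::from(vec![Some(1), Some(2)]);
assert_eq!(accumulate_null_counts(&all_known), Precision::Exact(3));

// One row group's null count unknown: the known sum is only a lower bound.
let some_unknown = UInt64Array::from(vec![Some(1), None]);
assert_eq!(accumulate_null_counts(&some_unknown), Precision::Inexact(1));

// No null counts known at all.
let none_known = UInt64Array::from(vec![None::<u64>, None]);
assert_eq!(accumulate_null_counts(&none_known), Precision::Absent);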

/// Summarizes the column to a single value using the accumulator
///
/// Nulls in `column` are treated as missing values (not actual null values in
/// the parquet file)
///
/// # Returns
///
/// * Precision::Exact if all row group statistics are known
/// * Precision::Inexact if there are any nulls (unknown row group statistics)
/// * Precision::Absent if there are any errors or there are no known statistics
fn accumulate_column<A: Accumulator>(
accumulator: Option<A>,
column: ArrayRef,
) -> Precision<ScalarValue> {
// is_nullable returns false if the array is guaranteed not to contain any
// nulls. If there are nulls in the column, that means some of the row
// group statistics were not known
let nullable = column.is_nullable();

let scalar = accumulator.and_then(|mut acc| {
acc.update_batch(&[column]).ok()?;
acc.evaluate().ok()
});

let Some(scalar) = scalar else {
return Precision::Absent;
};

// if the scalar itself is null, it means we didn't have any known stats
if scalar.is_null() {
Precision::Absent
} else if nullable {
Precision::Inexact(scalar)
} else {
Precision::Exact(scalar)
}
}
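
Similarly, a sketch of the Exact vs. Inexact distinction for min/max accumulation (hypothetical inputs; assumes Int64Array and DataType are in scope, as in this file's tests):

// Maxes known for both row groups: the overall max is exact.
let maxes: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(3)]));
let acc = MaxAccumulator::try_new(&DataType::Int64).ok();
assert_eq!(
    accumulate_column(acc, maxes),
    Precision::Exact(ScalarValue::Int64(Some(3)))
);

// One row group's max unknown (a null slot): the result is downgraded.
let maxes: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None]));
let acc = MaxAccumulator::try_new(&DataType::Int64).ok();
assert_eq!(
    accumulate_column(acc, maxes),
    Precision::Inexact(ScalarValue::Int64(Some(1)))
);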

/// Implements [`DataSink`] for writing to a parquet file.
pub struct ParquetSink {
/// Config options for writing data
@@ -1126,7 +1112,8 @@ mod tests {
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{SessionConfig, SessionContext};
use arrow::array::{Array, ArrayRef, StringArray};
use arrow_schema::Field;
use arrow_array::Int64Array;
use arrow_schema::{DataType, Field};
use async_trait::async_trait;
use datafusion_common::cast::{
as_binary_array, as_boolean_array, as_float32_array, as_float64_array,
@@ -1449,8 +1436,15 @@
// column c1
let c1_stats = &stats.column_statistics[0];
assert_eq!(c1_stats.null_count, Precision::Exact(1));
assert_eq!(c1_stats.max_value, Precision::Absent);
assert_eq!(c1_stats.min_value, Precision::Absent);
// Note that in ASCII, lowercase letters compare greater than uppercase letters
assert_eq!(
c1_stats.max_value,
Precision::Exact(ScalarValue::from("bar"))
[Review comment from @alamb, Jun 14, 2024]: Strings were previously not handled, but are now properly handled by StatisticsConverter.
);
assert_eq!(
c1_stats.min_value,
Precision::Exact(ScalarValue::from("Foo"))
);
// column c2: missing from the file so the table treats all 3 rows as null
let c2_stats = &stats.column_statistics[1];
assert_eq!(c2_stats.null_count, Precision::Exact(3));
@@ -1037,6 +1037,8 @@ impl<'a> StatisticsConverter<'a> {

/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
///
/// The returned array is [`UInt64Array`]
///
/// See docs on [`Self::row_group_mins`] for details
pub fn row_group_null_counts<I>(&self, metadatas: I) -> Result<UInt64Array>
where