diff --git a/datafusion/src/datasource/format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
similarity index 97%
rename from datafusion/src/datasource/format/avro.rs
rename to datafusion/src/datasource/file_format/avro.rs
index 52f52d1d6d8c2..1e8bba7c68d05 100644
--- a/datafusion/src/datasource/format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -25,12 +25,12 @@
 use async_trait::async_trait;
 use futures::StreamExt;
 use std::fs::File;
 
+use super::PartitionedFile;
 use super::{FileFormat, StringStream};
 use crate::avro_to_arrow::read_avro_schema_from_reader;
-use crate::datasource::PartitionedFile;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::format::AvroExec;
+use crate::physical_plan::file_format::AvroExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -80,7 +80,7 @@ impl FileFormat for AvroFormat {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-    use crate::datasource::format::string_stream;
+    use crate::datasource::file_format::string_stream;
     use crate::physical_plan::collect;
 
    use super::*;
@@ -336,10 +336,7 @@ mod tests {
             .infer_stats(&filename)
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile {
-            path: filename,
-            statistics: stats.clone(),
-        }]];
+        let files = vec![vec![PartitionedFile { path: filename }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
             .await?;
@@ -352,7 +349,7 @@
 mod tests {
     use super::*;
 
-    use crate::datasource::format::string_stream;
+    use crate::datasource::file_format::string_stream;
     use crate::error::DataFusionError;
 
     #[tokio::test]
diff --git a/datafusion/src/datasource/format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
similarity index 95%
rename from datafusion/src/datasource/format/csv.rs
rename to datafusion/src/datasource/file_format/csv.rs
index 1d9cd6c97765d..6eaabfac7390d 100644
--- a/datafusion/src/datasource/format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -25,11 +25,11 @@
 use async_trait::async_trait;
 use futures::StreamExt;
 use std::fs::File;
 
+use super::PartitionedFile;
 use super::{FileFormat, StringStream};
-use crate::datasource::PartitionedFile;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::format::CsvExec;
+use crate::physical_plan::file_format::CsvExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -106,7 +106,7 @@ mod tests {
     use arrow::array::StringArray;
 
     use super::*;
-    use crate::{datasource::format::string_stream, physical_plan::collect};
+    use crate::{datasource::file_format::string_stream, physical_plan::collect};
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -212,10 +212,7 @@ mod tests {
             .infer_stats(&filename)
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile {
-            path: filename,
-            statistics: stats.clone(),
-        }]];
+        let files = vec![vec![PartitionedFile { path: filename }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
             .await?;
diff --git a/datafusion/src/datasource/format/json.rs b/datafusion/src/datasource/file_format/json.rs
similarity index 96%
rename from datafusion/src/datasource/format/json.rs
rename to datafusion/src/datasource/file_format/json.rs
index a82aebd2d1eb7..5ad363e6aa91a 100644
--- a/datafusion/src/datasource/format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -28,11 +28,11 @@
 use async_trait::async_trait;
 use futures::StreamExt;
 use std::fs::File;
 
+use super::PartitionedFile;
 use super::{FileFormat, StringStream};
-use crate::datasource::PartitionedFile;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::format::NdJsonExec;
+use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
@@ -100,7 +100,7 @@ mod tests {
     use arrow::array::Int64Array;
 
     use super::*;
-    use crate::{datasource::format::string_stream, physical_plan::collect};
+    use crate::{datasource::file_format::string_stream, physical_plan::collect};
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -186,7 +186,6 @@ mod tests {
         let stats = format.infer_stats(filename).await.expect("Stats inference");
         let files = vec![vec![PartitionedFile {
             path: filename.to_owned(),
-            statistics: stats.clone(),
         }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
diff --git a/datafusion/src/datasource/format/mod.rs b/datafusion/src/datasource/file_format/mod.rs
similarity index 74%
rename from datafusion/src/datasource/format/mod.rs
rename to datafusion/src/datasource/file_format/mod.rs
index a6143353e6b1a..933c7c52c12cf 100644
--- a/datafusion/src/datasource/format/mod.rs
+++ b/datafusion/src/datasource/file_format/mod.rs
@@ -25,13 +25,11 @@ pub mod parquet;
 use std::pin::Pin;
 use std::sync::Arc;
 
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, ExecutionPlan, Statistics};
-
-use super::PartitionedFile;
+use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
 
 use async_trait::async_trait;
 use futures::Stream;
@@ -77,8 +75,9 @@ pub trait FileFormat: Send + Sync {
 /// if the optional `limit` is provided, includes only sufficient files
 /// needed to read up to `limit` number of rows
 /// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
+/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
 pub fn get_statistics_with_limit(
-    all_files: &[PartitionedFile],
+    all_files: &[(PartitionedFile, Statistics)],
     schema: SchemaRef,
     limit: Option<usize>,
 ) -> (Vec<PartitionedFile>, Statistics) {
@@ -92,9 +91,8 @@ pub fn get_statistics_with_limit(
     let mut num_rows = 0;
     let mut num_files = 0;
     let mut is_exact = true;
-    for file in &all_files {
+    for (_, file_stats) in &all_files {
         num_files += 1;
-        let file_stats = &file.statistics;
         is_exact &= file_stats.is_exact;
         num_rows += file_stats.num_rows.unwrap_or(0);
         total_byte_size += file_stats.total_byte_size.unwrap_or(0);
@@ -152,47 +150,43 @@ pub fn get_statistics_with_limit(
         column_statistics: column_stats,
         is_exact,
     };
-    (all_files, statistics)
+
+    let files = all_files.into_iter().map(|(f, _)| f).collect();
+
+    (files, statistics)
 }
 
-fn create_max_min_accs(
-    schema: &Schema,
-) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    (max_values, min_values)
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub path: String,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // We may include row group range here for a more fine-grained parallel execution
 }
 
-fn get_col_stats(
-    schema: &Schema,
-    null_counts: Vec<u64>,
-    max_values: &mut Vec<Option<MaxAccumulator>>,
-    min_values: &mut Vec<Option<MinAccumulator>>,
-) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
-        .map(|i| {
-            let max_value = match &max_values[i] {
-                Some(max_value) => max_value.evaluate().ok(),
-                None => None,
-            };
-            let min_value = match &min_values[i] {
-                Some(min_value) => min_value.evaluate().ok(),
-                None => None,
-            };
-            ColumnStatistics {
-                null_count: Some(null_counts[i] as usize),
-                max_value,
-                min_value,
-                distinct_count: None,
-            }
-        })
-        .collect()
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.path)
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
 }
diff --git a/datafusion/src/datasource/format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs
similarity index 98%
rename from datafusion/src/datasource/format/parquet.rs
rename to datafusion/src/datasource/file_format/parquet.rs
index c61919587b207..e746d81a31729 100644
--- a/datafusion/src/datasource/format/parquet.rs
+++ b/datafusion/src/datasource/file_format/parquet.rs
@@ -30,15 +30,15 @@ use parquet::file::serialized_reader::SerializedFileReader;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::FileFormat;
+use super::PartitionedFile;
 use super::{create_max_min_accs, get_col_stats, StringStream};
 use crate::arrow::datatypes::{DataType, Field};
-use crate::datasource::PartitionedFile;
 use crate::error::DataFusionError;
 use crate::error::Result;
 use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::format::ParquetExec;
+use crate::physical_plan::file_format::ParquetExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{Accumulator, Statistics};
 use crate::scalar::ScalarValue;
@@ -284,7 +284,7 @@ fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> {
 #[cfg(test)]
 mod tests {
-    use crate::datasource::format::string_stream;
+    use crate::datasource::file_format::string_stream;
     use crate::physical_plan::collect;
 
     use super::*;
@@ -532,10 +532,7 @@ mod tests {
             .infer_stats(&filename.clone())
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile {
-            path: filename,
-            statistics: stats.clone(),
-        }]];
+        let files = vec![vec![PartitionedFile { path: filename }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
             .await?;
diff --git a/datafusion/src/datasource/listing.rs b/datafusion/src/datasource/listing.rs
index 9ee39774cae4c..d240f650a5730 100644
--- a/datafusion/src/datasource/listing.rs
+++ b/datafusion/src/datasource/listing.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! A table that uses the files system / table store listing capability
+//! A table that uses the `ObjectStore` listing capability
 //! to get the list of files to process.
 
 use std::{any::Any, sync::Arc};
@@ -25,15 +25,14 @@
 use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
 use crate::{
-    datasource::format::{self},
+    datasource::file_format::{self, PartitionedFile},
     error::Result,
     logical_plan::Expr,
     physical_plan::{common, ExecutionPlan, Statistics},
 };
 
 use super::{
-    datasource::TableProviderFilterPushDown, format::FileFormat, PartitionedFile,
-    TableProvider,
+    datasource::TableProviderFilterPushDown, file_format::FileFormat, TableProvider,
 };
 
 /// Options for creating a `ListingTable`
@@ -61,6 +60,9 @@ pub struct ListingOptions {
 }
 
 impl ListingOptions {
+    /// Infer the schema of the files at the given path, including the partitioning
+    /// columns.
+    ///
     /// This method will not be called by the table itself but before creating it.
     /// This way when creating the logical plan we can decide to resolve the schema
     /// locally or ask a remote service to do it (e.g a scheduler).
@@ -128,26 +130,24 @@ impl TableProvider for ListingTable {
         )?;
 
         // collect the statistics if required by the config
-        let mut files = file_list;
-        if self.options.collect_stat {
-            files = futures::stream::iter(files)
-                .then(|file| async {
-                    let statistics = self.options.format.infer_stats(&file.path).await?;
-                    Ok(PartitionedFile {
-                        statistics,
-                        path: file.path,
-                    }) as Result<PartitionedFile>
-                })
-                .try_collect::<Vec<_>>()
-                .await?;
-        }
+        let files = futures::stream::iter(file_list)
+            .then(|file| async {
+                let statistics = if self.options.collect_stat {
+                    self.options.format.infer_stats(&file.path).await?
+                } else {
+                    Statistics::default()
+                };
+                Ok((file, statistics)) as Result<(PartitionedFile, Statistics)>
+            })
+            .try_collect::<Vec<_>>()
+            .await?;
 
         let (files, statistics) =
-            format::get_statistics_with_limit(&files, self.schema(), limit);
+            file_format::get_statistics_with_limit(&files, self.schema(), limit);
 
         let partitioned_file_lists = split_files(files, self.options.max_partitions);
 
-        // 2. create the plan
+        // create the execution plan
         self.options
             .format
             .create_executor(
@@ -182,10 +182,7 @@ fn pruned_partition_list(
     let list_all = || {
         Ok(common::build_file_list(path, file_extension)?
             .into_iter()
-            .map(|f| PartitionedFile {
-                path: f,
-                statistics: Statistics::default(),
-            })
+            .map(|f| PartitionedFile { path: f })
             .collect::<Vec<_>>())
     };
     if partition_names.is_empty() {
@@ -216,11 +213,21 @@ mod tests {
     #[test]
     fn test_split_files() {
         let files = vec![
-            PartitionedFile::from("a".to_string()),
-            PartitionedFile::from("b".to_string()),
-            PartitionedFile::from("c".to_string()),
-            PartitionedFile::from("d".to_string()),
-            PartitionedFile::from("e".to_string()),
+            PartitionedFile {
+                path: "a".to_owned(),
+            },
+            PartitionedFile {
+                path: "b".to_owned(),
+            },
+            PartitionedFile {
+                path: "c".to_owned(),
+            },
+            PartitionedFile {
+                path: "d".to_owned(),
+            },
+            PartitionedFile {
+                path: "e".to_owned(),
+            },
         ];
 
         let chunks = split_files(files.clone(), 1);
@@ -275,7 +282,7 @@ mod tests {
         let filename = format!("{}/{}", testdata, name);
         let opt = ListingOptions {
             file_extension: "parquet".to_owned(),
-            format: Arc::new(format::parquet::ParquetFormat {
+            format: Arc::new(file_format::parquet::ParquetFormat {
                 enable_pruning: true,
             }),
             partitions: vec![],
diff --git a/datafusion/src/datasource/mod.rs b/datafusion/src/datasource/mod.rs
index 046a86e8b14dc..75180c5d64f83 100644
--- a/datafusion/src/datasource/mod.rs
+++ b/datafusion/src/datasource/mod.rs
@@ -21,7 +21,7 @@ pub mod avro;
 pub mod csv;
 pub mod datasource;
 pub mod empty;
-pub mod format;
+pub mod file_format;
 pub mod json;
 pub mod listing;
 pub mod memory;
diff --git a/datafusion/src/physical_plan/format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs
similarity index 99%
rename from datafusion/src/physical_plan/format/avro.rs
rename to datafusion/src/physical_plan/file_format/avro.rs
index 471e95fa8a20d..fd50f18bf1f09 100644
--- a/datafusion/src/physical_plan/format/avro.rs
+++ b/datafusion/src/physical_plan/file_format/avro.rs
@@ -235,7 +235,7 @@
     async fn test() -> Result<()> {
         use futures::StreamExt;
 
-        use crate::datasource::format::{avro::AvroFormat, FileFormat};
+        use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
 
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
diff --git a/datafusion/src/physical_plan/format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs
similarity index 100%
rename from datafusion/src/physical_plan/format/csv.rs
rename to datafusion/src/physical_plan/file_format/csv.rs
diff --git a/datafusion/src/physical_plan/format/json.rs b/datafusion/src/physical_plan/file_format/json.rs
similarity index 99%
rename from datafusion/src/physical_plan/format/json.rs
rename to datafusion/src/physical_plan/file_format/json.rs
index b187e64211acc..69be9d2e7a9f9 100644
--- a/datafusion/src/physical_plan/format/json.rs
+++ b/datafusion/src/physical_plan/file_format/json.rs
@@ -221,7 +221,7 @@ impl RecordBatchStream for NdJsonStream {
 mod tests {
     use futures::StreamExt;
 
-    use crate::datasource::format::{json::JsonFormat, FileFormat};
+    use crate::datasource::file_format::{json::JsonFormat, FileFormat};
 
     use super::*;
diff --git a/datafusion/src/physical_plan/format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
similarity index 100%
rename from datafusion/src/physical_plan/format/mod.rs
rename to datafusion/src/physical_plan/file_format/mod.rs
diff --git a/datafusion/src/physical_plan/format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
similarity index 99%
rename from datafusion/src/physical_plan/format/parquet.rs
rename to datafusion/src/physical_plan/file_format/parquet.rs
index 0b216631e6685..615e52b544748 100644
--- a/datafusion/src/physical_plan/format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -58,7 +58,7 @@ use tokio::{
 
 use async_trait::async_trait;
 
-use crate::datasource::{FilePartition, PartitionedFile};
+use crate::datasource::file_format::{FilePartition, PartitionedFile};
 
 /// Execution plan for scanning one or more Parquet partitions
 #[derive(Debug, Clone)]
@@ -526,7 +526,7 @@ fn read_partition(
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::format::{parquet::ParquetFormat, FileFormat};
+    use crate::datasource::file_format::{parquet::ParquetFormat, FileFormat};
 
     use super::*;
     use arrow::datatypes::{DataType, Field};
@@ -544,7 +544,6 @@ mod tests {
         let parquet_exec = ParquetExec::new(
             vec![vec![PartitionedFile {
                 path: filename.clone(),
-                statistics: Statistics::default(),
             }]],
             Statistics::default(),
             ParquetFormat {
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 17722b8837a08..61a6e2401bbe8 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -620,8 +620,8 @@ pub mod distinct_expressions;
 pub mod empty;
 pub mod explain;
 pub mod expressions;
+pub mod file_format;
 pub mod filter;
-pub mod format;
 pub mod functions;
 pub mod hash_aggregate;
 pub mod hash_join;