Commit 833dafc

[refacto] renamed format module to file_format
also removed statistics from the PartitionedFile abstraction
rdettai committed Sep 22, 2021
1 parent 830049e commit 833dafc
Showing 13 changed files with 99 additions and 109 deletions.
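
In short, statistics no longer live inside the file descriptor. Below is a sketch of the before/after shapes, not code from the commit; the Statistics stand-in is trimmed to the fields this diff touches (the real struct also carries column-level statistics).

// Sketch only: trimmed stand-in for datafusion's Statistics.
#[derive(Debug, Clone, Default)]
pub struct Statistics {
    pub num_rows: Option<usize>,
    pub total_byte_size: Option<usize>,
    pub is_exact: bool,
}

mod before {
    /// Old shape: every file descriptor dragged its statistics along.
    pub struct PartitionedFile {
        pub path: String,
        pub statistics: super::Statistics,
    }
}

mod after {
    /// New shape: a plain path holder; statistics travel separately.
    pub struct PartitionedFile {
        pub path: String,
    }
}

fn main() {
    // Callers that need both now pass (PartitionedFile, Statistics) pairs,
    // as get_statistics_with_limit does in the diff below.
    let pair = (
        after::PartitionedFile { path: "part-0.parquet".to_owned() },
        Statistics { num_rows: Some(1_000), ..Statistics::default() },
    );
    println!("{}: {:?} rows", pair.0.path, pair.1.num_rows);
}
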
datafusion/src/datasource/{format → file_format}/avro.rs
@@ -25,12 +25,12 @@ use async_trait::async_trait;
 use futures::StreamExt;
 use std::fs::File;
 
+use super::PartitionedFile;
 use super::{FileFormat, StringStream};
 use crate::avro_to_arrow::read_avro_schema_from_reader;
-use crate::datasource::PartitionedFile;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::format::AvroExec;
+use crate::physical_plan::file_format::AvroExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
@@ -80,7 +80,7 @@ impl FileFormat for AvroFormat {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-    use crate::datasource::format::string_stream;
+    use crate::datasource::file_format::string_stream;
     use crate::physical_plan::collect;
 
     use super::*;
@@ -336,10 +336,7 @@ mod tests {
             .infer_stats(&filename)
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile {
-            path: filename,
-            statistics: stats.clone(),
-        }]];
+        let files = vec![vec![PartitionedFile { path: filename }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
             .await?;
@@ -352,7 +349,7 @@ mod tests {
 mod tests {
     use super::*;
 
-    use crate::datasource::format::string_stream;
+    use crate::datasource::file_format::string_stream;
     use crate::error::DataFusionError;
 
     #[tokio::test]
datafusion/src/datasource/{format → file_format}/csv.rs
@@ -25,11 +25,11 @@ use async_trait::async_trait;
 use futures::StreamExt;
 use std::fs::File;
 
+use super::PartitionedFile;
 use super::{FileFormat, StringStream};
-use crate::datasource::PartitionedFile;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::format::CsvExec;
+use crate::physical_plan::file_format::CsvExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
@@ -106,7 +106,7 @@ mod tests {
     use arrow::array::StringArray;
 
     use super::*;
-    use crate::{datasource::format::string_stream, physical_plan::collect};
+    use crate::{datasource::file_format::string_stream, physical_plan::collect};
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -212,10 +212,7 @@ mod tests {
             .infer_stats(&filename)
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile {
-            path: filename,
-            statistics: stats.clone(),
-        }]];
+        let files = vec![vec![PartitionedFile { path: filename }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
             .await?;
datafusion/src/datasource/{format → file_format}/json.rs
@@ -28,11 +28,11 @@ use async_trait::async_trait;
 use futures::StreamExt;
 use std::fs::File;
 
+use super::PartitionedFile;
 use super::{FileFormat, StringStream};
-use crate::datasource::PartitionedFile;
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::format::NdJsonExec;
+use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
@@ -100,7 +100,7 @@ mod tests {
     use arrow::array::Int64Array;
 
     use super::*;
-    use crate::{datasource::format::string_stream, physical_plan::collect};
+    use crate::{datasource::file_format::string_stream, physical_plan::collect};
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -186,7 +186,6 @@ mod tests {
         let stats = format.infer_stats(filename).await.expect("Stats inference");
         let files = vec![vec![PartitionedFile {
             path: filename.to_owned(),
-            statistics: stats.clone(),
         }]];
         let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
datafusion/src/datasource/{format → file_format}/mod.rs
@@ -25,13 +25,11 @@ pub mod parquet;
 use std::pin::Pin;
 use std::sync::Arc;
 
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::logical_plan::Expr;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, ExecutionPlan, Statistics};
-
-use super::PartitionedFile;
+use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics};
 
 use async_trait::async_trait;
 use futures::Stream;
@@ -77,8 +75,9 @@ pub trait FileFormat: Send + Sync {
 /// if the optional `limit` is provided, includes only sufficient files
 /// needed to read up to `limit` number of rows
 /// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0))
+/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
 pub fn get_statistics_with_limit(
-    all_files: &[PartitionedFile],
+    all_files: &[(PartitionedFile, Statistics)],
     schema: SchemaRef,
     limit: Option<usize>,
 ) -> (Vec<PartitionedFile>, Statistics) {
@@ -92,9 +91,8 @@ pub fn get_statistics_with_limit(
     let mut num_rows = 0;
     let mut num_files = 0;
     let mut is_exact = true;
-    for file in &all_files {
+    for (_, file_stats) in &all_files {
         num_files += 1;
-        let file_stats = &file.statistics;
         is_exact &= file_stats.is_exact;
         num_rows += file_stats.num_rows.unwrap_or(0);
         total_byte_size += file_stats.total_byte_size.unwrap_or(0);
@@ -152,47 +150,43 @@ pub fn get_statistics_with_limit(
         column_statistics: column_stats,
         is_exact,
     };
-    (all_files, statistics)
+
+    let files = all_files.into_iter().map(|(f, _)| f).collect();
+
+    (files, statistics)
 }
 
-fn create_max_min_accs(
-    schema: &Schema,
-) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
-    let max_values: Vec<Option<MaxAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    let min_values: Vec<Option<MinAccumulator>> = schema
-        .fields()
-        .iter()
-        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
-        .collect::<Vec<_>>();
-    (max_values, min_values)
+#[derive(Debug, Clone)]
+/// A single file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub path: String,
+    // Values of partition columns to be appended to each row
+    // pub partition_value: Option<Vec<ScalarValue>>,
+    // We may include row group range here for a more fine-grained parallel execution
 }
 
-fn get_col_stats(
-    schema: &Schema,
-    null_counts: Vec<usize>,
-    max_values: &mut Vec<Option<MaxAccumulator>>,
-    min_values: &mut Vec<Option<MinAccumulator>>,
-) -> Vec<ColumnStatistics> {
-    (0..schema.fields().len())
-        .map(|i| {
-            let max_value = match &max_values[i] {
-                Some(max_value) => max_value.evaluate().ok(),
-                None => None,
-            };
-            let min_value = match &min_values[i] {
-                Some(min_value) => min_value.evaluate().ok(),
-                None => None,
-            };
-            ColumnStatistics {
-                null_count: Some(null_counts[i] as usize),
-                max_value,
-                min_value,
-                distinct_count: None,
-            }
-        })
-        .collect()
+impl std::fmt::Display for PartitionedFile {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "{}", self.path)
+    }
 }
+
+#[derive(Debug, Clone)]
+/// A collection of files that should be read in a single task
+/// TODO move back to crate::datasource::mod.rs once legacy cleaned up
+pub struct FilePartition {
+    /// The index of the partition among all partitions
+    pub index: usize,
+    /// The contained files of the partition
+    pub files: Vec<PartitionedFile>,
+}
+
+impl std::fmt::Display for FilePartition {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
+        write!(f, "{}", files.join(", "))
+    }
+}
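
For orientation, here is a hedged usage sketch of the new tuple-based contract, not code from the commit. It assumes get_statistics_with_limit and PartitionedFile are reachable under datafusion::datasource::file_format, as the imports in listing.rs below suggest; the Statistics fields used are the ones this diff reads.

use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::{get_statistics_with_limit, PartitionedFile};
use datafusion::physical_plan::Statistics;

fn main() {
    // Statistics are produced separately (normally via FileFormat::infer_stats)
    // and paired with each file instead of being stored inside it.
    let files = vec![
        (
            PartitionedFile { path: "part-0.parquet".to_owned() },
            Statistics { num_rows: Some(1_000), is_exact: true, ..Statistics::default() },
        ),
        (
            PartitionedFile { path: "part-1.parquet".to_owned() },
            Statistics { num_rows: Some(2_000), is_exact: true, ..Statistics::default() },
        ),
    ];

    // Per the doc comment above: with a limit of 500 rows, the first file
    // already suffices, so the second can be dropped from the returned list.
    let (kept, stats) = get_statistics_with_limit(&files, Arc::new(Schema::empty()), Some(500));
    println!("kept {} of 2 files, num_rows = {:?}", kept.len(), stats.num_rows);
}
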
datafusion/src/datasource/{format → file_format}/parquet.rs
@@ -30,15 +30,15 @@ use parquet::file::serialized_reader::SerializedFileReader;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::FileFormat;
+use super::PartitionedFile;
 use super::{create_max_min_accs, get_col_stats, StringStream};
 use crate::arrow::datatypes::{DataType, Field};
-use crate::datasource::PartitionedFile;
 use crate::error::DataFusionError;
 use crate::error::Result;
 use crate::logical_plan::combine_filters;
 use crate::logical_plan::Expr;
 use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::format::ParquetExec;
+use crate::physical_plan::file_format::ParquetExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{Accumulator, Statistics};
 use crate::scalar::ScalarValue;
@@ -284,7 +284,7 @@ fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> {
 
 #[cfg(test)]
 mod tests {
-    use crate::datasource::format::string_stream;
+    use crate::datasource::file_format::string_stream;
     use crate::physical_plan::collect;
 
     use super::*;
@@ -532,10 +532,7 @@ mod tests {
             .infer_stats(&filename.clone())
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile {
-            path: filename,
-            statistics: stats.clone(),
-        }]];
+        let files = vec![vec![PartitionedFile { path: filename }]];
        let exec = format
             .create_executor(schema, files, stats, projection, batch_size, &[], None)
             .await?;
65 changes: 36 additions & 29 deletions datafusion/src/datasource/listing.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! A table that uses the files system / table store listing capability
+//! A table that uses the `ObjectStore` listing capability
 //! to get the list of files to process.
 
 use std::{any::Any, sync::Arc};
@@ -25,15 +25,14 @@ use async_trait::async_trait;
 use futures::{StreamExt, TryStreamExt};
 
 use crate::{
-    datasource::format::{self},
+    datasource::file_format::{self, PartitionedFile},
     error::Result,
     logical_plan::Expr,
     physical_plan::{common, ExecutionPlan, Statistics},
 };
 
 use super::{
-    datasource::TableProviderFilterPushDown, format::FileFormat, PartitionedFile,
-    TableProvider,
+    datasource::TableProviderFilterPushDown, file_format::FileFormat, TableProvider,
 };
 
 /// Options for creating a `ListingTable`
@@ -61,6 +60,9 @@ pub struct ListingOptions {
 }
 
 impl ListingOptions {
+    /// Infer the schema of the files at the given path, including the partitioning
+    /// columns.
+    ///
     /// This method will not be called by the table itself but before creating it.
     /// This way when creating the logical plan we can decide to resolve the schema
     /// locally or ask a remote service to do it (e.g a scheduler).
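
The new doc comment describes a two-step flow: resolve the schema first, locally or via a remote service, then build the table with it. A minimal sketch of that pattern follows; both the infer_schema signature and the ListingTable::new constructor are assumptions, not shown in this diff.

use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::error::Result;

// Sketch only: the schema is resolved before the table is created, so a
// deployment could just as well ask a scheduler for it instead of reading
// the files locally. `infer_schema(&self, path: &str) -> Result<SchemaRef>`
// and `ListingTable::new(path, schema, options)` are assumed signatures.
async fn create_table(path: &str, options: ListingOptions) -> Result<ListingTable> {
    let schema = options.infer_schema(path).await?;
    Ok(ListingTable::new(path, schema, options))
}
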
@@ -128,26 +130,24 @@ impl TableProvider for ListingTable {
         )?;
 
         // collect the statistics if required by the config
-        let mut files = file_list;
-        if self.options.collect_stat {
-            files = futures::stream::iter(files)
-                .then(|file| async {
-                    let statistics = self.options.format.infer_stats(&file.path).await?;
-                    Ok(PartitionedFile {
-                        statistics,
-                        path: file.path,
-                    }) as Result<PartitionedFile>
-                })
-                .try_collect::<Vec<_>>()
-                .await?;
-        }
+        let files = futures::stream::iter(file_list)
+            .then(|file| async {
+                let statistics = if self.options.collect_stat {
+                    self.options.format.infer_stats(&file.path).await?
+                } else {
+                    Statistics::default()
+                };
+                Ok((file, statistics)) as Result<(PartitionedFile, Statistics)>
+            })
+            .try_collect::<Vec<_>>()
+            .await?;
 
         let (files, statistics) =
-            format::get_statistics_with_limit(&files, self.schema(), limit);
+            file_format::get_statistics_with_limit(&files, self.schema(), limit);
 
         let partitioned_file_lists = split_files(files, self.options.max_partitions);
 
-        // 2. create the plan
+        // create the execution plan
         self.options
             .format
             .create_executor(
@@ -182,10 +182,7 @@ fn pruned_partition_list(
     let list_all = || {
         Ok(common::build_file_list(path, file_extension)?
             .into_iter()
-            .map(|f| PartitionedFile {
-                path: f,
-                statistics: Statistics::default(),
-            })
+            .map(|f| PartitionedFile { path: f })
             .collect::<Vec<PartitionedFile>>())
     };
     if partition_names.is_empty() {
@@ -216,11 +213,21 @@ mod tests {
     #[test]
     fn test_split_files() {
         let files = vec![
-            PartitionedFile::from("a".to_string()),
-            PartitionedFile::from("b".to_string()),
-            PartitionedFile::from("c".to_string()),
-            PartitionedFile::from("d".to_string()),
-            PartitionedFile::from("e".to_string()),
+            PartitionedFile {
+                path: "a".to_owned(),
+            },
+            PartitionedFile {
+                path: "b".to_owned(),
+            },
+            PartitionedFile {
+                path: "c".to_owned(),
+            },
+            PartitionedFile {
+                path: "d".to_owned(),
+            },
+            PartitionedFile {
+                path: "e".to_owned(),
+            },
         ];
 
         let chunks = split_files(files.clone(), 1);
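
split_files itself sits outside the shown hunks; the following is a sketch of a chunking helper consistent with this test, not the committed implementation.

#[derive(Debug, Clone)]
pub struct PartitionedFile {
    pub path: String,
}

// Assumed behavior, matching the test above: split the file list into at
// most `n` chunks of roughly equal size.
fn split_files(files: Vec<PartitionedFile>, n: usize) -> Vec<Vec<PartitionedFile>> {
    if files.is_empty() {
        return vec![];
    }
    // ceil(len / n) files per chunk, so no more than n chunks are produced
    let chunk_size = (files.len() + n - 1) / n;
    files.chunks(chunk_size).map(|c| c.to_vec()).collect()
}

fn main() {
    let files: Vec<PartitionedFile> = ["a", "b", "c", "d", "e"]
        .iter()
        .map(|p| PartitionedFile { path: (*p).to_owned() })
        .collect();
    // n = 1: everything lands in a single partition, as the test expects.
    assert_eq!(split_files(files.clone(), 1).len(), 1);
    // n = 2: five files split into chunks of 3 and 2.
    assert_eq!(split_files(files, 2).len(), 2);
}
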
@@ -275,7 +282,7 @@ mod tests {
         let filename = format!("{}/{}", testdata, name);
         let opt = ListingOptions {
             file_extension: "parquet".to_owned(),
-            format: Arc::new(format::parquet::ParquetFormat {
+            format: Arc::new(file_format::parquet::ParquetFormat {
                 enable_pruning: true,
             }),
             partitions: vec![],
(The remaining changed files are not shown.)