diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0b9fba52140b..59bb55162a8e 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -54,6 +54,8 @@ use datafusion::{
     },
 };
 
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -652,13 +654,13 @@ fn get_table(
                 .with_delimiter(b',')
                 .with_has_header(true);
 
-            (Arc::new(format), path, ".csv")
+            (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
             let format = ParquetFormat::default().with_enable_pruning(true);
 
-            (Arc::new(format), path, ".parquet")
+            (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
         }
         other => {
             unimplemented!("Invalid file format '{}'", other);
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 2e954276083e..7485bc72f193 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::parquet::{
+    ParquetFormat, DEFAULT_PARQUET_EXTENSION,
+};
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
@@ -33,7 +35,7 @@ async fn main() -> Result<()> {
     // Configure listing options
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions {
-        file_extension: ".parquet".to_owned(),
+        file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
         format: Arc::new(file_format),
         table_partition_cols: vec![],
         collect_stat: true,
diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs
index 08eb34386fb2..fa02d1ae2833 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+/// The default file extension of avro files
+pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
 /// Avro `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct AvroFormat;
diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs
index f0a70d9176db..6aa0d21235a4 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+/// The default file extension of csv files
+pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
 /// Character Separated Value `FileFormat` implementation.
 #[derive(Debug)]
 pub struct CsvFormat {
diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs
index d7a278d72a6e..bdd5ef81d559 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+/// The default file extension of json files
+pub const DEFAULT_JSON_EXTENSION: &str = ".json";
 /// New line delimited JSON `FileFormat` implementation.
 #[derive(Debug, Default)]
 pub struct JsonFormat {
diff --git a/datafusion/src/datasource/listing/table.rs b/datafusion/src/datasource/listing/table.rs
index 2f8f70f5ede5..1501b8bd7a18 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -266,6 +266,8 @@ impl ListingTable {
 mod tests {
     use arrow::datatypes::DataType;
 
+    use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+    use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
     use crate::{
         datasource::{
             file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -318,7 +320,7 @@ mod tests {
         let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]);
 
         let opt = ListingOptions {
-            file_extension: ".avro".to_owned(),
+            file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
             format: Arc::new(AvroFormat {}),
             table_partition_cols: vec![String::from("p1")],
             target_partitions: 4,
@@ -419,7 +421,7 @@ mod tests {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
         let opt = ListingOptions {
-            file_extension: "parquet".to_owned(),
+            file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
             format: Arc::new(ParquetFormat::default()),
             table_partition_cols: vec![],
             target_partitions: 2,
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index ceea83d952e0..0841fea33062 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -24,8 +24,8 @@ use crate::{
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{
         file_format::{
-            avro::AvroFormat,
-            csv::CsvFormat,
+            avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
+            csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
             parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
             FileFormat,
         },
@@ -209,17 +209,20 @@ impl ExecutionContext {
                 ref file_type,
                 ref has_header,
             }) => {
-                let file_format = match file_type {
-                    FileType::CSV => {
-                        Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
-                            as Arc<dyn FileFormat>)
-                    }
-                    FileType::Parquet => {
-                        Ok(Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>)
-                    }
-                    FileType::Avro => {
-                        Ok(Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>)
-                    }
+                let (file_format, file_extension) = match file_type {
+                    FileType::CSV => Ok((
+                        Arc::new(CsvFormat::default().with_has_header(*has_header))
+                            as Arc<dyn FileFormat>,
+                        DEFAULT_CSV_EXTENSION,
+                    )),
+                    FileType::Parquet => Ok((
+                        Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
+                        DEFAULT_PARQUET_EXTENSION,
+                    )),
+                    FileType::Avro => Ok((
+                        Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
+                        DEFAULT_AVRO_EXTENSION,
+                    )),
                     _ => Err(DataFusionError::NotImplemented(format!(
                         "Unsupported file type {:?}.",
                         file_type
@@ -229,7 +232,7 @@ impl ExecutionContext {
                 let options = ListingOptions {
                     format: file_format,
                     collect_stat: false,
-                    file_extension: String::new(),
+                    file_extension: file_extension.to_owned(),
                     target_partitions: self
                         .state
                         .lock()
diff --git a/datafusion/src/execution/options.rs b/datafusion/src/execution/options.rs
index 219e2fd89700..79b07536acb3 100644
--- a/datafusion/src/execution/options.rs
+++ b/datafusion/src/execution/options.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
 
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat},
     listing::ListingOptions,
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
         Self {
             schema: None,
             schema_infer_max_records: 1000,
-            file_extension: ".json",
+            file_extension: DEFAULT_JSON_EXTENSION,
         }
     }
 }
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index d240fe27c58a..905bb1e28f9a 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -221,7 +221,7 @@ impl ExecutionPlan for ParquetExec {
                 object_store.as_ref(),
                 file_schema_ref,
                 partition_index,
-                partition,
+                &partition,
                 metrics,
                 &projection,
                 &pruning_predicate,
@@ -230,7 +230,10 @@ impl ExecutionPlan for ParquetExec {
                 limit,
                 partition_col_proj,
             ) {
-                println!("Parquet reader thread terminated due to error: {:?}", e);
+                println!(
+                    "Parquet reader thread terminated due to error: {:?} for files: {:?}",
+                    e, partition
+                );
             }
         });
 
@@ -445,7 +448,7 @@ fn read_partition(
     object_store: &dyn ObjectStore,
     file_schema: SchemaRef,
     partition_index: usize,
-    partition: Vec<PartitionedFile>,
+    partition: &[PartitionedFile],
     metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     pruning_predicate: &Option<PruningPredicate>,
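
Illustrative usage (not part of the diff): a minimal sketch of how the new extension constants are meant to replace hard-coded string literals when building ListingOptions, mirroring the fields shown in the diff above. The function name and the target_partitions value are placeholders chosen for the example.

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::{
    ParquetFormat, DEFAULT_PARQUET_EXTENSION,
};
use datafusion::datasource::listing::ListingOptions;

// Build listing options for a Parquet table using the shared extension
// constant instead of a hard-coded ".parquet" literal.
fn parquet_listing_options() -> ListingOptions {
    ListingOptions {
        file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
        format: Arc::new(ParquetFormat::default().with_enable_pruning(true)),
        table_partition_cols: vec![],
        collect_stat: true,
        target_partitions: 8, // placeholder value
    }
}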