Skip to content

Commit

Permalink
Fix: cannot load parquet table from Spark in datafusion-cli. (#1665)
Browse files Browse the repository at this point in the history
* fix: cannot load parquet table from Spark

* include the invalid file path in the error log message.

* fix fmt
  • Loading branch information
Ted-Jiang authored Jan 31, 2022
1 parent 1caf52a commit f849968
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 24 deletions.
6 changes: 4 additions & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use datafusion::{
},
};

use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -652,13 +654,13 @@ fn get_table(
.with_delimiter(b',')
.with_has_header(true);

(Arc::new(format), path, ".csv")
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, ".parquet")
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
Expand Down
6 changes: 4 additions & 2 deletions datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::parquet::{
ParquetFormat, DEFAULT_PARQUET_EXTENSION,
};
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
Expand All @@ -33,7 +35,7 @@ async fn main() -> Result<()> {
// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
/// Avro `FileFormat` implementation.
#[derive(Default, Debug)]
pub struct AvroFormat;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
pub struct CsvFormat {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of json files
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ impl ListingTable {
mod tests {
use arrow::datatypes::DataType;

use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::{
datasource::{
file_format::{avro::AvroFormat, parquet::ParquetFormat},
Expand Down Expand Up @@ -318,7 +320,7 @@ mod tests {
let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]);

let opt = ListingOptions {
file_extension: ".avro".to_owned(),
file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
format: Arc::new(AvroFormat {}),
table_partition_cols: vec![String::from("p1")],
target_partitions: 4,
Expand Down Expand Up @@ -419,7 +421,7 @@ mod tests {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let opt = ListingOptions {
file_extension: "parquet".to_owned(),
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
format: Arc::new(ParquetFormat::default()),
table_partition_cols: vec![],
target_partitions: 2,
Expand Down
31 changes: 17 additions & 14 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
datasource::listing::{ListingOptions, ListingTable},
datasource::{
file_format::{
avro::AvroFormat,
csv::CsvFormat,
avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
FileFormat,
},
Expand Down Expand Up @@ -218,17 +218,20 @@ impl ExecutionContext {
ref file_type,
ref has_header,
}) => {
let file_format = match file_type {
FileType::CSV => {
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
as Arc<dyn FileFormat>)
}
FileType::Parquet => {
Ok(Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>)
}
FileType::Avro => {
Ok(Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>)
}
let (file_format, file_extension) = match file_type {
FileType::CSV => Ok((
Arc::new(CsvFormat::default().with_has_header(*has_header))
as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
)),
FileType::Parquet => Ok((
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
)),
FileType::Avro => Ok((
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
)),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
Expand All @@ -238,7 +241,7 @@ impl ExecutionContext {
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: String::new(),
file_extension: file_extension.to_owned(),
target_partitions: self
.state
.lock()
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/execution/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};

use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat},
listing::ListingOptions,
Expand Down Expand Up @@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
Self {
schema: None,
schema_infer_max_records: 1000,
file_extension: ".json",
file_extension: DEFAULT_JSON_EXTENSION,
}
}
}
9 changes: 6 additions & 3 deletions datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl ExecutionPlan for ParquetExec {
object_store.as_ref(),
file_schema_ref,
partition_index,
partition,
&partition,
metrics,
&projection,
&pruning_predicate,
Expand All @@ -230,7 +230,10 @@ impl ExecutionPlan for ParquetExec {
limit,
partition_col_proj,
) {
println!("Parquet reader thread terminated due to error: {:?}", e);
println!(
"Parquet reader thread terminated due to error: {:?} for files: {:?}",
e, partition
);
}
});

Expand Down Expand Up @@ -445,7 +448,7 @@ fn read_partition(
object_store: &dyn ObjectStore,
file_schema: SchemaRef,
partition_index: usize,
partition: Vec<PartitionedFile>,
partition: &[PartitionedFile],
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
pruning_predicate: &Option<PruningPredicate>,
Expand Down

0 comments on commit f849968

Please sign in to comment.