Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cannot load parquet table from spark in datafusion-cli. #1665

Merged
merged 3 commits into from
Jan 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use datafusion::{
},
};

use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use structopt::StructOpt;

#[cfg(feature = "snmalloc")]
Expand Down Expand Up @@ -652,13 +654,13 @@ fn get_table(
.with_delimiter(b',')
.with_has_header(true);

(Arc::new(format), path, ".csv")
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, ".parquet")
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
Expand Down
6 changes: 4 additions & 2 deletions datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::parquet::{
ParquetFormat, DEFAULT_PARQUET_EXTENSION,
};
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
Expand All @@ -33,7 +35,7 @@ async fn main() -> Result<()> {
// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions {
file_extension: ".parquet".to_owned(),
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of avro files
pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
/// Avro `FileFormat` implementation.
#[derive(Default, Debug)]
pub struct AvroFormat;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec, FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of csv files
pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
pub struct CsvFormat {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// The default file extension of json files
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ impl ListingTable {
mod tests {
use arrow::datatypes::DataType;

use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::{
datasource::{
file_format::{avro::AvroFormat, parquet::ParquetFormat},
Expand Down Expand Up @@ -318,7 +320,7 @@ mod tests {
let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 100)]);

let opt = ListingOptions {
file_extension: ".avro".to_owned(),
file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
format: Arc::new(AvroFormat {}),
table_partition_cols: vec![String::from("p1")],
target_partitions: 4,
Expand Down Expand Up @@ -419,7 +421,7 @@ mod tests {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let opt = ListingOptions {
file_extension: "parquet".to_owned(),
file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
format: Arc::new(ParquetFormat::default()),
table_partition_cols: vec![],
target_partitions: 2,
Expand Down
31 changes: 17 additions & 14 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::{
datasource::listing::{ListingOptions, ListingTable},
datasource::{
file_format::{
avro::AvroFormat,
csv::CsvFormat,
avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
FileFormat,
},
Expand Down Expand Up @@ -209,17 +209,20 @@ impl ExecutionContext {
ref file_type,
ref has_header,
}) => {
let file_format = match file_type {
FileType::CSV => {
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
as Arc<dyn FileFormat>)
}
FileType::Parquet => {
Ok(Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>)
}
FileType::Avro => {
Ok(Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>)
}
let (file_format, file_extension) = match file_type {
FileType::CSV => Ok((
Arc::new(CsvFormat::default().with_has_header(*has_header))
as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
)),
FileType::Parquet => Ok((
Arc::new(ParquetFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_PARQUET_EXTENSION,
)),
FileType::Avro => Ok((
Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
DEFAULT_AVRO_EXTENSION,
)),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
Expand All @@ -229,7 +232,7 @@ impl ExecutionContext {
let options = ListingOptions {
format: file_format,
collect_stat: false,
file_extension: String::new(),
file_extension: file_extension.to_owned(),
target_partitions: self
.state
.lock()
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/execution/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::sync::Arc;

use arrow::datatypes::{Schema, SchemaRef};

use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat},
listing::ListingOptions,
Expand Down Expand Up @@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
Self {
schema: None,
schema_infer_max_records: 1000,
file_extension: ".json",
file_extension: DEFAULT_JSON_EXTENSION,
}
}
}
9 changes: 6 additions & 3 deletions datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl ExecutionPlan for ParquetExec {
object_store.as_ref(),
file_schema_ref,
partition_index,
partition,
&partition,
metrics,
&projection,
&pruning_predicate,
Expand All @@ -230,7 +230,10 @@ impl ExecutionPlan for ParquetExec {
limit,
partition_col_proj,
) {
println!("Parquet reader thread terminated due to error: {:?}", e);
println!(
"Parquet reader thread terminated due to error: {:?} for files: {:?}",
e, partition
);
}
});

Expand Down Expand Up @@ -445,7 +448,7 @@ fn read_partition(
object_store: &dyn ObjectStore,
file_schema: SchemaRef,
partition_index: usize,
partition: Vec<PartitionedFile>,
partition: &[PartitionedFile],
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
pruning_predicate: &Option<PruningPredicate>,
Expand Down