Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support COPY TO Externally Defined File Formats, add FileType trait #11060

Merged
merged 23 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;
use std::str::FromStr;

use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
Expand All @@ -35,14 +34,14 @@ use crate::{

use datafusion::common::instant::Instant;
use datafusion::common::plan_datafusion_err;
use datafusion::config::ConfigFileType;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;

use datafusion::common::FileType;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
Expand Down Expand Up @@ -291,6 +290,15 @@ impl AdjustedPrintOptions {
}
}

fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> {
match ext.to_lowercase().as_str() {
"csv" => Some(ConfigFileType::CSV),
"json" => Some(ConfigFileType::JSON),
"parquet" => Some(ConfigFileType::PARQUET),
_ => None,
}
}

async fn create_plan(
ctx: &mut dyn CliSessionContext,
statement: Statement,
Expand All @@ -302,7 +310,7 @@ async fn create_plan(
// will raise Configuration errors.
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
// To support custom formats, treat error as None
let format = FileType::from_str(&cmd.file_type).ok();
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
ctx,
&cmd.location,
Expand All @@ -313,13 +321,13 @@ async fn create_plan(
}

if let LogicalPlan::Copy(copy_to) = &mut plan {
let format: FileType = (&copy_to.format_options).into();
let format = config_file_type_from_str(&copy_to.file_type.get_ext());

register_object_store_and_config_extensions(
ctx,
&copy_to.output_url,
&copy_to.options,
Some(format),
format,
)
.await?;
}
Expand Down Expand Up @@ -357,7 +365,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
ctx: &dyn CliSessionContext,
location: &String,
options: &HashMap<String, String>,
format: Option<FileType>,
format: Option<ConfigFileType>,
) -> Result<()> {
// Parse the location URL to extract the scheme and other components
let table_path = ListingTableUrl::parse(location)?;
Expand All @@ -374,7 +382,7 @@ pub(crate) async fn register_object_store_and_config_extensions(
// Clone and modify the default table options based on the provided options
let mut table_options = ctx.session_state().default_table_options().clone();
if let Some(format) = format {
table_options.set_file_format(format);
table_options.set_config_format(format);
}
table_options.alter_with_string_hash_map(options)?;

Expand All @@ -392,7 +400,6 @@ pub(crate) async fn register_object_store_and_config_extensions(
mod tests {
use super::*;

use datafusion::common::config::FormatOptions;
use datafusion::common::plan_err;

use datafusion::prelude::SessionContext;
Expand All @@ -403,7 +410,7 @@ mod tests {
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &plan {
let format = FileType::from_str(&cmd.file_type).ok();
let format = config_file_type_from_str(&cmd.file_type);
register_object_store_and_config_extensions(
&ctx,
&cmd.location,
Expand All @@ -429,12 +436,12 @@ mod tests {
let plan = ctx.state().create_logical_plan(sql).await?;

if let LogicalPlan::Copy(cmd) = &plan {
let format: FileType = (&cmd.format_options).into();
let format = config_file_type_from_str(&cmd.file_type.get_ext());
register_object_store_and_config_extensions(
&ctx,
&cmd.output_url,
&cmd.options,
Some(format),
format,
)
.await?;
} else {
Expand Down Expand Up @@ -484,7 +491,7 @@ mod tests {
let mut plan = create_plan(&mut ctx, statement).await?;
if let LogicalPlan::Copy(copy_to) = &mut plan {
assert_eq!(copy_to.output_url, location);
assert!(matches!(copy_to.format_options, FormatOptions::PARQUET(_)));
assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string());
ctx.runtime_env()
.object_store_registry
.get_store(&Url::parse(&copy_to.output_url).unwrap())?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use std::sync::Arc;

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{FileType, GetExt};

use object_store::aws::AmazonS3Builder;
use url::Url;
Expand Down Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<()> {
let path = format!("s3://{bucket_name}/test_data/");
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());
.with_file_extension(ParquetFormat::default().get_ext());
ctx.register_listing_table("test", &path, listing_options, None, None)
.await?;

Expand All @@ -79,7 +79,7 @@ async fn main() -> Result<()> {

let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());
.with_file_extension(ParquetFormat::default().get_ext());
ctx.register_listing_table("test2", &out_path, listing_options, None, None)
.await?;

Expand Down
77 changes: 36 additions & 41 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::str::FromStr;

use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::{DataFusionError, FileType, Result};
use crate::{DataFusionError, Result};

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
Expand Down Expand Up @@ -1116,6 +1116,16 @@ macro_rules! extensions_options {
}
}

/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the last vestige of the old FileType enum. It can be completely ignored if using a custom format.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 perhaps we should add some Trait to unify the handling of options for built in formats and custom formats 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could potentially remove the TableOptions code and instead have each FileFormatFactory handle configuration. This is actually mostly the case in this PR already, but TableOptions is a common helper.

CSV,
#[cfg(feature = "parquet")]
PARQUET,
JSON,
}

/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
Expand All @@ -1134,7 +1144,7 @@ pub struct TableOptions {

/// The current file format that the table operations should assume. This option allows
/// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
pub current_format: Option<FileType>,
pub current_format: Option<ConfigFileType>,

/// Optional extensions that can be used to extend or customize the behavior of the table
/// options. Extensions can be registered using `Extensions::insert` and might include
Expand All @@ -1152,10 +1162,9 @@ impl ConfigField for TableOptions {
if let Some(file_type) = &self.current_format {
match file_type {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.visit(v, "format", ""),
FileType::CSV => self.csv.visit(v, "format", ""),
FileType::JSON => self.json.visit(v, "format", ""),
_ => {}
ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
ConfigFileType::CSV => self.csv.visit(v, "format", ""),
ConfigFileType::JSON => self.json.visit(v, "format", ""),
}
} else {
self.csv.visit(v, "csv", "");
Expand Down Expand Up @@ -1188,12 +1197,9 @@ impl ConfigField for TableOptions {
match key {
"format" => match format {
#[cfg(feature = "parquet")]
FileType::PARQUET => self.parquet.set(rem, value),
FileType::CSV => self.csv.set(rem, value),
FileType::JSON => self.json.set(rem, value),
_ => {
_config_err!("Config value \"{key}\" is not supported on {}", format)
}
ConfigFileType::PARQUET => self.parquet.set(rem, value),
ConfigFileType::CSV => self.csv.set(rem, value),
ConfigFileType::JSON => self.json.set(rem, value),
},
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
Expand All @@ -1210,15 +1216,6 @@ impl TableOptions {
Self::default()
}

/// Sets the file format for the table.
///
/// # Parameters
///
/// * `format`: The file format to use (e.g., CSV, Parquet).
pub fn set_file_format(&mut self, format: FileType) {
self.current_format = Some(format);
}

/// Creates a new `TableOptions` instance initialized with settings from a given session config.
///
/// # Parameters
Expand Down Expand Up @@ -1249,6 +1246,15 @@ impl TableOptions {
clone
}

/// Sets the file format for the table.
///
/// # Parameters
///
/// * `format`: The file format to use (e.g., CSV, Parquet).
pub fn set_config_format(&mut self, format: ConfigFileType) {
self.current_format = Some(format);
}

/// Sets the extensions for this `TableOptions` instance.
///
/// # Parameters
Expand Down Expand Up @@ -1673,6 +1679,8 @@ config_namespace! {
}
}

pub trait FormatOptionsExt: Display {}

#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
Expand All @@ -1698,28 +1706,15 @@ impl Display for FormatOptions {
}
}

impl From<FileType> for FormatOptions {
fn from(value: FileType) -> Self {
match value {
FileType::ARROW => FormatOptions::ARROW,
FileType::AVRO => FormatOptions::AVRO,
#[cfg(feature = "parquet")]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is certainly nice to avoid many of these #[cfg(feature = "parquet")] that were sprinkled throughout the code

FileType::PARQUET => FormatOptions::PARQUET(TableParquetOptions::default()),
FileType::CSV => FormatOptions::CSV(CsvOptions::default()),
FileType::JSON => FormatOptions::JSON(JsonOptions::default()),
}
}
}

#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;

use crate::config::{
ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions,
ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
TableOptions,
};
use crate::FileType;

#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
Expand Down Expand Up @@ -1777,7 +1772,7 @@ mod tests {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set_file_format(FileType::CSV);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
Expand All @@ -1794,7 +1789,7 @@ mod tests {
#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::CSV);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
Expand All @@ -1807,7 +1802,7 @@ mod tests {
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
Expand All @@ -1821,7 +1816,7 @@ mod tests {
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
Expand All @@ -1835,7 +1830,7 @@ mod tests {
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_file_format(FileType::PARQUET);
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
Expand Down
Loading