diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index fe7fb955033f..41c2657e1cca 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -255,7 +255,7 @@ config_namespace! { } config_namespace! { - /// Options related to reading of parquet files + /// Options related to parquet files pub struct ParquetOptions { /// If true, reads the Parquet data page level metadata (the /// Page Index), if present, to reduce the I/O and number of @@ -286,6 +286,66 @@ config_namespace! { /// will be reordered heuristically to minimize the cost of evaluation. If false, /// the filters are applied in the same order as written in the query pub reorder_filters: bool, default = false + + // The following map to parquet::file::properties::WriterProperties + + /// Sets best effort maximum size of data page in bytes + pub data_pagesize_limit: usize, default = 1024 * 1024 + + /// Sets write_batch_size in bytes + pub write_batch_size: usize, default = 1024 + + /// Sets parquet writer version + /// valid values are "1.0" and "2.0" + pub writer_version: String, default = "1.0".into() + + /// Sets default parquet compression codec + /// Valid values are: uncompressed, snappy, gzip(level), + /// lzo, brotli(level), lz4, zstd(level), and lz4_raw. + /// These values are not case sensitive. + pub compression: String, default = "snappy".into() + + /// Sets if dictionary encoding is enabled + pub dictionary_enabled: bool, default = true + + /// Sets best effort maximum dictionary page size, in bytes + pub dictionary_page_size_limit: usize, default = 1024 * 1024 + + /// Sets if statistics are enabled for any column + /// Valid values are: "none", "chunk", and "page" + /// These values are not case sensitive. + pub statistics_enabled: String, default = "page".into() + + /// Sets max statistics size for any column + pub max_statistics_size: usize, default = 4096 + + /// Sets maximum number of rows in a row group + pub max_row_group_size: usize, default = 1024 * 1024 + + /// Sets "created by" property + pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into() + + /// Sets column index trucate length + pub column_index_truncate_length: Option, default = None + + /// Sets best effort maximum number of rows in data page + pub data_page_row_count_limit: usize, default = usize::MAX + + /// Sets default encoding for any column + /// Valid values are: plain, plain_dictionary, rle, + /// bit_packed, delta_binary_packed, delta_length_byte_array, + /// delta_byte_array, rle_dictionary, and byte_stream_split. + /// These values are not case sensitive. + pub encoding: String, default = "plain".into() + + /// Sets if bloom filter is enabled for any column + pub bloom_filter_enabled: bool, default = false + + /// Sets bloom filter false positive probability + pub bloom_filter_fpp: f64, default = 0.05 + + /// Sets bloom filter number of distinct values + pub bloom_filter_ndv: u64, default = 1_000_000_u64 } } @@ -745,6 +805,8 @@ macro_rules! 
config_field { config_field!(String); config_field!(bool); config_field!(usize); +config_field!(f64); +config_field!(u64); /// An implementation trait used to recursively walk configuration trait Visit { diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 8f56bf139e20..32fbf03b580f 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -37,10 +37,11 @@ use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use super::{create_writer, stateless_serialize_and_write_files, FileFormat}; +use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_type::FileCompressionType; -use crate::datasource::file_format::FileWriterMode; -use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD}; +use crate::datasource::file_format::write::{ + create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode, +}; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 6856ad89eada..8472f4e5c164 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,13 +47,12 @@ use crate::physical_plan::insert::InsertExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; -use super::create_writer; -use super::stateless_serialize_and_write_files; -use super::BatchSerializer; use super::FileFormat; use super::FileScanConfig; -use super::FileWriterMode; use crate::datasource::file_format::file_type::FileCompressionType; +use crate::datasource::file_format::write::{ + create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode, +}; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::FileSinkConfig; use crate::datasource::physical_plan::NdJsonExec; diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 42b16656fbc6..9eec11f224ea 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -16,6 +16,7 @@ // under the License. //! Module containing helper methods for the various file formats +//! 
See write.rs for write related helper methods /// Default max records to scan to infer the schema pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; @@ -27,36 +28,24 @@ pub mod file_type; pub mod json; pub mod options; pub mod parquet; +pub mod write; use std::any::Any; -use std::io::Error; -use std::pin::Pin; +use std::fmt; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::{fmt, mem}; use crate::arrow::datatypes::SchemaRef; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream, Statistics}; +use crate::physical_plan::{ExecutionPlan, Statistics}; -use arrow_array::RecordBatch; use datafusion_common::DataFusionError; use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::FutureExt; -use futures::{ready, StreamExt}; -use object_store::path::Path; -use object_store::{MultipartId, ObjectMeta, ObjectStore}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use object_store::{ObjectMeta, ObjectStore}; -use self::file_type::FileCompressionType; - -use super::physical_plan::FileMeta; /// This trait abstracts all the file format specific implementations /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the the same file formats. @@ -116,329 +105,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug { } } -/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. -/// It is specifically designed for the `object_store` crate's `put` method and sends -/// whole bytes at once when the buffer is flushed. -pub struct AsyncPutWriter { - /// Object metadata - object_meta: ObjectMeta, - /// A shared reference to the object store - store: Arc, - /// A buffer that stores the bytes to be sent - current_buffer: Vec, - /// Used for async handling in flush method - inner_state: AsyncPutState, -} - -impl AsyncPutWriter { - /// Constructor for the `AsyncPutWriter` object - pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { - Self { - object_meta, - store, - current_buffer: vec![], - // The writer starts out in buffering mode - inner_state: AsyncPutState::Buffer, - } - } - - /// Separate implementation function that unpins the [`AsyncPutWriter`] so - /// that partial borrows work correctly - fn poll_shutdown_inner( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - loop { - match &mut self.inner_state { - AsyncPutState::Buffer => { - // Convert the current buffer to bytes and take ownership of it - let bytes = Bytes::from(mem::take(&mut self.current_buffer)); - // Set the inner state to Put variant with the bytes - self.inner_state = AsyncPutState::Put { bytes } - } - AsyncPutState::Put { bytes } => { - // Send the bytes to the object store's put method - return Poll::Ready( - ready!(self - .store - .put(&self.object_meta.location, bytes.clone()) - .poll_unpin(cx)) - .map_err(Error::from), - ); - } - } - } - } -} - -/// An enum that represents the inner state of AsyncPut -enum AsyncPutState { - /// Building Bytes struct in this state - Buffer, - /// Data in the buffer is being sent to the object store - Put { bytes: Bytes }, -} - -impl AsyncWrite for AsyncPutWriter { - // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // Extend the current buffer 
with the incoming buffer - self.current_buffer.extend_from_slice(buf); - // Return a ready poll with the length of the incoming buffer - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - // Return a ready poll with an empty result - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Call the poll_shutdown_inner method to handle the actual sending of data to the object store - self.poll_shutdown_inner(cx) - } -} - -/// Stores data needed during abortion of MultiPart writers -pub(crate) struct MultiPart { - /// A shared reference to the object store - store: Arc, - multipart_id: MultipartId, - location: Path, -} - -impl MultiPart { - /// Create a new `MultiPart` - pub fn new( - store: Arc, - multipart_id: MultipartId, - location: Path, - ) -> Self { - Self { - store, - multipart_id, - location, - } - } -} - -pub(crate) enum AbortMode { - Put, - Append, - MultiPart(MultiPart), -} - -/// A wrapper struct with abort method and writer -pub(crate) struct AbortableWrite { - writer: W, - mode: AbortMode, -} - -impl AbortableWrite { - /// Create a new `AbortableWrite` instance with the given writer, and write mode. - fn new(writer: W, mode: AbortMode) -> Self { - Self { writer, mode } - } - - /// handling of abort for different write modes - fn abort_writer(&self) -> Result>> { - match &self.mode { - AbortMode::Put => Ok(async { Ok(()) }.boxed()), - AbortMode::Append => Err(DataFusionError::Execution( - "Cannot abort in append mode".to_string(), - )), - AbortMode::MultiPart(MultiPart { - store, - multipart_id, - location, - }) => { - let location = location.clone(); - let multipart_id = multipart_id.clone(); - let store = store.clone(); - Ok(Box::pin(async move { - store - .abort_multipart(&location, &multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } - } - } -} - -impl AsyncWrite for AbortableWrite { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) - } -} - -/// An enum that defines different file writer modes. -#[derive(Debug, Clone, Copy)] -pub enum FileWriterMode { - /// Data is appended to an existing file. - Append, - /// Data is written to a new file. - Put, - /// Data is written to a new file in multiple parts. - PutMultipart, -} - -/// Returns an [`AbortableWrite`] which writes to the given object store location -/// with the specified compression -pub(crate) async fn create_writer( - writer_mode: FileWriterMode, - file_compression_type: FileCompressionType, - file_meta: FileMeta, - object_store: Arc, -) -> Result>> { - let object = &file_meta.object_meta; - match writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. 
- FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } -} - -/// A trait that defines the methods required for a RecordBatch serializer. -#[async_trait] -pub trait BatchSerializer: Unpin + Send { - /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. - async fn serialize(&mut self, batch: RecordBatch) -> Result; -} - -/// Checks if any of the passed writers have encountered an error -/// and if so, all writers are aborted. -async fn check_for_errors( - result: Result, - writers: &mut [AbortableWrite], -) -> Result { - match result { - Ok(value) => Ok(value), - Err(e) => { - // Abort all writers before returning the error: - for writer in writers { - let mut abort_future = writer.abort_writer(); - if let Ok(abort_future) = &mut abort_future { - let _ = abort_future.await; - } - // Ignore errors that occur during abortion, - // We do try to abort all writers before returning error. - } - // After aborting writers return original error. - Err(e) - } - } -} - -/// Contains the common logic for serializing RecordBatches and -/// writing the resulting bytes to an ObjectStore. -/// Serialization is assumed to be stateless, i.e. -/// each RecordBatch can be serialized without any -/// dependency on the RecordBatches before or after. -async fn stateless_serialize_and_write_files( - mut data: Vec, - mut serializers: Vec>, - mut writers: Vec>>, -) -> Result { - let num_partitions = data.len(); - let mut row_count = 0; - // Map errors to DatafusionError. 
- let err_converter = - |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); - // TODO parallelize serialization accross partitions and batches within partitions - // see: https://github.com/apache/arrow-datafusion/issues/7079 - for idx in 0..num_partitions { - while let Some(maybe_batch) = data[idx].next().await { - // Write data to files in a round robin fashion: - let serializer = &mut serializers[idx]; - let batch = check_for_errors(maybe_batch, &mut writers).await?; - row_count += batch.num_rows(); - let bytes = - check_for_errors(serializer.serialize(batch).await, &mut writers).await?; - let writer = &mut writers[idx]; - check_for_errors( - writer.write_all(&bytes).await.map_err(err_converter), - &mut writers, - ) - .await?; - } - } - // Perform cleanup: - let n_writers = writers.len(); - for idx in 0..n_writers { - check_for_errors( - writers[idx].shutdown().await.map_err(err_converter), - &mut writers, - ) - .await?; - } - Ok(row_count as u64) -} - #[cfg(test)] pub(crate) mod test_util { use std::ops::Range; diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 73c20d3b0c3a..d8168070eb30 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -214,6 +214,13 @@ pub struct ParquetReadOptions<'a> { /// /// If None specified, uses value in SessionConfig pub skip_metadata: Option, + /// An optional schema representing the parquet files. If None, parquet reader will try to infer it + /// based on data in file. + pub schema: Option<&'a Schema>, + /// Indicates how the file is sorted + pub file_sort_order: Vec>, + /// Setting controls how inserts to this file should be handled + pub insert_mode: ListingTableInsertMode, } impl<'a> Default for ParquetReadOptions<'a> { @@ -223,6 +230,9 @@ impl<'a> Default for ParquetReadOptions<'a> { table_partition_cols: vec![], parquet_pruning: None, skip_metadata: None, + schema: None, + file_sort_order: vec![], + insert_mode: ListingTableInsertMode::AppendNewFiles, } } } @@ -242,6 +252,12 @@ impl<'a> ParquetReadOptions<'a> { self } + /// Specify schema to use for parquet read + pub fn schema(mut self, schema: &'a Schema) -> Self { + self.schema = Some(schema); + self + } + /// Specify table_partition_cols for partition pruning pub fn table_partition_cols( mut self, @@ -250,6 +266,18 @@ impl<'a> ParquetReadOptions<'a> { self.table_partition_cols = table_partition_cols; self } + + /// Configure if file has known sort order + pub fn file_sort_order(mut self, file_sort_order: Vec>) -> Self { + self.file_sort_order = file_sort_order; + self + } + + /// Configure how insertions to this table should be handled + pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { + self.insert_mode = insert_mode; + self + } } /// Options that control the reading of ARROW files. 
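A minimal sketch of how the `ParquetReadOptions` builder methods added above might be used when registering a parquet directory, mirroring the test updated later in this PR. The table name and path are hypothetical, and the exact import path of `ListingTableInsertMode` is assumed:

```rust
use datafusion::datasource::listing::ListingTableInsertMode;
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

// Hypothetical helper: register a parquet directory so that `INSERT INTO`
// appends new files instead of erroring, using the builder methods added above.
async fn register_appendable_parquet(ctx: &SessionContext) -> Result<()> {
    ctx.register_parquet(
        "events",       // hypothetical table name
        "/tmp/events/", // hypothetical table path
        ParquetReadOptions::default()
            .insert_mode(ListingTableInsertMode::AppendNewFiles),
    )
    .await
}
```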
@@ -525,6 +553,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_file_extension(self.file_extension) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) + .with_file_sort_order(self.file_sort_order.clone()) + .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -533,11 +563,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - // with parquet we resolve the schema in all cases - Ok(self - .to_listing_options(config) - .infer_schema(&state, &table_path) - .await?) + self._get_resolved_schema(config, state, table_path, self.schema, false) + .await } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6cda0fe68b57..c35f59dd4a94 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,23 +17,31 @@ //! Parquet format abstractions +use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; +use rand::distributions::DistString; use std::any::Any; +use std::fmt; +use std::fmt::Debug; use std::sync::Arc; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use datafusion_common::DataFusionError; +use datafusion_common::{plan_err, DataFusionError}; +use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::parquet_to_arrow_schema; +use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use parquet::file::statistics::Statistics as ParquetStatistics; +use rand::distributions::Alphanumeric; +use super::write::FileWriterMode; use super::FileFormat; use super::FileScanConfig; use crate::arrow::array::{ @@ -42,12 +50,18 @@ use crate::arrow::array::{ use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter}; +use crate::datasource::physical_plan::{ + FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, +}; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; +use crate::physical_plan::insert::{DataSink, InsertExec}; +use crate::physical_plan::{ + Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, + Statistics, +}; /// The default file extension of parquet files pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; @@ -208,6 +222,24 @@ impl FileFormat for ParquetFormat { self.metadata_size_hint(state.config_options()), ))) } + + async fn create_writer_physical_plan( + &self, + input: Arc, + _state: &SessionState, + conf: FileSinkConfig, + ) -> Result> { + if conf.overwrite { + return Err(DataFusionError::NotImplemented( + "Overwrites are not implemented yet for Parquet".into(), + )); + } + + let sink_schema = conf.output_schema().clone(); + let sink = Arc::new(ParquetSink::new(conf)); + + 
Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _) + } } fn summarize_min_max( @@ -543,6 +575,318 @@ async fn fetch_statistics( Ok(statistics) } +/// Implements [`DataSink`] for writing to a parquet file. +struct ParquetSink { + /// Config options for writing data + config: FileSinkConfig, +} + +impl Debug for ParquetSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetSink").finish() + } +} + +impl DisplayAs for ParquetSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "ParquetSink(writer_mode={:?}, file_groups=", + self.config.writer_mode + )?; + FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; + write!(f, ")") + } + } + } +} + +/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding +fn parse_encoding_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "plain" => Ok(parquet::basic::Encoding::PLAIN), + "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY), + "rle" => Ok(parquet::basic::Encoding::RLE), + "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED), + "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED), + "delta_length_byte_array" => { + Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY) + } + "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY), + "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY), + "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT), + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet encoding: \ + {str_setting}. Valid values are: plain, plain_dictionary, rle, \ + /// bit_packed, delta_binary_packed, delta_length_byte_array, \ + /// delta_byte_array, rle_dictionary, and byte_stream_split." + ))), + } +} + +/// Splits compression string into compression codec and optional compression_level +/// I.e. gzip(2) -> gzip, 2 +fn split_compression_string(str_setting: &str) -> Result<(&str, Option)> { + let split_setting = str_setting.split_once('('); + + match split_setting { + Some((codec, rh)) => { + let level = &rh[..rh.len() - 1].parse::().map_err(|_| { + DataFusionError::Plan(format!( + "Could not parse compression string. \ + Got codec: {} and unknown level from {}", + codec, str_setting + )) + })?; + Ok((codec, Some(*level))) + } + None => Ok((str_setting, None)), + } +} + +/// Helper to ensure compression codecs which don't support levels +/// don't have one set. E.g. snappy(2) is invalid. +fn check_level_is_none(codec: &str, level: &Option) -> Result<()> { + if level.is_some() { + return Err(DataFusionError::Plan(format!( + "Compression {codec} does not support specifying a level" + ))); + } + Ok(()) +} + +/// Helper to ensure compression codecs which require a level +/// do have one set. E.g. 
zstd is invalid, zstd(3) is valid +fn require_level(codec: &str, level: Option) -> Result { + level.ok_or(DataFusionError::Plan(format!( + "{codec} compression requires specifying a level such as {codec}(4)" + ))) +} + +/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression +fn parse_compression_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + let (codec, level) = split_compression_string(str_setting_lower)?; + match codec { + "uncompressed" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::UNCOMPRESSED) + } + "snappy" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::SNAPPY) + } + "gzip" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new( + level, + )?)) + } + "lzo" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZO) + } + "brotli" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( + level, + )?)) + } + "lz4" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4) + } + "zstd" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + level as i32, + )?)) + } + "lz4_raw" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4_RAW) + } + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet compression: \ + {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \ + lzo, brotli(level), lz4, zstd(level), and lz4_raw." + ))), + } +} + +fn parse_version_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "1.0" => Ok(WriterVersion::PARQUET_1_0), + "2.0" => Ok(WriterVersion::PARQUET_2_0), + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet writer version {str_setting} \ + valid options are '1.0' and '2.0'" + ))), + } +} + +fn parse_statistics_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "none" => Ok(EnabledStatistics::None), + "chunk" => Ok(EnabledStatistics::Chunk), + "page" => Ok(EnabledStatistics::Page), + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet statistics setting {str_setting} \ + valid options are 'none', 'page', and 'chunk'" + ))), + } +} + +impl ParquetSink { + fn new(config: FileSinkConfig) -> Self { + Self { config } + } + + /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options. + /// May return error if SessionContext contains invalid or unsupported options + fn parquet_writer_props_from_context( + &self, + context: &Arc, + ) -> Result { + let parquet_context = &context.session_config().options().execution.parquet; + Ok(WriterProperties::builder() + .set_data_page_size_limit(parquet_context.data_pagesize_limit) + .set_write_batch_size(parquet_context.write_batch_size) + .set_writer_version(parse_version_string(&parquet_context.writer_version)?) + .set_compression(parse_compression_string(&parquet_context.compression)?) + .set_dictionary_enabled(parquet_context.dictionary_enabled) + .set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit) + .set_statistics_enabled(parse_statistics_string( + &parquet_context.statistics_enabled, + )?) 
+ .set_max_statistics_size(parquet_context.max_statistics_size) + .set_max_row_group_size(parquet_context.max_row_group_size) + .set_created_by(parquet_context.created_by.clone()) + .set_column_index_truncate_length( + parquet_context.column_index_truncate_length, + ) + .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) + .set_encoding(parse_encoding_string(&parquet_context.encoding)?) + .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled) + .set_bloom_filter_fpp(parquet_context.bloom_filter_fpp) + .set_bloom_filter_ndv(parquet_context.bloom_filter_ndv) + .build()) + } + + // Create a write for parquet files + async fn create_writer( + &self, + file_meta: FileMeta, + object_store: Arc, + parquet_props: WriterProperties, + ) -> Result< + AsyncArrowWriter>, + > { + let object = &file_meta.object_meta; + match self.config.writer_mode { + FileWriterMode::Append => { + plan_err!( + "Appending to Parquet files is not supported by the file format!" + ) + } + FileWriterMode::Put => Err(DataFusionError::NotImplemented( + "FileWriterMode::Put is not implemented for ParquetSink".into(), + )), + FileWriterMode::PutMultipart => { + let (_, multipart_writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AsyncArrowWriter::try_new( + multipart_writer, + self.config.output_schema.clone(), + 10485760, + Some(parquet_props), + )?; + Ok(writer) + } + } + } +} + +#[async_trait] +impl DataSink for ParquetSink { + async fn write_all( + &self, + mut data: Vec, + context: &Arc, + ) -> Result { + let num_partitions = data.len(); + let parquet_props = self.parquet_writer_props_from_context(context)?; + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + + // Construct writer for each file group + let mut writers = vec![]; + match self.config.writer_mode { + FileWriterMode::Append => { + return plan_err!( + "Parquet format does not support appending to existing file!" + ) + } + FileWriterMode::Put => { + return Err(DataFusionError::NotImplemented( + "Put Mode is not implemented for ParquetSink yet".into(), + )) + } + FileWriterMode::PutMultipart => { + // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column) + let base_path = &self.config.table_paths[0]; + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let file_path = base_path + .prefix() + .child(format!("/{}_{}.parquet", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = self + .create_writer( + object_meta.into(), + object_store.clone(), + parquet_props.clone(), + ) + .await?; + writers.push(writer); + } + } + } + + let mut row_count = 0; + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(batch) = data[idx].next().await.transpose()? 
{ + row_count += batch.num_rows(); + // TODO cleanup all multipart writes when any encounters an error + writers[idx].write(&batch).await?; + } + } + + for writer in writers { + writer.close().await?; + } + + Ok(row_count as u64) + } +} + #[cfg(test)] pub(crate) mod test_util { use super::*; diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs new file mode 100644 index 000000000000..c256c9689ab9 --- /dev/null +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -0,0 +1,365 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module containing helper methods/traits related to enabling +//! write support for the various file formats + +use std::io::Error; +use std::mem; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::datasource::physical_plan::FileMeta; +use crate::error::Result; +use crate::physical_plan::SendableRecordBatchStream; + +use arrow_array::RecordBatch; +use datafusion_common::DataFusionError; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; +use futures::{ready, StreamExt}; +use object_store::path::Path; +use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use super::file_type::FileCompressionType; + +/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. +/// It is specifically designed for the `object_store` crate's `put` method and sends +/// whole bytes at once when the buffer is flushed. 
+pub struct AsyncPutWriter { + /// Object metadata + object_meta: ObjectMeta, + /// A shared reference to the object store + store: Arc, + /// A buffer that stores the bytes to be sent + current_buffer: Vec, + /// Used for async handling in flush method + inner_state: AsyncPutState, +} + +impl AsyncPutWriter { + /// Constructor for the `AsyncPutWriter` object + pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { + Self { + object_meta, + store, + current_buffer: vec![], + // The writer starts out in buffering mode + inner_state: AsyncPutState::Buffer, + } + } + + /// Separate implementation function that unpins the [`AsyncPutWriter`] so + /// that partial borrows work correctly + fn poll_shutdown_inner( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match &mut self.inner_state { + AsyncPutState::Buffer => { + // Convert the current buffer to bytes and take ownership of it + let bytes = Bytes::from(mem::take(&mut self.current_buffer)); + // Set the inner state to Put variant with the bytes + self.inner_state = AsyncPutState::Put { bytes } + } + AsyncPutState::Put { bytes } => { + // Send the bytes to the object store's put method + return Poll::Ready( + ready!(self + .store + .put(&self.object_meta.location, bytes.clone()) + .poll_unpin(cx)) + .map_err(Error::from), + ); + } + } + } + } +} + +/// An enum that represents the inner state of AsyncPut +enum AsyncPutState { + /// Building Bytes struct in this state + Buffer, + /// Data in the buffer is being sent to the object store + Put { bytes: Bytes }, +} + +impl AsyncWrite for AsyncPutWriter { + // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + // Extend the current buffer with the incoming buffer + self.current_buffer.extend_from_slice(buf); + // Return a ready poll with the length of the incoming buffer + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + // Return a ready poll with an empty result + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Call the poll_shutdown_inner method to handle the actual sending of data to the object store + self.poll_shutdown_inner(cx) + } +} + +/// Stores data needed during abortion of MultiPart writers +pub(crate) struct MultiPart { + /// A shared reference to the object store + store: Arc, + multipart_id: MultipartId, + location: Path, +} + +impl MultiPart { + /// Create a new `MultiPart` + pub fn new( + store: Arc, + multipart_id: MultipartId, + location: Path, + ) -> Self { + Self { + store, + multipart_id, + location, + } + } +} + +pub(crate) enum AbortMode { + Put, + Append, + MultiPart(MultiPart), +} + +/// A wrapper struct with abort method and writer +pub(crate) struct AbortableWrite { + writer: W, + mode: AbortMode, +} + +impl AbortableWrite { + /// Create a new `AbortableWrite` instance with the given writer, and write mode. 
+ pub(crate) fn new(writer: W, mode: AbortMode) -> Self { + Self { writer, mode } + } + + /// handling of abort for different write modes + pub(crate) fn abort_writer(&self) -> Result>> { + match &self.mode { + AbortMode::Put => Ok(async { Ok(()) }.boxed()), + AbortMode::Append => Err(DataFusionError::Execution( + "Cannot abort in append mode".to_string(), + )), + AbortMode::MultiPart(MultiPart { + store, + multipart_id, + location, + }) => { + let location = location.clone(); + let multipart_id = multipart_id.clone(); + let store = store.clone(); + Ok(Box::pin(async move { + store + .abort_multipart(&location, &multipart_id) + .await + .map_err(DataFusionError::ObjectStore) + })) + } + } + } +} + +impl AsyncWrite for AbortableWrite { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) + } +} + +/// An enum that defines different file writer modes. +#[derive(Debug, Clone, Copy)] +pub enum FileWriterMode { + /// Data is appended to an existing file. + Append, + /// Data is written to a new file. + Put, + /// Data is written to a new file in multiple parts. + PutMultipart, +} +/// A trait that defines the methods required for a RecordBatch serializer. +#[async_trait] +pub trait BatchSerializer: Unpin + Send { + /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. + async fn serialize(&mut self, batch: RecordBatch) -> Result; +} + +/// Checks if any of the passed writers have encountered an error +/// and if so, all writers are aborted. +async fn check_for_errors( + result: Result, + writers: &mut [AbortableWrite], +) -> Result { + match result { + Ok(value) => Ok(value), + Err(e) => { + // Abort all writers before returning the error: + for writer in writers { + let mut abort_future = writer.abort_writer(); + if let Ok(abort_future) = &mut abort_future { + let _ = abort_future.await; + } + // Ignore errors that occur during abortion, + // We do try to abort all writers before returning error. + } + // After aborting writers return original error. + Err(e) + } + } +} + +/// Returns an [`AbortableWrite`] which writes to the given object store location +/// with the specified compression +pub(crate) async fn create_writer( + writer_mode: FileWriterMode, + file_compression_type: FileCompressionType, + file_meta: FileMeta, + object_store: Arc, +) -> Result>> { + let object = &file_meta.object_meta; + match writer_mode { + // If the mode is append, call the store's append method and return wrapped in + // a boxed trait object. 
+ FileWriterMode::Append => { + let writer = object_store + .append(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::Append, + ); + Ok(writer) + } + // If the mode is put, create a new AsyncPut writer and return it wrapped in + // a boxed trait object + FileWriterMode::Put => { + let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); + let writer = AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::Put, + ); + Ok(writer) + } + // If the mode is put multipart, call the store's put_multipart method and + // return the writer wrapped in a boxed trait object. + FileWriterMode::PutMultipart => { + let (multipart_id, writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::MultiPart(MultiPart::new( + object_store, + multipart_id, + object.location.clone(), + )), + )) + } + } +} + +/// Contains the common logic for serializing RecordBatches and +/// writing the resulting bytes to an ObjectStore. +/// Serialization is assumed to be stateless, i.e. +/// each RecordBatch can be serialized without any +/// dependency on the RecordBatches before or after. +pub(crate) async fn stateless_serialize_and_write_files( + mut data: Vec, + mut serializers: Vec>, + mut writers: Vec>>, +) -> Result { + let num_partitions = data.len(); + let mut row_count = 0; + // Map errors to DatafusionError. + let err_converter = + |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(maybe_batch) = data[idx].next().await { + // Write data to files in a round robin fashion: + let serializer = &mut serializers[idx]; + let batch = check_for_errors(maybe_batch, &mut writers).await?; + row_count += batch.num_rows(); + let bytes = + check_for_errors(serializer.serialize(batch).await, &mut writers).await?; + let writer = &mut writers[idx]; + check_for_errors( + writer.write_all(&bytes).await.map_err(err_converter), + &mut writers, + ) + .await?; + } + } + // Perform cleanup: + let n_writers = writers.len(); + for idx in 0..n_writers { + check_for_errors( + writers[idx].shutdown().await.map_err(err_converter), + &mut writers, + ) + .await?; + } + Ok(row_count as u64) +} diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 8519628760f2..4f2387ad0dfa 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -845,10 +845,12 @@ impl TableProvider for ListingTable { file_groups.len() ))); } - writer_mode = crate::datasource::file_format::FileWriterMode::Append; + writer_mode = + crate::datasource::file_format::write::FileWriterMode::Append; } ListingTableInsertMode::AppendNewFiles => { - writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart + writer_mode = + crate::datasource::file_format::write::FileWriterMode::PutMultipart } ListingTableInsertMode::Error => { return plan_err!( @@ -963,6 +965,7 @@ mod tests { use datafusion_common::assert_contains; use datafusion_expr::LogicalPlanBuilder; use rstest::*; + use std::collections::HashMap; use std::fs::File; use 
tempfile::TempDir; @@ -1554,6 +1557,7 @@ mod tests { helper_test_insert_into_append_to_existing_files( FileType::JSON, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) @@ -1564,6 +1568,7 @@ mod tests { helper_test_append_new_files_to_table( FileType::JSON, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) @@ -1574,6 +1579,7 @@ mod tests { helper_test_insert_into_append_to_existing_files( FileType::CSV, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) @@ -1584,11 +1590,127 @@ mod tests { helper_test_append_new_files_to_table( FileType::CSV, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) } + #[tokio::test] + async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> { + helper_test_append_new_files_to_table( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + None, + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> { + let mut config_map: HashMap = HashMap::new(); + config_map.insert( + "datafusion.execution.parquet.compression".into(), + "zstd(5)".into(), + ); + config_map.insert( + "datafusion.execution.parquet.dictionary_enabled".into(), + "false".into(), + ); + config_map.insert( + "datafusion.execution.parquet.dictionary_page_size_limit".into(), + "100".into(), + ); + config_map.insert( + "datafusion.execution.parquet.staistics_enabled".into(), + "none".into(), + ); + config_map.insert( + "datafusion.execution.parquet.max_statistics_size".into(), + "10".into(), + ); + config_map.insert( + "datafusion.execution.parquet.max_row_group_size".into(), + "5".into(), + ); + config_map.insert( + "datafusion.execution.parquet.created_by".into(), + "datafusion test".into(), + ); + config_map.insert( + "datafusion.execution.parquet.column_index_truncate_length".into(), + "50".into(), + ); + config_map.insert( + "datafusion.execution.parquet.data_page_row_count_limit".into(), + "50".into(), + ); + config_map.insert( + "datafusion.execution.parquet.encoding".into(), + "delta_binary_packed".into(), + ); + config_map.insert( + "datafusion.execution.parquet.bloom_filter_enabled".into(), + "true".into(), + ); + config_map.insert( + "datafusion.execution.parquet.bloom_filter_fpp".into(), + "0.01".into(), + ); + config_map.insert( + "datafusion.execution.parquet.bloom_filter_ndv".into(), + "1000".into(), + ); + config_map.insert( + "datafusion.execution.parquet.writer_version".into(), + "2.0".into(), + ); + config_map.insert( + "datafusion.execution.parquet.write_batch_size".into(), + "5".into(), + ); + helper_test_append_new_files_to_table( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + Some(config_map), + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_new_parquet_files_invalid_session_fails( + ) -> Result<()> { + let mut config_map: HashMap = HashMap::new(); + config_map.insert( + "datafusion.execution.parquet.compression".into(), + "zstd".into(), + ); + let e = helper_test_append_new_files_to_table( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + Some(config_map), + ) + .await + .expect_err("Example should fail!"); + assert_eq!("Error during planning: zstd compression requires specifying a level such as zstd(4)", format!("{e}")); + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> { + let maybe_err = helper_test_insert_into_append_to_existing_files( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + None, + ) + .await; + let _err = 
+ maybe_err.expect_err("Appending to existing parquet file did not fail!"); + Ok(()) + } + fn load_empty_schema_table( schema: SchemaRef, temp_path: &str, @@ -1615,9 +1737,16 @@ mod tests { async fn helper_test_insert_into_append_to_existing_files( file_type: FileType, file_compression_type: FileCompressionType, + session_config_map: Option>, ) -> Result<()> { // Create the initial context, schema, and batch. - let session_ctx = SessionContext::new(); + let session_ctx = match session_config_map { + Some(cfg) => { + let config = SessionConfig::from_string_hash_map(cfg)?; + SessionContext::with_config(config) + } + None => SessionContext::new(), + }; // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( "column1", @@ -1777,9 +1906,17 @@ mod tests { async fn helper_test_append_new_files_to_table( file_type: FileType, file_compression_type: FileCompressionType, + session_config_map: Option>, ) -> Result<()> { // Create the initial context, schema, and batch. - let session_ctx = SessionContext::new(); + let session_ctx = match session_config_map { + Some(cfg) => { + let config = SessionConfig::from_string_hash_map(cfg)?; + SessionContext::with_config(config) + } + None => SessionContext::new(), + }; + // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( "column1", @@ -1825,9 +1962,9 @@ mod tests { .register_parquet( "t", tmp_dir.path().to_str().unwrap(), - ParquetReadOptions::default(), // TODO implement insert_mode for parquet - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - //.schema(schema.as_ref()), + ParquetReadOptions::default() + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), ) .await?; } @@ -1894,16 +2031,16 @@ mod tests { // Read the records in the table let batches = session_ctx - .sql("select count(*) from t") + .sql("select count(*) as count from t") .await? .collect() .await?; let expected = vec![ - "+----------+", - "| COUNT(*) |", - "+----------+", - "| 6 |", - "+----------+", + "+-------+", + "| count |", + "+-------+", + "| 6 |", + "+-------+", ]; // Assert that the batches read from the file match the expected result. @@ -1935,18 +2072,18 @@ mod tests { // Read the contents of the table let batches = session_ctx - .sql("select count(*) from t") + .sql("select count(*) AS count from t") .await? .collect() .await?; // Define the expected result after the second append. let expected = vec![ - "+----------+", - "| COUNT(*) |", - "+----------+", - "| 12 |", - "+----------+", + "+-------+", + "| count |", + "+-------+", + "| 12 |", + "+-------+", ]; // Assert that the batches read from the file after the second append match the expected result. 
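The tests above drive the new writer settings through `SessionConfig::from_string_hash_map`; the sketch below condenses that pattern for an end user. The option keys come from the `ParquetOptions` fields added in `config.rs` earlier in this diff, the values are illustrative only, and the `prelude` import paths are assumed:

```rust
use std::collections::HashMap;

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

// Build a SessionContext whose ParquetSink picks up overridden writer
// properties; invalid values (e.g. "zstd" with no level) surface as planning
// errors when the INSERT runs, as the tests above demonstrate.
fn context_with_parquet_overrides() -> Result<SessionContext> {
    let mut settings: HashMap<String, String> = HashMap::new();
    settings.insert(
        "datafusion.execution.parquet.compression".into(),
        "zstd(5)".into(),
    );
    settings.insert(
        "datafusion.execution.parquet.writer_version".into(),
        "2.0".into(),
    );
    let config = SessionConfig::from_string_hash_map(settings)?;
    Ok(SessionContext::with_config(config))
}
```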
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 2c4437de0a92..6ac073c34f61 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -522,7 +522,7 @@ mod tests { use datafusion_common::DataFusionError; use super::*; - use crate::datasource::file_format::BatchSerializer; + use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::FileMeta; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 3e607d121ac9..b0914b081616 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -46,7 +46,7 @@ pub use json::{JsonOpener, NdJsonExec}; use crate::physical_plan::ExecutionPlan; use crate::{ - datasource::file_format::FileWriterMode, + datasource::file_format::write::FileWriterMode, physical_plan::{DisplayAs, DisplayFormatType}, }; use crate::{ diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ce9165f027c7..6593e22e6cc0 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1024,8 +1024,14 @@ impl SessionContext { ) -> Result<()> { let listing_options = options.to_listing_options(&self.state.read().config); - self.register_listing_table(name, table_path, listing_options, None, None) - .await?; + self.register_listing_table( + name, + table_path, + listing_options, + options.schema.map(|s| Arc::new(s.to_owned())), + None, + ) + .await?; Ok(()) } diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs index fc0a4e7c7ed2..36786bd0798f 100644 --- a/datafusion/core/tests/sql/expr.rs +++ b/datafusion/core/tests/sql/expr.rs @@ -448,8 +448,10 @@ async fn test_substring_expr() -> Result<()> { Ok(()) } +/// Test string expressions test split into two batches +/// to prevent stack overflow error #[tokio::test] -async fn test_string_expressions() -> Result<()> { +async fn test_string_expressions_batch1() -> Result<()> { test_expression!("ascii('')", "0"); test_expression!("ascii('x')", "120"); test_expression!("ascii(NULL)", "NULL"); @@ -501,6 +503,13 @@ async fn test_string_expressions() -> Result<()> { test_expression!("rtrim(' zzzytest ', NULL)", "NULL"); test_expression!("rtrim('testxxzx', 'xyz')", "test"); test_expression!("rtrim(NULL, 'xyz')", "NULL"); + Ok(()) +} + +/// Test string expressions test split into two batches +/// to prevent stack overflow error +#[tokio::test] +async fn test_string_expressions_batch2() -> Result<()> { test_expression!("split_part('abc~@~def~@~ghi', '~@~', 2)", "def"); test_expression!("split_part('abc~@~def~@~ghi', '~@~', 20)", ""); test_expression!("split_part(NULL, '~@~', 20)", "NULL"); diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index 162e20820191..452a8709c523 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -146,12 +146,28 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true 
datafusion.execution.collect_statistics false +datafusion.execution.parquet.bloom_filter_enabled false +datafusion.execution.parquet.bloom_filter_fpp 0.05 +datafusion.execution.parquet.bloom_filter_ndv 1000000 +datafusion.execution.parquet.column_index_truncate_length NULL +datafusion.execution.parquet.compression snappy +datafusion.execution.parquet.created_by datafusion version 28.0.0 +datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615 +datafusion.execution.parquet.data_pagesize_limit 1048576 +datafusion.execution.parquet.dictionary_enabled true +datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true +datafusion.execution.parquet.encoding plain +datafusion.execution.parquet.max_row_group_size 1048576 +datafusion.execution.parquet.max_statistics_size 4096 datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.skip_metadata true +datafusion.execution.parquet.statistics_enabled page +datafusion.execution.parquet.write_batch_size 1024 +datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 63c9c064bc52..50d9cb7c8b67 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,47 +35,63 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | default | description | -| ---------------------------------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. 
| -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | false | If the file has a header | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | -| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | -| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | -| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | -| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | -| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. 
The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | -| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | -| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. 
| -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | -| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. | -| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. 
|
+| key | default | description |
+| ---------------------------------------------------------- | ------------------------- | ----------- |
+| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. |
+| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified |
+| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified |
+| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information |
+| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema |
+| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema |
+| datafusion.catalog.has_header | false | If the file has a header |
+| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption |
+| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting |
+| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files |
+| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system |
+| datafusion.execution.time_zone | +00:00 | The default time zone. Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour |
+| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. |
+| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file |
+| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skips the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata |
+| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer |
+| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions will be applied during the parquet decoding operation to reduce the number of rows decoded |
+| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query |
+| datafusion.execution.parquet.data_pagesize_limit | 1048576 | Sets best effort maximum size of data page in bytes |
+| datafusion.execution.parquet.write_batch_size | 1024 | Sets write_batch_size in bytes |
+| datafusion.execution.parquet.writer_version | 1.0 | Sets parquet writer version. Valid values are "1.0" and "2.0" |
+| datafusion.execution.parquet.compression | snappy | Sets default parquet compression codec. Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. |
+| datafusion.execution.parquet.dictionary_enabled | true | Sets if dictionary encoding is enabled |
+| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | Sets best effort maximum dictionary page size, in bytes |
+| datafusion.execution.parquet.statistics_enabled | page | Sets if statistics are enabled for any column. Valid values are: "none", "chunk", and "page". These values are not case sensitive. |
+| datafusion.execution.parquet.max_statistics_size | 4096 | Sets max statistics size for any column |
+| datafusion.execution.parquet.max_row_group_size | 1048576 | Sets maximum number of rows in a row group |
+| datafusion.execution.parquet.created_by | datafusion version 28.0.0 | Sets "created by" property |
+| datafusion.execution.parquet.column_index_truncate_length | NULL | Sets column index truncate length |
+| datafusion.execution.parquet.data_page_row_count_limit | 18446744073709551615 | Sets best effort maximum number of rows in data page |
+| datafusion.execution.parquet.encoding | plain | Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. |
+| datafusion.execution.parquet.bloom_filter_enabled | false | Sets if bloom filter is enabled for any column |
+| datafusion.execution.parquet.bloom_filter_fpp | 0.05 | Sets bloom filter false positive probability |
+| datafusion.execution.parquet.bloom_filter_ndv | 1000000 | Sets bloom filter number of distinct values |
+| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. |
+| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly used to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
+| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
+| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
+| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
+| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
+| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total file size in bytes to perform file scan repartitioning. |
+| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering. If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long-running execution, all types of joins may encounter out-of-memory errors. |
+| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. |
+| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partition keys to execute window functions in parallel using the provided `target_partitions` level |
+| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. When this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` |
+| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. |
+| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail |
+| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
+| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
+| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
+| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
+| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
+| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
+| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type |
+| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) |
+| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. |
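
As a rough illustration (not part of the diff) of how the options documented above can be exercised, the sketch below uses the `ConfigOptions::set` / `config_namespace!` plumbing from `datafusion/common/src/config.rs`, which this change extends. The key names and defaults come from the table; the chosen override values, the `main` wrapper, and the asserts are illustrative assumptions only.

```rust
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;

fn main() -> Result<()> {
    // Start from the defaults listed in the configs.md table above.
    let mut config = ConfigOptions::new();

    // Override a few of the newly added parquet writer settings by key.
    // The values here are purely illustrative (e.g. "zstd(3)" follows the
    // documented "zstd(level)" form).
    config.set("datafusion.execution.parquet.compression", "zstd(3)")?;
    config.set("datafusion.execution.parquet.bloom_filter_enabled", "true")?;
    config.set("datafusion.execution.parquet.max_row_group_size", "262144")?;

    // The typed fields generated by `config_namespace!` reflect the overrides.
    assert_eq!(config.execution.parquet.compression, "zstd(3)");
    assert!(config.execution.parquet.bloom_filter_enabled);
    assert_eq!(config.execution.parquet.max_row_group_size, 262144);

    Ok(())
}
```

Within a running session the same keys can also typically be changed with a SQL `SET` statement (e.g. `SET datafusion.execution.parquet.compression = 'zstd(3)'`); the `information_schema` expected output at the top of this hunk lists the defaults such overrides would replace.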