From b18f6eabc81aeb924bbd4882216b05293d5252f6 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Tue, 8 Aug 2023 21:47:19 -0400 Subject: [PATCH 01/11] initial AsyncArrowWriter immplementation --- datafusion/common/src/config.rs | 43 +++- .../src/datasource/file_format/options.rs | 37 +++- .../src/datasource/file_format/parquet.rs | 194 +++++++++++++++++- .../core/src/datasource/listing/table.rs | 51 +++-- datafusion/core/src/execution/context.rs | 6 +- 5 files changed, 304 insertions(+), 27 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index fe7fb955033f..ae9d35b1a378 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -255,7 +255,7 @@ config_namespace! { } config_namespace! { - /// Options related to reading of parquet files + /// Options related to parquet files pub struct ParquetOptions { /// If true, reads the Parquet data page level metadata (the /// Page Index), if present, to reduce the I/O and number of @@ -286,7 +286,48 @@ config_namespace! { /// will be reordered heuristically to minimize the cost of evaluation. If false, /// the filters are applied in the same order as written in the query pub reorder_filters: bool, default = false + + // The following map to parquet::file::properties::WriterProperties + + /// Sets best effort maximum size of data page in bytes + pub data_pagesize_limit: usize, default = parquet::file::properties::DEFAULT_PAGE_SIZE + + /// Sets best effort maximum number of rows in data page + pub data_page_row_count_limit: usize, default = usize::MAX + + /// Sets best effort maximum dictionary page size, in bytes + pub dictionary_page_size_limit: usize, default = parquet::file::properties::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT + + /// Sets maximum number of rows in a row group + pub max_row_group_size: usize, default = parquet::file::properties::DEFAULT_MAX_ROW_GROUP_SIZE + + /// Sets "created by" property + pub created_by: String, default = parquet::file::properties::DEFAULT_CREATED_BY.into() + + pub compression: Option, default = None + + /// Sets default encoding for any column + pub encoding: Option, default = None + + /// Sets if dictionary encoding is enabled + pub dictionary_enabled: Option, default = None + + /// Sets if statistics are enabled for any column + pub statistics_enabled: Option, default = None + + /// Sets max statistics size for any column + pub max_statistics_size: Option, default = None + + /// Sets if bloom filter is enabled for any column + pub bloom_filter_enabled: Option, default = None } + // TODO macro not working with Option or Option + // Sets bloom filter false positive probability + //pub bloom_fiter_fpp: Option, default = None + + // Sets bloom filter number of distinct values + //pub bloom_filter_ndv: Option, default = None + //} } config_namespace! { diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 73c20d3b0c3a..d8168070eb30 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -214,6 +214,13 @@ pub struct ParquetReadOptions<'a> { /// /// If None specified, uses value in SessionConfig pub skip_metadata: Option, + /// An optional schema representing the parquet files. If None, parquet reader will try to infer it + /// based on data in file. 
+ pub schema: Option<&'a Schema>, + /// Indicates how the file is sorted + pub file_sort_order: Vec>, + /// Setting controls how inserts to this file should be handled + pub insert_mode: ListingTableInsertMode, } impl<'a> Default for ParquetReadOptions<'a> { @@ -223,6 +230,9 @@ impl<'a> Default for ParquetReadOptions<'a> { table_partition_cols: vec![], parquet_pruning: None, skip_metadata: None, + schema: None, + file_sort_order: vec![], + insert_mode: ListingTableInsertMode::AppendNewFiles, } } } @@ -242,6 +252,12 @@ impl<'a> ParquetReadOptions<'a> { self } + /// Specify schema to use for parquet read + pub fn schema(mut self, schema: &'a Schema) -> Self { + self.schema = Some(schema); + self + } + /// Specify table_partition_cols for partition pruning pub fn table_partition_cols( mut self, @@ -250,6 +266,18 @@ impl<'a> ParquetReadOptions<'a> { self.table_partition_cols = table_partition_cols; self } + + /// Configure if file has known sort order + pub fn file_sort_order(mut self, file_sort_order: Vec>) -> Self { + self.file_sort_order = file_sort_order; + self + } + + /// Configure how insertions to this table should be handled + pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self { + self.insert_mode = insert_mode; + self + } } /// Options that control the reading of ARROW files. @@ -525,6 +553,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { .with_file_extension(self.file_extension) .with_target_partitions(config.target_partitions()) .with_table_partition_cols(self.table_partition_cols.clone()) + .with_file_sort_order(self.file_sort_order.clone()) + .with_insert_mode(self.insert_mode.clone()) } async fn get_resolved_schema( @@ -533,11 +563,8 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> { state: SessionState, table_path: ListingTableUrl, ) -> Result { - // with parquet we resolve the schema in all cases - Ok(self - .to_listing_options(config) - .infer_schema(&state, &table_path) - .await?) 
+ self._get_resolved_schema(config, state, table_path, self.schema, false) + .await } } diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 6cda0fe68b57..5a0475675c27 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -19,35 +19,44 @@ use std::any::Any; use std::sync::Arc; +use std::fmt; +use std::fmt::Debug; +use rand::distributions::DistString; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use datafusion_common::DataFusionError; +use datafusion_common::{DataFusionError, plan_err}; +use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; -use parquet::arrow::parquet_to_arrow_schema; +use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; use parquet::file::statistics::Statistics as ParquetStatistics; +use rand::distributions::Alphanumeric; +use tokio::io::AsyncWrite; -use super::FileFormat; +use super::{FileFormat, FileWriterMode, AbortableWrite}; use super::FileScanConfig; +use super::file_type::FileCompressionType; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter}; +use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter, FileSinkConfig, FileGroupDisplay, FileMeta}; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; -use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; +use crate::physical_plan::insert::{DataSink, InsertExec}; +use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics, DisplayAs, DisplayFormatType, SendableRecordBatchStream}; /// The default file extension of parquet files pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; @@ -208,6 +217,24 @@ impl FileFormat for ParquetFormat { self.metadata_size_hint(state.config_options()), ))) } + + async fn create_writer_physical_plan( + &self, + input: Arc, + _state: &SessionState, + conf: FileSinkConfig, + ) -> Result> { + if conf.overwrite { + return Err(DataFusionError::NotImplemented( + "Overwrites are not implemented yet for Parquet".into(), + )); + } + + let sink_schema = conf.output_schema().clone(); + let sink = Arc::new(ParquetSink::new(conf)); + + Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _) + } } fn summarize_min_max( @@ -543,6 +570,163 @@ async fn fetch_statistics( Ok(statistics) } +/// Implements [`DataSink`] for writing to a parquet file. 
+struct ParquetSink { + /// Config options for writing data + config: FileSinkConfig, +} + +impl Debug for ParquetSink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ParquetSink") + .finish() + } +} + +impl DisplayAs for ParquetSink { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "ParquetSink(writer_mode={:?}, file_groups=", + self.config.writer_mode + )?; + FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?; + write!(f, ")") + } + } + } +} + +impl ParquetSink { + fn new( + config: FileSinkConfig, + ) -> Self { + Self { + config, + } + } + + /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options + fn parquet_writer_props_from_context(&self, context: &Arc) -> WriterProperties{ + let parquet_context = &context.session_config().options().execution.parquet; + let mut builder = WriterProperties::builder() + .set_created_by(parquet_context.created_by.clone()) + .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) + .set_data_page_size_limit(parquet_context.data_pagesize_limit); + + if parquet_context.bloom_filter_enabled.is_some(){ + builder = builder.set_bloom_filter_enabled(parquet_context.bloom_filter_enabled.unwrap()) + } + + // TODO + //.set_bloom_filter_fpp(parquet_context.bloom_filter_fpp) + // TODO + //.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv) + //.set_compression(parquet::basic::Compression::try_from(parquet_context.compression)) + builder.build() + } + + // Create a write for parquet files + async fn create_writer( + &self, + file_meta: FileMeta, + object_store: Arc, + parquet_props: WriterProperties, + ) -> Result>> { + let object = &file_meta.object_meta; + match self.config.writer_mode { + FileWriterMode::Append => { + return plan_err!("Appending to Parquet files is not supported by the file format!") + } + FileWriterMode::Put => { + return Err(DataFusionError::NotImplemented("FileWriterMode::Put is not implemented for ParquetSink".into())) + } + FileWriterMode::PutMultipart => { + let (_, multipart_writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AsyncArrowWriter::try_new( + multipart_writer, + self.config.output_schema.clone(), + 10485760, + Some(parquet_props), + )?; + return Ok(writer) + } + } + } +} + +#[async_trait] +impl DataSink for ParquetSink { + async fn write_all( + &self, + mut data: Vec, + context: &Arc, + ) -> Result { + let num_partitions = data.len(); + let parquet_props = self.parquet_writer_props_from_context(context); + + let object_store = context + .runtime_env() + .object_store(&self.config.object_store_url)?; + + // Construct writer for each file group + let mut writers = vec![]; + match self.config.writer_mode { + FileWriterMode::Append => { + return plan_err!("Parquet format does not support appending to existing file!") + } + FileWriterMode::Put => { + return Err(DataFusionError::NotImplemented( + "Put Mode is not implemented for ParquetSink yet".into(), + )) + } + FileWriterMode::PutMultipart => { + // Currently assuming only 1 partition path (i.e. 
not hive-style partitioning on a column) + let base_path = &self.config.table_paths[0]; + // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files + let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); + for part_idx in 0..num_partitions { + let file_path = base_path + .prefix() + .child(format!("/{}_{}.parquet", write_id, part_idx)); + let object_meta = ObjectMeta { + location: file_path, + last_modified: chrono::offset::Utc::now(), + size: 0, + e_tag: None, + }; + let writer = self + .create_writer(object_meta.into(), object_store.clone(), parquet_props.clone()) + .await?; + writers.push(writer); + } + } + } + + + let mut row_count = 0; + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(batch) = data[idx].next().await.transpose()? { + row_count += batch.num_rows(); + writers[idx].write(&batch).await?; + } + } + + for writer in writers{ + writer.close().await?; + } + + Ok(row_count as u64) + } +} + #[cfg(test)] pub(crate) mod test_util { use super::*; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 8519628760f2..fb58105cb693 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1589,6 +1589,27 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_insert_into_append_new_parquet_files() -> Result<()> { + helper_test_append_new_files_to_table( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> { + let maybe_err = helper_test_insert_into_append_to_existing_files( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + ) + .await; + let _err = maybe_err.expect_err("Appending to existing parquet file did not fail!".into()); + Ok(()) + } + fn load_empty_schema_table( schema: SchemaRef, temp_path: &str, @@ -1825,9 +1846,9 @@ mod tests { .register_parquet( "t", tmp_dir.path().to_str().unwrap(), - ParquetReadOptions::default(), // TODO implement insert_mode for parquet - //.insert_mode(ListingTableInsertMode::AppendNewFiles) - //.schema(schema.as_ref()), + ParquetReadOptions::default() + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), ) .await?; } @@ -1894,16 +1915,16 @@ mod tests { // Read the records in the table let batches = session_ctx - .sql("select count(*) from t") + .sql("select count(*) as count from t") .await? .collect() .await?; let expected = vec![ - "+----------+", - "| COUNT(*) |", - "+----------+", - "| 6 |", - "+----------+", + "+-------+", + "| count |", + "+-------+", + "| 6 |", + "+-------+", ]; // Assert that the batches read from the file match the expected result. @@ -1935,18 +1956,18 @@ mod tests { // Read the contents of the table let batches = session_ctx - .sql("select count(*) from t") + .sql("select count(*) AS count from t") .await? .collect() .await?; // Define the expected result after the second append. let expected = vec![ - "+----------+", - "| COUNT(*) |", - "+----------+", - "| 12 |", - "+----------+", + "+-------+", + "| count |", + "+-------+", + "| 12 |", + "+-------+", ]; // Assert that the batches read from the file after the second append match the expected result. 
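Aside (not part of the patch): the core write path introduced above is an object store multipart upload wrapped in parquet's AsyncArrowWriter, as in ParquetSink::create_writer. A minimal standalone sketch of that pattern — assuming a hypothetical in-memory store, a one-column schema, and an illustrative file name — looks roughly like this:

use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use object_store::{memory::InMemory, path::Path, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::file::properties::WriterProperties;

async fn write_one_parquet_file() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical store and schema, stand-ins for the table's object store and output schema.
    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

    // Start a multipart upload and hand its AsyncWrite half to parquet's async writer,
    // mirroring the FileWriterMode::PutMultipart branch of ParquetSink::create_writer.
    let (_id, multipart_writer) = store
        .put_multipart(&Path::from("out/part-0.parquet"))
        .await?;
    let mut writer = AsyncArrowWriter::try_new(
        multipart_writer,
        schema.clone(),
        10 * 1024 * 1024, // in-memory buffer size; the patch passes the same 10485760
        Some(WriterProperties::builder().build()),
    )?;

    // Write batches, then close; close() finalizes the parquet footer and completes the upload.
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;
    writer.write(&batch).await?;
    writer.close().await?;
    Ok(())
}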
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ce9165f027c7..5adc3f3ec8df 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1024,7 +1024,11 @@ impl SessionContext { ) -> Result<()> { let listing_options = options.to_listing_options(&self.state.read().config); - self.register_listing_table(name, table_path, listing_options, None, None) + self.register_listing_table(name, + table_path, + listing_options, + options.schema.map(|s| Arc::new(s.to_owned())), + None) .await?; Ok(()) } From e52e7a097d787b84e97db0d121595cf273ca5a78 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Tue, 8 Aug 2023 21:56:41 -0400 Subject: [PATCH 02/11] clean up --- datafusion/common/src/config.rs | 4 +- .../src/datasource/file_format/parquet.rs | 101 ++++++++++-------- .../core/src/datasource/listing/table.rs | 7 +- datafusion/core/src/execution/context.rs | 12 ++- 4 files changed, 69 insertions(+), 55 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index ae9d35b1a378..0a662f6cc9e8 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -291,7 +291,7 @@ config_namespace! { /// Sets best effort maximum size of data page in bytes pub data_pagesize_limit: usize, default = parquet::file::properties::DEFAULT_PAGE_SIZE - + /// Sets best effort maximum number of rows in data page pub data_page_row_count_limit: usize, default = usize::MAX @@ -321,7 +321,7 @@ config_namespace! { /// Sets if bloom filter is enabled for any column pub bloom_filter_enabled: Option, default = None } - // TODO macro not working with Option or Option + // TODO macro not working with Option or Option // Sets bloom filter false positive probability //pub bloom_fiter_fpp: Option, default = None diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 5a0475675c27..2bd6a9e335aa 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,17 +17,17 @@ //! 
Parquet format abstractions +use rand::distributions::DistString; use std::any::Any; -use std::sync::Arc; use std::fmt; use std::fmt::Debug; -use rand::distributions::DistString; +use std::sync::Arc; use arrow::datatypes::SchemaRef; use arrow::datatypes::{Fields, Schema}; use async_trait::async_trait; use bytes::{BufMut, BytesMut}; -use datafusion_common::{DataFusionError, plan_err}; +use datafusion_common::{plan_err, DataFusionError}; use datafusion_execution::TaskContext; use datafusion_physical_expr::PhysicalExpr; use futures::{StreamExt, TryStreamExt}; @@ -36,27 +36,30 @@ use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder}; +use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; -use tokio::io::AsyncWrite; -use super::{FileFormat, FileWriterMode, AbortableWrite}; use super::FileScanConfig; -use super::file_type::FileCompressionType; +use super::{FileFormat, FileWriterMode}; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; use crate::arrow::datatypes::DataType; use crate::config::ConfigOptions; -use crate::datasource::physical_plan::{ParquetExec, SchemaAdapter, FileSinkConfig, FileGroupDisplay, FileMeta}; +use crate::datasource::physical_plan::{ + FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter, +}; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::execution::context::SessionState; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::insert::{DataSink, InsertExec}; -use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics, DisplayAs, DisplayFormatType, SendableRecordBatchStream}; +use crate::physical_plan::{ + Accumulator, DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, + Statistics, +}; /// The default file extension of parquet files pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; @@ -578,8 +581,7 @@ struct ParquetSink { impl Debug for ParquetSink { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ParquetSink") - .finish() + f.debug_struct("ParquetSink").finish() } } @@ -600,30 +602,30 @@ impl DisplayAs for ParquetSink { } impl ParquetSink { - fn new( - config: FileSinkConfig, - ) -> Self { - Self { - config, - } + fn new(config: FileSinkConfig) -> Self { + Self { config } } /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options - fn parquet_writer_props_from_context(&self, context: &Arc) -> WriterProperties{ + fn parquet_writer_props_from_context( + &self, + context: &Arc, + ) -> WriterProperties { let parquet_context = &context.session_config().options().execution.parquet; let mut builder = WriterProperties::builder() - .set_created_by(parquet_context.created_by.clone()) - .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) - .set_data_page_size_limit(parquet_context.data_pagesize_limit); + .set_created_by(parquet_context.created_by.clone()) + .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) + .set_data_page_size_limit(parquet_context.data_pagesize_limit); - if parquet_context.bloom_filter_enabled.is_some(){ - builder = 
builder.set_bloom_filter_enabled(parquet_context.bloom_filter_enabled.unwrap()) + if parquet_context.bloom_filter_enabled.is_some() { + builder = builder + .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled.unwrap()) } // TODO //.set_bloom_filter_fpp(parquet_context.bloom_filter_fpp) // TODO - //.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv) + //.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv) //.set_compression(parquet::basic::Compression::try_from(parquet_context.compression)) builder.build() } @@ -634,15 +636,19 @@ impl ParquetSink { file_meta: FileMeta, object_store: Arc, parquet_props: WriterProperties, - ) -> Result>> { + ) -> Result< + AsyncArrowWriter>, + > { let object = &file_meta.object_meta; match self.config.writer_mode { FileWriterMode::Append => { - return plan_err!("Appending to Parquet files is not supported by the file format!") - } - FileWriterMode::Put => { - return Err(DataFusionError::NotImplemented("FileWriterMode::Put is not implemented for ParquetSink".into())) + plan_err!( + "Appending to Parquet files is not supported by the file format!" + ) } + FileWriterMode::Put => Err(DataFusionError::NotImplemented( + "FileWriterMode::Put is not implemented for ParquetSink".into(), + )), FileWriterMode::PutMultipart => { let (_, multipart_writer) = object_store .put_multipart(&object.location) @@ -650,11 +656,11 @@ impl ParquetSink { .map_err(DataFusionError::ObjectStore)?; let writer = AsyncArrowWriter::try_new( multipart_writer, - self.config.output_schema.clone(), + self.config.output_schema.clone(), 10485760, Some(parquet_props), )?; - return Ok(writer) + Ok(writer) } } } @@ -678,7 +684,9 @@ impl DataSink for ParquetSink { let mut writers = vec![]; match self.config.writer_mode { FileWriterMode::Append => { - return plan_err!("Parquet format does not support appending to existing file!") + return plan_err!( + "Parquet format does not support appending to existing file!" + ) } FileWriterMode::Put => { return Err(DataFusionError::NotImplemented( @@ -701,27 +709,30 @@ impl DataSink for ParquetSink { e_tag: None, }; let writer = self - .create_writer(object_meta.into(), object_store.clone(), parquet_props.clone()) + .create_writer( + object_meta.into(), + object_store.clone(), + parquet_props.clone(), + ) .await?; writers.push(writer); } } } - - let mut row_count = 0; - // TODO parallelize serialization accross partitions and batches within partitions - // see: https://github.com/apache/arrow-datafusion/issues/7079 - for idx in 0..num_partitions { - while let Some(batch) = data[idx].next().await.transpose()? { - row_count += batch.num_rows(); - writers[idx].write(&batch).await?; + let mut row_count = 0; + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(batch) = data[idx].next().await.transpose()? 
{ + row_count += batch.num_rows(); + writers[idx].write(&batch).await?; + } } - } - for writer in writers{ - writer.close().await?; - } + for writer in writers { + writer.close().await?; + } Ok(row_count as u64) } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index fb58105cb693..e48c108d3ebc 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1606,7 +1606,8 @@ mod tests { FileCompressionType::UNCOMPRESSED, ) .await; - let _err = maybe_err.expect_err("Appending to existing parquet file did not fail!".into()); + let _err = + maybe_err.expect_err("Appending to existing parquet file did not fail!"); Ok(()) } @@ -1847,8 +1848,8 @@ mod tests { "t", tmp_dir.path().to_str().unwrap(), ParquetReadOptions::default() - .insert_mode(ListingTableInsertMode::AppendNewFiles) - .schema(schema.as_ref()), + .insert_mode(ListingTableInsertMode::AppendNewFiles) + .schema(schema.as_ref()), ) .await?; } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 5adc3f3ec8df..6593e22e6cc0 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -1024,12 +1024,14 @@ impl SessionContext { ) -> Result<()> { let listing_options = options.to_listing_options(&self.state.read().config); - self.register_listing_table(name, - table_path, - listing_options, + self.register_listing_table( + name, + table_path, + listing_options, options.schema.map(|s| Arc::new(s.to_owned())), - None) - .await?; + None, + ) + .await?; Ok(()) } From 5761062ab13ab1e38546b73e3254ae43bb747727 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 9 Aug 2023 07:33:49 -0400 Subject: [PATCH 03/11] fix information_schema test --- .../sqllogictests/test_files/information_schema.slt | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index 162e20820191..05cce5b96b1d 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -146,12 +146,23 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false +datafusion.execution.parquet.bloom_filter_enabled NULL +datafusion.execution.parquet.compression NULL +datafusion.execution.parquet.created_by parquet-rs version 45.0.0 +datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615 +datafusion.execution.parquet.data_pagesize_limit 1048576 +datafusion.execution.parquet.dictionary_enabled NULL +datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true +datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.max_row_group_size 1048576 +datafusion.execution.parquet.max_statistics_size NULL datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.skip_metadata true +datafusion.execution.parquet.statistics_enabled NULL datafusion.execution.planning_concurrency 13 datafusion.execution.sort_in_place_threshold_bytes 1048576 
datafusion.execution.sort_spill_reservation_bytes 10485760 From 580edaa5570b91b8f5785c1a910bb152a16de988 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 9 Aug 2023 08:16:03 -0400 Subject: [PATCH 04/11] refactor code to new write.rs mod --- .../core/src/datasource/file_format/csv.rs | 7 +- .../core/src/datasource/file_format/json.rs | 7 +- .../core/src/datasource/file_format/mod.rs | 344 +----------------- .../src/datasource/file_format/parquet.rs | 4 +- .../core/src/datasource/file_format/write.rs | 311 ++++++++++++++++ .../core/src/datasource/listing/table.rs | 4 +- .../datasource/physical_plan/file_stream.rs | 2 +- .../core/src/datasource/physical_plan/mod.rs | 2 +- 8 files changed, 332 insertions(+), 349 deletions(-) create mode 100644 datafusion/core/src/datasource/file_format/write.rs diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 8f56bf139e20..d9eeeb668ebc 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -37,10 +37,13 @@ use futures::stream::BoxStream; use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore}; -use super::{create_writer, stateless_serialize_and_write_files, FileFormat}; +use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::FileWriterMode; -use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD}; +use crate::datasource::file_format::write::{ + AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, + DEFAULT_SCHEMA_INFER_MAX_RECORD, create_writer +}; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, }; diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 6856ad89eada..3f43100ffee5 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,9 +47,10 @@ use crate::physical_plan::insert::InsertExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; -use super::create_writer; -use super::stateless_serialize_and_write_files; -use super::BatchSerializer; +use crate::datasource::file_format::write::{ + AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, + stateless_serialize_and_write_files, FileWriterMode, create_writer +}; use super::FileFormat; use super::FileScanConfig; use super::FileWriterMode; diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 42b16656fbc6..181e1a2edc55 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -16,6 +16,7 @@ // under the License. //! Module containing helper methods for the various file formats +//! 
See write.rs for write related helper methods /// Default max records to scan to infer the schema pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; @@ -27,36 +28,24 @@ pub mod file_type; pub mod json; pub mod options; pub mod parquet; +pub mod write; use std::any::Any; -use std::io::Error; -use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; -use std::{fmt, mem}; +use std::fmt; use crate::arrow::datatypes::SchemaRef; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; use crate::error::Result; use crate::execution::context::SessionState; -use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream, Statistics}; +use crate::physical_plan::{ExecutionPlan, Statistics}; -use arrow_array::RecordBatch; use datafusion_common::DataFusionError; use datafusion_physical_expr::PhysicalExpr; use async_trait::async_trait; -use bytes::Bytes; -use futures::future::BoxFuture; -use futures::FutureExt; -use futures::{ready, StreamExt}; -use object_store::path::Path; -use object_store::{MultipartId, ObjectMeta, ObjectStore}; -use tokio::io::{AsyncWrite, AsyncWriteExt}; +use object_store::{ObjectMeta, ObjectStore}; -use self::file_type::FileCompressionType; - -use super::physical_plan::FileMeta; /// This trait abstracts all the file format specific implementations /// from the [`TableProvider`]. This helps code re-utilization across /// providers that support the the same file formats. @@ -116,329 +105,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug { } } -/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. -/// It is specifically designed for the `object_store` crate's `put` method and sends -/// whole bytes at once when the buffer is flushed. -pub struct AsyncPutWriter { - /// Object metadata - object_meta: ObjectMeta, - /// A shared reference to the object store - store: Arc, - /// A buffer that stores the bytes to be sent - current_buffer: Vec, - /// Used for async handling in flush method - inner_state: AsyncPutState, -} - -impl AsyncPutWriter { - /// Constructor for the `AsyncPutWriter` object - pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { - Self { - object_meta, - store, - current_buffer: vec![], - // The writer starts out in buffering mode - inner_state: AsyncPutState::Buffer, - } - } - - /// Separate implementation function that unpins the [`AsyncPutWriter`] so - /// that partial borrows work correctly - fn poll_shutdown_inner( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> { - loop { - match &mut self.inner_state { - AsyncPutState::Buffer => { - // Convert the current buffer to bytes and take ownership of it - let bytes = Bytes::from(mem::take(&mut self.current_buffer)); - // Set the inner state to Put variant with the bytes - self.inner_state = AsyncPutState::Put { bytes } - } - AsyncPutState::Put { bytes } => { - // Send the bytes to the object store's put method - return Poll::Ready( - ready!(self - .store - .put(&self.object_meta.location, bytes.clone()) - .poll_unpin(cx)) - .map_err(Error::from), - ); - } - } - } - } -} - -/// An enum that represents the inner state of AsyncPut -enum AsyncPutState { - /// Building Bytes struct in this state - Buffer, - /// Data in the buffer is being sent to the object store - Put { bytes: Bytes }, -} - -impl AsyncWrite for AsyncPutWriter { - // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct - fn poll_write( - mut self: Pin<&mut Self>, - _: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - // Extend the current buffer 
with the incoming buffer - self.current_buffer.extend_from_slice(buf); - // Return a ready poll with the length of the incoming buffer - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - self: Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - // Return a ready poll with an empty result - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - // Call the poll_shutdown_inner method to handle the actual sending of data to the object store - self.poll_shutdown_inner(cx) - } -} - -/// Stores data needed during abortion of MultiPart writers -pub(crate) struct MultiPart { - /// A shared reference to the object store - store: Arc, - multipart_id: MultipartId, - location: Path, -} - -impl MultiPart { - /// Create a new `MultiPart` - pub fn new( - store: Arc, - multipart_id: MultipartId, - location: Path, - ) -> Self { - Self { - store, - multipart_id, - location, - } - } -} - -pub(crate) enum AbortMode { - Put, - Append, - MultiPart(MultiPart), -} - -/// A wrapper struct with abort method and writer -pub(crate) struct AbortableWrite { - writer: W, - mode: AbortMode, -} - -impl AbortableWrite { - /// Create a new `AbortableWrite` instance with the given writer, and write mode. - fn new(writer: W, mode: AbortMode) -> Self { - Self { writer, mode } - } - - /// handling of abort for different write modes - fn abort_writer(&self) -> Result>> { - match &self.mode { - AbortMode::Put => Ok(async { Ok(()) }.boxed()), - AbortMode::Append => Err(DataFusionError::Execution( - "Cannot abort in append mode".to_string(), - )), - AbortMode::MultiPart(MultiPart { - store, - multipart_id, - location, - }) => { - let location = location.clone(); - let multipart_id = multipart_id.clone(); - let store = store.clone(); - Ok(Box::pin(async move { - store - .abort_multipart(&location, &multipart_id) - .await - .map_err(DataFusionError::ObjectStore) - })) - } - } - } -} - -impl AsyncWrite for AbortableWrite { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) - } - - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) - } -} - -/// An enum that defines different file writer modes. -#[derive(Debug, Clone, Copy)] -pub enum FileWriterMode { - /// Data is appended to an existing file. - Append, - /// Data is written to a new file. - Put, - /// Data is written to a new file in multiple parts. - PutMultipart, -} - -/// Returns an [`AbortableWrite`] which writes to the given object store location -/// with the specified compression -pub(crate) async fn create_writer( - writer_mode: FileWriterMode, - file_compression_type: FileCompressionType, - file_meta: FileMeta, - object_store: Arc, -) -> Result>> { - let object = &file_meta.object_meta; - match writer_mode { - // If the mode is append, call the store's append method and return wrapped in - // a boxed trait object. 
- FileWriterMode::Append => { - let writer = object_store - .append(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Append, - ); - Ok(writer) - } - // If the mode is put, create a new AsyncPut writer and return it wrapped in - // a boxed trait object - FileWriterMode::Put => { - let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); - let writer = AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::Put, - ); - Ok(writer) - } - // If the mode is put multipart, call the store's put_multipart method and - // return the writer wrapped in a boxed trait object. - FileWriterMode::PutMultipart => { - let (multipart_id, writer) = object_store - .put_multipart(&object.location) - .await - .map_err(DataFusionError::ObjectStore)?; - Ok(AbortableWrite::new( - file_compression_type.convert_async_writer(writer)?, - AbortMode::MultiPart(MultiPart::new( - object_store, - multipart_id, - object.location.clone(), - )), - )) - } - } -} - -/// A trait that defines the methods required for a RecordBatch serializer. -#[async_trait] -pub trait BatchSerializer: Unpin + Send { - /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. - async fn serialize(&mut self, batch: RecordBatch) -> Result; -} - -/// Checks if any of the passed writers have encountered an error -/// and if so, all writers are aborted. -async fn check_for_errors( - result: Result, - writers: &mut [AbortableWrite], -) -> Result { - match result { - Ok(value) => Ok(value), - Err(e) => { - // Abort all writers before returning the error: - for writer in writers { - let mut abort_future = writer.abort_writer(); - if let Ok(abort_future) = &mut abort_future { - let _ = abort_future.await; - } - // Ignore errors that occur during abortion, - // We do try to abort all writers before returning error. - } - // After aborting writers return original error. - Err(e) - } - } -} - -/// Contains the common logic for serializing RecordBatches and -/// writing the resulting bytes to an ObjectStore. -/// Serialization is assumed to be stateless, i.e. -/// each RecordBatch can be serialized without any -/// dependency on the RecordBatches before or after. -async fn stateless_serialize_and_write_files( - mut data: Vec, - mut serializers: Vec>, - mut writers: Vec>>, -) -> Result { - let num_partitions = data.len(); - let mut row_count = 0; - // Map errors to DatafusionError. 
- let err_converter = - |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); - // TODO parallelize serialization accross partitions and batches within partitions - // see: https://github.com/apache/arrow-datafusion/issues/7079 - for idx in 0..num_partitions { - while let Some(maybe_batch) = data[idx].next().await { - // Write data to files in a round robin fashion: - let serializer = &mut serializers[idx]; - let batch = check_for_errors(maybe_batch, &mut writers).await?; - row_count += batch.num_rows(); - let bytes = - check_for_errors(serializer.serialize(batch).await, &mut writers).await?; - let writer = &mut writers[idx]; - check_for_errors( - writer.write_all(&bytes).await.map_err(err_converter), - &mut writers, - ) - .await?; - } - } - // Perform cleanup: - let n_writers = writers.len(); - for idx in 0..n_writers { - check_for_errors( - writers[idx].shutdown().await.map_err(err_converter), - &mut writers, - ) - .await?; - } - Ok(row_count as u64) -} - #[cfg(test)] pub(crate) mod test_util { use std::ops::Range; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 2bd6a9e335aa..00fd8222b1d9 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -41,7 +41,8 @@ use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; use super::FileScanConfig; -use super::{FileFormat, FileWriterMode}; +use super::FileFormat; +use super::write::FileWriterMode; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; @@ -726,6 +727,7 @@ impl DataSink for ParquetSink { for idx in 0..num_partitions { while let Some(batch) = data[idx].next().await.transpose()? { row_count += batch.num_rows(); + // TODO cleanup all multipart writes when any encounters an error writers[idx].write(&batch).await?; } } diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs new file mode 100644 index 000000000000..ce14912be07c --- /dev/null +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Module containing helper methods/traits related to enabling +//! 
write support for the various file formats + +use std::io::Error; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::mem; + +use crate::error::Result; +use crate::physical_plan::SendableRecordBatchStream; + +use arrow_array::RecordBatch; +use datafusion_common::DataFusionError; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::FutureExt; +use futures::{ready, StreamExt}; +use object_store::path::Path; +use object_store::{MultipartId, ObjectMeta, ObjectStore}; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + + +/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. +/// It is specifically designed for the `object_store` crate's `put` method and sends +/// whole bytes at once when the buffer is flushed. +pub struct AsyncPutWriter { + /// Object metadata + object_meta: ObjectMeta, + /// A shared reference to the object store + store: Arc, + /// A buffer that stores the bytes to be sent + current_buffer: Vec, + /// Used for async handling in flush method + inner_state: AsyncPutState, +} + +impl AsyncPutWriter { + /// Constructor for the `AsyncPutWriter` object + pub fn new(object_meta: ObjectMeta, store: Arc) -> Self { + Self { + object_meta, + store, + current_buffer: vec![], + // The writer starts out in buffering mode + inner_state: AsyncPutState::Buffer, + } + } + + /// Separate implementation function that unpins the [`AsyncPutWriter`] so + /// that partial borrows work correctly + fn poll_shutdown_inner( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + loop { + match &mut self.inner_state { + AsyncPutState::Buffer => { + // Convert the current buffer to bytes and take ownership of it + let bytes = Bytes::from(mem::take(&mut self.current_buffer)); + // Set the inner state to Put variant with the bytes + self.inner_state = AsyncPutState::Put { bytes } + } + AsyncPutState::Put { bytes } => { + // Send the bytes to the object store's put method + return Poll::Ready( + ready!(self + .store + .put(&self.object_meta.location, bytes.clone()) + .poll_unpin(cx)) + .map_err(Error::from), + ); + } + } + } + } +} + +/// An enum that represents the inner state of AsyncPut +enum AsyncPutState { + /// Building Bytes struct in this state + Buffer, + /// Data in the buffer is being sent to the object store + Put { bytes: Bytes }, +} + +impl AsyncWrite for AsyncPutWriter { + // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct + fn poll_write( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + // Extend the current buffer with the incoming buffer + self.current_buffer.extend_from_slice(buf); + // Return a ready poll with the length of the incoming buffer + Poll::Ready(Ok(buf.len())) + } + + fn poll_flush( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + // Return a ready poll with an empty result + Poll::Ready(Ok(())) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Call the poll_shutdown_inner method to handle the actual sending of data to the object store + self.poll_shutdown_inner(cx) + } +} + +/// Stores data needed during abortion of MultiPart writers +pub(crate) struct MultiPart { + /// A shared reference to the object store + store: Arc, + multipart_id: MultipartId, + location: Path, +} + +impl MultiPart { + /// Create a new `MultiPart` + pub fn new( + store: Arc, + multipart_id: MultipartId, + location: Path, + ) -> Self { + Self { + store, + multipart_id, 
+ location, + } + } +} + +pub(crate) enum AbortMode { + Put, + Append, + MultiPart(MultiPart), +} + +/// A wrapper struct with abort method and writer +pub(crate) struct AbortableWrite { + writer: W, + mode: AbortMode, +} + +impl AbortableWrite { + /// Create a new `AbortableWrite` instance with the given writer, and write mode. + pub(crate) fn new(writer: W, mode: AbortMode) -> Self { + Self { writer, mode } + } + + /// handling of abort for different write modes + pub(crate) fn abort_writer(&self) -> Result>> { + match &self.mode { + AbortMode::Put => Ok(async { Ok(()) }.boxed()), + AbortMode::Append => Err(DataFusionError::Execution( + "Cannot abort in append mode".to_string(), + )), + AbortMode::MultiPart(MultiPart { + store, + multipart_id, + location, + }) => { + let location = location.clone(); + let multipart_id = multipart_id.clone(); + let store = store.clone(); + Ok(Box::pin(async move { + store + .abort_multipart(&location, &multipart_id) + .await + .map_err(DataFusionError::ObjectStore) + })) + } + } + } +} + +impl AsyncWrite for AbortableWrite { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_write(cx, buf) + } + + fn poll_flush( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().writer).poll_shutdown(cx) + } +} + +/// An enum that defines different file writer modes. +#[derive(Debug, Clone, Copy)] +pub enum FileWriterMode { + /// Data is appended to an existing file. + Append, + /// Data is written to a new file. + Put, + /// Data is written to a new file in multiple parts. + PutMultipart, +} +/// A trait that defines the methods required for a RecordBatch serializer. +#[async_trait] +pub trait BatchSerializer: Unpin + Send { + /// Asynchronously serializes a `RecordBatch` and returns the serialized bytes. + async fn serialize(&mut self, batch: RecordBatch) -> Result; +} + +/// Checks if any of the passed writers have encountered an error +/// and if so, all writers are aborted. +async fn check_for_errors( + result: Result, + writers: &mut [AbortableWrite], +) -> Result { + match result { + Ok(value) => Ok(value), + Err(e) => { + // Abort all writers before returning the error: + for writer in writers { + let mut abort_future = writer.abort_writer(); + if let Ok(abort_future) = &mut abort_future { + let _ = abort_future.await; + } + // Ignore errors that occur during abortion, + // We do try to abort all writers before returning error. + } + // After aborting writers return original error. + Err(e) + } + } +} + +/// Contains the common logic for serializing RecordBatches and +/// writing the resulting bytes to an ObjectStore. +/// Serialization is assumed to be stateless, i.e. +/// each RecordBatch can be serialized without any +/// dependency on the RecordBatches before or after. +pub(crate) async fn stateless_serialize_and_write_files( + mut data: Vec, + mut serializers: Vec>, + mut writers: Vec>>, +) -> Result { + let num_partitions = data.len(); + let mut row_count = 0; + // Map errors to DatafusionError. 
+ let err_converter = + |_| DataFusionError::Internal("Unexpected FileSink Error".to_string()); + // TODO parallelize serialization accross partitions and batches within partitions + // see: https://github.com/apache/arrow-datafusion/issues/7079 + for idx in 0..num_partitions { + while let Some(maybe_batch) = data[idx].next().await { + // Write data to files in a round robin fashion: + let serializer = &mut serializers[idx]; + let batch = check_for_errors(maybe_batch, &mut writers).await?; + row_count += batch.num_rows(); + let bytes = + check_for_errors(serializer.serialize(batch).await, &mut writers).await?; + let writer = &mut writers[idx]; + check_for_errors( + writer.write_all(&bytes).await.map_err(err_converter), + &mut writers, + ) + .await?; + } + } + // Perform cleanup: + let n_writers = writers.len(); + for idx in 0..n_writers { + check_for_errors( + writers[idx].shutdown().await.map_err(err_converter), + &mut writers, + ) + .await?; + } + Ok(row_count as u64) +} \ No newline at end of file diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e48c108d3ebc..29cd1152ea25 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -845,10 +845,10 @@ impl TableProvider for ListingTable { file_groups.len() ))); } - writer_mode = crate::datasource::file_format::FileWriterMode::Append; + writer_mode = crate::datasource::file_format::write::FileWriterMode::Append; } ListingTableInsertMode::AppendNewFiles => { - writer_mode = crate::datasource::file_format::FileWriterMode::PutMultipart + writer_mode = crate::datasource::file_format::write::FileWriterMode::PutMultipart } ListingTableInsertMode::Error => { return plan_err!( diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 2c4437de0a92..6ac073c34f61 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -522,7 +522,7 @@ mod tests { use datafusion_common::DataFusionError; use super::*; - use crate::datasource::file_format::BatchSerializer; + use crate::datasource::file_format::write::BatchSerializer; use crate::datasource::object_store::ObjectStoreUrl; use crate::datasource::physical_plan::FileMeta; use crate::physical_plan::metrics::ExecutionPlanMetricsSet; diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs index 3e607d121ac9..b0914b081616 100644 --- a/datafusion/core/src/datasource/physical_plan/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/mod.rs @@ -46,7 +46,7 @@ pub use json::{JsonOpener, NdJsonExec}; use crate::physical_plan::ExecutionPlan; use crate::{ - datasource::file_format::FileWriterMode, + datasource::file_format::write::FileWriterMode, physical_plan::{DisplayAs, DisplayFormatType}, }; use crate::{ From 1bd030e426eac48b4d666c0dc7c3b97d0cff9863 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 9 Aug 2023 08:31:45 -0400 Subject: [PATCH 05/11] remove config dependency on parquet crate --- datafusion/common/src/config.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0a662f6cc9e8..a957c30e0e39 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -290,19 +290,19 @@ config_namespace! 
{ // The following map to parquet::file::properties::WriterProperties /// Sets best effort maximum size of data page in bytes - pub data_pagesize_limit: usize, default = parquet::file::properties::DEFAULT_PAGE_SIZE + pub data_pagesize_limit: usize, default = 1024 * 1024 /// Sets best effort maximum number of rows in data page pub data_page_row_count_limit: usize, default = usize::MAX /// Sets best effort maximum dictionary page size, in bytes - pub dictionary_page_size_limit: usize, default = parquet::file::properties::DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT + pub dictionary_page_size_limit: usize, default = 1024 * 1024 /// Sets maximum number of rows in a row group - pub max_row_group_size: usize, default = parquet::file::properties::DEFAULT_MAX_ROW_GROUP_SIZE + pub max_row_group_size: usize, default = 1024 * 1024 /// Sets "created by" property - pub created_by: String, default = parquet::file::properties::DEFAULT_CREATED_BY.into() + pub created_by: String, default = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")).into() pub compression: Option, default = None From bd8f8a8897421633ad2f94bc90e7ae835c838bbb Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Wed, 9 Aug 2023 08:32:06 -0400 Subject: [PATCH 06/11] fmt --- datafusion/core/src/datasource/file_format/csv.rs | 4 ++-- datafusion/core/src/datasource/file_format/json.rs | 4 ++++ datafusion/core/src/datasource/file_format/mod.rs | 2 +- datafusion/core/src/datasource/file_format/parquet.rs | 4 ++-- datafusion/core/src/datasource/file_format/write.rs | 5 ++--- datafusion/core/src/datasource/listing/table.rs | 6 ++++-- 6 files changed, 15 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index d9eeeb668ebc..bf9f74869ffe 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -41,8 +41,8 @@ use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::FileWriterMode; use crate::datasource::file_format::write::{ - AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, - DEFAULT_SCHEMA_INFER_MAX_RECORD, create_writer + stateless_serialize_and_write_files, AbortMode, AbortableWrite, AsyncPutWriter, + BatchSerializer, FileWriterMode, MultiPart, }; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 3f43100ffee5..74a1b4a51b4d 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -55,6 +55,10 @@ use super::FileFormat; use super::FileScanConfig; use super::FileWriterMode; use crate::datasource::file_format::file_type::FileCompressionType; +use crate::datasource::file_format::write::{ + stateless_serialize_and_write_files, AbortMode, AbortableWrite, AsyncPutWriter, + BatchSerializer, FileWriterMode, MultiPart, +}; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::FileSinkConfig; use crate::datasource::physical_plan::NdJsonExec; diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index 181e1a2edc55..9eec11f224ea 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ 
b/datafusion/core/src/datasource/file_format/mod.rs @@ -31,8 +31,8 @@ pub mod parquet; pub mod write; use std::any::Any; -use std::sync::Arc; use std::fmt; +use std::sync::Arc; use crate::arrow::datatypes::SchemaRef; use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig}; diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 00fd8222b1d9..ed10bf56cedf 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -40,9 +40,9 @@ use parquet::file::properties::WriterProperties; use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; -use super::FileScanConfig; -use super::FileFormat; use super::write::FileWriterMode; +use super::FileFormat; +use super::FileScanConfig; use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs index ce14912be07c..3a63b6c96adf 100644 --- a/datafusion/core/src/datasource/file_format/write.rs +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -19,10 +19,10 @@ //! write support for the various file formats use std::io::Error; +use std::mem; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use std::mem; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; @@ -39,7 +39,6 @@ use object_store::path::Path; use object_store::{MultipartId, ObjectMeta, ObjectStore}; use tokio::io::{AsyncWrite, AsyncWriteExt}; - /// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores. /// It is specifically designed for the `object_store` crate's `put` method and sends /// whole bytes at once when the buffer is flushed. @@ -308,4 +307,4 @@ pub(crate) async fn stateless_serialize_and_write_files( .await?; } Ok(row_count as u64) -} \ No newline at end of file +} diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 29cd1152ea25..bf7f4f0e0d1d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -845,10 +845,12 @@ impl TableProvider for ListingTable { file_groups.len() ))); } - writer_mode = crate::datasource::file_format::write::FileWriterMode::Append; + writer_mode = + crate::datasource::file_format::write::FileWriterMode::Append; } ListingTableInsertMode::AppendNewFiles => { - writer_mode = crate::datasource::file_format::write::FileWriterMode::PutMultipart + writer_mode = + crate::datasource::file_format::write::FileWriterMode::PutMultipart } ListingTableInsertMode::Error => { return plan_err!( From 6c0bfdb6b2e38d1e07302435b5bc63f1660f0833 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 10 Aug 2023 21:51:54 -0400 Subject: [PATCH 07/11] finish implementing session write configs and parsing --- datafusion/common/src/config.rs | 63 ++++-- .../src/datasource/file_format/parquet.rs | 183 ++++++++++++++++-- .../core/src/datasource/listing/table.rs | 119 +++++++++++- .../test_files/information_schema.slt | 19 +- 4 files changed, 335 insertions(+), 49 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index a957c30e0e39..114d0b9507e3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -292,42 +292,61 @@ config_namespace! 
{ /// Sets best effort maximum size of data page in bytes pub data_pagesize_limit: usize, default = 1024 * 1024 - /// Sets best effort maximum number of rows in data page - pub data_page_row_count_limit: usize, default = usize::MAX + /// Sets write_batch_size in bytes + pub write_batch_size: usize, default = 1024 + + /// Sets parquet writer version + /// valid values are "1.0" and "2.0" + pub writer_version: String, default = "1.0".into() + + /// Sets default parquet compression codec + /// Valid values are: uncompressed, snappy, gzip(level), + /// lzo, brotli(level), lz4, zstd(level), and lz4_raw. + /// These values are not case sensitive. + pub compression: String, default = "snappy".into() + + /// Sets if dictionary encoding is enabled + pub dictionary_enabled: bool, default = true /// Sets best effort maximum dictionary page size, in bytes pub dictionary_page_size_limit: usize, default = 1024 * 1024 + /// Sets if statistics are enabled for any column + /// Valid values are: "none", "chunk", and "page" + /// These values are not case sensitive. + pub statistics_enabled: String, default = "page".into() + + /// Sets max statistics size for any column + pub max_statistics_size: usize, default = 4096 + /// Sets maximum number of rows in a row group pub max_row_group_size: usize, default = 1024 * 1024 /// Sets "created by" property - pub created_by: String, default = concat!("parquet-rs version ", env!("CARGO_PKG_VERSION")).into() - - pub compression: Option, default = None + pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into() - /// Sets default encoding for any column - pub encoding: Option, default = None - - /// Sets if dictionary encoding is enabled - pub dictionary_enabled: Option, default = None + /// Sets column index trucate length + pub column_index_truncate_length: Option, default = None - /// Sets if statistics are enabled for any column - pub statistics_enabled: Option, default = None + /// Sets best effort maximum number of rows in data page + pub data_page_row_count_limit: usize, default = usize::MAX - /// Sets max statistics size for any column - pub max_statistics_size: Option, default = None + /// Sets default encoding for any column + /// Valid values are: plain, plain_dictionary, rle, + /// bit_packed, delta_binary_packed, delta_length_byte_array, + /// delta_byte_array, rle_dictionary, and byte_stream_split. + /// These values are not case sensitive. + pub encoding: String, default = "plain".into() /// Sets if bloom filter is enabled for any column - pub bloom_filter_enabled: Option, default = None - } - // TODO macro not working with Option or Option - // Sets bloom filter false positive probability - //pub bloom_fiter_fpp: Option, default = None + pub bloom_filter_enabled: bool, default = false + + /// Sets bloom filter false positive probability + pub bloom_filter_fpp: f64, default = 0.05 // Sets bloom filter number of distinct values - //pub bloom_filter_ndv: Option, default = None - //} + pub bloom_filter_ndv: u64, default = 1_000_000_u64 + } } config_namespace! { @@ -786,6 +805,8 @@ macro_rules! 
config_field { config_field!(String); config_field!(bool); config_field!(usize); +config_field!(f64); +config_field!(u64); /// An implementation trait used to recursively walk configuration trait Visit { diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index ed10bf56cedf..c35f59dd4a94 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -17,6 +17,7 @@ //! Parquet format abstractions +use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; use rand::distributions::DistString; use std::any::Any; use std::fmt; @@ -36,7 +37,7 @@ use object_store::{ObjectMeta, ObjectStore}; use parquet::arrow::{parquet_to_arrow_schema, AsyncArrowWriter}; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::WriterProperties; +use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use parquet::file::statistics::Statistics as ParquetStatistics; use rand::distributions::Alphanumeric; @@ -602,33 +603,179 @@ impl DisplayAs for ParquetSink { } } +/// Parses datafusion.execution.parquet.encoding String to a parquet::basic::Encoding +fn parse_encoding_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "plain" => Ok(parquet::basic::Encoding::PLAIN), + "plain_dictionary" => Ok(parquet::basic::Encoding::PLAIN_DICTIONARY), + "rle" => Ok(parquet::basic::Encoding::RLE), + "bit_packed" => Ok(parquet::basic::Encoding::BIT_PACKED), + "delta_binary_packed" => Ok(parquet::basic::Encoding::DELTA_BINARY_PACKED), + "delta_length_byte_array" => { + Ok(parquet::basic::Encoding::DELTA_LENGTH_BYTE_ARRAY) + } + "delta_byte_array" => Ok(parquet::basic::Encoding::DELTA_BYTE_ARRAY), + "rle_dictionary" => Ok(parquet::basic::Encoding::RLE_DICTIONARY), + "byte_stream_split" => Ok(parquet::basic::Encoding::BYTE_STREAM_SPLIT), + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet encoding: \ + {str_setting}. Valid values are: plain, plain_dictionary, rle, \ + /// bit_packed, delta_binary_packed, delta_length_byte_array, \ + /// delta_byte_array, rle_dictionary, and byte_stream_split." + ))), + } +} + +/// Splits compression string into compression codec and optional compression_level +/// I.e. gzip(2) -> gzip, 2 +fn split_compression_string(str_setting: &str) -> Result<(&str, Option)> { + let split_setting = str_setting.split_once('('); + + match split_setting { + Some((codec, rh)) => { + let level = &rh[..rh.len() - 1].parse::().map_err(|_| { + DataFusionError::Plan(format!( + "Could not parse compression string. \ + Got codec: {} and unknown level from {}", + codec, str_setting + )) + })?; + Ok((codec, Some(*level))) + } + None => Ok((str_setting, None)), + } +} + +/// Helper to ensure compression codecs which don't support levels +/// don't have one set. E.g. snappy(2) is invalid. +fn check_level_is_none(codec: &str, level: &Option) -> Result<()> { + if level.is_some() { + return Err(DataFusionError::Plan(format!( + "Compression {codec} does not support specifying a level" + ))); + } + Ok(()) +} + +/// Helper to ensure compression codecs which require a level +/// do have one set. E.g. 
zstd is invalid, zstd(3) is valid +fn require_level(codec: &str, level: Option) -> Result { + level.ok_or(DataFusionError::Plan(format!( + "{codec} compression requires specifying a level such as {codec}(4)" + ))) +} + +/// Parses datafusion.execution.parquet.compression String to a parquet::basic::Compression +fn parse_compression_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + let (codec, level) = split_compression_string(str_setting_lower)?; + match codec { + "uncompressed" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::UNCOMPRESSED) + } + "snappy" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::SNAPPY) + } + "gzip" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::GZIP(GzipLevel::try_new( + level, + )?)) + } + "lzo" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZO) + } + "brotli" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( + level, + )?)) + } + "lz4" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4) + } + "zstd" => { + let level = require_level(codec, level)?; + Ok(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + level as i32, + )?)) + } + "lz4_raw" => { + check_level_is_none(codec, &level)?; + Ok(parquet::basic::Compression::LZ4_RAW) + } + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet compression: \ + {str_setting}. Valid values are: uncompressed, snappy, gzip(level), \ + lzo, brotli(level), lz4, zstd(level), and lz4_raw." + ))), + } +} + +fn parse_version_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "1.0" => Ok(WriterVersion::PARQUET_1_0), + "2.0" => Ok(WriterVersion::PARQUET_2_0), + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet writer version {str_setting} \ + valid options are '1.0' and '2.0'" + ))), + } +} + +fn parse_statistics_string(str_setting: &str) -> Result { + let str_setting_lower: &str = &str_setting.to_lowercase(); + match str_setting_lower { + "none" => Ok(EnabledStatistics::None), + "chunk" => Ok(EnabledStatistics::Chunk), + "page" => Ok(EnabledStatistics::Page), + _ => Err(DataFusionError::Plan(format!( + "Unknown or unsupported parquet statistics setting {str_setting} \ + valid options are 'none', 'page', and 'chunk'" + ))), + } +} + impl ParquetSink { fn new(config: FileSinkConfig) -> Self { Self { config } } - /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options + /// Builds a parquet WriterProperties struct, setting options as appropriate from TaskContext options. + /// May return error if SessionContext contains invalid or unsupported options fn parquet_writer_props_from_context( &self, context: &Arc, - ) -> WriterProperties { + ) -> Result { let parquet_context = &context.session_config().options().execution.parquet; - let mut builder = WriterProperties::builder() + Ok(WriterProperties::builder() + .set_data_page_size_limit(parquet_context.data_pagesize_limit) + .set_write_batch_size(parquet_context.write_batch_size) + .set_writer_version(parse_version_string(&parquet_context.writer_version)?) + .set_compression(parse_compression_string(&parquet_context.compression)?) 
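+            // A sketch of how the session strings are expected to map onto parquet-rs
+            // writer types, assuming the parsing helpers above behave as written:
+            //   parse_compression_string("snappy")  => Ok(Compression::SNAPPY)
+            //   parse_compression_string("zstd(5)") => Ok(Compression::ZSTD(ZstdLevel::try_new(5)?))
+            //   parse_compression_string("zstd")    => plan error: "zstd compression requires specifying a level such as zstd(4)"
+            //   parse_version_string("2.0")         => Ok(WriterVersion::PARQUET_2_0)
+            //   parse_statistics_string("chunk")    => Ok(EnabledStatistics::Chunk)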
+ .set_dictionary_enabled(parquet_context.dictionary_enabled) + .set_dictionary_page_size_limit(parquet_context.dictionary_page_size_limit) + .set_statistics_enabled(parse_statistics_string( + &parquet_context.statistics_enabled, + )?) + .set_max_statistics_size(parquet_context.max_statistics_size) + .set_max_row_group_size(parquet_context.max_row_group_size) .set_created_by(parquet_context.created_by.clone()) + .set_column_index_truncate_length( + parquet_context.column_index_truncate_length, + ) .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit) - .set_data_page_size_limit(parquet_context.data_pagesize_limit); - - if parquet_context.bloom_filter_enabled.is_some() { - builder = builder - .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled.unwrap()) - } - - // TODO - //.set_bloom_filter_fpp(parquet_context.bloom_filter_fpp) - // TODO - //.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv) - //.set_compression(parquet::basic::Compression::try_from(parquet_context.compression)) - builder.build() + .set_encoding(parse_encoding_string(&parquet_context.encoding)?) + .set_bloom_filter_enabled(parquet_context.bloom_filter_enabled) + .set_bloom_filter_fpp(parquet_context.bloom_filter_fpp) + .set_bloom_filter_ndv(parquet_context.bloom_filter_ndv) + .build()) } // Create a write for parquet files @@ -675,7 +822,7 @@ impl DataSink for ParquetSink { context: &Arc, ) -> Result { let num_partitions = data.len(); - let parquet_props = self.parquet_writer_props_from_context(context); + let parquet_props = self.parquet_writer_props_from_context(context)?; let object_store = context .runtime_env() diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index bf7f4f0e0d1d..4f2387ad0dfa 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -965,6 +965,7 @@ mod tests { use datafusion_common::assert_contains; use datafusion_expr::LogicalPlanBuilder; use rstest::*; + use std::collections::HashMap; use std::fs::File; use tempfile::TempDir; @@ -1556,6 +1557,7 @@ mod tests { helper_test_insert_into_append_to_existing_files( FileType::JSON, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) @@ -1566,6 +1568,7 @@ mod tests { helper_test_append_new_files_to_table( FileType::JSON, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) @@ -1576,6 +1579,7 @@ mod tests { helper_test_insert_into_append_to_existing_files( FileType::CSV, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) @@ -1586,26 +1590,120 @@ mod tests { helper_test_append_new_files_to_table( FileType::CSV, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) } #[tokio::test] - async fn test_insert_into_append_new_parquet_files() -> Result<()> { + async fn test_insert_into_append_new_parquet_files_defaults() -> Result<()> { helper_test_append_new_files_to_table( FileType::PARQUET, FileCompressionType::UNCOMPRESSED, + None, ) .await?; Ok(()) } + #[tokio::test] + async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> { + let mut config_map: HashMap = HashMap::new(); + config_map.insert( + "datafusion.execution.parquet.compression".into(), + "zstd(5)".into(), + ); + config_map.insert( + "datafusion.execution.parquet.dictionary_enabled".into(), + "false".into(), + ); + config_map.insert( + "datafusion.execution.parquet.dictionary_page_size_limit".into(), + "100".into(), + ); + config_map.insert( + 
"datafusion.execution.parquet.staistics_enabled".into(), + "none".into(), + ); + config_map.insert( + "datafusion.execution.parquet.max_statistics_size".into(), + "10".into(), + ); + config_map.insert( + "datafusion.execution.parquet.max_row_group_size".into(), + "5".into(), + ); + config_map.insert( + "datafusion.execution.parquet.created_by".into(), + "datafusion test".into(), + ); + config_map.insert( + "datafusion.execution.parquet.column_index_truncate_length".into(), + "50".into(), + ); + config_map.insert( + "datafusion.execution.parquet.data_page_row_count_limit".into(), + "50".into(), + ); + config_map.insert( + "datafusion.execution.parquet.encoding".into(), + "delta_binary_packed".into(), + ); + config_map.insert( + "datafusion.execution.parquet.bloom_filter_enabled".into(), + "true".into(), + ); + config_map.insert( + "datafusion.execution.parquet.bloom_filter_fpp".into(), + "0.01".into(), + ); + config_map.insert( + "datafusion.execution.parquet.bloom_filter_ndv".into(), + "1000".into(), + ); + config_map.insert( + "datafusion.execution.parquet.writer_version".into(), + "2.0".into(), + ); + config_map.insert( + "datafusion.execution.parquet.write_batch_size".into(), + "5".into(), + ); + helper_test_append_new_files_to_table( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + Some(config_map), + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_insert_into_append_new_parquet_files_invalid_session_fails( + ) -> Result<()> { + let mut config_map: HashMap = HashMap::new(); + config_map.insert( + "datafusion.execution.parquet.compression".into(), + "zstd".into(), + ); + let e = helper_test_append_new_files_to_table( + FileType::PARQUET, + FileCompressionType::UNCOMPRESSED, + Some(config_map), + ) + .await + .expect_err("Example should fail!"); + assert_eq!("Error during planning: zstd compression requires specifying a level such as zstd(4)", format!("{e}")); + Ok(()) + } + #[tokio::test] async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> { let maybe_err = helper_test_insert_into_append_to_existing_files( FileType::PARQUET, FileCompressionType::UNCOMPRESSED, + None, ) .await; let _err = @@ -1639,9 +1737,16 @@ mod tests { async fn helper_test_insert_into_append_to_existing_files( file_type: FileType, file_compression_type: FileCompressionType, + session_config_map: Option>, ) -> Result<()> { // Create the initial context, schema, and batch. - let session_ctx = SessionContext::new(); + let session_ctx = match session_config_map { + Some(cfg) => { + let config = SessionConfig::from_string_hash_map(cfg)?; + SessionContext::with_config(config) + } + None => SessionContext::new(), + }; // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( "column1", @@ -1801,9 +1906,17 @@ mod tests { async fn helper_test_append_new_files_to_table( file_type: FileType, file_compression_type: FileCompressionType, + session_config_map: Option>, ) -> Result<()> { // Create the initial context, schema, and batch. 
- let session_ctx = SessionContext::new(); + let session_ctx = match session_config_map { + Some(cfg) => { + let config = SessionConfig::from_string_hash_map(cfg)?; + SessionContext::with_config(config) + } + None => SessionContext::new(), + }; + // Create a new schema with one field called "a" of type Int32 let schema = Arc::new(Schema::new(vec![Field::new( "column1", diff --git a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt index 05cce5b96b1d..452a8709c523 100644 --- a/datafusion/core/tests/sqllogictests/test_files/information_schema.slt +++ b/datafusion/core/tests/sqllogictests/test_files/information_schema.slt @@ -146,23 +146,28 @@ datafusion.execution.aggregate.scalar_update_factor 10 datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics false -datafusion.execution.parquet.bloom_filter_enabled NULL -datafusion.execution.parquet.compression NULL -datafusion.execution.parquet.created_by parquet-rs version 45.0.0 +datafusion.execution.parquet.bloom_filter_enabled false +datafusion.execution.parquet.bloom_filter_fpp 0.05 +datafusion.execution.parquet.bloom_filter_ndv 1000000 +datafusion.execution.parquet.column_index_truncate_length NULL +datafusion.execution.parquet.compression snappy +datafusion.execution.parquet.created_by datafusion version 28.0.0 datafusion.execution.parquet.data_page_row_count_limit 18446744073709551615 datafusion.execution.parquet.data_pagesize_limit 1048576 -datafusion.execution.parquet.dictionary_enabled NULL +datafusion.execution.parquet.dictionary_enabled true datafusion.execution.parquet.dictionary_page_size_limit 1048576 datafusion.execution.parquet.enable_page_index true -datafusion.execution.parquet.encoding NULL +datafusion.execution.parquet.encoding plain datafusion.execution.parquet.max_row_group_size 1048576 -datafusion.execution.parquet.max_statistics_size NULL +datafusion.execution.parquet.max_statistics_size 4096 datafusion.execution.parquet.metadata_size_hint NULL datafusion.execution.parquet.pruning true datafusion.execution.parquet.pushdown_filters false datafusion.execution.parquet.reorder_filters false datafusion.execution.parquet.skip_metadata true -datafusion.execution.parquet.statistics_enabled NULL +datafusion.execution.parquet.statistics_enabled page +datafusion.execution.parquet.write_batch_size 1024 +datafusion.execution.parquet.writer_version 1.0 datafusion.execution.planning_concurrency 13 datafusion.execution.sort_in_place_threshold_bytes 1048576 datafusion.execution.sort_spill_reservation_bytes 10485760 From f10dfbafd34469f93ce4b6b7d7b7d20a0f0adb06 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 10 Aug 2023 21:55:21 -0400 Subject: [PATCH 08/11] fix ndv doc --- datafusion/common/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 114d0b9507e3..41c2657e1cca 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -344,7 +344,7 @@ config_namespace! 
{ /// Sets bloom filter false positive probability pub bloom_filter_fpp: f64, default = 0.05 - // Sets bloom filter number of distinct values + /// Sets bloom filter number of distinct values pub bloom_filter_ndv: u64, default = 1_000_000_u64 } } From 7b108813c36c8cbe7dcd65a4cb3da4cc6e34b787 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Thu, 10 Aug 2023 22:19:51 -0400 Subject: [PATCH 09/11] rebase resolve conflicts --- .../core/src/datasource/file_format/csv.rs | 4 +- .../core/src/datasource/file_format/json.rs | 8 +- .../core/src/datasource/file_format/write.rs | 55 +++++++++ docs/source/user-guide/configs.md | 104 ++++++++++-------- 4 files changed, 117 insertions(+), 54 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index bf9f74869ffe..32fbf03b580f 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -39,10 +39,8 @@ use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore} use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD}; use crate::datasource::file_format::file_type::FileCompressionType; -use crate::datasource::file_format::FileWriterMode; use crate::datasource::file_format::write::{ - stateless_serialize_and_write_files, AbortMode, AbortableWrite, AsyncPutWriter, - BatchSerializer, FileWriterMode, MultiPart, + create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode, }; use crate::datasource::physical_plan::{ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig, diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 74a1b4a51b4d..8472f4e5c164 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,17 +47,11 @@ use crate::physical_plan::insert::InsertExec; use crate::physical_plan::SendableRecordBatchStream; use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics}; -use crate::datasource::file_format::write::{ - AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart, - stateless_serialize_and_write_files, FileWriterMode, create_writer -}; use super::FileFormat; use super::FileScanConfig; -use super::FileWriterMode; use crate::datasource::file_format::file_type::FileCompressionType; use crate::datasource::file_format::write::{ - stateless_serialize_and_write_files, AbortMode, AbortableWrite, AsyncPutWriter, - BatchSerializer, FileWriterMode, MultiPart, + create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode, }; use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD; use crate::datasource::physical_plan::FileSinkConfig; diff --git a/datafusion/core/src/datasource/file_format/write.rs b/datafusion/core/src/datasource/file_format/write.rs index 3a63b6c96adf..c256c9689ab9 100644 --- a/datafusion/core/src/datasource/file_format/write.rs +++ b/datafusion/core/src/datasource/file_format/write.rs @@ -24,6 +24,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::datasource::physical_plan::FileMeta; use crate::error::Result; use crate::physical_plan::SendableRecordBatchStream; @@ -39,6 +40,8 @@ use object_store::path::Path; use object_store::{MultipartId, ObjectMeta, ObjectStore}; use tokio::io::{AsyncWrite, AsyncWriteExt}; +use super::file_type::FileCompressionType; + /// `AsyncPutWriter` is an object that facilitates asynchronous writing 
to object stores. /// It is specifically designed for the `object_store` crate's `put` method and sends /// whole bytes at once when the buffer is flushed. @@ -264,6 +267,58 @@ async fn check_for_errors( } } +/// Returns an [`AbortableWrite`] which writes to the given object store location +/// with the specified compression +pub(crate) async fn create_writer( + writer_mode: FileWriterMode, + file_compression_type: FileCompressionType, + file_meta: FileMeta, + object_store: Arc, +) -> Result>> { + let object = &file_meta.object_meta; + match writer_mode { + // If the mode is append, call the store's append method and return wrapped in + // a boxed trait object. + FileWriterMode::Append => { + let writer = object_store + .append(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + let writer = AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::Append, + ); + Ok(writer) + } + // If the mode is put, create a new AsyncPut writer and return it wrapped in + // a boxed trait object + FileWriterMode::Put => { + let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store)); + let writer = AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::Put, + ); + Ok(writer) + } + // If the mode is put multipart, call the store's put_multipart method and + // return the writer wrapped in a boxed trait object. + FileWriterMode::PutMultipart => { + let (multipart_id, writer) = object_store + .put_multipart(&object.location) + .await + .map_err(DataFusionError::ObjectStore)?; + Ok(AbortableWrite::new( + file_compression_type.convert_async_writer(writer)?, + AbortMode::MultiPart(MultiPart::new( + object_store, + multipart_id, + object.location.clone(), + )), + )) + } + } +} + /// Contains the common logic for serializing RecordBatches and /// writing the resulting bytes to an ObjectStore. /// Serialization is assumed to be stateless, i.e. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 63c9c064bc52..50d9cb7c8b67 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -35,47 +35,63 @@ Values are parsed according to the [same rules used in casts from Utf8](https:// If the value in the environment variable cannot be cast to the type of the configuration option, the default value will be used instead and a warning emitted. Environment variables are read during `SessionConfig` initialisation so they must be set beforehand and will not affect running sessions. -| key | default | description | -| ---------------------------------------------------------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. 
| -| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | -| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | -| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | -| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | -| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | -| datafusion.catalog.has_header | false | If the file has a header | -| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | -| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | -| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | -| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | -| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | -| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | -| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | -| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | -| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | -| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | -| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | -| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. 
The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | -| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | -| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | -| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | -| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | -| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | -| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | -| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | -| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. 
| -| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | -| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | -| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. | -| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | -| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | -| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | -| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | -| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | -| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | -| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | -| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | -| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | -| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. 
| +| key | default | description | +| ---------------------------------------------------------- | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| datafusion.catalog.create_default_catalog_and_schema | true | Whether the default catalog and schema should be created automatically. | +| datafusion.catalog.default_catalog | datafusion | The default catalog name - this impacts what SQL queries use if not specified | +| datafusion.catalog.default_schema | public | The default schema name - this impacts what SQL queries use if not specified | +| datafusion.catalog.information_schema | false | Should DataFusion provide access to `information_schema` virtual tables for displaying schema information | +| datafusion.catalog.location | NULL | Location scanned to load tables for `default` schema | +| datafusion.catalog.format | NULL | Type of `TableProvider` to use when loading `default` schema | +| datafusion.catalog.has_header | false | If the file has a header | +| datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | +| datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | +| datafusion.execution.collect_statistics | false | Should DataFusion collect statistics after listing files | +| datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system | +| datafusion.execution.time_zone | +00:00 | The default time zone Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime according to this time zone, and then extract the hour | +| datafusion.execution.parquet.enable_page_index | true | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce the I/O and number of rows decoded. | +| datafusion.execution.parquet.pruning | true | If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file | +| datafusion.execution.parquet.skip_metadata | true | If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata | +| datafusion.execution.parquet.metadata_size_hint | NULL | If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. 
If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer | +| datafusion.execution.parquet.pushdown_filters | false | If true, filter expressions are be applied during the parquet decoding operation to reduce the number of rows decoded | +| datafusion.execution.parquet.reorder_filters | false | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query | +| datafusion.execution.parquet.data_pagesize_limit | 1048576 | Sets best effort maximum size of data page in bytes | +| datafusion.execution.parquet.write_batch_size | 1024 | Sets write_batch_size in bytes | +| datafusion.execution.parquet.writer_version | 1.0 | Sets parquet writer version valid values are "1.0" and "2.0" | +| datafusion.execution.parquet.compression | snappy | Sets default parquet compression codec Valid values are: uncompressed, snappy, gzip(level), lzo, brotli(level), lz4, zstd(level), and lz4_raw. These values are not case sensitive. | +| datafusion.execution.parquet.dictionary_enabled | true | Sets if dictionary encoding is enabled | +| datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | Sets best effort maximum dictionary page size, in bytes | +| datafusion.execution.parquet.statistics_enabled | page | Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. | +| datafusion.execution.parquet.max_statistics_size | 4096 | Sets max statistics size for any column | +| datafusion.execution.parquet.max_row_group_size | 1048576 | Sets maximum number of rows in a row group | +| datafusion.execution.parquet.created_by | datafusion version 28.0.0 | Sets "created by" property | +| datafusion.execution.parquet.column_index_truncate_length | NULL | Sets column index trucate length | +| datafusion.execution.parquet.data_page_row_count_limit | 18446744073709551615 | Sets best effort maximum number of rows in data page | +| datafusion.execution.parquet.encoding | plain | Sets default encoding for any column Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. | +| datafusion.execution.parquet.bloom_filter_enabled | false | Sets if bloom filter is enabled for any column | +| datafusion.execution.parquet.bloom_filter_fpp | 0.05 | Sets bloom filter false positive probability | +| datafusion.execution.parquet.bloom_filter_ndv | 1000000 | Sets bloom filter number of distinct values | +| datafusion.execution.aggregate.scalar_update_factor | 10 | Specifies the threshold for using `ScalarValue`s to update accumulators during high-cardinality aggregations for each input batch. The aggregation is considered high-cardinality if the number of affected groups is greater than or equal to `batch_size / scalar_update_factor`. In such cases, `ScalarValue`s are utilized for updating accumulators, rather than the default batch-slice approach. This can lead to performance improvements. By adjusting the `scalar_update_factor`, you can balance the trade-off between more efficient accumulator updates and the number of groups affected. | +| datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. 
This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | +| datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). | +| datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. | +| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores | +| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | +| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | +| datafusion.optimizer.repartition_joins | true | Should DataFusion repartition data using the join keys to execute joins in parallel using the provided `target_partitions` level | +| datafusion.optimizer.allow_symmetric_joins_without_pruning | true | Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. | +| datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. | +| datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level | +| datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. 
With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` | +| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. | +| datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail | +| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan | +| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys | +| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory | +| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition | +| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | +| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | +| datafusion.sql_parser.parse_float_as_decimal | false | When set to true, SQL parser will parse float as decimal type | +| datafusion.sql_parser.enable_ident_normalization | true | When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted) | +| datafusion.sql_parser.dialect | generic | Configure the SQL dialect used by DataFusion's parser; supported values include: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi. 
| From 77c3f61189ab856fe7b14755fe1dfa6f3253a048 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 11 Aug 2023 08:17:06 -0400 Subject: [PATCH 10/11] split up test_string_expressions into 2 tests to avoid stack overflow error --- datafusion/core/tests/sql/expr.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs index fc0a4e7c7ed2..c9fe0e4c7569 100644 --- a/datafusion/core/tests/sql/expr.rs +++ b/datafusion/core/tests/sql/expr.rs @@ -449,7 +449,7 @@ async fn test_substring_expr() -> Result<()> { } #[tokio::test] -async fn test_string_expressions() -> Result<()> { +async fn test_string_expressions_batch1() -> Result<()> { test_expression!("ascii('')", "0"); test_expression!("ascii('x')", "120"); test_expression!("ascii(NULL)", "NULL"); @@ -501,6 +501,11 @@ async fn test_string_expressions() -> Result<()> { test_expression!("rtrim(' zzzytest ', NULL)", "NULL"); test_expression!("rtrim('testxxzx', 'xyz')", "test"); test_expression!("rtrim(NULL, 'xyz')", "NULL"); + Ok(()) +} + +#[tokio::test] +async fn test_string_expressions_batch2() -> Result<()> { test_expression!("split_part('abc~@~def~@~ghi', '~@~', 2)", "def"); test_expression!("split_part('abc~@~def~@~ghi', '~@~', 20)", ""); test_expression!("split_part(NULL, '~@~', 20)", "NULL"); From bd37756cb5736425fb7eb529865f402ed7bc84d8 Mon Sep 17 00:00:00 2001 From: Devin D'Angelo Date: Fri, 11 Aug 2023 08:35:07 -0400 Subject: [PATCH 11/11] add comments explaining test split --- datafusion/core/tests/sql/expr.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/core/tests/sql/expr.rs b/datafusion/core/tests/sql/expr.rs index c9fe0e4c7569..36786bd0798f 100644 --- a/datafusion/core/tests/sql/expr.rs +++ b/datafusion/core/tests/sql/expr.rs @@ -448,6 +448,8 @@ async fn test_substring_expr() -> Result<()> { Ok(()) } +/// Test string expressions test split into two batches +/// to prevent stack overflow error #[tokio::test] async fn test_string_expressions_batch1() -> Result<()> { test_expression!("ascii('')", "0"); @@ -504,6 +506,8 @@ async fn test_string_expressions_batch1() -> Result<()> { Ok(()) } +/// Test string expressions test split into two batches +/// to prevent stack overflow error #[tokio::test] async fn test_string_expressions_batch2() -> Result<()> { test_expression!("split_part('abc~@~def~@~ghi', '~@~', 2)", "def");