diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 968d8215ca4d..fa65709b1e4f 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -22,6 +22,9 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::{self, Display}; use std::str::FromStr; +#[cfg(feature = "parquet")] +use parquet::file::metadata::KeyValue; + use crate::error::_config_err; use crate::parsers::CompressionTypeVariant; use crate::{DataFusionError, FileType, Result}; @@ -1368,6 +1371,10 @@ pub struct TableParquetOptions { pub global: ParquetOptions, /// Column specific options. Default usage is parquet.XX::column. pub column_specific_options: HashMap<String, ColumnOptions>, + /// Optional, additional metadata to be inserted into the key_value_metadata + /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html). + #[cfg(feature = "parquet")] + pub key_value_metadata: Option<Vec<KeyValue>>, } impl ConfigField for TableParquetOptions { diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index f651ff932a5a..2f989c3f6c30 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -88,7 +88,8 @@ impl TryFrom<&TableParquetOptions> for ParquetWriterOptions { .set_created_by(created_by.clone()) .set_column_index_truncate_length(*column_index_truncate_length) .set_data_page_row_count_limit(*data_page_row_count_limit) - .set_bloom_filter_enabled(*bloom_filter_enabled); + .set_bloom_filter_enabled(*bloom_filter_enabled) + .set_key_value_metadata(parquet_options.key_value_metadata.clone()); if let Some(encoding) = &encoding { builder = builder.set_encoding(parse_encoding_string(encoding)?); diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index bcf4e8a2c8e4..23f8e8e0b5f6 100644 --- 
a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1136,7 +1136,7 @@ mod tests { }; use parquet::arrow::arrow_reader::ArrowReaderOptions; use parquet::arrow::ParquetRecordBatchStreamBuilder; - use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex}; + use parquet::file::metadata::{KeyValue, ParquetColumnIndex, ParquetOffsetIndex}; use parquet::file::page_index::index::Index; use tokio::fs::File; use tokio::io::AsyncWrite; @@ -1865,7 +1865,13 @@ mod tests { }; let parquet_sink = Arc::new(ParquetSink::new( file_sink_config, - TableParquetOptions::default(), + TableParquetOptions { + key_value_metadata: Some(vec![KeyValue { + key: "my-data".into(), + value: Some("stuff".to_string()), + }]), + ..Default::default() + }, )); // create data @@ -1899,7 +1905,10 @@ mod tests { let ( path, FileMetaData { - num_rows, schema, .. + num_rows, + schema, + key_value_metadata, + .. }, ) = written.take(1).next().unwrap(); let path_parts = path.parts().collect::<Vec<_>>(); @@ -1915,6 +1924,13 @@ mod tests { "output file metadata should contain col b" ); + let key_value_metadata = key_value_metadata.unwrap(); + let my_metadata = key_value_metadata + .iter() + .filter(|kv| kv.key == "my-data") + .collect::<Vec<_>>(); + assert_eq!(my_metadata.len(), 1); + Ok(()) } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index d900d0031df3..80ad17270c30 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -26318,6 +26318,7 @@ impl<'de> serde::Deserialize<'de> for TableParquetOptions { Ok(TableParquetOptions { global: global__, column_specific_options: column_specific_options__.unwrap_or_default(), + key_value_metadata: None, }) } } diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index aaca4dc48236..5d779181976e 100644 --- 
a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -974,6 +974,7 @@ impl TryFrom<&protobuf::TableParquetOptions> for TableParquetOptions { .unwrap() .unwrap(), column_specific_options, + key_value_metadata: None, }) } }