Extend insert into to support Parquet backed tables #7244

Merged (11 commits) on Aug 13, 2023
64 changes: 63 additions & 1 deletion datafusion/common/src/config.rs
@@ -255,7 +255,7 @@ config_namespace! {
}

config_namespace! {
/// Options related to reading of parquet files
/// Options related to parquet files
pub struct ParquetOptions {
/// If true, reads the Parquet data page level metadata (the
/// Page Index), if present, to reduce the I/O and number of
@@ -286,6 +286,66 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false

// The following map to parquet::file::properties::WriterProperties

/// Sets best effort maximum size of data page in bytes
pub data_pagesize_limit: usize, default = 1024 * 1024
Contributor:
It is an interesting question whether we want these settings to be session level (as this change proposes) or per-statement 🤔

I suppose a session-level default makes sense, and if we want to add per-statement overrides (like COPY TO <file> AS PARQUET (DATA_PAGE_ROW_COUNT_LIMIT 100000)) we can always do that afterwards as well.
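For reference, a session-level default of this kind would be adjusted through DataFusion's normal configuration surface. A minimal sketch follows; the key names are an assumption based on ParquetOptions living under the existing datafusion.execution.parquet namespace, not something shown in this hunk:

// A minimal sketch of overriding the session-level writer defaults.
// Key names are assumed from the ParquetOptions fields in this diff.
use datafusion::config::ConfigOptions;
use datafusion::error::Result;

fn session_level_defaults() -> Result<ConfigOptions> {
    let mut options = ConfigOptions::default();
    options.set("datafusion.execution.parquet.compression", "zstd(3)")?;
    options.set("datafusion.execution.parquet.data_page_row_count_limit", "100000")?;
    Ok(options)
}

The same values should also be reachable from SQL via a SET statement, since these options flow through the regular config machinery.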

Contributor Author:

I think we will definitely want COPY TO to be able to set any of these configs on a per-statement basis.

For insert into, we could allow the table itself to be registered with specific settings, e.g.:

create external table my_table(x int, y int)
stored as parquet
location '/tmp/my_table'
WITH (
  DATA_PAGESIZE_LIMIT 2048,
  DATA_PAGE_ROW_COUNT_LIMIT 100000,
  ...
);

insert into my_table would then use any table-specific settings or fall back to the session-level configs.
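To make that fallback concrete, here is a rough standalone sketch (hypothetical types and field names, not code from this PR): per-table settings, when present, win over the session-level ParquetOptions defaults.

/// Stand-in for the session-level writer defaults defined in ParquetOptions.
#[derive(Clone, Debug, PartialEq)]
struct SessionParquetDefaults {
    data_pagesize_limit: usize,
    data_page_row_count_limit: usize,
}

/// Optional per-table overrides captured at CREATE EXTERNAL TABLE time.
#[derive(Default)]
struct TableParquetOverrides {
    data_pagesize_limit: Option<usize>,
    data_page_row_count_limit: Option<usize>,
}

/// Per-table values take precedence; anything unset falls back to the session defaults.
fn resolve_writer_settings(
    session: &SessionParquetDefaults,
    table: &TableParquetOverrides,
) -> SessionParquetDefaults {
    SessionParquetDefaults {
        data_pagesize_limit: table
            .data_pagesize_limit
            .unwrap_or(session.data_pagesize_limit),
        data_page_row_count_limit: table
            .data_page_row_count_limit
            .unwrap_or(session.data_page_row_count_limit),
    }
}

fn main() {
    let session = SessionParquetDefaults {
        data_pagesize_limit: 1024 * 1024,
        data_page_row_count_limit: usize::MAX,
    };
    let table = TableParquetOverrides {
        data_pagesize_limit: Some(2048),
        ..Default::default()
    };
    let resolved = resolve_writer_settings(&session, &table);
    assert_eq!(resolved.data_pagesize_limit, 2048);
    assert_eq!(resolved.data_page_row_count_limit, usize::MAX);
}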


/// Sets write_batch_size in rows
pub write_batch_size: usize, default = 1024

/// Sets parquet writer version
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".into()

/// Sets default parquet compression codec
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive.
pub compression: String, default = "snappy".into()

/// Sets if dictionary encoding is enabled
pub dictionary_enabled: bool, default = true

/// Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024

/// Sets if statistics are enabled for any column
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive.
pub statistics_enabled: String, default = "page".into()

/// Sets max statistics size for any column
pub max_statistics_size: usize, default = 4096

/// Sets maximum number of rows in a row group
pub max_row_group_size: usize, default = 1024 * 1024

/// Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

/// Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = None

/// Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = usize::MAX

/// Sets default encoding for any column
/// Valid values are: plain, plain_dictionary, rle,
/// bit_packed, delta_binary_packed, delta_length_byte_array,
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive.
pub encoding: String, default = "plain".into()

/// Sets if bloom filter is enabled for any column
pub bloom_filter_enabled: bool, default = false

/// Sets bloom filter false positive probability
pub bloom_filter_fpp: f64, default = 0.05

/// Sets bloom filter number of distinct values
pub bloom_filter_ndv: u64, default = 1_000_000_u64
}
}
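The comment at the top of this block notes that these options map to parquet::file::properties::WriterProperties. Below is a minimal sketch of that mapping using the defaults above; the builder method names follow the parquet crate's WriterPropertiesBuilder, and the actual DataFusion conversion code, including parsing of the string-valued options into parquet enums, is not part of this hunk.

// A minimal sketch, not DataFusion's actual conversion code: building parquet
// WriterProperties from the defaults above. String-valued options (compression,
// statistics_enabled, encoding, writer_version) would need to be parsed into the
// corresponding parquet enums; literal variants matching the defaults are used here.
use parquet::basic::Compression;
use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};

fn writer_properties_from_defaults() -> WriterProperties {
    WriterProperties::builder()
        .set_data_page_size_limit(1024 * 1024)           // data_pagesize_limit
        .set_write_batch_size(1024)                      // write_batch_size
        .set_writer_version(WriterVersion::PARQUET_1_0)  // writer_version = "1.0"
        .set_compression(Compression::SNAPPY)            // compression = "snappy"
        .set_dictionary_enabled(true)                    // dictionary_enabled
        .set_dictionary_page_size_limit(1024 * 1024)     // dictionary_page_size_limit
        .set_statistics_enabled(EnabledStatistics::Page) // statistics_enabled = "page"
        .set_max_statistics_size(4096)                   // max_statistics_size
        .set_max_row_group_size(1024 * 1024)             // max_row_group_size
        .set_column_index_truncate_length(None)          // column_index_truncate_length
        .set_data_page_row_count_limit(usize::MAX)       // data_page_row_count_limit
        .set_bloom_filter_enabled(false)                 // bloom_filter_enabled
        .set_bloom_filter_fpp(0.05)                      // bloom_filter_fpp
        .set_bloom_filter_ndv(1_000_000)                 // bloom_filter_ndv
        .build()
}

fn main() {
    let props = writer_properties_from_defaults();
    println!("created_by: {}", props.created_by());
}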

@@ -745,6 +805,8 @@ macro_rules! config_field {
config_field!(String);
config_field!(bool);
config_field!(usize);
config_field!(f64);
config_field!(u64);
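The two new invocations are needed because the bloom filter options above introduce f64 and u64 fields. As a standalone illustration of the pattern (hypothetical trait name, not DataFusion's actual ConfigField definition), the macro essentially stamps out a parse-from-string implementation per scalar type:

// Standalone illustration of the config_field! pattern being extended here.
// The ParseableField trait is hypothetical; DataFusion's real trait differs.
use std::str::FromStr;

trait ParseableField {
    fn set_from_str(&mut self, value: &str) -> Result<(), String>;
}

macro_rules! config_field {
    ($t:ty) => {
        impl ParseableField for $t {
            fn set_from_str(&mut self, value: &str) -> Result<(), String> {
                *self = <$t as FromStr>::from_str(value).map_err(|e| e.to_string())?;
                Ok(())
            }
        }
    };
}

config_field!(f64);
config_field!(u64);

fn main() {
    let mut fpp = 0.05_f64;
    fpp.set_from_str("0.01").unwrap();
    assert_eq!(fpp, 0.01);
}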

/// An implementation trait used to recursively walk configuration
trait Visit {
7 changes: 4 additions & 3 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -37,10 +37,11 @@ use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

use super::{create_writer, stateless_serialize_and_write_files, FileFormat};
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::FileWriterMode;
use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
};
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -47,13 +47,12 @@ use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

use super::create_writer;
use super::stateless_serialize_and_write_files;
use super::BatchSerializer;
use super::FileFormat;
use super::FileScanConfig;
use super::FileWriterMode;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::write::{
create_writer, stateless_serialize_and_write_files, BatchSerializer, FileWriterMode,
};
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::physical_plan::NdJsonExec;