
Extend insert into to support Parquet backed tables #7244

Merged: 11 commits, Aug 13, 2023

Conversation

@devinjdangelo (Contributor) commented Aug 9, 2023

Which issue does this PR close?

None, but it progresses towards the goals of #5076 and #7079. Follow-on to #7141 and #7212.

Part of #7036

Rationale for this change

Inserting into parquet-backed tables is a common use case. The file format itself does not support appending to an existing file, so only appending new files to a table is supported.

This implementation relies on AsyncArrowWriter to handle coordination between the Parquet serializer and the ObjectStore writer, so the parquet insert into implementation diverges a bit from the JSON and CSV ones.
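
For reference, a minimal sketch (not this PR's actual code) of driving AsyncArrowWriter against an AsyncWrite destination; the schema, path, and buffer size are illustrative, and the try_new signature shown is the one from the parquet crate version current around this PR:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::AsyncArrowWriter;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Any AsyncWrite sink works; in DataFusion the sink is a multipart
    // ObjectStore upload rather than a local file.
    let sink = tokio::fs::File::create("/tmp/example.parquet").await?;

    // Third argument is an internal buffer size, fourth is optional WriterProperties.
    let mut writer = AsyncArrowWriter::try_new(sink, schema, 1024 * 1024, None)?;
    writer.write(&batch).await?;
    writer.close().await?;
    Ok(())
}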

What changes are included in this PR?

  • Added parquet write config options to ExecutionOptions
  • Allow specifying parquet schema so empty tables can be created
  • Move write specific code to write.rs mod
  • Add test coverage for inserting to parquet table with or without session config changes

Are these changes tested?

Yes.

Are there any user-facing changes?

Inserting into parquet-backed tables is now possible.

@github-actions github-actions bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) labels Aug 9, 2023
@devinjdangelo (Contributor, Author):

@alamb @metesynnada Interested in your thoughts on the approach here specifically on putting parquet write options in the session config and using AsyncArrowWriter from the parquet crate vs. a custom implementation within DataFusion.

@alamb (Contributor) left a comment:

I think this is looking great @devinjdangelo -- thank you 🦾

@alamb @metesynnada Interested in your thoughts on the approach here specifically on putting parquet write options in the session config

I also left some specific comments -- I think it is a good start and we can add statement level overrides later if people want them

and using AsyncArrowWriter from the parquet crate vs. a custom implementation within DataFusion.

I think this is a great idea for several reasons:

  1. It will drive improvements upstream (if any are required) where others can benefit (and contribute) to them
  2. It will be a nice example of how to use the AsyncArrowWriter API
  3. If successful, we can request / help create equivalents for AsyncCsvWriter and AsyncJsonWriter perhaps upstream

cc @tustvold for your amusement

// The following map to parquet::file::properties::WriterProperties

/// Sets best effort maximum size of data page in bytes
pub data_pagesize_limit: usize, default = 1024 * 1024
Contributor:

It is an interesting question whether we want these settings to be session level (as this change proposes) or per-SQL-statement level 🤔

I suppose having a session level default would make sense and if we want to add per statement overrides (like COPY TO <file> AS PARQUET (DATA_PAGE_ROW_COUNT_LIMIT 100000)) we can always do that afterwards as well.

Contributor Author:

I think we will definitely want COPY TO to be able to set any of these configs on a per statement basis.

For insert into, we could allow the table itself to be registered with specific settings, e.g.:

create external table my_table(x int, y int)
stored as parquet
location '/tmp/my_table'
WITH (
    DATA_PAGESIZE_LIMIT 2048,
    DATA_PAGE_ROW_COUNT_LIMIT 100000,
    ...
);

insert into my_table would then use any table-specific settings or fall back to the session level configs.

}
// TODO macro not working with Option<f64> or Option<u64>
Contributor:

👍 that would be cool to clean up if possible prior to a real PR -- let me know if you need some help

Contributor Author:

This is fixed! It was as simple as adding:

config_field!(f64);
config_field!(u64);

/// based on data in file.
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
Contributor:

👍 this is very nice

@@ -250,6 +266,18 @@ impl<'a> ParquetReadOptions<'a> {
self.table_partition_cols = table_partition_cols;
self
}

/// Configure if file has known sort order
pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
Contributor:

I believe this feature was requested by @bmmeijers in #7036

Contributor Author:

Added "closes #7036" to description since that appears to be the case.

Contributor:

I tested out the use case in #7036 and I sadly think there is a little bit more to go

❯ create external table foo stored as parquet location '/tmp/foo' with order (time);
Error during planning: Provide a schema before specifying the order while creating a table.
❯ create external table foo(cpu varchar, host1 varchar, time timestamp) stored as parquet location '/tmp/foo' with order (time);
Error during planning: Column definitions can not be specified for PARQUET files.

But now that you have added the underlying feature implementation I think making it happen will be a small matter of plumbing

I updated the PR description to say "part of #7036"

Contributor Author:

I think the fact I missed this highlights the need for some additional test cases, particularly for end-to-end creating and inserting with only SQL. I'll work on a PR today to address this.

Contributor:

I think one challenge you'll find is that it is not yet easily possible to create tables and insert into them via SQL. However, trying to write tests will likely expose those rough edges and perhaps allow you to fix them in the process -- I can't wait to see what you come up with.

let mut builder = WriterProperties::builder()
    .set_created_by(parquet_context.created_by.clone())
    .set_data_page_row_count_limit(parquet_context.data_page_row_count_limit)
    .set_data_page_size_limit(parquet_context.data_pagesize_limit);
Contributor:

it feels like there are some other properties to copy across too (like created by, etc)

@devinjdangelo (Contributor, Author) commented Aug 10, 2023:

Yes, more to do here. Some of the WriterProperties configs are not primitive types (such as compression and statistics). Still working through this, but the plan is to have the session level config be a string and implement TryFrom<String> for these types. Of course, since these types/traits are defined in other crates, we would either need to implement TryFrom in arrow-rs or have a wrapper type in DataFusion.

Let me know if we already have a pattern for handling configs that ultimately need to map to non-primitive types that I just missed. 🤔
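
As a concrete illustration of that plan, parsing a compression setting string might look roughly like this (the helper name and accepted values are hypothetical, not the code in this PR):

use parquet::basic::Compression;

/// Hypothetical helper: map a session-config string onto the non-primitive
/// parquet Compression type, failing with a descriptive message otherwise.
fn parse_compression_string(setting: &str) -> Result<Compression, String> {
    match setting.to_lowercase().as_str() {
        "uncompressed" => Ok(Compression::UNCOMPRESSED),
        "snappy" => Ok(Compression::SNAPPY),
        "lz4" => Ok(Compression::LZ4),
        other => Err(format!(
            "unknown or unsupported parquet compression codec: {other}"
        )),
    }
}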

@devinjdangelo (Contributor, Author) commented Aug 10, 2023:

In this setup, it would also be better UX to push parsing the string up to when the user initially sets the config, so the user can get immediate feedback rather than a runtime error if they provide an invalid option. E.g.:

set execution.parquet.compression = exotic_compression_123;
PropertyError: unknown or unsupported parquet compression codec: exotic_compression_123

Otherwise, it would seem that the setting is valid and only fail much later when a write is attempted.

Contributor:

Otherwise, it would seem that the setting is valid and only fail much later when a write is attempted.

I agree this is a non-ideal UX.

However, I think storing the config parameters as strings would be acceptable as an initial implementation, especially as we don't apply validation yet for other config settings:

❯ set datafusion.sql_parser.dialect = 'wowsa';
0 rows in set. Query took 0.000 seconds.

❯ select 'foo';
Error during planning: Unsupported SQL dialect: wowsa. Available dialects: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi

Mostly I am trying to make sure you don't feel the need to have to fix everything in one go (though of course, if you wanted to add a generic validation framework for config options, that would be amazing)

Contributor Author:

Agreed on not doing too much in this one PR. In the current state of this PR we support customizing default parquet writer properties via session config options. Column-by-column customization still needs to be added, as does table and/or statement level override support. A config validation framework, plus moving some of the validation code from this PR up to when configs are parsed, would be a nice enhancement as well.

We could cut issues to track these for future PRs so this one does not get too complicated or take too long.

}

let mut row_count = 0;
// TODO parallelize serialization across partitions and batches within partitions
Contributor:

I am digging the fact all the writers look quite similar -- we'll be able to sort out parallelizing the writes in one place and use it for all the formats ❤️

// specific language governing permissions and limitations
// under the License.

//! Module containing helper methods/traits related to enabling
Contributor:

😍

@@ -448,8 +448,10 @@ async fn test_substring_expr() -> Result<()> {
Ok(())
}

/// Test string expressions test split into two batches
Contributor Author:

Flagging this as it is a bit strange. This test was generating a stack overflow error consistently until I split it into two batches. I have added a good number of config options to the SessionContext, which I suppose has grown the number of bytes we store on the stack for each SessionContext.

Is there a larger concern here that as we add more config options, we are filling up the stack? I think probably not, as it seems this test is creating a very large number of SessionContexts all at once. Perhaps that is not an unusual use case downstream though 🤔. A single thread on a server responding to many concurrent requests may create many SessionContexts, one for each async task, and hit a stack overflow?

Contributor:

I am not concerned -- your comment is good enough

What we have found is that this large stack frame thing only affects debug builds (I think because the Rust compiler isn't reusing stack space so debugger output is better) -- release builds have much smaller stack requirements.

In this case we could probably make individual test functions for each expression rather than a few functions that each test more than one expression.

Contributor Author:

Thanks for the info, good to know this is specific to debug builds!

@devinjdangelo devinjdangelo marked this pull request as ready for review August 11, 2023 12:51
}

#[tokio::test]
async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
Contributor Author:

We may want to add additional tests for writing parquet that fuzz these input settings and write out some larger parquet files so that the configs actually have some effect.
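
A rough sketch of the shape such a test could take (the config key, value, and overall flow are illustrative assumptions, not the tests added in this PR):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::test]
async fn insert_into_parquet_with_session_overrides() -> Result<()> {
    let ctx = SessionContext::new();
    // Override a parquet writer option for this session; the exact key is an
    // assumption based on the options added in this PR.
    ctx.sql("set datafusion.execution.parquet.data_pagesize_limit = 4096")
        .await?;
    // Register a parquet-backed table at a temporary location, INSERT INTO it
    // with enough rows for the setting to matter, then read the written files
    // back and assert on row counts and writer properties.
    Ok(())
}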

@alamb (Contributor) commented Aug 12, 2023:

Thanks @devinjdangelo -- I am sorry I am about to run out of time today, but I plan to review this PR carefully tomorrow

@alamb (Contributor) left a comment:

Thank you @devinjdangelo -- this is a great step forward

While I was hoping this PR would make writing to parquet files possible end to end, I think we are getting close -- this PR lays a good foundation for hooking up the code.

Thanks again.


fn parse_encoding_string(str_setting: &str) -> Result<parquet::basic::Encoding> {
    let str_setting_lower: &str = &str_setting.to_lowercase();
    match str_setting_lower {
        "plain" => Ok(parquet::basic::Encoding::PLAIN),
Contributor:

👍 this code could also probably be upstreamed to arrow-rs -- I'll file a ticket to suggest it

@alamb (Contributor) commented Aug 13, 2023:

Filed apache/arrow-rs#4693 and #7275 to leave a reference in DataFusion

