
Support Writing Arrow files #8608

Merged 9 commits into apache:main on Dec 24, 2023
Conversation

devinjdangelo (Contributor):

Which issue does this PR close?

Closes #8504

Rationale for this change

What changes are included in this PR?

Implements initial support for writing out arrow files via COPY TO and INSERT INTO for listing tables.

Are these changes tested?

Adds new sqllogictests to cover the new functionality.

Are there any user-facing changes?

Writing Arrow files is now possible.
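For example, from the Rust API (a sketch: the `COPY` statement is the new write path, while `register_arrow` and `ArrowReadOptions` are the pre-existing Arrow read support, and the scaffolding here is illustrative rather than taken from the PR):

```rust
use datafusion::error::Result;
use datafusion::execution::options::ArrowReadOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // COPY query results out to a single Arrow IPC file (new in this PR);
    // collect() executes the statement
    ctx.sql("COPY (VALUES (1), (2)) TO '/tmp/foo.arrow'")
        .await?
        .collect()
        .await?;

    // Read the file back via the existing Arrow listing-table support
    ctx.register_arrow("foo", "/tmp/foo.arrow", ArrowReadOptions::default())
        .await?;
    ctx.sql("SELECT * FROM foo").await?.show().await?;
    Ok(())
}
```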

@github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Dec 20, 2023
1 Foo
2 Bar

# Copy from dict encoded values to single arrow file
devinjdangelo (Contributor, Author):

I know @tustvold's main concern was dictionaries. I think this test shows we are OK, but let me know if I am overlooking something.
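As background for the dictionary concern, a standalone arrow-rs sketch (not this PR's code) showing that the IPC file format preserves dictionary encoding across a write/read round trip:

```rust
use arrow_array::{types::Int32Type, ArrayRef, DictionaryArray, RecordBatch};
use arrow_ipc::reader::FileReader;
use arrow_ipc::writer::FileWriter;
use std::io::Cursor;
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A dictionary-encoded string column, like the values in the test above
    let dict: DictionaryArray<Int32Type> =
        vec!["Foo", "Bar", "Foo"].into_iter().collect();
    let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(dict) as ArrayRef)])?;

    // Write the batch to an in-memory IPC file, then read it back
    let mut buf = Vec::new();
    {
        let mut writer = FileWriter::try_new(&mut buf, batch.schema().as_ref())?;
        writer.write(&batch)?;
        writer.finish()?;
    }
    let mut reader = FileReader::try_new(Cursor::new(buf), None)?;
    let round_tripped = reader.next().expect("one batch")?;

    // The dictionary encoding (and the data) survives the round trip
    assert_eq!(batch, round_tripped);
    Ok(())
}
```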

datafusion-proto = { path = "datafusion/proto", version = "34.0.0" }
datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
Member:

Hmm, are these changes related? It looks like lines were just moved around. Maybe you can revert the unrelated changes to keep the diff smaller?

devinjdangelo (Contributor, Author):

I added arrow-ipc as a dependency for core, and ran cargo tomlfmt. I'm not sure why cargo tomlfmt changed so much of the formatting 🤔

@alamb (Contributor) left a comment:

Thank you for this PR @devinjdangelo -- I tried it out and it looks quite awesome.

I also tried it out locally and it works great.

I left a few suggestions that would be nice to address, but I don't think they are required to merge. We can do them in a follow-on PR as well.

```shell
❯ copy (values (1), (2)) to '/tmp/foo.arrow';
+-------+
| count |
+-------+
| 2     |
+-------+
1 row in set. Query took 0.030 seconds.

$ datafusion-cli -c "select * from '/tmp/foo.arrow'";
DataFusion CLI v34.0.0
+---------+
| column1 |
+---------+
| 1       |
| 2       |
+---------+
2 rows in set. Query took 0.028 seconds.
```

```shell
$ datafusion-cli -c "select arrow_typeof(column1) from '/tmp/foo.arrow'";
DataFusion CLI v34.0.0
+--------------------------------------+
| arrow_typeof(/tmp/foo.arrow.column1) |
+--------------------------------------+
| Int64                                |
| Int64                                |
+--------------------------------------+
```

👍 

) -> Result<u64> {
    // No props are supported yet, but can be by updating FileTypeWriterOptions
    // to populate this struct and use those options to initialize the arrow_ipc::writer::FileWriter
    let _arrow_props = self.config.file_type_writer_options.try_into_arrow()?;
Contributor:

I think we should track this as a follow-on ticket, and ideally leave a comment in the code pointing to the ticket so it eventually gets cleaned up.

devinjdangelo (Contributor, Author):

Agreed. When we update FileTypeWriterProperties for Arrow files, we should also take care to maintain serialization support, which @andygrove has been working on.

devinjdangelo (Contributor, Author) on Dec 22, 2023:

Filed #8635 and added a comment linking to it.

devinjdangelo (Contributor, Author):

Also filed apache/arrow-rs#5236, which would help with #8635 (though it is not blocking).

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
    JoinSet::new();
while let Some((path, mut rx)) = file_stream_rx.recv().await {
    let shared_buffer = SharedBuffer::new(1048576);
Contributor:

Does this mean that if any record batch takes more than 1MB to write out, we'll get an error?

Would it be possible to make this constant and the 1024000 below into named constants, with comments that explain what they do?

devinjdangelo (Contributor, Author):

The initial buffer size is just a size hint for efficiency. It will grow beyond the set value if needed.

We can definitely make it a named constant, or even make it configurable.
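To illustrate the size-hint behavior with a plain `Vec` (an analogy, not the PR's code; `SharedBuffer` wraps a growable in-memory buffer similarly):

```rust
fn main() {
    // Pre-allocate ~1 MiB, matching the hint used in the PR
    let mut buf: Vec<u8> = Vec::with_capacity(1_048_576);
    assert!(buf.capacity() >= 1_048_576);

    // Writing past the hint does not error; the buffer reallocates and grows
    buf.resize(4 * 1_048_576, 0);
    assert_eq!(buf.len(), 4 * 1_048_576);
    println!("capacity grew to {} bytes", buf.capacity());
}
```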

devinjdangelo (Contributor, Author):

This buffer holds serialized bytes in memory until they are periodically uploaded to an object store. This is similar to how the AsyncArrowWriter for parquet is implemented.
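For context, a rough sketch of the stage-then-upload flow being described; `upload_part` is a hypothetical placeholder rather than DataFusion's actual object-store API:

```rust
/// Rough sketch of the stage-then-upload pattern: serialized bytes
/// accumulate in an in-memory buffer and are shipped out in chunks.
async fn write_with_staging(batches: Vec<Vec<u8>>) -> std::io::Result<()> {
    // Flush the staged bytes once roughly this many have accumulated
    const UPLOAD_THRESHOLD: usize = 1_048_576;

    let mut staging: Vec<u8> = Vec::with_capacity(UPLOAD_THRESHOLD);
    for serialized in batches {
        staging.extend_from_slice(&serialized);
        if staging.len() >= UPLOAD_THRESHOLD {
            // Drain the buffer and ship it; serialization can continue after
            upload_part(std::mem::take(&mut staging)).await?;
        }
    }
    if !staging.is_empty() {
        upload_part(staging).await?; // final partial chunk
    }
    Ok(())
}

// Hypothetical placeholder: a real implementation would PUT this chunk
// to object storage as one part of a multipart upload.
async fn upload_part(_chunk: Vec<u8>) -> std::io::Result<()> {
    Ok(())
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    write_with_staging(vec![vec![0u8; 700_000], vec![1u8; 700_000]]).await
}
```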

Contributor:

Filed #8642 to name the constants

    &self.get_writer_schema(),
)?;
let mut object_store_writer = create_writer(
    FileCompressionType::UNCOMPRESSED,
Contributor:

We probably want to default this to a compressed version (and make it configurable later)? The pyarrow default is lz4.

devinjdangelo (Contributor, Author):

I took a quick look at the arrow_ipc::FileWriter code, and it appears that the writer manages compression internally in batches. The referenced line controls whole-file compression (as it does for CSV and JSON).

Since we are not setting a compression explicitly in DataFusion in this PR, we are inheriting the arrow-rs default. I think the arrow-rs default is also lz4, but I am not 100% sure from glancing over the code.

devinjdangelo (Contributor, Author):

Looking more closely, I was wrong: arrow_ipc defaults to uncompressed Arrow files. I pushed up a change to default to lz4.
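For reference, enabling lz4 on arrow-ipc's FileWriter looks roughly like this (a standalone arrow-rs sketch; it requires the crate's `lz4` feature, and the exact wiring inside DataFusion differs):

```rust
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_ipc::writer::{FileWriter, IpcWriteOptions};
use arrow_ipc::CompressionType;
use std::fs::File;
use std::sync::Arc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let batch = RecordBatch::try_from_iter(vec![(
        "column1",
        Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
    )])?;

    // Opt in to lz4-frame compression of the IPC record batch buffers
    let options = IpcWriteOptions::default()
        .try_with_compression(Some(CompressionType::LZ4_FRAME))?;

    let file = File::create("/tmp/foo.arrow")?;
    let mut writer = FileWriter::try_new_with_options(file, batch.schema().as_ref(), options)?;
    writer.write(&batch)?;
    writer.finish()?;
    Ok(())
}
```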

@alamb merged commit d5704f7 into apache:main on Dec 24, 2023
22 checks passed