GH-36280: [Parquet][C++] FileWriter supports WriteTable in the buffered mode #36377

Open
wants to merge 8 commits into
base: main

mapleFU (Member) commented Jun 29, 2023

Rationale for this change

Allow WriteTable to write in buffered mode.

What changes are included in this PR?

  1. Add use_buffering to WriteTable.
  2. Add WriteBuffered to parquet::arrow::FileWriterImpl, and move the logic of parquet::arrow::FileWriterImpl::WriteRecordBatch into it.
  3. Add tests.

Are these changes tested?

Yes

Are there any user-facing changes?

Users can write a table using a buffered row group.
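
To illustrate what buffering changes, here is a minimal sketch that does not use the Arrow API. PlanRowGroups is a hypothetical helper that models how many rows land in each row group when several tables are written in turn. It assumes that, without buffering, every write starts a new row group, and that with buffering, rows keep filling the current row group until max_row_group_length is reached.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical model (not part of Arrow): returns the number of rows that end
// up in each row group after writing tables of the given sizes in order.
std::vector<int64_t> PlanRowGroups(const std::vector<int64_t>& write_sizes,
                                   int64_t max_row_group_length,
                                   bool use_buffering) {
  assert(max_row_group_length > 0);
  std::vector<int64_t> row_groups;
  for (int64_t rows : write_sizes) {
    // Assumption: an unbuffered write always opens a fresh row group, while a
    // buffered write continues the row group left open by the previous write.
    bool need_new_group = !use_buffering || row_groups.empty();
    int64_t offset = 0;
    while (offset < rows) {
      if (need_new_group || row_groups.back() >= max_row_group_length) {
        row_groups.push_back(0);
        need_new_group = false;
      }
      // Write as many rows as still fit into the current row group.
      const int64_t n =
          std::min(max_row_group_length - row_groups.back(), rows - offset);
      row_groups.back() += n;
      offset += n;
    }
  }
  return row_groups;
}
```

Under these assumptions, three 100-row writes with a 1000-row limit yield a single 300-row row group in buffered mode, versus three 100-row row groups otherwise.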

@mapleFU mapleFU marked this pull request as ready for review June 30, 2023 08:44
mapleFU (Member, Author) commented Jun 30, 2023

@jorisvandenbossche @pitrou Mind taking a look? WriteTable with buffering has been added.

Resolved review threads: cpp/src/parquet/arrow/writer.cc (5, of which 2 outdated), cpp/src/parquet/arrow/writer.h (1)
github-actions bot added the "awaiting committer review" label and removed the "awaiting review" label Jul 4, 2023

wgtmac (Member) left a comment

LGTM. Most of my comments address readability.

Resolved review threads: cpp/src/parquet/arrow/writer.h (1), cpp/src/parquet/arrow/writer.cc (1), cpp/src/parquet/arrow/arrow_reader_writer_test.cc (7, all outdated)
@wgtmac wgtmac changed the title from "GH-36280: [Parquet][C++] parquet::arrow::FileWriter support table buffered write" to "GH-36280: [Parquet][C++] parquet::arrow::FileWriter supports WriteTable in buffered mode" Jul 8, 2023

mapleFU (Member, Author) commented Jul 11, 2023

I guess this would also close #31405

mapleFU (Member, Author) commented Jul 13, 2023

@pitrou would you mind taking a look at this? This adds a buffering mode for WriteTable, which can later benefit the Python-side write_table by not flushing a chunk on every write.

wgtmac (Member) left a comment

+1

@wgtmac wgtmac changed the title from "GH-36280: [Parquet][C++] parquet::arrow::FileWriter supports WriteTable in buffered mode" to "GH-36280: [Parquet][C++] FileWriter supports WriteTable in the buffered mode" Jul 16, 2023
@mapleFU mapleFU requested a review from pitrou July 19, 2023 09:01
int column_index_start = 0;

for (int i = 0; i < batch.num_columns(); i++) {
std::shared_ptr<ChunkedArray> chunked_array = GetColumnChunkedArray(batch, i);
Member

It is a bit wasteful to call this for each chunk. Why not use a RecordBatchReader instead?

Member Author

std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
                                                    int column_id) {
  return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
}

std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
                                                    int column_id) {
  return value.column(column_id);
}

It seems that I want to call ArrowColumnWriterV2::Make here, which requires a ChunkedArray, so I'm not sure why this is expensive, or how RecordBatch can handle this...

Member

Ok, then you can simply call GetColumnChunkedArray(batch, i) outside of WriteBatch. It does not depend on offset/size.

@@ -5024,6 +5024,41 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
ASSERT_OK_NO_THROW(writer->Close());
}

void WriteBufferedTable(const std::shared_ptr<Array>& values,
Member

Can you please refactor this with WriteBufferedFile to avoid copy-pasting entire chunks of code?

Member Author

Hmm, I guess I used somewhat different logic here. I'll try to unify them.

}

TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
std::shared_ptr<Array> values;
Member

Can you also avoid repeating yourself in the tests below?

this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
write_max_row_group_size, &num_row_groups);
EXPECT_EQ(1, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
Member

Are there tests with more than one column somewhere?

Member Author

No. I'll add one.

Development

Successfully merging this pull request may close these issues.

[Python][Parquet] Allow write_batch to directly write batch
3 participants