-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-36280: [Parquet][C++] FileWriter supports WriteTable in the buffered mode #36377
base: main
Are you sure you want to change the base?
Changes from all commits
9593e72
c306c6c
25b101e
30271c7
3424ee6
b9ff921
00aedf1
6818f3e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5024,6 +5024,41 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> { | |
ASSERT_OK_NO_THROW(writer->Close()); | ||
} | ||
|
||
void WriteBufferedTable(const std::shared_ptr<Array>& values, | ||
int64_t write_table_batch_size, | ||
int64_t writer_properties_max_row_group_size, | ||
int64_t write_table_max_row_group_size, int* num_row_groups) { | ||
std::shared_ptr<GroupNode> schema = | ||
MakeSimpleSchema(*values->type(), Repetition::OPTIONAL); | ||
SchemaDescriptor descriptor; | ||
ASSERT_NO_THROW(descriptor.Init(schema)); | ||
std::shared_ptr<::arrow::Schema> arrow_schema; | ||
ArrowReaderProperties props; | ||
ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema)); | ||
|
||
parquet::WriterProperties::Builder props_builder; | ||
props_builder.max_row_group_length(writer_properties_max_row_group_size); | ||
|
||
this->sink_ = CreateOutputStream(); | ||
auto low_level_writer = | ||
ParquetFileWriter::Open(this->sink_, schema, props_builder.build()); | ||
std::unique_ptr<FileWriter> writer; | ||
ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(), | ||
std::move(low_level_writer), arrow_schema, | ||
default_arrow_writer_properties(), &writer)); | ||
EXPECT_EQ(0, values->length() % write_table_batch_size); | ||
for (int i = 0; i * write_table_batch_size < values->length(); i++) { | ||
wgtmac marked this conversation as resolved.
Show resolved
Hide resolved
|
||
std::shared_ptr<Array> sliced_array = | ||
values->Slice(i * write_table_batch_size, write_table_batch_size); | ||
std::vector<std::shared_ptr<Array>> arrays = {sliced_array}; | ||
auto table = ::arrow::Table::Make(arrow_schema, arrays, write_table_batch_size); | ||
ASSERT_OK_NO_THROW( | ||
writer->WriteTable(*table, write_table_max_row_group_size, true)); | ||
} | ||
ASSERT_OK_NO_THROW(writer->Close()); | ||
*num_row_groups = writer->metadata()->num_row_groups(); | ||
} | ||
|
||
void ReadAndCheckSingleColumnFile(const Array& values, int num_row_groups) { | ||
std::shared_ptr<Array> out; | ||
|
||
|
@@ -5057,23 +5092,123 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> { | |
TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes); | ||
|
||
// Buffered write of a small nullable column, split into four equal batches;
// reads the file back and checks the values round-trip.
TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
  constexpr size_t NUM_BATCHES = 4;
  constexpr int64_t batch_size = SMALL_SIZE / NUM_BATCHES;
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
  int num_row_groups = 0;
  this->WriteBufferedFile(values, batch_size, &num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
|
||
// Buffered write of a large nullable column, split into four equal batches;
// reads the data back as a table and checks the values round-trip.
TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
  constexpr size_t NUM_BATCHES = 4;
  constexpr int64_t batch_size = LARGE_SIZE / NUM_BATCHES;
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
  int num_row_groups = 0;
  this->WriteBufferedFile(values, batch_size, &num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
}
|
||
// Writing the whole small table in a single WriteTable call with default
// row-group limits must produce exactly one row group.
TYPED_TEST(TestBufferedParquetIO, WriteTableSmall) {
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
  int num_row_groups = 0;
  // Write the whole table with one batch; neither limit is reached.
  // NOTE(review): locals renamed to match the WriteBufferedTable parameter
  // order — the previous names had the two limits swapped.
  int64_t write_table_batch_size = SMALL_SIZE;
  int64_t writer_properties_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  this->WriteBufferedTable(values, write_table_batch_size,
                           writer_properties_max_row_group_size,
                           write_table_max_row_group_size, &num_row_groups);
  EXPECT_EQ(1, num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
|
||
// Writing the whole large table in a single WriteTable call with default
// row-group limits must produce exactly one row group.
TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
  int num_row_groups = 0;
  // Write the whole table with one batch; neither limit is reached.
  int64_t write_table_batch_size = LARGE_SIZE;
  int64_t writer_properties_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  this->WriteBufferedTable(values, write_table_batch_size,
                           writer_properties_max_row_group_size,
                           write_table_max_row_group_size, &num_row_groups);
  EXPECT_EQ(1, num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
}
|
||
// Writing the table across four buffered WriteTable calls must still coalesce
// into a single row group, since no row-group limit is reached.
TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
  constexpr size_t NUM_BATCHES = 4;
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
  int num_row_groups = 0;
  // Write the table in four batches.
  int64_t write_table_batch_size = SMALL_SIZE / NUM_BATCHES;
  int64_t writer_properties_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  this->WriteBufferedTable(values, write_table_batch_size,
                           writer_properties_max_row_group_size,
                           write_table_max_row_group_size, &num_row_groups);
  EXPECT_EQ(1, num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
|
||
// A small max_row_group_length on the WriterProperties must split a single
// WriteTable call into multiple row groups.
TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit) {
  constexpr size_t NUM_BATCHES = 4;
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
  int num_row_groups = 0;
  // Write the whole table in one batch with a small writer-properties limit.
  int64_t write_table_batch_size = SMALL_SIZE;
  int64_t writer_properties_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
  this->WriteBufferedTable(values, write_table_batch_size,
                           writer_properties_max_row_group_size,
                           write_table_max_row_group_size, &num_row_groups);
  EXPECT_EQ(4, num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
|
||
// A small chunk_size passed to WriteTable (with a permissive
// WriterProperties limit) must also split the data into multiple row groups.
TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit2) {
  constexpr size_t NUM_BATCHES = 4;
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
  int num_row_groups = 0;
  int64_t write_table_batch_size = SMALL_SIZE;
  int64_t writer_properties_max_row_group_size = SMALL_SIZE;
  int64_t write_table_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
  this->WriteBufferedTable(values, write_table_batch_size,
                           writer_properties_max_row_group_size,
                           write_table_max_row_group_size, &num_row_groups);
  EXPECT_EQ(4, num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
|
||
// Many small batches (10) combined with a WriteTable chunk_size of a quarter
// of the data must accumulate into exactly four row groups.
TYPED_TEST(TestBufferedParquetIO, WriteTableInBatchesForMultiRowGroups) {
  constexpr size_t NUM_BATCHES = 10;
  constexpr size_t NUM_ROW_GROUPS = 4;
  std::shared_ptr<Array> values;
  ASSERT_OK(
      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
  int num_row_groups = 0;
  int64_t write_table_batch_size = SMALL_SIZE / NUM_BATCHES;
  int64_t writer_properties_max_row_group_size = SMALL_SIZE;
  int64_t write_table_max_row_group_size = SMALL_SIZE / NUM_ROW_GROUPS;
  this->WriteBufferedTable(values, write_table_batch_size,
                           writer_properties_max_row_group_size,
                           write_table_max_row_group_size, &num_row_groups);
  EXPECT_EQ(4, num_row_groups);
  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
|
||
TEST(TestArrowReadWrite, WriteAndReadRecordBatch) { | ||
auto pool = ::arrow::default_memory_pool(); | ||
auto sink = CreateOutputStream(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -274,6 +274,16 @@ class ArrowColumnWriterV2 { | |
RowGroupWriter* row_group_writer_; | ||
}; | ||
|
||
// Wraps a RecordBatch column (a plain Array) in a single-chunk ChunkedArray so
// that the batch and table write paths can share one templated implementation.
std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
                                                    int column_id) {
  return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
}
|
||
// Table columns are already ChunkedArrays; return the column as-is.
std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
                                                    int column_id) {
  return value.column(column_id);
}
|
||
} // namespace | ||
|
||
// ---------------------------------------------------------------------- | ||
|
@@ -356,7 +366,10 @@ class FileWriterImpl : public FileWriter { | |
|
||
std::shared_ptr<::arrow::Schema> schema() const override { return schema_; } | ||
|
||
Status WriteTable(const Table& table, int64_t chunk_size) override { | ||
template <typename T> | ||
Status WriteBuffered(const T& batch, int64_t max_row_group_length); | ||
wgtmac marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
Status WriteTable(const Table& table, int64_t chunk_size, bool use_buffering) override { | ||
RETURN_NOT_OK(table.Validate()); | ||
|
||
if (chunk_size <= 0 && table.num_rows() > 0) { | ||
|
@@ -369,27 +382,10 @@ class FileWriterImpl : public FileWriter { | |
chunk_size = this->properties().max_row_group_length(); | ||
} | ||
|
||
auto WriteRowGroup = [&](int64_t offset, int64_t size) { | ||
RETURN_NOT_OK(NewRowGroup(size)); | ||
for (int i = 0; i < table.num_columns(); i++) { | ||
RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size)); | ||
} | ||
return Status::OK(); | ||
}; | ||
|
||
if (table.num_rows() == 0) { | ||
// Append a row group with 0 rows | ||
RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close())); | ||
return Status::OK(); | ||
if (use_buffering) { | ||
return WriteBuffered(table, chunk_size); | ||
} | ||
|
||
for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) { | ||
int64_t offset = chunk * chunk_size; | ||
RETURN_NOT_OK_ELSE( | ||
WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)), | ||
PARQUET_IGNORE_NOT_OK(Close())); | ||
} | ||
return Status::OK(); | ||
return WriteTableUnbuffered(table, chunk_size); | ||
} | ||
|
||
Status NewBufferedRowGroup() override { | ||
|
@@ -406,57 +402,7 @@ class FileWriterImpl : public FileWriter { | |
} | ||
|
||
// Max number of rows allowed in a row group. | ||
const int64_t max_row_group_length = this->properties().max_row_group_length(); | ||
|
||
if (row_group_writer_ == nullptr || !row_group_writer_->buffered() || | ||
row_group_writer_->num_rows() >= max_row_group_length) { | ||
RETURN_NOT_OK(NewBufferedRowGroup()); | ||
} | ||
|
||
auto WriteBatch = [&](int64_t offset, int64_t size) { | ||
std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers; | ||
int column_index_start = 0; | ||
|
||
for (int i = 0; i < batch.num_columns(); i++) { | ||
ChunkedArray chunked_array{batch.column(i)}; | ||
ARROW_ASSIGN_OR_RAISE( | ||
std::unique_ptr<ArrowColumnWriterV2> writer, | ||
ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_, | ||
row_group_writer_, column_index_start)); | ||
column_index_start += writer->leaf_count(); | ||
if (arrow_properties_->use_threads()) { | ||
writers.emplace_back(std::move(writer)); | ||
} else { | ||
RETURN_NOT_OK(writer->Write(&column_write_context_)); | ||
} | ||
} | ||
|
||
if (arrow_properties_->use_threads()) { | ||
mapleFU marked this conversation as resolved.
Show resolved
Hide resolved
|
||
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size()); | ||
RETURN_NOT_OK(::arrow::internal::ParallelFor( | ||
static_cast<int>(writers.size()), | ||
[&](int i) { return writers[i]->Write(¶llel_column_write_contexts_[i]); }, | ||
arrow_properties_->executor())); | ||
} | ||
|
||
return Status::OK(); | ||
}; | ||
|
||
int64_t offset = 0; | ||
while (offset < batch.num_rows()) { | ||
const int64_t batch_size = | ||
std::min(max_row_group_length - row_group_writer_->num_rows(), | ||
batch.num_rows() - offset); | ||
RETURN_NOT_OK(WriteBatch(offset, batch_size)); | ||
offset += batch_size; | ||
|
||
// Flush current row group if it is full. | ||
if (row_group_writer_->num_rows() >= max_row_group_length) { | ||
RETURN_NOT_OK(NewBufferedRowGroup()); | ||
} | ||
} | ||
|
||
return Status::OK(); | ||
return WriteBuffered(batch, this->properties().max_row_group_length()); | ||
} | ||
|
||
const WriterProperties& properties() const { return *writer_->properties(); } | ||
|
@@ -469,6 +415,30 @@ class FileWriterImpl : public FileWriter { | |
return writer_->metadata(); | ||
} | ||
|
||
// Writes `table` without buffering: each chunk of at most `chunk_size` rows is
// flushed as its own row group. An empty table still produces one empty row
// group. On write failure the file is closed (errors from Close() are
// deliberately ignored) before the original error is propagated.
Status WriteTableUnbuffered(const Table& table, int64_t chunk_size) {
  auto WriteRowGroup = [&](int64_t offset, int64_t size) {
    RETURN_NOT_OK(NewRowGroup(size));
    for (int i = 0; i < table.num_columns(); i++) {
      RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
    }
    return Status::OK();
  };

  if (table.num_rows() == 0) {
    // Append a row group with 0 rows
    RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
    return Status::OK();
  }

  for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
    int64_t offset = chunk * chunk_size;
    RETURN_NOT_OK_ELSE(
        WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
        PARQUET_IGNORE_NOT_OK(Close()));
  }
  return Status::OK();
}
|
||
private: | ||
friend class FileWriter; | ||
|
||
|
@@ -488,6 +458,59 @@ class FileWriterImpl : public FileWriter { | |
std::vector<ArrowWriteContext> parallel_column_write_contexts_; | ||
}; | ||
|
||
template <typename T> | ||
Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_length) { | ||
if (row_group_writer_ == nullptr || !row_group_writer_->buffered() || | ||
row_group_writer_->num_rows() >= max_row_group_length) { | ||
RETURN_NOT_OK(NewBufferedRowGroup()); | ||
} | ||
|
||
auto WriteBatch = [&](int64_t offset, int64_t size) { | ||
std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers; | ||
int column_index_start = 0; | ||
|
||
for (int i = 0; i < batch.num_columns(); i++) { | ||
std::shared_ptr<ChunkedArray> chunked_array = GetColumnChunkedArray(batch, i); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is a bit wasteful to call this for each chunk. Why not use a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
int column_id) {
return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
}
std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
int column_id) {
return value.column(column_id);
} Seems that here I want to call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, then you can simply call |
||
ARROW_ASSIGN_OR_RAISE( | ||
std::unique_ptr<ArrowColumnWriterV2> writer, | ||
ArrowColumnWriterV2::Make(*chunked_array, offset, size, schema_manifest_, | ||
row_group_writer_, column_index_start)); | ||
column_index_start += writer->leaf_count(); | ||
if (arrow_properties_->use_threads()) { | ||
writers.emplace_back(std::move(writer)); | ||
} else { | ||
RETURN_NOT_OK(writer->Write(&column_write_context_)); | ||
} | ||
} | ||
|
||
if (arrow_properties_->use_threads()) { | ||
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size()); | ||
RETURN_NOT_OK(::arrow::internal::ParallelFor( | ||
static_cast<int>(writers.size()), | ||
[&](int i) { return writers[i]->Write(¶llel_column_write_contexts_[i]); }, | ||
arrow_properties_->executor())); | ||
} | ||
|
||
return Status::OK(); | ||
}; | ||
|
||
int64_t offset = 0; | ||
while (offset < batch.num_rows()) { | ||
const int64_t batch_size = std::min( | ||
max_row_group_length - row_group_writer_->num_rows(), batch.num_rows() - offset); | ||
RETURN_NOT_OK(WriteBatch(offset, batch_size)); | ||
offset += batch_size; | ||
|
||
// Flush current row group if it is full. | ||
if (row_group_writer_->num_rows() >= max_row_group_length && | ||
offset < batch.num_rows()) { | ||
RETURN_NOT_OK(NewBufferedRowGroup()); | ||
} | ||
} | ||
|
||
return Status::OK(); | ||
} | ||
|
||
// Out-of-line destructor: anchors the vtable and lets the header use
// incomplete member types.
FileWriter::~FileWriter() {}
|
||
Status FileWriter::Make(::arrow::MemoryPool* pool, | ||
|
Reviewer: Can you please refactor this with `WriteBufferedFile` to avoid copy-pasting entire chunks of code?

Author: Hmm, I use some different logic here — I'll try to unify them.