GH-36280: [Parquet][C++] FileWriter supports WriteTable in the buffered mode #36377

Open · wants to merge 8 commits into base: main
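
For orientation, here is a minimal usage sketch of the API surface this PR adds, mirroring the calls exercised in the tests below. The writer and table variables, the 64 * 1024 chunk size, and the enclosing Status-returning function are assumptions for illustration, not part of the diff.

// Sketch only: `writer` is an already-constructed parquet::arrow::FileWriter and
// `table` an ::arrow::Table, set up as in arrow_reader_writer_test.cc below.
// With use_buffering=true, rows are appended to the current buffered row group and a
// new row group is only started once chunk_size rows have accumulated, so several
// small WriteTable calls can coalesce into a single row group.
ARROW_RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/64 * 1024,
                                       /*use_buffering=*/true));
// With use_buffering=false the existing behaviour is kept: the table is written into
// fresh, unbuffered row groups of at most chunk_size rows each.
ARROW_RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/64 * 1024,
                                       /*use_buffering=*/false));
ARROW_RETURN_NOT_OK(writer->Close());
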
143 changes: 139 additions & 4 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5024,6 +5024,41 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
ASSERT_OK_NO_THROW(writer->Close());
}

void WriteBufferedTable(const std::shared_ptr<Array>& values,
Member: Can you please refactor this with WriteBufferedFile to avoid copy-pasting entire chunks of code?

Member (Author): Hmm, I guess I use some different logic here; I'll try to unify them.

int64_t write_table_batch_size,
int64_t writer_properties_max_row_group_size,
int64_t write_table_max_row_group_size, int* num_row_groups) {
std::shared_ptr<GroupNode> schema =
MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
SchemaDescriptor descriptor;
ASSERT_NO_THROW(descriptor.Init(schema));
std::shared_ptr<::arrow::Schema> arrow_schema;
ArrowReaderProperties props;
ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));

parquet::WriterProperties::Builder props_builder;
props_builder.max_row_group_length(writer_properties_max_row_group_size);

this->sink_ = CreateOutputStream();
auto low_level_writer =
ParquetFileWriter::Open(this->sink_, schema, props_builder.build());
std::unique_ptr<FileWriter> writer;
ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
std::move(low_level_writer), arrow_schema,
default_arrow_writer_properties(), &writer));
EXPECT_EQ(0, values->length() % write_table_batch_size);
for (int i = 0; i * write_table_batch_size < values->length(); i++) {
std::shared_ptr<Array> sliced_array =
values->Slice(i * write_table_batch_size, write_table_batch_size);
std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
auto table = ::arrow::Table::Make(arrow_schema, arrays, write_table_batch_size);
ASSERT_OK_NO_THROW(
writer->WriteTable(*table, write_table_max_row_group_size, true));
}
ASSERT_OK_NO_THROW(writer->Close());
*num_row_groups = writer->metadata()->num_row_groups();
}

void ReadAndCheckSingleColumnFile(const Array& values, int num_row_groups) {
std::shared_ptr<Array> out;

@@ -5057,23 +5092,123 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);

TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
constexpr int64_t batch_size = SMALL_SIZE / 4;
constexpr size_t NUM_BATCHES = 4;
constexpr int64_t batch_size = SMALL_SIZE / NUM_BATCHES;
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
ASSERT_OK(
NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
int num_row_groups = 0;
this->WriteBufferedFile(values, batch_size, &num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}

TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
constexpr int64_t batch_size = LARGE_SIZE / 4;
constexpr size_t NUM_BATCHES = 4;
constexpr int64_t batch_size = LARGE_SIZE / NUM_BATCHES;
std::shared_ptr<Array> values;
ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
ASSERT_OK(
NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
int num_row_groups = 0;
this->WriteBufferedFile(values, batch_size, &num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
}

TYPED_TEST(TestBufferedParquetIO, WriteTableSmall) {
std::shared_ptr<Array> values;
ASSERT_OK(
NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
int num_row_groups = 0;
// Write the whole table in a single batch.
int64_t write_table_batch_size = SMALL_SIZE;
int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
write_max_row_group_size, &num_row_groups);
EXPECT_EQ(1, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
Member: Are there tests with more than one column somewhere?

Member (Author): No. I'll add it. (A hypothetical sketch follows this test.)

}
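
Following up on the review question above, a hypothetical sketch of what a two-column variant could look like; the schema, the int_values/str_values arrays, and the single-row-group expectation are illustrative assumptions, not code from this PR.

// Hypothetical multi-column variant (illustrative only): slice a two-column table and
// push the slices through the buffered WriteTable path.
auto arrow_schema = ::arrow::schema(
    {::arrow::field("f0", ::arrow::int32()), ::arrow::field("f1", ::arrow::utf8())});
// int_values and str_values are assumed to be equal-length, nullable Arrow arrays.
auto table = ::arrow::Table::Make(arrow_schema, {int_values, str_values});
const int64_t batch_length = table->num_rows() / 4;
for (int i = 0; i < 4; i++) {
  std::shared_ptr<::arrow::Table> slice = table->Slice(i * batch_length, batch_length);
  // `writer` is assumed to be a parquet::arrow::FileWriter created over arrow_schema.
  ASSERT_OK_NO_THROW(writer->WriteTable(*slice, DEFAULT_MAX_ROW_GROUP_LENGTH,
                                        /*use_buffering=*/true));
}
ASSERT_OK_NO_THROW(writer->Close());
// All four slices should end up coalesced into a single buffered row group.
EXPECT_EQ(1, writer->metadata()->num_row_groups());
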

TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
std::shared_ptr<Array> values;
Member: Can you also avoid repeating yourself in the tests below?

ASSERT_OK(
NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
int num_row_groups = 0;
// Write the whole table in a single batch.
int64_t write_table_batch_size = LARGE_SIZE;
int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
write_max_row_group_size, &num_row_groups);
EXPECT_EQ(1, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
}

TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
std::shared_ptr<Array> values;
constexpr size_t NUM_BATCHES = 4;
ASSERT_OK(
NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
int num_row_groups = 0;
// Write the whole table in four batches.
int64_t write_table_batch_size = SMALL_SIZE / NUM_BATCHES;
int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
write_max_row_group_size, &num_row_groups);
EXPECT_EQ(1, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}

TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit) {
constexpr size_t NUM_BATCHES = 4;
std::shared_ptr<Array> values;
ASSERT_OK(
NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
int num_row_groups = 0;
// Write the whole table in one batch with a small max_row_group_size.
{
int64_t write_table_batch_size = SMALL_SIZE;
int64_t parquet_writer_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
this->WriteBufferedTable(values, write_table_batch_size,
parquet_writer_max_row_group_size, write_max_row_group_size,
&num_row_groups);
EXPECT_EQ(4, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
}

TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit2) {
constexpr size_t NUM_BATCHES = 4;
std::shared_ptr<Array> values;
ASSERT_OK(
NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
int num_row_groups = 0;
int64_t write_table_batch_size = SMALL_SIZE;
int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
int64_t write_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
this->WriteBufferedTable(values, write_table_batch_size,
parquet_writer_max_row_group_size, write_max_row_group_size,
&num_row_groups);
EXPECT_EQ(4, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}

TYPED_TEST(TestBufferedParquetIO, WriteTableInBatchesForMultiRowGroups) {
std::shared_ptr<Array> values;
ASSERT_OK(
NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
int num_row_groups = 0;
int64_t write_table_batch_size = SMALL_SIZE / 10;
int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
int64_t write_max_row_group_size = SMALL_SIZE / 4;
this->WriteBufferedTable(values, write_table_batch_size,
parquet_writer_max_row_group_size, write_max_row_group_size,
&num_row_groups);
EXPECT_EQ(4, num_row_groups);
ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
}
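
A quick sanity check of the expected row-group count in the test above, assuming (as elsewhere in this file) that SMALL_SIZE is a compile-time constant divisible by 4 and 10:

// With use_buffering=true, the buffered row group is flushed whenever it reaches
// write_max_row_group_size rows, independently of the size of the individual
// WriteTable calls, so ten slices of SMALL_SIZE / 10 rows still produce four groups.
constexpr int64_t kRowsTotal = SMALL_SIZE;        // written in SMALL_SIZE / 10 slices
constexpr int64_t kRowGroupCap = SMALL_SIZE / 4;  // write_max_row_group_size
static_assert(kRowsTotal / kRowGroupCap == 4, "four full row groups are expected");
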

TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
auto pool = ::arrow::default_memory_pool();
auto sink = CreateOutputStream();
167 changes: 95 additions & 72 deletions cpp/src/parquet/arrow/writer.cc
@@ -274,6 +274,16 @@ class ArrowColumnWriterV2 {
RowGroupWriter* row_group_writer_;
};

std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
int column_id) {
return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
}

std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
int column_id) {
return value.column(column_id);
}

} // namespace

// ----------------------------------------------------------------------
@@ -356,7 +366,10 @@ class FileWriterImpl : public FileWriter {

std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }

Status WriteTable(const Table& table, int64_t chunk_size) override {
template <typename T>
Status WriteBuffered(const T& batch, int64_t max_row_group_length);

Status WriteTable(const Table& table, int64_t chunk_size, bool use_buffering) override {
RETURN_NOT_OK(table.Validate());

if (chunk_size <= 0 && table.num_rows() > 0) {
@@ -369,27 +382,10 @@
chunk_size = this->properties().max_row_group_length();
}

auto WriteRowGroup = [&](int64_t offset, int64_t size) {
RETURN_NOT_OK(NewRowGroup(size));
for (int i = 0; i < table.num_columns(); i++) {
RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
}
return Status::OK();
};

if (table.num_rows() == 0) {
// Append a row group with 0 rows
RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
return Status::OK();
if (use_buffering) {
return WriteBuffered(table, chunk_size);
}

for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
RETURN_NOT_OK_ELSE(
WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
PARQUET_IGNORE_NOT_OK(Close()));
}
return Status::OK();
return WriteTableUnbuffered(table, chunk_size);
}

Status NewBufferedRowGroup() override {
@@ -406,57 +402,7 @@
}

// Max number of rows allowed in a row group.
const int64_t max_row_group_length = this->properties().max_row_group_length();

if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
row_group_writer_->num_rows() >= max_row_group_length) {
RETURN_NOT_OK(NewBufferedRowGroup());
}

auto WriteBatch = [&](int64_t offset, int64_t size) {
std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
int column_index_start = 0;

for (int i = 0; i < batch.num_columns(); i++) {
ChunkedArray chunked_array{batch.column(i)};
ARROW_ASSIGN_OR_RAISE(
std::unique_ptr<ArrowColumnWriterV2> writer,
ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
row_group_writer_, column_index_start));
column_index_start += writer->leaf_count();
if (arrow_properties_->use_threads()) {
writers.emplace_back(std::move(writer));
} else {
RETURN_NOT_OK(writer->Write(&column_write_context_));
}
}

if (arrow_properties_->use_threads()) {
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
RETURN_NOT_OK(::arrow::internal::ParallelFor(
static_cast<int>(writers.size()),
[&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
arrow_properties_->executor()));
}

return Status::OK();
};

int64_t offset = 0;
while (offset < batch.num_rows()) {
const int64_t batch_size =
std::min(max_row_group_length - row_group_writer_->num_rows(),
batch.num_rows() - offset);
RETURN_NOT_OK(WriteBatch(offset, batch_size));
offset += batch_size;

// Flush current row group if it is full.
if (row_group_writer_->num_rows() >= max_row_group_length) {
RETURN_NOT_OK(NewBufferedRowGroup());
}
}

return Status::OK();
return WriteBuffered(batch, this->properties().max_row_group_length());
}

const WriterProperties& properties() const { return *writer_->properties(); }
@@ -469,6 +415,30 @@
return writer_->metadata();
}

Status WriteTableUnbuffered(const Table& table, int64_t chunk_size) {
auto WriteRowGroup = [&](int64_t offset, int64_t size) {
RETURN_NOT_OK(NewRowGroup(size));
for (int i = 0; i < table.num_columns(); i++) {
RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
}
return Status::OK();
};

if (table.num_rows() == 0) {
// Append a row group with 0 rows
RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
return Status::OK();
}

for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
RETURN_NOT_OK_ELSE(
WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
PARQUET_IGNORE_NOT_OK(Close()));
}
return Status::OK();
}

private:
friend class FileWriter;

@@ -488,6 +458,59 @@
std::vector<ArrowWriteContext> parallel_column_write_contexts_;
};

template <typename T>
Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_length) {
if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
row_group_writer_->num_rows() >= max_row_group_length) {
RETURN_NOT_OK(NewBufferedRowGroup());
}

auto WriteBatch = [&](int64_t offset, int64_t size) {
std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
int column_index_start = 0;

for (int i = 0; i < batch.num_columns(); i++) {
std::shared_ptr<ChunkedArray> chunked_array = GetColumnChunkedArray(batch, i);
Member: It is a bit wasteful to call this for each chunk. Why not use a RecordBatchReader instead?

Member (Author):

std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
                                                    int column_id) {
  return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
}

std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
                                                    int column_id) {
  return value.column(column_id);
}

It seems that here I want to call ArrowColumnWriterV2::Make, which requires a ChunkedArray, so I'm not sure why this is expensive, or how a RecordBatch could handle this...

Member: OK, then you can simply call GetColumnChunkedArray(batch, i) outside of WriteBatch; it does not depend on offset/size. (See the sketch after this function.)

ARROW_ASSIGN_OR_RAISE(
std::unique_ptr<ArrowColumnWriterV2> writer,
ArrowColumnWriterV2::Make(*chunked_array, offset, size, schema_manifest_,
row_group_writer_, column_index_start));
column_index_start += writer->leaf_count();
if (arrow_properties_->use_threads()) {
writers.emplace_back(std::move(writer));
} else {
RETURN_NOT_OK(writer->Write(&column_write_context_));
}
}

if (arrow_properties_->use_threads()) {
DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
RETURN_NOT_OK(::arrow::internal::ParallelFor(
static_cast<int>(writers.size()),
[&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
arrow_properties_->executor()));
}

return Status::OK();
};

int64_t offset = 0;
while (offset < batch.num_rows()) {
const int64_t batch_size = std::min(
max_row_group_length - row_group_writer_->num_rows(), batch.num_rows() - offset);
RETURN_NOT_OK(WriteBatch(offset, batch_size));
offset += batch_size;

// Flush current row group if it is full.
if (row_group_writer_->num_rows() >= max_row_group_length &&
offset < batch.num_rows()) {
RETURN_NOT_OK(NewBufferedRowGroup());
}
}

return Status::OK();
}
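
As a follow-up to the review thread above, a rough sketch of the reviewer's suggestion (not what the PR currently does): hoist the per-column ChunkedArray lookup out of the WriteBatch lambda, since it does not depend on offset or size. Only the serial, non-threaded path is shown.

// Sketch of the suggested restructuring: materialize each column's ChunkedArray once,
// before slicing the batch into row groups.
std::vector<std::shared_ptr<ChunkedArray>> columns(batch.num_columns());
for (int i = 0; i < batch.num_columns(); i++) {
  columns[i] = GetColumnChunkedArray(batch, i);
}
auto WriteBatch = [&](int64_t offset, int64_t size) {
  int column_index_start = 0;
  for (int i = 0; i < batch.num_columns(); i++) {
    // The hoisted ChunkedArray is reused for every row-group slice.
    ARROW_ASSIGN_OR_RAISE(
        std::unique_ptr<ArrowColumnWriterV2> writer,
        ArrowColumnWriterV2::Make(*columns[i], offset, size, schema_manifest_,
                                  row_group_writer_, column_index_start));
    column_index_start += writer->leaf_count();
    RETURN_NOT_OK(writer->Write(&column_write_context_));
  }
  return Status::OK();
};
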

FileWriter::~FileWriter() {}

Status FileWriter::Make(::arrow::MemoryPool* pool,