From 9593e728233e52a57a0d2a0fc61354a12442150e Mon Sep 17 00:00:00 2001
From: mwish
Date: Thu, 29 Jun 2023 19:28:07 +0800
Subject: [PATCH 1/5] [add] Basic add writetable field

---
 cpp/src/parquet/arrow/writer.cc | 166 ++++++++++++++++++--------------
 cpp/src/parquet/arrow/writer.h  |  15 ++-
 2 files changed, 107 insertions(+), 74 deletions(-)

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 6d22f318f6b97..917d4cccffc1b 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -274,6 +274,16 @@ class ArrowColumnWriterV2 {
   RowGroupWriter* row_group_writer_;
 };
 
+std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value,
+                                                    int column_id) {
+  return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
+}
+
+std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::Table& value,
+                                                    int column_id) {
+  return value.column(column_id);
+}
+
 }  // namespace
 
 // ----------------------------------------------------------------------
@@ -356,7 +366,10 @@ class FileWriterImpl : public FileWriter {
 
   std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
 
-  Status WriteTable(const Table& table, int64_t chunk_size) override {
+  template <typename T>
+  Status WriteBuffered(const T& batch, int64_t max_row_group_length);
+
+  Status WriteTable(const Table& table, int64_t chunk_size, bool use_buffering) override {
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
@@ -369,27 +382,10 @@ class FileWriterImpl : public FileWriter {
       chunk_size = this->properties().max_row_group_length();
     }
 
-    auto WriteRowGroup = [&](int64_t offset, int64_t size) {
-      RETURN_NOT_OK(NewRowGroup(size));
-      for (int i = 0; i < table.num_columns(); i++) {
-        RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
-      }
-      return Status::OK();
-    };
-
-    if (table.num_rows() == 0) {
-      // Append a row group with 0 rows
-      RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
-      return Status::OK();
-    }
-
-    for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
-      int64_t offset = chunk * chunk_size;
-      RETURN_NOT_OK_ELSE(
-          WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
-          PARQUET_IGNORE_NOT_OK(Close()));
+    if (use_buffering) {
+      return WriteBuffered(table, chunk_size);
     }
-    return Status::OK();
+    return WriteTableUnbuffered(table, chunk_size);
   }
 
   Status NewBufferedRowGroup() override {
@@ -406,57 +402,7 @@
     }
 
     // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
-      RETURN_NOT_OK(NewBufferedRowGroup());
-    }
-
-    auto WriteBatch = [&](int64_t offset, int64_t size) {
-      std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
-      int column_index_start = 0;
-
-      for (int i = 0; i < batch.num_columns(); i++) {
-        ChunkedArray chunked_array{batch.column(i)};
-        ARROW_ASSIGN_OR_RAISE(
-            std::unique_ptr<ArrowColumnWriterV2> writer,
-            ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
-                                      row_group_writer_, column_index_start));
-        column_index_start += writer->leaf_count();
-        if (arrow_properties_->use_threads()) {
-          writers.emplace_back(std::move(writer));
-        } else {
-          RETURN_NOT_OK(writer->Write(&column_write_context_));
-        }
-      }
-
-      if (arrow_properties_->use_threads()) {
-        DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
-        RETURN_NOT_OK(::arrow::internal::ParallelFor(
-            static_cast<int>(writers.size()),
-            [&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
-            arrow_properties_->executor()));
-      }
-
-      return Status::OK();
-    };
-
-    int64_t offset = 0;
-    while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length) {
-        RETURN_NOT_OK(NewBufferedRowGroup());
-      }
-    }
-
-    return Status::OK();
+    return WriteBuffered(batch, this->properties().max_row_group_length());
   }
 
   const WriterProperties& properties() const { return *writer_->properties(); }
@@ -469,6 +415,30 @@
     return writer_->metadata();
   }
 
+  Status WriteTableUnbuffered(const Table& table, int64_t chunk_size) {
+    auto WriteRowGroup = [&](int64_t offset, int64_t size) {
+      RETURN_NOT_OK(NewRowGroup(size));
+      for (int i = 0; i < table.num_columns(); i++) {
+        RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
+      }
+      return Status::OK();
+    };
+
+    if (table.num_rows() == 0) {
+      // Append a row group with 0 rows
+      RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
+      return Status::OK();
+    }
+
+    for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
+      int64_t offset = chunk * chunk_size;
+      RETURN_NOT_OK_ELSE(
+          WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
+          PARQUET_IGNORE_NOT_OK(Close()));
+    }
+    return Status::OK();
+  }
+
  private:
   friend class FileWriter;
 
@@ -488,6 +458,58 @@
   std::vector<ArrowWriteContext> parallel_column_write_contexts_;
 };
 
+template <typename T>
+Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_length) {
+  if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
+      row_group_writer_->num_rows() >= max_row_group_length) {
+    RETURN_NOT_OK(NewBufferedRowGroup());
+  }
+
+  auto WriteBatch = [&](int64_t offset, int64_t size) {
+    std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
+    int column_index_start = 0;
+
+    for (int i = 0; i < batch.num_columns(); i++) {
+      std::shared_ptr<ChunkedArray> chunked_array = getColumnChunkedArray(batch, i);
+      ARROW_ASSIGN_OR_RAISE(
+          std::unique_ptr<ArrowColumnWriterV2> writer,
+          ArrowColumnWriterV2::Make(*chunked_array, offset, size, schema_manifest_,
+                                    row_group_writer_, column_index_start));
+      column_index_start += writer->leaf_count();
+      if (arrow_properties_->use_threads()) {
+        writers.emplace_back(std::move(writer));
+      } else {
+        RETURN_NOT_OK(writer->Write(&column_write_context_));
+      }
+    }
+
+    if (arrow_properties_->use_threads()) {
+      DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
+      RETURN_NOT_OK(::arrow::internal::ParallelFor(
+          static_cast<int>(writers.size()),
+          [&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
+          arrow_properties_->executor()));
+    }
+
+    return Status::OK();
+  };
+
+  int64_t offset = 0;
+  while (offset < batch.num_rows()) {
+    const int64_t batch_size = std::min(
+        max_row_group_length - row_group_writer_->num_rows(), batch.num_rows() - offset);
+    RETURN_NOT_OK(WriteBatch(offset, batch_size));
+    offset += batch_size;
+
+    // Flush current row group if it is full.
+    if (row_group_writer_->num_rows() >= max_row_group_length) {
+      RETURN_NOT_OK(NewBufferedRowGroup());
+    }
+  }
+
+  return Status::OK();
+}
+
 FileWriter::~FileWriter() {}
 
 Status FileWriter::Make(::arrow::MemoryPool* pool,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 1decafedc97fd..f0a0b1eec33be 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -91,10 +91,21 @@ class PARQUET_EXPORT FileWriter {
 
   /// \brief Write a Table to Parquet.
   ///
+  /// If `use_buffering` is false, then any pending row group is closed
+  /// at the beginning and at the end of this call.
+  /// If `use_buffering` is true, this function reuses an existing
+  /// buffered row group until the chunk size is met, and leaves
+  /// the last row group open for further writes.
+  /// It is recommended to set `use_buffering` to true to minimize
+  /// the number of row groups, especially when calling `WriteTable`
+  /// with small tables.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
-  virtual ::arrow::Status WriteTable(
-      const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0;
+  /// \param use_buffering Whether to potentially buffer data.
+  virtual ::arrow::Status WriteTable(const ::arrow::Table& table,
+                                     int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH,
+                                     bool use_buffering = false) = 0;
 
   /// \brief Start a new row group.
   ///
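For orientation, a minimal usage sketch of the API added in patch 1 (not part of the patch itself). It assumes an already-opened parquet::arrow::FileWriter; the function name AppendTables and the caller-provided vector of tables are illustrative only.

    #include <memory>
    #include <vector>

    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"
    #include "parquet/exception.h"
    #include "parquet/properties.h"

    // Append many small tables while sharing buffered row groups between calls.
    void AppendTables(parquet::arrow::FileWriter* writer,
                      const std::vector<std::shared_ptr<arrow::Table>>& tables) {
      for (const auto& table : tables) {
        // chunk_size caps rows per row group; use_buffering=true keeps the last
        // row group open so the next call appends to it instead of starting a
        // new, possibly tiny, row group.
        PARQUET_THROW_NOT_OK(writer->WriteTable(
            *table, parquet::DEFAULT_MAX_ROW_GROUP_LENGTH, /*use_buffering=*/true));
      }
      PARQUET_THROW_NOT_OK(writer->Close());  // flushes the last buffered row group
    }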
From c306c6c3f8c79761d4ddd93cd29a1d7f2de681e4 Mon Sep 17 00:00:00 2001
From: mwish
Date: Thu, 29 Jun 2023 19:52:12 +0800
Subject: [PATCH 2/5] [add] Basic test

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 46 +++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 47570405c9131..1effef6255841 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4980,6 +4980,34 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(writer->Close());
   }
 
+  void WriteBufferedTable(const std::shared_ptr<Array>& values, int64_t batch_size,
+                          int* num_row_groups) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
+    SchemaDescriptor descriptor;
+    ASSERT_NO_THROW(descriptor.Init(schema));
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
+
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        this->MakeWriter(schema), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    *num_row_groups = 0;
+    for (int i = 0; i < 4; i++) {
+      if (i % 2 == 0) {
+        ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
+        (*num_row_groups)++;
+      }
+      std::shared_ptr<Array> sliced_array = values->Slice(i * batch_size, batch_size);
+      std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
+      auto table = ::arrow::Table::Make(arrow_schema, arrays, batch_size);
+      ASSERT_OK_NO_THROW(writer->WriteTable(*table, DEFAULT_MAX_ROW_GROUP_LENGTH, true));
+    }
+    ASSERT_OK_NO_THROW(writer->Close());
+  }
+
   void ReadAndCheckSingleColumnFile(const Array& values, int num_row_groups) {
     std::shared_ptr<Table> out;
 
@@ -5030,6 +5058,24 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmallTable) {
+  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  this->WriteBufferedTable(values, batch_size, &num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLargeTable) {
+  constexpr int64_t batch_size = LARGE_SIZE / 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  this->WriteBufferedTable(values, batch_size, &num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
 TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
   auto pool = ::arrow::default_memory_pool();
   auto sink = CreateOutputStream();
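The test above still opens buffered row groups by hand via NewBufferedRowGroup(). The slicing that WriteBuffered performs can be modelled in isolation; the following standalone sketch (function and variable names are illustrative, not from the patch) mirrors its offset/size loop.

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Model of the WriteBuffered loop: given the rows already sitting in the
    // open buffered row group and an incoming batch, return the slice sizes
    // that land in successive row groups.
    std::vector<int64_t> SplitBatch(int64_t rows_in_open_group, int64_t batch_rows,
                                    int64_t max_row_group_length) {
      std::vector<int64_t> slices;
      int64_t offset = 0;
      while (offset < batch_rows) {
        const int64_t size = std::min(max_row_group_length - rows_in_open_group,
                                      batch_rows - offset);
        slices.push_back(size);
        offset += size;
        rows_in_open_group += size;
        if (rows_in_open_group >= max_row_group_length) {
          rows_in_open_group = 0;  // the writer would start a new buffered row group
        }
      }
      return slices;
    }

    // e.g. SplitBatch(900, 500, 1000) yields {100, 400}: 100 rows top up the
    // open row group, the remaining 400 start the next one.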
From 30271c787ee5cf5e1fd36ac514fb73cbdb97084f Mon Sep 17 00:00:00 2001
From: mwish
Date: Fri, 30 Jun 2023 16:44:10 +0800
Subject: [PATCH 3/5] add tests

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 111 +++++++++++++++---
 cpp/src/parquet/arrow/writer.cc               |   3 +-
 2 files changed, 95 insertions(+), 19 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 1effef6255841..bb59b695b69aa 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4980,8 +4980,10 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(writer->Close());
   }
 
-  void WriteBufferedTable(const std::shared_ptr<Array>& values, int64_t batch_size,
-                          int* num_row_groups) {
+  void WriteBufferedTable(const std::shared_ptr<Array>& values,
+                          int64_t write_table_batch_size,
+                          int64_t parquet_writer_max_row_group_size,
+                          int64_t write_table_max_row_group_size, int* num_row_groups) {
     std::shared_ptr<GroupNode> schema =
         MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
     SchemaDescriptor descriptor;
@@ -4990,22 +4992,26 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ArrowReaderProperties props;
     ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
 
+    parquet::WriterProperties::Builder props_builder;
+    props_builder.max_row_group_length(parquet_writer_max_row_group_size);
+
+    this->sink_ = CreateOutputStream();
+    auto low_level_writer =
+        ParquetFileWriter::Open(this->sink_, schema, props_builder.build());
     std::unique_ptr<FileWriter> writer;
     ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
-                                        this->MakeWriter(schema), arrow_schema,
+                                        std::move(low_level_writer), arrow_schema,
                                         default_arrow_writer_properties(), &writer));
-    *num_row_groups = 0;
-    for (int i = 0; i < 4; i++) {
-      if (i % 2 == 0) {
-        ASSERT_OK_NO_THROW(writer->NewBufferedRowGroup());
-        (*num_row_groups)++;
-      }
-      std::shared_ptr<Array> sliced_array = values->Slice(i * batch_size, batch_size);
+    for (int i = 0; i * write_table_batch_size < values->length(); i++) {
+      std::shared_ptr<Array> sliced_array =
+          values->Slice(i * write_table_batch_size, write_table_batch_size);
       std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
-      auto table = ::arrow::Table::Make(arrow_schema, arrays, batch_size);
-      ASSERT_OK_NO_THROW(writer->WriteTable(*table, DEFAULT_MAX_ROW_GROUP_LENGTH, true));
+      auto table = ::arrow::Table::Make(arrow_schema, arrays, write_table_batch_size);
+      ASSERT_OK_NO_THROW(
+          writer->WriteTable(*table, write_table_max_row_group_size, true));
     }
     ASSERT_OK_NO_THROW(writer->Close());
+    *num_row_groups = writer->metadata()->num_row_groups();
   }
 
   void ReadAndCheckSingleColumnFile(const Array& values, int num_row_groups) {
@@ -5058,24 +5064,93 @@ TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmallTable) {
-  constexpr int64_t batch_size = SMALL_SIZE / 4;
+TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
   int num_row_groups = 0;
-  this->WriteBufferedTable(values, batch_size, &num_row_groups);
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLargeTable) {
-  constexpr int64_t batch_size = LARGE_SIZE / 4;
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
   std::shared_ptr<Array> values;
   ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
   int num_row_groups = 0;
-  this->WriteBufferedTable(values, batch_size, &num_row_groups);
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batches, but set parquet max-row-group size smaller
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+    this->WriteBufferedTable(values, write_table_batch_size,
+                             parquet_writer_max_row_group_size, write_max_row_group_size,
+                             &num_row_groups);
+    EXPECT_EQ(4, num_row_groups);
+    ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+  }
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
+  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  this->WriteBufferedTable(values, write_table_batch_size,
+                           parquet_writer_max_row_group_size, write_max_row_group_size,
+                           &num_row_groups);
+  EXPECT_EQ(4, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableBatchesLimit) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  int64_t write_table_batch_size = SMALL_SIZE / 10;
+  int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
+  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  this->WriteBufferedTable(values, write_table_batch_size,
+                           parquet_writer_max_row_group_size, write_max_row_group_size,
+                           &num_row_groups);
+  EXPECT_EQ(4, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
 TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
   auto pool = ::arrow::default_memory_pool();
   auto sink = CreateOutputStream();
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 917d4cccffc1b..99a1f688ab557 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -502,7 +502,8 @@ Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_lengt
     offset += batch_size;
 
     // Flush current row group if it is full.
-    if (row_group_writer_->num_rows() >= max_row_group_length) {
+    if (row_group_writer_->num_rows() >= max_row_group_length &&
+        offset < batch.num_rows()) {
      RETURN_NOT_OK(NewBufferedRowGroup());
    }
  }
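With the fix above, a full row group is only flushed when more rows are still pending, so no empty trailing row group is created. The row-group counts the new tests assert then follow from a ceiling division over the effective limit; note that WriteTable already clamps chunk_size to the writer-properties max_row_group_length (visible in the patch 1 context). A sketch of that arithmetic, with names of my own choosing:

    #include <algorithm>
    #include <cstdint>

    // Buffered writes: the effective per-row-group cap is the smaller of the
    // WriterProperties max_row_group_length and the chunk_size passed to
    // WriteTable; a trailing group is only opened if rows remain for it.
    int64_t ExpectedRowGroups(int64_t total_rows, int64_t properties_limit,
                              int64_t chunk_size) {
      const int64_t limit = std::min(properties_limit, chunk_size);
      return (total_rows + limit - 1) / limit;  // ceiling division, no empty tail
    }

    // WriteTableLimit:  ExpectedRowGroups(SMALL_SIZE, SMALL_SIZE / 4,
    //                                     DEFAULT_MAX_ROW_GROUP_LENGTH) == 4
    // WriteTableLimit2: ExpectedRowGroups(SMALL_SIZE, SMALL_SIZE,
    //                                     SMALL_SIZE / 4)               == 4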
From b9ff92157e18b3380e61ba5d7a29b0e558dd20ad Mon Sep 17 00:00:00 2001
From: mwish
Date: Sat, 8 Jul 2023 23:37:37 +0800
Subject: [PATCH 4/5] fix comment

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 56 ++++++++++++-------
 cpp/src/parquet/arrow/writer.cc               |  6 +-
 cpp/src/parquet/arrow/writer.h                |  5 ++
 3 files changed, 43 insertions(+), 24 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 964a378ddc985..205720d8e3bbb 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5026,7 +5026,7 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
 
   void WriteBufferedTable(const std::shared_ptr<Array>& values,
                           int64_t write_table_batch_size,
-                          int64_t parquet_writer_max_row_group_size,
+                          int64_t writer_properties_max_row_group_size,
                           int64_t write_table_max_row_group_size, int* num_row_groups) {
     std::shared_ptr<GroupNode> schema =
         MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
@@ -5037,7 +5037,7 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
 
     parquet::WriterProperties::Builder props_builder;
-    props_builder.max_row_group_length(parquet_writer_max_row_group_size);
+    props_builder.max_row_group_length(writer_properties_max_row_group_size);
 
     this->sink_ = CreateOutputStream();
     auto low_level_writer =
@@ -5046,6 +5046,7 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
                                         std::move(low_level_writer), arrow_schema,
                                         default_arrow_writer_properties(), &writer));
+    EXPECT_TRUE(values->length() % write_table_batch_size == 0);
     for (int i = 0; i * write_table_batch_size < values->length(); i++) {
       std::shared_ptr<Array> sliced_array =
           values->Slice(i * write_table_batch_size, write_table_batch_size);
@@ -5091,26 +5092,31 @@ TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);
 
 TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
-  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  constexpr size_t NUM_BATCHES = 4;
+  constexpr int64_t batch_size = SMALL_SIZE / NUM_BATCHES;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   this->WriteBufferedFile(values, batch_size, &num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
 TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
-  constexpr int64_t batch_size = LARGE_SIZE / 4;
+  constexpr size_t NUM_BATCHES = 4;
+  constexpr int64_t batch_size = LARGE_SIZE / NUM_BATCHES;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
   int num_row_groups = 0;
   this->WriteBufferedFile(values, batch_size, &num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableSmall) {
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   // Write all table with one batch.
   int64_t write_table_batch_size = SMALL_SIZE;
@@ -5124,7 +5130,8 @@ TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
 
 TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
   int num_row_groups = 0;
   // Write all table with one batch.
   int64_t write_table_batch_size = LARGE_SIZE;
@@ -5136,12 +5143,14 @@ TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
   std::shared_ptr<Array> values;
+  constexpr size_t NUM_BATCHES = 4;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   // Write all table with four batches.
-  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_batch_size = SMALL_SIZE / NUM_BATCHES;
   int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
   int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
   this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
@@ -5150,14 +5159,16 @@ TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit) {
+  constexpr size_t NUM_BATCHES = 4;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
-  // Write all table with one batches, but set parquet max-row-group size smaller
+  // Write all table in one batch with a small max_row_group_size.
   {
     int64_t write_table_batch_size = SMALL_SIZE;
-    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
     int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
     this->WriteBufferedTable(values, write_table_batch_size,
                              parquet_writer_max_row_group_size, write_max_row_group_size,
                              &num_row_groups);
@@ -5167,13 +5178,15 @@
   }
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit2) {
+  constexpr size_t NUM_BATCHES = 4;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   int64_t write_table_batch_size = SMALL_SIZE;
   int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
-  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  int64_t write_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
   this->WriteBufferedTable(values, write_table_batch_size,
                            parquet_writer_max_row_group_size, write_max_row_group_size,
                            &num_row_groups);
@@ -5181,9 +5194,10 @@
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableBatchesLimit) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableInBatchesForMultiRowGroups) {
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   int64_t write_table_batch_size = SMALL_SIZE / 10;
   int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 99a1f688ab557..3703a920d2670 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -274,12 +274,12 @@ class ArrowColumnWriterV2 {
   RowGroupWriter* row_group_writer_;
 };
 
-std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value,
-                                                    int column_id) {
+std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
+                                                    int column_id) {
   return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
 }
 
-std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::Table& value,
-                                                    int column_id) {
+std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
+                                                    int column_id) {
   return value.column(column_id);
 }
 
@@ -470,7 +470,7 @@ Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_lengt
     int column_index_start = 0;
 
     for (int i = 0; i < batch.num_columns(); i++) {
-      std::shared_ptr<ChunkedArray> chunked_array = getColumnChunkedArray(batch, i);
+      std::shared_ptr<ChunkedArray> chunked_array = GetColumnChunkedArray(batch, i);
       ARROW_ASSIGN_OR_RAISE(
           std::unique_ptr<ArrowColumnWriterV2> writer,
           ArrowColumnWriterV2::Make(*chunked_array, offset, size, schema_manifest_,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index f0a0b1eec33be..f34309fb73055 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -100,6 +100,11 @@ class PARQUET_EXPORT FileWriter {
   /// the number of row groups, especially when calling `WriteTable`
   /// with small tables.
   ///
+  /// WARNING: If you are writing multiple files in parallel in the same
+  /// executor, deadlock may occur if ArrowWriterProperties::use_threads
+  /// is set to true to write columns in parallel. Please disable use_threads
+  /// option in this case.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
   /// \param use_buffering Whether to potentially buffer data.
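Regarding the WARNING added to writer.h above, a sketch of keeping column-level parallelism disabled when several files share one executor. It assumes the ArrowWriterProperties::Builder::set_use_threads setter; false is already the library default, so this only makes the choice explicit.

    #include <memory>

    #include "parquet/properties.h"

    // Build ArrowWriterProperties with parallel column writing switched off,
    // avoiding the executor deadlock described in the doc comment above.
    std::shared_ptr<parquet::ArrowWriterProperties> MakeSerialArrowWriterProperties() {
      return parquet::ArrowWriterProperties::Builder().set_use_threads(false)->build();
    }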
From 00aedf151a8f0c55ace65eb369dcb0214e6a68a6 Mon Sep 17 00:00:00 2001
From: mwish
Date: Sat, 8 Jul 2023 23:37:37 +0800
Subject: [PATCH 5/5] fix comment

---
 .../parquet/arrow/arrow_reader_writer_test.cc | 56 ++++++++++++-------
 cpp/src/parquet/arrow/writer.cc               |  6 +-
 cpp/src/parquet/arrow/writer.h                |  5 ++
 3 files changed, 43 insertions(+), 24 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 964a378ddc985..f5d6ddcd4fbdc 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5026,7 +5026,7 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
 
   void WriteBufferedTable(const std::shared_ptr<Array>& values,
                           int64_t write_table_batch_size,
-                          int64_t parquet_writer_max_row_group_size,
+                          int64_t writer_properties_max_row_group_size,
                           int64_t write_table_max_row_group_size, int* num_row_groups) {
     std::shared_ptr<GroupNode> schema =
         MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
@@ -5037,7 +5037,7 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
 
     parquet::WriterProperties::Builder props_builder;
-    props_builder.max_row_group_length(parquet_writer_max_row_group_size);
+    props_builder.max_row_group_length(writer_properties_max_row_group_size);
 
     this->sink_ = CreateOutputStream();
     auto low_level_writer =
@@ -5046,6 +5046,7 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
                                         std::move(low_level_writer), arrow_schema,
                                         default_arrow_writer_properties(), &writer));
+    EXPECT_EQ(0, values->length() % write_table_batch_size);
     for (int i = 0; i * write_table_batch_size < values->length(); i++) {
       std::shared_ptr<Array> sliced_array =
           values->Slice(i * write_table_batch_size, write_table_batch_size);
@@ -5091,26 +5092,31 @@ TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);
 
 TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
-  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  constexpr size_t NUM_BATCHES = 4;
+  constexpr int64_t batch_size = SMALL_SIZE / NUM_BATCHES;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   this->WriteBufferedFile(values, batch_size, &num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
 TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
-  constexpr int64_t batch_size = LARGE_SIZE / 4;
+  constexpr size_t NUM_BATCHES = 4;
+  constexpr int64_t batch_size = LARGE_SIZE / NUM_BATCHES;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
   int num_row_groups = 0;
   this->WriteBufferedFile(values, batch_size, &num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableSmall) {
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   // Write all table with one batch.
   int64_t write_table_batch_size = SMALL_SIZE;
@@ -5124,7 +5130,8 @@ TYPED_TEST(TestBufferedParquetIO, WriteTableBase) {
 
 TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
   int num_row_groups = 0;
   // Write all table with one batch.
   int64_t write_table_batch_size = LARGE_SIZE;
@@ -5136,12 +5143,14 @@ TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableBatches) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
   std::shared_ptr<Array> values;
+  constexpr size_t NUM_BATCHES = 4;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   // Write all table with four batches.
-  int64_t write_table_batch_size = SMALL_SIZE / 4;
+  int64_t write_table_batch_size = SMALL_SIZE / NUM_BATCHES;
   int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
   int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
   this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
@@ -5150,14 +5159,16 @@ TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableLimit) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit) {
+  constexpr size_t NUM_BATCHES = 4;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
-  // Write all table with one batches, but set parquet max-row-group size smaller
+  // Write all table in one batch with a small max_row_group_size.
   {
     int64_t write_table_batch_size = SMALL_SIZE;
-    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / 4;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
     int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
     this->WriteBufferedTable(values, write_table_batch_size,
                              parquet_writer_max_row_group_size, write_max_row_group_size,
                              &num_row_groups);
@@ -5167,13 +5178,15 @@
   }
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableLimit2) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit2) {
+  constexpr size_t NUM_BATCHES = 4;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   int64_t write_table_batch_size = SMALL_SIZE;
   int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
-  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  int64_t write_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
   this->WriteBufferedTable(values, write_table_batch_size,
                            parquet_writer_max_row_group_size, write_max_row_group_size,
                            &num_row_groups);
@@ -5181,9 +5194,10 @@
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
-TYPED_TEST(TestBufferedParquetIO, WriteTableBatchesLimit) {
+TYPED_TEST(TestBufferedParquetIO, WriteTableInBatchesForMultiRowGroups) {
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   int64_t write_table_batch_size = SMALL_SIZE / 10;
   int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 99a1f688ab557..3703a920d2670 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -274,12 +274,12 @@ class ArrowColumnWriterV2 {
   RowGroupWriter* row_group_writer_;
 };
 
-std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::RecordBatch& value,
-                                                    int column_id) {
+std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
+                                                    int column_id) {
   return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
 }
 
-std::shared_ptr<ChunkedArray> getColumnChunkedArray(const ::arrow::Table& value,
-                                                    int column_id) {
+std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
+                                                    int column_id) {
   return value.column(column_id);
 }
 
@@ -470,7 +470,7 @@ Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_lengt
     int column_index_start = 0;
 
     for (int i = 0; i < batch.num_columns(); i++) {
-      std::shared_ptr<ChunkedArray> chunked_array = getColumnChunkedArray(batch, i);
+      std::shared_ptr<ChunkedArray> chunked_array = GetColumnChunkedArray(batch, i);
       ARROW_ASSIGN_OR_RAISE(
           std::unique_ptr<ArrowColumnWriterV2> writer,
           ArrowColumnWriterV2::Make(*chunked_array, offset, size, schema_manifest_,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index f0a0b1eec33be..f34309fb73055 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -100,6 +100,11 @@ class PARQUET_EXPORT FileWriter {
   /// the number of row groups, especially when calling `WriteTable`
   /// with small tables.
   ///
+  /// WARNING: If you are writing multiple files in parallel in the same
+  /// executor, deadlock may occur if ArrowWriterProperties::use_threads
+  /// is set to true to write columns in parallel. Please disable use_threads
+  /// option in this case.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
   /// \param use_buffering Whether to potentially buffer data.
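Pulling the series together, an end-to-end sketch of the buffered path (not from the patch: the output path, slice size, and row-group cap are placeholders, and it assumes the Result-returning FileWriter::Open overload):

    #include <memory>

    #include "arrow/io/file.h"
    #include "arrow/memory_pool.h"
    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"
    #include "parquet/exception.h"
    #include "parquet/properties.h"

    // Write one table in many small slices; buffering coalesces the slices
    // into row groups bounded by max_row_group_length.
    void WriteSlicesBuffered(const std::shared_ptr<arrow::Table>& table) {
      std::shared_ptr<arrow::io::FileOutputStream> sink;
      PARQUET_ASSIGN_OR_THROW(sink,
                              arrow::io::FileOutputStream::Open("/tmp/out.parquet"));

      std::shared_ptr<parquet::WriterProperties> writer_props =
          parquet::WriterProperties::Builder().max_row_group_length(1 << 20)->build();

      std::unique_ptr<parquet::arrow::FileWriter> writer;
      PARQUET_ASSIGN_OR_THROW(
          writer, parquet::arrow::FileWriter::Open(*table->schema(),
                                                   arrow::default_memory_pool(), sink,
                                                   writer_props));

      constexpr int64_t kSliceRows = 1024;
      for (int64_t offset = 0; offset < table->num_rows(); offset += kSliceRows) {
        // Slices shorter than the cap accumulate in the open buffered row
        // group instead of each becoming its own row group.
        PARQUET_THROW_NOT_OK(writer->WriteTable(*table->Slice(offset, kSliceRows),
                                                parquet::DEFAULT_MAX_ROW_GROUP_LENGTH,
                                                /*use_buffering=*/true));
      }
      PARQUET_THROW_NOT_OK(writer->Close());
    }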