diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 69827d5c464b9..f5d6ddcd4fbdc 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -5024,6 +5024,41 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
     ASSERT_OK_NO_THROW(writer->Close());
   }
 
+  void WriteBufferedTable(const std::shared_ptr<Array>& values,
+                          int64_t write_table_batch_size,
+                          int64_t writer_properties_max_row_group_size,
+                          int64_t write_table_max_row_group_size, int* num_row_groups) {
+    std::shared_ptr<GroupNode> schema =
+        MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
+    SchemaDescriptor descriptor;
+    ASSERT_NO_THROW(descriptor.Init(schema));
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    ArrowReaderProperties props;
+    ASSERT_OK_NO_THROW(FromParquetSchema(&descriptor, props, &arrow_schema));
+
+    parquet::WriterProperties::Builder props_builder;
+    props_builder.max_row_group_length(writer_properties_max_row_group_size);
+
+    this->sink_ = CreateOutputStream();
+    auto low_level_writer =
+        ParquetFileWriter::Open(this->sink_, schema, props_builder.build());
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_NO_THROW(FileWriter::Make(::arrow::default_memory_pool(),
+                                        std::move(low_level_writer), arrow_schema,
+                                        default_arrow_writer_properties(), &writer));
+    EXPECT_EQ(0, values->length() % write_table_batch_size);
+    for (int i = 0; i * write_table_batch_size < values->length(); i++) {
+      std::shared_ptr<Array> sliced_array =
+          values->Slice(i * write_table_batch_size, write_table_batch_size);
+      std::vector<std::shared_ptr<Array>> arrays = {sliced_array};
+      auto table = ::arrow::Table::Make(arrow_schema, arrays, write_table_batch_size);
+      ASSERT_OK_NO_THROW(
+          writer->WriteTable(*table, write_table_max_row_group_size, true));
+    }
+    ASSERT_OK_NO_THROW(writer->Close());
+    *num_row_groups = writer->metadata()->num_row_groups();
+  }
+
   void ReadAndCheckSingleColumnFile(const Array& values, int num_row_groups) {
     std::shared_ptr<Array> out;
@@ -5057,23 +5092,123 @@ class TestBufferedParquetIO : public TestParquetIO<TestType> {
 };
 
 TYPED_TEST_SUITE(TestBufferedParquetIO, TestTypes);
 
 TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteSmall) {
-  constexpr int64_t batch_size = SMALL_SIZE / 4;
+  constexpr size_t NUM_BATCHES = 4;
+  constexpr int64_t batch_size = SMALL_SIZE / NUM_BATCHES;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
   int num_row_groups = 0;
   this->WriteBufferedFile(values, batch_size, &num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
 }
 
 TYPED_TEST(TestBufferedParquetIO, SingleColumnOptionalBufferedWriteLarge) {
-  constexpr int64_t batch_size = LARGE_SIZE / 4;
+  constexpr size_t NUM_BATCHES = 4;
+  constexpr int64_t batch_size = LARGE_SIZE / NUM_BATCHES;
   std::shared_ptr<Array> values;
-  ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
+  ASSERT_OK(
+      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
   int num_row_groups = 0;
   this->WriteBufferedFile(values, batch_size, &num_row_groups);
   ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
 }
 
+TYPED_TEST(TestBufferedParquetIO, WriteTableSmall) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableLarge) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(
+      NullableArray<TypeParam>(LARGE_SIZE, /*num_nulls=*/100, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with one batch.
+  int64_t write_table_batch_size = LARGE_SIZE;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnTable(values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableInBatches) {
+  std::shared_ptr<Array> values;
+  constexpr size_t NUM_BATCHES = 4;
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table with four batches.
+  int64_t write_table_batch_size = SMALL_SIZE / NUM_BATCHES;
+  int64_t write_table_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+  this->WriteBufferedTable(values, write_table_batch_size, write_table_max_row_group_size,
+                           write_max_row_group_size, &num_row_groups);
+  EXPECT_EQ(1, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit) {
+  constexpr size_t NUM_BATCHES = 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  // Write all table in one batch with a small max_row_group_size.
+  {
+    int64_t write_table_batch_size = SMALL_SIZE;
+    int64_t parquet_writer_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
+    int64_t write_max_row_group_size = DEFAULT_MAX_ROW_GROUP_LENGTH;
+    this->WriteBufferedTable(values, write_table_batch_size,
+                             parquet_writer_max_row_group_size, write_max_row_group_size,
+                             &num_row_groups);
+    EXPECT_EQ(4, num_row_groups);
+    ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+  }
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableWithRowGroupSizeLimit2) {
+  constexpr size_t NUM_BATCHES = 4;
+  std::shared_ptr<Array> values;
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  int64_t write_table_batch_size = SMALL_SIZE;
+  int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
+  int64_t write_max_row_group_size = SMALL_SIZE / NUM_BATCHES;
+  this->WriteBufferedTable(values, write_table_batch_size,
+                           parquet_writer_max_row_group_size, write_max_row_group_size,
+                           &num_row_groups);
+  EXPECT_EQ(4, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
+TYPED_TEST(TestBufferedParquetIO, WriteTableInBatchesForMultiRowGroups) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(
+      NullableArray<TypeParam>(SMALL_SIZE, /*num_nulls=*/10, kDefaultSeed, &values));
+  int num_row_groups = 0;
+  int64_t write_table_batch_size = SMALL_SIZE / 10;
+  int64_t parquet_writer_max_row_group_size = SMALL_SIZE;
+  int64_t write_max_row_group_size = SMALL_SIZE / 4;
+  this->WriteBufferedTable(values, write_table_batch_size,
+                           parquet_writer_max_row_group_size, write_max_row_group_size,
+                           &num_row_groups);
+  EXPECT_EQ(4, num_row_groups);
+  ASSERT_NO_FATAL_FAILURE(this->ReadAndCheckSingleColumnFile(*values, num_row_groups));
+}
+
 TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
   auto pool = ::arrow::default_memory_pool();
   auto sink = CreateOutputStream();
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 6d22f318f6b97..3703a920d2670 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -274,6 +274,16 @@ class ArrowColumnWriterV2 {
   RowGroupWriter* row_group_writer_;
 };
 
+std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::RecordBatch& value,
+                                                    int column_id) {
+  return std::make_shared<::arrow::ChunkedArray>(value.column(column_id));
+}
+
+std::shared_ptr<ChunkedArray> GetColumnChunkedArray(const ::arrow::Table& value,
+                                                    int column_id) {
+  return value.column(column_id);
+}
+
 }  // namespace
 
 // ----------------------------------------------------------------------
@@ -356,7 +366,10 @@ class FileWriterImpl : public FileWriter {
 
   std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
 
-  Status WriteTable(const Table& table, int64_t chunk_size) override {
+  template <typename T>
+  Status WriteBuffered(const T& batch, int64_t max_row_group_length);
+
+  Status WriteTable(const Table& table, int64_t chunk_size, bool use_buffering) override {
     RETURN_NOT_OK(table.Validate());
 
     if (chunk_size <= 0 && table.num_rows() > 0) {
@@ -369,27 +382,10 @@ class FileWriterImpl : public FileWriter {
       chunk_size = this->properties().max_row_group_length();
     }
 
-    auto WriteRowGroup = [&](int64_t offset, int64_t size) {
-      RETURN_NOT_OK(NewRowGroup(size));
-      for (int i = 0; i < table.num_columns(); i++) {
-        RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
-      }
-      return Status::OK();
-    };
-
-    if (table.num_rows() == 0) {
-      // Append a row group with 0 rows
-      RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
-      return Status::OK();
+    if (use_buffering) {
+      return WriteBuffered(table, chunk_size);
     }
-
-    for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
-      int64_t offset = chunk * chunk_size;
-      RETURN_NOT_OK_ELSE(
-          WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
-          PARQUET_IGNORE_NOT_OK(Close()));
-    }
-    return Status::OK();
+    return WriteTableUnbuffered(table, chunk_size);
   }
 
   Status NewBufferedRowGroup() override {
@@ -406,57 +402,7 @@ class FileWriterImpl : public FileWriter {
     }
 
     // Max number of rows allowed in a row group.
-    const int64_t max_row_group_length = this->properties().max_row_group_length();
-
-    if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
-        row_group_writer_->num_rows() >= max_row_group_length) {
-      RETURN_NOT_OK(NewBufferedRowGroup());
-    }
-
-    auto WriteBatch = [&](int64_t offset, int64_t size) {
-      std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
-      int column_index_start = 0;
-
-      for (int i = 0; i < batch.num_columns(); i++) {
-        ChunkedArray chunked_array{batch.column(i)};
-        ARROW_ASSIGN_OR_RAISE(
-            std::unique_ptr<ArrowColumnWriterV2> writer,
-            ArrowColumnWriterV2::Make(chunked_array, offset, size, schema_manifest_,
-                                      row_group_writer_, column_index_start));
-        column_index_start += writer->leaf_count();
-        if (arrow_properties_->use_threads()) {
-          writers.emplace_back(std::move(writer));
-        } else {
-          RETURN_NOT_OK(writer->Write(&column_write_context_));
-        }
-      }
-
-      if (arrow_properties_->use_threads()) {
-        DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
-        RETURN_NOT_OK(::arrow::internal::ParallelFor(
-            static_cast<int>(writers.size()),
-            [&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
-            arrow_properties_->executor()));
-      }
-
-      return Status::OK();
-    };
-
-    int64_t offset = 0;
-    while (offset < batch.num_rows()) {
-      const int64_t batch_size =
-          std::min(max_row_group_length - row_group_writer_->num_rows(),
-                   batch.num_rows() - offset);
-      RETURN_NOT_OK(WriteBatch(offset, batch_size));
-      offset += batch_size;
-
-      // Flush current row group if it is full.
-      if (row_group_writer_->num_rows() >= max_row_group_length) {
-        RETURN_NOT_OK(NewBufferedRowGroup());
-      }
-    }
-
-    return Status::OK();
+    return WriteBuffered(batch, this->properties().max_row_group_length());
   }
 
   const WriterProperties& properties() const { return *writer_->properties(); }
@@ -469,6 +415,30 @@ class FileWriterImpl : public FileWriter {
     return writer_->metadata();
   }
 
+  Status WriteTableUnbuffered(const Table& table, int64_t chunk_size) {
+    auto WriteRowGroup = [&](int64_t offset, int64_t size) {
+      RETURN_NOT_OK(NewRowGroup(size));
+      for (int i = 0; i < table.num_columns(); i++) {
+        RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
+      }
+      return Status::OK();
+    };
+
+    if (table.num_rows() == 0) {
+      // Append a row group with 0 rows
+      RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
+      return Status::OK();
+    }
+
+    for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
+      int64_t offset = chunk * chunk_size;
+      RETURN_NOT_OK_ELSE(
+          WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
+          PARQUET_IGNORE_NOT_OK(Close()));
+    }
+    return Status::OK();
+  }
+
  private:
   friend class FileWriter;
@@ -488,6 +458,59 @@ class FileWriterImpl : public FileWriter {
   std::vector<ArrowWriteContext> parallel_column_write_contexts_;
 };
 
+template <typename T>
+Status FileWriterImpl::WriteBuffered(const T& batch, int64_t max_row_group_length) {
+  if (row_group_writer_ == nullptr || !row_group_writer_->buffered() ||
+      row_group_writer_->num_rows() >= max_row_group_length) {
+    RETURN_NOT_OK(NewBufferedRowGroup());
+  }
+
+  auto WriteBatch = [&](int64_t offset, int64_t size) {
+    std::vector<std::unique_ptr<ArrowColumnWriterV2>> writers;
+    int column_index_start = 0;
+
+    for (int i = 0; i < batch.num_columns(); i++) {
+      std::shared_ptr<ChunkedArray> chunked_array = GetColumnChunkedArray(batch, i);
+      ARROW_ASSIGN_OR_RAISE(
+          std::unique_ptr<ArrowColumnWriterV2> writer,
+          ArrowColumnWriterV2::Make(*chunked_array, offset, size, schema_manifest_,
+                                    row_group_writer_, column_index_start));
+      column_index_start += writer->leaf_count();
+      if (arrow_properties_->use_threads()) {
+        writers.emplace_back(std::move(writer));
+      } else {
+        RETURN_NOT_OK(writer->Write(&column_write_context_));
+      }
+    }
+
+    if (arrow_properties_->use_threads()) {
+      DCHECK_EQ(parallel_column_write_contexts_.size(), writers.size());
+      RETURN_NOT_OK(::arrow::internal::ParallelFor(
+          static_cast<int>(writers.size()),
+          [&](int i) { return writers[i]->Write(&parallel_column_write_contexts_[i]); },
+          arrow_properties_->executor()));
+    }
+
+    return Status::OK();
+  };
+
+  int64_t offset = 0;
+  while (offset < batch.num_rows()) {
+    const int64_t batch_size = std::min(
+        max_row_group_length - row_group_writer_->num_rows(), batch.num_rows() - offset);
+    RETURN_NOT_OK(WriteBatch(offset, batch_size));
+    offset += batch_size;
+
+    // Flush current row group if it is full.
+    if (row_group_writer_->num_rows() >= max_row_group_length &&
+        offset < batch.num_rows()) {
+      RETURN_NOT_OK(NewBufferedRowGroup());
+    }
+  }
+
+  return Status::OK();
+}
+
 FileWriter::~FileWriter() {}
 
 Status FileWriter::Make(::arrow::MemoryPool* pool,
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 1decafedc97fd..f34309fb73055 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -91,10 +91,26 @@ class PARQUET_EXPORT FileWriter {
 
   /// \brief Write a Table to Parquet.
   ///
+  /// If `use_buffering` is false, then any pending row group is closed
+  /// at the beginning and at the end of this call.
+  /// If `use_buffering` is true, this function reuses an existing
+  /// buffered row group until the chunk size is met, and leaves
+  /// the last row group open for further writes.
+  /// It is recommended to set `use_buffering` to true to minimize
+  /// the number of row groups, especially when calling `WriteTable`
+  /// with small tables.
+  ///
+  /// WARNING: If you are writing multiple files in parallel in the same
+  /// executor, deadlock may occur if ArrowWriterProperties::use_threads
+  /// is set to true to write columns in parallel. Please disable use_threads
+  /// option in this case.
+  ///
   /// \param table Arrow table to write.
   /// \param chunk_size maximum number of rows to write per row group.
-  virtual ::arrow::Status WriteTable(
-      const ::arrow::Table& table, int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH) = 0;
+  /// \param use_buffering Whether to potentially buffer data.
+  virtual ::arrow::Status WriteTable(const ::arrow::Table& table,
+                                     int64_t chunk_size = DEFAULT_MAX_ROW_GROUP_LENGTH,
+                                     bool use_buffering = false) = 0;
 
   /// \brief Start a new row group.
   ///