From e9c6922135b48e86d896f1296c289228d247312b Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 23 Feb 2024 11:12:51 +0800 Subject: [PATCH] GH-39965: [C++] DatasetWriter avoid creating zero-sized batch when `max_rows_per_file` enabled (#39995) ### Rationale for this change `DatasetWriter` might create empty `RecordBatch` when `max_rows_per_file` enabled. This is because `NextWritableChunk` might return a zero-sized batch when the file exactly contains the dest data. ### What changes are included in this PR? Check batch-size == 0 when append to file queue ### Are these changes tested? Yes ### Are there any user-facing changes? User can avoid zero-sized row-group/batch. * Closes: #39965 Authored-by: mwish Signed-off-by: mwish --- cpp/src/arrow/dataset/dataset_writer.cc | 11 +++++++- cpp/src/arrow/dataset/dataset_writer_test.cc | 28 ++++++++++++++------ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset_writer.cc b/cpp/src/arrow/dataset/dataset_writer.cc index ae9fb36484bb6..34731d19ab3eb 100644 --- a/cpp/src/arrow/dataset/dataset_writer.cc +++ b/cpp/src/arrow/dataset/dataset_writer.cc @@ -610,7 +610,16 @@ class DatasetWriter::DatasetWriterImpl { bool will_open_file = false; ARROW_ASSIGN_OR_RAISE(auto next_chunk, dir_queue->NextWritableChunk( batch, &remainder, &will_open_file)); - + // GH-39965: `NextWritableChunk` may return an empty batch to signal + // that the current file has reached `max_rows_per_file` and should be + // finished. + if (next_chunk->num_rows() == 0) { + batch = std::move(remainder); + if (batch) { + RETURN_NOT_OK(dir_queue->FinishCurrentFile()); + } + continue; + } backpressure = writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows()); if (!backpressure.is_finished()) { diff --git a/cpp/src/arrow/dataset/dataset_writer_test.cc b/cpp/src/arrow/dataset/dataset_writer_test.cc index 1ac0ec3f39e97..871b6ef6f5507 100644 --- a/cpp/src/arrow/dataset/dataset_writer_test.cc +++ b/cpp/src/arrow/dataset/dataset_writer_test.cc @@ -189,8 +189,7 @@ class DatasetWriterTestFixture : public testing::Test { } } - void AssertCreatedData(const std::vector& expected_files, - bool check_num_record_batches = true) { + void AssertCreatedData(const std::vector& expected_files) { counter_ = 0; for (const auto& expected_file : expected_files) { std::optional written_file = FindFile(expected_file.filename); @@ -198,9 +197,7 @@ class DatasetWriterTestFixture : public testing::Test { int num_batches = 0; AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows), *ReadAsBatch(written_file->data, &num_batches)); - if (check_num_record_batches) { - ASSERT_EQ(expected_file.num_record_batches, num_batches); - } + ASSERT_EQ(expected_file.num_record_batches, num_batches); } } @@ -299,9 +296,7 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) { expected_files.emplace_back("testdir/chunk-" + std::to_string(i) + ".arrow", kFileSizeLimit * i, kFileSizeLimit); } - // Not checking the number of record batches because file may contain the - // zero-length record batch. - AssertCreatedData(expected_files, /*check_num_record_batches=*/false); + AssertCreatedData(expected_files); } TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) { @@ -348,6 +343,23 @@ TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) { {{"testdir/chunk-0.arrow", 0, 10, 4}, {"testdir/chunk-1.arrow", 10, 8, 3}}); } +TEST_F(DatasetWriterTestFixture, NotProduceZeroSizedBatch) { + // GH-39965: avoid creating zero-sized batch when max_rows_per_file enabled. + write_options_.max_rows_per_file = 10; + write_options_.max_rows_per_group = 10; + auto dataset_writer = MakeDatasetWriter(); + dataset_writer->WriteRecordBatch(MakeBatch(20), ""); + dataset_writer->WriteRecordBatch(MakeBatch(20), ""); + EndWriterChecked(dataset_writer.get()); + AssertCreatedData({ + {"testdir/chunk-0.arrow", 0, 10, 1}, + {"testdir/chunk-1.arrow", 10, 10, 1}, + {"testdir/chunk-2.arrow", 20, 10, 1}, + {"testdir/chunk-3.arrow", 30, 10, 1}, + }); + AssertNotFiles({"testdir/chunk-4.arrow"}); +} + TEST_F(DatasetWriterTestFixture, MinRowGroup) { write_options_.min_rows_per_group = 20; auto dataset_writer = MakeDatasetWriter();