GH-39965: [C++] DatasetWriter avoid creating zero-sized batch when `max_rows_per_file` enabled (#39995)

### Rationale for this change

`DatasetWriter` may create an empty `RecordBatch` when `max_rows_per_file` is enabled. This is because `NextWritableChunk` can return a zero-sized batch when the current file already contains exactly `max_rows_per_file` rows.
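To illustrate the mechanism, here is a minimal, self-contained sketch; `WritableRows` is a hypothetical stand-in for the chunk-size decision inside `NextWritableChunk`, not the actual implementation:

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for the chunk-size decision inside NextWritableChunk:
// a chunk may only contain the rows still allowed in the current file.
int64_t WritableRows(int64_t rows_in_file, int64_t max_rows_per_file,
                     int64_t batch_rows) {
  return std::min(batch_rows, max_rows_per_file - rows_in_file);
}

int main() {
  // The current file already holds exactly max_rows_per_file rows, e.g.
  // after a 20-row write was split into two 10-row chunks.
  std::cout << WritableRows(/*rows_in_file=*/10, /*max_rows_per_file=*/10,
                            /*batch_rows=*/10)
            << "\n";  // prints 0: the zero-sized chunk that used to be queued
}
```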

### What changes are included in this PR?

Check whether the batch size is zero when appending to the file queue; if it is, finish the current file instead of enqueueing an empty batch.
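A rough standalone simulation of the fixed behavior under assumed semantics (row counts stand in for record batches; this is not the real `DatasetWriter` code, see the actual diff below):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  const int64_t max_rows_per_file = 10;
  std::vector<int64_t> files;   // rows per finished file
  int64_t rows_in_current = 0;  // rows in the file currently being written

  std::vector<int64_t> batches = {20, 20};  // two 20-row writes, as in the test
  for (int64_t batch : batches) {
    while (batch > 0) {
      int64_t chunk = std::min(batch, max_rows_per_file - rows_in_current);
      if (chunk == 0) {
        // GH-39965 fix: the current file is exactly full; finish it and
        // retry with the remainder instead of queueing an empty batch.
        files.push_back(rows_in_current);
        rows_in_current = 0;
        continue;
      }
      rows_in_current += chunk;
      batch -= chunk;
    }
  }
  if (rows_in_current > 0) files.push_back(rows_in_current);

  for (std::size_t i = 0; i < files.size(); ++i) {
    std::cout << "chunk-" << i << ": " << files[i] << " rows\n";
  }
  // Prints four 10-row files and no zero-sized fifth file.
}
```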

### Are these changes tested?

Yes

### Are there any user-facing changes?

Users no longer get zero-sized row groups/batches in written files.

* Closes: #39965

Authored-by: mwish <maplewish117@gmail.com>
Signed-off-by: mwish <maplewish117@gmail.com>
mapleFU authored Feb 23, 2024
1 parent aceb51a commit 5f75dbf
Showing 2 changed files with 30 additions and 9 deletions.
11 changes: 10 additions & 1 deletion cpp/src/arrow/dataset/dataset_writer.cc
@@ -610,7 +610,16 @@ class DatasetWriter::DatasetWriterImpl {
       bool will_open_file = false;
       ARROW_ASSIGN_OR_RAISE(auto next_chunk, dir_queue->NextWritableChunk(
                                                  batch, &remainder, &will_open_file));
-
+      // GH-39965: `NextWritableChunk` may return an empty batch to signal
+      // that the current file has reached `max_rows_per_file` and should be
+      // finished.
+      if (next_chunk->num_rows() == 0) {
+        batch = std::move(remainder);
+        if (batch) {
+          RETURN_NOT_OK(dir_queue->FinishCurrentFile());
+        }
+        continue;
+      }
       backpressure =
           writer_state_.rows_in_flight_throttle.Acquire(next_chunk->num_rows());
       if (!backpressure.is_finished()) {
28 changes: 20 additions & 8 deletions cpp/src/arrow/dataset/dataset_writer_test.cc
@@ -189,18 +189,15 @@ class DatasetWriterTestFixture : public testing::Test {
     }
   }
 
-  void AssertCreatedData(const std::vector<ExpectedFile>& expected_files,
-                         bool check_num_record_batches = true) {
+  void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
     counter_ = 0;
     for (const auto& expected_file : expected_files) {
       std::optional<MockFileInfo> written_file = FindFile(expected_file.filename);
       AssertFileCreated(written_file, expected_file.filename);
       int num_batches = 0;
       AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows),
                          *ReadAsBatch(written_file->data, &num_batches));
-      if (check_num_record_batches) {
-        ASSERT_EQ(expected_file.num_record_batches, num_batches);
-      }
+      ASSERT_EQ(expected_file.num_record_batches, num_batches);
     }
   }

@@ -299,9 +296,7 @@ TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteBackpresure) {
     expected_files.emplace_back("testdir/chunk-" + std::to_string(i) + ".arrow",
                                 kFileSizeLimit * i, kFileSizeLimit);
   }
-  // Not checking the number of record batches because file may contain the
-  // zero-length record batch.
-  AssertCreatedData(expected_files, /*check_num_record_batches=*/false);
+  AssertCreatedData(expected_files);
 }
 
 TEST_F(DatasetWriterTestFixture, MaxRowsOneWriteWithFunctor) {
@@ -348,6 +343,23 @@ TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
       {{"testdir/chunk-0.arrow", 0, 10, 4}, {"testdir/chunk-1.arrow", 10, 8, 3}});
 }
 
+TEST_F(DatasetWriterTestFixture, NotProduceZeroSizedBatch) {
+  // GH-39965: avoid creating a zero-sized batch when max_rows_per_file is enabled.
+  write_options_.max_rows_per_file = 10;
+  write_options_.max_rows_per_group = 10;
+  auto dataset_writer = MakeDatasetWriter();
+  dataset_writer->WriteRecordBatch(MakeBatch(20), "");
+  dataset_writer->WriteRecordBatch(MakeBatch(20), "");
+  EndWriterChecked(dataset_writer.get());
+  AssertCreatedData({
+      {"testdir/chunk-0.arrow", 0, 10, 1},
+      {"testdir/chunk-1.arrow", 10, 10, 1},
+      {"testdir/chunk-2.arrow", 20, 10, 1},
+      {"testdir/chunk-3.arrow", 30, 10, 1},
+  });
+  AssertNotFiles({"testdir/chunk-4.arrow"});
+}
+
 TEST_F(DatasetWriterTestFixture, MinRowGroup) {
   write_options_.min_rows_per_group = 20;
   auto dataset_writer = MakeDatasetWriter();