diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc index a2f3498190f93..dd0b19c2ce048 100644 --- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc +++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc @@ -5224,6 +5224,47 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) { EXPECT_TRUE(record_batch->Equals(*read_record_batch)); } +TEST(TestArrowReadWrite, WriteRecordBatchNotProduceEmptyRowGroup) { + // GH-39211: WriteRecordBatch should prevent from writing a empty row group + // in the end of the file. + auto pool = ::arrow::default_memory_pool(); + auto sink = CreateOutputStream(); + // Limit the max number of rows in a row group to 2 + auto writer_properties = WriterProperties::Builder().max_row_group_length(2)->build(); + auto arrow_writer_properties = default_arrow_writer_properties(); + + // Prepare schema + auto schema = ::arrow::schema({::arrow::field("a", ::arrow::int64())}); + std::shared_ptr parquet_schema; + ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties, + *arrow_writer_properties, &parquet_schema)); + auto schema_node = std::static_pointer_cast(parquet_schema->schema_root()); + + auto gen = ::arrow::random::RandomArrayGenerator(/*seed=*/42); + + // Create writer to write data via RecordBatch. + auto writer = ParquetFileWriter::Open(sink, schema_node, writer_properties); + std::unique_ptr arrow_writer; + ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, arrow_writer_properties, + &arrow_writer)); + // NewBufferedRowGroup() is not called explicitly and it will be called + // inside WriteRecordBatch(). + // Write 20 rows for two times + for (int i = 0; i < 2; ++i) { + auto record_batch = + gen.BatchOf({::arrow::field("a", ::arrow::int64())}, /*length=*/20); + ASSERT_OK_NO_THROW(arrow_writer->WriteRecordBatch(*record_batch)); + } + ASSERT_OK_NO_THROW(arrow_writer->Close()); + ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish()); + + auto file_metadata = arrow_writer->metadata(); + EXPECT_EQ(20, file_metadata->num_row_groups()); + for (int i = 0; i < 20; ++i) { + EXPECT_EQ(2, file_metadata->RowGroup(i)->num_rows()); + } +} + TEST(TestArrowReadWrite, MultithreadedWrite) { const int num_columns = 20; const int num_rows = 1000; diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc index 07c627d5eda67..5238986c428d3 100644 --- a/cpp/src/parquet/arrow/writer.cc +++ b/cpp/src/parquet/arrow/writer.cc @@ -419,6 +419,7 @@ class FileWriterImpl : public FileWriter { // Max number of rows allowed in a row group. const int64_t max_row_group_length = this->properties().max_row_group_length(); + // Initialize a new buffered row group writer if necessary. if (row_group_writer_ == nullptr || !row_group_writer_->buffered() || row_group_writer_->num_rows() >= max_row_group_length) { RETURN_NOT_OK(NewBufferedRowGroup()); @@ -461,8 +462,9 @@ class FileWriterImpl : public FileWriter { RETURN_NOT_OK(WriteBatch(offset, batch_size)); offset += batch_size; - // Flush current row group if it is full. - if (row_group_writer_->num_rows() >= max_row_group_length) { + // Flush current row group writer and create a new writer if it is full. + if (row_group_writer_->num_rows() >= max_row_group_length && + offset < batch.num_rows()) { RETURN_NOT_OK(NewBufferedRowGroup()); } }