diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index c0c194a901042..4207c94770b96 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -800,10 +800,10 @@ public void run() { byte magic = logConfig.recordVersion().value; int maxBatchSize = logConfig.maxMessageSize(); long currentTimeMs = time.milliseconds(); - ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize)); + ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize)); try { - MemoryRecordsBuilder builder = MemoryRecords.builder( + MemoryRecordsBuilder builder = new MemoryRecordsBuilder( buffer, magic, compression, @@ -814,7 +814,9 @@ public void run() { producerEpoch, 0, producerId != RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PARTITION_LEADER_EPOCH + false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + maxBatchSize ); // Apply the records to the state machine and add them to the batch. @@ -845,7 +847,8 @@ public void run() { ); } else { throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() + - " bytes in append to partition $tp which exceeds the maximum configured size of $maxBatchSize."); + " bytes in append to partition " + tp + " which exceeds the maximum " + + "configured size of " + maxBatchSize + "."); } } @@ -1365,6 +1368,11 @@ public void onHighWatermarkUpdated( } } + /** + * 16KB. Used for initial buffer size for write operations. + */ + static final int MIN_BUFFER_SIZE = 16384; + /** * The log prefix. */ diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index a8cc200b35922..ae1d404792406 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -81,6 +81,7 @@ import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED; import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL; import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING; +import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -3007,6 +3008,57 @@ public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWh timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled())); } + @Test + public void testAppendRecordBatchSize() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + StringSerializer serializer = new StringSerializer(); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .withSerializer(serializer) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList()); + + int maxBatchSize = writer.config(TP).maxMessageSize(); + assertTrue(maxBatchSize > MIN_BUFFER_SIZE); + + // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize + List records = new ArrayList<>(); + for (int i = 0; i < 3000; i++) { + records.add("record-" + i); + } + + // Write #1. + CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT, + state -> new CoordinatorResult<>(records, "response1") + ); + + // Verify that the write has not completed exceptionally. + // This will catch any exceptions thrown including RecordTooLargeException. + assertFalse(write1.isCompletedExceptionally()); + + int batchSize = writer.entries(TP).get(0).sizeInBytes(); + assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize); + } + private static , U> ArgumentMatcher> coordinatorMatcher( CoordinatorRuntime runtime, TopicPartition tp