From ed0e050ff9f0fe38fa6af7b64985751dff334341 Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Fri, 24 May 2024 16:33:57 -0400
Subject: [PATCH] KAFKA-16831: CoordinatorRuntime should initialize
 MemoryRecordsBuilder with max batch size write limit (#16059)

CoordinatorRuntime should initialize the MemoryRecordsBuilder with the max
batch size as the write limit. Otherwise, the write limit defaults to the
minimum buffer size of 16384 bytes, which causes the coordinator to throw a
RecordTooLargeException even when the batch is under the 1MB max batch size
limit.

Reviewers: David Jacot

---
 .../group/runtime/CoordinatorRuntime.java     | 16 ++++--
 .../group/runtime/CoordinatorRuntimeTest.java | 52 +++++++++++++++++++
 2 files changed, 64 insertions(+), 4 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index c0c194a901042..4207c94770b96 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -800,10 +800,10 @@ public void run() {
                     byte magic = logConfig.recordVersion().value;
                     int maxBatchSize = logConfig.maxMessageSize();
                     long currentTimeMs = time.milliseconds();
-                    ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));
+                    ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));
 
                     try {
-                        MemoryRecordsBuilder builder = MemoryRecords.builder(
+                        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                             buffer,
                             magic,
                             compression,
@@ -814,7 +814,9 @@ public void run() {
                             producerEpoch,
                             0,
                             producerId != RecordBatch.NO_PRODUCER_ID,
-                            RecordBatch.NO_PARTITION_LEADER_EPOCH
+                            false,
+                            RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                            maxBatchSize
                         );
 
                         // Apply the records to the state machine and add them to the batch.
@@ -845,7 +847,8 @@ public void run() {
                                 );
                             } else {
                                 throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() +
-                                    " bytes in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.");
+                                    " bytes in append to partition " + tp + " which exceeds the maximum " +
+                                    "configured size of " + maxBatchSize + ".");
                             }
                         }
 
@@ -1365,6 +1368,11 @@ public void onHighWatermarkUpdated(
         }
     }
 
+    /**
+     * 16KB. Used for initial buffer size for write operations.
+     */
+    static final int MIN_BUFFER_SIZE = 16384;
+
     /**
      * The log prefix.
      */
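Why the constructor swap matters: MemoryRecords.builder(...) derives its write limit from the buffer it is handed, so the min(16KB, maxBatchSize) allocation above silently capped every batch at roughly 16KB, whereas the MemoryRecordsBuilder constructor takes the write limit as an explicit argument. The following standalone sketch is not part of the patch; the class name is invented, and it assumes the Compression API and the thirteen-argument MemoryRecordsBuilder constructor visible in the hunk above.

import java.nio.ByteBuffer;

import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

public class WriteLimitSketch {
    public static void main(String[] args) {
        int maxBatchSize = 1024 * 1024;     // e.g. the 1MB max.message.bytes default
        byte[] value = new byte[10 * 1024]; // each record carries a 10KB value

        // Old path: the write limit is derived from the 16KB buffer.
        MemoryRecordsBuilder before = MemoryRecords.builder(
            ByteBuffer.allocate(Math.min(16384, maxBatchSize)),
            RecordBatch.CURRENT_MAGIC_VALUE,
            Compression.NONE,
            TimestampType.CREATE_TIME,
            0L
        );

        // Fixed path: the write limit is passed explicitly, so the batch may
        // grow up to maxBatchSize; the 16KB buffer only sets the initial
        // allocation and is grown on demand.
        MemoryRecordsBuilder after = new MemoryRecordsBuilder(
            ByteBuffer.allocate(Math.min(16384, maxBatchSize)),
            RecordBatch.CURRENT_MAGIC_VALUE,
            Compression.NONE,
            TimestampType.CREATE_TIME,
            0L,                                     // baseOffset
            RecordBatch.NO_TIMESTAMP,               // logAppendTime
            RecordBatch.NO_PRODUCER_ID,
            RecordBatch.NO_PRODUCER_EPOCH,
            0,                                      // baseSequence
            false,                                  // isTransactional
            false,                                  // isControlBatch
            RecordBatch.NO_PARTITION_LEADER_EPOCH,
            maxBatchSize                            // writeLimit
        );

        // A builder always accepts its first record; the limit bites on the next one.
        before.append(0L, null, value);
        after.append(0L, null, value);

        // ~10KB already written plus another ~10KB record exceeds 16384 bytes.
        System.out.println(before.hasRoomFor(0L, null, value, Record.EMPTY_HEADERS)); // false
        System.out.println(after.hasRoomFor(0L, null, value, Record.EMPTY_HEADERS));  // true
    }
}

When hasRoomFor(...) returns false, the coordinator's write path falls into the else branch shown in the hunk above and throws RecordTooLargeException, which is exactly the failure this patch removes for batches between 16KB and maxBatchSize.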
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index a8cc200b35922..ae1d404792406 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -81,6 +81,7 @@
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3007,6 +3008,57 @@ public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWh
         timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
     }
 
+    @Test
+    public void testAppendRecordBatchSize() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);
+
+        // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 3000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = writer.entries(TP).get(0).sizeInBytes();
+        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
+    }
+
     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorRuntime<S, U>.CoordinatorWriteEvent<?>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp
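For intuition on why the test writes exactly 3000 records: the value bytes of "record-0" through "record-2999" alone sum to 31890 bytes (about 31KB), already past the 16384-byte MIN_BUFFER_SIZE and far below a 1MB max batch size, even before per-record framing overhead. A quick standalone check (class name invented):

public class RecordSizingCheck {
    public static void main(String[] args) {
        // Sum the value bytes for "record-0" .. "record-2999"; per-record
        // framing overhead (offset deltas, timestamps, lengths) only pushes
        // the real batch size further past the 16384-byte threshold.
        int totalValueBytes = 0;
        for (int i = 0; i < 3000; i++) {
            totalValueBytes += ("record-" + i).length();
        }
        System.out.println(totalValueBytes); // 31890
    }
}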