From ee75266e28651b6501a0b3a9b477eacbd4c27748 Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Thu, 23 May 2024 16:51:30 -0400
Subject: [PATCH 1/4] set MemoryRecords#writeLimit to maxBatchSize

---
 .../group/runtime/CoordinatorRuntime.java     | 12 +++--
 .../group/runtime/CoordinatorRuntimeTest.java | 50 +++++++++++++++++++
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index c0c194a901042..267eaf5d978b8 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -800,10 +800,10 @@ public void run() {
                         byte magic = logConfig.recordVersion().value;
                         int maxBatchSize = logConfig.maxMessageSize();
                         long currentTimeMs = time.milliseconds();
-                        ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));
+                        ByteBuffer buffer = context.bufferSupplier.get(Math.min(SIXTEEN_KB, maxBatchSize));

                         try {
-                            MemoryRecordsBuilder builder = MemoryRecords.builder(
+                            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                                 buffer,
                                 magic,
                                 compression,
@@ -814,7 +814,9 @@ public void run() {
                                 producerEpoch,
                                 0,
                                 producerId != RecordBatch.NO_PRODUCER_ID,
-                                RecordBatch.NO_PARTITION_LEADER_EPOCH
+                                false,
+                                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                                maxBatchSize
                             );

                             // Apply the records to the state machine and add them to the batch.
@@ -845,7 +847,7 @@ public void run() {
                                 );
                             } else {
                                 throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() +
-                                    " bytes in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.");
+                                    " bytes in append to partition " + tp + " which exceeds the maximum configured size of " + maxBatchSize + ".");
                             }
                         }

@@ -1365,6 +1367,8 @@ public void onHighWatermarkUpdated(
         }
     }

+    static final int SIXTEEN_KB = 16384;
+
     /**
      * The log prefix.
      */
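[Note: the crux of the fix above. MemoryRecords.builder(...) sets the builder's
writeLimit from the remaining bytes of the buffer it is handed, so the 16 KB
initial allocation silently doubled as the batch size cap, and coordinator
writes larger than that hit the RecordTooLargeException branch even when
max.message.bytes allowed them. Calling the MemoryRecordsBuilder constructor
directly decouples the two: the buffer is only a starting allocation, and
maxBatchSize becomes the real limit. A minimal standalone sketch of the
corrected construction, assuming the Kafka 3.8-era MemoryRecordsBuilder
signature and Compression API; the class, values, and loop are illustrative,
not part of this patch:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.compress.Compression;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class WriteLimitSketch {
        public static void main(String[] args) {
            int maxBatchSize = 1024 * 1024;       // stands in for maxMessageSize()
            ByteBuffer buffer = ByteBuffer.allocate(Math.min(16384, maxBatchSize));

            MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                buffer,
                RecordBatch.CURRENT_MAGIC_VALUE,
                Compression.NONE,
                TimestampType.CREATE_TIME,
                0L,                               // base offset
                System.currentTimeMillis(),       // log append time
                RecordBatch.NO_PRODUCER_ID,
                RecordBatch.NO_PRODUCER_EPOCH,
                0,                                // base sequence
                false,                            // not transactional
                false,                            // not a control batch
                RecordBatch.NO_PARTITION_LEADER_EPOCH,
                maxBatchSize                      // writeLimit: the actual cap
            );

            // hasRoomFor() now checks against maxBatchSize, not the 16 KB buffer;
            // the underlying buffer grows transparently once the batch outgrows it.
            byte[] value = new byte[1024];
            long now = System.currentTimeMillis();
            while (builder.hasRoomFor(now, null, value, Record.EMPTY_HEADERS)) {
                builder.append(now, null, value);
            }
            MemoryRecords records = builder.build();
            // With the old writeLimit (buffer.remaining() == 16384) this would
            // stop near 16 KB; here it fills up to the configured maximum.
            System.out.println(records.sizeInBytes() + " bytes");
        }
    }
]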
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index a8cc200b35922..d4af23b611b02 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -224,6 +224,7 @@ private static class MockPartitionWriter extends InMemoryPartitionWriter {
         private final int maxWrites;
         private final boolean failEndMarker;
         private final AtomicInteger writeCount = new AtomicInteger(0);
+        private MemoryRecords lastBatch = null;

         public MockPartitionWriter() {
             this(Integer.MAX_VALUE, false);
@@ -259,6 +260,7 @@ public long append(
             VerificationGuard verificationGuard,
             MemoryRecords batch
         ) {
+            lastBatch = batch;
             if (writeCount.incrementAndGet() > maxWrites)
                 throw new KafkaException("Maximum number of writes reached");

@@ -3007,6 +3009,54 @@ public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWh
         timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
     }

+    @Test
+    public void testAppendRecordBatchSize() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = new MockPartitionWriter();
+        StringSerializer serializer = new StringSerializer();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
+                .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .withSerializer(serializer)
+                .build();
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(0L, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0L, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());
+
+        int maxBatchSize = writer.config(TP).maxMessageSize();
+        assertTrue(maxBatchSize > 16834);
+
+        // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
+        List<String> records = new ArrayList<>();
+        for (int i = 0; i < 3000; i++) {
+            records.add("record-" + i);
+        }
+
+        // Write #1.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(records, "response1")
+        );
+
+        // Verify that the write has not completed exceptionally.
+        // This will catch any exceptions thrown including RecordTooLargeException.
+        assertFalse(write1.isCompletedExceptionally());
+    }
+
     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorRuntime<S, U>.CoordinatorWriteEvent<Void>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp
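[Note: quick arithmetic behind the test's sizing comment. Three thousand values
of the form "record-<i>" add up to roughly 32 KB of payload before per-record
framing overhead, which is why the resulting batch lands between the 16384-byte
initial buffer and the roughly 1 MB default max.message.bytes. A throwaway
check, not part of the patch:

    public class BatchSizeEstimate {
        public static void main(String[] args) {
            long payloadBytes = 0;
            for (int i = 0; i < 3000; i++) {
                payloadBytes += ("record-" + i).length();  // 8 to 11 chars each
            }
            // Prints 31890: ~31 KB of raw values alone, comfortably above the
            // 16384-byte initial buffer and far below the default maximum.
            System.out.println(payloadBytes);
        }
    }
]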
From c359827d636e846a0a72281eb1b32a44b99de2ca Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Thu, 23 May 2024 16:54:59 -0400
Subject: [PATCH 2/4] tidy

---
 .../kafka/coordinator/group/runtime/CoordinatorRuntime.java | 3 +++
 .../coordinator/group/runtime/CoordinatorRuntimeTest.java   | 6 +++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 267eaf5d978b8..fc9e3e07eb2d4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -1367,6 +1367,9 @@ public void onHighWatermarkUpdated(
         }
     }

+    /**
+     * 16KB. Used for initial buffer size for write operations.
+     */
     static final int SIXTEEN_KB = 16384;

     /**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index d4af23b611b02..7eeb144ab7723 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -81,6 +81,7 @@
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.SIXTEEN_KB;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3039,7 +3040,7 @@ public void testAppendRecordBatchSize() {
         assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > 16834);
+        assertTrue(maxBatchSize > SIXTEEN_KB);

         // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
@@ -3055,6 +3056,9 @@ public void testAppendRecordBatchSize() {
         // Verify that the write has not completed exceptionally.
         // This will catch any exceptions thrown including RecordTooLargeException.
         assertFalse(write1.isCompletedExceptionally());
+
+        int batchSize = writer.lastBatch.sizeInBytes();
+        assertTrue(batchSize > SIXTEEN_KB && batchSize < maxBatchSize);
     }

     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorRuntime<S, U>.CoordinatorWriteEvent<Void>> coordinatorMatcher(
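[Note: patch 2's Javadoc calls SIXTEEN_KB an initial size, which works because
MemoryRecordsBuilder writes through Kafka's ByteBufferOutputStream, and that
stream allocates a larger buffer whenever a write outgrows the current one. So
the 16 KB allocation is a starting point, not a cap; the writeLimit is what
actually bounds the batch. A tiny standalone sketch of that growth behavior,
assuming ByteBufferOutputStream's public buffer() accessor; illustrative only:

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.ByteBufferOutputStream;

    public class BufferGrowthSketch {
        public static void main(String[] args) {
            ByteBufferOutputStream out =
                new ByteBufferOutputStream(ByteBuffer.allocate(16384));
            // A write larger than the initial 16 KB triggers a reallocation
            // instead of an overflow.
            out.write(new byte[20000], 0, 20000);
            System.out.println(out.buffer().capacity() >= 20000);  // true
        }
    }
]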
From 1613e6ad366132133e4cb4a1e316a3b1fea40858 Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Thu, 23 May 2024 16:56:36 -0400
Subject: [PATCH 3/4] log

---
 .../kafka/coordinator/group/runtime/CoordinatorRuntime.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index fc9e3e07eb2d4..adabefabaebcc 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -847,7 +847,8 @@ public void run() {
                                 );
                             } else {
                                 throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() +
-                                    " bytes in append to partition " + tp + " which exceeds the maximum configured size of " + maxBatchSize + ".");
+                                    " bytes in append to partition " + tp + " which exceeds the maximum " +
+                                    "configured size of " + maxBatchSize + ".");
                             }
                         }

From c881ac1e85a0601763a35c5138b41c2d1858aaab Mon Sep 17 00:00:00 2001
From: Jeff Kim
Date: Fri, 24 May 2024 09:54:37 -0400
Subject: [PATCH 4/4] Address comments

---
 .../coordinator/group/runtime/CoordinatorRuntime.java     |  4 ++--
 .../group/runtime/CoordinatorRuntimeTest.java             | 10 ++++------
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index adabefabaebcc..4207c94770b96 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -800,7 +800,7 @@ public void run() {
                         byte magic = logConfig.recordVersion().value;
                         int maxBatchSize = logConfig.maxMessageSize();
                         long currentTimeMs = time.milliseconds();
-                        ByteBuffer buffer = context.bufferSupplier.get(Math.min(SIXTEEN_KB, maxBatchSize));
+                        ByteBuffer buffer = context.bufferSupplier.get(Math.min(MIN_BUFFER_SIZE, maxBatchSize));

                         try {
                             MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
@@ -1371,7 +1371,7 @@ public void onHighWatermarkUpdated(
     /**
      * 16KB. Used for initial buffer size for write operations.
      */
-    static final int SIXTEEN_KB = 16384;
+    static final int MIN_BUFFER_SIZE = 16384;

     /**
      * The log prefix.
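[Note: the rename to MIN_BUFFER_SIZE reflects how the constant is used.
Math.min(MIN_BUFFER_SIZE, maxBatchSize) makes 16 KB the default starting
allocation while never allocating more than the configured maximum batch size;
topics configured below 16 KB simply start smaller. A sketch of just that
sizing rule, with a hypothetical helper name:

    public class InitialBufferSizing {
        static final int MIN_BUFFER_SIZE = 16384;

        // Mirrors the runtime's sizing: start at 16 KB unless the partition's
        // max batch size is smaller, in which case never allocate past it.
        static int initialBufferSize(int maxBatchSize) {
            return Math.min(MIN_BUFFER_SIZE, maxBatchSize);
        }

        public static void main(String[] args) {
            System.out.println(initialBufferSize(1024 * 1024));  // 16384
            System.out.println(initialBufferSize(8192));         // 8192
        }
    }
]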
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 7eeb144ab7723..ae1d404792406 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -81,7 +81,7 @@
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
 import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
-import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.SIXTEEN_KB;
+import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.MIN_BUFFER_SIZE;
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -225,7 +225,6 @@ private static class MockPartitionWriter extends InMemoryPartitionWriter {
         private final int maxWrites;
         private final boolean failEndMarker;
         private final AtomicInteger writeCount = new AtomicInteger(0);
-        private MemoryRecords lastBatch = null;

         public MockPartitionWriter() {
             this(Integer.MAX_VALUE, false);
@@ -261,7 +260,6 @@ public long append(
             VerificationGuard verificationGuard,
             MemoryRecords batch
         ) {
-            lastBatch = batch;
             if (writeCount.incrementAndGet() > maxWrites)
                 throw new KafkaException("Maximum number of writes reached");

@@ -3040,7 +3038,7 @@ public void testAppendRecordBatchSize() {
         assertEquals(Collections.singletonList(0L), ctx.coordinator.snapshotRegistry().epochsList());

         int maxBatchSize = writer.config(TP).maxMessageSize();
-        assertTrue(maxBatchSize > SIXTEEN_KB);
+        assertTrue(maxBatchSize > MIN_BUFFER_SIZE);

         // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize
         List<String> records = new ArrayList<>();
@@ -3057,8 +3055,8 @@ public void testAppendRecordBatchSize() {

         // Verify that the write has not completed exceptionally.
         // This will catch any exceptions thrown including RecordTooLargeException.
         assertFalse(write1.isCompletedExceptionally());

-        int batchSize = writer.lastBatch.sizeInBytes();
-        assertTrue(batchSize > SIXTEEN_KB && batchSize < maxBatchSize);
+        int batchSize = writer.entries(TP).get(0).sizeInBytes();
+        assertTrue(batchSize > MIN_BUFFER_SIZE && batchSize < maxBatchSize);
     }

     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorRuntime<S, U>.CoordinatorWriteEvent<Void>> coordinatorMatcher(
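[Note: the final revision drops the mutable lastBatch capture in favor of
reading the appended batch back through InMemoryPartitionWriter's existing
entries(TP) accessor, which keeps the mock stateless. A plain-Java distillation
of the resulting assertion, with a hypothetical helper name and without the
JUnit scaffolding:

    import java.util.List;

    import org.apache.kafka.common.record.MemoryRecords;

    public class BatchBoundsCheck {
        static final int MIN_BUFFER_SIZE = 16384;

        // The written batch must exceed the initial buffer size (proving the
        // write limit, not the 16 KB allocation, bounded the batch) while
        // staying under the configured maximum.
        static void assertBatchWithinBounds(List<MemoryRecords> entries, int maxBatchSize) {
            int batchSize = entries.get(0).sizeInBytes();
            if (batchSize <= MIN_BUFFER_SIZE || batchSize >= maxBatchSize) {
                throw new AssertionError("batch size " + batchSize +
                    " outside (" + MIN_BUFFER_SIZE + ", " + maxBatchSize + ")");
            }
        }
    }
]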