Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit #16059

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,10 @@ public void run() {
byte magic = logConfig.recordVersion().value;
int maxBatchSize = logConfig.maxMessageSize();
long currentTimeMs = time.milliseconds();
ByteBuffer buffer = context.bufferSupplier.get(Math.min(16384, maxBatchSize));
ByteBuffer buffer = context.bufferSupplier.get(Math.min(SIXTEEN_KB, maxBatchSize));

try {
MemoryRecordsBuilder builder = MemoryRecords.builder(
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
magic,
compression,
Expand All @@ -814,7 +814,9 @@ public void run() {
producerEpoch,
0,
producerId != RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PARTITION_LEADER_EPOCH
false,
RecordBatch.NO_PARTITION_LEADER_EPOCH,
maxBatchSize
);

// Apply the records to the state machine and add them to the batch.
Expand Down Expand Up @@ -845,7 +847,8 @@ public void run() {
);
} else {
throw new RecordTooLargeException("Message batch size is " + builder.estimatedSizeInBytes() +
" bytes in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.");
" bytes in append to partition " + tp + " which exceeds the maximum " +
"configured size of " + maxBatchSize + ".");
}
}

Expand Down Expand Up @@ -1365,6 +1368,11 @@ public void onHighWatermarkUpdated(
}
}

/**
 * 16KB (16384 bytes). The initial buffer capacity requested from the buffer
 * supplier for write operations; the actual allocation is capped at the
 * partition's configured max message size when that is smaller.
 */
static final int SIXTEEN_KB = 16384;

/**
* The log prefix.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.FAILED;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.INITIAL;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState.LOADING;
import static org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.SIXTEEN_KB;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -224,6 +225,7 @@ private static class MockPartitionWriter extends InMemoryPartitionWriter {
private final int maxWrites;
private final boolean failEndMarker;
private final AtomicInteger writeCount = new AtomicInteger(0);
private MemoryRecords lastBatch = null;

public MockPartitionWriter() {
this(Integer.MAX_VALUE, false);
Expand Down Expand Up @@ -259,6 +261,7 @@ public long append(
VerificationGuard verificationGuard,
MemoryRecords batch
) {
lastBatch = batch;
if (writeCount.incrementAndGet() > maxWrites)
throw new KafkaException("Maximum number of writes reached");

Expand Down Expand Up @@ -3007,6 +3010,57 @@ public void testCoordinatorCompleteTransactionEventWriteTimeoutTaskIsCancelledWh
timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
}

@Test
public void testAppendRecordBatchSize() {
    MockTimer mockTimer = new MockTimer();
    MockPartitionWriter partitionWriter = new MockPartitionWriter();
    StringSerializer stringSerializer = new StringSerializer();

    // Build a runtime backed by a direct (synchronous) event processor so the
    // write below completes before we inspect the last appended batch.
    CoordinatorRuntime<MockCoordinatorShard, String> runtime =
        new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
            .withTime(mockTimer.time())
            .withTimer(mockTimer)
            .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
            .withLoader(new MockCoordinatorLoader())
            .withEventProcessor(new DirectEventProcessor())
            .withPartitionWriter(partitionWriter)
            .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
            .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
            .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
            .withSerializer(stringSerializer)
            .build();

    // Schedule the loading.
    runtime.scheduleLoadOperation(TP, 10);

    // Verify the initial state.
    CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext context = runtime.contextOrThrow(TP);
    assertEquals(0L, context.coordinator.lastWrittenOffset());
    assertEquals(0L, context.coordinator.lastCommittedOffset());
    assertEquals(Collections.singletonList(0L), context.coordinator.snapshotRegistry().epochsList());

    // The configured max message size must exceed the 16KB initial buffer,
    // otherwise this test cannot distinguish the two limits.
    int maxBatchSize = partitionWriter.config(TP).maxMessageSize();
    assertTrue(maxBatchSize > SIXTEEN_KB);

    // Generate enough records to create a batch that has 16KB < batchSize < maxBatchSize.
    List<String> payload = new ArrayList<>();
    int recordCount = 3000;
    while (payload.size() < recordCount) {
        payload.add("record-" + payload.size());
    }

    // Write #1.
    CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
        state -> new CoordinatorResult<>(payload, "response1")
    );

    // Verify that the write has not completed exceptionally.
    // This will catch any exceptions thrown including RecordTooLargeException.
    assertFalse(write1.isCompletedExceptionally());

    // The appended batch must be larger than the initial buffer yet within the limit.
    int appendedSize = partitionWriter.lastBatch.sizeInBytes();
    assertTrue(appendedSize > SIXTEEN_KB && appendedSize < maxBatchSize);
}

private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
Expand Down