KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE #16072

Merged 2 commits on May 27, 2024
@@ -494,9 +494,9 @@ class GroupMetadataManager(brokerId: Int,

      if (isTxnOffsetCommit) {
        addProducerGroup(producerId, group.groupId)
-       group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+       group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata)
      } else {
-       group.prepareOffsetCommit(offsetMetadata)
+       group.prepareOffsetCommit(filteredOffsetMetadata)
      }

      appendForGroup(group, records, requestLocal, putCacheCallback, verificationGuards)
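
For context, filteredOffsetMetadata is derived earlier in storeOffsets: offsets whose metadata exceeds the configured limit are rejected up front with OFFSET_METADATA_TOO_LARGE, and only the survivors are appended. A minimal sketch of that filtering step, assuming an inlined length check (the real code may use a helper; offsetConfig.maxMetadataSize follows the tests below):

// Sketch: keep only the offsets whose metadata fits within the configured
// limit. Rejected entries are answered with OFFSET_METADATA_TOO_LARGE and,
// with this patch, are no longer staged as pending commits.
val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
  offsetAndMetadata.metadata == null ||
    offsetAndMetadata.metadata.length <= offsetConfig.maxMetadataSize
}
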
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  }

  @Test
  def testOffsetMetadataTooLargePartialFailure(): Unit = {
    val memberId = ""
    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
    val offset = 37
    val requireStable = true

    groupMetadataManager.addOwnedPartition(groupPartitionId)
    val group = new GroupMetadata(groupId, Empty, time)
    groupMetadataManager.addGroup(group)

    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
    val offsets = immutable.Map(
      topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds()),
      validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
    )

    expectAppendMessage(Errors.NONE)

    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
      commitErrors = Some(errors)
    }

    assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
    groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
    assertTrue(group.hasOffsets)

    assertEquals(Some(Map(
      topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
      validTopicIdPartition -> Errors.NONE)
    ), commitErrors)

    val cachedOffsets = groupMetadataManager.getOffsets(
      groupId,
      requireStable,
      Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
    )

    assertEquals(
      Some(OffsetFetchResponse.INVALID_OFFSET),
      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
    )
    assertEquals(
      Some(Errors.NONE),
      cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
    )
    assertEquals(
      Some(offset),
      cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
    )

    assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
  }

  @Test
  def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): Unit = {
    val memberId = ""
    val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
    val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
    val producerId = 232L
    val producerEpoch = 0.toShort

    groupMetadataManager.addOwnedPartition(groupPartitionId)

    val group = new GroupMetadata(groupId, Empty, time)
    groupMetadataManager.addGroup(group)

    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
    val offsets = immutable.Map(
      foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
      foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
    )

    val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
    when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None

    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
      commitErrors = Some(errors)
    }

    val verificationGuard = new VerificationGuard()

    groupMetadataManager.storeOffsets(
      group,
      memberId,
      offsetTopicPartition,
      offsets,
      callback,
      producerId,
      producerEpoch,
      verificationGuard = Some(verificationGuard)
    )
    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    verify(replicaManager).appendRecords(anyLong(),
      anyShort(),
      any(),
      any(),
      any[Map[TopicPartition, MemoryRecords]],
      capturedResponseCallback.capture(),
      any[Option[ReentrantLock]],
      any(),
      any(),
      any(),
      ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
    verify(replicaManager).getMagic(any())
    capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
      new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))

    assertEquals(Some(Map(
      foo0 -> Errors.NONE,
      foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
    )), commitErrors)

    assertTrue(group.hasOffsets)
    assertTrue(group.allOffsets.isEmpty)

    group.completePendingTxnOffsetCommit(producerId, isCommit = true)

Member:
Can we complete this pending offset if one of the offsets in the transaction didn't write?

Member Author:
Yes, if the client does not retry the failed offset and still commits the transaction, we basically commit whatever is pending.
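
To make "commit whatever is pending" concrete, here is a simplified, self-contained model of the pending-commit bookkeeping (an illustrative reduction of GroupMetadata, not the real class; field names and types are assumptions):

import scala.collection.mutable

// Illustrative reduction of GroupMetadata's offset bookkeeping.
class GroupOffsetState {
  // Offsets staged per transactional producer, keyed by producerId.
  private val pendingTransactionalOffsetCommits =
    mutable.Map.empty[Long, mutable.Map[String, Long]]
  // Offsets visible to consumers once a transaction completes with a commit.
  private val offsets = mutable.Map.empty[String, Long]

  def prepareTxnOffsetCommit(producerId: Long, staged: Map[String, Long]): Unit =
    pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, mutable.Map.empty) ++= staged

  // When the transaction marker arrives, everything staged for this producerId
  // is promoted on commit, or dropped on abort. Before this patch, an offset
  // rejected with OFFSET_METADATA_TOO_LARGE could still be staged here, so a
  // later commit marker "committed" an offset that was never written to the log.
  def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
    val pending = pendingTransactionalOffsetCommits.remove(producerId)
    if (isCommit)
      pending.foreach(_.foreach { case (tp, offset) => offsets.put(tp, offset) })
  }

  def committed(tp: String): Option[Long] = offsets.get(tp)
}

With the fix, the oversized entry never enters the staged map in the first place, which is what the second test above asserts for foo1.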

Member:
Interesting -- so the client doesn't even retry. Do they at least get a clear error that it failed so they can choose to retry?

Member (@jolshan, May 24, 2024):
Looking at the TxnOffsetCommitHandler, it seems like this should be a fatal error?

fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));

Member Author:
You raise a good point. The server returns INVALID_OFFSET to the client, and the Java client does treat it as a fatal error. Therefore, the transaction won't be committed in the end.

Hmm… that does not explain the situation I was investigating, then, unless another client was used. I have another theory that I need to validate; I will keep you posted.

That being said, this is still a bug that leaves the server state inconsistent, so the patch is still valid.

Member:
I think the patch is valid, but I do wonder whether the fix was needed for transactional clients, and/or whether we should change the logic for transactional clients to not allow only some of the offsets to be committed.

I guess that is trickier to enforce. For this PR, I just wonder if this test suggests a bad behavior. 😅

Member Author (@dajac, May 25, 2024):
> I think the patch is valid, but I do wonder whether the fix was needed for transactional clients, and/or whether we should change the logic for transactional clients to not allow only some of the offsets to be committed.

My understanding is that the client already fails in this case because it transitions to the fatal state. This means that the transaction will be aborted, no? Therefore, the partial offsets are not committed in the end.

Member:
Right. I guess it's just a little confusing to have the test model a behavior that shouldn't happen. But I'm not sure if there is a simple fix for this.
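
To close the loop on the abort path discussed above: using the simplified GroupOffsetState model sketched earlier, a hypothetical run of the fatal-error scenario looks like this (illustrative usage only, not Kafka API):

object AbortPathExample extends App {
  val state = new GroupOffsetState

  // The coordinator stages the offsets that passed validation.
  state.prepareTxnOffsetCommit(producerId = 232L, staged = Map("foo-0" -> 37L))

  // The client treated OFFSET_METADATA_TOO_LARGE as fatal, so the transaction
  // is ultimately aborted and the marker completes with isCommit = false:
  state.completePendingTxnOffsetCommit(producerId = 232L, isCommit = false)

  assert(state.committed("foo-0").isEmpty) // nothing was promoted
}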

    assertTrue(group.hasOffsets)
    assertFalse(group.allOffsets.isEmpty)
    assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition))
  }

  @Test
  def testExpireOffset(): Unit = {
    val memberId = ""