diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java index fc8d410de4aba..e808ecf589610 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java @@ -1018,8 +1018,8 @@ private void acknowledgeBatchIfImplicitAcknowledgement(boolean calledOnPoll) { if (acknowledgementMode == AcknowledgementMode.UNKNOWN) { // The first call to poll(Duration) moves into PENDING acknowledgementMode = AcknowledgementMode.PENDING; - } else if (acknowledgementMode == AcknowledgementMode.PENDING) { - // The second call to poll(Duration) if PENDING moves into IMPLICIT + } else if (acknowledgementMode == AcknowledgementMode.PENDING && !currentFetch.isEmpty()) { + // If there are records to acknowledge and PENDING, moves into IMPLICIT acknowledgementMode = AcknowledgementMode.IMPLICIT; } } else { diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index f5761003e103b..51a5a8728ddb7 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -688,6 +688,44 @@ public void testExplicitAcknowledgementCommitAsync() throws InterruptedException } } + @ClusterTest + public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws InterruptedException { + setup(); + alterShareAutoOffsetReset("group1", "earliest"); + try (Producer producer = createProducer(); + ShareConsumer shareConsumer = createShareConsumer("group1")) { + + shareConsumer.subscribe(Collections.singleton(tp.topic())); + + Map> partitionOffsetsMap1 = new HashMap<>(); + Map partitionExceptionMap1 = new HashMap<>(); + shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(partitionOffsetsMap1, partitionExceptionMap1)); + + // The acknowledgement mode moves to PENDING from UNKNOWN. + ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(0, records.count()); + shareConsumer.commitAsync(); + + ProducerRecord record1 = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes()); + producer.send(record1); + producer.flush(); + + // The acknowledgement mode remains in PENDING because no records were returned. + records = shareConsumer.poll(Duration.ofMillis(5000)); + assertEquals(1, records.count()); + + // The acknowledgement mode now moves to EXPLICIT. + shareConsumer.acknowledge(records.iterator().next()); + shareConsumer.commitAsync(); + + TestUtils.waitForCondition(() -> { + shareConsumer.poll(Duration.ofMillis(500)); + return partitionExceptionMap1.containsKey(tp); + }, 30000, 100L, () -> "Didn't receive call to callback"); + verifyShareGroupStateTopicRecordsProduced(); + } + } + @ClusterTest public void testExplicitAcknowledgementCommitAsyncPartialBatch() { setup();