From 22f5aad50ee6c7ea6987b51e2711a1710250802c Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Thu, 7 Nov 2024 11:37:21 -0800 Subject: [PATCH] Fix the beforeConsumer() method earlier hit with message listener --- .../pulsar/client/api/InterceptorsTest.java | 78 +++++++++++++++++++ .../pulsar/client/impl/ConsumerBase.java | 1 + .../pulsar/client/impl/ConsumerImpl.java | 3 +- .../client/impl/MultiTopicsConsumerImpl.java | 2 +- 4 files changed, 82 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index 8115f34121d3c..f71cdc551411b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -476,6 +476,84 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } + @Test(dataProvider = "topicPartition") + public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int partitions) throws Exception { + + AtomicInteger beforeConsumeCount = new AtomicInteger(0); + PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .listenerThreads(1) + .build(); + + ConsumerInterceptor interceptor = new ConsumerInterceptor<>() { + @Override + public void close() { + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + beforeConsumeCount.incrementAndGet(); + log.info("beforeConsume messageId: {}", message.getMessageId()); + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + } + }; + + final String topicName = "persistent://my-property/my-ns/my-topic"; + + if (partitions > 0) { + admin.topics().createPartitionedTopic(topicName, partitions); + } else { + admin.topics().createNonPartitionedTopic(topicName); + } + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription") + .messageListener((c, m) -> { + // Simulate a long processing time + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .subscribe(); + + Producer producer = client.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + final int messages = 10; + for (int i = 0; i < messages; i++) { + producer.newMessage().value("Hello Pulsar!").send(); + } + Awaitility.await().untilAsserted(() -> { + // Ensure that the interceptor is not hit before the message listener + Assert.assertEquals(beforeConsumeCount.get(), 1); + }); + producer.close(); + consumer.close(); + client.close(); + } + @Test public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 3073f3a833487..0fc906b6e7a9e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -1180,6 +1180,7 @@ protected void callMessageListener(Message msg) { id = msg.getMessageId(); } unAckedMessageTracker.add(id, msg.getRedeliveryCount()); + beforeConsume(msg); listener.received(ConsumerBase.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index be01bd00eb300..004adab56f529 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -542,7 +542,8 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); - return beforeConsume(message); + message = listener == null ? beforeConsume(message) : message; + return message; } catch (InterruptedException e) { ExceptionHandler.handleInterruptedException(e); State state = getState(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index ff293af230838..528a140b81c2c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -401,7 +401,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount()); - message = beforeConsume(message); + message = listener == null ? beforeConsume(message) : message; } resumeReceivingFromPausedConsumersIfNeeded(); return message;