Skip to content

Commit

Permalink
Fix the retry topic's REAL_TOPIC & ORIGIN_MESSAGE_ID property sho…
Browse files Browse the repository at this point in the history
…uld not be modified once it has been written. (#12451)

### Motivation
when reconsumer the message with the configuration maxRedeliveryCount > 1 ,eg deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).build()):

then when consumer the same message by the third time (RECONSUMETIMES=2) the REAL_TOPIC and the ORIGIN_MESSAGE_ID changed as the second message.

but, this should not changed once the REAL_TOPIC ORIGIN_MESSAGE_ID property was written .
```
2021-10-21T15:03:12,771+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 4:0:-1:0 {}
2021-10-21T15:03:12,909+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:1:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:1:-1:0, DELAY_TIME=1000, RECONSUMETIMES=1}
2021-10-21T15:03:12,965+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:10:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic-my-subscription-RETRY, ORIGIN_MESSAGE_IDY_TIME=3:0:-1, DELAY_TIME=1000, RECONSUMETIMES=2}
2021-10-21T15:03:13,026+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:20:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic-my-subscription-RETRY, ORIGIN_MESSAGE_IDY_TIME=3:10:-1, DELAY_TIME=1000, RECONSUMETIMES=3}
```
Expected Results (REAL_TOPIC ORIGIN_MESSAGE_ID property equals the frist origin message)
```
2021-10-21T15:27:01,390+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 4:0:-1:0 {}
2021-10-21T15:27:01,479+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:0:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=1}
2021-10-21T15:27:01,547+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:10:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=2}
2021-10-21T15:27:01,603+0800 [main] ERROR org.apache.pulsar.client.api.RetryTopicTest - consumer received message : 3:20:-1 {REAL_TOPIC=persistent://my-property/my-ns/retry-topic, ORIGIN_MESSAGE_IDY_TIME=4:0:-1:0, DELAY_TIME=1000, RECONSUMETIMES=3}
```
### Modifications
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L652-L653

propertiesMap.put() --> propertiesMap.putIfAbsent()

add one testcase to verify the REAL_TOPIC ORIGIN_MESSAGE_ID property

(cherry picked from commit 8517883)
  • Loading branch information
yangl authored and codelipenghui committed Dec 20, 2021
1 parent 1c3d77d commit 509a59d
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
*/
package org.apache.pulsar.client.api;

import lombok.Cleanup;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertNull;

@Test(groups = "broker-api")
public class RetryTopicTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -119,6 +120,100 @@ public void testRetryTopic() throws Exception {
checkConsumer.close();
}

@Test
public void testRetryTopicProperties() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";

final int maxRedeliveryCount = 3;

final int sendMessages = 10;

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
.receiverQueueSize(100)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);
Consumer<byte[]> deadLetterConsumer = newPulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://my-property/my-ns/retry-topic-my-subscription-DLQ")
.subscriptionName("my-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();

Set<String> originMessageIds = Sets.newHashSet();
for (int i = 0; i < sendMessages; i++) {
MessageId msgId = producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
originMessageIds.add(msgId.toString());
}

producer.close();

int totalReceived = 0;
Set<String> retryMessageIds = Sets.newHashSet();
do {
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
// retry message
if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
// check the REAL_TOPIC property
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
retryMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
}
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
totalReceived++;
} while (totalReceived < sendMessages * (maxRedeliveryCount + 1));

// check the REAL_TOPIC property
assertEquals(retryMessageIds, originMessageIds);

int totalInDeadLetter = 0;
Set<String> deadLetterMessageIds = Sets.newHashSet();
do {
Message message = deadLetterConsumer.receive();
log.info("dead letter consumer received message : {} {}", message.getMessageId(),
new String(message.getData()));
// dead letter message
if (message.hasProperty(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
// check the REAL_TOPIC property
assertEquals(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC), topic);
deadLetterMessageIds.add(message.getProperty(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID));
}
deadLetterConsumer.acknowledge(message);
totalInDeadLetter++;
} while (totalInDeadLetter < sendMessages);

assertEquals(deadLetterMessageIds, originMessageIds);

deadLetterConsumer.close();
consumer.close();

Consumer<byte[]> checkConsumer = this.pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
if (checkMessage != null) {
log.info("check consumer received message : {} {}", checkMessage.getMessageId(),
new String(checkMessage.getData()));
}
assertNull(checkMessage);

checkConsumer.close();
}

//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
@Test
public void testRetryTopicNameForCompatibility () throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,8 +649,8 @@ private SortedMap<String, String> getPropertiesMap(Message<?> message, String or
if (message.getProperties() != null) {
propertiesMap.putAll(message.getProperties());
}
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
return propertiesMap;
}

Expand Down

0 comments on commit 509a59d

Please sign in to comment.