diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 229ecd5195221..8e704e58fc368 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -39,9 +40,11 @@ import lombok.Cleanup; import lombok.Data; import org.apache.avro.reflect.Nullable; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1019,4 +1022,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception { consumerBuilder.loadConf(config); assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy); } + + @Data + static class Payload { + String number; + + public Payload() { + + } + + public Payload(String number) { + this.number = number; + } + } + + @Data + static class PayloadIncompatible { + long number; + + public PayloadIncompatible() { + + } + + public PayloadIncompatible(long number) { + this.number = number; + } + } + + // reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321 + @Test + public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns"); + admin.namespaces().createNamespace(namespace); + // don't enforce schema validation + admin.namespaces().setSchemaValidationEnforced(namespace, false); + // set schema compatibility strategy to always compatible + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Schema schema = Schema.AVRO(Payload.class); + Schema schemaIncompatible = Schema.AVRO(PayloadIncompatible.class); + String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + + "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak"); + String dlqTopic = topic + "-DLQ"; + + // create topics + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createNonPartitionedTopic(dlqTopic); + + AtomicInteger nackCounter = new AtomicInteger(0); + Consumer payloadConsumer = null; + try { + payloadConsumer = pulsarClient.newConsumer(schema).topic(topic) + .subscriptionType(SubscriptionType.Shared).subscriptionName("sub") + .ackTimeout(1, TimeUnit.SECONDS) + .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build()) + .messageListener((c, msg) -> { + if (nackCounter.incrementAndGet() < 10) { + c.negativeAcknowledge(msg); + } + }).subscribe(); + + // send a message to the topic with the incompatible schema + PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123); + try (Producer producer = pulsarClient.newProducer(schemaIncompatible).topic(topic) + .create()) { + producer.send(payloadIncompatible); + } + + Thread.sleep(2000L); + + assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size()) + .describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers", + dlqTopic) + .isLessThanOrEqualTo(1); + + } finally { + if (payloadConsumer != null) { + try { + payloadConsumer.close(); + } catch (PulsarClientException e) { + // ignore + } + } + } + + assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size()) + .describedAs("producer count of dlq topic %s should be 0 here", + dlqTopic) + .isEqualTo(0); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index cd598585c8e87..91b97fa475817 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -18,12 +18,12 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,11 +36,10 @@ import lombok.Data; import org.apache.avro.AvroRuntimeException; import org.apache.avro.reflect.Nullable; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.util.RetryMessageUtil; -import org.reflections.ReflectionUtils; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -617,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception { @Test(timeOut = 30000L) public void testRetryTopicException() throws Exception { - final String topic = "persistent://my-property/my-ns/retry-topic"; + String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic"); + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic"); final int maxRedeliveryCount = 2; final int sendMessages = 1; // subscribe before publish + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -629,7 +630,7 @@ public void testRetryTopicException() throws Exception { .receiverQueueSize(100) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) - .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .retryLetterTopic(retryLetterTopic) .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -642,30 +643,16 @@ public void testRetryTopicException() throws Exception { } producer.close(); - // mock a retry producer exception when reconsumelater is called - MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; - List> consumers = multiTopicsConsumer.getConsumers(); - for (ConsumerImpl c : consumers) { - Set deadLetterPolicyField = - ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); - - if (deadLetterPolicyField.size() != 0) { - Field field = deadLetterPolicyField.iterator().next(); - field.setAccessible(true); - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); - deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#"); - } - } + admin.topics().terminateTopic(retryLetterTopic); + Message message = consumer.receive(); log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); try { consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); - } catch (PulsarClientException.InvalidTopicNameException e) { - assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class); - } catch (Exception e) { - fail("exception should be PulsarClientException.InvalidTopicNameException"); + fail("exception should be PulsarClientException.TopicTerminatedException"); + } catch (PulsarClientException.TopicTerminatedException e) { + // ok } - consumer.close(); } @@ -718,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception { @Test(timeOut = 30000L) public void testRetryTopicExceptionWithConcurrent() throws Exception { - final String topic = "persistent://my-property/my-ns/retry-topic"; + String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic"); + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic"); final int maxRedeliveryCount = 2; final int sendMessages = 10; // subscribe before publish + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -730,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { .receiverQueueSize(100) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) - .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .retryLetterTopic(retryLetterTopic) .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -739,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { .topic(topic) .create(); for (int i = 0; i < sendMessages; i++) { - producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send(); + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); } producer.close(); - // mock a retry producer exception when reconsumelater is called - MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; - List> consumers = multiTopicsConsumer.getConsumers(); - for (ConsumerImpl c : consumers) { - Set deadLetterPolicyField = - ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); - - if (deadLetterPolicyField.size() != 0) { - Field field = deadLetterPolicyField.iterator().next(); - field.setAccessible(true); - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); - deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#"); - } - } + admin.topics().terminateTopic(retryLetterTopic); List> messages = Lists.newArrayList(); for (int i = 0; i < sendMessages; i++) { @@ -769,16 +745,114 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { new Thread(() -> { try { consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); - } catch (Exception ignore) { - - } finally { + } catch (PulsarClientException.TopicTerminatedException e) { + // ok latch.countDown(); + } catch (PulsarClientException e) { + // unexpected exception + fail("unexpected exception", e); } }).start(); } - latch.await(); + latch.await(sendMessages, TimeUnit.SECONDS); consumer.close(); } + @Data + static class Payload { + String number; + + public Payload() { + + } + + public Payload(String number) { + this.number = number; + } + } + + @Data + static class PayloadIncompatible { + long number; + + public PayloadIncompatible() { + + } + + public PayloadIncompatible(long number) { + this.number = number; + } + } + + // reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321 + // but for retry topic + @Test + public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns"); + admin.namespaces().createNamespace(namespace); + // don't enforce schema validation + admin.namespaces().setSchemaValidationEnforced(namespace, false); + // set schema compatibility strategy to always compatible + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Schema schema = Schema.AVRO(Payload.class); + Schema schemaIncompatible = Schema.AVRO( + PayloadIncompatible.class); + String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + + "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak"); + String dlqTopic = topic + "-DLQ"; + String retryTopic = topic + "-RETRY"; + + // create topics + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createNonPartitionedTopic(dlqTopic); + admin.topics().createNonPartitionedTopic(retryTopic); + + Consumer payloadConsumer = null; + try { + payloadConsumer = pulsarClient.newConsumer(schema).topic(topic) + .subscriptionType(SubscriptionType.Shared).subscriptionName("sub") + .ackTimeout(1, TimeUnit.SECONDS) + .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3) + .deadLetterTopic(dlqTopic).build()) + .messageListener((c, msg) -> { + try { + c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }).subscribe(); + + // send a message to the topic with the incompatible schema + PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123); + try (Producer producer = pulsarClient.newProducer(schemaIncompatible).topic(topic) + .create()) { + producer.send(payloadIncompatible); + } + + Thread.sleep(2000L); + + assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size()) + .describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers", + retryTopic) + .isLessThanOrEqualTo(1); + + } finally { + if (payloadConsumer != null) { + try { + payloadConsumer.close(); + } catch (PulsarClientException e) { + // ignore + } + } + } + + assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size()) + .describedAs("producer count of retry topic %s should be 0 here", + retryTopic) + .isEqualTo(0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 998e8d70676b2..ceb1ab27be118 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -65,6 +65,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.Getter; @@ -196,8 +197,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final DeadLetterPolicy deadLetterPolicy; private volatile CompletableFuture> deadLetterProducer; - + private volatile int deadLetterProducerFailureCount; private volatile CompletableFuture> retryLetterProducer; + private volatile int retryLetterProducerFailureCount; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); protected volatile boolean paused; @@ -632,9 +634,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return FutureUtil.failedFuture(exception); } - initRetryLetterProducerIfNeeded(); CompletableFuture result = new CompletableFuture<>(); - if (retryLetterProducer != null) { + if (initRetryLetterProducerIfNeeded() != null) { try { MessageImpl retryMessage = (MessageImpl) getMessageImpl(message); String originMessageIdStr = message.getMessageId().toString(); @@ -657,50 +658,59 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a MessageId finalMessageId = messageId; if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) { - initDeadLetterProducerIfNeeded(); - deadLetterProducer.thenAcceptAsync(dlqProducer -> { - TypedMessageBuilder typedMessageBuilderNew = - dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - copyMessageKeysIfNeeded(message, typedMessageBuilderNew); - typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { - doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { - result.complete(null); + initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> { + try { + TypedMessageBuilder typedMessageBuilderNew = + dlqProducer.newMessage( + Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { + doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { + result.complete(null); + }).exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); }).exceptionally(ex -> { result.completeExceptionally(ex); return null; }); - }).exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); + } catch (Exception e) { + result.completeExceptionally(e); + } }, internalPinnedExecutor).exceptionally(ex -> { result.completeExceptionally(ex); - deadLetterProducer = null; return null; }); } else { assert retryMessage != null; - retryLetterProducer.thenAcceptAsync(rtlProducer -> { - TypedMessageBuilder typedMessageBuilderNew = rtlProducer - .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - if (delayTime > 0) { - typedMessageBuilderNew.deliverAfter(delayTime, unit); + initRetryLetterProducerIfNeeded().thenAcceptAsync(rtlProducer -> { + try { + TypedMessageBuilder typedMessageBuilderNew = rtlProducer + .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + typedMessageBuilderNew.sendAsync() + .thenCompose( + __ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenAccept(v -> { + result.complete(null); + }) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + } catch (Exception e) { + result.completeExceptionally(e); } - copyMessageKeysIfNeeded(message, typedMessageBuilderNew); - typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> result.complete(null)) - .exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); }, internalPinnedExecutor).exceptionally(ex -> { result.completeExceptionally(ex); - retryLetterProducer = null; return null; }); } @@ -1044,10 +1054,29 @@ public void connectionFailed(PulsarClientException exception) { public synchronized CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); + ArrayList> closeFutures = new ArrayList<>(4); + closeFutures.add(closeFuture); + if (retryLetterProducer != null) { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); + } + })); + } + if (deadLetterProducer != null) { + closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("Exception ignored in closing deadLetterProducer of consumer", ex); + } + })); + } + CompletableFuture compositeCloseFuture = FutureUtil.waitForAll(closeFutures); + + if (getState() == State.Closing || getState() == State.Closed) { closeConsumerTasks(); failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null)); - return closeFuture; + return compositeCloseFuture; } if (!isConnected()) { @@ -1057,7 +1086,7 @@ public synchronized CompletableFuture closeAsync() { deregisterFromClientCnx(); client.cleanupConsumer(this); failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null)); - return closeFuture; + return compositeCloseFuture; } stats.getStatTimeout().ifPresent(Timeout::cancel); @@ -1084,23 +1113,7 @@ public synchronized CompletableFuture closeAsync() { }); } - ArrayList> closeFutures = new ArrayList<>(4); - closeFutures.add(closeFuture); - if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); - } - })); - } - if (deadLetterProducer != null) { - closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("Exception ignored in closing deadLetterProducer of consumer", ex); - } - })); - } - return FutureUtil.waitForAll(closeFutures); + return compositeCloseFuture; } private void cleanupAtClose(CompletableFuture closeFuture, Throwable exception) { @@ -2133,47 +2146,54 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) } CompletableFuture result = new CompletableFuture<>(); if (deadLetterMessages != null) { - initDeadLetterProducerIfNeeded(); List> finalDeadLetterMessages = deadLetterMessages; - deadLetterProducer.thenAcceptAsync(producerDLQ -> { + initDeadLetterProducerIfNeeded().thenAcceptAsync(producerDLQ -> { for (MessageImpl message : finalDeadLetterMessages) { - String originMessageIdStr = message.getMessageId().toString(); - String originTopicNameStr = getOriginTopicNameStr(message); - TypedMessageBuilder typedMessageBuilderNew = - producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(message.getData()) - .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); - copyMessageKeysIfNeeded(message, typedMessageBuilderNew); - typedMessageBuilderNew.sendAsync() - .thenAccept(messageIdInDLQ -> { - possibleSendToDeadLetterTopicMessages.remove(messageId); - acknowledgeAsync(messageId).whenComplete((v, ex) -> { - if (ex != null) { - log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original" - + " topic but send to the DLQ successfully.", - topicName, subscription, consumerName, messageId, ex); - result.complete(false); + try { + String originMessageIdStr = message.getMessageId().toString(); + String originTopicNameStr = getOriginTopicNameStr(message); + TypedMessageBuilder typedMessageBuilderNew = + producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(message.getData()) + .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + typedMessageBuilderNew.sendAsync() + .thenAccept(messageIdInDLQ -> { + possibleSendToDeadLetterTopicMessages.remove(messageId); + acknowledgeAsync(messageId).whenComplete((v, ex) -> { + if (ex != null) { + log.warn( + "[{}] [{}] [{}] Failed to acknowledge the message {} of the " + + "original topic but send to the DLQ successfully.", + topicName, subscription, consumerName, messageId, ex); + result.complete(false); + } else { + result.complete(true); + } + }); + }).exceptionally(ex -> { + if (ex instanceof PulsarClientException.ProducerQueueIsFullError) { + log.warn( + "[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", + topicName, subscription, consumerName, + deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage()); } else { - result.complete(true); + log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", + topicName, subscription, consumerName, + deadLetterPolicy.getDeadLetterTopic(), messageId, ex); } + result.complete(false); + return null; }); - }).exceptionally(ex -> { - if (ex instanceof PulsarClientException.ProducerQueueIsFullError) { - log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", - topicName, subscription, consumerName, - deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage()); - } else { - log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", - topicName, subscription, consumerName, - deadLetterPolicy.getDeadLetterTopic(), messageId, ex); - } - result.complete(false); - return null; - }); + } catch (Exception e) { + log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", + topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), messageId, + e); + result.complete(false); + } } }, internalPinnedExecutor).exceptionally(ex -> { log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex); - deadLetterProducer = null; result.complete(false); return null; }); @@ -2183,43 +2203,107 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) return result; } - private void initDeadLetterProducerIfNeeded() { - if (deadLetterProducer == null) { + private CompletableFuture> initDeadLetterProducerIfNeeded() { + CompletableFuture> p = deadLetterProducer; + if (p == null || p.isCompletedExceptionally()) { createProducerLock.writeLock().lock(); try { - if (deadLetterProducer == null) { - deadLetterProducer = - ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) - .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) - .topic(this.deadLetterPolicy.getDeadLetterTopic()) - .blockIfQueueFull(false) - .enableBatching(false) - .enableChunking(true) - .createAsync(); + p = deadLetterProducer; + if (p == null || p.isCompletedExceptionally()) { + p = createProducerWithBackOff(() -> { + CompletableFuture> newProducer = + ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) + .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) + .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .blockIfQueueFull(false) + .enableBatching(false) + .enableChunking(true) + .createAsync(); + newProducer.whenComplete((producer, ex) -> { + if (ex != null) { + log.error("[{}] [{}] [{}] Failed to create dead letter producer for topic {}", + topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), + ex); + deadLetterProducerFailureCount++; + } else { + deadLetterProducerFailureCount = 0; + } + }); + return newProducer; + }, deadLetterProducerFailureCount, () -> "dead letter producer (topic: " + + deadLetterPolicy.getDeadLetterTopic() + ")"); + deadLetterProducer = p; } } finally { createProducerLock.writeLock().unlock(); } } + return p; } - private void initRetryLetterProducerIfNeeded() { - if (retryLetterProducer == null) { + private CompletableFuture> createProducerWithBackOff( + Supplier>> producerSupplier, int failureCount, + Supplier logDescription) { + if (failureCount == 0) { + return producerSupplier.get(); + } else { + // calculate backoff time for given failure count + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMandatoryStop(client.getConfiguration().getOperationTimeoutMs() * 2, + TimeUnit.MILLISECONDS) + .setMax(1, TimeUnit.MINUTES) + .create(); + long backoffTimeMillis = 0; + for (int i = 0; i < failureCount; i++) { + backoffTimeMillis = backoff.next(); + } + CompletableFuture> newProducer = new CompletableFuture<>(); + ScheduledExecutorService executor = + (ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor(this); + log.info("Creating {} with backoff time of {} ms", logDescription.get(), backoffTimeMillis); + executor.schedule(() -> { + FutureUtil.completeAfter(newProducer, producerSupplier.get()); + }, backoffTimeMillis, TimeUnit.MILLISECONDS); + return newProducer; + } + } + + private CompletableFuture> initRetryLetterProducerIfNeeded() { + CompletableFuture> p = retryLetterProducer; + if (p == null || p.isCompletedExceptionally()) { createProducerLock.writeLock().lock(); try { - if (retryLetterProducer == null) { - retryLetterProducer = client - .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .enableBatching(false) - .enableChunking(true) - .blockIfQueueFull(false) - .createAsync(); + p = retryLetterProducer; + if (p == null || p.isCompletedExceptionally()) { + p = createProducerWithBackOff(() -> { + CompletableFuture> newProducer = client + .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .enableChunking(true) + .blockIfQueueFull(false) + .createAsync(); + newProducer.whenComplete((producer, ex) -> { + if (ex != null) { + log.error("[{}] [{}] [{}] Failed to create retry letter producer for topic {}", + topicName, subscription, consumerName, deadLetterPolicy.getRetryLetterTopic(), + ex); + retryLetterProducerFailureCount++; + } else { + retryLetterProducerFailureCount = 0; + } + }); + return newProducer; + }, retryLetterProducerFailureCount, () -> "retry letter producer (topic: " + + deadLetterPolicy.getRetryLetterTopic() + ")"); + retryLetterProducer = p; } } finally { createProducerLock.writeLock().unlock(); } } + return p; } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 60c295c66e84e..2703d5ba209b8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -633,7 +633,14 @@ public CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); List> futureList = consumers.values().stream() - .map(ConsumerImpl::closeAsync).collect(Collectors.toList()); + .map(consumer -> consumer.closeAsync().exceptionally(t -> { + Throwable cause = FutureUtil.unwrapCompletionException(t); + if (!(cause instanceof PulsarClientException.AlreadyClosedException)) { + log.warn("[{}] [{}] Error closing individual consumer", consumer.getTopic(), + consumer.getSubscription(), cause); + } + return null; + })).collect(Collectors.toList()); FutureUtil.waitForAll(futureList) .thenComposeAsync((r) -> { diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml index 92ec9e934ee1e..03eaca2b34929 100644 --- a/pulsar-client/src/main/resources/findbugsExclude.xml +++ b/pulsar-client/src/main/resources/findbugsExclude.xml @@ -1012,4 +1012,8 @@ + + + +