Skip to content

Commit

Permalink
fix bugs and improve logs
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 3, 2024
1 parent c390775 commit 24b59e8
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
Expand Down Expand Up @@ -1024,17 +1025,17 @@ public void testAutoUnsubscribePatternConsumer() throws Exception {

// 6. remove producer 1,3; verify only consumer 2 left
// seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
List<String> topicNames = Lists.newArrayList(topicName2);
String tp2p0 = TopicName.get(topicName2).getPartition(0).toString();
String tp2p1 = TopicName.get(topicName2).getPartition(1).toString();
List<String> topicNames = Lists.newArrayList(tp2p0, tp2p1);
NamespaceService nss = pulsar.getNamespaceService();
doReturn(CompletableFuture.completedFuture(topicNames)).when(nss)
.getListOfPersistentTopics(NamespaceName.get("my-property/my-ns"));

// 7. call recheckTopics to unsubscribe topic 1,3, verify topics number: 2=6-1-3
log.debug("recheck topics change");
PatternMultiTopicsConsumerImpl<byte[]> consumer1 = ((PatternMultiTopicsConsumerImpl<byte[]>) consumer);
Timeout recheckPatternTimeout = spy(consumer1.getRecheckPatternTimeout());
doReturn(false).when(recheckPatternTimeout).isCancelled();
consumer1.run(recheckPatternTimeout);
PatternConsumerUpdateQueue taskQueue = WhiteboxImpl.getInternalState(consumer, "updateTaskQueue");
taskQueue.appendRecheckOp();
Thread.sleep(100);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getPartitions().size(), 2);
assertEquals(((PatternMultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().size(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ synchronized void appendRecheckOp() {

synchronized void doAppend(Pair<UpdateSubscriptionType, Collection<String>> task) {
if (log.isDebugEnabled()) {
log.debug("[{} {}] Pattern consumer [{}] try to append task", task.getLeft(),
task.getRight() == null ? "" : task.getRight(), patternConsumer.getSubscription());
log.debug("Pattern consumer [{}] try to append task. {} {}", patternConsumer.getSubscription(),
task.getLeft(), task.getRight() == null ? "" : task.getRight());
}
// Once there is a recheck task in queue, it means other tasks can be skipped.
if (recheckTaskInQueue) {
Expand Down Expand Up @@ -192,21 +192,25 @@ synchronized void triggerNextTask() {
}
}
if (log.isDebugEnabled()) {
log.debug("[{} {}] Pattern consumer [{}] updating subscriptions", task.getLeft(),
task.getRight() == null ? "" : task.getRight(), patternConsumer.getSubscription());
log.debug("Pattern consumer [{}] starting task. {} {} ", patternConsumer.getSubscription(),
task.getLeft(), task.getRight() == null ? "" : task.getRight());
}
// Trigger next pending task.
taskInProgress = Pair.of(task.getLeft(), newTaskFuture);
newTaskFuture.thenAccept(ignore -> {
triggerNextTask();
if (log.isDebugEnabled()) {
log.debug("Pattern consumer [{}] task finished. {} {} ", patternConsumer.getSubscription(),
task.getLeft(), task.getRight() == null ? "" : task.getRight());
}
triggerNextTask();
}).exceptionally(ex -> {
/**
* Once a updating fails, trigger a delayed new recheck task to guarantee all things is correct.
* - Skip if there is already a recheck task in queue.
* - Skip if the last recheck task has been executed after the current time.
*/
log.error("[{} {}] Pattern consumer [{}] failed to update subscriptions", task.getLeft(), task.getRight(),
patternConsumer.getSubscription(), ex);
log.error("Pattern consumer [{}] task finished. {} {}. But it failed", patternConsumer.getSubscription(),
task.getLeft(), task.getRight() == null ? "" : task.getRight(), ex);
// Skip if there is already a recheck task in queue.
synchronized (PatternConsumerUpdateQueue.this) {
if (recheckTaskInQueue || PatternConsumerUpdateQueue.this.closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ public PatternMultiTopicsConsumerImpl(Pattern topicsPattern,
watcherFuture
.thenAccept(__ -> recheckPatternTimeout.cancel())
.exceptionally(ex -> {
log.warn("Unable to create topic list watcher. Falling back to only polling for new topics", ex);
log.warn("Pattern consumer [{}] unable to create topic list watcher. Falling back to only polling"
+ " for new topics", conf.getSubscriptionName(), ex);
return null;
});
} else {
log.debug("Not creating topic list watcher for subscription mode {}", subscriptionMode);
log.debug("Pattern consumer [{}] not creating topic list watcher for subscription mode {}",
conf.getSubscriptionName(), subscriptionMode);
watcherFuture.complete(null);
}
}
Expand All @@ -133,17 +135,7 @@ private void recheckTopicsChangeAfterReconnect() {
return;
}
// Do check.
recheckTopicsChange().whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
long delayMs = recheckPatternTaskBackoff.next();
client.timer().newTimeout(timeout -> {
recheckTopicsChangeAfterReconnect();
}, delayMs, TimeUnit.MILLISECONDS);
} else {
recheckPatternTaskBackoff.reset();
}
});
updateTaskQueue.appendRecheckOp();
}

// TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change.
Expand All @@ -168,7 +160,9 @@ CompletableFuture<Void> recheckTopicsChange() {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("Get topics under namespace {}, topics.size: {}, topicsHash: {}, filtered: {}",
log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {},"
+ " topicsHash: {}, filtered: {}",
PatternMultiTopicsConsumerImpl.this.getSubscription(),
namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
getTopicsResult.isFiltered());
getTopicsResult.getTopics().forEach(topicName ->
Expand All @@ -177,7 +171,7 @@ CompletableFuture<Void> recheckTopicsChange() {

final List<String> oldTopics = new ArrayList<>(getPartitions());
return updateSubscriptions(topicsPattern, this::setTopicsHash, getTopicsResult,
topicsChangeListener, oldTopics);
topicsChangeListener, oldTopics, subscription);
}
});
}
Expand All @@ -186,7 +180,8 @@ static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern,
java.util.function.Consumer<String> topicsHashSetter,
GetTopicsResult getTopicsResult,
TopicsChangedListener topicsChangedListener,
List<String> oldTopics) {
List<String> oldTopics,
String subscriptionForLog) {
topicsHashSetter.accept(getTopicsResult.getTopicsHash());
if (!getTopicsResult.isChanged()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -200,8 +195,16 @@ static CompletableFuture<Void> updateSubscriptions(Pattern topicsPattern,
}

final List<CompletableFuture<?>> listenersCallback = new ArrayList<>(2);
listenersCallback.add(topicsChangedListener.onTopicsAdded(TopicList.minus(newTopics, oldTopics)));
listenersCallback.add(topicsChangedListener.onTopicsRemoved(TopicList.minus(oldTopics, newTopics)));
Set<String> topicsAdded = TopicList.minus(newTopics, oldTopics);
Set<String> topicsRemoved = TopicList.minus(oldTopics, newTopics);
log.info("Pattern consumer [{}] Recheck pattern consumer's topics. topicsAdded: {}, topicsRemoved: []",
subscriptionForLog, topicsAdded, topicsRemoved);
if (!topicsAdded.isEmpty()) {
listenersCallback.add(topicsChangedListener.onTopicsAdded(topicsAdded));
}
if (!topicsRemoved.isEmpty()) {
listenersCallback.add(topicsChangedListener.onTopicsRemoved(topicsRemoved));
}
return FutureUtil.waitForAll(Collections.unmodifiableList(listenersCallback));
}

Expand Down Expand Up @@ -244,13 +247,15 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
// Unsubscribe and remove consumers in memory.
List<CompletableFuture<Void>> unsubscribeList = new ArrayList<>(removedTopics.size());
Set<String> partialRemoved = new HashSet<>(removedTopics.size());
Set<String> partialRemovedForLog = new HashSet<>(removedTopics.size());
for (String tp : removedTopics) {
ConsumerImpl<T> consumer = consumers.get(tp);
if (consumer != null) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
consumer.closeAsync().whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}] Failed to unsubscribe from topics: {}", tp, ex);
log.error("Pattern consumer [{}] failed to unsubscribe from topics: {}",
PatternMultiTopicsConsumerImpl.this.getSubscription(), tp, ex);
unsubscribeFuture.completeExceptionally(ex);
} else {
consumers.remove(tp, consumer);
Expand All @@ -259,11 +264,15 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
});
unsubscribeList.add(unsubscribeFuture);
partialRemoved.add(TopicName.get(tp).getPartitionedTopicName());
partialRemovedForLog.add(TopicName.get(tp).toString());
}
}
log.info("Pattern consumer [{}] remove topics. {}", PatternMultiTopicsConsumerImpl.this.getSubscription(),
partialRemovedForLog);

// Remove partitioned topics in memory.
return FutureUtil.waitForAll(unsubscribeList).whenComplete((__, ex) -> {
return FutureUtil.waitForAll(unsubscribeList).handle((__, ex) -> {
List<String> removedPartitionedTopicsForLog = new ArrayList<>();
for (String groupedTopicRemoved : partialRemoved) {
Integer partitions = partitionedTopics.get(groupedTopicRemoved);
if (partitions != null) {
Expand All @@ -276,10 +285,16 @@ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics)
}
}
if (allPartitionsHasBeenRemoved) {
removedPartitionedTopicsForLog.add(String.format("%s with %s partitions",
groupedTopicRemoved, partitions));
partitionedTopics.remove(groupedTopicRemoved, partitions);
}
}
}
log.info("Pattern consumer [{}] remove partitioned topics because all partitions have been removed. {}",
PatternMultiTopicsConsumerImpl.this.getSubscription(),
removedPartitionedTopicsForLog);
return null;
});
}

Expand All @@ -302,6 +317,7 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
* {@link PatternMultiTopicsConsumerImpl}.
*/
Set<String> groupedTopics = new HashSet<>();
List<String> expendPartitionsForLog = new ArrayList<>();
for (String tp : addedTopics) {
TopicName topicName = TopicName.get(tp);
groupedTopics.add(topicName.getPartitionedTopicName());
Expand All @@ -318,10 +334,12 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
partitionedTopics.put(topicName.getPartitionedTopicName(),
topicName.getPartitionIndex() + 1);
}
expendPartitionsForLog.add(tp);
CompletableFuture consumerFuture = subscribeAsync(tp, PartitionedTopicMetadata.NON_PARTITIONED);
consumerFuture.whenComplete((__, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to subscribe to topics: {}", tp, ex);
log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}",
PatternMultiTopicsConsumerImpl.this.getSubscription(), tp, ex);
}
});
futures.add(consumerFuture);
Expand All @@ -337,11 +355,14 @@ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
CompletableFuture consumerFuture = subscribeAsync(partitionedTopic, false);
consumerFuture.whenComplete((__, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to subscribe to topics: {}", partitionedTopic, ex);
log.warn("Pattern consumer [{}] Failed to subscribe to topics: {}",
PatternMultiTopicsConsumerImpl.this.getSubscription(), partitionedTopic, ex);
}
});
futures.add(consumerFuture);
}
log.info("Pattern consumer [{}] add topics. expend partitions {}, new subscribing {}",
PatternMultiTopicsConsumerImpl.this.getSubscription(), expendPartitionsForLog, groupedTopics);
return FutureUtil.waitForAll(futures);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,26 @@ private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerCo
lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null)
.thenAccept(getTopicsResult -> {
if (log.isDebugEnabled()) {
log.debug("Get topics under namespace {}, topics.size: {},"
+ " topicsHash: {}, changed: {}, filtered: {}",
log.debug("Pattern consumer [{}] get topics under namespace {}, topics.size: {},"
+ " topicsHash: {}, changed: {}, filtered: {}", conf.getSubscriptionName(),
namespaceName, getTopicsResult.getTopics().size(), getTopicsResult.getTopicsHash(),
getTopicsResult.isChanged(), getTopicsResult.isFiltered());
getTopicsResult.getTopics().forEach(topicName ->
log.debug("Get topics under namespace {}, topic: {}", namespaceName, topicName));
log.debug("Pattern consumer [{}] get topics under namespace {}, topic: {}",
conf.getSubscriptionName(), namespaceName, topicName));
}

List<String> topicsList = getTopicsResult.getTopics();
if (!getTopicsResult.isFiltered()) {
topicsList = TopicList.filterTopics(getTopicsResult.getTopics(), pattern);
}
conf.getTopicNames().addAll(topicsList);

if (log.isInfoEnabled()) {
log.info("Pattern consumer [{}] initialize topics. {}", conf.getSubscriptionName(),
getTopicsResult.getNonPartitionedOrPartitionTopics());
}

// Pattern consumer has his unique check mechanism, so do not need the feature "autoUpdatePartitions".
conf.setAutoUpdatePartitions(false);
ConsumerBase<T> consumer = new PatternMultiTopicsConsumerImpl<>(pattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testChangedUnfilteredResponse() {
"persistent://tenant/my-ns/non-matching"),
null, false, true),
mockListener,
Collections.emptyList());
Collections.emptyList(), "");
verify(mockListener).onTopicsAdded(Sets.newHashSet(
"persistent://tenant/my-ns/name-1",
"persistent://tenant/my-ns/name-2"));
Expand All @@ -80,7 +80,7 @@ public void testChangedFilteredResponse() {
"persistent://tenant/my-ns/name-2"),
"TOPICS_HASH", true, true),
mockListener,
Arrays.asList("persistent://tenant/my-ns/name-0"));
Arrays.asList("persistent://tenant/my-ns/name-0"), "");
verify(mockListener).onTopicsAdded(Sets.newHashSet(
"persistent://tenant/my-ns/name-1",
"persistent://tenant/my-ns/name-2"));
Expand All @@ -99,7 +99,7 @@ public void testUnchangedResponse() {
"persistent://tenant/my-ns/name-2"),
"TOPICS_HASH", true, false),
mockListener,
Arrays.asList("persistent://tenant/my-ns/name-0"));
Arrays.asList("persistent://tenant/my-ns/name-0"), "");
verify(mockListener, never()).onTopicsAdded(any());
verify(mockListener, never()).onTopicsRemoved(any());
verify(mockTopicsHashSetter).accept("TOPICS_HASH");
Expand Down

0 comments on commit 24b59e8

Please sign in to comment.