diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 15bf568cbb295..ace27c3be96d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -812,7 +812,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St readCompacted, keySharedMeta, startMessageId, consumerEpoch); return addConsumerToSubscription(subscription, consumer).thenCompose(v -> { - checkBackloggedCursors(); + if (subscription instanceof PersistentSubscription persistentSubscription) { + checkBackloggedCursor(persistentSubscription); + } if (!cnx.isActive()) { try { consumer.close(); @@ -2566,17 +2568,21 @@ public void checkInactiveSubscriptions() { @Override public void checkBackloggedCursors() { - // activate caught up cursors which include consumers subscriptions.forEach((subName, subscription) -> { - if (!subscription.getConsumers().isEmpty() - && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) { - subscription.getCursor().setActive(); - } else { - subscription.getCursor().setInactive(); - } + checkBackloggedCursor(subscription); }); } + private void checkBackloggedCursor(PersistentSubscription subscription) { + // activate caught up cursor which include consumers + if (!subscription.getConsumers().isEmpty() + && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) { + subscription.getCursor().setActive(); + } else { + subscription.getCursor().setInactive(); + } + } + public void checkInactiveLedgers() { ledger.checkInactiveLedgerAndRollOver(); }