diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4570e894a7198..7faa86e57053f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -2643,13 +2644,13 @@ public CompletableFuture getInternalStats(boolean info.entries = -1; info.size = -1; - Optional compactedTopicContext = getCompactedTopicContext(); - if (compactedTopicContext.isPresent()) { - CompactedTopicContext ledgerContext = compactedTopicContext.get(); - info.ledgerId = ledgerContext.getLedger().getId(); - info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1; - info.size = ledgerContext.getLedger().getLength(); - } + futures.add(getCompactedTopicContextAsync().thenAccept(v -> { + if (v != null) { + info.ledgerId = v.getLedger().getId(); + info.entries = v.getLedger().getLastAddConfirmed() + 1; + info.size = v.getLedger().getLength(); + } + })); stats.compactedLedger = info; @@ -2766,12 +2767,21 @@ public CompletableFuture getInternalStats(boolean public Optional getCompactedTopicContext() { try { return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext(); - } catch (ExecutionException | InterruptedException e) { + } catch (ExecutionException | InterruptedException | TimeoutException e) { log.warn("[{}]Fail to get ledger information for compacted topic.", topic); } return Optional.empty(); } + public CompletableFuture getCompactedTopicContextAsync() { + CompletableFuture res = + ((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture(); + if (res == null) { + return CompletableFuture.completedFuture(null); + } + return res; + } + public long getBacklogSize() { return ledger.getEstimatedBacklogSize(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index b906653099867..1abf3dd25b962 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -32,6 +32,8 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -307,8 +309,10 @@ private static CompletableFuture> readEntries(LedgerHandle lh, long * Getter for CompactedTopicContext. * @return CompactedTopicContext */ - public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException { - return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get()); + public Optional getCompactedTopicContext() throws ExecutionException, InterruptedException, + TimeoutException { + return compactedTopicContext == null ? Optional.empty() : + Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS)); } @Override