Skip to content

Commit

Permalink
[fix][broker] fix pulsar-admin topics stats-internal caused a BK clie…
Browse files Browse the repository at this point in the history
…nt thread a deadlock (apache#23258)

(cherry picked from commit 0aaa906)
(cherry picked from commit f5737e6)
  • Loading branch information
poorbarcode authored and nikhil-ctds committed Sep 10, 2024
1 parent 1790dc0 commit ca0c5fa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -179,6 +180,7 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
Expand Down Expand Up @@ -230,6 +232,7 @@ public static boolean isDedupCursorName(String name) {
private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL;
private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;
private TopicCompactionService topicCompactionService;

// TODO: Create compaction strategy from topic policy when exposing strategic compaction to users.
Expand Down Expand Up @@ -374,6 +377,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
if (ledger.getProperties().containsKey(TOPIC_EPOCH_PROPERTY_NAME)) {
topicEpoch = Optional.of(Long.parseLong(ledger.getProperties().get(TOPIC_EPOCH_PROPERTY_NAME)));
Expand Down Expand Up @@ -468,6 +472,7 @@ public CompletableFuture<Void> initialize() {
.build();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());

if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
this.transactionBuffer = brokerService.getPulsar()
Expand Down Expand Up @@ -2657,13 +2662,13 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
info.entries = -1;
info.size = -1;

Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
futures.add(getCompactedTopicContextAsync().thenAccept(v -> {
if (v != null) {
info.ledgerId = v.getLedger().getId();
info.entries = v.getLedger().getLastAddConfirmed() + 1;
info.size = v.getLedger().getLength();
}
}));

stats.compactedLedger = info;

Expand Down Expand Up @@ -2782,12 +2787,21 @@ public Optional<CompactedTopicContext> getCompactedTopicContext() {
if (topicCompactionService instanceof PulsarTopicCompactionService pulsarCompactedService) {
return pulsarCompactedService.getCompactedTopic().getCompactedTopicContext();
}
} catch (ExecutionException | InterruptedException e) {
} catch (ExecutionException | InterruptedException | TimeoutException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
return Optional.empty();
}

public CompletableFuture<CompactedTopicContext> getCompactedTopicContextAsync() {
CompletableFuture<CompactedTopicContext> res =
((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture();
if (res == null) {
return CompletableFuture.completedFuture(null);
}
return res;
}

public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -304,8 +306,10 @@ static CompletableFuture<List<Entry>> readEntries(LedgerHandle lh, long from, lo
* Getter for CompactedTopicContext.
* @return CompactedTopicContext
*/
public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException {
return compactedTopicContext == null ? Optional.empty() : Optional.of(compactedTopicContext.get());
public Optional<CompactedTopicContext> getCompactedTopicContext() throws ExecutionException, InterruptedException,
TimeoutException {
return compactedTopicContext == null ? Optional.empty() :
Optional.of(compactedTopicContext.get(30, TimeUnit.SECONDS));
}

@Override
Expand Down

0 comments on commit ca0c5fa

Please sign in to comment.