Skip to content

Commit

Permalink
[fix] [broker] Remove blocking calls from Subscription.getStats (apac…
Browse files Browse the repository at this point in the history
…he#23088)

(cherry picked from commit e59cd05)
(cherry picked from commit 9ea2a68)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Jul 30, 2024
1 parent c2fda68 commit f5da2bf
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1125,8 +1127,29 @@ public long estimateBacklogSize() {
return cursor.getEstimatedSizeSinceMarkDeletePosition();
}

/**
* @deprecated please call {@link #getStatsAsync(Boolean, boolean, boolean)}.
*/
@Deprecated
public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
// So far, there is no case hits this check.
if (getEarliestTimeInBacklog) {
throw new IllegalArgumentException("Calling the sync method subscription.getStats with"
+ " getEarliestTimeInBacklog, it may encountered a deadlock error.");
}
// The method "getStatsAsync" will be a sync method if the param "isGetEarliestTimeInBacklog" is false.
try {
return getStatsAsync(getPreciseBacklog, subscriptionBacklogSize, false).get(5, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// This error will never occur.
throw new RuntimeException(e);
}
}

public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(Boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
SubscriptionStatsImpl subStats = new SubscriptionStatsImpl();
subStats.lastExpireTimestamp = lastExpireTimestamp;
subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
Expand Down Expand Up @@ -1197,21 +1220,6 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
} else {
subStats.backlogSize = -1;
}
if (getEarliestTimeInBacklog) {
if (subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
long result = 0;
try {
result = managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).get();
} catch (InterruptedException | ExecutionException e) {
result = -1;
}
subStats.earliestMsgPublishTimeInBacklog = result;
} else {
subStats.earliestMsgPublishTimeInBacklog = -1;
}
}
subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
Expand All @@ -1236,7 +1244,20 @@ public SubscriptionStatsImpl getStats(Boolean getPreciseBacklog, boolean subscri
subStats.nonContiguousDeletedMessagesRanges = cursor.getTotalNonContiguousDeletedMessagesRange();
subStats.nonContiguousDeletedMessagesRangesSerializedSize =
cursor.getNonContiguousDeletedMessagesRangeSerializedSize();
return subStats;
if (!getEarliestTimeInBacklog) {
return CompletableFuture.completedFuture(subStats);
}
if (subStats.msgBacklog > 0) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl markDeletedPosition = (PositionImpl) cursor.getMarkDeletedPosition();
return managedLedger.getEarliestMessagePublishTimeOfPos(markDeletedPosition).thenApply(v -> {
subStats.earliestMsgPublishTimeInBacklog = v;
return subStats;
});
} else {
subStats.earliestMsgPublishTimeInBacklog = -1;
return CompletableFuture.completedFuture(subStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2434,7 +2434,6 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {

CompletableFuture<TopicStatsImpl> statsFuture = new CompletableFuture<>();
TopicStatsImpl stats = new TopicStatsImpl();

ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap<>();
Expand Down Expand Up @@ -2463,29 +2462,6 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
stats.abortedTxnCount = txnBuffer.getAbortedTxnCount();
stats.committedTxnCount = txnBuffer.getCommittedTxnCount();

subscriptions.forEach((name, subscription) -> {
SubscriptionStatsImpl subStats =
subscription.getStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog);

stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes += subStats.delayedMessageIndexSizeInBytes;

subStats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());
topicMetricBean.name = v.name;
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
});
});

replicators.forEach((cluster, replicator) -> {
ReplicatorStatsImpl replicatorStats = replicator.getStats();

Expand Down Expand Up @@ -2535,21 +2511,49 @@ public CompletableFuture<TopicStatsImpl> asyncGetStats(boolean getPreciseBacklog
return compactionRecord;
});

if (getEarliestTimeInBacklog && stats.backlogSize != 0) {
ledger.getEarliestMessagePublishTimeInBacklog().whenComplete((earliestTime, e) -> {
if (e != null) {
log.error("[{}] Failed to get earliest message publish time in backlog", topic, e);
statsFuture.completeExceptionally(e);
} else {
stats.earliestMsgPublishTimeInBacklogs = earliestTime;
statsFuture.complete(stats);
}
});
} else {
statsFuture.complete(stats);
}

return statsFuture;
Map<String, CompletableFuture<SubscriptionStatsImpl>> subscriptionFutures = new HashMap<>();
subscriptions.forEach((name, subscription) -> {
subscriptionFutures.put(name, subscription.getStatsAsync(getPreciseBacklog, subscriptionBacklogSize,
getEarliestTimeInBacklog));
});
return FutureUtil.waitForAll(subscriptionFutures.values()).thenCompose(ignore -> {
for (Map.Entry<String, CompletableFuture<SubscriptionStatsImpl>> e : subscriptionFutures.entrySet()) {
String name = e.getKey();
SubscriptionStatsImpl subStats = e.getValue().join();
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
stats.delayedMessageIndexSizeInBytes += subStats.delayedMessageIndexSizeInBytes;

subStats.bucketDelayedIndexStats.forEach((k, v) -> {
TopicMetricBean topicMetricBean =
stats.bucketDelayedIndexStats.computeIfAbsent(k, __ -> new TopicMetricBean());
topicMetricBean.name = v.name;
topicMetricBean.labelsAndValues = v.labelsAndValues;
topicMetricBean.value += v.value;
});
}
if (getEarliestTimeInBacklog && stats.backlogSize != 0) {
CompletableFuture finalRes = ledger.getEarliestMessagePublishTimeInBacklog()
.thenApply((earliestTime) -> {
stats.earliestMsgPublishTimeInBacklogs = earliestTime;
return stats;
});
// print error log.
finalRes.exceptionally(ex -> {
log.error("[{}] Failed to get earliest message publish time in backlog", topic, ex);
return null;
});
return finalRes;
} else {
return CompletableFuture.completedFuture(stats);
}
});
}

private Optional<CompactorMXBean> getCompactorMXBean() {
Expand Down

0 comments on commit f5da2bf

Please sign in to comment.