Skip to content

Commit

Permalink
[improve][broker][PIP-379] Add observability stats for "draining hash…
Browse files Browse the repository at this point in the history
…es" (#23429)
  • Loading branch information
lhotari authored Oct 10, 2024
1 parent 3dc0ade commit acac72e
Show file tree
Hide file tree
Showing 16 changed files with 734 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ public class Consumer {
@Setter
private volatile PendingAcksMap.PendingAcksRemoveHandler pendingAcksRemoveHandler;

@Getter
@Setter
private volatile java.util.function.BiConsumer<Consumer, ConsumerStatsImpl> drainingHashesConsumerStatsUpdater;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Expand Down Expand Up @@ -976,6 +980,9 @@ public ConsumerStatsImpl getStats() {
if (readPositionWhenJoining != null) {
stats.readPositionWhenJoining = readPositionWhenJoining.toString();
}
if (drainingHashesConsumerStatsUpdater != null) {
drainingHashesConsumerStatsUpdater.accept(this, stats);
}
return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,18 @@

import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.concurrent.ConcurrentHashMap;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.DrainingHash;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl;
import org.roaringbitmap.RoaringBitmap;

/**
* A thread-safe map to store draining hashes in the consumer.
Expand All @@ -34,6 +44,8 @@ public class DrainingHashesTracker {
private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes = new Int2ObjectOpenHashMap<>();
int batchLevel;
boolean unblockedWhileBatching;
private final Map<ConsumerIdentityWrapper, ConsumerDrainingHashesStats> consumerDrainingHashesStatsMap =
new ConcurrentHashMap<>();

/**
* Represents an entry in the draining hashes tracker.
Expand Down Expand Up @@ -98,6 +110,52 @@ boolean isBlocking() {
}
}

private class ConsumerDrainingHashesStats {
private final RoaringBitmap drainingHashes = new RoaringBitmap();
long drainingHashesClearedTotal;

public synchronized void addHash(int stickyHash) {
drainingHashes.add(stickyHash);
}

public synchronized boolean clearHash(int hash) {
drainingHashes.remove(hash);
drainingHashesClearedTotal++;
boolean empty = drainingHashes.isEmpty();
if (log.isDebugEnabled()) {
log.debug("[{}] Cleared hash {} in stats. empty={} totalCleared={} hashes={}",
dispatcherName, hash, empty, drainingHashesClearedTotal, drainingHashes.getCardinality());
}
return empty;
}

public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) {
int drainingHashesUnackedMessages = 0;
List<DrainingHash> drainingHashesStats = new ArrayList<>();
PrimitiveIterator.OfInt hashIterator = drainingHashes.stream().iterator();
while (hashIterator.hasNext()) {
int hash = hashIterator.nextInt();
DrainingHashEntry entry = getEntry(hash);
if (entry == null) {
log.warn("[{}] Draining hash {} not found in the tracker for consumer {}", dispatcherName, hash,
consumer);
continue;
}
int unackedMessages = entry.refCount;
DrainingHashImpl drainingHash = new DrainingHashImpl();
drainingHash.hash = hash;
drainingHash.unackMsgs = unackedMessages;
drainingHash.blockedAttempts = entry.blockedCount;
drainingHashesStats.add(drainingHash);
drainingHashesUnackedMessages += unackedMessages;
}
consumerStats.drainingHashesCount = drainingHashesStats.size();
consumerStats.drainingHashesClearedTotal = drainingHashesClearedTotal;
consumerStats.drainingHashesUnackedMessages = drainingHashesUnackedMessages;
consumerStats.drainingHashes = drainingHashesStats;
}
}

/**
* Interface for handling the unblocking of sticky key hashes.
*/
Expand Down Expand Up @@ -127,13 +185,25 @@ public synchronized void addEntry(Consumer consumer, int stickyHash) {
}
DrainingHashEntry entry = drainingHashes.get(stickyHash);
if (entry == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Adding and incrementing draining hash {} for consumer id:{} name:{}", dispatcherName,
stickyHash, consumer.consumerId(), consumer.consumerName());
}
entry = new DrainingHashEntry(consumer);
drainingHashes.put(stickyHash, entry);
// update the consumer specific stats
consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer),
k -> new ConsumerDrainingHashesStats()).addHash(stickyHash);
} else if (entry.getConsumer() != consumer) {
throw new IllegalStateException(
"Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash
+ " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer
+ ".");
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Draining hash {} incrementing {} consumer id:{} name:{}", dispatcherName, stickyHash,
entry.refCount + 1, consumer.consumerId(), consumer.consumerName());
}
}
entry.incrementRefCount();
}
Expand Down Expand Up @@ -178,14 +248,29 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole
+ ".");
}
if (entry.decrementRefCount()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash,
consumer.consumerId(), consumer.consumerName());
}
DrainingHashEntry removed = drainingHashes.remove(stickyHash);
// update the consumer specific stats
ConsumerDrainingHashesStats drainingHashesStats =
consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
if (drainingHashesStats != null) {
drainingHashesStats.clearHash(stickyHash);
}
if (!closing && removed.isBlocking()) {
if (batchLevel > 0) {
unblockedWhileBatching = true;
} else {
unblockingHandler.stickyKeyHashUnblocked(stickyHash);
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName, stickyHash,
entry.refCount, consumer.consumerId(), consumer.consumerName());
}
}
}

Expand Down Expand Up @@ -237,5 +322,32 @@ public synchronized DrainingHashEntry getEntry(int stickyKeyHash) {
*/
public synchronized void clear() {
drainingHashes.clear();
consumerDrainingHashesStatsMap.clear();
}

/**
* Update the consumer specific stats to the target {@link ConsumerStatsImpl}.
*
* @param consumer the consumer
* @param consumerStats the consumer stats to update the values to
*/
public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) {
consumerStats.drainingHashesCount = 0;
consumerStats.drainingHashesClearedTotal = 0;
consumerStats.drainingHashesUnackedMessages = 0;
consumerStats.drainingHashes = Collections.emptyList();
ConsumerDrainingHashesStats consumerDrainingHashesStats =
consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer));
if (consumerDrainingHashesStats != null) {
consumerDrainingHashesStats.updateConsumerStats(consumer, consumerStats);
}
}

/**
* Remove the consumer specific stats from the draining hashes tracker.
* @param consumer the consumer
*/
public void consumerRemoved(Consumer consumer) {
consumerDrainingHashesStatsMap.remove(new ConsumerIdentityWrapper(consumer));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public void endBatch() {
drainingHashesTracker.endBatch();
}
});
consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats);
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
}
}).exceptionally(ex -> {
Expand Down Expand Up @@ -193,6 +194,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
// consumer to another. This will handle the case where a hash gets switched from an existing
// consumer to another existing consumer during removal.
registerDrainingHashes(consumer, impactedConsumers.orElseThrow());
drainingHashesTracker.consumerRemoved(consumer);
}
}

Expand Down Expand Up @@ -349,8 +351,8 @@ private boolean handleAddingPendingAck(Consumer consumer, long ledgerId, long en
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Adding {}:{} to pending acks for consumer {} with sticky key hash {}",
getName(), ledgerId, entryId, consumer, stickyKeyHash);
log.debug("[{}] Adding {}:{} to pending acks for consumer id:{} name:{} with sticky key hash {}",
getName(), ledgerId, entryId, consumer.consumerId(), consumer.consumerName(), stickyKeyHash);
}
// allow adding the message to pending acks and sending the message to the consumer
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,11 +1253,23 @@ public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(GetStatsOptions ge
subStats.lastConsumedTimestamp =
Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) {
consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream()
.map(Range::toString)
.collect(Collectors.toList());
List<Range> keyRanges = consumerKeyHashRanges != null ? consumerKeyHashRanges.get(consumer) : null;
if (keyRanges != null) {
if (((StickyKeyDispatcher) dispatcher).isClassic()) {
// Use string representation for classic mode
consumerStats.keyHashRanges = keyRanges.stream()
.map(Range::toString)
.collect(Collectors.toList());
} else {
// Use array representation for PIP-379 stats
consumerStats.keyHashRangeArrays = keyRanges.stream()
.map(range -> new int[]{range.getStart(), range.getEnd()})
.collect(Collectors.toList());
}
}
subStats.drainingHashesCount += consumerStats.drainingHashesCount;
subStats.drainingHashesClearedTotal += consumerStats.drainingHashesClearedTotal;
subStats.drainingHashesUnackedMessages += consumerStats.drainingHashesUnackedMessages;
});

subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;
import lombok.extern.slf4j.Slf4j;

/**
* Reschedules reads so that the possible pending read is cancelled if it's waiting for more entries.
* This will prevent the dispatcher in getting blocked when there are entries in the replay queue
* that should be handled. This will also batch multiple calls together to reduce the number of
* operations.
*/
@Slf4j
class RescheduleReadHandler {
private static final int UNSET = -1;
private static final int NO_PENDING_READ = 0;
Expand Down Expand Up @@ -70,15 +72,27 @@ public void rescheduleRead() {
// are entries in the replay queue.
if (maxReadOpCount != NO_PENDING_READ && readOpCounterSupplier.getAsLong() == maxReadOpCount
&& hasEntriesInReplayQueue.getAsBoolean()) {
if (log.isDebugEnabled()) {
log.debug("Cancelling pending read request because it's waiting for more entries");
}
cancelPendingRead.run();
}
// Re-schedule read immediately, or join the next scheduled read
if (log.isDebugEnabled()) {
log.debug("Triggering read");
}
rescheduleReadImmediately.run();
};
long rescheduleDelay = readIntervalMsSupplier.getAsLong();
if (rescheduleDelay > 0) {
if (log.isDebugEnabled()) {
log.debug("Scheduling after {} ms", rescheduleDelay);
}
executor.schedule(runnable, rescheduleDelay, TimeUnit.MILLISECONDS);
} else {
if (log.isDebugEnabled()) {
log.debug("Running immediately");
}
runnable.run();
}
} else {
Expand Down
Loading

0 comments on commit acac72e

Please sign in to comment.