From acac72ea03f7c38cab99ec011b309d5e6bb4fe9d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 10 Oct 2024 12:46:42 +0300 Subject: [PATCH] [improve][broker][PIP-379] Add observability stats for "draining hashes" (#23429) --- .../pulsar/broker/service/Consumer.java | 7 + .../broker/service/DrainingHashesTracker.java | 112 +++++++ ...tStickyKeyDispatcherMultipleConsumers.java | 6 +- .../persistent/PersistentSubscription.java | 20 +- .../persistent/RescheduleReadHandler.java | 14 + .../apache/pulsar/broker/BrokerTestUtil.java | 78 ++++- .../stats/AuthenticatedConsumerStatsTest.java | 57 +--- .../broker/stats/ConsumerStatsTest.java | 276 ++++++++++++++++-- pulsar-broker/src/test/resources/log4j2.xml | 11 + .../common/policies/data/ConsumerStats.java | 48 ++- .../common/policies/data/DrainingHash.java | 41 +++ .../policies/data/SubscriptionStats.java | 24 ++ .../data/stats/ConsumerStatsImpl.java | 43 ++- .../policies/data/stats/DrainingHashImpl.java | 46 +++ .../data/stats/SubscriptionStatsImpl.java | 22 ++ .../common/util/ObjectMapperFactory.java | 3 + 16 files changed, 734 insertions(+), 74 deletions(-) create mode 100644 pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index d25ebd0839df1..bcd29d86490cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -174,6 +174,10 @@ public class Consumer { @Setter private volatile PendingAcksMap.PendingAcksRemoveHandler pendingAcksRemoveHandler; + @Getter + @Setter + private volatile java.util.function.BiConsumer drainingHashesConsumerStatsUpdater; + public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, @@ -976,6 +980,9 @@ public ConsumerStatsImpl getStats() { if (readPositionWhenJoining != null) { stats.readPositionWhenJoining = readPositionWhenJoining.toString(); } + if (drainingHashesConsumerStatsUpdater != null) { + drainingHashesConsumerStatsUpdater.accept(this, stats); + } return stats; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java index 3521fa197a13d..46762c844db6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/DrainingHashesTracker.java @@ -20,8 +20,18 @@ import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.PrimitiveIterator; +import java.util.concurrent.ConcurrentHashMap; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.policies.data.DrainingHash; +import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl; +import org.roaringbitmap.RoaringBitmap; /** * A thread-safe map to store draining hashes in the consumer. @@ -34,6 +44,8 @@ public class DrainingHashesTracker { private final Int2ObjectOpenHashMap drainingHashes = new Int2ObjectOpenHashMap<>(); int batchLevel; boolean unblockedWhileBatching; + private final Map consumerDrainingHashesStatsMap = + new ConcurrentHashMap<>(); /** * Represents an entry in the draining hashes tracker. @@ -98,6 +110,52 @@ boolean isBlocking() { } } + private class ConsumerDrainingHashesStats { + private final RoaringBitmap drainingHashes = new RoaringBitmap(); + long drainingHashesClearedTotal; + + public synchronized void addHash(int stickyHash) { + drainingHashes.add(stickyHash); + } + + public synchronized boolean clearHash(int hash) { + drainingHashes.remove(hash); + drainingHashesClearedTotal++; + boolean empty = drainingHashes.isEmpty(); + if (log.isDebugEnabled()) { + log.debug("[{}] Cleared hash {} in stats. empty={} totalCleared={} hashes={}", + dispatcherName, hash, empty, drainingHashesClearedTotal, drainingHashes.getCardinality()); + } + return empty; + } + + public synchronized void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { + int drainingHashesUnackedMessages = 0; + List drainingHashesStats = new ArrayList<>(); + PrimitiveIterator.OfInt hashIterator = drainingHashes.stream().iterator(); + while (hashIterator.hasNext()) { + int hash = hashIterator.nextInt(); + DrainingHashEntry entry = getEntry(hash); + if (entry == null) { + log.warn("[{}] Draining hash {} not found in the tracker for consumer {}", dispatcherName, hash, + consumer); + continue; + } + int unackedMessages = entry.refCount; + DrainingHashImpl drainingHash = new DrainingHashImpl(); + drainingHash.hash = hash; + drainingHash.unackMsgs = unackedMessages; + drainingHash.blockedAttempts = entry.blockedCount; + drainingHashesStats.add(drainingHash); + drainingHashesUnackedMessages += unackedMessages; + } + consumerStats.drainingHashesCount = drainingHashesStats.size(); + consumerStats.drainingHashesClearedTotal = drainingHashesClearedTotal; + consumerStats.drainingHashesUnackedMessages = drainingHashesUnackedMessages; + consumerStats.drainingHashes = drainingHashesStats; + } + } + /** * Interface for handling the unblocking of sticky key hashes. */ @@ -127,13 +185,25 @@ public synchronized void addEntry(Consumer consumer, int stickyHash) { } DrainingHashEntry entry = drainingHashes.get(stickyHash); if (entry == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] Adding and incrementing draining hash {} for consumer id:{} name:{}", dispatcherName, + stickyHash, consumer.consumerId(), consumer.consumerName()); + } entry = new DrainingHashEntry(consumer); drainingHashes.put(stickyHash, entry); + // update the consumer specific stats + consumerDrainingHashesStatsMap.computeIfAbsent(new ConsumerIdentityWrapper(consumer), + k -> new ConsumerDrainingHashesStats()).addHash(stickyHash); } else if (entry.getConsumer() != consumer) { throw new IllegalStateException( "Consumer " + entry.getConsumer() + " is already draining hash " + stickyHash + " in dispatcher " + dispatcherName + ". Same hash being used for consumer " + consumer + "."); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} incrementing {} consumer id:{} name:{}", dispatcherName, stickyHash, + entry.refCount + 1, consumer.consumerId(), consumer.consumerName()); + } } entry.incrementRefCount(); } @@ -178,7 +248,17 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole + "."); } if (entry.decrementRefCount()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} removing consumer id:{} name:{}", dispatcherName, stickyHash, + consumer.consumerId(), consumer.consumerName()); + } DrainingHashEntry removed = drainingHashes.remove(stickyHash); + // update the consumer specific stats + ConsumerDrainingHashesStats drainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (drainingHashesStats != null) { + drainingHashesStats.clearHash(stickyHash); + } if (!closing && removed.isBlocking()) { if (batchLevel > 0) { unblockedWhileBatching = true; @@ -186,6 +266,11 @@ public synchronized void reduceRefCount(Consumer consumer, int stickyHash, boole unblockingHandler.stickyKeyHashUnblocked(stickyHash); } } + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Draining hash {} decrementing {} consumer id:{} name:{}", dispatcherName, stickyHash, + entry.refCount, consumer.consumerId(), consumer.consumerName()); + } } } @@ -237,5 +322,32 @@ public synchronized DrainingHashEntry getEntry(int stickyKeyHash) { */ public synchronized void clear() { drainingHashes.clear(); + consumerDrainingHashesStatsMap.clear(); + } + + /** + * Update the consumer specific stats to the target {@link ConsumerStatsImpl}. + * + * @param consumer the consumer + * @param consumerStats the consumer stats to update the values to + */ + public void updateConsumerStats(Consumer consumer, ConsumerStatsImpl consumerStats) { + consumerStats.drainingHashesCount = 0; + consumerStats.drainingHashesClearedTotal = 0; + consumerStats.drainingHashesUnackedMessages = 0; + consumerStats.drainingHashes = Collections.emptyList(); + ConsumerDrainingHashesStats consumerDrainingHashesStats = + consumerDrainingHashesStatsMap.get(new ConsumerIdentityWrapper(consumer)); + if (consumerDrainingHashesStats != null) { + consumerDrainingHashesStats.updateConsumerStats(consumer, consumerStats); + } + } + + /** + * Remove the consumer specific stats from the draining hashes tracker. + * @param consumer the consumer + */ + public void consumerRemoved(Consumer consumer) { + consumerDrainingHashesStatsMap.remove(new ConsumerIdentityWrapper(consumer)); } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index df053e6d8a549..1a3e2f706cba8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -157,6 +157,7 @@ public void endBatch() { drainingHashesTracker.endBatch(); } }); + consumer.setDrainingHashesConsumerStatsUpdater(drainingHashesTracker::updateConsumerStats); registerDrainingHashes(consumer, impactedConsumers.orElseThrow()); } }).exceptionally(ex -> { @@ -193,6 +194,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE // consumer to another. This will handle the case where a hash gets switched from an existing // consumer to another existing consumer during removal. registerDrainingHashes(consumer, impactedConsumers.orElseThrow()); + drainingHashesTracker.consumerRemoved(consumer); } } @@ -349,8 +351,8 @@ private boolean handleAddingPendingAck(Consumer consumer, long ledgerId, long en return false; } if (log.isDebugEnabled()) { - log.debug("[{}] Adding {}:{} to pending acks for consumer {} with sticky key hash {}", - getName(), ledgerId, entryId, consumer, stickyKeyHash); + log.debug("[{}] Adding {}:{} to pending acks for consumer id:{} name:{} with sticky key hash {}", + getName(), ledgerId, entryId, consumer.consumerId(), consumer.consumerName(), stickyKeyHash); } // allow adding the message to pending acks and sending the message to the consumer return true; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index eaa147b81b126..df1c23cbbcb30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1253,11 +1253,23 @@ public CompletableFuture getStatsAsync(GetStatsOptions ge subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp); subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp); - if (consumerKeyHashRanges != null && consumerKeyHashRanges.containsKey(consumer)) { - consumerStats.keyHashRanges = consumerKeyHashRanges.get(consumer).stream() - .map(Range::toString) - .collect(Collectors.toList()); + List keyRanges = consumerKeyHashRanges != null ? consumerKeyHashRanges.get(consumer) : null; + if (keyRanges != null) { + if (((StickyKeyDispatcher) dispatcher).isClassic()) { + // Use string representation for classic mode + consumerStats.keyHashRanges = keyRanges.stream() + .map(Range::toString) + .collect(Collectors.toList()); + } else { + // Use array representation for PIP-379 stats + consumerStats.keyHashRangeArrays = keyRanges.stream() + .map(range -> new int[]{range.getStart(), range.getEnd()}) + .collect(Collectors.toList()); + } } + subStats.drainingHashesCount += consumerStats.drainingHashesCount; + subStats.drainingHashesClearedTotal += consumerStats.drainingHashesClearedTotal; + subStats.drainingHashesUnackedMessages += consumerStats.drainingHashesUnackedMessages; }); subStats.filterProcessedMsgCount = dispatcher.getFilterProcessedMsgCount(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java index 3554f29255227..4812be58cdc78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/RescheduleReadHandler.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BooleanSupplier; import java.util.function.LongSupplier; +import lombok.extern.slf4j.Slf4j; /** * Reschedules reads so that the possible pending read is cancelled if it's waiting for more entries. @@ -30,6 +31,7 @@ * that should be handled. This will also batch multiple calls together to reduce the number of * operations. */ +@Slf4j class RescheduleReadHandler { private static final int UNSET = -1; private static final int NO_PENDING_READ = 0; @@ -70,15 +72,27 @@ public void rescheduleRead() { // are entries in the replay queue. if (maxReadOpCount != NO_PENDING_READ && readOpCounterSupplier.getAsLong() == maxReadOpCount && hasEntriesInReplayQueue.getAsBoolean()) { + if (log.isDebugEnabled()) { + log.debug("Cancelling pending read request because it's waiting for more entries"); + } cancelPendingRead.run(); } // Re-schedule read immediately, or join the next scheduled read + if (log.isDebugEnabled()) { + log.debug("Triggering read"); + } rescheduleReadImmediately.run(); }; long rescheduleDelay = readIntervalMsSupplier.getAsLong(); if (rescheduleDelay > 0) { + if (log.isDebugEnabled()) { + log.debug("Scheduling after {} ms", rescheduleDelay); + } executor.schedule(runnable, rescheduleDelay, TimeUnit.MILLISECONDS); } else { + if (log.isDebugEnabled()) { + log.debug("Running immediately"); + } runnable.run(); } } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index 7ed4542b2505f..6a41e86f8934e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -21,10 +21,15 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.StringWriter; import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.URL; import java.time.Duration; import java.util.Arrays; import java.util.UUID; @@ -37,6 +42,7 @@ import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Stream; +import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -46,7 +52,6 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.mockito.Mockito; import org.slf4j.Logger; - /** * Holds util methods used in test. */ @@ -136,6 +141,77 @@ public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String } } + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param baseUrl Pulsar service URL + * @param topic topic name + */ + public static void logTopicStats(Logger logger, String baseUrl, String topic) { + logTopicStats(logger, baseUrl, "public", "default", topic); + } + + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param baseUrl Pulsar service URL + * @param tenant tenant name + * @param namespace namespace name + * @param topic topic name + */ + public static void logTopicStats(Logger logger, String baseUrl, String tenant, String namespace, String topic) { + String topicStatsUri = + String.format("%s/admin/v2/persistent/%s/%s/%s/stats", baseUrl, tenant, namespace, topic); + logger.info("[{}] stats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsUri))); + String topicStatsInternalUri = + String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats", baseUrl, tenant, namespace, topic); + logger.info("[{}] internalStats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri))); + } + + /** + * Pretty print the given JSON string + * @param jsonString JSON string to pretty print + * @return pretty printed JSON string + */ + public static String jsonPrettyPrint(String jsonString) { + try { + ObjectMapper mapper = new ObjectMapper(); + Object json = mapper.readValue(jsonString, Object.class); + ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter(); + return writer.writeValueAsString(json); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Get the resource as a string from the given URI + */ + @SneakyThrows + public static String getJsonResourceAsString(String uri) { + URL url = new URL(uri); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Accept", "application/json"); + try { + int responseCode = connection.getResponseCode(); + if (responseCode == 200) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String inputLine; + StringBuilder content = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + return content.toString(); + } + } else { + throw new IOException("Failed to get resource: " + uri + ", status: " + responseCode); + } + } finally { + connection.disconnect(); + } + } + /** * Receive messages concurrently from multiple consumers and handles them using the provided message handler. * The message handler should return true if it wants to continue receiving more messages, false otherwise. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java index e8cadb72e1e04..20c1c5498ce6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/AuthenticatedConsumerStatsTest.java @@ -18,11 +18,19 @@ */ package org.apache.pulsar.broker.stats; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.NoSuchAlgorithmException; +import java.security.PrivateKey; +import java.time.Duration; +import java.util.Base64; +import java.util.Date; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.AuthenticationFactory; @@ -37,18 +45,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.security.KeyPair; -import java.security.KeyPairGenerator; -import java.security.NoSuchAlgorithmException; -import java.security.PrivateKey; -import java.time.Duration; -import java.util.Base64; -import java.util.Date; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Properties; -import java.util.Set; - public class AuthenticatedConsumerStatsTest extends ConsumerStatsTest{ private final String ADMIN_TOKEN; private final String TOKEN_PUBLIC_KEY; @@ -115,32 +111,6 @@ protected void setup() throws Exception { @Test public void testConsumerStatsOutput() throws Exception { - Set allowedFields = Sets.newHashSet( - "msgRateOut", - "msgThroughputOut", - "bytesOutCounter", - "msgOutCounter", - "messageAckRate", - "msgRateRedeliver", - "chunkedMessageRate", - "consumerName", - "availablePermits", - "unackedMessages", - "avgMessagesPerEntry", - "blockedConsumerOnUnackedMsgs", - "readPositionWhenJoining", - "lastAckedTime", - "lastAckedTimestamp", - "lastConsumedTime", - "lastConsumedTimestamp", - "lastConsumedFlowTimestamp", - "keyHashRanges", - "metadata", - "address", - "connectedSince", - "clientVersion", - "appId"); - final String topicName = "persistent://public/default/testConsumerStatsOutput"; final String subName = "my-subscription"; @@ -154,13 +124,6 @@ public void testConsumerStatsOutput() throws Exception { ObjectMapper mapper = ObjectMapperFactory.create(); ConsumerStats consumerStats = stats.getSubscriptions() .get(subName).getConsumers().get(0); - Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0); - JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats)); - Iterator itr = node.fieldNames(); - while (itr.hasNext()) { - String field = itr.next(); - Assert.assertTrue(allowedFields.contains(field), field + " should not be exposed"); - } // assert that role is exposed Assert.assertEquals(consumerStats.getAppId(), "admin"); consumer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java index 5b2998216e8e1..59a911500e5d9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java @@ -18,9 +18,12 @@ */ package org.apache.pulsar.broker.stats; +import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric; import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.INTEGER; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertNotEquals; import static org.testng.AssertJUnit.assertEquals; @@ -31,22 +34,29 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.PrometheusMetricsTestUtil; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.PendingAcksMap; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; +import org.apache.pulsar.broker.service.StickyKeyDispatcher; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -67,13 +77,19 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DrainingHash; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.assertj.core.groups.Tuple; +import org.awaitility.Awaitility; import org.testng.Assert; +import org.testng.SkipException; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -218,9 +234,24 @@ public void testUpdateStatsForActiveConsumerAndSubscription() throws Exception { Assert.assertEquals(updatedStats.getBytesOutCounter(), 1280); } - @Test - public void testConsumerStatsOutput() throws Exception { - Set allowedFields = Sets.newHashSet( + @DataProvider(name = "classicAndSubscriptionType") + public Object[][] classicAndSubscriptionType() { + return new Object[][]{ + {false, SubscriptionType.Shared}, + {true, SubscriptionType.Key_Shared}, + {false, SubscriptionType.Key_Shared} + }; + } + + @Test(dataProvider = "classicAndSubscriptionType") + public void testConsumerStatsOutput(boolean classicDispatchers, SubscriptionType subscriptionType) + throws Exception { + if (this instanceof AuthenticatedConsumerStatsTest) { + throw new SkipException("Skip test for AuthenticatedConsumerStatsTest"); + } + conf.setSubscriptionSharedUseClassicPersistentImplementation(classicDispatchers); + conf.setSubscriptionKeySharedUseClassicPersistentImplementation(classicDispatchers); + Set expectedFields = Sets.newHashSet( "msgRateOut", "msgThroughputOut", "bytesOutCounter", @@ -233,21 +264,56 @@ public void testConsumerStatsOutput() throws Exception { "unackedMessages", "avgMessagesPerEntry", "blockedConsumerOnUnackedMsgs", - "readPositionWhenJoining", "lastAckedTime", "lastAckedTimestamp", "lastConsumedTime", "lastConsumedTimestamp", "lastConsumedFlowTimestamp", - "keyHashRanges", "metadata", "address", "connectedSince", - "clientVersion"); + "clientVersion", + "drainingHashesCount", + "drainingHashesClearedTotal", + "drainingHashesUnackedMessages" + ); + if (subscriptionType == SubscriptionType.Key_Shared) { + if (classicDispatchers) { + expectedFields.addAll(List.of( + "readPositionWhenJoining", + "keyHashRanges" + )); + } else { + expectedFields.addAll(List.of( + "drainingHashes", + "keyHashRangeArrays" + )); + } + } + final String topicName = newUniqueName("persistent://my-property/my-ns/testConsumerStatsOutput"); + final String subName = "my-subscription"; + + @Cleanup + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(subscriptionType) + .subscriptionName(subName) + .subscribe(); + + String topicStatsUri = + String.format("%s/admin/v2/%s/stats", pulsar.getWebServiceAddress(), topicName.replace("://", "/")); + String topicStatsJson = BrokerTestUtil.getJsonResourceAsString(topicStatsUri); + ObjectMapper mapper = ObjectMapperFactory.create(); + JsonNode node = mapper.readTree(topicStatsJson).get("subscriptions").get(subName).get("consumers").get(0); + assertThat(node.fieldNames()).toIterable().containsExactlyInAnyOrderElementsOf(expectedFields); + } - final String topicName = "persistent://prop/use/ns-abc/testConsumerStatsOutput"; + @Test + public void testLastConsumerFlowTimestamp() throws PulsarClientException, PulsarAdminException { + final String topicName = newUniqueName("persistent://my-property/my-ns/testLastConsumerFlowTimestamp"); final String subName = "my-subscription"; + @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic(topicName) .subscriptionType(SubscriptionType.Shared) @@ -255,18 +321,9 @@ public void testConsumerStatsOutput() throws Exception { .subscribe(); TopicStats stats = admin.topics().getStats(topicName); - ObjectMapper mapper = ObjectMapperFactory.create(); ConsumerStats consumerStats = stats.getSubscriptions() .get(subName).getConsumers().get(0); Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0); - JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats)); - Iterator itr = node.fieldNames(); - while (itr.hasNext()) { - String field = itr.next(); - Assert.assertTrue(allowedFields.contains(field), field + " should not be exposed"); - } - - consumer.close(); } @@ -481,4 +538,189 @@ public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exc assertEquals(0, consumers.get(0).getUnackedMessages()); } + @Test + public void testKeySharedDrainingHashesConsumerStats() throws Exception { + String topic = newUniqueName("testKeySharedDrainingHashesConsumerStats"); + String subscriptionName = "sub"; + int numberOfKeys = 10; + + // Create a producer for the topic + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + // Create the first consumer (c1) for the topic + @Cleanup + Consumer c1 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c1") + .receiverQueueSize(100) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + // Get the dispatcher and selector for the topic + StickyKeyDispatcher dispatcher = getDispatcher(topic, subscriptionName); + StickyKeyConsumerSelector selector = dispatcher.getSelector(); + + // Send 20 messages with keys cycling from 0 to numberOfKeys-1 + for (int i = 0; i < 20; i++) { + String key = String.valueOf(i % numberOfKeys); + int stickyKeyHash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8)); + log.info("Sending message with value {} key {} hash {}", key, i, stickyKeyHash); + producer.newMessage() + .key(key) + .value(i) + .send(); + } + + // Wait until all the already published messages have been pre-fetched by c1 + PendingAcksMap c1PendingAcks = dispatcher.getConsumers().get(0).getPendingAcks(); + Awaitility.await().ignoreExceptions().until(() -> c1PendingAcks.size() == 20); + + // Add a new consumer (c2) for the topic + @Cleanup + Consumer c2 = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .consumerName("c2") + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Key_Shared) + .subscribe(); + + // Get the subscription stats and consumer stats + SubscriptionStats subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + ConsumerStats c1Stats = subscriptionStats.getConsumers().get(0); + ConsumerStats c2Stats = subscriptionStats.getConsumers().get(1); + + Set c2HashesByStats = new HashSet<>(); + Set c2HashesByDispatcher = new HashSet<>(); + Map c1DrainingHashesExpected = new HashMap<>(); + + int expectedDrainingHashesUnackMessages = 0; + // Determine which hashes are assigned to c2 and which are draining from c1 + // run for the same keys as the sent messages + for (int i = 0; i < 20; i++) { + // use the same key as in the sent messages + String key = String.valueOf(i % numberOfKeys); + int hash = selector.makeStickyKeyHash(key.getBytes(StandardCharsets.UTF_8)); + // Validate that the hash is assigned to c2 in stats + if ("c2".equals(findConsumerNameForHash(subscriptionStats, hash))) { + c2HashesByStats.add(hash); + } + // use the selector to determine the expected draining hashes for c1 + org.apache.pulsar.broker.service.Consumer selected = selector.select(hash); + if ("c2".equals(selected.consumerName())) { + c2HashesByDispatcher.add(hash); + c1DrainingHashesExpected.compute(hash, (k, v) -> v == null ? 1 : v + 1); + expectedDrainingHashesUnackMessages++; + } + } + + // Validate that the hashes assigned to c2 match between stats and dispatcher + assertThat(c2HashesByStats).containsExactlyInAnyOrderElementsOf(c2HashesByDispatcher); + + // Validate the draining hashes for c1 + assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash) + .containsExactlyInAnyOrderElementsOf(c2HashesByStats); + assertThat(c1Stats.getDrainingHashes()).extracting(DrainingHash::getHash, DrainingHash::getUnackMsgs) + .containsExactlyInAnyOrderElementsOf(c1DrainingHashesExpected.entrySet().stream() + .map(e -> Tuple.tuple(e.getKey(), e.getValue())).toList()); + + // Validate that c2 has no draining hashes + assertThat(c2Stats.getDrainingHashes()).isEmpty(); + + // Validate counters + assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(c2HashesByStats.size()); + assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(0); + assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(expectedDrainingHashesUnackMessages); + assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0); + + // Send another 20 messages + for (int i = 0; i < 20; i++) { + producer.newMessage() + .key(String.valueOf(i % numberOfKeys)) + .value(i) + .send(); + } + + // Validate blocked attempts for c1 + Awaitility.await().ignoreExceptions().untilAsserted(() -> { + SubscriptionStats stats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertThat(stats.getConsumers().get(0).getDrainingHashes()).isNotEmpty().allSatisfy(dh -> { + assertThat(dh).extracting(DrainingHash::getBlockedAttempts) + .asInstanceOf(INTEGER) + .isGreaterThan(0); + }); + }); + + // Acknowledge messages that were sent before c2 joined, to clear all draining hashes + for (int i = 0; i < 20; i++) { + Message message = c1.receive(1, TimeUnit.SECONDS); + log.info("Acking message with value {} key {}", message.getValue(), message.getKey()); + c1.acknowledge(message); + + if (i == 18) { + // Validate that there is one draining hash left + Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> { + SubscriptionStats stats = + admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> { + assertThat(consumerStats) + .describedAs("Consumer stats should have one draining hash %s", consumerStats) + .extracting(ConsumerStats::getDrainingHashes) + .asList().hasSize(1); + }); + }); + } + + if (i == 19) { + // Validate that there are no draining hashes left + Awaitility.await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofSeconds(3)) + .untilAsserted(() -> { + SubscriptionStats stats = + admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + assertThat(stats.getConsumers().get(0)).satisfies(consumerStats -> { + assertThat(consumerStats).extracting(ConsumerStats::getDrainingHashes) + .asList().isEmpty(); + }); + }); + } + } + + // Get the subscription stats and consumer stats + subscriptionStats = admin.topics().getStats(topic).getSubscriptions().get(subscriptionName); + c1Stats = subscriptionStats.getConsumers().get(0); + c2Stats = subscriptionStats.getConsumers().get(1); + + // Validate counters + assertThat(c1Stats.getDrainingHashesCount()).isEqualTo(0); + assertThat(c1Stats.getDrainingHashesClearedTotal()).isEqualTo(c2HashesByStats.size()); + assertThat(c1Stats.getDrainingHashesUnackedMessages()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesCount()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesClearedTotal()).isEqualTo(0); + assertThat(c2Stats.getDrainingHashesUnackedMessages()).isEqualTo(0); + + } + + private String findConsumerNameForHash(SubscriptionStats subscriptionStats, int hash) { + return findConsumerForHash(subscriptionStats, hash).map(ConsumerStats::getConsumerName).orElse(null); + } + + private Optional findConsumerForHash(SubscriptionStats subscriptionStats, int hash) { + return subscriptionStats.getConsumers().stream() + .filter(consumerStats -> consumerStats.getKeyHashRangeArrays().stream() + .anyMatch(hashRanges -> hashRanges[0] <= hash && hashRanges[1] >= hash)) + .findFirst(); + } + + @SneakyThrows + private StickyKeyDispatcher getDispatcher(String topic, String subscription) { + return (StickyKeyDispatcher) pulsar.getBrokerService().getTopicIfExists(topic).get() + .get().getSubscription(subscription).getDispatcher(); + } } diff --git a/pulsar-broker/src/test/resources/log4j2.xml b/pulsar-broker/src/test/resources/log4j2.xml index 09a89702ee2ac..a0732096f2845 100644 --- a/pulsar-broker/src/test/resources/log4j2.xml +++ b/pulsar-broker/src/test/resources/log4j2.xml @@ -36,5 +36,16 @@ + diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java index d2d3600df96ed..16dce5903f492 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java @@ -73,8 +73,41 @@ public interface ConsumerStats { boolean isBlockedConsumerOnUnackedMsgs(); /** The read position of the cursor when the consumer joining. */ + @Deprecated String getReadPositionWhenJoining(); + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the current number of hashes in the draining state for this consumer. + * + * @return the current number of hashes in the draining state for this consumer + */ + int getDrainingHashesCount(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of hashes cleared from the draining state since the consumer connected. + * + * @return the total number of hashes cleared from the draining state since the consumer connected + */ + long getDrainingHashesClearedTotal(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of unacked messages for all draining hashes for this consumer. + * + * @return the total number of unacked messages for all draining hashes for this consumer + */ + int getDrainingHashesUnackedMessages(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the draining hashes for this consumer. + * + * @return a list of draining hashes for this consumer + */ + List getDrainingHashes(); + /** Address of this consumer. */ String getAddress(); @@ -88,9 +121,20 @@ public interface ConsumerStats { long getLastConsumedTimestamp(); long getLastConsumedFlowTimestamp(); - /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/ + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `false` + * (default). + */ + List getKeyHashRangeArrays(); + + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`. + */ + @Deprecated List getKeyHashRanges(); /** Metadata (key/value strings) associated with this consumer. */ Map getMetadata(); -} +} \ No newline at end of file diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java new file mode 100644 index 0000000000000..685b0b74e64b9 --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/DrainingHash.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +/** + * Contains information about a draining hash in a Key_Shared subscription. + * @see ConsumerStats + */ +public interface DrainingHash { + /** + * Get the sticky key hash value of the draining hash. + * @return the sticky hash value + */ + int getHash(); + /** + * Get number of unacknowledged messages for the draining hash. + * @return number of unacknowledged messages + */ + int getUnackMsgs(); + /** + * Get the number of times the hash has blocked an attempted delivery of a message. + * @return number of times the hash has blocked an attempted delivery of a message + */ + int getBlockedAttempts(); +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java index ce3a080a855da..95e7c65266bff 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java @@ -121,6 +121,30 @@ public interface SubscriptionStats { /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ Map getConsumersAfterMarkDeletePosition(); + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the current number of hashes in the draining state. + * + * @return the current number of hashes in the draining state + */ + int getDrainingHashesCount(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of hashes cleared from the draining state for the connected consumers. + * + * @return the total number of hashes cleared from the draining state for the connected consumers + */ + long getDrainingHashesClearedTotal(); + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the total number of unacked messages for all draining hashes. + * + * @return the total number of unacked messages for all draining hashes + */ + int getDrainingHashesUnackedMessages(); + /** SubscriptionProperties (key/value strings) associated with this subscribe. */ Map getSubscriptionProperties(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java index de36b330b7f1a..8811247cb2de3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java @@ -23,6 +23,7 @@ import java.util.Objects; import lombok.Data; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.util.DateFormatter; /** @@ -80,6 +81,30 @@ public class ConsumerStatsImpl implements ConsumerStats { /** The read position of the cursor when the consumer joining. */ public String readPositionWhenJoining; + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of hashes in the draining state. + */ + public int drainingHashesCount; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of hashes cleared from the draining state for + * the consumer. + */ + public long drainingHashesClearedTotal; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of unacked messages for all draining hashes. + */ + public int drainingHashesUnackedMessages; + + /** + * For Key_Shared subscription in AUTO_SPLIT ordered mode: + * Retrieves the draining hashes for this consumer. + * + * @return a list of draining hashes for this consumer + */ + public List drainingHashes; + /** Address of this consumer. */ private String address; /** Timestamp of connection. */ @@ -96,7 +121,17 @@ public class ConsumerStatsImpl implements ConsumerStats { public long lastConsumedFlowTimestamp; - /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/ + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `false` + * (default). + */ + public List keyHashRangeArrays; + + /** + * Hash ranges assigned to this consumer if in Key_Shared subscription mode. + * This format and field is used when `subscriptionKeySharedUseClassicPersistentImplementation` is set to `true`. + */ public List keyHashRanges; /** Metadata (key/value strings) associated with this consumer. */ @@ -114,6 +149,12 @@ public ConsumerStatsImpl add(ConsumerStatsImpl stats) { this.unackedMessages += stats.unackedMessages; this.blockedConsumerOnUnackedMsgs = stats.blockedConsumerOnUnackedMsgs; this.readPositionWhenJoining = stats.readPositionWhenJoining; + this.drainingHashesCount = stats.drainingHashesCount; + this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal; + this.drainingHashesUnackedMessages = stats.drainingHashesUnackedMessages; + this.drainingHashes = stats.drainingHashes; + this.keyHashRanges = stats.keyHashRanges; + this.keyHashRangeArrays = stats.keyHashRangeArrays; return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java new file mode 100644 index 0000000000000..134bdac597b7c --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/DrainingHashImpl.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data.stats; + +import lombok.Data; +import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.DrainingHash; + +/** + * Contains information about a draining hash in a Key_Shared subscription. + * @see ConsumerStats + */ +@Data +public class DrainingHashImpl implements DrainingHash { + /** + * Get the sticky key hash value of the draining hash. + * @return the sticky hash value + */ + public int hash; + /** + * Get number of unacknowledged messages for the draining hash. + * @return number of unacknowledged messages + */ + public int unackMsgs; + /** + * Get the number of times the hash has blocked an attempted delivery of a message. + * @return number of times the hash has blocked an attempted delivery of a message + */ + public int blockedAttempts; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index 12734a5586cef..02df9b7870023 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -129,6 +129,22 @@ public class SubscriptionStatsImpl implements SubscriptionStats { /** This is for Key_Shared subscription to get the recentJoinedConsumers in the Key_Shared subscription. */ public Map consumersAfterMarkDeletePosition; + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The current number of hashes in the draining state. + */ + public int drainingHashesCount; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of hashes cleared from the draining state + * for the connected consumers. + */ + public long drainingHashesClearedTotal; + + /** + * For Key_Shared AUTO_SPLIT ordered subscriptions: The total number of unacked messages for all draining hashes. + */ + public int drainingHashesUnackedMessages; + /** The number of non-contiguous deleted messages ranges. */ public int nonContiguousDeletedMessagesRanges; @@ -180,6 +196,9 @@ public void reset() { lastMarkDeleteAdvancedTimestamp = 0L; consumers.clear(); consumersAfterMarkDeletePosition.clear(); + drainingHashesCount = 0; + drainingHashesClearedTotal = 0L; + drainingHashesUnackedMessages = 0; nonContiguousDeletedMessagesRanges = 0; nonContiguousDeletedMessagesRangesSerializedSize = 0; earliestMsgPublishTimeInBacklog = 0L; @@ -226,6 +245,9 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { } this.allowOutOfOrderDelivery |= stats.allowOutOfOrderDelivery; this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition); + this.drainingHashesCount += stats.drainingHashesCount; + this.drainingHashesClearedTotal += stats.drainingHashesClearedTotal; + this.drainingHashesUnackedMessages += stats.drainingHashesUnackedMessages; this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; if (this.earliestMsgPublishTimeInBacklog != 0 && stats.earliestMsgPublishTimeInBacklog != 0) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java index 7b235cfa341d1..b737d68d5ea9f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java @@ -56,6 +56,7 @@ import org.apache.pulsar.common.policies.data.ConsumerStats; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.DispatchRate; +import org.apache.pulsar.common.policies.data.DrainingHash; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.FunctionInstanceStats; @@ -96,6 +97,7 @@ import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.policies.data.stats.DrainingHashImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl; @@ -243,6 +245,7 @@ private static void setAnnotationsModule(ObjectMapper mapper) { resolver.addMapping(DispatchRate.class, DispatchRateImpl.class); resolver.addMapping(TopicStats.class, TopicStatsImpl.class); resolver.addMapping(ConsumerStats.class, ConsumerStatsImpl.class); + resolver.addMapping(DrainingHash.class, DrainingHashImpl.class); resolver.addMapping(NonPersistentPublisherStats.class, NonPersistentPublisherStatsImpl.class); resolver.addMapping(NonPersistentReplicatorStats.class, NonPersistentReplicatorStatsImpl.class); resolver.addMapping(NonPersistentSubscriptionStats.class, NonPersistentSubscriptionStatsImpl.class);