diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java index e54a3710e1294..524d47e7c1b92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java @@ -360,7 +360,7 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st writeSubscriptionMetric(stream, "pulsar_subscription_filter_rescheduled_msg_count", subsStats.filterRescheduledMsgCount, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); - writeSubscriptionMetric(stream, "pulsar_delayed_message_index_size_bytes", + writeSubscriptionMetric(stream, "pulsar_subscription_delayed_message_index_size_bytes", subsStats.delayedMessageIndexSizeInBytes, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index 903443d37bb07..1196161711224 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -424,10 +424,11 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex @Cleanup Producer producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + String subName = "test_sub"; @Cleanup Consumer consumer = client.newConsumer(Schema.STRING) .topic(topic) - .subscriptionName("test_sub") + .subscriptionName(subName) .subscriptionType(SubscriptionType.Shared) .messageListener((MessageListener) (consumer1, msg) -> { try { @@ -453,7 +454,13 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex Multimap metricsMap = parseMetrics(metricsStr); Collection metrics = metricsMap.get("pulsar_delayed_message_index_size_bytes"); - Assert.assertTrue(metrics.size() > 0); + Collection subMetrics = metricsMap.get("pulsar_subscription_delayed_message_index_size_bytes"); + assertFalse(metrics.isEmpty()); + if (exposeTopicLevelMetrics) { + assertFalse(subMetrics.isEmpty()); + } else { + assertTrue(subMetrics.isEmpty()); + } int topicLevelNum = 0; int namespaceLevelNum = 0; @@ -462,14 +469,20 @@ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean ex if (exposeTopicLevelMetrics && metric.tags.get("topic").equals(topic)) { Assert.assertTrue(metric.value > 0); topicLevelNum++; - if ("test_sub".equals(metric.tags.get("subscription"))) { - subscriptionLevelNum++; - } } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) { Assert.assertTrue(metric.value > 0); namespaceLevelNum++; } } + if (exposeTopicLevelMetrics) { + for (Metric metric : subMetrics) { + if (metric.tags.get("topic").equals(topic) && + subName.equals(metric.tags.get("subscription"))) { + Assert.assertTrue(metric.value > 0); + subscriptionLevelNum++; + } + } + } if (exposeTopicLevelMetrics) { Assert.assertTrue(topicLevelNum > 0);