From ab5c57eb3c25d202133fa8e1e818115e7855accc Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Fri, 18 Oct 2024 16:39:47 -0400 Subject: [PATCH] Add metrics to track time compaction jobs are queued (#4980) This adds 2 new groups of stats to track information about queued compaction jobs. The first stat is a timer that keeps track of when jobs are being polled and give information on how often/fast jobs are exiting the queue. The second group of stats is a min/max/avg and is tracking age information about how long jobs are waiting on the queue. This closes #4945 --- .../apache/accumulo/core/metrics/Metric.java | 12 ++ .../coordinator/CompactionCoordinator.java | 2 +- .../compaction/coordinator/QueueMetrics.java | 36 ++++++ .../queue/CompactionJobPriorityQueue.java | 106 ++++++++++++++++-- .../queue/CompactionJobPriorityQueueTest.java | 40 ++++++- .../queue/CompactionJobQueuesTest.java | 8 ++ .../ExternalCompactionMetricsIT.java | 28 ++++- 7 files changed, 219 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java index 92690123c68..7050b733476 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/Metric.java @@ -51,6 +51,18 @@ public enum Metric { MetricType.GAUGE, "Count of rejected jobs.", MetricCategory.COMPACTOR), COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compactor.queue.jobs.priority", MetricType.GAUGE, "Lowest priority queued job.", MetricCategory.COMPACTOR), + COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE("accumulo.compactor.queue.jobs.min.age", + MetricType.GAUGE, "Minimum age of currently queued jobs in seconds.", + MetricCategory.COMPACTOR), + COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE("accumulo.compactor.queue.jobs.max.age", + MetricType.GAUGE, "Maximum age of currently queued jobs in seconds.", + MetricCategory.COMPACTOR), + COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE("accumulo.compactor.queue.jobs.avg.age", + MetricType.GAUGE, "Average age of currently queued jobs in seconds.", + MetricCategory.COMPACTOR), + COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER("accumulo.compactor.queue.jobs.exit.time", + MetricType.TIMER, "Tracks time a job spent in the queue before exiting the queue.", + MetricCategory.COMPACTOR), // Fate Metrics FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type", MetricType.GAUGE, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 272395354dd..3de05a75785 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -1049,7 +1049,7 @@ private void cleanUpEmptyCompactorPathInZK() { // associated priority queue of jobs CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); if (queue != null) { - queue.clear(); + queue.clearIfInactive(Duration.ofMinutes(10)); queue.setMaxSize(this.jobQueueInitialSize); } } else { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java index eb2b9800c20..dd2705eb18b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/QueueMetrics.java @@ -19,7 +19,11 @@ package org.apache.accumulo.manager.compaction.coordinator; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUES; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED; import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED; @@ -48,6 +52,7 @@ import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; public class QueueMetrics implements MetricsProducer { @@ -57,6 +62,10 @@ private static class QueueMeters { private final Gauge jobsDequeued; private final Gauge jobsRejected; private final Gauge jobsLowestPriority; + private final Gauge jobsMinAge; + private final Gauge jobsMaxAge; + private final Gauge jobsAvgAge; + private final Timer jobsQueueTimer; public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid, CompactionJobPriorityQueue queue) { @@ -90,6 +99,29 @@ public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid, q -> q.getLowestPriority()) .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getDescription()) .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + + jobsMinAge = Gauge + .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName(), queue, + q -> q.getJobQueueStats().getMinAge().toSeconds()) + .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getDescription()) + .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + + jobsMaxAge = Gauge + .builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName(), queue, + q -> q.getJobQueueStats().getMaxAge().toSeconds()) + .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getDescription()) + .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + + jobsAvgAge = Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName(), queue, + // Divide by 1000.0 instead of using toSeconds() so we get a double + q -> q.getJobQueueStats().getAvgAge().toMillis() / 1000.0) + .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getDescription()) + .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + + jobsQueueTimer = Timer.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName()) + .description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getDescription()) + .tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry); + queue.setJobQueueTimerCallback(jobsQueueTimer); } private void removeMeters(MeterRegistry registry) { @@ -98,6 +130,10 @@ private void removeMeters(MeterRegistry registry) { registry.remove(jobsDequeued); registry.remove(jobsRejected); registry.remove(jobsLowestPriority); + registry.remove(jobsMinAge); + registry.remove(jobsMaxAge); + registry.remove(jobsAvgAge); + registry.remove(jobsQueueTimer); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 9e2ddbc96a5..2099dbed6d2 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; +import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -28,23 +29,31 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.util.Stat; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; /** * Priority Queue for {@link CompactionJob}s that supports a maximum size. When a job is added and @@ -113,6 +122,9 @@ public boolean equals(Object o) { private final AtomicLong dequeuedJobs; private final ArrayDeque> futures; private long futuresAdded = 0; + private final Map jobAges; + private final Supplier jobQueueStats; + private final AtomicReference> jobQueueTimer; private static class TabletJobs { final long generation; @@ -138,6 +150,10 @@ public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { this.rejectedJobs = new AtomicLong(0); this.dequeuedJobs = new AtomicLong(0); this.futures = new ArrayDeque<>(); + this.jobAges = new ConcurrentHashMap<>(); + this.jobQueueStats = Suppliers.memoizeWithExpiration( + () -> new CompactionJobPriorityQueueStats(jobAges), 5, TimeUnit.SECONDS); + this.jobQueueTimer = new AtomicReference<>(Optional.empty()); } public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) { @@ -154,7 +170,8 @@ public synchronized void removeOlderGenerations(Ample.DataLevel level, long curr removals.size(), groupId, level); } - removals.forEach(this::removePreviousSubmissions); + // Also clears jobAge timer for tablets that do not need compaction anymore + removals.forEach(ke -> removePreviousSubmissions(ke, true)); } /** @@ -164,7 +181,10 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection job.getGroup().equals(groupId))); - removePreviousSubmissions(tabletMetadata.getExtent()); + // Do not clear jobAge timers, they are cleared later at the end of this method + // if there are no jobs for the extent so we do not reset the timer for an extent + // that had previous jobs and still has jobs + removePreviousSubmissions(tabletMetadata.getExtent(), false); HashSet newEntries = new HashSet<>(jobs.size()); @@ -175,9 +195,14 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection jqt.record(Duration.ZERO)); continue outer; } // else the future was canceled or timed out so could not complete it future = futures.poll(); @@ -197,6 +222,9 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection Timer.startNew()); + } else { + jobAges.remove(tabletMetadata.getExtent()); } return jobsAdded; @@ -235,10 +263,19 @@ public synchronized CompactionJobQueues.MetaJob poll() { if (first != null) { dequeuedJobs.getAndIncrement(); var extent = first.getValue().getTabletMetadata().getExtent(); - Set jobs = tabletJobs.get(extent).jobs; + var timer = jobAges.get(extent); + checkState(timer != null); + jobQueueTimer.get().ifPresent(jqt -> jqt.record(timer.elapsed())); + log.trace("Compaction job age for {} is {} ms", extent, timer.elapsed(TimeUnit.MILLISECONDS)); + Set jobs = tabletJobs.get(extent).jobs; checkState(jobs.remove(first.getKey())); + // If there are no more jobs for this extent we can remove the timer, otherwise + // we need to reset it if (jobs.isEmpty()) { tabletJobs.remove(extent); + jobAges.remove(extent); + } else { + timer.restart(); } } return first == null ? null : first.getValue(); @@ -280,11 +317,15 @@ synchronized CompactionJobQueues.MetaJob peek() { return firstEntry == null ? null : firstEntry.getValue(); } - private void removePreviousSubmissions(KeyExtent extent) { - TabletJobs prevJobs = tabletJobs.get(extent); + private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges) { + CompactionJobPriorityQueue.TabletJobs prevJobs = tabletJobs.get(extent); if (prevJobs != null) { prevJobs.jobs.forEach(jobQueue::remove); tabletJobs.remove(extent); + if (removeJobAges) { + jobAges.remove(extent); + log.trace("Removed jobAge timer for tablet {} that no longer needs compaction", extent); + } } } @@ -302,7 +343,6 @@ private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) rejectedJobs.getAndIncrement(); } } - } var key = new CjpqKey(job); @@ -310,8 +350,56 @@ private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) return key; } - public synchronized void clear() { - jobQueue.clear(); - tabletJobs.clear(); + public synchronized void clearIfInactive(Duration duration) { + // IF the minimum age of jobs in the queue is older than the + // duration then clear all the maps as this queue is now + // considered inactive + if (getJobQueueStats().getMinAge().compareTo(duration) > 0) { + jobQueue.clear(); + tabletJobs.clear(); + jobAges.clear(); + } + } + + public CompactionJobPriorityQueueStats getJobQueueStats() { + return jobQueueStats.get(); + } + + public void setJobQueueTimerCallback(io.micrometer.core.instrument.Timer jobQueueTimer) { + this.jobQueueTimer.set(Optional.of(jobQueueTimer)); + } + + // Used for unit testing, can return the map as is because + // it is a ConcurrentHashMap + @VisibleForTesting + Map getJobAges() { + return jobAges; + } + + public static class CompactionJobPriorityQueueStats { + private final Duration minAge; + private final Duration maxAge; + private final Duration avgAge; + + @VisibleForTesting + CompactionJobPriorityQueueStats(Map jobAges) { + final Stat stats = new Stat(); + jobAges.values().forEach(t -> stats.addStat(t.elapsed(TimeUnit.MILLISECONDS))); + this.minAge = Duration.ofMillis(stats.min()); + this.maxAge = Duration.ofMillis(stats.max()); + this.avgAge = Duration.ofMillis(Math.round(stats.mean())); + } + + public Duration getMinAge() { + return minAge; + } + + public Duration getMaxAge() { + return maxAge; + } + + public Duration getAvgAge() { + return avgAge; + } } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 05e0b35c55b..2592be35aab 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -33,11 +33,13 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; +import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue.CompactionJobPriorityQueueStats; import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues.MetaJob; import org.apache.hadoop.io.Text; import org.easymock.EasyMock; @@ -193,21 +195,26 @@ public void testAddMoreThanMax() { assertEquals(0, queue.getDequeuedJobs()); assertEquals(1, queue.getRejectedJobs()); assertEquals(2, queue.getQueuedJobs()); + // One tablet was added with jobs + assertEquals(1, queue.getJobAges().size()); MetaJob job = queue.poll(); assertEquals(cj1, job.getJob()); assertEquals(tm, job.getTabletMetadata()); assertEquals(1, queue.getDequeuedJobs()); + // still 1 job left so should still have a timer + assertEquals(1, queue.getJobAges().size()); job = queue.poll(); assertEquals(cj2, job.getJob()); assertEquals(tm, job.getTabletMetadata()); assertEquals(2, queue.getDequeuedJobs()); + // no more jobs so timer should be gone + assertTrue(queue.getJobAges().isEmpty()); job = queue.poll(); assertNull(job); assertEquals(2, queue.getDequeuedJobs()); - } private static int counter = 1; @@ -251,6 +258,14 @@ public void test() { assertEquals(100, queue.getMaxSize()); assertEquals(100, queue.getQueuedJobs()); assertEquals(900, queue.getRejectedJobs()); + // There should be 1000 total job ages even though 900 were rejected + // as there were 1000 total tablets added + assertEquals(1000, queue.getJobAges().size()); + + var stats = queue.getJobQueueStats(); + assertTrue(stats.getMinAge().toMillis() > 0); + assertTrue(stats.getMaxAge().toMillis() > 0); + assertTrue(stats.getAvgAge().toMillis() > 0); // iterate over the expected set and make sure that they next job in the queue // matches @@ -266,6 +281,29 @@ public void test() { } assertEquals(100, matchesSeen); + // Should be 900 left as the 100 that were polled would clear as there are no more + // jobs for those tablets. These 900 were rejected so their timers remain and will + // be cleared if there are no computed jobs when jobs are added again or by + // the call to removeOlderGenerations() + assertEquals(900, queue.getJobAges().size()); + + // Create new stats directly vs using queue.getJobQueueStats() because that method + // caches the results for a short period + stats = new CompactionJobPriorityQueueStats(queue.getJobAges()); + assertTrue(stats.getMinAge().toMillis() > 0); + assertTrue(stats.getMaxAge().toMillis() > 0); + assertTrue(stats.getAvgAge().toMillis() > 0); + + // Verify jobAges cleared when calling removeOlderGenerations() + queue.removeOlderGenerations(DataLevel.USER, 2); + + // Stats should be 0 if no jobs + var jobAges = queue.getJobAges(); + assertTrue(jobAges.isEmpty()); + stats = new CompactionJobPriorityQueueStats(queue.getJobAges()); + assertEquals(0, stats.getMinAge().toMillis()); + assertEquals(0, stats.getMaxAge().toMillis()); + assertEquals(0, stats.getAvgAge().toMillis()); } /** diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 09ae416091e..bba27daf67c 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -362,12 +362,20 @@ public void testGetAsync() throws Exception { jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); + // Futures were immediately completed so nothing should be queued + assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty()); + jobQueues.add(tm3, List.of(newJob((short) 3, 7, cg1))); jobQueues.add(tm4, List.of(newJob((short) 4, 8, cg1))); + // No futures available, so jobAges should exist for 2 tablets + assertEquals(2, jobQueues.getQueue(cg1).getJobAges().size()); var future3 = jobQueues.getAsync(cg1); var future4 = jobQueues.getAsync(cg1); + // Should be back to 0 size after futures complete + assertTrue(jobQueues.getQueue(cg1).getJobAges().isEmpty()); + assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone()); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java index 494e9fe6ca3..3245e6bf24e 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionMetricsIT.java @@ -18,6 +18,10 @@ */ package org.apache.accumulo.test.compaction; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE; +import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; @@ -151,7 +155,6 @@ public void testMetrics() throws Exception { boolean sawDCQ1_5 = false; boolean sawDCQ2_10 = false; - // wait until expected number of queued are seen in metrics while (!sawDCQ1_5 || !sawDCQ2_10) { Metric qm = queueMetrics.take(); @@ -165,12 +168,22 @@ public void testMetrics() throws Exception { boolean sawDCQ1_0 = false; boolean sawDCQ2_0 = false; + boolean minDCQ1 = false; + boolean maxDCQ1 = false; + boolean avgDCQ1 = false; + boolean timerDCQ1 = false; // wait until queued goes to zero in metrics - while (!sawDCQ1_0 || !sawDCQ2_0) { + // and verify stats are positive values + while (!sawDCQ1_0 || !sawDCQ2_0 || !minDCQ1 || !maxDCQ1 || !avgDCQ1 || !timerDCQ1) { Metric qm = queueMetrics.take(); sawDCQ1_0 |= match(qm, "dcq1", "0"); sawDCQ2_0 |= match(qm, "dcq2", "0"); + minDCQ1 |= assertMetric(qm, "dcq1", COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName()); + maxDCQ1 |= assertMetric(qm, "dcq1", COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName()); + avgDCQ1 |= assertMetric(qm, "dcq1", COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName()); + timerDCQ1 |= + assertMetric(qm, "dcq1", COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName()); } shutdownTailer.set(true); @@ -204,4 +217,15 @@ private static boolean match(Metric input, String queue, String value) { return false; } + private static boolean assertMetric(Metric input, String queue, String name) { + if (input.getTags() != null) { + String id = input.getTags().get("queue.id"); + if (id != null && id.equals(queue) && input.getName().equals(name) + && Double.parseDouble(input.getValue()) > 0) { + return true; + } + } + return false; + } + }