Skip to content

Commit

Permalink
Add metrics to track time compaction jobs are queued (#4980)
Browse files Browse the repository at this point in the history
This adds two new groups of stats to track information about queued
compaction jobs. The first is a timer that records when jobs are
polled and gives information on how often and how fast jobs are
exiting the queue. The second group is a set of min/max/avg gauges
that track how long jobs have been waiting on the queue.

This closes #4945
  • Loading branch information
cshannon authored Oct 18, 2024
1 parent af8a11c commit ab5c57e
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 13 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ public enum Metric {
MetricType.GAUGE, "Count of rejected jobs.", MetricCategory.COMPACTOR),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY("accumulo.compactor.queue.jobs.priority",
MetricType.GAUGE, "Lowest priority queued job.", MetricCategory.COMPACTOR),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE("accumulo.compactor.queue.jobs.min.age",
MetricType.GAUGE, "Minimum age of currently queued jobs in seconds.",
MetricCategory.COMPACTOR),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE("accumulo.compactor.queue.jobs.max.age",
MetricType.GAUGE, "Maximum age of currently queued jobs in seconds.",
MetricCategory.COMPACTOR),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE("accumulo.compactor.queue.jobs.avg.age",
MetricType.GAUGE, "Average age of currently queued jobs in seconds.",
MetricCategory.COMPACTOR),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER("accumulo.compactor.queue.jobs.exit.time",
MetricType.TIMER, "Tracks time a job spent in the queue before exiting the queue.",
MetricCategory.COMPACTOR),

// Fate Metrics
FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type", MetricType.GAUGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ private void cleanUpEmptyCompactorPathInZK() {
// associated priority queue of jobs
CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid);
if (queue != null) {
queue.clear();
queue.clearIfInactive(Duration.ofMinutes(10));
queue.setMaxSize(this.jobQueueInitialSize);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package org.apache.accumulo.manager.compaction.coordinator;

import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUES;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_DEQUEUED;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
Expand Down Expand Up @@ -48,6 +52,7 @@
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;

public class QueueMetrics implements MetricsProducer {

Expand All @@ -57,6 +62,10 @@ private static class QueueMeters {
private final Gauge jobsDequeued;
private final Gauge jobsRejected;
private final Gauge jobsLowestPriority;
private final Gauge jobsMinAge;
private final Gauge jobsMaxAge;
private final Gauge jobsAvgAge;
private final Timer jobsQueueTimer;

public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid,
CompactionJobPriorityQueue queue) {
Expand Down Expand Up @@ -90,6 +99,29 @@ public QueueMeters(MeterRegistry meterRegistry, CompactorGroupId cgid,
q -> q.getLowestPriority())
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);

jobsMinAge = Gauge
.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getName(), queue,
q -> q.getJobQueueStats().getMinAge().toSeconds())
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MIN_AGE.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);

jobsMaxAge = Gauge
.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getName(), queue,
q -> q.getJobQueueStats().getMaxAge().toSeconds())
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_MAX_AGE.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);

jobsAvgAge = Gauge.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getName(), queue,
// Divide by 1000.0 instead of using toSeconds() so we get a double
q -> q.getJobQueueStats().getAvgAge().toMillis() / 1000.0)
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_AVG_AGE.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);

jobsQueueTimer = Timer.builder(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getName())
.description(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER.getDescription())
.tags(List.of(Tag.of("queue.id", queueId))).register(meterRegistry);
queue.setJobQueueTimerCallback(jobsQueueTimer);
}

private void removeMeters(MeterRegistry registry) {
Expand All @@ -98,6 +130,10 @@ private void removeMeters(MeterRegistry registry) {
registry.remove(jobsDequeued);
registry.remove(jobsRejected);
registry.remove(jobsLowestPriority);
registry.remove(jobsMinAge);
registry.remove(jobsMaxAge);
registry.remove(jobsAvgAge);
registry.remove(jobsQueueTimer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkState;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -28,23 +29,31 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
import org.apache.accumulo.core.util.Stat;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

/**
* Priority Queue for {@link CompactionJob}s that supports a maximum size. When a job is added and
Expand Down Expand Up @@ -113,6 +122,9 @@ public boolean equals(Object o) {
private final AtomicLong dequeuedJobs;
private final ArrayDeque<CompletableFuture<CompactionJobQueues.MetaJob>> futures;
private long futuresAdded = 0;
private final Map<KeyExtent,Timer> jobAges;
private final Supplier<CompactionJobPriorityQueueStats> jobQueueStats;
private final AtomicReference<Optional<io.micrometer.core.instrument.Timer>> jobQueueTimer;

private static class TabletJobs {
final long generation;
Expand All @@ -138,6 +150,10 @@ public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) {
this.rejectedJobs = new AtomicLong(0);
this.dequeuedJobs = new AtomicLong(0);
this.futures = new ArrayDeque<>();
this.jobAges = new ConcurrentHashMap<>();
this.jobQueueStats = Suppliers.memoizeWithExpiration(
() -> new CompactionJobPriorityQueueStats(jobAges), 5, TimeUnit.SECONDS);
this.jobQueueTimer = new AtomicReference<>(Optional.empty());
}

public synchronized void removeOlderGenerations(Ample.DataLevel level, long currGeneration) {
Expand All @@ -154,7 +170,8 @@ public synchronized void removeOlderGenerations(Ample.DataLevel level, long curr
removals.size(), groupId, level);
}

removals.forEach(this::removePreviousSubmissions);
// Also clears jobAge timer for tablets that do not need compaction anymore
removals.forEach(ke -> removePreviousSubmissions(ke, true));
}

/**
Expand All @@ -164,7 +181,10 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection<Compaction
long generation) {
Preconditions.checkArgument(jobs.stream().allMatch(job -> job.getGroup().equals(groupId)));

removePreviousSubmissions(tabletMetadata.getExtent());
// Do not clear jobAge timers, they are cleared later at the end of this method
// if there are no jobs for the extent so we do not reset the timer for an extent
// that had previous jobs and still has jobs
removePreviousSubmissions(tabletMetadata.getExtent(), false);

HashSet<CjpqKey> newEntries = new HashSet<>(jobs.size());

Expand All @@ -175,9 +195,14 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection<Compaction
// It is expected that if futures are present then the queue is empty. If this is not
// true then there is a bug.
Preconditions.checkState(jobQueue.isEmpty());
// Queue should be empty so jobAges should be empty
Preconditions.checkState(jobAges.isEmpty());
if (future.complete(new CompactionJobQueues.MetaJob(job, tabletMetadata))) {
// successfully completed a future with this job, so do not need to queue the job
jobsAdded++;
// Record a time of 0 for this job because a waiting future was completed immediately
// and there were no jobs waiting in the queue
jobQueueTimer.get().ifPresent(jqt -> jqt.record(Duration.ZERO));
continue outer;
} // else the future was canceled or timed out so could not complete it
future = futures.poll();
Expand All @@ -197,6 +222,9 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection<Compaction
if (!newEntries.isEmpty()) {
checkState(tabletJobs.put(tabletMetadata.getExtent(), new TabletJobs(generation, newEntries))
== null);
jobAges.computeIfAbsent(tabletMetadata.getExtent(), e -> Timer.startNew());
} else {
jobAges.remove(tabletMetadata.getExtent());
}

return jobsAdded;
Expand Down Expand Up @@ -235,10 +263,19 @@ public synchronized CompactionJobQueues.MetaJob poll() {
if (first != null) {
dequeuedJobs.getAndIncrement();
var extent = first.getValue().getTabletMetadata().getExtent();
Set<CjpqKey> jobs = tabletJobs.get(extent).jobs;
var timer = jobAges.get(extent);
checkState(timer != null);
jobQueueTimer.get().ifPresent(jqt -> jqt.record(timer.elapsed()));
log.trace("Compaction job age for {} is {} ms", extent, timer.elapsed(TimeUnit.MILLISECONDS));
Set<CompactionJobPriorityQueue.CjpqKey> jobs = tabletJobs.get(extent).jobs;
checkState(jobs.remove(first.getKey()));
// If there are no more jobs for this extent we can remove the timer, otherwise
// we need to reset it
if (jobs.isEmpty()) {
tabletJobs.remove(extent);
jobAges.remove(extent);
} else {
timer.restart();
}
}
return first == null ? null : first.getValue();
Expand Down Expand Up @@ -280,11 +317,15 @@ synchronized CompactionJobQueues.MetaJob peek() {
return firstEntry == null ? null : firstEntry.getValue();
}

private void removePreviousSubmissions(KeyExtent extent) {
TabletJobs prevJobs = tabletJobs.get(extent);
private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges) {
CompactionJobPriorityQueue.TabletJobs prevJobs = tabletJobs.get(extent);
if (prevJobs != null) {
prevJobs.jobs.forEach(jobQueue::remove);
tabletJobs.remove(extent);
if (removeJobAges) {
jobAges.remove(extent);
log.trace("Removed jobAge timer for tablet {} that no longer needs compaction", extent);
}
}
}

Expand All @@ -302,16 +343,63 @@ private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job)
rejectedJobs.getAndIncrement();
}
}

}

var key = new CjpqKey(job);
jobQueue.put(key, new CompactionJobQueues.MetaJob(job, tabletMetadata));
return key;
}

public synchronized void clear() {
jobQueue.clear();
tabletJobs.clear();
/**
 * Empties this queue when it is considered inactive.
 *
 * <p>
 * The queue is treated as inactive when the minimum job age reported by
 * {@link #getJobQueueStats()} is strictly greater than {@code duration}. In that case the job
 * queue, the per-tablet job tracking and the job age timers are all cleared. Otherwise nothing
 * is modified.
 *
 * @param duration threshold the minimum job age must exceed before the queue is cleared
 */
public synchronized void clearIfInactive(Duration duration) {
  final Duration youngestJobAge = getJobQueueStats().getMinAge();
  final boolean inactive = youngestJobAge.compareTo(duration) > 0;
  if (!inactive) {
    // The minimum age does not exceed the threshold, so leave the queue untouched
    return;
  }
  jobQueue.clear();
  tabletJobs.clear();
  jobAges.clear();
}

/**
 * Returns the age statistics for the jobs tracked by this queue.
 *
 * <p>
 * The value comes from the {@code jobQueueStats} supplier, which the constructor creates with a
 * 5 second expiration, so the result may be a cached snapshot rather than a freshly computed
 * one.
 *
 * @return the min/max/avg job age statistics provided by the supplier
 */
public CompactionJobPriorityQueueStats getJobQueueStats() {
  final CompactionJobPriorityQueueStats snapshot = this.jobQueueStats.get();
  return snapshot;
}

/**
 * Installs the metrics timer that this queue records job queue times to.
 *
 * @param jobQueueTimer the timer to store; it is wrapped with {@link Optional#of(Object)}, so
 *        passing {@code null} throws a {@link NullPointerException}
 */
public void setJobQueueTimerCallback(io.micrometer.core.instrument.Timer jobQueueTimer) {
  final Optional<io.micrometer.core.instrument.Timer> callback = Optional.of(jobQueueTimer);
  this.jobQueueTimer.set(callback);
}

/**
 * Exposes the per-extent job age timers for unit tests.
 *
 * <p>
 * The backing map is returned directly, without copying, because it is a ConcurrentHashMap.
 *
 * @return the live map of tablet extent to job age timer
 */
@VisibleForTesting
Map<KeyExtent,Timer> getJobAges() {
  return this.jobAges;
}

/**
 * Immutable snapshot of the minimum, maximum and average job age for a queue.
 *
 * <p>
 * The ages are sampled once, in milliseconds, from the timers passed to the constructor; later
 * changes to those timers are not reflected in an existing instance.
 */
public static class CompactionJobPriorityQueueStats {
  private final Duration minAge;
  private final Duration maxAge;
  private final Duration avgAge;

  /**
   * Samples the elapsed time of every timer in {@code jobAges} and derives the statistics.
   *
   * @param jobAges timers keyed by tablet extent; only the timer values are read
   */
  @VisibleForTesting
  CompactionJobPriorityQueueStats(Map<KeyExtent,Timer> jobAges) {
    final Stat ageMillis = new Stat();
    for (Timer jobTimer : jobAges.values()) {
      ageMillis.addStat(jobTimer.elapsed(TimeUnit.MILLISECONDS));
    }
    minAge = Duration.ofMillis(ageMillis.min());
    maxAge = Duration.ofMillis(ageMillis.max());
    // The mean is fractional, so round it to whole milliseconds
    avgAge = Duration.ofMillis(Math.round(ageMillis.mean()));
  }

  /**
   * @return the smallest sampled job age
   */
  public Duration getMinAge() {
    return this.minAge;
  }

  /**
   * @return the largest sampled job age
   */
  public Duration getMaxAge() {
    return this.maxAge;
  }

  /**
   * @return the mean sampled job age, rounded to whole milliseconds
   */
  public Duration getAvgAge() {
    return this.avgAge;
  }
}
}
Loading

0 comments on commit ab5c57e

Please sign in to comment.