Skip to content

Commit

Permalink
Improve some interceptor code paths in metrics (#3251)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjquinno authored Aug 24, 2021
1 parent ec9823e commit 8a476cf
Show file tree
Hide file tree
Showing 14 changed files with 208 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ class ExemplarServiceManager {

private static final List<ExemplarService> EXEMPLAR_SERVICES = collectExemplarServices();

private static final boolean IS_ACTIVE = !EXEMPLAR_SERVICES.isEmpty();

private static final Supplier<String> EXEMPLAR_SUPPLIER = EXEMPLAR_SERVICES.isEmpty()
? () -> ""
: () -> EXEMPLAR_SERVICES.stream()
static final String INACTIVE_LABEL = "";

private static final Supplier<String> EXEMPLAR_SUPPLIER = () -> EXEMPLAR_SERVICES.stream()
.map(ExemplarService::label)
.filter(Predicate.not(String::isBlank))
.collect(ExemplarServiceManager::labelsStringJoiner, StringJoiner::add, StringJoiner::merge)
Expand All @@ -58,7 +59,15 @@ private static StringJoiner labelsStringJoiner() {
* @return exemplar string provided by the highest-priority service instance
*/
static String exemplarLabel() {
return EXEMPLAR_SUPPLIER.get();
return IS_ACTIVE ? EXEMPLAR_SUPPLIER.get() : INACTIVE_LABEL;
}

/**
*
* @return whether exemplar handling is active or not
*/
static boolean isActive() {
return IS_ACTIVE;
}

private static List<ExemplarService> collectExemplarServices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
package io.helidon.metrics;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import static java.lang.Math.exp;
import static java.lang.Math.min;
Expand Down Expand Up @@ -49,6 +54,49 @@ class ExponentiallyDecayingReservoir {
private static final double DEFAULT_ALPHA = 0.015;
private static final long RESCALE_THRESHOLD = TimeUnit.HOURS.toNanos(1);

/*
* Avoid computing the current time in seconds during every reservoir update by updating its value on a scheduled basis.
*/
private static final long CURRENT_TIME_IN_SECONDS_UPDATE_INTERVAL_MS = 250;

private static final List<Runnable> CURRENT_TIME_IN_SECONDS_UPDATERS = new ArrayList<>();

private static final ScheduledExecutorService CURRENT_TIME_UPDATER_EXECUTOR_SERVICE = initCurrentTimeUpdater();

private static final Logger LOGGER = Logger.getLogger(ExponentiallyDecayingReservoir.class.getName());

private volatile long currentTimeInSeconds;

private static ScheduledExecutorService initCurrentTimeUpdater() {
ScheduledExecutorService result = Executors.newSingleThreadScheduledExecutor();
result.scheduleAtFixedRate(ExponentiallyDecayingReservoir::updateCurrentTimeInSecondsForAllReservoirs,
CURRENT_TIME_IN_SECONDS_UPDATE_INTERVAL_MS,
CURRENT_TIME_IN_SECONDS_UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS);
return result;
}

static void onServerShutdown() {
CURRENT_TIME_UPDATER_EXECUTOR_SERVICE.shutdown();
try {
boolean stoppedNormally =
CURRENT_TIME_UPDATER_EXECUTOR_SERVICE.awaitTermination(CURRENT_TIME_IN_SECONDS_UPDATE_INTERVAL_MS * 10,
TimeUnit.MILLISECONDS);
if (!stoppedNormally) {
LOGGER.log(Level.WARNING, "Shutdown of current time updater timed out; continuing");
}
} catch (InterruptedException e) {
LOGGER.log(Level.WARNING, "InterruptedException caught while stopping the current time updater; continuing");
}
}

private static void updateCurrentTimeInSecondsForAllReservoirs() {
CURRENT_TIME_IN_SECONDS_UPDATERS.forEach(Runnable::run);
}

private long computeCurrentTimeInSeconds() {
return TimeUnit.MILLISECONDS.toSeconds(clock.milliTime());
}

private final ConcurrentSkipListMap<Double, WeightedSnapshot.WeightedSample> values;
private final ReentrantReadWriteLock lock;
private final double alpha;
Expand Down Expand Up @@ -81,7 +129,9 @@ class ExponentiallyDecayingReservoir {
this.alpha = alpha;
this.size = size;
this.count = new AtomicLong(0);
this.startTime = currentTimeInSeconds();
CURRENT_TIME_IN_SECONDS_UPDATERS.add(this::computeCurrentTimeInSeconds);
currentTimeInSeconds = computeCurrentTimeInSeconds();
this.startTime = currentTimeInSeconds;
this.nextScaleTime = new AtomicLong(clock.nanoTick() + RESCALE_THRESHOLD);
}

Expand All @@ -90,7 +140,7 @@ public int size() {
}

public void update(long value, String label) {
update(value, currentTimeInSeconds(), label);
update(value, currentTimeInSeconds, label);
}

/**
Expand Down Expand Up @@ -143,10 +193,6 @@ public WeightedSnapshot getSnapshot() {
}
}

private long currentTimeInSeconds() {
return TimeUnit.MILLISECONDS.toSeconds(clock.milliTime());
}

private double weight(long t) {
return exp(alpha * t);
}
Expand Down Expand Up @@ -174,7 +220,7 @@ private void rescale(long now, long next) {
try {
if (nextScaleTime.compareAndSet(next, now + RESCALE_THRESHOLD)) {
final long oldStartTime = startTime;
this.startTime = currentTimeInSeconds();
this.startTime = currentTimeInSeconds;
final double scalingFactor = exp(-alpha * (startTime - oldStartTime));
if (Double.compare(scalingFactor, 0) == 0) {
values.clear();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -74,4 +74,17 @@ interface HelidonMetric extends Metric {
* @return the metric's {@link Metadata}
*/
Metadata metadata();

/**
* Returns whether the metric has been deleted.
*
* @return true if the metrics was removed from the registry; false otherwise
*/
boolean isDeleted();

/**
* Mark this metric as deleted.
*/
void markAsDeleted();

}
13 changes: 13 additions & 0 deletions metrics/metrics/src/main/java/io/helidon/metrics/MetricImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ private static Map<String, String> initEscapedCharsMap() {
private final String registryType;
private final Metadata metadata;

// Efficient check from interceptors to see if the metric is still valid
private boolean isDeleted;

MetricImpl(String registryType, Metadata metadata) {
this.metadata = metadata;
this.registryType = registryType;
Expand Down Expand Up @@ -188,6 +191,16 @@ public void jsonMeta(JsonObjectBuilder builder, List<MetricID> metricIDs) {
builder.add(getName(), metaBuilder);
}

@Override
public boolean isDeleted() {
return isDeleted;
}

@Override
public void markAsDeleted() {
isDeleted = true;
}

static String jsonFullKey(String baseName, MetricID metricID) {
return metricID.getTags().isEmpty() ? baseName
: String.format("%s;%s", baseName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ public void update(Routing.Rules rules) {
configureEndpoint(rules, rules);
}

@Override
protected void onShutdown() {
ExponentiallyDecayingReservoir.onServerShutdown();
}

private static KeyPerformanceIndicatorSupport.Context kpiContext(ServerRequest request) {
return request.context()
.get(KeyPerformanceIndicatorSupport.Context.class)
Expand Down
29 changes: 23 additions & 6 deletions metrics/metrics/src/main/java/io/helidon/metrics/Registry.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,6 +82,17 @@ public static Registry create(Type type) {
return new Registry(type);
}

/**
* Indicates whether the specific metrics has been marked as deleted.
*
* @param metric the metric to check
* @return true if it's a Helidon metric and has been marked as deleted; false otherwise
*/
public static boolean isMarkedAsDeleted(Metric metric) {
return (metric instanceof HelidonMetric)
&& ((HelidonMetric) metric).isDeleted();
}

@Override
public <T extends Metric> T register(String name, T metric) throws IllegalArgumentException {
return registerUniqueMetric(name, metric);
Expand Down Expand Up @@ -225,12 +236,14 @@ public SimpleTimer simpleTimer(Metadata metadata, Tag... tags) {
*/
@Override
public synchronized boolean remove(String name) {
final List<MetricID> metricIDs = allMetricIDsByName.get(name);
if (metricIDs == null) {
final List<Map.Entry<MetricID, HelidonMetric>> doomedMetrics = getMetricsByName(name);
if (doomedMetrics.isEmpty()) {
return false;
}
final boolean result = metricIDs.stream()
.map(metricID -> allMetrics.remove(metricID) != null)

final boolean result = doomedMetrics.stream()
.peek(entry -> entry.getValue().markAsDeleted())
.map(entry -> allMetrics.remove(entry.getKey()) != null)
.reduce((a, b) -> a || b)
.orElse(false);
allMetricIDsByName.remove(name);
Expand All @@ -256,7 +269,11 @@ public synchronized boolean remove(MetricID metricID) {
allMetadata.remove(metricID.getName());
}

return allMetrics.remove(metricID) != null;
HelidonMetric doomedMetric = allMetrics.remove(metricID);
if (doomedMetric != null) {
doomedMetric.markAsDeleted();
}
return doomedMetric != null;
}

@Override
Expand Down
6 changes: 5 additions & 1 deletion metrics/metrics/src/main/java/io/helidon/metrics/Sample.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
*/
interface Sample {

boolean IS_EXEMPLAR_HANDLING_ACTIVE = ExemplarServiceManager.isActive();

static Derived derived(double value, Sample.Labeled reference) {
return new Derived.Impl(value, reference);
}
Expand All @@ -30,7 +32,9 @@ static Derived derived(double value) {
}

static Labeled labeled(long value) {
return new Labeled.Impl(value, ExemplarServiceManager.exemplarLabel(), System.currentTimeMillis());
return IS_EXEMPLAR_HANDLING_ACTIVE
? new Labeled.Impl(value, ExemplarServiceManager.exemplarLabel(), System.currentTimeMillis())
: new Labeled.Impl(value, ExemplarServiceManager.INACTIVE_LABEL, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ public void dump(OutputStream output) {

/**
* Labeled sample with a weight.
*
* If the label is empty, then this sample will never be an exemplar so we do not need to default the timestamp to the
* current time.
*/
static class WeightedSample extends Labeled.Impl {

Expand All @@ -335,11 +338,11 @@ static class WeightedSample extends Labeled.Impl {
}

WeightedSample(long value, double weight, String label) {
this(value, weight, System.currentTimeMillis(), label);
this(value, weight, label.isEmpty() ? 0 : System.currentTimeMillis(), label);
}

WeightedSample(long value) {
this(value, 1.0, System.currentTimeMillis(), "");
this(value, 1.0, 0, "");
}

long getValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
package io.helidon.microprofile.metrics;

import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.inject.Inject;
import javax.interceptor.InvocationContext;

import io.helidon.metrics.Registry;
import io.helidon.microprofile.metrics.MetricsCdiExtension.MetricWorkItem;
import io.helidon.servicecommon.restcdi.HelidonInterceptor;

import org.eclipse.microprofile.metrics.Metric;
import org.eclipse.microprofile.metrics.MetricID;
import org.eclipse.microprofile.metrics.MetricRegistry;

/**
Expand All @@ -53,8 +52,6 @@ abstract class InterceptorBase<M extends Metric> extends HelidonInterceptor.Base
@Inject
private MetricRegistry registry;

private Map<MetricID, Metric> metricsForVerification;

enum ActionType {
PREINVOKE("preinvoke"), COMPLETE("complete");

Expand All @@ -78,27 +75,22 @@ Class<? extends Annotation> annotationType() {
return annotationType;
}

Map<MetricID, Metric> metricsForVerification() {
if (metricsForVerification == null) {
metricsForVerification = registry.getMetrics();
}
return metricsForVerification;
}

@Override
public void preInvocation(InvocationContext context, MetricWorkItem workItem) {
invokeVerifiedAction(context, workItem, this::preInvoke, ActionType.PREINVOKE);
}

void invokeVerifiedAction(InvocationContext context, MetricWorkItem workItem, Consumer<M> action, ActionType actionType) {
if (!metricsForVerification().containsKey(workItem.metricID())) {
Metric metric = workItem.metric();
if (Registry.isMarkedAsDeleted(metric)) {
throw new IllegalStateException("Attempt to use previously-removed metric" + workItem.metricID());
}
Metric metric = workItem.metric();
LOGGER.log(Level.FINEST, () -> String.format(
"%s (%s) is accepting %s %s for processing on %s triggered by @%s",
getClass().getSimpleName(), actionType, workItem.metric().getClass().getSimpleName(), workItem.metricID(),
context.getMethod() != null ? context.getMethod() : context.getConstructor(), annotationType.getSimpleName()));
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.log(Level.FINEST, String.format(
"%s (%s) is accepting %s %s for processing on %s triggered by @%s",
getClass().getSimpleName(), actionType, workItem.metric().getClass().getSimpleName(), workItem.metricID(),
context.getMethod() != null ? context.getMethod() : context.getConstructor(), annotationType.getSimpleName()));
}
action.accept(metricType.cast(metric));
}

Expand Down
Loading

0 comments on commit 8a476cf

Please sign in to comment.