Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capability to have configurable aggregation temporality for OTLP Registry #3625

Merged
merged 20 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5615949
Add capability to have configurable aggregation temporality for OTLP …
lenin-jaganathan Feb 9, 2023
2212f84
Set isDeltaAggregationTemporality in Constructor and avoid re-evaluat…
lenin-jaganathan Feb 17, 2023
d8e4ee8
Fix copyright issues
lenin-jaganathan Feb 17, 2023
2664f89
Make getAggregationTemporality return AggregationTemporality enum.
lenin-jaganathan Feb 17, 2023
7854a88
Add tests for delta registry
lenin-jaganathan Feb 18, 2023
487af47
Add AggregationTemporality Enum
lenin-jaganathan Feb 22, 2023
a83564a
Calculate TimeUnixNano once per publishing interval
lenin-jaganathan Feb 23, 2023
dc2dd6b
Add tests for Histograms.
lenin-jaganathan Feb 23, 2023
8b8c350
Refactor AggregationTemporality.java
lenin-jaganathan Feb 24, 2023
490b2a3
Merge branch 'main' into otlp_stepregistry
jonatan-ivanov Mar 21, 2023
6d9f226
auto-formatting
jonatan-ivanov Mar 21, 2023
cfd491f
Merge branch 'main' into otlp_stepregistry
jonatan-ivanov Mar 28, 2023
2076bb2
Polish
jonatan-ivanov Mar 28, 2023
65831a8
Add Lenin Jaganathan as @author
jonatan-ivanov Mar 29, 2023
789b82b
Polish
lenin-jaganathan Mar 29, 2023
08e43c2
Add the word cumulative to meter names
jonatan-ivanov Mar 31, 2023
8f51093
Add integration tests for delta
jonatan-ivanov Mar 31, 2023
7d1cf36
Set higher timeout for OTLP integration tests
jonatan-ivanov Mar 31, 2023
95cb710
CumulativeMetrics will use config.wallTime() as metric time and Delta…
lenin-jaganathan Apr 1, 2023
0db0221
Revert "Add integration tests for delta"
lenin-jaganathan Apr 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

import io.micrometer.core.instrument.config.validate.Validated;
import io.micrometer.core.instrument.push.PushRegistryConfig;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import static io.micrometer.core.instrument.config.MeterRegistryConfigValidator.*;
import static io.micrometer.core.instrument.config.validate.PropertyValidator.getString;
import static io.micrometer.core.instrument.config.validate.PropertyValidator.getUrlString;
import static io.micrometer.core.instrument.config.validate.PropertyValidator.*;

/**
* Config for {@link OtlpMeterRegistry}.
Expand Down Expand Up @@ -87,6 +87,19 @@ default Map<String, String> resourceAttributes() {
return resourceAttributes;
}

/**
* {@link AggregationTemporality} of the OtlpMeterRegistry. This determines whether
* the meters should be cumulative(AGGREGATION_TEMPORALITY_CUMULATIVE) or
* step(AGGREGATION_TEMPORALITY_DELTA).
* @return the aggregationTemporality for OtlpRegistry
* @see <a href=
* "https://opentelemetry.io/docs/reference/specification/metrics/data-model/#temporality">Temporality</a>
*/
default int getAggregationTemporality() {
lenin-jaganathan marked this conversation as resolved.
Show resolved Hide resolved
return getInteger(this, "aggregationTemporality")
.orElse(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE_VALUE);
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Additional headers to send with exported metrics. This may be needed for
* authorization headers, for example.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

class OtlpDistributionSummary extends CumulativeDistributionSummary implements StartTimeAwareMeter {
class OtlpCumulativeDistributionSummary extends CumulativeDistributionSummary implements StartTimeAwareMeter {

private static final CountAtBucket[] EMPTY_HISTOGRAM = new CountAtBucket[0];

Expand All @@ -32,8 +32,8 @@ class OtlpDistributionSummary extends CumulativeDistributionSummary implements S
@Nullable
private final Histogram monotonicBucketCountHistogram;

OtlpDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig, double scale,
boolean supportsAggregablePercentiles) {
OtlpCumulativeDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, boolean supportsAggregablePercentiles) {
super(id, clock, DistributionStatisticConfig.builder().percentilesHistogram(false) // avoid
// a
// histogram
Expand Down Expand Up @@ -79,13 +79,9 @@ public HistogramSnapshot takeSnapshot() {
return snapshot;
}

CountAtBucket[] histogramCounts = this.monotonicBucketCountHistogram.takeSnapshot(0, 0, 0).histogramCounts();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily related to this PR, and might be addressed in a follow-up (also untested, I just happen to have fixed a similar bug recently): the fact that super.takeSnapshot() and monotonicBucketCountHistogram.takeSnapshot() are not synchronized might lead to discrepancies between the count and the bucket counts if some other thread records data in the time between the two method calls.

return new HistogramSnapshot(snapshot.count(), snapshot.total(), snapshot.max(), snapshot.percentileValues(),
histogramCounts(), snapshot::outputSummary);
}

private CountAtBucket[] histogramCounts() {
return this.monotonicBucketCountHistogram == null ? EMPTY_HISTOGRAM
: this.monotonicBucketCountHistogram.takeSnapshot(0, 0, 0).histogramCounts();
histogramCounts, snapshot::outputSummary);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
import java.time.Duration;
import java.util.concurrent.TimeUnit;

class OtlpTimer extends CumulativeTimer implements StartTimeAwareMeter {
class OtlpCumulativeTimer extends CumulativeTimer implements StartTimeAwareMeter {

private final long startTimeNanos;

@Nullable
private final Histogram monotonicCountBucketHistogram;

OtlpTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector,
TimeUnit baseTimeUnit) {
OtlpCumulativeTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit) {
super(id, clock, DistributionStatisticConfig.builder().percentilesHistogram(false) // avoid
// a
// histogram
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.internal.DefaultGauge;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
import io.micrometer.core.instrument.internal.DefaultMeter;
import io.micrometer.core.instrument.push.PushMeterRegistry;
import io.micrometer.core.instrument.step.*;
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.TimeUtils;
Expand Down Expand Up @@ -123,19 +125,27 @@ protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> v

@Override
protected Counter newCounter(Meter.Id id) {
return new OtlpCounter(id, this.clock);
return isDeltaAggregationTemporality() ? new StepCounter(id, this.clock, config.step().toMillis())
lenin-jaganathan marked this conversation as resolved.
Show resolved Hide resolved
: new OtlpCounter(id, this.clock);
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
return new OtlpTimer(id, this.clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit());
return isDeltaAggregationTemporality()
? new OtlpStepTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
config.step().toMillis())
: new OtlpCumulativeTimer(id, this.clock, distributionStatisticConfig, pauseDetector,
getBaseTimeUnit());
}

@Override
protected DistributionSummary newDistributionSummary(Meter.Id id,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
return new OtlpDistributionSummary(id, this.clock, distributionStatisticConfig, scale, true);
return isDeltaAggregationTemporality()
? new OtlpStepDistributionSummary(id, clock, distributionStatisticConfig, scale,
config.step().toMillis())
: new OtlpCumulativeDistributionSummary(id, this.clock, distributionStatisticConfig, scale, true);
}

@Override
Expand All @@ -146,18 +156,25 @@ protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable<Measurement> mea
@Override
protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj, ToLongFunction<T> countFunction,
ToDoubleFunction<T> totalTimeFunction, TimeUnit totalTimeFunctionUnit) {
return new OtlpFunctionTimer<>(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit,
getBaseTimeUnit(), this.clock);
return isDeltaAggregationTemporality()
? new StepFunctionTimer<>(id, clock, config.step().toMillis(), obj, countFunction, totalTimeFunction,
totalTimeFunctionUnit, getBaseTimeUnit())
: new OtlpFunctionTimer<>(id, obj, countFunction, totalTimeFunction, totalTimeFunctionUnit,
getBaseTimeUnit(), this.clock);
}

@Override
protected <T> FunctionCounter newFunctionCounter(Meter.Id id, T obj, ToDoubleFunction<T> countFunction) {
return new OtlpFunctionCounter<>(id, obj, countFunction, this.clock);
return isDeltaAggregationTemporality()
? new StepFunctionCounter<>(id, clock, config.step().toMillis(), obj, countFunction)
: new OtlpFunctionCounter<>(id, obj, countFunction, this.clock);
}

@Override
protected LongTaskTimer newLongTaskTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig) {
return new OtlpLongTaskTimer(id, this.clock, getBaseTimeUnit(), distributionStatisticConfig);
return isDeltaAggregationTemporality()
? new DefaultLongTaskTimer(id, clock, getBaseTimeUnit(), distributionStatisticConfig, false)
: new OtlpLongTaskTimer(id, this.clock, getBaseTimeUnit(), distributionStatisticConfig);
}

@Override
Expand All @@ -181,31 +198,29 @@ private Metric writeMeter(Meter meter) {
Metric writeGauge(Gauge gauge) {
return getMetricBuilder(gauge.getId())
.setGauge(io.opentelemetry.proto.metrics.v1.Gauge.newBuilder()
.addDataPoints(NumberDataPoint.newBuilder()
.setTimeUnixNano(TimeUnit.MILLISECONDS.toNanos(this.clock.wallTime()))
.addDataPoints(NumberDataPoint.newBuilder().setTimeUnixNano(getEndTimeNanos())
.setAsDouble(gauge.value()).addAllAttributes(getTagsForId(gauge.getId())).build()))
.build();
}

// VisibleForTesting
Metric writeCounter(Counter counter) {
return writeSum((StartTimeAwareMeter) counter, counter::count);
return writeSum(counter, counter::count);
}

// VisibleForTesting
Metric writeFunctionCounter(FunctionCounter functionCounter) {
return writeSum((StartTimeAwareMeter) functionCounter, functionCounter::count);
return writeSum(functionCounter, functionCounter::count);
}

private Metric writeSum(StartTimeAwareMeter meter, DoubleSupplier count) {
return getMetricBuilder(meter.getId())
.setSum(Sum.newBuilder()
.addDataPoints(NumberDataPoint.newBuilder().setStartTimeUnixNano(meter.getStartTimeNanos())
.setTimeUnixNano(TimeUnit.MILLISECONDS.toNanos(this.clock.wallTime()))
.setAsDouble(count.getAsDouble()).addAllAttributes(getTagsForId(meter.getId())).build())
.setIsMonotonic(true)
.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE).build())
.build();
private Metric writeSum(Meter meter, DoubleSupplier count) {
return getMetricBuilder(meter.getId()).setSum(Sum.newBuilder()
.addDataPoints(NumberDataPoint.newBuilder().setStartTimeUnixNano(getStartTimeNanos(meter))
.setTimeUnixNano(getEndTimeNanos()).setAsDouble(count.getAsDouble())
.addAllAttributes(getTagsForId(meter.getId())).build())
.setIsMonotonic(true)
.setAggregationTemporality(AggregationTemporality.forNumber(config.getAggregationTemporality()))
.build()).build();
}

// VisibleForTesting
Expand All @@ -215,8 +230,8 @@ Metric writeHistogramSupport(HistogramSupport histogramSupport) {
HistogramSnapshot histogramSnapshot = histogramSupport.takeSnapshot();

Iterable<? extends KeyValue> tags = getTagsForId(histogramSupport.getId());
long startTimeNanos = ((StartTimeAwareMeter) histogramSupport).getStartTimeNanos();
long wallTimeNanos = TimeUnit.MILLISECONDS.toNanos(this.clock.wallTime());
long startTimeNanos = getStartTimeNanos(histogramSupport);
long wallTimeNanos = getEndTimeNanos();
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
double total = isTimeBased ? histogramSnapshot.total(getBaseTimeUnit()) : histogramSnapshot.total();
long count = histogramSnapshot.count();

Expand All @@ -236,6 +251,9 @@ Metric writeHistogramSupport(HistogramSupport histogramSupport) {
HistogramDataPoint.Builder histogramDataPoint = HistogramDataPoint.newBuilder().addAllAttributes(tags)
.setStartTimeUnixNano(startTimeNanos).setTimeUnixNano(wallTimeNanos).setSum(total).setCount(count);

if (isDeltaAggregationTemporality()) {
lenin-jaganathan marked this conversation as resolved.
Show resolved Hide resolved
histogramDataPoint.setMax(histogramSnapshot.max(getBaseTimeUnit()));
}
// if histogram enabled, add histogram buckets
if (histogramSnapshot.histogramCounts().length != 0) {
for (CountAtBucket countAtBucket : histogramSnapshot.histogramCounts()) {
Expand All @@ -244,26 +262,39 @@ Metric writeHistogramSupport(HistogramSupport histogramSupport) {
histogramDataPoint.addBucketCounts((long) countAtBucket.count());
}
metricBuilder.setHistogram(Histogram.newBuilder()
.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE)
.setAggregationTemporality(AggregationTemporality.forNumber(config.getAggregationTemporality()))
.addDataPoints(histogramDataPoint));
return metricBuilder.build();
}

return metricBuilder.setHistogram(Histogram.newBuilder()
.setAggregationTemporality(AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE)
.setAggregationTemporality(AggregationTemporality.forNumber(config.getAggregationTemporality()))
.addDataPoints(histogramDataPoint)).build();
}

// VisibleForTesting
Metric writeFunctionTimer(FunctionTimer functionTimer) {
return getMetricBuilder(functionTimer.getId()).setHistogram(Histogram.newBuilder()
.addDataPoints(HistogramDataPoint.newBuilder().addAllAttributes(getTagsForId(functionTimer.getId()))
.setStartTimeUnixNano(((StartTimeAwareMeter) functionTimer).getStartTimeNanos())
.setTimeUnixNano(TimeUnit.MILLISECONDS.toNanos(this.clock.wallTime()))
.setStartTimeUnixNano(getStartTimeNanos((functionTimer))).setTimeUnixNano(getEndTimeNanos())
.setSum(functionTimer.totalTime(getBaseTimeUnit())).setCount((long) functionTimer.count())))
.build();
}

private boolean isDeltaAggregationTemporality() {
return config.getAggregationTemporality() == AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA_VALUE;
}

private long getStartTimeNanos(Meter meter) {
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
return isDeltaAggregationTemporality() ? getEndTimeNanos() - config.step().toNanos()
: ((StartTimeAwareMeter) meter).getStartTimeNanos();
}

private long getEndTimeNanos() {
return isDeltaAggregationTemporality() ? (clock.wallTime() / config.step().toMillis()) * config.step().toNanos()
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
: TimeUnit.MILLISECONDS.toNanos(clock.wallTime());
}

private Metric.Builder getMetricBuilder(Meter.Id id) {
Metric.Builder builder = Metric.newBuilder().setName(getConventionName(id));
if (id.getBaseUnit() != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2022 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.registry.otlp;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.step.StepDistributionSummary;

class OtlpStepDistributionSummary extends StepDistributionSummary {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure that this is a good idea but this class seem the same as OtlpCumulativeDistributionSummary except:

  1. TimeWindowFixedBoundaryHistogram/DistributionStatisticConfig/etc. are created differently
  2. Cumulative has getStartTimeNanos

So I was thinking if we should have the common logic in a common place, e.g.:

  1. Having only one OtlpDistributionSummary that can handle both (getStartTimeNanos will be there in the step case too).
  2. Having a common parent/super class that cumulative/step can extend and provide the TimeWindowFixedBoundaryHistogram/DistributionStatisticConfig/etc. for example in ctor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think OtlpStepTimer has the same situation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right see one of my TODOs in the PR description. This was one of my POC codes and it needs some re-designing and better code re-organization. Will start on that part and welcome any suggestions there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to ping us if you need some help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I might need some especially in restructuring some of these classes. Already, OtlpMeterRegisrty does what I believe was not How registries were meant to work doing 2 things - Step and Cumulative. And the same goes with the Timers and DistributionSummaries because there are different things inherently - Step and Cumulative. So, any help on this would be helpful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to throw in the idea of get-and-reset semantics for Histograms. Basically, you record the data, snapshot it, and then reset the histogram on every export. This circumvents the whole timer and rotation code, which I am not sure is required for a delta-based export. Just leaving this here as an idea, let me know if there is something I am missing


@Nullable
private final Histogram countBucketHistogram;

/**
* Create a new {@code StepDistributionSummary}.
* @param id ID
* @param clock clock
* @param distributionStatisticConfig distribution static configuration
* @param scale scale
* @param stepMillis step in milliseconds
*/
public OtlpStepDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis) {
super(id, clock, DistributionStatisticConfig.builder().percentilesHistogram(false).serviceLevelObjectives()
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
.build().merge(distributionStatisticConfig), scale, stepMillis, false);
if (distributionStatisticConfig.isPublishingHistogram()) {
this.countBucketHistogram = new TimeWindowFixedBoundaryHistogram(clock, distributionStatisticConfig, true,
false);
}
else {
this.countBucketHistogram = null;
}
}

@Override
protected void recordNonNegative(double amount) {
super.recordNonNegative(amount);
if (this.countBucketHistogram != null) {
this.countBucketHistogram.recordDouble(amount);
}
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot snapshot = super.takeSnapshot();
if (countBucketHistogram == null) {
return snapshot;
}

CountAtBucket[] histogramCounts = this.countBucketHistogram.takeSnapshot(0, 0, 0).histogramCounts();
return new HistogramSnapshot(snapshot.count(), snapshot.total(), snapshot.max(), snapshot.percentileValues(),
histogramCounts, snapshot::outputSummary);
}

}
Loading