Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics: Synchronous gauges respect attributes #101018

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/101018.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101018
summary: "Metrics: Synchronous gauges respect attributes"
area: Infra/Core
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.telemetry.apm.internal.metrics;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* AbstractGaugeAdapter records the latest measurement for the given {@link Attributes} for each
* time period. This is a locking implementation.
* T is the Otel instrument
* N is the value of measurement
*/
public abstract class AbstractGaugeAdapter<T, N extends Number> extends AbstractInstrument<T> {

private ConcurrentHashMap<Attributes, N> records = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock.ReadLock addRecordLock;
pgomulka marked this conversation as resolved.
Show resolved Hide resolved
private final ReentrantReadWriteLock.WriteLock popRecordsLock;

public AbstractGaugeAdapter(Meter meter, String name, String description, String unit) {
super(meter, name, description, unit);
ReentrantReadWriteLock recordsLock = new ReentrantReadWriteLock();
addRecordLock = recordsLock.readLock();
popRecordsLock = recordsLock.writeLock();
}

protected ConcurrentHashMap<Attributes, N> popRecords() {
ConcurrentHashMap<Attributes, N> currentRecords;
ConcurrentHashMap<Attributes, N> newRecords = new ConcurrentHashMap<>();
try {
popRecordsLock.lock();
currentRecords = records;
records = newRecords;
} finally {
popRecordsLock.unlock();
}
return currentRecords;
}

protected void record(N value, Attributes attributes) {
try {
addRecordLock.lock();
records.put(attributes, value);
} finally {
addRecordLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
* DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement
*/
public class DoubleGaugeAdapter extends AbstractInstrument<io.opentelemetry.api.metrics.ObservableDoubleGauge>
public class DoubleGaugeAdapter extends AbstractGaugeAdapter<io.opentelemetry.api.metrics.ObservableDoubleGauge, Double>
implements
org.elasticsearch.telemetry.metric.DoubleGauge {

private final AtomicReference<ValueWithAttributes> valueWithAttributes;

public DoubleGaugeAdapter(Meter meter, String name, String description, String unit) {
super(meter, name, description, unit);
this.valueWithAttributes = new AtomicReference<>(new ValueWithAttributes(0.0, Collections.emptyMap()));
}

@Override
Expand All @@ -35,10 +31,7 @@ io.opentelemetry.api.metrics.ObservableDoubleGauge buildInstrument(Meter meter)
.gaugeBuilder(getName())
.setDescription(getDescription())
.setUnit(getUnit())
.buildWithCallback(measurement -> {
var localValueWithAttributed = valueWithAttributes.get();
measurement.record(localValueWithAttributed.value(), OtelHelper.fromMap(localValueWithAttributed.attributes()));
});
.buildWithCallback(measurement -> popRecords().forEach((attributes, value) -> measurement.record(value, attributes)));
}

@Override
Expand All @@ -48,8 +41,6 @@ public void record(double value) {

@Override
public void record(double value, Map<String, Object> attributes) {
this.valueWithAttributes.set(new ValueWithAttributes(value, attributes));
record(value, OtelHelper.fromMap(attributes));
}

private record ValueWithAttributes(double value, Map<String, Object> attributes) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,27 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/**
* LongGaugeAdapter wraps an otel ObservableLongMeasurement
*/
public class LongGaugeAdapter extends AbstractInstrument<io.opentelemetry.api.metrics.ObservableLongGauge>
public class LongGaugeAdapter extends AbstractGaugeAdapter<io.opentelemetry.api.metrics.ObservableLongGauge, Long>
implements
org.elasticsearch.telemetry.metric.LongGauge {
private final AtomicReference<ValueWithAttributes> valueWithAttributes;

public LongGaugeAdapter(Meter meter, String name, String description, String unit) {
super(meter, name, description, unit);
this.valueWithAttributes = new AtomicReference<>(new ValueWithAttributes(0L, Collections.emptyMap()));
}

@Override
io.opentelemetry.api.metrics.ObservableLongGauge buildInstrument(Meter meter) {

return Objects.requireNonNull(meter)
.gaugeBuilder(getName())
.ofLongs()
.setDescription(getDescription())
.setUnit(getUnit())
.buildWithCallback(measurement -> {
var localValueWithAttributed = valueWithAttributes.get();
measurement.record(localValueWithAttributed.value(), OtelHelper.fromMap(localValueWithAttributed.attributes()));
});
.ofLongs()
.buildWithCallback(measurement -> popRecords().forEach((attributes, value) -> measurement.record(value, attributes)));
}

@Override
Expand All @@ -49,8 +43,6 @@ public void record(long value) {

@Override
public void record(long value, Map<String, Object> attributes) {
this.valueWithAttributes.set(new ValueWithAttributes(value, attributes));
record(value, OtelHelper.fromMap(attributes));
}

private record ValueWithAttributes(long value, Map<String, Object> attributes) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,116 +8,130 @@

package org.elasticsearch.telemetry.apm.internal.metrics;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
import io.opentelemetry.api.metrics.LongGaugeBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleMeasurement;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;

import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;

public class GaugeAdapterTests extends ESTestCase {
Meter testMeter = Mockito.mock(Meter.class);
LongGaugeBuilder longGaugeBuilder = Mockito.mock(LongGaugeBuilder.class);
DoubleGaugeBuilder mockDoubleGaugeBuilder = Mockito.mock(DoubleGaugeBuilder.class);
RecordingMeterProvider meter;

@Before
public void init() {
when(longGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(longGaugeBuilder);
when(longGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(longGaugeBuilder);


when(mockDoubleGaugeBuilder.ofLongs()).thenReturn(longGaugeBuilder);
when(mockDoubleGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(mockDoubleGaugeBuilder);
when(mockDoubleGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(mockDoubleGaugeBuilder);
when(testMeter.gaugeBuilder(anyString())).thenReturn(mockDoubleGaugeBuilder);
meter = new RecordingMeterProvider();
}

// testing that a value reported is then used in a callback
@SuppressWarnings("unchecked")
public void testLongGaugeRecord() {
LongGaugeAdapter longGaugeAdapter = new LongGaugeAdapter(testMeter, "name", "desc", "unit");
LongGaugeAdapter gauge = new LongGaugeAdapter(meter, "name", "desc", "unit");

// recording values;
Map<String, Object> m1 = Map.of("k", 1L);
Map<String, Object> m3 = Map.of("k", 2L);
Map<String, Object> m4 = Map.of("k", 1L, "j", 3L);
gauge.record(1L, m1);
gauge.record(2L, m1);
gauge.record(3L, m3);
gauge.record(4L, m4);

meter.collectMetrics();

List<TestMetric> metrics = meter.getRecorder().get(gauge);

assertThat(metrics, hasSize(3));

assertThat(metrics, containsInAnyOrder(new TestMetric(2L, m1), new TestMetric(3L, m3), new TestMetric(4L, m4)));

// recording a value
longGaugeAdapter.record(1L, Map.of("k", 1L));
meter.clearCalls();

// upon metric export, the consumer will be called
ArgumentCaptor<Consumer<ObservableLongMeasurement>> captor = ArgumentCaptor.forClass(Consumer.class);
verify(longGaugeBuilder).buildWithCallback(captor.capture());
meter.collectMetrics();
metrics = meter.getRecorder().get(gauge);
assertThat(metrics, empty());

Consumer<ObservableLongMeasurement> value = captor.getValue();
// making sure that a consumer will fetch the value passed down upon recording of a value
TestLongMeasurement testLongMeasurement = new TestLongMeasurement();
value.accept(testLongMeasurement);
gauge.record(1L, m1);
gauge.record(3L, m3);

assertThat(testLongMeasurement.value, Matchers.equalTo(1L));
assertThat(testLongMeasurement.attributes, Matchers.equalTo(Attributes.builder().put("k", 1).build()));
meter.collectMetrics();
metrics = meter.getRecorder().get(gauge);
assertThat(metrics, hasSize(2));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you check containsInAnyOrder you don't need to check for a size. It would fail if an actual collection has less,more elements then expected matchers


assertThat(metrics, containsInAnyOrder(new TestMetric(1L, m1), new TestMetric(3L, m3)));
}

// testing that a value reported is then used in a callback
@SuppressWarnings("unchecked")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we can remove this now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to remove it, since we no longer have an unchecked cast here

public void testDoubleGaugeRecord() {
DoubleGaugeAdapter doubleGaugeAdapter = new DoubleGaugeAdapter(testMeter, "name", "desc", "unit");
DoubleGaugeAdapter gauge = new DoubleGaugeAdapter(meter, "name", "desc", "unit");

// recording a value
doubleGaugeAdapter.record(1.0, Map.of("k", 1.0));
// recording values;
Map<String, Object> m1 = Map.of("k", 1L);
Map<String, Object> m3 = Map.of("k", 2L);
Map<String, Object> m4 = Map.of("k", 1L, "j", 3L);
gauge.record(1.0, m1);
gauge.record(2.0, m1);
gauge.record(3.0, m3);
gauge.record(4.0, m4);

// upon metric export, the consumer will be called
ArgumentCaptor<Consumer<ObservableDoubleMeasurement>> captor = ArgumentCaptor.forClass(Consumer.class);
verify(mockDoubleGaugeBuilder).buildWithCallback(captor.capture());
meter.collectMetrics();

Consumer<ObservableDoubleMeasurement> value = captor.getValue();
// making sure that a consumer will fetch the value passed down upon recording of a value
TestDoubleMeasurement testLongMeasurement = new TestDoubleMeasurement();
value.accept(testLongMeasurement);
List<TestMetric> metrics = meter.getRecorder().get(gauge);

assertThat(testLongMeasurement.value, Matchers.equalTo(1.0));
assertThat(testLongMeasurement.attributes, Matchers.equalTo(Attributes.builder().put("k", 1.0).build()));
}
assertThat(metrics, hasSize(3));

private static class TestDoubleMeasurement implements ObservableDoubleMeasurement {
double value;
Attributes attributes;
assertThat(metrics, containsInAnyOrder(new TestMetric(2.0, m1), new TestMetric(3.0, m3), new TestMetric(4.0, m4)));

@Override
public void record(double value) {
this.value = value;
}
meter.clearCalls();
meter.collectMetrics();
metrics = meter.getRecorder().get(gauge);
assertThat(metrics, empty());

@Override
public void record(double value, Attributes attributes) {
this.value = value;
this.attributes = attributes;
gauge.record(1.0, m1);
gauge.record(3.0, m3);

}
}
meter.collectMetrics();
metrics = meter.getRecorder().get(gauge);
assertThat(metrics, hasSize(2));

private static class TestLongMeasurement implements ObservableLongMeasurement {
long value;
Attributes attributes;

@Override
public void record(long value) {
this.value = value;
}
assertThat(metrics, containsInAnyOrder(new TestMetric(1.0, m1), new TestMetric(3.0, m3)));
}

@Override
public void record(long value, Attributes attributes) {
this.value = value;
this.attributes = attributes;
public void testDifferentLongGaugesSameValues() {
LongGaugeAdapter gauge1 = new LongGaugeAdapter(meter, "name1", "desc", "unit");
LongGaugeAdapter gauge2 = new LongGaugeAdapter(meter, "name2", "desc", "unit");
Map<String, Object> map = Map.of("k", 1L);
gauge1.record(1L, map);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this test called same values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh it should be same attributes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we then rename the test?

gauge2.record(2L, map);

meter.collectMetrics();
List<TestMetric> metrics = meter.getRecorder().get(gauge1);
assertThat(metrics, hasSize(1));
assertThat(metrics, contains(new TestMetric(1L, map)));

metrics = meter.getRecorder().get(gauge2);
assertThat(metrics, hasSize(1));
assertThat(metrics, contains(new TestMetric(2L, map)));
}

}
public void testDifferentDoubleGaugesSameValues() {
DoubleGaugeAdapter gauge1 = new DoubleGaugeAdapter(meter, "name1", "desc", "unit");
DoubleGaugeAdapter gauge2 = new DoubleGaugeAdapter(meter, "name2", "desc", "unit");
Map<String, Object> map = Map.of("k", 1L);
gauge1.record(1.0, map);
gauge2.record(2.0, map);

meter.collectMetrics();
List<TestMetric> metrics = meter.getRecorder().get(gauge1);
assertThat(metrics, hasSize(1));
assertThat(metrics, contains(new TestMetric(1.0, map)));

metrics = meter.getRecorder().get(gauge2);
assertThat(metrics, hasSize(1));
assertThat(metrics, contains(new TestMetric(2.0, map)));
}
}
Loading