From 24691ced75f403c34b29c0f852a96694c18610f2 Mon Sep 17 00:00:00 2001 From: Stuart Tettemer Date: Tue, 17 Oct 2023 15:50:42 -0500 Subject: [PATCH 1/2] Metrics: Synchronous gauges respect attributes Synchronous gauges would only accept the most recent value, regardless of matching attributes. As attributes should be treated as dimensions, we keep the most recent value with matching attributes. --- .../metrics/AbstractGaugeAdapter.java | 57 ++ .../internal/metrics/DoubleGaugeAdapter.java | 15 +- .../internal/metrics/LongGaugeAdapter.java | 16 +- .../internal/metrics/GaugeAdapterTests.java | 168 ++--- .../apm/internal/metrics/MeterRecorder.java | 122 ++++ .../metrics/RecordingMeterProvider.java | 645 ++++++++++++++++++ .../apm/internal/metrics/TestMetric.java | 18 + 7 files changed, 940 insertions(+), 101 deletions(-) create mode 100644 modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractGaugeAdapter.java create mode 100644 modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/MeterRecorder.java create mode 100644 modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/RecordingMeterProvider.java create mode 100644 modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/TestMetric.java diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractGaugeAdapter.java new file mode 100644 index 0000000000000..2c5ca29de7e05 --- /dev/null +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/AbstractGaugeAdapter.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.Meter; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * AbstractGaugeAdapter records the latest measurement for the given {@link Attributes} for each + * time period. This is a locking implementation. + * T is the Otel instrument + * N is the value of measurement + */ +public abstract class AbstractGaugeAdapter extends AbstractInstrument { + + private ConcurrentHashMap records = new ConcurrentHashMap<>(); + private final ReentrantReadWriteLock.ReadLock addRecordLock; + private final ReentrantReadWriteLock.WriteLock popRecordsLock; + + public AbstractGaugeAdapter(Meter meter, String name, String description, String unit) { + super(meter, name, description, unit); + ReentrantReadWriteLock recordsLock = new ReentrantReadWriteLock(); + addRecordLock = recordsLock.readLock(); + popRecordsLock = recordsLock.writeLock(); + } + + protected ConcurrentHashMap popRecords() { + ConcurrentHashMap currentRecords; + ConcurrentHashMap newRecords = new ConcurrentHashMap<>(); + try { + popRecordsLock.lock(); + currentRecords = records; + records = newRecords; + } finally { + popRecordsLock.unlock(); + } + return currentRecords; + } + + protected void record(N value, Attributes attributes) { + try { + addRecordLock.lock(); + records.put(attributes, value); + } finally { + addRecordLock.unlock(); + } + } +} diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java index 54f33be21698b..57308177c7688 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/DoubleGaugeAdapter.java @@ -13,20 +13,16 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; /** * DoubleGaugeAdapter wraps an otel ObservableDoubleMeasurement */ -public class DoubleGaugeAdapter extends AbstractInstrument +public class DoubleGaugeAdapter extends AbstractGaugeAdapter implements org.elasticsearch.telemetry.metric.DoubleGauge { - private final AtomicReference valueWithAttributes; - public DoubleGaugeAdapter(Meter meter, String name, String description, String unit) { super(meter, name, description, unit); - this.valueWithAttributes = new AtomicReference<>(new ValueWithAttributes(0.0, Collections.emptyMap())); } @Override @@ -35,10 +31,7 @@ io.opentelemetry.api.metrics.ObservableDoubleGauge buildInstrument(Meter meter) .gaugeBuilder(getName()) .setDescription(getDescription()) .setUnit(getUnit()) - .buildWithCallback(measurement -> { - var localValueWithAttributed = valueWithAttributes.get(); - measurement.record(localValueWithAttributed.value(), OtelHelper.fromMap(localValueWithAttributed.attributes())); - }); + .buildWithCallback(measurement -> popRecords().forEach((attributes, value) -> measurement.record(value, attributes))); } @Override @@ -48,8 +41,6 @@ public void record(double value) { @Override public void record(double value, Map attributes) { - this.valueWithAttributes.set(new ValueWithAttributes(value, attributes)); + record(value, OtelHelper.fromMap(attributes)); } - - private record ValueWithAttributes(double value, Map attributes) {} } diff --git a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java index 66d2287a765dc..8c6695aaa2da1 100644 --- a/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java +++ b/modules/apm/src/main/java/org/elasticsearch/telemetry/apm/internal/metrics/LongGaugeAdapter.java @@ -13,19 +13,16 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; /** * LongGaugeAdapter wraps an otel ObservableLongMeasurement */ -public class LongGaugeAdapter extends AbstractInstrument +public class LongGaugeAdapter extends AbstractGaugeAdapter implements org.elasticsearch.telemetry.metric.LongGauge { - private final AtomicReference valueWithAttributes; public LongGaugeAdapter(Meter meter, String name, String description, String unit) { super(meter, name, description, unit); - this.valueWithAttributes = new AtomicReference<>(new ValueWithAttributes(0L, Collections.emptyMap())); } @Override @@ -33,13 +30,10 @@ io.opentelemetry.api.metrics.ObservableLongGauge buildInstrument(Meter meter) { return Objects.requireNonNull(meter) .gaugeBuilder(getName()) - .ofLongs() .setDescription(getDescription()) .setUnit(getUnit()) - .buildWithCallback(measurement -> { - var localValueWithAttributed = valueWithAttributes.get(); - measurement.record(localValueWithAttributed.value(), OtelHelper.fromMap(localValueWithAttributed.attributes())); - }); + .ofLongs() + .buildWithCallback(measurement -> popRecords().forEach((attributes, value) -> measurement.record(value, attributes))); } @Override @@ -49,8 +43,6 @@ public void record(long value) { @Override public void record(long value, Map attributes) { - this.valueWithAttributes.set(new ValueWithAttributes(value, attributes)); + record(value, OtelHelper.fromMap(attributes)); } - - private record ValueWithAttributes(long value, Map attributes) {} } diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/GaugeAdapterTests.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/GaugeAdapterTests.java index 1e230eefe32dc..71b7ad069ef14 100644 --- a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/GaugeAdapterTests.java +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/GaugeAdapterTests.java @@ -8,116 +8,130 @@ package org.elasticsearch.telemetry.apm.internal.metrics; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.metrics.DoubleGaugeBuilder; -import io.opentelemetry.api.metrics.LongGaugeBuilder; -import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; -import io.opentelemetry.api.metrics.ObservableLongMeasurement; - import org.elasticsearch.test.ESTestCase; -import org.hamcrest.Matchers; import org.junit.Before; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; +import java.util.List; import java.util.Map; -import java.util.function.Consumer; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; public class GaugeAdapterTests extends ESTestCase { - Meter testMeter = Mockito.mock(Meter.class); - LongGaugeBuilder longGaugeBuilder = Mockito.mock(LongGaugeBuilder.class); - DoubleGaugeBuilder mockDoubleGaugeBuilder = Mockito.mock(DoubleGaugeBuilder.class); + RecordingMeterProvider meter; @Before public void init() { - when(longGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(longGaugeBuilder); - when(longGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(longGaugeBuilder); - - - when(mockDoubleGaugeBuilder.ofLongs()).thenReturn(longGaugeBuilder); - when(mockDoubleGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(mockDoubleGaugeBuilder); - when(mockDoubleGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(mockDoubleGaugeBuilder); - when(testMeter.gaugeBuilder(anyString())).thenReturn(mockDoubleGaugeBuilder); + meter = new RecordingMeterProvider(); } // testing that a value reported is then used in a callback @SuppressWarnings("unchecked") public void testLongGaugeRecord() { - LongGaugeAdapter longGaugeAdapter = new LongGaugeAdapter(testMeter, "name", "desc", "unit"); + LongGaugeAdapter gauge = new LongGaugeAdapter(meter, "name", "desc", "unit"); + + // recording values; + Map m1 = Map.of("k", 1L); + Map m3 = Map.of("k", 2L); + Map m4 = Map.of("k", 1L, "j", 3L); + gauge.record(1L, m1); + gauge.record(2L, m1); + gauge.record(3L, m3); + gauge.record(4L, m4); + + meter.collectMetrics(); + + List metrics = meter.getRecorder().get(gauge); + + assertThat(metrics, hasSize(3)); + + assertThat(metrics, containsInAnyOrder(new TestMetric(2L, m1), new TestMetric(3L, m3), new TestMetric(4L, m4))); - // recording a value - longGaugeAdapter.record(1L, Map.of("k", 1L)); + meter.clearCalls(); - // upon metric export, the consumer will be called - ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); - verify(longGaugeBuilder).buildWithCallback(captor.capture()); + meter.collectMetrics(); + metrics = meter.getRecorder().get(gauge); + assertThat(metrics, empty()); - Consumer value = captor.getValue(); - // making sure that a consumer will fetch the value passed down upon recording of a value - TestLongMeasurement testLongMeasurement = new TestLongMeasurement(); - value.accept(testLongMeasurement); + gauge.record(1L, m1); + gauge.record(3L, m3); - assertThat(testLongMeasurement.value, Matchers.equalTo(1L)); - assertThat(testLongMeasurement.attributes, Matchers.equalTo(Attributes.builder().put("k", 1).build())); + meter.collectMetrics(); + metrics = meter.getRecorder().get(gauge); + assertThat(metrics, hasSize(2)); + + assertThat(metrics, containsInAnyOrder(new TestMetric(1L, m1), new TestMetric(3L, m3))); } - // testing that a value reported is then used in a callback @SuppressWarnings("unchecked") public void testDoubleGaugeRecord() { - DoubleGaugeAdapter doubleGaugeAdapter = new DoubleGaugeAdapter(testMeter, "name", "desc", "unit"); + DoubleGaugeAdapter gauge = new DoubleGaugeAdapter(meter, "name", "desc", "unit"); - // recording a value - doubleGaugeAdapter.record(1.0, Map.of("k", 1.0)); + // recording values; + Map m1 = Map.of("k", 1L); + Map m3 = Map.of("k", 2L); + Map m4 = Map.of("k", 1L, "j", 3L); + gauge.record(1.0, m1); + gauge.record(2.0, m1); + gauge.record(3.0, m3); + gauge.record(4.0, m4); - // upon metric export, the consumer will be called - ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); - verify(mockDoubleGaugeBuilder).buildWithCallback(captor.capture()); + meter.collectMetrics(); - Consumer value = captor.getValue(); - // making sure that a consumer will fetch the value passed down upon recording of a value - TestDoubleMeasurement testLongMeasurement = new TestDoubleMeasurement(); - value.accept(testLongMeasurement); + List metrics = meter.getRecorder().get(gauge); - assertThat(testLongMeasurement.value, Matchers.equalTo(1.0)); - assertThat(testLongMeasurement.attributes, Matchers.equalTo(Attributes.builder().put("k", 1.0).build())); - } + assertThat(metrics, hasSize(3)); - private static class TestDoubleMeasurement implements ObservableDoubleMeasurement { - double value; - Attributes attributes; + assertThat(metrics, containsInAnyOrder(new TestMetric(2.0, m1), new TestMetric(3.0, m3), new TestMetric(4.0, m4))); - @Override - public void record(double value) { - this.value = value; - } + meter.clearCalls(); + meter.collectMetrics(); + metrics = meter.getRecorder().get(gauge); + assertThat(metrics, empty()); - @Override - public void record(double value, Attributes attributes) { - this.value = value; - this.attributes = attributes; + gauge.record(1.0, m1); + gauge.record(3.0, m3); - } - } + meter.collectMetrics(); + metrics = meter.getRecorder().get(gauge); + assertThat(metrics, hasSize(2)); - private static class TestLongMeasurement implements ObservableLongMeasurement { - long value; - Attributes attributes; - - @Override - public void record(long value) { - this.value = value; - } + assertThat(metrics, containsInAnyOrder(new TestMetric(1.0, m1), new TestMetric(3.0, m3))); + } - @Override - public void record(long value, Attributes attributes) { - this.value = value; - this.attributes = attributes; + public void testDifferentLongGaugesSameValues() { + LongGaugeAdapter gauge1 = new LongGaugeAdapter(meter, "name1", "desc", "unit"); + LongGaugeAdapter gauge2 = new LongGaugeAdapter(meter, "name2", "desc", "unit"); + Map map = Map.of("k", 1L); + gauge1.record(1L, map); + gauge2.record(2L, map); + + meter.collectMetrics(); + List metrics = meter.getRecorder().get(gauge1); + assertThat(metrics, hasSize(1)); + assertThat(metrics, contains(new TestMetric(1L, map))); + + metrics = meter.getRecorder().get(gauge2); + assertThat(metrics, hasSize(1)); + assertThat(metrics, contains(new TestMetric(2L, map))); + } - } + public void testDifferentDoubleGaugesSameValues() { + DoubleGaugeAdapter gauge1 = new DoubleGaugeAdapter(meter, "name1", "desc", "unit"); + DoubleGaugeAdapter gauge2 = new DoubleGaugeAdapter(meter, "name2", "desc", "unit"); + Map map = Map.of("k", 1L); + gauge1.record(1.0, map); + gauge2.record(2.0, map); + + meter.collectMetrics(); + List metrics = meter.getRecorder().get(gauge1); + assertThat(metrics, hasSize(1)); + assertThat(metrics, contains(new TestMetric(1.0, map))); + + metrics = meter.getRecorder().get(gauge2); + assertThat(metrics, hasSize(1)); + assertThat(metrics, contains(new TestMetric(2.0, map))); } } diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/MeterRecorder.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/MeterRecorder.java new file mode 100644 index 0000000000000..db18347096d58 --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/MeterRecorder.java @@ -0,0 +1,122 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import org.elasticsearch.core.Strings; +import org.elasticsearch.telemetry.metric.DoubleCounter; +import org.elasticsearch.telemetry.metric.DoubleGauge; +import org.elasticsearch.telemetry.metric.DoubleHistogram; +import org.elasticsearch.telemetry.metric.DoubleUpDownCounter; +import org.elasticsearch.telemetry.metric.Instrument; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongGauge; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongUpDownCounter; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class MeterRecorder { + enum INSTRUMENT { + COUNTER, + UP_DOWN_COUNTER, + HISTOGRAM, + GAUGE, + GAUGE_OBSERVER + } + + private record Registration(String name, String description, String unit) { + Registration { + Objects.requireNonNull(name); + Objects.requireNonNull(description); + Objects.requireNonNull(unit); + } + }; + + private record RegisteredMetric(Map registered, Map> called) { + void register(String name, String description, String unit) { + assert registered.containsKey(name) == false + : Strings.format("unexpected [{}]: [{}][{}], already registered[{}]", name, description, unit, registered.get(name)); + registered.put(name, new Registration(name, description, unit)); + } + + void call(String name, TestMetric call) { + assert registered.containsKey(name) : Strings.format("call for unregistered metric [{}]: [{}]", name, call); + called.computeIfAbsent(Objects.requireNonNull(name), k -> new ArrayList<>()).add(call); + } + } + + private final Map doubles; + private final Map longs; + + MeterRecorder() { + doubles = new HashMap<>(INSTRUMENT.values().length); + longs = new HashMap<>(INSTRUMENT.values().length); + for (var instrument : INSTRUMENT.values()) { + doubles.put(instrument, new RegisteredMetric(new HashMap<>(), new HashMap<>())); + longs.put(instrument, new RegisteredMetric(new HashMap<>(), new HashMap<>())); + } + } + + public void clearCalls() { + doubles.forEach((inst, rm) -> rm.called.clear()); + longs.forEach((inst, rm) -> rm.called.clear()); + } + + void registerDouble(INSTRUMENT instrument, String name, String description, String unit) { + doubles.get(Objects.requireNonNull(instrument)).register(name, description, unit); + } + + void registerLong(INSTRUMENT instrument, String name, String description, String unit) { + longs.get(Objects.requireNonNull(instrument)).register(name, description, unit); + } + + void call(INSTRUMENT instrument, String name, double value, Map attributes) { + doubles.get(Objects.requireNonNull(instrument)).call(name, new TestMetric(value, attributes)); + } + + void call(INSTRUMENT instrument, String name, long value, Map attributes) { + longs.get(Objects.requireNonNull(instrument)).call(name, new TestMetric(value, attributes)); + } + + List getDouble(INSTRUMENT instrument, String name) { + return doubles.get(Objects.requireNonNull(instrument)).called.getOrDefault(Objects.requireNonNull(name), Collections.emptyList()); + } + + List getLong(INSTRUMENT instrument, String name) { + return longs.get(Objects.requireNonNull(instrument)).called.getOrDefault(Objects.requireNonNull(name), Collections.emptyList()); + } + + List get(Instrument instrument) { + Objects.requireNonNull(instrument); + if (instrument instanceof DoubleCounter) { + return getDouble(MeterRecorder.INSTRUMENT.COUNTER, instrument.getName()); + } else if (instrument instanceof LongCounter) { + return getLong(MeterRecorder.INSTRUMENT.COUNTER, instrument.getName()); + } else if (instrument instanceof DoubleUpDownCounter) { + return getDouble(MeterRecorder.INSTRUMENT.UP_DOWN_COUNTER, instrument.getName()); + } else if (instrument instanceof LongUpDownCounter) { + return getLong(MeterRecorder.INSTRUMENT.UP_DOWN_COUNTER, instrument.getName()); + } else if (instrument instanceof DoubleHistogram) { + return getDouble(MeterRecorder.INSTRUMENT.HISTOGRAM, instrument.getName()); + } else if (instrument instanceof LongHistogram) { + return getLong(MeterRecorder.INSTRUMENT.HISTOGRAM, instrument.getName()); + } else if (instrument instanceof DoubleGauge) { + return getDouble(MeterRecorder.INSTRUMENT.GAUGE_OBSERVER, instrument.getName()); + } else if (instrument instanceof LongGauge) { + return getLong(MeterRecorder.INSTRUMENT.GAUGE_OBSERVER, instrument.getName()); + } else { + throw new IllegalArgumentException("unknown instrument [" + instrument.getClass().getName() + "]"); + } + } +} diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/RecordingMeterProvider.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/RecordingMeterProvider.java new file mode 100644 index 0000000000000..740f9841e8ffd --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/RecordingMeterProvider.java @@ -0,0 +1,645 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleCounter; +import io.opentelemetry.api.metrics.DoubleCounterBuilder; +import io.opentelemetry.api.metrics.DoubleGaugeBuilder; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.DoubleHistogramBuilder; +import io.opentelemetry.api.metrics.DoubleUpDownCounter; +import io.opentelemetry.api.metrics.DoubleUpDownCounterBuilder; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongCounterBuilder; +import io.opentelemetry.api.metrics.LongGaugeBuilder; +import io.opentelemetry.api.metrics.LongHistogram; +import io.opentelemetry.api.metrics.LongHistogramBuilder; +import io.opentelemetry.api.metrics.LongUpDownCounter; +import io.opentelemetry.api.metrics.LongUpDownCounterBuilder; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleCounter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; +import io.opentelemetry.api.metrics.ObservableDoubleMeasurement; +import io.opentelemetry.api.metrics.ObservableDoubleUpDownCounter; +import io.opentelemetry.api.metrics.ObservableLongCounter; +import io.opentelemetry.api.metrics.ObservableLongGauge; +import io.opentelemetry.api.metrics.ObservableLongMeasurement; +import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; +import io.opentelemetry.context.Context; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; + +public class RecordingMeterProvider implements Meter { + + Queue doubleCallbacks = new ConcurrentLinkedQueue<>(); + Queue longCallbacks = new ConcurrentLinkedQueue<>(); + + public void collectMetrics() { + doubleCallbacks.forEach(DoubleGaugeRecorder::doCall); + longCallbacks.forEach(LongGaugeRecorder::doCall); + } + + public void clearCalls() { + recorder.clearCalls(); + } + + public MeterRecorder getRecorder() { + return recorder; + } + + private MeterRecorder recorder = new MeterRecorder(); + + @Override + public LongCounterBuilder counterBuilder(String name) { + return new RecordingLongCounterBuilder(name); + } + + @Override + public LongUpDownCounterBuilder upDownCounterBuilder(String name) { + return new RecordingLongUpDownBuilder(name); + } + + @Override + public DoubleHistogramBuilder histogramBuilder(String name) { + return new RecordingDoubleHistogramBuilder(name); + } + + @Override + public DoubleGaugeBuilder gaugeBuilder(String name) { + return new RecordingDoubleGaugeBuilder(name); + } + + // Counter + private class RecordingLongCounterBuilder extends AbstractBuilder implements LongCounterBuilder { + RecordingLongCounterBuilder(String name) { + super(name); + } + + @Override + public LongCounterBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public LongCounterBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public DoubleCounterBuilder ofDoubles() { + return new RecordingDoubleCounterBuilder(this); + } + + @Override + public LongCounter build() { + LongRecorder counter = new LongRecorder(name); + recorder.registerLong(counter.getInstrument(), name, description, unit); + return counter; + } + + @Override + public ObservableLongCounter buildWithCallback(Consumer callback) { + unimplemented(); + return null; + } + + @Override + public ObservableLongMeasurement buildObserver() { + unimplemented(); + return null; + } + } + + private class LongRecorder extends LongUpDownRecorder implements LongCounter { + LongRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.COUNTER); + } + + @Override + public void add(long value) { + assert value >= 0; + super.add(value); + } + + @Override + public void add(long value, Attributes attributes) { + assert value >= 0; + super.add(value, attributes); + } + + @Override + public void add(long value, Attributes attributes, Context context) { + assert value >= 0; + super.add(value, attributes, context); + } + } + + private class RecordingDoubleCounterBuilder extends AbstractBuilder implements DoubleCounterBuilder { + + RecordingDoubleCounterBuilder(AbstractBuilder other) { + super(other); + } + + @Override + public DoubleCounterBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public DoubleCounterBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public DoubleCounter build() { + DoubleRecorder counter = new DoubleRecorder(name); + recorder.registerDouble(counter.getInstrument(), name, description, unit); + return counter; + } + + @Override + public ObservableDoubleCounter buildWithCallback(Consumer callback) { + unimplemented(); + return null; + } + + @Override + public ObservableDoubleMeasurement buildObserver() { + unimplemented(); + return null; + } + } + + private class DoubleRecorder extends DoubleUpDownRecorder implements DoubleCounter { + DoubleRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.COUNTER); + } + + @Override + public void add(double value) { + assert value >= 0; + super.add(value); + } + + @Override + public void add(double value, Attributes attributes) { + assert value >= 0; + super.add(value, attributes); + } + + @Override + public void add(double value, Attributes attributes, Context context) { + assert value >= 0; + super.add(value, attributes, context); + } + } + + private class RecordingLongUpDownBuilder extends AbstractBuilder implements LongUpDownCounterBuilder { + RecordingLongUpDownBuilder(String name) { + super(name); + } + + @Override + public LongUpDownCounterBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public LongUpDownCounterBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public DoubleUpDownCounterBuilder ofDoubles() { + return new RecordingDoubleUpDownBuilder(this); + } + + @Override + public LongUpDownCounter build() { + LongUpDownRecorder counter = new LongUpDownRecorder(name); + recorder.registerLong(counter.getInstrument(), name, description, unit); + return counter; + } + + @Override + public ObservableLongUpDownCounter buildWithCallback(Consumer callback) { + unimplemented(); + return null; + } + + @Override + public ObservableLongMeasurement buildObserver() { + unimplemented(); + return null; + } + } + + private class LongUpDownRecorder extends AbstractInstrument implements LongUpDownCounter { + LongUpDownRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.UP_DOWN_COUNTER); + } + + protected LongUpDownRecorder(String name, MeterRecorder.INSTRUMENT instrument) { + // used by LongRecorder + super(name, instrument); + } + + @Override + public void add(long value) { + recorder.call(instrument, name, value, null); + } + + @Override + public void add(long value, Attributes attributes) { + recorder.call(instrument, name, value, toMap(attributes)); + } + + @Override + public void add(long value, Attributes attributes, Context context) { + unimplemented(); + } + } + + private class RecordingDoubleUpDownBuilder extends AbstractBuilder implements DoubleUpDownCounterBuilder { + + RecordingDoubleUpDownBuilder(AbstractBuilder other) { + super(other); + } + + @Override + public DoubleUpDownCounterBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public DoubleUpDownCounterBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public DoubleUpDownCounter build() { + DoubleUpDownRecorder counter = new DoubleUpDownRecorder(name); + recorder.registerDouble(counter.getInstrument(), name, description, unit); + return counter; + } + + @Override + public ObservableDoubleUpDownCounter buildWithCallback(Consumer callback) { + unimplemented(); + return null; + } + + @Override + public ObservableDoubleMeasurement buildObserver() { + unimplemented(); + return null; + } + } + + private class DoubleUpDownRecorder extends AbstractInstrument implements DoubleUpDownCounter { + DoubleUpDownRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.UP_DOWN_COUNTER); + } + + protected DoubleUpDownRecorder(String name, MeterRecorder.INSTRUMENT instrument) { + // used by DoubleRecorder + super(name, instrument); + } + + @Override + public void add(double value) { + recorder.call(instrument, name, value, null); + } + + @Override + public void add(double value, Attributes attributes) { + recorder.call(instrument, name, value, toMap(attributes)); + } + + @Override + public void add(double value, Attributes attributes, Context context) { + unimplemented(); + } + } + + abstract static class AbstractInstrument { + protected final String name; + protected final MeterRecorder.INSTRUMENT instrument; + + AbstractInstrument(String name, MeterRecorder.INSTRUMENT instrument) { + this.name = name; + this.instrument = instrument; + } + + public MeterRecorder.INSTRUMENT getInstrument() { + return instrument; + } + + protected void unimplemented() { + throw new UnsupportedOperationException("unimplemented"); + } + + Map toMap(Attributes attributes) { + if (attributes == null) { + return null; + } + if (attributes.isEmpty()) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(attributes.size()); + attributes.forEach((k, v) -> map.put(k.getKey(), v)); + return map; + } + } + + // Gauges + private class RecordingDoubleGaugeBuilder extends AbstractBuilder implements DoubleGaugeBuilder { + RecordingDoubleGaugeBuilder(String name) { + super(name); + } + + @Override + public DoubleGaugeBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public DoubleGaugeBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public LongGaugeBuilder ofLongs() { + return new RecordingLongGaugeBuilder(this); + } + + @Override + public ObservableDoubleGauge buildWithCallback(Consumer callback) { + DoubleGaugeRecorder gauge = new DoubleGaugeRecorder(name, callback); + recorder.registerDouble(gauge.getInstrument(), name, description, unit); + doubleCallbacks.add(gauge); + return gauge; + } + + @Override + public ObservableDoubleMeasurement buildObserver() { + DoubleMeasurementRecorder measurement = new DoubleMeasurementRecorder(name); + recorder.registerDouble(measurement.getInstrument(), name, description, unit); + return measurement; + } + } + + private class DoubleGaugeRecorder extends AbstractInstrument implements ObservableDoubleGauge { + final Consumer callback; + + DoubleGaugeRecorder(String name, Consumer callback) { + super(name, MeterRecorder.INSTRUMENT.GAUGE_OBSERVER); + this.callback = callback; + } + + @Override + public void close() { + doubleCallbacks.remove(this); + } + + void doCall() { + callback.accept(new DoubleMeasurementRecorder(name, instrument)); + } + } + + private class DoubleMeasurementRecorder extends AbstractInstrument implements ObservableDoubleMeasurement { + DoubleMeasurementRecorder(String name, MeterRecorder.INSTRUMENT instrument) { + super(name, instrument); + } + + DoubleMeasurementRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.GAUGE); + } + + @Override + public void record(double value) { + recorder.call(instrument, name, value, null); + } + + @Override + public void record(double value, Attributes attributes) { + recorder.call(instrument, name, value, toMap(attributes)); + } + } + + private class RecordingLongGaugeBuilder extends AbstractBuilder implements LongGaugeBuilder { + RecordingLongGaugeBuilder(AbstractBuilder other) { + super(other); + } + + @Override + public LongGaugeBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public LongGaugeBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public ObservableLongGauge buildWithCallback(Consumer callback) { + LongGaugeRecorder gauge = new LongGaugeRecorder(name, callback); + recorder.registerLong(gauge.getInstrument(), name, description, unit); + longCallbacks.add(gauge); + return gauge; + } + + @Override + public ObservableLongMeasurement buildObserver() { + LongMeasurementRecorder measurement = new LongMeasurementRecorder(name); + recorder.registerLong(measurement.getInstrument(), name, description, unit); + return measurement; + } + } + + private class LongGaugeRecorder extends AbstractInstrument implements ObservableLongGauge { + final Consumer callback; + + LongGaugeRecorder(String name, Consumer callback) { + super(name, MeterRecorder.INSTRUMENT.GAUGE_OBSERVER); + this.callback = callback; + } + + @Override + public void close() { + longCallbacks.remove(this); + } + + void doCall() { + callback.accept(new LongMeasurementRecorder(name, instrument)); + } + } + + private class LongMeasurementRecorder extends AbstractInstrument implements ObservableLongMeasurement { + LongMeasurementRecorder(String name, MeterRecorder.INSTRUMENT instrument) { + super(name, instrument); + } + + LongMeasurementRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.GAUGE); + } + + @Override + public void record(long value) { + recorder.call(instrument, name, value, null); + } + + @Override + public void record(long value, Attributes attributes) { + recorder.call(instrument, name, value, toMap(attributes)); + } + } + + // Histograms + private class RecordingDoubleHistogramBuilder extends AbstractBuilder implements DoubleHistogramBuilder { + RecordingDoubleHistogramBuilder(String name) { + super(name); + } + + @Override + public DoubleHistogramBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public DoubleHistogramBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public LongHistogramBuilder ofLongs() { + return new RecordingLongHistogramBuilder(this); + } + + @Override + public DoubleHistogram build() { + return new DoubleHistogramRecorder(name); + } + } + + private class DoubleHistogramRecorder extends AbstractInstrument implements DoubleHistogram { + DoubleHistogramRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.HISTOGRAM); + } + + @Override + public void record(double value) { + recorder.call(getInstrument(), name, value, null); + } + + @Override + public void record(double value, Attributes attributes) { + recorder.call(getInstrument(), name, value, toMap(attributes)); + } + + @Override + public void record(double value, Attributes attributes, Context context) { + unimplemented(); + } + } + + private class RecordingLongHistogramBuilder extends AbstractBuilder implements LongHistogramBuilder { + + RecordingLongHistogramBuilder(AbstractBuilder other) { + super(other); + } + + @Override + public LongHistogramBuilder setDescription(String description) { + innerSetDescription(description); + return this; + } + + @Override + public LongHistogramBuilder setUnit(String unit) { + innerSetUnit(unit); + return this; + } + + @Override + public LongHistogram build() { + return new LongHistogramRecorder(name); + } + } + + private class LongHistogramRecorder extends AbstractInstrument implements LongHistogram { + LongHistogramRecorder(String name) { + super(name, MeterRecorder.INSTRUMENT.HISTOGRAM); + } + + @Override + public void record(long value) { + recorder.call(getInstrument(), name, value, null); + } + + @Override + public void record(long value, Attributes attributes) { + recorder.call(getInstrument(), name, value, toMap(attributes)); + } + + @Override + public void record(long value, Attributes attributes, Context context) { + unimplemented(); + } + } + + abstract static class AbstractBuilder { + protected final String name; + protected String description; + protected String unit; + + AbstractBuilder(String name) { + this.name = name; + } + + AbstractBuilder(AbstractBuilder other) { + this.name = other.name; + this.description = other.description; + this.unit = other.unit; + } + + void innerSetDescription(String description) { + this.description = description; + } + + void innerSetUnit(String unit) { + this.unit = unit; + } + + protected void unimplemented() { + throw new UnsupportedOperationException("unimplemented"); + } + } +} diff --git a/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/TestMetric.java b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/TestMetric.java new file mode 100644 index 0000000000000..6432d303a3495 --- /dev/null +++ b/modules/apm/src/test/java/org/elasticsearch/telemetry/apm/internal/metrics/TestMetric.java @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.telemetry.apm.internal.metrics; + +import java.util.Map; +import java.util.Objects; + +public record TestMetric(Number number, Map attributes) { + public TestMetric { + Objects.requireNonNull(number); + } +} From f35adb7b83f0a4c398e5542e5d16c38440bf0b7a Mon Sep 17 00:00:00 2001 From: Stuart Tettemer Date: Tue, 17 Oct 2023 15:53:38 -0500 Subject: [PATCH 2/2] Update docs/changelog/101018.yaml --- docs/changelog/101018.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/101018.yaml diff --git a/docs/changelog/101018.yaml b/docs/changelog/101018.yaml new file mode 100644 index 0000000000000..753ca2903b0a1 --- /dev/null +++ b/docs/changelog/101018.yaml @@ -0,0 +1,5 @@ +pr: 101018 +summary: "Metrics: Synchronous gauges respect attributes" +area: Infra/Core +type: bug +issues: []