Skip to content

Commit

Permalink
Memory Mode support: Adding memory mode, and implementing it for Asyn…
Browse files Browse the repository at this point in the history
…chronous Instruments (#5709)

Co-authored-by: jack-berg <34418638+jack-berg@users.noreply.github.com>
  • Loading branch information
asafm and jack-berg authored Sep 26, 2023
1 parent 9a93155 commit b0c337b
Show file tree
Hide file tree
Showing 47 changed files with 2,620 additions and 156 deletions.
5 changes: 3 additions & 2 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ requirements and recommendations.

If you want to add new features or change behavior, please make sure your changes follow the
[OpenTelemetry Specification](https://github.com/open-telemetry/opentelemetry-specification).
Otherwise file an issue or submit a PR to the specification repo first.
Otherwise, file an issue or submit a PR to the specification repo first.

Make sure to review the projects [license](LICENSE) and sign the
[CNCF CLA](https://identity.linuxfoundation.org/projects/cncf). A signed CLA will be enforced by an
Expand Down Expand Up @@ -52,7 +52,8 @@ $ ./gradlew check

Note: this gradle task will potentially generate changes to files in
the `docs/apidiffs/current_vs_latest`
directory. Please make sure to include any changes to these files in your pull request.
directory. Please make sure to include any changes to these files in your pull request (i.e.
add those files to your commits in the PR).

## PR Review

Expand Down
1 change: 1 addition & 0 deletions dependencyManagement/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ val DEPENDENCIES = listOf(
"io.opencensus:opencensus-contrib-exemplar-util:${opencensusVersion}",
"org.openjdk.jmh:jmh-core:${jmhVersion}",
"org.openjdk.jmh:jmh-generator-bytecode:${jmhVersion}",
"org.openjdk.jmh:jmh-generator-annprocess:${jmhVersion}",
"org.mockito:mockito-core:${mockitoVersion}",
"org.mockito:mockito-junit-jupiter:${mockitoVersion}",
"org.slf4j:slf4j-simple:${slf4jVersion}",
Expand Down
11 changes: 10 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-common.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
Comparing source compatibility of against
No changes.
+++ NEW ENUM: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.common.export.MemoryMode (compatible)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW INTERFACE: java.lang.constant.Constable
+++ NEW INTERFACE: java.lang.Comparable
+++ NEW INTERFACE: java.io.Serializable
+++ NEW SUPERCLASS: java.lang.Enum
+++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.sdk.common.export.MemoryMode REUSABLE_DATA
+++ NEW FIELD: PUBLIC(+) STATIC(+) FINAL(+) io.opentelemetry.sdk.common.export.MemoryMode IMMUTABLE_DATA
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.common.export.MemoryMode valueOf(java.lang.String)
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.common.export.MemoryMode[] values()
10 changes: 9 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-metrics.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
Comparing source compatibility of against
No changes.
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.MetricExporter (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
*** MODIFIED INTERFACE: PUBLIC ABSTRACT io.opentelemetry.sdk.metrics.export.MetricReader (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.sdk.metrics.export.PeriodicMetricReader (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
12 changes: 11 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-sdk-testing.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,12 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder builder()
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.export.MemoryMode getMemoryMode()
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader build()
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder setAggregationTemporalitySelector(io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder setDefaultAggregationSelector(io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.testing.exporter.InMemoryMetricReaderBuilder setMemoryMode(io.opentelemetry.sdk.common.export.MemoryMode)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.common.export;

/** The memory semantics of the SDK. */
public enum MemoryMode {

/**
* Reuses objects to reduce allocations.
*
* <p>In this mode, the SDK reuses objects to reduce allocations, at the expense of disallowing
* concurrent collections / exports.
*/
REUSABLE_DATA,

/**
* Uses immutable data structures.
*
* <p>In this mode, the SDK passes immutable objects to exporters / readers, increasing
* allocations but ensuring safe concurrent exports.
*/
IMMUTABLE_DATA
}
7 changes: 7 additions & 0 deletions sdk/metrics/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ testing {
}
}
}
register<JvmTestSuite>("jmhBasedTest") {
dependencies {
implementation("org.openjdk.jmh:jmh-core")
implementation("org.openjdk.jmh:jmh-generator-bytecode")
annotationProcessor("org.openjdk.jmh:jmh-generator-annprocess")
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

/**
* Run this through {@link AsynchronousMetricStorageGarbageCollectionBenchmarkTest}, as it runs it
* embedded with the GC profiler which what this test designed for (No need for command line run)
*
* <p>This test creates 10 asynchronous counters (any asynchronous instrument will do as the code
* path is almost the same for all async instrument types), and 1000 attribute sets. Each time the
* test runs, it calls `flush` which effectively calls the callback for each counter. Each such
* callback records a random number for each of the 1000 attribute sets. The result list ends up in
* {@link NoopMetricExporter} which does nothing with it.
*
* <p>This is repeated 100 times, collectively called Operation in the statistics and each such
* operation is repeated 20 times - known as Iteration.
*
* <p>Each such test is repeated, with a brand new JVM, for all combinations of {@link MemoryMode}
* and {@link AggregationTemporality}. This is done since each combination has a different code
* path.
*/
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Measurement(iterations = 20, batchSize = 100)
@Warmup(iterations = 10, batchSize = 10)
@Fork(1)
public class AsynchronousMetricStorageGarbageCollectionBenchmark {

@State(value = Scope.Benchmark)
@SuppressWarnings("SystemOut")
public static class ThreadState {
private final int cardinality;
private final int countersCount;
@Param public AggregationTemporality aggregationTemporality;
@Param public MemoryMode memoryMode;
SdkMeterProvider sdkMeterProvider;
private final Random random = new Random();
List<Attributes> attributesList;

/** Creates a ThreadState. */
@SuppressWarnings("unused")
public ThreadState() {
cardinality = 1000;
countersCount = 10;
}

@SuppressWarnings("SpellCheckingInspection")
@Setup
public void setup() {
PeriodicMetricReader metricReader =
PeriodicMetricReader.builder(
// Configure an exporter that configures the temporality and aggregation
// for the test case, but otherwise drops the data on export
new NoopMetricExporter(aggregationTemporality, Aggregation.sum(), memoryMode))
// Effectively disable periodic reading so reading is only done on #flush()
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
.build();
SdkMeterProviderBuilder builder = SdkMeterProvider.builder();
SdkMeterProviderUtil.registerMetricReaderWithCardinalitySelector(
builder, metricReader, unused -> cardinality + 1);

attributesList = AttributesGenerator.generate(cardinality);

// Disable examplars
SdkMeterProviderUtil.setExemplarFilter(builder, ExemplarFilter.alwaysOff());

sdkMeterProvider = builder.build();
for (int i = 0; i < countersCount; i++) {
sdkMeterProvider
.get("meter")
.counterBuilder("counter" + i)
.buildWithCallback(
observableLongMeasurement -> {
for (int j = 0; j < attributesList.size(); j++) {
Attributes attributes = attributesList.get(j);
observableLongMeasurement.record(random.nextInt(10_000), attributes);
}
});
}
}

@TearDown
public void tearDown() {
sdkMeterProvider.shutdown().join(10, TimeUnit.SECONDS);
}
}

/**
* Collects all asynchronous instruments metric data.
*
* @param threadState thread-state
*/
@Benchmark
@Threads(value = 1)
public void recordAndCollect(ThreadState threadState) {
threadState.sdkMeterProvider.forceFlush().join(10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.openjdk.jmh.infra.BenchmarkParams;
import org.openjdk.jmh.results.BenchmarkResult;
import org.openjdk.jmh.results.Result;
import org.openjdk.jmh.results.RunResult;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

public class AsynchronousMetricStorageGarbageCollectionBenchmarkTest {

/**
* This test validates that in {@link MemoryMode#REUSABLE_DATA}, {@link
* AsynchronousMetricStorage#collect(Resource, InstrumentationScopeInfo, long, long)} barely
* allocates memory which is then subsequently garbage collected. It is done so comparatively to
* {@link MemoryMode#IMMUTABLE_DATA},
*
* <p>It runs the JMH test {@link AsynchronousMetricStorageGarbageCollectionBenchmark} with GC
* profiler, and measures for each parameter combination the garbage collector normalized rate
* (bytes allocated per Operation).
*
* <p>Memory allocations can be hidden even at an innocent foreach loop on a collection, which
* under the hood allocates an internal object O(N) times. Someone can accidentally refactor such
* loop, resulting in 30% increase of garbage collected objects during a single collect() run.
*/
@SuppressWarnings("rawtypes")
@Test
public void normalizedAllocationRateTest() throws RunnerException {
// GitHub CI has an environment variable (CI=true). We can use it to skip
// this test since it's a lengthy one (roughly 10 seconds) and have it running
// only in GitHub CI
Assumptions.assumeTrue(
"true".equals(System.getenv("CI")),
"This test should only run in GitHub CI since it's long");

// Runs AsynchronousMetricStorageMemoryProfilingBenchmark
// with garbage collection profiler
Options opt =
new OptionsBuilder()
.include(AsynchronousMetricStorageGarbageCollectionBenchmark.class.getSimpleName())
.addProfiler("gc")
.shouldFailOnError(true)
.jvmArgs("-Xmx1500m")
.build();
Collection<RunResult> results = new Runner(opt).run();

// Collect the normalized GC allocation rate per parameters combination
Map<String, Map<String, Double>> resultMap = new HashMap<>();
for (RunResult result : results) {
for (BenchmarkResult benchmarkResult : result.getBenchmarkResults()) {
BenchmarkParams benchmarkParams = benchmarkResult.getParams();

String memoryMode = benchmarkParams.getParam("memoryMode");
String aggregationTemporality = benchmarkParams.getParam("aggregationTemporality");
assertThat(memoryMode).isNotNull();
assertThat(aggregationTemporality).isNotNull();

Map<String, Result> secondaryResults = benchmarkResult.getSecondaryResults();
Result allocRateNorm = secondaryResults.get("gc.alloc.rate.norm");
assertThat(allocRateNorm)
.describedAs("Allocation rate in secondary results: %s", secondaryResults)
.isNotNull();

resultMap
.computeIfAbsent(aggregationTemporality, k -> new HashMap<>())
.put(memoryMode, allocRateNorm.getScore());
}
}

assertThat(resultMap).hasSameSizeAs(AggregationTemporality.values());

// Asserts that reusable data GC allocation rate is a tiny fraction of immutable data
// GC allocation rate
resultMap.forEach(
(aggregationTemporality, memoryModeToAllocRateMap) -> {
Double immutableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.IMMUTABLE_DATA.toString());
Double reusableDataAllocRate =
memoryModeToAllocRateMap.get(MemoryMode.REUSABLE_DATA.toString());

assertThat(immutableDataAllocRate).isNotNull().isNotZero();
assertThat(reusableDataAllocRate).isNotNull().isNotZero();
assertThat(100 - (reusableDataAllocRate / immutableDataAllocRate) * 100)
.isCloseTo(99.8, Offset.offset(2.0));
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.state;

import io.opentelemetry.api.common.Attributes;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;

public class AttributesGenerator {

private AttributesGenerator() {}

/**
* Generates a list of unique attributes, with a single attribute key, and random value.
*
* @param uniqueAttributesCount The amount of unique attribute sets to generate
* @return The list of generates {@link Attributes}
*/
public static List<Attributes> generate(int uniqueAttributesCount) {
Random random = new Random();
HashSet<String> attributeSet = new HashSet<>();
ArrayList<Attributes> attributesList = new ArrayList<>(uniqueAttributesCount);
String last = "aaaaaaaaaaaaaaaaaaaaaaaaaa";
for (int i = 0; i < uniqueAttributesCount; i++) {
char[] chars = last.toCharArray();
int attempts = 0;
do {
chars[random.nextInt(last.length())] = (char) (random.nextInt(26) + 'a');
} while (attributeSet.contains(new String(chars)) && ++attempts < 1000);
if (attributeSet.contains(new String(chars))) {
throw new IllegalStateException("Couldn't create new random attributes");
}
last = new String(chars);
attributesList.add(Attributes.builder().put("key", last).build());
attributeSet.add(last);
}

return attributesList;
}
}
Loading

0 comments on commit b0c337b

Please sign in to comment.