Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix buffer sync logic using modern concurrency primitives #991

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
Expand Down Expand Up @@ -51,9 +52,9 @@
<version>${simpleclient.version}</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${codahale.version}</version>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${codahale.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
Expand All @@ -74,6 +75,25 @@
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerArgs>
<!-- need to add parameters to prevent inheritance -->
<arg>-parameters</arg>
</compilerArgs>
<annotationProcessorPaths>
<path>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand All @@ -86,7 +106,8 @@
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>io.prometheus.metrics.benchmarks.BenchmarkRunner</mainClass>
</transformer>
</transformers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
import org.openjdk.jmh.annotations.Threads;

/**
* Results on a machine with dedicated 8 vCPU cores:
* Results on a machine with dedicated Core i7 1265U:
*
* <pre>
*
* Benchmark Mode Cnt Score Error Units
* CounterBenchmark.codahaleIncNoLabels thrpt 25 25761.677 ± 122.947 ops/s
* CounterBenchmark.openTelemetryAdd thrpt 25 545.026 ± 33.913 ops/s
* CounterBenchmark.openTelemetryInc thrpt 25 550.577 ± 45.415 ops/s
* CounterBenchmark.openTelemetryIncNoLabels thrpt 25 527.638 ± 32.020 ops/s
* CounterBenchmark.prometheusAdd thrpt 25 20341.474 ± 40.973 ops/s
* CounterBenchmark.prometheusInc thrpt 25 26414.616 ± 96.666 ops/s
* CounterBenchmark.prometheusNoLabelsInc thrpt 25 26177.676 ± 120.342 ops/s
* CounterBenchmark.simpleclientAdd thrpt 25 5503.867 ± 161.313 ops/s
* CounterBenchmark.simpleclientInc thrpt 25 5568.125 ± 53.291 ops/s
* CounterBenchmark.simpleclientNoLabelsInc thrpt 25 5394.692 ± 130.531 ops/s
* CounterBenchmark.codahaleIncNoLabels thrpt 25 32969.795 ± 1547.775 ops/s
* CounterBenchmark.openTelemetryAdd thrpt 25 747.068 ± 93.128 ops/s
* CounterBenchmark.openTelemetryInc thrpt 25 760.784 ± 47.595 ops/s
* CounterBenchmark.openTelemetryIncNoLabels thrpt 25 824.346 ± 45.131 ops/s
* CounterBenchmark.prometheusAdd thrpt 25 28403.000 ± 250.774 ops/s
* CounterBenchmark.prometheusInc thrpt 25 38368.142 ± 361.914 ops/s
* CounterBenchmark.prometheusNoLabelsInc thrpt 25 35558.069 ± 4020.926 ops/s
* CounterBenchmark.simpleclientAdd thrpt 25 4081.152 ± 620.094 ops/s
* CounterBenchmark.simpleclientInc thrpt 25 5735.644 ± 1205.329 ops/s
* CounterBenchmark.simpleclientNoLabelsInc thrpt 25 6852.563 ± 544.481 ops/s
* </pre>
*
* Prometheus counters are faster than counters of other libraries. For example, incrementing a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
import org.openjdk.jmh.annotations.Threads;

/**
* Results on a machine with dedicated 8 vCPU cores:
* Results on a machine with dedicated Core i7 1265U:
*
* <pre>
* Benchmark Mode Cnt Score Error Units
* HistogramBenchmark.openTelemetryClassic thrpt 25 258.660 ± 6.736 ops/s
* HistogramBenchmark.openTelemetryExponential thrpt 25 210.963 ± 11.288 ops/s
* HistogramBenchmark.prometheusClassic thrpt 25 1528.871 ± 43.598 ops/s
* HistogramBenchmark.prometheusNative thrpt 25 1282.643 ± 110.210 ops/s
* HistogramBenchmark.simpleclient thrpt 25 3376.016 ± 173.545 ops/s
* HistogramBenchmark.openTelemetryClassic thrpt 25 390.982 ± 16.058 ops/s
* HistogramBenchmark.openTelemetryExponential thrpt 25 320.160 ± 18.056 ops/s
* HistogramBenchmark.prometheusClassic thrpt 25 2385.862 ± 34.766 ops/s
* HistogramBenchmark.prometheusNative thrpt 25 1947.371 ± 48.193 ops/s
* HistogramBenchmark.simpleclient thrpt 25 4324.961 ± 50.938 ops/s
* </pre>
*
* The simpleclient (i.e. client_java version 0.16.0 and older) histograms perform about the same as
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
<name>Gregor Zeitlinger</name>
<email>gregor.zeitlinger@grafana.com</email>
</developer>
<developer>
<id>dhoard</id>
<name>Doug Hoard</name>
<email>doug.hoard@gmail.com</email>
</developer>
</developers>

<modules>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import io.prometheus.metrics.model.snapshots.DataPointSnapshot;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -15,17 +17,19 @@
*/
class Buffer {

private static final long signBit = 1L << 63;
private static final long bufferActiveBit = 1L << 63;
private final AtomicLong observationCount = new AtomicLong(0);
private double[] observationBuffer = new double[0];
private int bufferPos = 0;
private boolean reset = false;
private final Object appendLock = new Object();
private final Object runLock = new Object();

ReentrantLock appendLock = new ReentrantLock();
ReentrantLock runLock = new ReentrantLock();
Condition bufferFilled = appendLock.newCondition();

boolean append(double value) {
long count = observationCount.incrementAndGet();
if ((count & signBit) == 0) {
if ((count & bufferActiveBit) == 0) {
return false; // sign bit not set -> buffer not active.
} else {
doAppend(value);
Expand All @@ -34,12 +38,17 @@ boolean append(double value) {
}

private void doAppend(double amount) {
synchronized (appendLock) {
appendLock.lock();
try {
if (bufferPos >= observationBuffer.length) {
observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128);
}
observationBuffer[bufferPos] = amount;
bufferPos++;

bufferFilled.signalAll();
} finally {
appendLock.unlock();
}
}

Expand All @@ -48,33 +57,55 @@ void reset() {
reset = true;
}

@SuppressWarnings("ThreadPriorityCheck")
<T extends DataPointSnapshot> T run(
Function<Long, Boolean> complete, Supplier<T> runnable, Consumer<Double> observeFunction) {
Function<Long, Boolean> complete,
Supplier<T> createResult,
Consumer<Double> observeFunction) {
double[] buffer;
int bufferSize;
T result;
synchronized (runLock) {
long count = observationCount.getAndAdd(signBit);
while (!complete.apply(count)) {
Thread.yield();
}
result = runnable.get();
int expectedBufferSize;
if (reset) {
expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~signBit) - count);
reset = false;
} else {
expectedBufferSize = (int) (observationCount.addAndGet(signBit) - count);
}
while (bufferPos != expectedBufferSize) {
Thread.yield();

runLock.lock();
try {
// Signal that the buffer is active.
Long expectedCount = observationCount.getAndAdd(bufferActiveBit);

appendLock.lock();
try {
while (!complete.apply(expectedCount)) {
// Wait until all in-flight threads have added their observations to the buffer.
bufferFilled.await();
}
result = createResult.get();

// Signal that the buffer is inactive.
int expectedBufferSize;
if (reset) {
expectedBufferSize =
(int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount);
reset = false;
} else {
expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount);
}

while (bufferPos < expectedBufferSize) {
// Wait until all in-flight threads have added their observations to the buffer.
bufferFilled.await();
}
} finally {
appendLock.unlock();
}

buffer = observationBuffer;
bufferSize = bufferPos;
observationBuffer = new double[0];
bufferPos = 0;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
runLock.unlock();
}

for (int i = 0; i < bufferSize; i++) {
observeFunction.accept(buffer[i]);
}
Expand Down