Skip to content

Commit

Permalink
services, xds, orca: LRS named metrics support (#10282)
Browse files Browse the repository at this point in the history
Implements gRFC A64: xDS LRS Custom Metrics Support
  • Loading branch information
danielzhaotongliu authored Jul 11, 2023
1 parent 8dbd47c commit 4fa2814
Show file tree
Hide file tree
Showing 17 changed files with 343 additions and 58 deletions.
30 changes: 29 additions & 1 deletion services/src/main/java/io/grpc/services/CallMetricRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class CallMetricRecorder {
new AtomicReference<>();
private final AtomicReference<ConcurrentHashMap<String, Double>> requestCostMetrics =
new AtomicReference<>();
private final AtomicReference<ConcurrentHashMap<String, Double>> namedMetrics =
new AtomicReference<>();
private double cpuUtilizationMetric = 0;
private double applicationUtilizationMetric = 0;
private double memoryUtilizationMetric = 0;
Expand Down Expand Up @@ -127,6 +129,27 @@ public CallMetricRecorder recordRequestCostMetric(String name, double value) {
return this;
}

/**
* Records an application-specific opaque custom metric measurement. If RPC has already finished,
* this method is no-op.
*
* <p>A latter record will overwrite its former name-sakes.
*
* @return this recorder object
*/
public CallMetricRecorder recordNamedMetric(String name, double value) {
if (disabled) {
return this;
}
if (namedMetrics.get() == null) {
// The chance of race of creation of the map should be very small, so it should be fine
// to create these maps that might be discarded.
namedMetrics.compareAndSet(null, new ConcurrentHashMap<String, Double>());
}
namedMetrics.get().put(name, value);
return this;
}

/**
* Records a call metric measurement for CPU utilization in the range [0, inf). Values outside the
* valid range are ignored. If RPC has already finished, this method is no-op.
Expand Down Expand Up @@ -235,12 +258,17 @@ Map<String, Double> finalizeAndDump() {
MetricReport finalizeAndDump2() {
Map<String, Double> savedRequestCostMetrics = finalizeAndDump();
Map<String, Double> savedUtilizationMetrics = utilizationMetrics.get();
Map<String, Double> savedNamedMetrics = namedMetrics.get();
if (savedUtilizationMetrics == null) {
savedUtilizationMetrics = Collections.emptyMap();
}
if (savedNamedMetrics == null) {
savedNamedMetrics = Collections.emptyMap();
}
return new MetricReport(cpuUtilizationMetric, applicationUtilizationMetric,
memoryUtilizationMetric, qps, eps, Collections.unmodifiableMap(savedRequestCostMetrics),
Collections.unmodifiableMap(savedUtilizationMetrics)
Collections.unmodifiableMap(savedUtilizationMetrics),
Collections.unmodifiableMap(savedNamedMetrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ public static MetricReport finalizeAndDump2(CallMetricRecorder recorder) {

public static MetricReport createMetricReport(double cpuUtilization,
double applicationUtilization, double memoryUtilization, double qps, double eps,
Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics) {
Map<String, Double> requestCostMetrics, Map<String, Double> utilizationMetrics,
Map<String, Double> namedMetrics) {
return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps,
requestCostMetrics, utilizationMetrics);
requestCostMetrics, utilizationMetrics, namedMetrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,6 @@ public void clearEpsMetric() {

MetricReport getMetricReport() {
return new MetricReport(cpuUtilization, applicationUtilization, memoryUtilization, qps, eps,
Collections.emptyMap(), Collections.unmodifiableMap(metricsData));
Collections.emptyMap(), Collections.unmodifiableMap(metricsData), Collections.emptyMap());
}
}
9 changes: 8 additions & 1 deletion services/src/main/java/io/grpc/services/MetricReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,19 @@ public final class MetricReport {
private double eps;
private Map<String, Double> requestCostMetrics;
private Map<String, Double> utilizationMetrics;
private Map<String, Double> namedMetrics;

MetricReport(double cpuUtilization, double applicationUtilization, double memoryUtilization,
double qps, double eps, Map<String, Double> requestCostMetrics,
Map<String, Double> utilizationMetrics) {
Map<String, Double> utilizationMetrics, Map<String, Double> namedMetrics) {
this.cpuUtilization = cpuUtilization;
this.applicationUtilization = applicationUtilization;
this.memoryUtilization = memoryUtilization;
this.qps = qps;
this.eps = eps;
this.requestCostMetrics = checkNotNull(requestCostMetrics, "requestCostMetrics");
this.utilizationMetrics = checkNotNull(utilizationMetrics, "utilizationMetrics");
this.namedMetrics = checkNotNull(namedMetrics, "namedMetrics");
}

public double getCpuUtilization() {
Expand All @@ -68,6 +70,10 @@ public Map<String, Double> getUtilizationMetrics() {
return utilizationMetrics;
}

public Map<String, Double> getNamedMetrics() {
return namedMetrics;
}

public double getQps() {
return qps;
}
Expand All @@ -84,6 +90,7 @@ public String toString() {
.add("memoryUtilization", memoryUtilization)
.add("requestCost", requestCostMetrics)
.add("utilization", utilizationMetrics)
.add("named", namedMetrics)
.add("qps", qps)
.add("eps", eps)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ public void dumpDumpsAllSavedMetricValues() {
recorder.recordRequestCostMetric("cost1", 37465.12);
recorder.recordRequestCostMetric("cost2", 10293.0);
recorder.recordRequestCostMetric("cost3", 1.0);
recorder.recordNamedMetric("named1", 0.2233);
recorder.recordNamedMetric("named2", -1.618);
recorder.recordNamedMetric("named3", 3.1415926535);
recorder.recordCpuUtilizationMetric(0.1928);
recorder.recordApplicationUtilizationMetric(0.9987);
recorder.recordMemoryUtilizationMetric(0.474);
Expand All @@ -55,13 +58,18 @@ public void dumpDumpsAllSavedMetricValues() {
.containsExactly("util1", 0.154353423, "util2", 0.1367, "util3", 0.143734);
Truth.assertThat(dump.getRequestCostMetrics())
.containsExactly("cost1", 37465.12, "cost2", 10293.0, "cost3", 1.0);
Truth.assertThat(dump.getNamedMetrics())
.containsExactly("named1", 0.2233, "named2", -1.618, "named3", 3.1415926535);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0.1928);
Truth.assertThat(dump.getApplicationUtilization()).isEqualTo(0.9987);
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.474);
Truth.assertThat(dump.getQps()).isEqualTo(2522.54);
Truth.assertThat(dump.getEps()).isEqualTo(1.618);
Truth.assertThat(dump.toString()).contains("eps=1.618");
Truth.assertThat(dump.toString()).contains("applicationUtilization=0.9987");
Truth.assertThat(dump.toString()).contains("named1=0.2233");
Truth.assertThat(dump.toString()).contains("named2=-1.618");
Truth.assertThat(dump.toString()).contains("named3=3.1415926535");
}

@Test
Expand All @@ -71,6 +79,7 @@ public void noMetricsRecordedAfterSnapshot() {
recorder.recordUtilizationMetric("cost", 0.154353423);
recorder.recordQpsMetric(3.14159);
recorder.recordEpsMetric(1.618);
recorder.recordNamedMetric("named1", 2.718);
assertThat(recorder.finalizeAndDump()).isEqualTo(initDump);
}

Expand Down Expand Up @@ -121,6 +130,9 @@ public void lastValueWinForMetricsWithSameName() {
recorder.recordUtilizationMetric("util1", 0.2837421);
recorder.recordMemoryUtilizationMetric(0.93840);
recorder.recordUtilizationMetric("util1", 0.843233);
recorder.recordNamedMetric("named1", 0.2233);
recorder.recordNamedMetric("named2", 2.718);
recorder.recordNamedMetric("named1", 3.1415926535);
recorder.recordQpsMetric(1928.3);
recorder.recordQpsMetric(100.8);
recorder.recordEpsMetric(3.14159);
Expand All @@ -133,6 +145,8 @@ public void lastValueWinForMetricsWithSameName() {
Truth.assertThat(dump.getMemoryUtilization()).isEqualTo(0.93840);
Truth.assertThat(dump.getUtilizationMetrics())
.containsExactly("util1", 0.843233);
Truth.assertThat(dump.getNamedMetrics())
.containsExactly("named1", 3.1415926535, "named2", 2.718);
Truth.assertThat(dump.getCpuUtilization()).isEqualTo(0);
Truth.assertThat(dump.getQps()).isEqualTo(100.8);
Truth.assertThat(dump.getEps()).isEqualTo(1.618);
Expand Down
25 changes: 24 additions & 1 deletion xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.grpc.Status;
import io.grpc.internal.ForwardingClientStreamTracer;
import io.grpc.internal.ObjectPool;
import io.grpc.services.MetricReport;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import io.grpc.util.GracefulSwitchLoadBalancer;
Expand All @@ -47,6 +48,8 @@
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import io.grpc.xds.internal.security.SslContextProviderSupplier;
import io.grpc.xds.orca.OrcaPerRequestUtil;
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -329,7 +332,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
if (stats != null) {
ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
stats, inFlights, result.getStreamTracerFactory());
return PickResult.withSubchannel(result.getSubchannel(), tracerFactory);
ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
.newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
}
}
return result;
Expand Down Expand Up @@ -386,4 +391,22 @@ public void streamClosed(Status status) {
};
}
}

private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {

private final ClusterLocalityStats stats;

private OrcaPerRpcListener(ClusterLocalityStats stats) {
this.stats = checkNotNull(stats, "stats");
}

/**
* Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
* included in the snapshot for the LRS report sent to the LRS server.
*/
@Override
public void onLoadReport(MetricReport report) {
stats.recordBackendLoadMetricStats(report.getNamedMetrics());
}
}
}
12 changes: 11 additions & 1 deletion xds/src/main/java/io/grpc/xds/LoadReportClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -361,7 +362,16 @@ private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
.setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
.setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
.setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
.setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()));
.setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())
.addAllLoadMetricStats(
upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map(
e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
.setMetricName(e.getKey())
.setNumRequestsFinishedWithMetric(
e.getValue().numRequestsFinishedWithMetric())
.setTotalMetricValue(e.getValue().totalMetricValue())
.build())
.collect(Collectors.toList())));
}
for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
builder.addDroppedRequests(
Expand Down
33 changes: 30 additions & 3 deletions xds/src/main/java/io/grpc/xds/LoadStatsManager2.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import io.grpc.Status;
import io.grpc.xds.Stats.BackendLoadMetricStats;
import io.grpc.xds.Stats.ClusterStats;
import io.grpc.xds.Stats.DroppedRequests;
import io.grpc.xds.Stats.UpstreamLocalityStats;
Expand Down Expand Up @@ -197,7 +198,7 @@ synchronized List<ClusterStats> getClusterStatsReports(String cluster) {
}
UpstreamLocalityStats upstreamLocalityStats = UpstreamLocalityStats.create(
locality, snapshot.callsIssued, snapshot.callsSucceeded, snapshot.callsFailed,
snapshot.callsInProgress);
snapshot.callsInProgress, snapshot.loadMetricStatsMap);
builder.addUpstreamLocalityStats(upstreamLocalityStats);
// Use the max (drops/loads) recording interval as the overall interval for the
// cluster's stats. In general, they should be mostly identical.
Expand Down Expand Up @@ -322,6 +323,7 @@ final class ClusterLocalityStats {
private final AtomicLong callsSucceeded = new AtomicLong();
private final AtomicLong callsFailed = new AtomicLong();
private final AtomicLong callsIssued = new AtomicLong();
private Map<String, BackendLoadMetricStats> loadMetricStatsMap = new HashMap<>();

private ClusterLocalityStats(
String clusterName, @Nullable String edsServiceName, Locality locality,
Expand Down Expand Up @@ -353,6 +355,23 @@ void recordCallFinished(Status status) {
}
}

/**
* Records all custom named backend load metric stats for per-call load reporting. For each
* metric key {@code name}, creates a new {@link BackendLoadMetricStats} with a finished
* requests counter of 1 and the {@code value} if the key is not present in the map. Otherwise,
* increments the finished requests counter and adds the {@code value} to the existing
* {@link BackendLoadMetricStats}.
*/
synchronized void recordBackendLoadMetricStats(Map<String, Double> namedMetrics) {
namedMetrics.forEach((name, value) -> {
if (!loadMetricStatsMap.containsKey(name)) {
loadMetricStatsMap.put(name, new BackendLoadMetricStats(1, value));
} else {
loadMetricStatsMap.get(name).addMetricValueAndIncrementRequestsFinished(value);
}
});
}

/**
* Release the <i>hard</i> reference for this stats object (previously obtained via {@link
* LoadStatsManager2#getClusterLocalityStats}). The object may still be
Expand All @@ -367,8 +386,13 @@ void release() {
private ClusterLocalityStatsSnapshot snapshot() {
long duration = stopwatch.elapsed(TimeUnit.NANOSECONDS);
stopwatch.reset().start();
Map<String, BackendLoadMetricStats> loadMetricStatsMapCopy;
synchronized (this) {
loadMetricStatsMapCopy = Collections.unmodifiableMap(loadMetricStatsMap);
loadMetricStatsMap = new HashMap<>();
}
return new ClusterLocalityStatsSnapshot(callsSucceeded.getAndSet(0), callsInProgress.get(),
callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration);
callsFailed.getAndSet(0), callsIssued.getAndSet(0), duration, loadMetricStatsMapCopy);
}
}

Expand All @@ -378,15 +402,18 @@ private static final class ClusterLocalityStatsSnapshot {
private final long callsFailed;
private final long callsIssued;
private final long durationNano;
private final Map<String, BackendLoadMetricStats> loadMetricStatsMap;

private ClusterLocalityStatsSnapshot(
long callsSucceeded, long callsInProgress, long callsFailed, long callsIssued,
long durationNano) {
long durationNano, Map<String, BackendLoadMetricStats> loadMetricStatsMap) {
this.callsSucceeded = callsSucceeded;
this.callsInProgress = callsInProgress;
this.callsFailed = callsFailed;
this.callsIssued = callsIssued;
this.durationNano = durationNano;
this.loadMetricStatsMap = Collections.unmodifiableMap(
checkNotNull(loadMetricStatsMap, "loadMetricStatsMap"));
}
}
}
41 changes: 39 additions & 2 deletions xds/src/main/java/io/grpc/xds/Stats.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import javax.annotation.Nullable;

/** Represents client load stats. */
Expand Down Expand Up @@ -101,10 +103,45 @@ abstract static class UpstreamLocalityStats {

abstract long totalRequestsInProgress();

abstract ImmutableMap<String, BackendLoadMetricStats> loadMetricStatsMap();

static UpstreamLocalityStats create(Locality locality, long totalIssuedRequests,
long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress) {
long totalSuccessfulRequests, long totalErrorRequests, long totalRequestsInProgress,
Map<String, BackendLoadMetricStats> loadMetricStatsMap) {
return new AutoValue_Stats_UpstreamLocalityStats(locality, totalIssuedRequests,
totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress);
totalSuccessfulRequests, totalErrorRequests, totalRequestsInProgress,
ImmutableMap.copyOf(loadMetricStatsMap));
}
}

/**
* Load metric stats for multi-dimensional load balancing.
*/
static final class BackendLoadMetricStats {

private long numRequestsFinishedWithMetric;
private double totalMetricValue;

BackendLoadMetricStats(long numRequestsFinishedWithMetric, double totalMetricValue) {
this.numRequestsFinishedWithMetric = numRequestsFinishedWithMetric;
this.totalMetricValue = totalMetricValue;
}

public long numRequestsFinishedWithMetric() {
return numRequestsFinishedWithMetric;
}

public double totalMetricValue() {
return totalMetricValue;
}

/**
* Adds the given {@code metricValue} and increments the number of requests finished counter for
* the existing {@link BackendLoadMetricStats}.
*/
public void addMetricValueAndIncrementRequestsFinished(double metricValue) {
numRequestsFinishedWithMetric += 1;
totalMetricValue += metricValue;
}
}
}
Loading

0 comments on commit 4fa2814

Please sign in to comment.