Skip to content

Commit

Permalink
feat: update tracers to use built in metrics (#1244)
Browse files Browse the repository at this point in the history
* feat: add built in metrics measure and views

* remove status from application latency

* Rename methods and add comments

* update based on comments

* feat: update tracers to use built in metrics

* update on comments

* make stopwatch thread safe

* update comments

* calculate application latency correctly

* remove unused check

* clean up tests

* fix typo

* update test

* fix flaky test

* fix retry count
  • Loading branch information
mutianf authored and mpeddada1 committed Jul 18, 2022
1 parent 55c6499 commit 3d0da81
Show file tree
Hide file tree
Showing 15 changed files with 877 additions and 86 deletions.
4 changes: 4 additions & 0 deletions google-cloud-bigtable/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@
<!-- NOTE: Dependencies are organized into two groups, production and test.
Within a group, dependencies are sorted by (groupId, artifactId) -->

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable-stats</artifactId>
</dependency>
<!-- Production dependencies -->
<dependency>
<groupId>com.google.api</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import com.google.cloud.bigtable.data.v2.models.RowAdapter;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.CompositeTracerFactory;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.HeaderTracerUnaryCallable;
Expand Down Expand Up @@ -194,6 +195,12 @@ public static EnhancedBigtableStubSettings finalizeSettings(
RpcMeasureConstants.BIGTABLE_APP_PROFILE_ID,
TagValue.create(settings.getAppProfileId()))
.build();
ImmutableMap<String, String> builtinAttributes =
ImmutableMap.<String, String>builder()
.put("project_id", settings.getProjectId())
.put("instance_id", settings.getInstanceId())
.put("app_profile", settings.getAppProfileId())
.build();
// Inject Opencensus instrumentation
builder.setTracerFactory(
new CompositeTracerFactory(
Expand All @@ -218,6 +225,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
.build()),
// Add OpenCensus Metrics
MetricsTracerFactory.create(tagger, stats, attributes),
BuiltinMetricsTracerFactory.create(builtinAttributes),
// Add user configured tracer
settings.getTracerFactory())));
return builder.build();
Expand Down Expand Up @@ -466,7 +474,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
new TracedBatcherUnaryCallable<>(readRowsUserCallable.all());

UnaryCallable<Query, List<RowT>> withHeaderTracer =
new HeaderTracerUnaryCallable(tracedBatcher);
new HeaderTracerUnaryCallable<>(tracedBatcher);

UnaryCallable<Query, List<RowT>> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), span);
Expand Down Expand Up @@ -594,11 +602,11 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {

SpanName spanName = getSpanName("MutateRows");

UnaryCallable<BulkMutation, Void> tracedBatcher = new TracedBatcherUnaryCallable<>(userFacing);
UnaryCallable<BulkMutation, Void> tracedBatcherUnaryCallable =
new TracedBatcherUnaryCallable<>(userFacing);

UnaryCallable<BulkMutation, Void> withHeaderTracer =
new HeaderTracerUnaryCallable<>(tracedBatcher);

new HeaderTracerUnaryCallable<>(tracedBatcherUnaryCallable);
UnaryCallable<BulkMutation, Void> traced =
new TracedUnaryCallable<>(withHeaderTracer, clientContext.getTracerFactory(), spanName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
* implementation that does nothing.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
@BetaApi("This surface is not stable yet it might be removed in the future.")
public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;
Expand All @@ -35,6 +35,23 @@ public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
}

/**
 * Annotates that {@code onRequest} was called, i.e. that the caller asked for more responses. The
 * base implementation does nothing; {@code BuiltinMetricsTracer} overrides it.
 *
 * @param requestCount the count passed to {@code onRequest} (presumably the number of additional
 *     responses requested; confirm against the caller)
 */
public void onRequest(int requestCount) {
// noop
}

/**
 * Annotates that automatic flow control has been disabled for this call. The base implementation
 * does nothing; {@code BuiltinMetricsTracer} overrides it.
 */
public void disableFlowControl() {
// noop
}

/**
 * Annotates that the callback from {@code onResponse} has returned. The base implementation does
 * nothing; {@code BuiltinMetricsTracer} overrides it.
 *
 * @param applicationLatency latency attributed to the application for this response (units are
 *     not visible here; confirm against the caller)
 */
public void afterResponse(long applicationLatency) {
// noop
}

/**
* Get the attempt number of the current call. Attempt number for the current call is passed in
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from
Expand All @@ -57,4 +74,12 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa
public void batchRequestThrottled(long throttledTimeMs) {
// noop
}

/**
 * Sets the Bigtable zone and cluster so metrics can be tagged with location information. The base
 * implementation does nothing; {@code BuiltinMetricsTracer} overrides it.
 *
 * @param zone the zone to tag metrics with
 * @param cluster the cluster to tag metrics with
 */
public void setLocations(String zone, String cluster) {
// noop
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.stub.metrics;

import static com.google.api.gax.tracing.ApiTracerFactory.OperationType;

import com.google.api.gax.tracing.SpanName;
import com.google.cloud.bigtable.stats.StatsRecorderWrapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.math.IntMath;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
 * A {@link BigtableTracer} that records built-in metrics and publishes them under the
 * bigtable.googleapis.com/client namespace.
 *
 * <p>One tracer instance covers a single operation, which may consist of several attempts.
 * Callbacks may arrive on different threads (for example {@code onRequest} from the user thread
 * and attempt completion from a gRPC thread), so state shared between them is kept in atomics.
 */
class BuiltinMetricsTracer extends BigtableTracer {

  /** Sentinel for {@link #firstResponseLatencyMs} meaning no response has been received yet. */
  private static final long FIRST_RESPONSE_NOT_RECORDED = -1L;

  private final StatsRecorderWrapper recorder;

  private final OperationType operationType;
  private final SpanName spanName;

  // Operation level metrics
  private final AtomicBoolean opFinished = new AtomicBoolean();
  private final Stopwatch operationTimer = Stopwatch.createStarted();
  // Started with the operation and never mutated afterwards; it is only sampled. The sample taken
  // when the first response arrives is stored in firstResponseLatencyMs.
  private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
  // Milliseconds from operation start to the first response, or FIRST_RESPONSE_NOT_RECORDED.
  // Atomic so that the value written on the response thread is visible to whichever thread
  // completes the operation.
  private final AtomicLong firstResponseLatencyMs = new AtomicLong(FIRST_RESPONSE_NOT_RECORDED);

  // Attempt level metrics
  private int attemptCount = 0;
  private Stopwatch attemptTimer;
  private volatile int attempt = 0;

  // Total server latency needs to be atomic because it's accessed from different threads. E.g.
  // request() from user thread and attempt failed from grpc thread. We're only measuring the extra
  // time application spent blocking grpc buffer, which will be operationLatency - serverLatency.
  private final AtomicLong totalServerLatency = new AtomicLong(0);
  // Stopwatch is not thread safe, so serverLatencyTimerIsRunning is used as a workaround: the
  // stopwatch is only touched by the thread that wins the compare-and-set on that flag.
  private final Stopwatch serverLatencyTimer = Stopwatch.createUnstarted();
  private final AtomicBoolean serverLatencyTimerIsRunning = new AtomicBoolean();

  private boolean flowControlIsDisabled = false;

  private final AtomicInteger requestLeft = new AtomicInteger(0);

  // Monitored resource labels
  private String tableId = "undefined";
  private String zone = "undefined";
  private String cluster = "undefined";

  // gfe stats
  private final AtomicLong gfeMissingHeaders = new AtomicLong(0);

  /**
   * Creates a tracer for one operation. The operation and first-response timers start running
   * immediately.
   *
   * @param operationType the kind of operation being traced
   * @param spanName the span name of the operation; its method name decides whether first response
   *     latency is recorded
   * @param recorder the sink that the measurements are written to
   */
  @VisibleForTesting
  BuiltinMetricsTracer(
      OperationType operationType, SpanName spanName, StatsRecorderWrapper recorder) {
    this.operationType = operationType;
    this.spanName = spanName;
    this.recorder = recorder;
  }

  /** Returns a scope whose {@code close()} does nothing. */
  @Override
  public Scope inScope() {
    return new Scope() {
      @Override
      public void close() {}
    };
  }

  @Override
  public void operationSucceeded() {
    recordOperationCompletion(null);
  }

  @Override
  public void operationCancelled() {
    recordOperationCompletion(new CancellationException());
  }

  @Override
  public void operationFailed(Throwable error) {
    recordOperationCompletion(error);
  }

  @Override
  public void attemptStarted(int attemptNumber) {
    attemptStarted(null, attemptNumber);
  }

  /**
   * Marks the start of an attempt: remembers the attempt number, restarts the attempt timer and,
   * when a request is supplied, extracts the table id from it.
   */
  @Override
  public void attemptStarted(Object request, int attemptNumber) {
    this.attempt = attemptNumber;
    attemptCount++;
    attemptTimer = Stopwatch.createStarted();
    if (request != null) {
      this.tableId = Util.extractTableId(request);
    }
    if (!flowControlIsDisabled) {
      // With auto flow control the server is considered busy as soon as the attempt starts.
      if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
        serverLatencyTimer.start();
      }
    }
  }

  @Override
  public void attemptSucceeded() {
    recordAttemptCompletion(null);
  }

  @Override
  public void attemptCancelled() {
    recordAttemptCompletion(new CancellationException());
  }

  @Override
  public void attemptFailed(Throwable error, Duration delay) {
    recordAttemptCompletion(error);
  }

  @Override
  public void onRequest(int requestCount) {
    // Saturating add so that a very large request count cannot overflow the counter.
    requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);
    if (flowControlIsDisabled) {
      // On request is only called when auto flow control is disabled. When auto flow control is
      // disabled, server latency is measured between onRequest and onResponse.
      if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
        serverLatencyTimer.start();
      }
    }
  }

  @Override
  public void responseReceived() {
    // Capture the time to the first response exactly once. Without this, first response latency
    // would be sampled only when the operation completes and would equal the operation latency.
    if (firstResponseLatencyMs.get() == FIRST_RESPONSE_NOT_RECORDED) {
      firstResponseLatencyMs.compareAndSet(
          FIRST_RESPONSE_NOT_RECORDED, firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
    }

    // When auto flow control is enabled, server latency is measured between afterResponse and
    // responseReceived.
    // When auto flow control is disabled, server latency is measured between onRequest and
    // responseReceived.
    // When auto flow control is disabled and application requested multiple responses, server
    // latency is measured between afterResponse and responseReceived.
    // In all the cases, we want to stop the serverLatencyTimer here.
    if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
      totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
      serverLatencyTimer.reset();
    }
  }

  @Override
  public void afterResponse(long applicationLatency) {
    if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
      // When auto flow control is enabled, request will never be called, so server latency is
      // measured between after the last response is processed and before the next response is
      // received. If flow control is disabled but requestLeft is greater than 0,
      // also start the timer to count the time between afterResponse and responseReceived.
      if (serverLatencyTimerIsRunning.compareAndSet(false, true)) {
        serverLatencyTimer.start();
      }
    }
  }

  @Override
  public int getAttempt() {
    return attempt;
  }

  @Override
  public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwable) {
    // Record the metrics and put in the map after the attempt is done, so we can have cluster and
    // zone information
    if (latency != null) {
      recorder.putGfeLatencies(latency);
    } else {
      gfeMissingHeaders.incrementAndGet();
    }
    recorder.putGfeMissingHeaders(gfeMissingHeaders.get());
  }

  @Override
  public void setLocations(String zone, String cluster) {
    this.zone = zone;
    this.cluster = cluster;
  }

  @Override
  public void batchRequestThrottled(long throttledTimeMs) {
    recorder.putBatchRequestThrottled(throttledTimeMs);
  }

  @Override
  public void disableFlowControl() {
    flowControlIsDisabled = true;
  }

  /**
   * Records the operation level metrics. Only the first call has any effect; later calls return
   * immediately.
   *
   * @param status the error the operation finished with, or {@code null} on success
   */
  private void recordOperationCompletion(@Nullable Throwable status) {
    if (!opFinished.compareAndSet(false, true)) {
      return;
    }
    operationTimer.stop();
    long operationLatency = operationTimer.elapsed(TimeUnit.MILLISECONDS);

    recorder.putRetryCount(attemptCount - 1);

    // serverLatencyTimer should already be stopped in recordAttemptCompletion
    recorder.putOperationLatencies(operationLatency);
    recorder.putApplicationLatencies(operationLatency - totalServerLatency.get());

    if (operationType == OperationType.ServerStreaming
        && spanName.getMethodName().equals("ReadRows")) {
      long firstResponseLatency = firstResponseLatencyMs.get();
      if (firstResponseLatency == FIRST_RESPONSE_NOT_RECORDED) {
        // No response was ever received; fall back to the time elapsed since the operation
        // started.
        firstResponseLatency = firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS);
      }
      recorder.putFirstResponseLatencies(firstResponseLatency);
    }

    recorder.record(Util.extractStatus(status), tableId, zone, cluster);
  }

  /**
   * Records the attempt level metrics.
   *
   * @param status the error the attempt finished with, or {@code null} on success
   */
  private void recordAttemptCompletion(@Nullable Throwable status) {
    // If the attempt failed, the time spent in retry should be counted in application latency.
    // Stop the stopwatch and decrement requestLeft.
    if (serverLatencyTimerIsRunning.compareAndSet(true, false)) {
      requestLeft.decrementAndGet();
      totalServerLatency.addAndGet(serverLatencyTimer.elapsed(TimeUnit.MILLISECONDS));
      serverLatencyTimer.reset();
    }
    recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
    recorder.record(Util.extractStatus(status), tableId, zone, cluster);
  }
}
Loading

0 comments on commit 3d0da81

Please sign in to comment.