Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reset a measure map every time the stats are recorded #1364

Merged
merged 1 commit into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-bigtable-stats/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,10 @@
<method>*StatsRecorderWrapper*</method>
<to>*StatsRecorder*</to>
</difference>
<!-- Internal API is updated -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/stats/StatsRecorderWrapper</className>
<method>void record(java.lang.String, java.lang.String, java.lang.String, java.lang.String)</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class StatsRecorderWrapper {
private final SpanName spanName;
private final Map<String, String> statsAttributes;

private MeasureMap measureMap;
private MeasureMap attemptMeasureMap;
private MeasureMap operationMeasureMap;

public StatsRecorderWrapper(
OperationType operationType,
Expand All @@ -54,10 +55,11 @@ public StatsRecorderWrapper(
this.parentContext = tagger.getCurrentTagContext();
this.statsAttributes = statsAttributes;

this.measureMap = statsRecorder.newMeasureMap();
this.attemptMeasureMap = statsRecorder.newMeasureMap();
this.operationMeasureMap = statsRecorder.newMeasureMap();
}

public void record(String status, String tableId, String zone, String cluster) {
public void recordOperation(String status, String tableId, String zone, String cluster) {
TagContextBuilder tagCtx =
newTagContextBuilder(tableId, zone, cluster)
.putLocal(BuiltinMeasureConstants.STATUS, TagValue.create(status));
Expand All @@ -66,39 +68,55 @@ public void record(String status, String tableId, String zone, String cluster) {
tagCtx.putLocal(
BuiltinMeasureConstants.STREAMING, TagValue.create(Boolean.toString(isStreaming)));

measureMap.record(tagCtx.build());
operationMeasureMap.record(tagCtx.build());
// Reinitialize a new map
operationMeasureMap = statsRecorder.newMeasureMap();
}

/**
 * Records the attempt-level measurements currently held in {@code attemptMeasureMap} and then
 * replaces that map with a fresh one.
 *
 * <p>The measurements are recorded against a tag context built from {@code tableId}, {@code
 * zone} and {@code cluster} (via {@code newTagContextBuilder}), plus the {@code STATUS} tag and a
 * {@code STREAMING} tag that is {@code "true"} only when the operation type is {@code
 * OperationType.ServerStreaming}.
 *
 * @param status value stored under the {@code STATUS} tag
 * @param tableId table identifier passed to {@code newTagContextBuilder}
 * @param zone zone passed to {@code newTagContextBuilder}
 * @param cluster cluster passed to {@code newTagContextBuilder}
 */
public void recordAttempt(String status, String tableId, String zone, String cluster) {
TagContextBuilder tagCtx =
newTagContextBuilder(tableId, zone, cluster)
.putLocal(BuiltinMeasureConstants.STATUS, TagValue.create(status));

boolean isStreaming = operationType == OperationType.ServerStreaming;
tagCtx.putLocal(
BuiltinMeasureConstants.STREAMING, TagValue.create(Boolean.toString(isStreaming)));

attemptMeasureMap.record(tagCtx.build());
// Swap in a new measure map after every record so that later put*() calls for the next attempt
// start from a new map instead of adding to the one that was just recorded.
attemptMeasureMap = statsRecorder.newMeasureMap();
}

public void putOperationLatencies(long operationLatency) {
measureMap.put(BuiltinMeasureConstants.OPERATION_LATENCIES, operationLatency);
operationMeasureMap.put(BuiltinMeasureConstants.OPERATION_LATENCIES, operationLatency);
}

public void putAttemptLatencies(long attemptLatency) {
measureMap.put(BuiltinMeasureConstants.ATTEMPT_LATENCIES, attemptLatency);
attemptMeasureMap.put(BuiltinMeasureConstants.ATTEMPT_LATENCIES, attemptLatency);
}

public void putRetryCount(int attemptCount) {
measureMap.put(BuiltinMeasureConstants.RETRY_COUNT, attemptCount);
operationMeasureMap.put(BuiltinMeasureConstants.RETRY_COUNT, attemptCount);
}

public void putApplicationLatencies(long applicationLatency) {
measureMap.put(BuiltinMeasureConstants.APPLICATION_LATENCIES, applicationLatency);
operationMeasureMap.put(BuiltinMeasureConstants.APPLICATION_LATENCIES, applicationLatency);
}

public void putFirstResponseLatencies(long firstResponseLatency) {
measureMap.put(BuiltinMeasureConstants.FIRST_RESPONSE_LATENCIES, firstResponseLatency);
operationMeasureMap.put(BuiltinMeasureConstants.FIRST_RESPONSE_LATENCIES, firstResponseLatency);
}

public void putGfeLatencies(long serverLatency) {
measureMap.put(BuiltinMeasureConstants.SERVER_LATENCIES, serverLatency);
attemptMeasureMap.put(BuiltinMeasureConstants.SERVER_LATENCIES, serverLatency);
}

public void putGfeMissingHeaders(long connectivityErrors) {
measureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
attemptMeasureMap.put(BuiltinMeasureConstants.CONNECTIVITY_ERROR_COUNT, connectivityErrors);
}

public void putBatchRequestThrottled(long throttledTimeMs) {
measureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
operationMeasureMap.put(BuiltinMeasureConstants.THROTTLING_LATENCIES, throttledTimeMs);
}

private TagContextBuilder newTagContextBuilder(String tableId, String zone, String cluster) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public void testStreamingOperation() throws InterruptedException {
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);

recorderWrapper.record("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordOperation("OK", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("OK", TABLE_ID, ZONE, CLUSTER);

Thread.sleep(100);

Expand Down Expand Up @@ -291,7 +292,8 @@ public void testUnaryOperations() throws InterruptedException {
recorderWrapper.putFirstResponseLatencies(firstResponseLatency);
recorderWrapper.putBatchRequestThrottled(throttlingLatency);

recorderWrapper.record("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordOperation("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);
recorderWrapper.recordAttempt("UNAVAILABLE", TABLE_ID, ZONE, CLUSTER);

Thread.sleep(100);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void recordOperationCompletion(@Nullable Throwable status) {
recorder.putFirstResponseLatencies(firstResponsePerOpTimer.elapsed(TimeUnit.MILLISECONDS));
}

recorder.record(Util.extractStatus(status), tableId, zone, cluster);
recorder.recordOperation(Util.extractStatus(status), tableId, zone, cluster);
}

private void recordAttemptCompletion(@Nullable Throwable status) {
Expand All @@ -257,6 +257,6 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
}
}
recorder.putAttemptLatencies(attemptTimer.elapsed(TimeUnit.MILLISECONDS));
recorder.record(Util.extractStatus(status), tableId, zone, cluster);
recorder.recordAttempt(Util.extractStatus(status), tableId, zone, cluster);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -331,18 +331,16 @@ public void testMutateRowAttempts() {
stub.mutateRowCallable()
.call(RowMutation.create(TABLE_ID, "random-row").setCell("cf", "q", "value"));

// record() will get called 4 times, 3 times for attempts and 1 for recording operation level
// metrics. Also set a timeout to reduce flakiness of this test. BasicRetryingFuture will set
// Set a timeout to reduce flakiness of this test. BasicRetryingFuture marks the attempt as
// succeeded and sets the response, which calls complete() in AbstractFuture, which in turn
// calls releaseWaiters(). onOperationComplete() is called in TracerFinisher, which runs
// after the mutateRow call has returned. So there's a race between when the call returns
// and when recordOperation() is called in onOperationCompletion().
verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get() + 1))
.record(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE, ZONE);
assertThat(cluster.getAllValues())
.containsExactly("unspecified", "unspecified", CLUSTER, CLUSTER);
assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK", "OK");
verify(statsRecorderWrapper, timeout(50).times(fakeService.getAttemptCounter().get()))
.recordAttempt(status.capture(), tableId.capture(), zone.capture(), cluster.capture());
assertThat(zone.getAllValues()).containsExactly("global", "global", ZONE);
assertThat(cluster.getAllValues()).containsExactly("unspecified", "unspecified", CLUSTER);
assertThat(status.getAllValues()).containsExactly("UNAVAILABLE", "UNAVAILABLE", "OK");
}

private static class FakeService extends BigtableGrpc.BigtableImplBase {
Expand Down