diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java index a8b8148d3e..2d8262a93e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java @@ -273,7 +273,8 @@ private void recordAttemptCompletion(@Nullable Throwable status) { } } - recorder.putClientBlockingLatencies(totalClientBlockingTime.get()); + // Make sure to reset the blocking time after recording it for the next attempt + recorder.putClientBlockingLatencies(totalClientBlockingTime.getAndSet(0)); // Patch the status until it's fixed in gax. When an attempt failed, // it'll throw a ServerStreamingAttemptException. Unwrap the exception diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java index 3b6b1b40ae..0ffabe2606 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java @@ -59,6 +59,9 @@ class MetricsTracer extends BigtableTracer { private volatile int attempt = 0; + private volatile boolean reportBatchingLatency = false; + private volatile long batchThrottledLatency = 0; + MetricsTracer( OperationType operationType, Tagger tagger, @@ -167,6 +170,14 @@ private void recordAttemptCompletion(@Nullable Throwable throwable) { RpcMeasureConstants.BIGTABLE_ATTEMPT_LATENCY, attemptTimer.elapsed(TimeUnit.MILLISECONDS)); + if (reportBatchingLatency) { + measures.put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, batchThrottledLatency); + + // Reset batch throttling latency for next attempt. This can't be done in attemptStarted + // because batching flow control will add batching latency before the attempt has started. + batchThrottledLatency = 0; + } + // Patch the throwable until it's fixed in gax. When an attempt failed, // it'll throw a ServerStreamingAttemptException. Unwrap the exception // so it could get processed by extractStatus @@ -216,11 +227,8 @@ public void recordGfeMetadata(@Nullable Long latency, @Nullable Throwable throwa @Override public void batchRequestThrottled(long totalThrottledMs) { - MeasureMap measures = - stats - .newMeasureMap() - .put(RpcMeasureConstants.BIGTABLE_BATCH_THROTTLED_TIME, totalThrottledMs); - measures.record(newTagCtxBuilder().build()); + reportBatchingLatency = true; + batchThrottledLatency += totalThrottledMs; } private TagContextBuilder newTagCtxBuilder() { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index 8cf8dbd356..da3dd0770a 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -20,10 +20,8 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; -import com.google.api.gax.batching.BatchResource; import com.google.api.gax.batching.Batcher; import com.google.api.gax.batching.BatcherImpl; -import com.google.api.gax.batching.BatchingDescriptor; import com.google.api.gax.batching.FlowController; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.rpc.ApiCallContext; @@ -387,45 +385,38 @@ public Object answer(InvocationOnMock invocation) { .when(mockService) .readRows(any(ReadRowsRequest.class), any()); - try (Batcher batcher = + try (Batcher batcher = stub.newBulkReadRowsBatcher(Query.create(TABLE_ID), GrpcCallContext.createDefault())) { batcher.add(ByteString.copyFromUtf8("row1")); - batcher.sendOutstanding(); - - long throttledTimeMetric = - StatsTestUtils.getAggregationValueAsLong( - localStats, - RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, - ImmutableMap.of( - RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), - PROJECT_ID, - INSTANCE_ID, - APP_PROFILE_ID); - assertThat(throttledTimeMetric).isEqualTo(0); } + + long throttledTimeMetric = + StatsTestUtils.getAggregationValueAsLong( + localStats, + RpcViewConstants.BIGTABLE_BATCH_THROTTLED_TIME_VIEW, + ImmutableMap.of(RpcMeasureConstants.BIGTABLE_OP, TagValue.create("Bigtable.ReadRows")), + PROJECT_ID, + INSTANCE_ID, + APP_PROFILE_ID); + assertThat(throttledTimeMetric).isEqualTo(0); } @Test public void testBatchMutateRowsThrottledTime() throws Exception { FlowController flowController = Mockito.mock(FlowController.class); - BatchingDescriptor batchingDescriptor = Mockito.mock(MutateRowsBatchingDescriptor.class); - when(batchingDescriptor.createResource(any())).thenReturn(new FakeBatchResource()); - when(batchingDescriptor.createEmptyResource()).thenReturn(new FakeBatchResource()); + MutateRowsBatchingDescriptor batchingDescriptor = new MutateRowsBatchingDescriptor(); + // Mock throttling final long throttled = 50; doAnswer( - new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - Thread.sleep(throttled); - return null; - } + invocation -> { + Thread.sleep(throttled); + return null; }) .when(flowController) .reserve(any(Long.class), any(Long.class)); when(flowController.getMaxElementCountLimit()).thenReturn(null); when(flowController.getMaxRequestBytesLimit()).thenReturn(null); - when(batchingDescriptor.newRequestBuilder(any())).thenCallRealMethod(); doAnswer( new Answer() { @@ -444,18 +435,18 @@ public Object answer(InvocationOnMock invocation) { ApiCallContext defaultContext = GrpcCallContext.createDefault(); - Batcher batcher = - new BatcherImpl( + try (Batcher batcher = + new BatcherImpl<>( batchingDescriptor, stub.bulkMutateRowsCallable().withDefaultCallContext(defaultContext), BulkMutation.create(TABLE_ID), settings.getStubSettings().bulkMutateRowsSettings().getBatchingSettings(), Executors.newSingleThreadScheduledExecutor(), flowController, - defaultContext); + defaultContext)) { - batcher.add(RowMutationEntry.create("key")); - batcher.sendOutstanding(); + batcher.add(RowMutationEntry.create("key").deleteRow()); + } long throttledTimeMetric = StatsTestUtils.getAggregationValueAsLong( @@ -473,29 +464,4 @@ public Object answer(InvocationOnMock invocation) { private static StreamObserver anyObserver(Class returnType) { return (StreamObserver) any(returnType); } - - private class FakeBatchResource implements BatchResource { - - FakeBatchResource() {} - - @Override - public BatchResource add(BatchResource resource) { - return new FakeBatchResource(); - } - - @Override - public long getElementCount() { - return 1; - } - - @Override - public long getByteCount() { - return 1; - } - - @Override - public boolean shouldFlush(long maxElementThreshold, long maxBytesThreshold) { - return false; - } - } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java index 6aede96161..e808af8a84 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsTestUtils.java @@ -299,6 +299,11 @@ public static long getAggregationValueAsLong( AggregationData aggregationData = aggregationMap.get(tagValues); + if (aggregationData == null) { + throw new RuntimeException( + "Failed to find metric for: " + tags + ". Current aggregation data: " + aggregationMap); + } + return aggregationData.match( new io.opencensus.common.Function() { @Override