From 42a63b046312fde7f1a16b56722fce12bd1f8450 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 11 Sep 2023 18:17:15 -0400 Subject: [PATCH 1/7] fix: add a sanity check that all bulk mutation entries are accounted for Add a fail safe that marks missing entries in a response as permanent errors. Previously the client assumed that all entries were present and only looked for errors Change-Id: Ie3f294fd6bb19ec17662b58bfe9c75a3eed81097 --- .../mutaterows/MutateRowsAttemptCallable.java | 24 ++++++++++++++ .../MutateRowsAttemptCallableTest.java | 33 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index 36c2930bda..b049219a95 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -35,6 +35,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.rpc.Code; import java.util.List; @@ -263,9 +264,12 @@ private void handleAttemptSuccess(List responses) { Builder builder = lastRequest.toBuilder().clearEntries(); List newOriginalIndexes = Lists.newArrayList(); + boolean[] seenIndices = new boolean[currentRequest.getEntriesCount()]; for (MutateRowsResponse response : responses) { for (Entry entry : response.getEntriesList()) { + seenIndices[Ints.checkedCast(entry.getIndex())] = true; + if (entry.getStatus().getCode() == Code.OK_VALUE) { continue; } @@ -288,6 +292,26 @@ private void handleAttemptSuccess(List responses) { } } + // Handle missing mutations + for (int i = 0; i < seenIndices.length; i++) { + if (seenIndices[i]) { + continue; + } + + int origIndex = getOriginalIndex(i); + FailedMutation failedMutation = + FailedMutation.create( + origIndex, + ApiExceptionFactory.createException( + "Missing entry response for entry " + origIndex, + null, + GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), + false)); + + allFailures.add(failedMutation); + permanentFailures.add(failedMutation); + } + currentRequest = builder.build(); originalIndexes = newOriginalIndexes; diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java index 2df2aaf2b4..358ff01cde 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallableTest.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -92,6 +94,37 @@ public void singleEntrySuccessTest() throws Exception { assertThat(innerCallable.lastRequest).isEqualTo(request); } + @Test + public void missingEntry() { + MutateRowsRequest request = + MutateRowsRequest.newBuilder() + .addEntries(Entry.getDefaultInstance()) + .addEntries(Entry.getDefaultInstance()) + .build(); + innerCallable.response.add( + MutateRowsResponse.newBuilder() + .addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(0)) + .build()); + + MutateRowsAttemptCallable attemptCallable = + new MutateRowsAttemptCallable(innerCallable, request, callContext, retryCodes); + attemptCallable.setExternalFuture(parentFuture); + attemptCallable.call(); + + ExecutionException executionException = + Assert.assertThrows(ExecutionException.class, () -> parentFuture.attemptFuture.get()); + assertThat(executionException).hasCauseThat().isInstanceOf(MutateRowsException.class); + MutateRowsException e = (MutateRowsException) executionException.getCause(); + + assertThat(e).hasMessageThat().contains("Some mutations failed to apply"); + assertThat(e.getFailedMutations()).hasSize(1); + FailedMutation failedMutation = e.getFailedMutations().get(0); + assertThat(failedMutation.getIndex()).isEqualTo(1); + assertThat(failedMutation.getError()) + .hasMessageThat() + .contains("Missing entry response for entry 1"); + } + @Test public void testNoRpcTimeout() { parentFuture.timedAttemptSettings = From 35adef3bc3414b429a6d4a09e5e8240a646c4610 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Mon, 11 Sep 2023 23:23:20 -0400 Subject: [PATCH 2/7] fix tests Change-Id: I7619ef3ca3921ecc92efd65dc16c86548d51af3f --- .../data/v2/stub/metrics/BigtableTracerCallableTest.java | 6 +++++- .../data/v2/stub/metrics/BuiltinMetricsTracerTest.java | 6 +++++- .../bigtable/data/v2/stub/metrics/MetricsTracerTest.java | 7 ++++++- .../data/v2/stub/metrics/StatsHeadersCallableTest.java | 6 +++++- .../data/v2/stub/mutaterows/MutateRowsRetryTest.java | 6 +++++- 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java index e783352bf0..d8e3402b84 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracerCallableTest.java @@ -403,7 +403,11 @@ public void mutateRow(MutateRowRequest request, StreamObserver observer) { - observer.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntries(MutateRowsResponse.Entry.newBuilder().setIndex(i)); + } + observer.onNext(builder.build()); observer.onCompleted(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java index c7a47942f2..c2be1ea0ff 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracerTest.java @@ -648,7 +648,11 @@ public void mutateRows( Thread.sleep(SERVER_LATENCY); } catch (InterruptedException e) { } - responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java index da3dd0770a..da989b65dc 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracerTest.java @@ -422,10 +422,15 @@ public void testBatchMutateRowsThrottledTime() throws Exception { new Answer() { @Override public Object answer(InvocationOnMock invocation) { + MutateRowsRequest request = (MutateRowsRequest) invocation.getArguments()[0]; @SuppressWarnings("unchecked") StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; - observer.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + observer.onNext(builder.build()); observer.onCompleted(); return null; } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java index 538d4fc246..88a874b8c9 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/metrics/StatsHeadersCallableTest.java @@ -223,7 +223,11 @@ public void mutateRows(MutateRowsRequest request, StreamObserver responseObserver) { attemptCounter.incrementAndGet(); if (expectations.isEmpty()) { - responseObserver.onNext(MutateRowsResponse.getDefaultInstance()); + MutateRowsResponse.Builder builder = MutateRowsResponse.newBuilder(); + for (int i = 0; i < request.getEntriesCount(); i++) { + builder.addEntriesBuilder().setIndex(i); + } + responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } else { Exception expectedRpc = expectations.poll(); From 3d48bf5e36de06d16b98a058cf3698b0f160f228 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Tue, 12 Sep 2023 16:15:00 -0400 Subject: [PATCH 3/7] codestyle Change-Id: I3d4a5780bd3f7572c9889c10e60b908b1e05b359 --- .../mutaterows/MutateRowsAttemptCallable.java | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java index b049219a95..4cb2b2a750 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsAttemptCallable.java @@ -294,30 +294,29 @@ private void handleAttemptSuccess(List responses) { // Handle missing mutations for (int i = 0; i < seenIndices.length; i++) { - if (seenIndices[i]) { - continue; - } + if (!seenIndices[i]) { - int origIndex = getOriginalIndex(i); - FailedMutation failedMutation = - FailedMutation.create( - origIndex, - ApiExceptionFactory.createException( - "Missing entry response for entry " + origIndex, - null, - GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), - false)); + int origIndex = getOriginalIndex(i); + FailedMutation failedMutation = + FailedMutation.create( + origIndex, + ApiExceptionFactory.createException( + "Missing entry response for entry " + origIndex, + null, + GrpcStatusCode.of(io.grpc.Status.Code.INTERNAL), + false)); - allFailures.add(failedMutation); - permanentFailures.add(failedMutation); - } + allFailures.add(failedMutation); + permanentFailures.add(failedMutation); + } - currentRequest = builder.build(); - originalIndexes = newOriginalIndexes; + currentRequest = builder.build(); + originalIndexes = newOriginalIndexes; - if (!allFailures.isEmpty()) { - boolean isRetryable = builder.getEntriesCount() > 0; - throw new MutateRowsException(null, allFailures, isRetryable); + if (!allFailures.isEmpty()) { + boolean isRetryable = builder.getEntriesCount() > 0; + throw new MutateRowsException(null, allFailures, isRetryable); + } } } From 4c2751805a2693a55be100443f2d8528b9ebdcc9 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 31 Aug 2023 11:16:55 -0400 Subject: [PATCH 4/7] fix: set wait timeout on watchdog --- .../data/v2/stub/EnhancedBigtableStub.java | 4 +++ .../v2/stub/EnhancedBigtableStubSettings.java | 6 +++-- .../v2/stub/EnhancedBigtableStubTest.java | 25 +++++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 474c140392..0420e47dcf 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -458,6 +458,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { .setRetryableCodes(readRowsSettings.getRetryableCodes()) .setRetrySettings(readRowsSettings.getRetrySettings()) .setIdleTimeout(readRowsSettings.getIdleTimeout()) + .setWaitTimeout(readRowsSettings.getWaitTimeout()) .build(); ServerStreamingCallable watched = @@ -906,6 +907,8 @@ public Map extract( settings.generateInitialChangeStreamPartitionsSettings().getRetrySettings()) .setIdleTimeout( settings.generateInitialChangeStreamPartitionsSettings().getIdleTimeout()) + .setWaitTimeout( + settings.generateInitialChangeStreamPartitionsSettings().getWaitTimeout()) .build(); ServerStreamingCallable watched = @@ -980,6 +983,7 @@ public Map extract( .setRetryableCodes(settings.readChangeStreamSettings().getRetryableCodes()) .setRetrySettings(settings.readChangeStreamSettings().getRetrySettings()) .setIdleTimeout(settings.readChangeStreamSettings().getIdleTimeout()) + .setWaitTimeout(settings.readChangeStreamSettings().getWaitTimeout()) .build(); ServerStreamingCallable watched = diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index fd54daa0d5..9fb574c3e8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -714,13 +714,15 @@ private Builder() { generateInitialChangeStreamPartitionsSettings .setRetryableCodes(GENERATE_INITIAL_CHANGE_STREAM_PARTITIONS_RETRY_CODES) .setRetrySettings(GENERATE_INITIAL_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS) - .setIdleTimeout(Duration.ofMinutes(5)); + .setIdleTimeout(Duration.ofMinutes(5)) + .setWaitTimeout(Duration.ofMinutes(5)); readChangeStreamSettings = ServerStreamingCallSettings.newBuilder(); readChangeStreamSettings .setRetryableCodes(READ_CHANGE_STREAM_RETRY_CODES) .setRetrySettings(READ_CHANGE_STREAM_RETRY_SETTINGS) - .setIdleTimeout(Duration.ofMinutes(5)); + .setIdleTimeout(Duration.ofMinutes(5)) + .setWaitTimeout(Duration.ofMinutes(5)); pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); pingAndWarmSettings.setRetrySettings( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index ed3cec5d95..32811d05fb 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -31,6 +31,7 @@ import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.WatchdogTimeoutException; import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.FeatureFlags; @@ -82,11 +83,13 @@ import java.security.NoSuchAlgorithmException; import java.util.Base64; import java.util.Collection; +import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -544,6 +547,21 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception { assertThat(featureFlags.getMutateRowsRateLimit()).isFalse(); } + @Test + public void testWaitTimeoutIsSet() throws Exception { + EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder(); + settings.readRowsSettings().setWaitTimeout(Duration.ofMillis(100)); + EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build()); + Iterator iterator = + stub.readRowsCallable().call(Query.create("test-wait-timeout")).iterator(); + try { + iterator.next(); + Assert.fail("Should throw watchdog timeout exception"); + } catch (WatchdogTimeoutException e) { + assertThat(e.getMessage()).contains("Canceled due to timeout waiting for next response"); + } + } + private static class MetadataInterceptor implements ServerInterceptor { final BlockingQueue headers = Queues.newLinkedBlockingDeque(); @@ -593,6 +611,13 @@ public void mutateRows( @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { + if (request.getTableName().contains("test-wait-timeout")) { + try { + Thread.sleep(60 * 1000 * 6); + } catch (Exception e) { + + } + } requests.add(request); // Dummy row for stream responseObserver.onNext( From b2fca62cae8d7aaf65b7ca3421479b0c689ae3b5 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Thu, 31 Aug 2023 11:27:39 -0400 Subject: [PATCH 5/7] shorten test runtime --- .../data/v2/stub/EnhancedBigtableStubTest.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index 32811d05fb..aa174bd144 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -30,6 +30,7 @@ import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InstantiatingWatchdogProvider; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.WatchdogTimeoutException; import com.google.auth.oauth2.ServiceAccountJwtAccessCredentials; @@ -104,6 +105,8 @@ public class EnhancedBigtableStubTest { private static final String TABLE_NAME = NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, "fake-table"); private static final String APP_PROFILE_ID = "app-profile-id"; + private static final String WAIT_TIME_TABLE_ID = "test-wait-timeout"; + private static final Duration WATCHDOG_CHECK_DURATION = Duration.ofMillis(100); private Server server; private MetadataInterceptor metadataInterceptor; @@ -550,10 +553,14 @@ public void testBulkMutationFlowControlFeatureFlagIsNotSet() throws Exception { @Test public void testWaitTimeoutIsSet() throws Exception { EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder(); - settings.readRowsSettings().setWaitTimeout(Duration.ofMillis(100)); + // Set a shorter wait timeout and make watchdog checks more frequently + settings.readRowsSettings().setWaitTimeout(WATCHDOG_CHECK_DURATION.dividedBy(2)); + settings.setStreamWatchdogProvider( + InstantiatingWatchdogProvider.create().withCheckInterval(WATCHDOG_CHECK_DURATION)); + EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build()); Iterator iterator = - stub.readRowsCallable().call(Query.create("test-wait-timeout")).iterator(); + stub.readRowsCallable().call(Query.create(WAIT_TIME_TABLE_ID)).iterator(); try { iterator.next(); Assert.fail("Should throw watchdog timeout exception"); @@ -611,9 +618,9 @@ public void mutateRows( @Override public void readRows( ReadRowsRequest request, StreamObserver responseObserver) { - if (request.getTableName().contains("test-wait-timeout")) { + if (request.getTableName().contains(WAIT_TIME_TABLE_ID)) { try { - Thread.sleep(60 * 1000 * 6); + Thread.sleep(WATCHDOG_CHECK_DURATION.toMillis() * 2); } catch (Exception e) { } From 11f06af091c8065c879d65dcf72f4a240680a6db Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Tue, 12 Sep 2023 17:55:10 -0400 Subject: [PATCH 6/7] lower cdc wait timeouts Change-Id: I3dcbe9f7c0e063852b163a11057a0121aa327665 --- .../bigtable/data/v2/stub/EnhancedBigtableStubSettings.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 9fb574c3e8..4e6b06f750 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -715,14 +715,14 @@ private Builder() { .setRetryableCodes(GENERATE_INITIAL_CHANGE_STREAM_PARTITIONS_RETRY_CODES) .setRetrySettings(GENERATE_INITIAL_CHANGE_STREAM_PARTITIONS_RETRY_SETTINGS) .setIdleTimeout(Duration.ofMinutes(5)) - .setWaitTimeout(Duration.ofMinutes(5)); + .setWaitTimeout(Duration.ofMinutes(1)); readChangeStreamSettings = ServerStreamingCallSettings.newBuilder(); readChangeStreamSettings .setRetryableCodes(READ_CHANGE_STREAM_RETRY_CODES) .setRetrySettings(READ_CHANGE_STREAM_RETRY_SETTINGS) .setIdleTimeout(Duration.ofMinutes(5)) - .setWaitTimeout(Duration.ofMinutes(5)); + .setWaitTimeout(Duration.ofMinutes(1)); pingAndWarmSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); pingAndWarmSettings.setRetrySettings( From d7261b7f22eee13b8deb4b3c8871090be72f8f60 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Wed, 13 Sep 2023 16:38:11 -0400 Subject: [PATCH 7/7] add a test Change-Id: I7122fe2b53a1ce94aade23830ea95317ccf81595 --- .../v2/stub/EnhancedBigtableStubTest.java | 47 +++++++++++++++++-- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java index aa174bd144..a0d56f2344 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubTest.java @@ -40,6 +40,8 @@ import com.google.bigtable.v2.MutateRowsResponse; import com.google.bigtable.v2.PingAndWarmRequest; import com.google.bigtable.v2.PingAndWarmResponse; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.ReadRowsResponse; import com.google.bigtable.v2.RowSet; @@ -48,11 +50,8 @@ import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; import com.google.cloud.bigtable.data.v2.internal.RequestContext; -import com.google.cloud.bigtable.data.v2.models.BulkMutation; -import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; -import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.*; import com.google.cloud.bigtable.data.v2.models.Row; -import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Queues; import com.google.common.io.BaseEncoding; @@ -569,6 +568,27 @@ public void testWaitTimeoutIsSet() throws Exception { } } + @Test + public void testReadChangeStreamWaitTimeoutIsSet() throws Exception { + EnhancedBigtableStubSettings.Builder settings = defaultSettings.toBuilder(); + // Set a shorter wait timeout and make watchdog checks more frequently + settings.readChangeStreamSettings().setWaitTimeout(WATCHDOG_CHECK_DURATION.dividedBy(2)); + settings.setStreamWatchdogProvider( + InstantiatingWatchdogProvider.create().withCheckInterval(WATCHDOG_CHECK_DURATION)); + + EnhancedBigtableStub stub = EnhancedBigtableStub.create(settings.build()); + Iterator iterator = + stub.readChangeStreamCallable() + .call(ReadChangeStreamQuery.create(WAIT_TIME_TABLE_ID)) + .iterator(); + try { + iterator.next(); + Assert.fail("Should throw watchdog timeout exception"); + } catch (WatchdogTimeoutException e) { + assertThat(e.getMessage()).contains("Canceled due to timeout waiting for next response"); + } + } + private static class MetadataInterceptor implements ServerInterceptor { final BlockingQueue headers = Queues.newLinkedBlockingDeque(); @@ -597,6 +617,8 @@ public Listener interceptCall( private static class FakeDataService extends BigtableGrpc.BigtableImplBase { final BlockingQueue requests = Queues.newLinkedBlockingDeque(); + final BlockingQueue readChangeReadStreamRequests = + Queues.newLinkedBlockingDeque(); final BlockingQueue pingRequests = Queues.newLinkedBlockingDeque(); @SuppressWarnings("unchecked") @@ -640,6 +662,23 @@ public void readRows( responseObserver.onCompleted(); } + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + if (request.getTableName().contains(WAIT_TIME_TABLE_ID)) { + try { + Thread.sleep(WATCHDOG_CHECK_DURATION.toMillis() * 2); + } catch (Exception e) { + + } + } + readChangeReadStreamRequests.add(request); + // Dummy row for stream + responseObserver.onNext(ReadChangeStreamResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + @Override public void pingAndWarm( PingAndWarmRequest request, StreamObserver responseObserver) {