diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java index cfd358c994bb..eba6a51c4f01 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClient.java @@ -19,23 +19,18 @@ import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; -import com.google.bigtable.admin.v2.CheckConsistencyResponse; import com.google.bigtable.admin.v2.DeleteTableRequest; import com.google.bigtable.admin.v2.DropRowRangeRequest; -import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest; -import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse; import com.google.bigtable.admin.v2.GetTableRequest; import com.google.bigtable.admin.v2.InstanceName; import com.google.bigtable.admin.v2.ListTablesRequest; import com.google.bigtable.admin.v2.TableName; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; -import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest; import com.google.cloud.bigtable.admin.v2.models.Table; import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; @@ -649,91 +644,57 @@ public ApiFuture dropAllRowsAsync(String tableId) { } /** - * Generates a token to verify the replication status of table mutations invoked before this call. - * Token expires in 90 days + * Blocks until replication has caught up to the point this method was called. This allows callers + * to make sure that their mutations have been replicated across all of their clusters. * - *

Sample code: + *

Sample code * *

{@code
-   * ConsistencyToken consistencyToken = client.generateConsistencyToken("my-table");
+   * client.awaitReplication("my-table");
    * }
- */ - @SuppressWarnings("WeakerAccess") - public ConsistencyToken generateConsistencyToken(String tableId) { - return awaitFuture(generateConsistencyTokenAsync(tableId)); - } - - /** - * Asynchornously generates a token to verify the replication status of table mutations invoked - * before this call. Token expires in 90 days - * - *

Sample code: * - *

{@code
-   * ApiFuture consistencyTokenFuture = client.generateConsistencyToken("my-table");
-   * }
+ * @throws com.google.api.gax.retrying.PollException when polling exceeds the total timeout */ - // TODO(igorbernstein2): add sample code for waiting for the fetch consistency token @SuppressWarnings("WeakerAccess") - public ApiFuture generateConsistencyTokenAsync(final String tableId) { - GenerateConsistencyTokenRequest request = GenerateConsistencyTokenRequest.newBuilder() - .setName(getTableName(tableId)) - .build(); - - return ApiFutures.transform( - stub.generateConsistencyTokenCallable().futureCall(request), - new ApiFunction() { - @Override - public ConsistencyToken apply(GenerateConsistencyTokenResponse proto) { - TableName tableName = TableName - .of(instanceName.getProject(), instanceName.getInstance(), tableId); - return ConsistencyToken.of(tableName, proto.getConsistencyToken()); - } - }, - MoreExecutors.directExecutor()); + public void awaitReplication(String tableId) { + TableName tableName = TableName + .of(instanceName.getProject(), instanceName.getInstance(), tableId); + awaitFuture(stub.awaitReplicationCallable().futureCall(tableName)); } /** - * Checks replication consistency for the specified token consistency token + * Returns a future that is resolved when replication has caught up to the point this method was + * called. This allows callers to make sure that their mutations have been replicated across all + * of their clusters. * *

Sample code: * *

{@code
-   * try(BigtableTableAdminClient client =  BigtableTableAdminClient.create(InstanceName.of("[PROJECT]", "[INSTANCE]"))) {
-   *   // Perform some mutations.
+   * ApiFuture replicationFuture = client.awaitReplicationAsync("my-table");
    *
-   *   ConsistencyToken token = client.generateConsistencyToken("table-id");
-   *   while(!client.isConsistent(token)) {
-   *     Thread.sleep(100);
-   *   }
+   * ApiFutures.addCallback(
+   *   replicationFuture,
+   *   new ApiFutureCallback() {
+   *     public void onSuccess(Table table) {
+   *       System.out.println("All clusters are now consistent");
+   *     }
+   *
+   *     public void onFailure(Throwable t) {
+   *       t.printStackTrace();
+   *     }
+   *   },
+   *   MoreExecutors.directExecutor()
+   * );
    *
-   *   // Now all clusters are consistent
-   * }
    * }
*/ @SuppressWarnings("WeakerAccess") - public boolean isConsistent(ConsistencyToken token) { - return awaitFuture(isConsistentAsync(token)); + public ApiFuture awaitReplicationAsync(final String tableId) { + TableName tableName = TableName + .of(instanceName.getProject(), instanceName.getInstance(), tableId); + return stub.awaitReplicationCallable().futureCall(tableName); } - @VisibleForTesting - ApiFuture isConsistentAsync(ConsistencyToken token) { - ApiFuture checkConsResp = stub.checkConsistencyCallable() - .futureCall(token.toProto(instanceName)); - - return ApiFutures.transform( - checkConsResp, - new ApiFunction() { - @Override - public Boolean apply(CheckConsistencyResponse input) { - return input.getConsistent(); - } - }, - MoreExecutors.directExecutor()); - } - - // TODO(igorbernstein2): add awaitConsist() & awaitConsistAsync() that generate & poll a token - /** * Helper method to construct the table name in format: projects/{project}/instances/{instance}/tables/{tableId} */ diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyToken.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyToken.java deleted file mode 100644 index 4ad56a0bbdda..000000000000 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyToken.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2018 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.admin.v2.models; - -import com.google.api.core.InternalApi; -import com.google.api.core.InternalExtensionOnly; -import com.google.auto.value.AutoValue; -import com.google.bigtable.admin.v2.CheckConsistencyRequest; -import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse; -import com.google.bigtable.admin.v2.InstanceName; -import com.google.bigtable.admin.v2.TableName; -import com.google.common.base.Preconditions; - -/** - * Wrapper for {@link GenerateConsistencyTokenResponse#getConsistencyToken()} - * - *

Cannot be created. They are obtained by invoking {@link - * com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient#generateConsistencyToken(String)} - */ -@InternalExtensionOnly -@AutoValue -public abstract class ConsistencyToken { - public static ConsistencyToken of(TableName tableName, String token) { - return new AutoValue_ConsistencyToken(tableName, token); - } - - abstract TableName getTableName(); - abstract String getToken(); - - @InternalApi - public CheckConsistencyRequest toProto(InstanceName instanceName) { - Preconditions.checkArgument( - instanceName.equals(InstanceName.of(getTableName().getProject(), getTableName().getInstance())), - "Consistency tokens are only valid within a single instance."); - - return CheckConsistencyRequest.newBuilder() - .setName(getTableName().toString()) - .setConsistencyToken(getToken()) - .build(); - } -} diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitReplicationCallable.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitReplicationCallable.java new file mode 100644 index 000000000000..6620adfe3163 --- /dev/null +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/AwaitReplicationCallable.java @@ -0,0 +1,195 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.admin.v2.stub; + +import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFunction; +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.ExponentialPollAlgorithm; +import com.google.api.gax.retrying.NonCancellableFuture; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.RetryAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.RetryingExecutor; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.ScheduledRetryingExecutor; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.bigtable.admin.v2.CheckConsistencyRequest; +import com.google.bigtable.admin.v2.CheckConsistencyResponse; +import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest; +import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse; +import com.google.bigtable.admin.v2.TableName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; + +/** + * Callable that waits until replication has caught up to the point it was called. + * + *

This callable wraps GenerateConsistencyToken and CheckConsistency RPCs. It will generate a + * token then poll until isConsistent is true. + */ +class AwaitReplicationCallable extends UnaryCallable { + private final UnaryCallable generateCallable; + private final UnaryCallable checkCallable; + private final RetryingExecutor executor; + + static AwaitReplicationCallable create( + UnaryCallable generateCallable, + UnaryCallable checkCallable, + ClientContext clientContext, + RetrySettings pollingSettings) { + + RetryAlgorithm retryAlgorithm = new RetryAlgorithm<>( + new PollResultAlgorithm(), + new ExponentialPollAlgorithm(pollingSettings, clientContext.getClock()) + ); + + RetryingExecutor retryingExecutor = new ScheduledRetryingExecutor<>( + retryAlgorithm, + clientContext.getExecutor() + ); + + return new AwaitReplicationCallable( + generateCallable, + checkCallable, + retryingExecutor + ); + } + + @VisibleForTesting + AwaitReplicationCallable( + UnaryCallable generateCallable, + UnaryCallable checkCallable, + RetryingExecutor executor) { + this.generateCallable = generateCallable; + this.checkCallable = checkCallable; + this.executor = executor; + } + + @Override + public ApiFuture futureCall(final TableName tableName, final ApiCallContext context) { + ApiFuture tokenFuture = generateToken(tableName, context); + + return ApiFutures.transformAsync( + tokenFuture, + new ApiAsyncFunction() { + @Override + public ApiFuture apply(GenerateConsistencyTokenResponse input) { + CheckConsistencyRequest request = CheckConsistencyRequest.newBuilder() + .setName(tableName.toString()) + .setConsistencyToken(input.getConsistencyToken()) + .build(); + + return pollToken(request, context); + } + }, + MoreExecutors.directExecutor() + ); + } + + private ApiFuture generateToken(TableName tableName, + ApiCallContext context) { + GenerateConsistencyTokenRequest generateRequest = GenerateConsistencyTokenRequest.newBuilder() + .setName(tableName.toString()) + .build(); + return generateCallable.futureCall(generateRequest, context); + } + + private ApiFuture pollToken(CheckConsistencyRequest request, ApiCallContext context) { + AttemptCallable attemptCallable = + new AttemptCallable<>(checkCallable, request, context); + RetryingFuture retryingFuture = executor + .createFuture(attemptCallable); + attemptCallable.setExternalFuture(retryingFuture); + attemptCallable.call(); + + return ApiFutures.transform( + retryingFuture, + new ApiFunction() { + @Override + public Void apply(CheckConsistencyResponse input) { + return null; + } + }, + MoreExecutors.directExecutor() + ); + } + + /** + * A callable representing an attempt to make an RPC call. + */ + private static class AttemptCallable implements Callable { + private final UnaryCallable callable; + private final RequestT request; + + private volatile RetryingFuture externalFuture; + private volatile ApiCallContext callContext; + + AttemptCallable( + UnaryCallable callable, RequestT request, ApiCallContext callContext) { + this.callable = callable; + this.request = request; + this.callContext = callContext; + } + + void setExternalFuture(RetryingFuture externalFuture) { + this.externalFuture = externalFuture; + } + + @Override + public ResponseT call() { + try { + // NOTE: unlike gax's AttemptCallable, this ignores rpc timeouts + externalFuture.setAttemptFuture(new NonCancellableFuture()); + if (externalFuture.isDone()) { + return null; + } + ApiFuture internalFuture = callable.futureCall(request, callContext); + externalFuture.setAttemptFuture(internalFuture); + } catch (Throwable e) { + externalFuture.setAttemptFuture(ApiFutures.immediateFailedFuture(e)); + } + + return null; + } + } + + /** + * A polling algorithm for waiting for a consistent {@link CheckConsistencyResponse}. Please note + * that this class doesn't handle retryable errors and expects the underlying callable chain to + * handle this. + */ + private static class PollResultAlgorithm implements + ResultRetryAlgorithm { + @Override + public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, + CheckConsistencyResponse prevResponse, TimedAttemptSettings prevSettings) { + return null; + } + + @Override + public boolean shouldRetry(Throwable prevThrowable, CheckConsistencyResponse prevResponse) + throws CancellationException { + return prevResponse != null && !prevResponse.getConsistent(); + } + } +} diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/EnhancedBigtableTableAdminStub.java b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/EnhancedBigtableTableAdminStub.java index f4586acf0553..51e09cff3e48 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/EnhancedBigtableTableAdminStub.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/main/java/com/google/cloud/bigtable/admin/v2/stub/EnhancedBigtableTableAdminStub.java @@ -16,8 +16,12 @@ package com.google.cloud.bigtable.admin.v2.stub; import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.bigtable.admin.v2.TableName; import java.io.IOException; +import org.threeten.bp.Duration; /** * Extension of the autogenerated {@link GrpcBigtableTableAdminStub}. It acts as a decorator to add @@ -31,6 +35,8 @@ public class EnhancedBigtableTableAdminStub extends GrpcBigtableTableAdminStub { private final BigtableTableAdminStubSettings settings; private final ClientContext clientContext; + private final AwaitReplicationCallable awaitReplicationCallable; + public static EnhancedBigtableTableAdminStub createEnhanced( BigtableTableAdminStubSettings settings) throws IOException { @@ -43,5 +49,37 @@ private EnhancedBigtableTableAdminStub(BigtableTableAdminStubSettings settings, this.settings = settings; this.clientContext = clientContext; + this.awaitReplicationCallable = createAwaitReplicationCallable(); + } + + private AwaitReplicationCallable createAwaitReplicationCallable() { + // TODO(igorbernstein2): expose polling settings + RetrySettings pollingSettings = RetrySettings.newBuilder() + // use overall timeout from checkConsistencyCallable + // NOTE: The overall timeout might exceed this value due to underlying retries + .setTotalTimeout(settings.checkConsistencySettings().getRetrySettings().getTotalTimeout()) + // Use constant polling with jitter + .setInitialRetryDelay(Duration.ofSeconds(10)) + .setRetryDelayMultiplier(1.0) + .setMaxRetryDelay(Duration.ofSeconds(10)) + .setJittered(true) + // These rpc timeouts are ignored, instead the rpc timeouts defined for + // generateConsistencyToken and checkConsistency callables will be used. + .setInitialRpcTimeout(Duration.ZERO) + .setMaxRpcTimeout(Duration.ZERO) + .setRpcTimeoutMultiplier(1.0) + .build(); + + return AwaitReplicationCallable.create( + generateConsistencyTokenCallable(), + checkConsistencyCallable(), + clientContext, + pollingSettings + ); + } + + public UnaryCallable awaitReplicationCallable() { + return awaitReplicationCallable; } } + diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java index 86d52f1cceb3..33b6a8b410d3 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableTableAdminClientTest.java @@ -20,14 +20,10 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.UnaryCallable; -import com.google.bigtable.admin.v2.CheckConsistencyRequest; -import com.google.bigtable.admin.v2.CheckConsistencyResponse; import com.google.bigtable.admin.v2.ColumnFamily; import com.google.bigtable.admin.v2.DeleteTableRequest; import com.google.bigtable.admin.v2.DropRowRangeRequest; import com.google.bigtable.admin.v2.GcRule; -import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest; -import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse; import com.google.bigtable.admin.v2.GetTableRequest; import com.google.bigtable.admin.v2.InstanceName; import com.google.bigtable.admin.v2.ListTablesRequest; @@ -35,11 +31,9 @@ import com.google.bigtable.admin.v2.TableName; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; -import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest; import com.google.cloud.bigtable.admin.v2.models.Table; -import com.google.cloud.bigtable.admin.v2.stub.BigtableTableAdminStub; import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -78,14 +72,8 @@ public class BigtableTableAdminClientTest { private UnaryCallable mockListTableCallable; @Mock private UnaryCallable mockDropRowRangeCallable; - - @Mock - private UnaryCallable - mockGenerateConsistencyTokenCallable; - @Mock - private UnaryCallable - mockCheckConsistencyCallable; + private UnaryCallable mockAwaitReplicationCallable; @Before public void setUp() { @@ -97,9 +85,7 @@ public void setUp() { Mockito.when(mockStub.getTableCallable()).thenReturn(mockGetTableCallable); Mockito.when(mockStub.listTablesPagedCallable()).thenReturn(mockListTableCallable); Mockito.when(mockStub.dropRowRangeCallable()).thenReturn(mockDropRowRangeCallable); - Mockito.when(mockStub.generateConsistencyTokenCallable()) - .thenReturn(mockGenerateConsistencyTokenCallable); - Mockito.when(mockStub.checkConsistencyCallable()).thenReturn(mockCheckConsistencyCallable); + Mockito.when(mockStub.awaitReplicationCallable()).thenReturn(mockAwaitReplicationCallable); } @Test @@ -295,48 +281,26 @@ public ApiFuture answer(InvocationOnMock invocationOnMock) { } @Test - public void testGenerateConsistencyToken() { - // Setup - GenerateConsistencyTokenRequest expectedRequest = GenerateConsistencyTokenRequest.newBuilder() - .setName(TABLE_NAME.toString()) - .build(); - - GenerateConsistencyTokenResponse expectedResponse = - GenerateConsistencyTokenResponse.newBuilder() - .setConsistencyToken("fakeToken") - .build(); - - Mockito.when(mockGenerateConsistencyTokenCallable.futureCall(expectedRequest)) - .thenReturn(ApiFutures.immediateFuture(expectedResponse)); - - // Execute - ConsistencyToken actualResult = adminClient.generateConsistencyToken(TABLE_NAME.getTable()); - - // Verify - assertThat(actualResult).isEqualTo(ConsistencyToken.of(TABLE_NAME, "fakeToken")); - } - - @Test - public void testCheckConsistencyToken() { + public void testAwaitReplication() { // Setup - CheckConsistencyRequest expectedRequest = CheckConsistencyRequest.newBuilder() - .setName(TABLE_NAME.toString()) - .setConsistencyToken("fakeToken") - .build(); + @SuppressWarnings("UnnecessaryLocalVariable") + TableName expectedRequest = TABLE_NAME; - CheckConsistencyResponse expectedResponse = CheckConsistencyResponse.newBuilder() - .setConsistent(true) - .build(); + final AtomicBoolean wasCalled = new AtomicBoolean(false); - Mockito.when(mockCheckConsistencyCallable.futureCall(expectedRequest)) - .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + Mockito.when(mockAwaitReplicationCallable.futureCall(expectedRequest)) + .thenAnswer(new Answer>() { + @Override + public ApiFuture answer(InvocationOnMock invocationOnMock) throws Throwable { + wasCalled.set(true); + return ApiFutures.immediateFuture(null); + } + }); // Execute - ConsistencyToken actualToken = ConsistencyToken.of(TABLE_NAME, "fakeToken"); - - boolean actualResult = adminClient.isConsistent(actualToken); + adminClient.awaitReplication(TABLE_NAME.getTable()); // Verify - assertThat(actualResult).isTrue(); + assertThat(wasCalled.get()).isTrue(); } } diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableTableAdminClientIT.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableTableAdminClientIT.java index 63bc060bb1f6..e79120dec693 100644 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableTableAdminClientIT.java +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableTableAdminClientIT.java @@ -25,13 +25,12 @@ import com.google.bigtable.admin.v2.TableName; import com.google.cloud.bigtable.admin.v2.BigtableTableAdminClient; import com.google.cloud.bigtable.admin.v2.models.ColumnFamily; +import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.admin.v2.models.GCRules.DurationRule; import com.google.cloud.bigtable.admin.v2.models.GCRules.IntersectionRule; import com.google.cloud.bigtable.admin.v2.models.GCRules.UnionRule; import com.google.cloud.bigtable.admin.v2.models.GCRules.VersionRule; -import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest; -import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; import com.google.cloud.bigtable.admin.v2.models.Table; import com.google.common.collect.Maps; import com.google.protobuf.ByteString; @@ -241,15 +240,12 @@ public void dropRowRange() { } @Test - public void checkConsistency() { + public void awaitReplication() { String tableId = "adminConsistencyTest"; try { tableAdmin.createTable(CreateTableRequest.of(tableId)); - ConsistencyToken consistencyToken = tableAdmin.generateConsistencyToken(tableId); - assertNotNull(consistencyToken); - boolean consistent = tableAdmin.isConsistent(consistencyToken); - assertTrue(consistent); + tableAdmin.awaitReplication(tableId); } finally { tableAdmin.deleteTable(tableId); } diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyTokenTest.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyTokenTest.java deleted file mode 100644 index b426c822300f..000000000000 --- a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/models/ConsistencyTokenTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2018 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigtable.admin.v2.models; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.bigtable.admin.v2.CheckConsistencyRequest; -import com.google.bigtable.admin.v2.InstanceName; -import com.google.bigtable.admin.v2.TableName; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class ConsistencyTokenTest { - private static final InstanceName INSTANCE_NAME = InstanceName.of("my-project", "my-instance"); - private static final TableName TABLE_NAME = TableName.of(INSTANCE_NAME.getProject(), INSTANCE_NAME.getInstance(), "my-table"); - private static final String TOKEN_VALUE = "87282hgwd8yg"; - - @Test - public void testToProto() { - ConsistencyToken token = ConsistencyToken.of(TABLE_NAME, TOKEN_VALUE); - - assertThat(token.toProto(INSTANCE_NAME)).isEqualTo( - CheckConsistencyRequest.newBuilder() - .setName(TABLE_NAME.toString()) - .setConsistencyToken(TOKEN_VALUE) - .build() - ); - } - - @Test - public void testInstanceMismatch() { - ConsistencyToken token = ConsistencyToken.of(TABLE_NAME, TOKEN_VALUE); - - InstanceName otherInstanceName = InstanceName.of("my-project", "other-instance"); - - Exception actualError = null; - - try { - token.toProto(otherInstanceName); - } catch (Exception e) { - actualError = e; - } - - assertThat(actualError).isInstanceOf(IllegalArgumentException.class); - } -} diff --git a/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitReplicationCallableTest.java b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitReplicationCallableTest.java new file mode 100644 index 000000000000..122cbee91c0a --- /dev/null +++ b/google-cloud-clients/google-cloud-bigtable-admin/src/test/java/com/google/cloud/bigtable/admin/v2/stub/AwaitReplicationCallableTest.java @@ -0,0 +1,244 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.admin.v2.stub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.retrying.PollException; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ClientContext; +import com.google.api.gax.rpc.StatusCode.Code; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.rpc.testing.FakeApiException; +import com.google.api.gax.rpc.testing.FakeCallContext; +import com.google.bigtable.admin.v2.CheckConsistencyRequest; +import com.google.bigtable.admin.v2.CheckConsistencyResponse; +import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest; +import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse; +import com.google.bigtable.admin.v2.TableName; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.threeten.bp.Duration; + +@RunWith(MockitoJUnitRunner.class) +public class AwaitReplicationCallableTest { + private static final TableName TABLE_NAME = TableName.of("my-project", "my-instance", "my-table"); + private static final ApiCallContext CALL_CONTEXT = FakeCallContext.createDefault(); + @Mock + private UnaryCallable mockGenerateConsistencyTokenCallable; + @Mock + private UnaryCallable mockCheckConsistencyCallable; + + private AwaitReplicationCallable callable; + + @Before + public void setUp() { + ClientContext clientContext = ClientContext.newBuilder() + .setDefaultCallContext(CALL_CONTEXT) + .build(); + + RetrySettings retrySettings = RetrySettings.newBuilder() + .setTotalTimeout(Duration.ofMillis(100)) + // Delay settings: 1 ms const + .setInitialRetryDelay(Duration.ofMillis(1)) + .setMaxRetryDelay(Duration.ofMillis(1)) + .setRetryDelayMultiplier(1.0) + // RPC timeout: ignored const 1 s + .setInitialRpcTimeout(Duration.ofSeconds(1)) + .setMaxRpcTimeout(Duration.ofSeconds(1)) + .setRpcTimeoutMultiplier(1.0) + .build(); + + callable = AwaitReplicationCallable.create( + mockGenerateConsistencyTokenCallable, + mockCheckConsistencyCallable, + clientContext, + retrySettings + ); + } + + @Test + public void testGenerateFailure() throws Exception { + GenerateConsistencyTokenRequest expectedRequest = GenerateConsistencyTokenRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .build(); + FakeApiException fakeError = new FakeApiException("fake", null, Code.INTERNAL, false); + + Mockito.when(mockGenerateConsistencyTokenCallable.futureCall(expectedRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFailedFuture(fakeError)); + + ApiFuture future = callable.futureCall(TABLE_NAME, CALL_CONTEXT); + + Throwable actualError = null; + + try { + future.get(); + } catch (ExecutionException e) { + actualError = e.getCause(); + } + + assertThat(actualError).isSameAs(fakeError); + } + + @Test + public void testCheckFailure() throws Exception { + GenerateConsistencyTokenRequest expectedRequest = GenerateConsistencyTokenRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .build(); + GenerateConsistencyTokenResponse expectedResponse = GenerateConsistencyTokenResponse + .newBuilder() + .setConsistencyToken("fake-token") + .build(); + + Mockito.when(mockGenerateConsistencyTokenCallable.futureCall(expectedRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + + CheckConsistencyRequest expectedRequest2 = CheckConsistencyRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .setConsistencyToken("fake-token") + .build(); + + FakeApiException expectedError = new FakeApiException("fake", null, Code.INTERNAL, false); + + Mockito.when(mockCheckConsistencyCallable.futureCall(expectedRequest2, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFailedFuture(expectedError)); + + ApiFuture future = callable.futureCall(TABLE_NAME, CALL_CONTEXT); + + Throwable actualError = null; + + try { + future.get(); + } catch (ExecutionException e) { + actualError = e.getCause(); + } + + assertThat(actualError).isSameAs(expectedError); + } + + @Test + public void testImmediatelyConsistent() throws Exception { + GenerateConsistencyTokenRequest expectedRequest = GenerateConsistencyTokenRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .build(); + + GenerateConsistencyTokenResponse expectedResponse = GenerateConsistencyTokenResponse + .newBuilder() + .setConsistencyToken("fake-token") + .build(); + + Mockito.when(mockGenerateConsistencyTokenCallable.futureCall(expectedRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + + CheckConsistencyRequest expectedRequest2 = CheckConsistencyRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .setConsistencyToken("fake-token") + .build(); + CheckConsistencyResponse expectedResponse2 = CheckConsistencyResponse.newBuilder() + .setConsistent(true) + .build(); + + Mockito.when(mockCheckConsistencyCallable.futureCall(expectedRequest2, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse2)); + + ApiFuture consistentFuture = callable.futureCall(TABLE_NAME, CALL_CONTEXT); + + consistentFuture.get(1, TimeUnit.MILLISECONDS); + } + + @Test + public void testPolling() throws Exception { + GenerateConsistencyTokenRequest expectedRequest = GenerateConsistencyTokenRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .build(); + + GenerateConsistencyTokenResponse expectedResponse = GenerateConsistencyTokenResponse + .newBuilder() + .setConsistencyToken("fake-token") + .build(); + + Mockito.when(mockGenerateConsistencyTokenCallable.futureCall(expectedRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + + CheckConsistencyRequest expectedRequest2 = CheckConsistencyRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .setConsistencyToken("fake-token") + .build(); + + CheckConsistencyResponse expectedResponse2 = CheckConsistencyResponse.newBuilder() + .setConsistent(false) + .build(); + + CheckConsistencyResponse expectedResponse3 = CheckConsistencyResponse.newBuilder() + .setConsistent(true) + .build(); + + Mockito.when(mockCheckConsistencyCallable.futureCall(expectedRequest2, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse2)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse3)); + + ApiFuture consistentFuture = callable.futureCall(TABLE_NAME, CALL_CONTEXT); + + consistentFuture.get(1, TimeUnit.SECONDS); + } + + @Test + public void testPollingTimeout() throws Exception { + GenerateConsistencyTokenRequest expectedRequest = GenerateConsistencyTokenRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .build(); + + GenerateConsistencyTokenResponse expectedResponse = GenerateConsistencyTokenResponse + .newBuilder() + .setConsistencyToken("fake-token") + .build(); + + Mockito.when(mockGenerateConsistencyTokenCallable.futureCall(expectedRequest, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse)); + + CheckConsistencyRequest expectedRequest2 = CheckConsistencyRequest.newBuilder() + .setName(TABLE_NAME.toString()) + .setConsistencyToken("fake-token") + .build(); + + CheckConsistencyResponse expectedResponse2 = CheckConsistencyResponse.newBuilder() + .setConsistent(false) + .build(); + + Mockito.when(mockCheckConsistencyCallable.futureCall(expectedRequest2, CALL_CONTEXT)) + .thenReturn(ApiFutures.immediateFuture(expectedResponse2)); + + ApiFuture consistentFuture = callable.futureCall(TABLE_NAME, CALL_CONTEXT); + + Throwable actualError = null; + try { + consistentFuture.get(1, TimeUnit.SECONDS); + } catch (ExecutionException e) { + actualError = e.getCause(); + } + + assertThat(actualError).isInstanceOf(PollException.class); + } +} \ No newline at end of file