Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for awaiting Data Boost #2329

Merged
merged 27 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1617301
Introduce ConsistencyParams model
djyau Aug 30, 2024
54e6540
Create AwaitConsistencyCallable, a delegate for AwaitReplicationCallable
djyau Sep 4, 2024
d1b4320
Address some PR comments
djyau Sep 4, 2024
febfb02
Remove unused imports from AwaitReplicationCallable
djyau Sep 4, 2024
84a2422
Plumb the Consistency callable through to some places, add some tests
djyau Sep 6, 2024
0f1faec
Add integration test
djyau Sep 6, 2024
20f7809
Rework the ConsistencyRequest model, plumb through RequestContext to …
djyau Sep 6, 2024
2c47294
Fix imports
djyau Sep 6, 2024
ce0a439
Fix more imports, fix some tests
djyau Sep 6, 2024
0327038
Rename some things
djyau Sep 6, 2024
c7fbb27
Add tests for ConsistencyRequest model
djyau Sep 6, 2024
c8ff829
Add newline
djyau Sep 6, 2024
f57b84d
Fix broken test
djyau Sep 6, 2024
86f6d8a
Make request context a final variable in test
djyau Sep 6, 2024
916535f
Get test working using correct expectations
djyau Sep 6, 2024
119ba23
Add a couple of tests for AwaitReplicationCallable
djyau Sep 6, 2024
c16491c
Use RequestContextNoAP class
djyau Sep 9, 2024
09a230f
Make ConsistencyRequest model an AutoValue
djyau Sep 9, 2024
4aa93f2
Fix license year, fix some formatting
djyau Sep 9, 2024
1da2dc7
Run auto formatter
djyau Sep 9, 2024
beadfb0
Rename new RequestContext to TableAdminRequestContext, re run auto fo…
djyau Sep 9, 2024
f774895
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 9, 2024
e263edd
Add license header to ConsistencyRequestTest
djyau Sep 10, 2024
5dbc61a
Merge branch 'feature-await-data-boost-2' of https://github.com/djyau…
djyau Sep 10, 2024
f628eb6
Add EnhancedBigtableTableAdminStub to clirr-ignored-differences
djyau Sep 11, 2024
3614dca
Fix IT tests, skip data boost one for now until we run it concurrently
djyau Sep 12, 2024
1240b3c
Run autoformatter
djyau Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.google.cloud.bigtable.admin.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.TableName;
import com.google.common.base.Preconditions;

import javax.annotation.Nonnull;

public class ConsistencyParams {
djyau marked this conversation as resolved.
Show resolved Hide resolved
public enum ConsistencyMode {
djyau marked this conversation as resolved.
Show resolved Hide resolved
/**
* Checks that reads using an app profile with `StandardIsolation` can
* see all writes committed before the token was created, even if the
* read and write target different clusters.
*/
STANDARD(CheckConsistencyRequest.ModeCase.STANDARD_READ_REMOTE_WRITES),

/**
* Checks that reads using an app profile with `DataBoostIsolationReadOnly`
* can see all writes committed before the token was created, but only if
* the read and write target the same cluster.
*/
DATA_BOOST(CheckConsistencyRequest.ModeCase.DATA_BOOST_READ_LOCAL_WRITES);

@Nonnull
private final CheckConsistencyRequest.ModeCase proto;

ConsistencyMode(CheckConsistencyRequest.ModeCase proto) {
this.proto = proto;
}

/**
* Wraps the protobuf. This method is considered an internal implementation detail and not meant
* to be used by applications.
*/
@InternalApi
public static ConsistencyParams.ConsistencyMode fromProto(CheckConsistencyRequest.ModeCase proto) {
djyau marked this conversation as resolved.
Show resolved Hide resolved
for (ConsistencyParams.ConsistencyMode mode : values()) {
if (mode.proto.equals(proto)) {
return mode;
}
}
throw new IllegalArgumentException("Unknown consistency mode: " + proto);
}

/**
* Creates the request protobuf. This method is considered an internal implementation detail and
* not meant to be used by applications.
*/
@InternalApi
public CheckConsistencyRequest.ModeCase toProto() {
return proto;
}
}

private final TableName tableName;

private final ConsistencyMode consistencyMode;

ConsistencyParams(TableName tableName, ConsistencyMode mode) {
Preconditions.checkNotNull(tableName);
Preconditions.checkNotNull(mode);
this.tableName = tableName;
this.consistencyMode = mode;
}

public static ConsistencyParams of(TableName tableName, ConsistencyMode mode) {
djyau marked this conversation as resolved.
Show resolved Hide resolved
djyau marked this conversation as resolved.
Show resolved Hide resolved
return new ConsistencyParams(tableName, mode);
}

public TableName getTableName() {
return tableName;
}

public ConsistencyMode getConsistencyMode() {
return consistencyMode;
}
djyau marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright 2018 Google LLC
djyau marked this conversation as resolved.
Show resolved Hide resolved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.admin.v2.stub;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.ExponentialPollAlgorithm;
import com.google.api.gax.retrying.NonCancellableFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.TimedAttemptSettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.admin.v2.CheckConsistencyRequest;
import com.google.bigtable.admin.v2.CheckConsistencyResponse;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest;
import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse;
import com.google.bigtable.admin.v2.TableName;
import com.google.cloud.bigtable.admin.v2.models.ConsistencyParams;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

/**
* Callable that waits until either replication or Data Boost has caught up to the point it was called.
*
* <p>This callable wraps GenerateConsistencyToken and CheckConsistency RPCs. It will generate a
* token then poll until isConsistent is true.
*/
class AwaitConsistencyCallable extends UnaryCallable<ConsistencyParams, Void> {
private final UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable;
private final UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable;
private final RetryingExecutor<CheckConsistencyResponse> executor;

static AwaitConsistencyCallable create(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
ClientContext clientContext,
RetrySettings pollingSettings) {

RetryAlgorithm<CheckConsistencyResponse> retryAlgorithm =
new RetryAlgorithm<>(
new PollResultAlgorithm(),
new ExponentialPollAlgorithm(pollingSettings, clientContext.getClock()));

RetryingExecutor<CheckConsistencyResponse> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new AwaitConsistencyCallable(generateCallable, checkCallable, retryingExecutor);
}

@VisibleForTesting
AwaitConsistencyCallable(
UnaryCallable<GenerateConsistencyTokenRequest, GenerateConsistencyTokenResponse>
generateCallable,
UnaryCallable<CheckConsistencyRequest, CheckConsistencyResponse> checkCallable,
RetryingExecutor<CheckConsistencyResponse> executor) {
this.generateCallable = generateCallable;
this.checkCallable = checkCallable;
this.executor = executor;
}

@Override
public ApiFuture<Void> futureCall(final ConsistencyParams consistencyParams, final ApiCallContext context) {
TableName tableName = consistencyParams.getTableName();
ApiFuture<GenerateConsistencyTokenResponse> tokenFuture = generateToken(tableName, context);

return ApiFutures.transformAsync(
tokenFuture,
new ApiAsyncFunction<GenerateConsistencyTokenResponse, Void>() {
@Override
public ApiFuture<Void> apply(GenerateConsistencyTokenResponse input) {
CheckConsistencyRequest request =
CheckConsistencyRequest.newBuilder()
.setName(tableName.toString())
.setConsistencyToken(input.getConsistencyToken())
.build();

return pollToken(request, context);
}
},
MoreExecutors.directExecutor());
}

private ApiFuture<GenerateConsistencyTokenResponse> generateToken(
TableName tableName, ApiCallContext context) {
GenerateConsistencyTokenRequest generateRequest =
GenerateConsistencyTokenRequest.newBuilder().setName(tableName.toString()).build();
djyau marked this conversation as resolved.
Show resolved Hide resolved
return generateCallable.futureCall(generateRequest, context);
}

private ApiFuture<Void> pollToken(CheckConsistencyRequest request, ApiCallContext context) {
AttemptCallable<CheckConsistencyRequest, CheckConsistencyResponse> attemptCallable =
new AttemptCallable<>(checkCallable, request, context);
RetryingFuture<CheckConsistencyResponse> retryingFuture =
executor.createFuture(attemptCallable);
attemptCallable.setExternalFuture(retryingFuture);
attemptCallable.call();

return ApiFutures.transform(
retryingFuture,
new ApiFunction<CheckConsistencyResponse, Void>() {
@Override
public Void apply(CheckConsistencyResponse input) {
return null;
}
},
MoreExecutors.directExecutor());
}

/** A callable representing an attempt to make an RPC call. */
private static class AttemptCallable<RequestT, ResponseT> implements Callable<ResponseT> {
private final UnaryCallable<RequestT, ResponseT> callable;
private final RequestT request;

private volatile RetryingFuture<ResponseT> externalFuture;
private volatile ApiCallContext callContext;

AttemptCallable(
UnaryCallable<RequestT, ResponseT> callable, RequestT request, ApiCallContext callContext) {
this.callable = callable;
this.request = request;
this.callContext = callContext;
}

void setExternalFuture(RetryingFuture<ResponseT> externalFuture) {
this.externalFuture = externalFuture;
}

@Override
public ResponseT call() {
try {
// NOTE: unlike gax's AttemptCallable, this ignores rpc timeouts
externalFuture.setAttemptFuture(new NonCancellableFuture<ResponseT>());
if (externalFuture.isDone()) {
return null;
}
ApiFuture<ResponseT> internalFuture = callable.futureCall(request, callContext);
externalFuture.setAttemptFuture(internalFuture);
} catch (Throwable e) {
externalFuture.setAttemptFuture(ApiFutures.<ResponseT>immediateFailedFuture(e));
}

return null;
}
}

/**
* A polling algorithm for waiting for a consistent {@link CheckConsistencyResponse}. Please note
* that this class doesn't handle retryable errors and expects the underlying callable chain to
* handle this.
*/
private static class PollResultAlgorithm
implements ResultRetryAlgorithm<CheckConsistencyResponse> {
@Override
public TimedAttemptSettings createNextAttempt(
Throwable prevThrowable,
CheckConsistencyResponse prevResponse,
TimedAttemptSettings prevSettings) {
return null;
}

@Override
public boolean shouldRetry(Throwable prevThrowable, CheckConsistencyResponse prevResponse)
throws CancellationException {
return prevResponse != null && !prevResponse.getConsistent();
}
}
}
Loading