diff --git a/README.md b/README.md index 76902f4994..7d31ddc8b0 100644 --- a/README.md +++ b/README.md @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigtable' If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-bigtable:2.15.1' +implementation 'com.google.cloud:google-cloud-bigtable:2.16.0' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.15.1" +libraryDependencies += "com.google.cloud" % "google-cloud-bigtable" % "2.16.0" ``` ## Authentication diff --git a/test-proxy/EnableAutoValue.txt b/test-proxy/EnableAutoValue.txt new file mode 100644 index 0000000000..e8237e1cbc --- /dev/null +++ b/test-proxy/EnableAutoValue.txt @@ -0,0 +1,2 @@ +This is a marker file to trigger auto-value injection into the annotation processor path +https://github.com/googleapis/java-shared-config/blob/51c9f68ff1736761b21c921f078ab2c8675ff268/pom.xml#L758 \ No newline at end of file diff --git a/test-proxy/README.md b/test-proxy/README.md new file mode 100644 index 0000000000..7346f2132b --- /dev/null +++ b/test-proxy/README.md @@ -0,0 +1,46 @@ +# CBT Java Test Proxy + +The CBT test proxy is intended for running confromance tests for Cloug Bigtable Java Client. + +## Set up + +If you have not already done so, [install golang](https://go.dev/doc/install), then clone the go test library: + +``` +git clone https://github.com/googleapis/cloud-bigtable-clients-test.git +``` + +## Start test proxy + +Build the proxy with the latest version of the client + +``` +cd java-bigtable/test-proxy +mvn clean install +``` + +Start the proxy on default port 9999 + +``` +mvn exec:java -Dexec.mainClass=com.google.cloud.bigtable.testproxy.CbtTestProxyMain +``` + +Start the proxy on a different port + +``` +mvn exec:java -Dexec.mainClass=com.google.cloud.bigtable.testproxy.CbtTestProxyMain -Dport=1 +``` + +Build and start the proxy with an older version of the client + +``` +mvn clean install -Dbigtable.client.version= -Denforcer.skip +mvn exec:java -Dexec.mainClass=com.google.cloud.bigtable.testproxy.CbtTestProxyMain +``` + +## Run the test cases + +``` +cd cloud-bigtable-clients-test/tests +go test -v -proxy_addr=:9999 +``` diff --git a/test-proxy/pom.xml b/test-proxy/pom.xml new file mode 100644 index 0000000000..93cd8e2e31 --- /dev/null +++ b/test-proxy/pom.xml @@ -0,0 +1,140 @@ + + 4.0.0 + com.google.cloud + google-cloud-bigtable-test-proxy + 0.0.1-SNAPSHOT + jar + Google Cloud Bigtable Test Proxy + https://github.com/googleapis/java-bigtable + Cloud Bigtable Java Client test proxy for running conformance tests. + + + google-cloud-bigtable-parent + com.google.cloud + 2.16.1-SNAPSHOT + + + + 2.16.1-SNAPSHOT + + + + + + com.google.cloud + google-cloud-bigtable-bom + ${bigtable.client.version} + pom + import + + + com.google.cloud + google-cloud-bigtable-deps-bom + ${bigtable.client.version} + pom + import + + + + + + + com.google.cloud + google-cloud-bigtable + + + io.grpc + grpc-netty + + + io.grpc + grpc-stub + + + com.google.protobuf + protobuf-java + + + + + + + kr.motd.maven + os-maven-plugin + 1.6.2 + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.6.1 + + com.google.protobuf:protoc:3.9.0:exe:${os.detected.classifier} + grpc-java + io.grpc:protoc-gen-grpc-java:1.24.0:exe:${os.detected.classifier} + + + + + compile + compile-custom + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + + true + + + + org.sonatype.plugins + nexus-staging-maven-plugin + + true + + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + org.apache.maven.plugins + maven-source-plugin + + true + + + + org.apache.maven.plugins + maven-javadoc-plugin + + true + + + + org.apache.maven.plugins + maven-gpg-plugin + + true + + + + org.codehaus.mojo + clirr-maven-plugin + + true + + + + + + diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java new file mode 100644 index 0000000000..5119eae41f --- /dev/null +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxy.java @@ -0,0 +1,753 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.testproxy; + +import static com.google.cloud.bigtable.data.v2.models.Filters.FILTERS; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.api.core.ApiFunction; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.FixedCredentialsProvider; +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.ServerStream; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auto.value.AutoValue; +import com.google.bigtable.v2.Column; +import com.google.bigtable.v2.Family; +import com.google.bigtable.v2.Row; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.KeyOffset; +import com.google.cloud.bigtable.data.v2.models.MutateRowsException; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowCell; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings; +import com.google.cloud.bigtable.testproxy.CloudBigtableV2TestProxyGrpc.CloudBigtableV2TestProxyImplBase; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.util.Durations; +import com.google.rpc.Code; +import io.grpc.ManagedChannelBuilder; +import io.grpc.Status; +import io.grpc.StatusException; +import io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.stub.StreamObserver; +import io.netty.handler.ssl.SslContext; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.threeten.bp.Duration; + +/** Java implementation of the CBT test proxy. Used to test the Java CBT client. */ +public class CbtTestProxy extends CloudBigtableV2TestProxyImplBase implements Closeable { + + /** + * Class that holds BigtableDataSettings and a BigtableDataClient created with those settings. + * Used so users can retrieve settings for a particular client. + */ + @AutoValue + abstract static class CbtClient { + static CbtClient create(BigtableDataSettings settings, BigtableDataClient dataClient) { + return new AutoValue_CbtTestProxy_CbtClient(settings, dataClient); + } + + abstract BigtableDataSettings settings(); + + abstract BigtableDataClient dataClient(); + } + + private static final Logger logger = Logger.getLogger(CbtTestProxy.class.getName()); + + private CbtTestProxy( + boolean encrypted, + @Nullable String rootCerts, + @Nullable String sslTarget, + @Nullable String credential) { + this.encrypted = encrypted; + this.rootCerts = rootCerts; + this.sslTarget = sslTarget; + this.credential = credential; + this.idClientMap = new ConcurrentHashMap<>(); + } + + /** + * Factory method to return a proxy instance that interacts with server unencrypted and + * unauthenticated. + */ + public static CbtTestProxy createUnencrypted() { + return new CbtTestProxy(false, null, null, null); + } + + /** + * Factory method to return a proxy instance that interacts with server encrypted. Default + * authority and public certificates are used if null values are passed in. + * + * @param rootCertsPemPath The path to a root certificate PEM file + * @param sslTarget The override of SSL target name + * @param credentialJsonPath The path to a credential JSON file + */ + public static CbtTestProxy createEncrypted( + @Nullable String rootCertsPemPath, + @Nullable String sslTarget, + @Nullable String credentialJsonPath) + throws IOException { + String tmpRootCerts = null, tmpCredential = null; + if (rootCertsPemPath != null) { + Path file = Paths.get(rootCertsPemPath); + tmpRootCerts = new String(Files.readAllBytes(file), UTF_8); + } + if (credentialJsonPath != null) { + Path file = Paths.get(credentialJsonPath); + tmpCredential = new String(Files.readAllBytes(file), UTF_8); + } + + return new CbtTestProxy(true, tmpRootCerts, sslTarget, tmpCredential); + } + + /** + * Helper method to override the timeout settings of data APIs. TODO(developer): per-attempt + * timeout may also be overridden, which will involve test harness update. + * + * @param settingsBuilder The Builder object of BigtableDataSettings. + * @param newTimeout The value that is used to set the timeout. + */ + private static BigtableDataSettings.Builder overrideTimeoutSetting( + Duration newTimeout, BigtableDataSettings.Builder settingsBuilder) { + // TODO(developer): remove the initialRpcTimeout update below by updating the client library. + Duration initialRpcTimeout = + settingsBuilder + .stubSettings() + .bulkMutateRowsSettings() + .getRetrySettings() + .getInitialRpcTimeout(); + if (initialRpcTimeout.compareTo(newTimeout) > 0) { + // Total timeout is smaller than initialRpcTimeout, which will cause deadline-related problem. + initialRpcTimeout = newTimeout; + } + settingsBuilder + .stubSettings() + .bulkMutateRowsSettings() + .retrySettings() + .setTotalTimeout(newTimeout) + .setInitialRpcTimeout(initialRpcTimeout); + + settingsBuilder.stubSettings().mutateRowSettings().retrySettings().setTotalTimeout(newTimeout); + + settingsBuilder.stubSettings().readRowSettings().retrySettings().setTotalTimeout(newTimeout); + + settingsBuilder.stubSettings().readRowsSettings().retrySettings().setTotalTimeout(newTimeout); + + settingsBuilder + .stubSettings() + .sampleRowKeysSettings() + .retrySettings() + .setTotalTimeout(newTimeout); + + settingsBuilder + .stubSettings() + .checkAndMutateRowSettings() + .retrySettings() + .setTotalTimeout(newTimeout); + + settingsBuilder + .stubSettings() + .readModifyWriteRowSettings() + .retrySettings() + .setTotalTimeout(newTimeout); + + return settingsBuilder; + } + + /** Helper method to get a client object by its id. */ + private CbtClient getClient(String id) throws StatusException { + CbtClient client = idClientMap.get(id); + if (client == null) { + throw Status.NOT_FOUND.withDescription("Client " + id + " not found.").asException(); + } + return client; + } + + @Override + public synchronized void createClient( + CreateClientRequest request, StreamObserver responseObserver) { + Preconditions.checkArgument(!request.getClientId().isEmpty(), "client id must be provided"); + Preconditions.checkArgument(!request.getProjectId().isEmpty(), "project id must be provided"); + Preconditions.checkArgument(!request.getInstanceId().isEmpty(), "instance id must be provided"); + Preconditions.checkArgument(!request.getDataTarget().isEmpty(), "data target must be provided"); + + if (idClientMap.contains(request.getClientId())) { + responseObserver.onError( + Status.ALREADY_EXISTS + .withDescription("Client " + request.getClientId() + " already exists.") + .asException()); + return; + } + + BigtableDataSettings.Builder settingsBuilder = BigtableDataSettings.newBuilder(); + if (request.hasPerOperationTimeout()) { + Duration newTimeout = Duration.ofMillis(Durations.toMillis(request.getPerOperationTimeout())); + settingsBuilder = overrideTimeoutSetting(newTimeout, settingsBuilder); + logger.info( + String.format( + "Total timeout is set to %s for all the methods", + Durations.toString(request.getPerOperationTimeout()))); + } + + // Create and store CbtClient for later use + try { + settingsBuilder + .setProjectId(request.getProjectId()) + .setInstanceId(request.getInstanceId()) + .stubSettings() + .setEndpoint(request.getDataTarget()) + .setTransportChannelProvider(getTransportChannel()) + .setCredentialsProvider(getCredentialsProvider()); + BigtableDataSettings settings = settingsBuilder.build(); + BigtableDataClient client = BigtableDataClient.create(settings); + CbtClient cbtClient = CbtClient.create(settings, client); + idClientMap.put(request.getClientId(), cbtClient); + } catch (IOException e) { + responseObserver.onError(Status.INTERNAL.withDescription(e.getMessage()).asException()); + return; + } + + responseObserver.onNext(CreateClientResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void closeClient( + CloseClientRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + client.dataClient().close(); + + responseObserver.onNext(CloseClientResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void removeClient( + RemoveClientRequest request, StreamObserver responseObserver) { + CbtClient client = idClientMap.remove(request.getClientId()); + if (client == null) { + responseObserver.onError( + Status.NOT_FOUND + .withDescription("Client " + request.getClientId() + " not found.") + .asException()); + return; + } + + responseObserver.onNext(RemoveClientResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + // TODO(developer): evaluate if we want to manually unpack the proto into a model, instead of + // using fromProto. Same for the other methods. + RowMutation mutation = RowMutation.fromProto(request.getRequest()); + try { + // This response is empty. + client.dataClient().mutateRow(mutation); + } catch (ApiException e) { + responseObserver.onNext( + MutateRowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + responseObserver.onNext( + MutateRowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + responseObserver.onCompleted(); + } + + @Override + public void bulkMutateRows( + MutateRowsRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + BulkMutation batch = BulkMutation.fromProto(request.getRequest()); + try { + client.dataClient().bulkMutateRows(batch); + } catch (MutateRowsException e) { + MutateRowsResult.Builder resultBuilder = MutateRowsResult.newBuilder(); + for (MutateRowsException.FailedMutation failed : e.getFailedMutations()) { + resultBuilder + .addEntryBuilder() + .setIndex(failed.getIndex()) + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(failed.getError().getStatusCode().getCode().ordinal()) + .setMessage(failed.getError().getMessage()) + .build()); + } + responseObserver.onNext( + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + responseObserver.onCompleted(); + return; + } catch (ApiException e) { + responseObserver.onNext( + MutateRowsResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + responseObserver.onNext( + MutateRowsResult.newBuilder() + .setStatus(com.google.rpc.Status.getDefaultInstance()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void readRow(ReadRowRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + String tableId; + try { + tableId = extractTableIdFromTableName(request.getTableName()); + } catch (IllegalArgumentException e) { + responseObserver.onError( + Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asException()); + return; + } + + com.google.cloud.bigtable.data.v2.models.Row row; + try { + row = + client + .dataClient() + .readRow(tableId, request.getRowKey(), FILTERS.fromProto(request.getFilter())); + } catch (ApiException e) { + responseObserver.onNext( + RowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + if (row != null) { + try { + RowResult.Builder resultBuilder = convertRowResult(row); + responseObserver.onNext( + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } catch (RuntimeException e) { + // If client encounters problem, don't return any row result. + responseObserver.onNext( + RowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + } else { + logger.info(String.format("readRow() did not find row: %s", request.getRowKey())); + responseObserver.onNext( + RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } + responseObserver.onCompleted(); + } + + @Override + public void readRows(ReadRowsRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + ServerStream rows; + Query query = Query.fromProto(request.getRequest()); + try { + rows = client.dataClient().readRows(query); + } catch (ApiException e) { + responseObserver.onNext( + RowsResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + int cancelAfterRows = request.getCancelAfterRows(); + try { + RowsResult.Builder resultBuilder = convertRowsResult(rows, cancelAfterRows); + responseObserver.onNext( + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } catch (RuntimeException e) { + // If client encounters problem, don't return any row result. + responseObserver.onNext( + RowsResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + responseObserver.onCompleted(); + } + + /** + * Helper method to convert row from type com.google.cloud.bigtable.data.v2.models.Row to type + * com.google.bigtable.v2.Row. After conversion, row cells within the same column and family are + * grouped and ordered; but the ordering of family and qualifier is not preserved. + * + * @param row Logical row of type com.google.cloud.bigtable.data.v2.models.Row + * @return the converted row in RowResult Builder + */ + private static RowResult.Builder convertRowResult( + com.google.cloud.bigtable.data.v2.models.Row row) { + Row.Builder rowBuilder = Row.newBuilder(); + rowBuilder.setKey(row.getKey()); + + Map>> grouped = + row.getCells().stream() + .collect( + Collectors.groupingBy( + RowCell::getFamily, Collectors.groupingBy(RowCell::getQualifier))); + for (Map.Entry>> e : grouped.entrySet()) { + Family.Builder family = rowBuilder.addFamiliesBuilder().setName(e.getKey()); + + for (Map.Entry> e2 : e.getValue().entrySet()) { + Column.Builder column = family.addColumnsBuilder().setQualifier(e2.getKey()); + + for (RowCell rowCell : e2.getValue()) { + column + .addCellsBuilder() + .setTimestampMicros(rowCell.getTimestamp()) + .setValue(rowCell.getValue()) + .addAllLabels(rowCell.getLabels()); + } + } + } + + RowResult.Builder resultBuilder = RowResult.newBuilder(); + resultBuilder.setRow(rowBuilder.build()); + return resultBuilder; + } + + /** + * Helper method to convert rows from type com.google.cloud.bigtable.data.v2.models.Row to type + * com.google.bigtable.v2.Row. Row order is preserved. + * + * @param rows Logical rows in ServerStream + * @param cancelAfterRows Ignore the results after this row if set positive + * @return the converted rows in RowsResult Builder + */ + private static RowsResult.Builder convertRowsResult( + ServerStream rows, int cancelAfterRows) { + RowsResult.Builder resultBuilder = RowsResult.newBuilder(); + int rowCounter = 0; + for (com.google.cloud.bigtable.data.v2.models.Row row : rows) { + rowCounter++; + RowResult.Builder rowResultBuilder = convertRowResult(row); + resultBuilder.addRow(rowResultBuilder.getRow()); + + if (cancelAfterRows > 0 && rowCounter >= cancelAfterRows) { + logger.info( + String.format("Canceling ReadRows() to respect cancel_after_rows=%d", cancelAfterRows)); + break; + } + } + return resultBuilder; + } + + @Override + public void sampleRowKeys( + SampleRowKeysRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + String tableId; + try { + tableId = extractTableIdFromTableName(request.getRequest().getTableName()); + } catch (IllegalArgumentException e) { + responseObserver.onError( + Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asException()); + return; + } + + List keyOffsets; + try { + keyOffsets = client.dataClient().sampleRowKeys(tableId); + } catch (ApiException e) { + responseObserver.onNext( + SampleRowKeysResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + SampleRowKeysResult.Builder resultBuilder = SampleRowKeysResult.newBuilder(); + for (KeyOffset keyOffset : keyOffsets) { + resultBuilder + .addSampleBuilder() + .setRowKey(keyOffset.getKey()) + .setOffsetBytes(keyOffset.getOffsetBytes()); + } + responseObserver.onNext( + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + responseObserver.onCompleted(); + } + + @Override + public void checkAndMutateRow( + CheckAndMutateRowRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + ConditionalRowMutation mutation = ConditionalRowMutation.fromProto(request.getRequest()); + Boolean matched; + try { + matched = client.dataClient().checkAndMutateRow(mutation); + } catch (ApiException e) { + responseObserver.onNext( + CheckAndMutateRowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + CheckAndMutateRowResult.Builder resultBuilder = CheckAndMutateRowResult.newBuilder(); + resultBuilder.getResultBuilder().setPredicateMatched(matched); + responseObserver.onNext( + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + responseObserver.onCompleted(); + } + + @Override + public void readModifyWriteRow( + ReadModifyWriteRowRequest request, StreamObserver responseObserver) { + CbtClient client; + try { + client = getClient(request.getClientId()); + } catch (StatusException e) { + responseObserver.onError(e); + return; + } + + com.google.cloud.bigtable.data.v2.models.Row row; + ReadModifyWriteRow mutation = ReadModifyWriteRow.fromProto(request.getRequest()); + try { + row = client.dataClient().readModifyWriteRow(mutation); + } catch (ApiException e) { + responseObserver.onNext( + RowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(e.getStatusCode().getCode().ordinal()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + + if (row != null) { + try { + RowResult.Builder resultBuilder = convertRowResult(row); + responseObserver.onNext( + resultBuilder.setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } catch (RuntimeException e) { + // If client encounters problem, fail the whole operation. + responseObserver.onNext( + RowResult.newBuilder() + .setStatus( + com.google.rpc.Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage(e.getMessage()) + .build()) + .build()); + responseObserver.onCompleted(); + return; + } + } else { + logger.info( + String.format( + "readModifyWriteRow() did not find row: %s", request.getRequest().getRowKey())); + responseObserver.onNext( + RowResult.newBuilder().setStatus(com.google.rpc.Status.getDefaultInstance()).build()); + } + responseObserver.onCompleted(); + } + + @Override + public synchronized void close() { + Iterator> it = idClientMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + entry.getValue().dataClient().close(); + it.remove(); + } + } + + private static String extractTableIdFromTableName(String fullTableName) + throws IllegalArgumentException { + Matcher matcher = tablePattern.matcher(fullTableName); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid table name: " + fullTableName); + } + return matcher.group(3); + } + + private InstantiatingGrpcChannelProvider getTransportChannel() throws IOException { + if (!encrypted) { + return EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder() + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); + } + + if (rootCerts == null) { + return EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder().build(); + } + + final SslContext secureContext = + GrpcSslContexts.forClient() + .trustManager(new ByteArrayInputStream(rootCerts.getBytes(UTF_8))) + .build(); + return EnhancedBigtableStubSettings.defaultGrpcTransportProviderBuilder() + .setChannelConfigurator( + new ApiFunction() { + @Override + public ManagedChannelBuilder apply(ManagedChannelBuilder input) { + NettyChannelBuilder channelBuilder = (NettyChannelBuilder) input; + channelBuilder.sslContext(secureContext).overrideAuthority(sslTarget); + return channelBuilder; + } + }) + .build(); + } + + private CredentialsProvider getCredentialsProvider() throws IOException { + if (credential == null) { + return NoCredentialsProvider.create(); + } + + final GoogleCredentials creds = + GoogleCredentials.fromStream(new ByteArrayInputStream(credential.getBytes(UTF_8))); + + return FixedCredentialsProvider.create(creds); + } + + private final ConcurrentHashMap idClientMap; + private final boolean encrypted; + + // Parameters that may be needed when "encrypted" is true. + private final String rootCerts; + private final String sslTarget; + private final String credential; + + private static final Pattern tablePattern = + Pattern.compile("projects/([^/]+)/instances/([^/]+)/tables/([^/]+)"); +} diff --git a/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java new file mode 100644 index 0000000000..8750909f1a --- /dev/null +++ b/test-proxy/src/main/java/com/google/cloud/bigtable/testproxy/CbtTestProxyMain.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.testproxy; + +import io.grpc.ServerBuilder; +import java.io.IOException; +import java.util.logging.Logger; + +/** Starts a CbtTestProxy server. */ +public final class CbtTestProxyMain { + + private CbtTestProxyMain() {} + + private static final Logger logger = Logger.getLogger(CbtTestProxyMain.class.getName()); + + public static void main(String[] args) throws InterruptedException, IOException { + int port = Integer.getInteger("port", 9999); + if (port <= 0) { + throw new IllegalArgumentException(String.format("Port %d is not > 0.", port)); + } + + CbtTestProxy cbtTestProxy; + + // If encryption is specified + boolean encrypted = Boolean.getBoolean("encrypted"); + if (encrypted) { + String rootCertsPemPath = System.getProperty("root.certs.pem.path"); + String sslTarget = System.getProperty("ssl.target"); + String credentialJsonPath = System.getProperty("credential.json.path"); + cbtTestProxy = CbtTestProxy.createEncrypted(rootCertsPemPath, sslTarget, credentialJsonPath); + } else { + cbtTestProxy = CbtTestProxy.createUnencrypted(); + } + + logger.info(String.format("Test proxy starting on %d", port)); + ServerBuilder.forPort(port).addService(cbtTestProxy).build().start().awaitTermination(); + } +} diff --git a/test-proxy/src/main/proto/v2_test_proxy.proto b/test-proxy/src/main/proto/v2_test_proxy.proto new file mode 100644 index 0000000000..76e4f7826c --- /dev/null +++ b/test-proxy/src/main/proto/v2_test_proxy.proto @@ -0,0 +1,221 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package bigtable.client.test; + +import "google/bigtable/v2/bigtable.proto"; +import "google/bigtable/v2/data.proto"; +import "google/protobuf/duration.proto"; +import "google/rpc/status.proto"; + +option java_multiple_files = true; +option java_package = "com.google.cloud.bigtable.testproxy"; +option go_package = "./testproxypb"; + +// The `status` field of response messages always represents an error returned +// by the client binding, e.g. never a problem in either the proxy logic or +// test driver to proxy communication. After receiving a response from the +// proxy, the test driver should always check its `status` field. +// +// [test driver] <--> [test proxy <--> client binding] <--> [Cloud Bigtable] +// ^^^^ +// `status` represents success or errors +// returned from the client binding. +// +// Status propagation design examples, assuming the C++ client: +// +// // For CloudBigtableV2TestProxy.ReadRow +// StatusOr> result = table.ReadRow(row_key, filter); +// +// Set RowResult.status to OK iff result.status() is OK. +// OK is required even if the bool is false, indicating the row wasn't found. +// +// // For CloudBigtableV2TestProxy.BulkMutateRows +// std::vector failed = table.BulkApply(bulk_mutation); +// +// The semantics are less obvious for BulkApply(), because some mutations +// failing doesn't indicate the overall RPC fails. In such case, test proxy +// should disambiguate between RPC failure and individual entry failure, and +// set MutateRowsResult.status according to the overall RPC status. +// +// The final decision regarding semantics must be documented for the +// CloudBigtableV2TestProxy service in this file. + +message CreateClientRequest { + string client_id = 1; + // The "host:port" address of the data API endpoint (i.e. the backend being + // proxied to). Example: 127.0.0.1:38543 + string data_target = 2; + // The project for all calls on this client. + string project_id = 3; + // The instance for all calls on this client. + string instance_id = 4; + // Optional app profile for all calls on this client. + // Some client bindings allow specifying the app profile on a per-operation + // basis. We don't yet support this in the proxy API, but may in the future. + string app_profile_id = 5; + // If provided, a custom timeout will be set for each API call conducted by + // the created client. Otherwise, the default timeout from the client library + // will be used. Note that the override applies to all the methods. + google.protobuf.Duration per_operation_timeout = 6; +} + +message CreateClientResponse {} + +message CloseClientRequest { + string client_id = 1; +} + +message CloseClientResponse {} + +message RemoveClientRequest { + string client_id = 1; +} + +message RemoveClientResponse {} + +message ReadRowRequest { + string client_id = 1; + // The unique name of the table from which to read the row. + // Values are of the form + // `projects//instances//tables/`. + string table_name = 4; + string row_key = 2; + google.bigtable.v2.RowFilter filter = 3; +} + +message RowResult { + google.rpc.Status status = 1; + google.bigtable.v2.Row row = 2; +} + +message ReadRowsRequest { + string client_id = 1; + google.bigtable.v2.ReadRowsRequest request = 2; + // The streaming read can be canceled before all items are seen. + // Has no effect if non-positive. + int32 cancel_after_rows = 3; +} + +message RowsResult { + google.rpc.Status status = 1; + repeated google.bigtable.v2.Row row = 2; +} + +message MutateRowRequest { + string client_id = 1; + google.bigtable.v2.MutateRowRequest request = 2; +} + +message MutateRowResult { + google.rpc.Status status = 1; +} + +message MutateRowsRequest { + string client_id = 1; + google.bigtable.v2.MutateRowsRequest request = 2; +} + +message MutateRowsResult { + // Overall RPC status + google.rpc.Status status = 1; + // To record individual entry failures + repeated google.bigtable.v2.MutateRowsResponse.Entry entry = 2; +} + +message CheckAndMutateRowRequest { + string client_id = 1; + google.bigtable.v2.CheckAndMutateRowRequest request = 2; +} + +message CheckAndMutateRowResult { + google.rpc.Status status = 1; + google.bigtable.v2.CheckAndMutateRowResponse result = 2; +} + +message SampleRowKeysRequest { + string client_id = 1; + google.bigtable.v2.SampleRowKeysRequest request = 2; +} + +message SampleRowKeysResult { + google.rpc.Status status = 1; + repeated google.bigtable.v2.SampleRowKeysResponse sample = 2; +} + +message ReadModifyWriteRowRequest { + string client_id = 1; + google.bigtable.v2.ReadModifyWriteRowRequest request = 2; +} + +// Note that all RPCs are unary, even when the equivalent client binding call +// may be streaming. This is an intentional simplification. +// +// Most methods have sync (default) and async variants. For async variants, +// the proxy is expected to perform the async operation, then wait for results +// before delivering them back to the driver client. +// +// Operations that may have interesting concurrency characteristics are +// represented explicitly in the API (see ReadRowsRequest.cancel_after_rows). +// We include such operations only when they can be meaningfully performed +// through client bindings. +// +// Users should generally avoid setting deadlines for requests to the Proxy +// because operations are not cancelable. If the deadline is set anyway, please +// understand that the underlying operation will continue to be executed even +// after the deadline expires. +service CloudBigtableV2TestProxy { + // Client management: + // + // Creates a client in the proxy. + // Each client has its own dedicated channel(s), and can be used concurrently + // and independently with other clients. + rpc CreateClient(CreateClientRequest) returns (CreateClientResponse); + // Closes a client in the proxy, making it not accept new requests. + rpc CloseClient(CloseClientRequest) returns (CloseClientResponse); + // Removes a client in the proxy, making it inaccessible. Client closing + // should be done by CloseClient() separately. + rpc RemoveClient(RemoveClientRequest) returns (RemoveClientResponse); + + // Bigtable operations: for each operation, you should use the synchronous or + // asynchronous variant of the client method based on the `use_async_method` + // setting of the client instance. For starters, you can choose to implement + // one variant, and return UNIMPLEMENTED status for the other. + // + // Reads a row with the client instance. + // The result row may not be present in the response. + // Callers should check for it (e.g. calling has_row() in C++). + rpc ReadRow(ReadRowRequest) returns (RowResult); + + // Reads rows with the client instance. + rpc ReadRows(ReadRowsRequest) returns (RowsResult); + + // Writes a row with the client instance. + rpc MutateRow(MutateRowRequest) returns (MutateRowResult); + + // Writes multiple rows with the client instance. + rpc BulkMutateRows(MutateRowsRequest) returns (MutateRowsResult); + + // Performs a check-and-mutate-row operation with the client instance. + rpc CheckAndMutateRow(CheckAndMutateRowRequest) + returns (CheckAndMutateRowResult); + + // Obtains a row key sampling with the client instance. + rpc SampleRowKeys(SampleRowKeysRequest) returns (SampleRowKeysResult); + + // Performs a read-modify-write operation with the client. + rpc ReadModifyWriteRow(ReadModifyWriteRowRequest) returns (RowResult); +}