From 28fab80ed42dcdf56bac22f67b6e262e3ad54cd2 Mon Sep 17 00:00:00 2001 From: Hanzhen Yi <33737743+hzyi-google@users.noreply.github.com> Date: Tue, 6 Nov 2018 16:38:35 -0800 Subject: [PATCH] Spanner gapic migration (#3881) * Add GapicSpannerRpc stub (#3016) * spanner: implement partitionRead (#3022) * spanner: implement LRO methods using GAPIC stub (#3039) * Add implementation to:listInstances, listInstanceConfigs, listDatabases, getInstance, getInstanceConfigs, getDatabase, dropDatabase, deleteInstance, getDatabaseDdl (#3043) * spanner: move admin clients to GAPIC stub (#3067) * Spanner: migrate all unary call methods to gapic and inject headers (#3112) * Spanner: migrate streaming methods to gapic (#3139) * Spanner gapic migration lro methods (#3166) Migrate longrunning methods to GAPIC including: createDatabase, updateDatabaseDdl, createInstance, and updateInstance. * Make ChannelPool work (#3258) * Spanner gapic migration error augmentation with interceptor (#3304) * Add interceptors (#3346) * Clean up Spanner before merging to master (#3362) * Spanner Gapic Migration: fix updateDatabaseDdl (#3403) --- .../com/google/cloud/spanner/Database.java | 3 +- .../cloud/spanner/DatabaseAdminClient.java | 19 +- .../com/google/cloud/spanner/Instance.java | 5 +- .../cloud/spanner/InstanceAdminClient.java | 39 +- .../com/google/cloud/spanner/SpannerImpl.java | 253 ++++--- .../google/cloud/spanner/SpannerOptions.java | 190 ++--- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 585 +++++++++++++++ .../cloud/spanner/spi/v1/GrpcSpannerRpc.java | 684 ------------------ .../spanner/spi/v1/LoggingInterceptor.java | 108 +++ .../spi/v1/SpannerInterceptorProvider.java | 62 ++ .../spi/v1/SpannerMetadataProvider.java | 22 +- .../cloud/spanner/spi/v1/SpannerRpc.java | 22 +- .../spanner/testing/RemoteSpannerHelper.java | 22 +- .../cloud/spanner/BatchClientImplTest.java | 13 +- .../spanner/DatabaseAdminClientImplTest.java | 55 +- .../cloud/spanner/GceTestEnvConfig.java | 9 +- .../spanner/InstanceAdminClientImplTest.java | 35 +- .../cloud/spanner/IntegrationTestEnv.java | 15 +- .../cloud/spanner/OperationFutureUtil.java | 275 +++++++ .../cloud/spanner/SpannerOptionsTest.java | 24 +- .../cloud/spanner/it/ITDatabaseAdminTest.java | 39 +- .../cloud/spanner/it/ITInstanceAdminTest.java | 14 +- .../spanner/spi/v1/RequestMetadataTest.java | 70 -- .../spi/v1/SpannerMetadataProviderTest.java | 13 + .../snippets/DatabaseAdminClientSnippets.java | 26 +- .../snippets/InstanceAdminClientSnippets.java | 25 +- 26 files changed, 1474 insertions(+), 1153 deletions(-) create mode 100644 google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java delete mode 100644 google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java create mode 100644 google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java create mode 100644 google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java create mode 100644 google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OperationFutureUtil.java delete mode 100644 google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/RequestMetadataTest.java diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Database.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Database.java index bee2dd1b96fd..97894ae12cec 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Database.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Database.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.api.gax.longrunning.OperationFuture; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; /** @@ -51,7 +52,7 @@ public Database reload() throws SpannerException { * one. This must be unique within a database abd must be a valid identifier * [a-zA-Z][a-zA-Z0-9_]*. */ - public Operation updateDdl( + public OperationFuture updateDdl( Iterable statements, String operationId) throws SpannerException { return dbClient.updateDatabaseDdl(instance(), database(), statements, operationId); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseAdminClient.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseAdminClient.java index f4bf86de11d1..9662cc9050bc 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseAdminClient.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseAdminClient.java @@ -16,6 +16,7 @@ package com.google.cloud.spanner; +import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; import com.google.cloud.spanner.Options.ListOption; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; @@ -29,7 +30,8 @@ public interface DatabaseAdminClient { * Creates a new database in a Cloud Spanner instance. * *

Example to create database. - *

 {@code
+   *
+   * 
{@code
    * String instanceId = my_instance_id;
    * String databaseId = my_database_id;
    * Operation op = dbAdminClient
@@ -58,19 +60,19 @@ public interface DatabaseAdminClient {
    * @param statements DDL statements to run while creating the database, for example {@code CREATE
    *     TABLE MyTable ( ... )}. This should not include {@code CREATE DATABASE} statement.
    */
-  Operation createDatabase(
+  OperationFuture createDatabase(
       String instanceId, String databaseId, Iterable statements) throws SpannerException;
 
-  /** 
-   * Gets the current state of a Cloud Spanner database. 
+  /**
+   * Gets the current state of a Cloud Spanner database.
    *
    * 

Example to getDatabase. - *

 {@code
+   *
+   * 
{@code
    * String instanceId = my_instance_id;
    * String databaseId = my_database_id;
    * Database db = dbAdminClient.getDatabase(instanceId, databaseId);
    * }
- * */ Database getDatabase(String instanceId, String databaseId) throws SpannerException; @@ -84,7 +86,8 @@ Operation createDatabase( * fails, all subsequent statements in the batch are automatically cancelled. * *

Example to update the database DDL. - *

 {@code
+   *
+   * 
{@code
    * String instanceId = my_instance_id;
    * String databaseId = my_database_id;
    * dbAdminClient.updateDatabaseDdl(instanceId,
@@ -97,7 +100,7 @@ Operation createDatabase(
    *     one. This must be unique within a database abd must be a valid identifier
    *     [a-zA-Z][a-zA-Z0-9_]*.
    */
-  Operation updateDatabaseDdl(
+  OperationFuture updateDatabaseDdl(
       String instanceId,
       String databaseId,
       Iterable statements,
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Instance.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Instance.java
index a1dc71362202..c8f92bf2111a 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Instance.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Instance.java
@@ -16,6 +16,7 @@
 
 package com.google.cloud.spanner;
 
+import com.google.api.gax.longrunning.OperationFuture;
 import com.google.api.gax.paging.Page;
 import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
 import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata;
@@ -104,7 +105,7 @@ public void delete() {
     instanceClient.deleteInstance(instanceId());
   }
 
-  public Operation update(
+  public OperationFuture update(
       InstanceInfo.InstanceField... fieldsToUpdate) {
     return instanceClient.updateInstance(this, fieldsToUpdate);
   }
@@ -125,7 +126,7 @@ public Database getDatabase(String databaseId) {
    * @param statements DDL statements to run while creating the database, for example {@code CREATE
    *     TABLE MyTable ( ... )}. This should not include {@code CREATE DATABASE} statement.
    */
-  public Operation createDatabase(
+  public OperationFuture createDatabase(
       String databaseId, Iterable statements) throws SpannerException {
     return dbClient.createDatabase(instanceId(), databaseId, statements);
   }
diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/InstanceAdminClient.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/InstanceAdminClient.java
index 4a046cc40c9f..b5760ef377fc 100644
--- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/InstanceAdminClient.java
+++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/InstanceAdminClient.java
@@ -16,6 +16,7 @@
 
 package com.google.cloud.spanner;
 
+import com.google.api.gax.longrunning.OperationFuture;
 import com.google.api.gax.paging.Page;
 import com.google.cloud.spanner.Options.ListOption;
 import com.google.spanner.admin.instance.v1.CreateInstanceMetadata;
@@ -51,29 +52,30 @@ public interface InstanceAdminClient {
    * upon completion of this request:
    *
    * 
    - *
  • The instance is readable via the API, with all requested attributes but no allocated + *
  • The instance is readable via the API, with all requested attributes but no allocated * resources. - *
  • Its state is {@code CREATING}. + *
  • Its state is {@code CREATING}. *
* * Until completion of the returned operation: * *
    - *
  • Cancelling the operation renders the instance immediately unreadable via the API. - *
  • The instance can be deleted. - *
  • All other attempts to modify the instance are rejected. + *
  • Cancelling the operation renders the instance immediately unreadable via the API. + *
  • The instance can be deleted. + *
  • All other attempts to modify the instance are rejected. *
* * Upon completion of the returned operation: * *
    - *
  • Billing for all successfully-allocated resources begins (some types may have lower than + *
  • Billing for all successfully-allocated resources begins (some types may have lower than * the requested levels). - *
  • Databases can be created in the instance. - *
  • The instance's allocated resource levels are readable via the + *
  • Databases can be created in the instance. + *
  • The instance's allocated resource levels are readable via the *
* * + * *
{@code
    * final String instanceId = my_instance_id;
    * final String configId = my_config_id;
@@ -88,9 +90,10 @@ public interface InstanceAdminClient {
    *         .build());
    * op.waitFor();
    * }
+ * * */ - Operation createInstance(InstanceInfo instance) + OperationFuture createInstance(InstanceInfo instance) throws SpannerException; /** Gets an instance. */ @@ -142,31 +145,32 @@ Operation createInstance(InstanceInfo instance *

Immediately upon completion of this request: * *

    - *
  • For resource types for which a decrease in the instance's allocation has been requested, + *
  • For resource types for which a decrease in the instance's allocation has been requested, * billing is based on the newly-requested level. *
* * Until completion of the returned operation: * *
    - *
  • Cancelling the operation sets its metadata's + *
  • Cancelling the operation sets its metadata's * [cancel_time][UpdateInstanceMetadata.cancel_time], and begins restoring resources to * their pre-request values. The operation is guaranteed to succeed at undoing all resource * changes, after which point it terminates with a `CANCELLED` status. - *
  • All other attempts to modify the instance are rejected. - *
  • Reading the instance via the API continues to give the pre-request resource levels. + *
  • All other attempts to modify the instance are rejected. + *
  • Reading the instance via the API continues to give the pre-request resource levels. *
* * Upon completion of the returned operation: * *
    - *
  • Billing begins for all successfully-allocated resources (some types may have lower than + *
  • Billing begins for all successfully-allocated resources (some types may have lower than * the requested levels). - *
  • All newly-reserved resources are available for serving the instance's tables. - *
  • The instance's new resource levels are readable via the API. + *
  • All newly-reserved resources are available for serving the instance's tables. + *
  • The instance's new resource levels are readable via the API. *
* * + * *
{@code
    * Instance instance = my_instance;
    * final String clientProject = my_client_project;
@@ -184,9 +188,10 @@ Operation createInstance(InstanceInfo instance
    *     instanceAdminClient.updateInstance(toUpdate, InstanceInfo.InstanceField.DISPLAY_NAME);
    * op.waitFor().getResult();
    * }
+ * * */ - Operation updateInstance( + OperationFuture updateInstance( InstanceInfo instance, InstanceInfo.InstanceField... fieldsToUpdate); /** Returns a builder for {@code Instance} object with the given id. */ diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index d89471a2b8de..ff5a0f481958 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -24,7 +24,13 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.ExponentialBackOff; +import com.google.api.core.ApiFunction; +import com.google.api.gax.grpc.ProtoOperationTransformers; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationFutureImpl; +import com.google.api.gax.longrunning.OperationSnapshot; import com.google.api.gax.paging.Page; +import com.google.api.gax.rpc.ServerStream; import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.BaseService; import com.google.cloud.ByteArray; @@ -32,7 +38,6 @@ import com.google.cloud.PageImpl; import com.google.cloud.PageImpl.NextPageFetcher; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Operation.Parser; import com.google.cloud.spanner.Options.ListOption; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; @@ -51,6 +56,7 @@ import com.google.common.util.concurrent.Uninterruptibles; import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.ListValue; @@ -75,7 +81,6 @@ import com.google.spanner.v1.TransactionSelector; import com.google.spanner.v1.TypeCode; import io.grpc.Context; -import io.grpc.ManagedChannel; import io.opencensus.common.Scope; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.Span; @@ -147,7 +152,7 @@ private static void throwIfTransactionsPending() { } private final Random random = new Random(); - private final SpannerRpc rpc; + private final SpannerRpc gapicRpc; private final int defaultPrefetchChunks; @GuardedBy("this") @@ -159,12 +164,13 @@ private static void throwIfTransactionsPending() { @GuardedBy("this") private boolean spannerIsClosed = false; - SpannerImpl(SpannerRpc rpc, int defaultPrefetchChunks, SpannerOptions options) { + SpannerImpl(SpannerRpc gapicRpc, int defaultPrefetchChunks, SpannerOptions options) { super(options); - this.rpc = rpc; + this.gapicRpc = gapicRpc; this.defaultPrefetchChunks = defaultPrefetchChunks; - this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), rpc); - this.instanceClient = new InstanceAdminClientImpl(options.getProjectId(), rpc, dbAdminClient); + this.dbAdminClient = new DatabaseAdminClientImpl(options.getProjectId(), gapicRpc); + this.instanceClient = + new InstanceAdminClientImpl(options.getProjectId(), gapicRpc, dbAdminClient); } SpannerImpl(SpannerOptions options) { @@ -268,7 +274,8 @@ Session createSession(final DatabaseId db) throws SpannerException { new Callable() { @Override public com.google.spanner.v1.Session call() throws Exception { - return rpc.createSession(db.getName(), getOptions().getSessionLabels(), options); + return gapicRpc.createSession( + db.getName(), getOptions().getSessionLabels(), options); } }); span.end(); @@ -332,12 +339,10 @@ public void close() { } catch (InterruptedException | ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e); } - for (ManagedChannel channel : getOptions().getRpcChannels()) { - try { - channel.shutdown(); - } catch (RuntimeException e) { - logger.log(Level.WARNING, "Failed to close channel", e); - } + try { + gapicRpc.shutdown(); + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Failed to close channels", e); } } @@ -440,27 +445,31 @@ static class DatabaseAdminClientImpl implements DatabaseAdminClient { } @Override - public Operation createDatabase( + public OperationFuture createDatabase( String instanceId, String databaseId, Iterable statements) throws SpannerException { // CreateDatabase() is not idempotent, so we're not retrying this request. String instanceName = getInstanceName(instanceId); String createStatement = "CREATE DATABASE `" + databaseId + "`"; - com.google.longrunning.Operation op = - rpc.createDatabase(instanceName, createStatement, statements); - return Operation.create( - rpc, - op, - new Parser() { + OperationFuture + rawOperationFuture = rpc.createDatabase(instanceName, createStatement, statements); + return new OperationFutureImpl( + rawOperationFuture.getPollingFuture(), + rawOperationFuture.getInitialFuture(), + new ApiFunction() { @Override - public Database parseResult(Any response) { + public Database apply(OperationSnapshot snapshot) { return Database.fromProto( - unpack(response, com.google.spanner.admin.database.v1.Database.class), + ProtoOperationTransformers.ResponseTransformer.create( + com.google.spanner.admin.database.v1.Database.class) + .apply(snapshot), DatabaseAdminClientImpl.this); } - + }, + ProtoOperationTransformers.MetadataTransformer.create(CreateDatabaseMetadata.class), + new ApiFunction() { @Override - public CreateDatabaseMetadata parseMetadata(Any metadata) { - return unpack(metadata, CreateDatabaseMetadata.class); + public Database apply(Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); } }); } @@ -479,7 +488,7 @@ public Database call() throws Exception { } @Override - public Operation updateDatabaseDdl( + public OperationFuture updateDatabaseDdl( final String instanceId, final String databaseId, final Iterable statements, @@ -487,47 +496,24 @@ public Operation updateDatabaseDdl( throws SpannerException { final String dbName = getDatabaseName(instanceId, databaseId); final String opId = operationId != null ? operationId : randomOperationId(); - Callable> callable = - new Callable>() { + OperationFuture rawOperationFuture = + rpc.updateDatabaseDdl(dbName, statements, opId); + return new OperationFutureImpl( + rawOperationFuture.getPollingFuture(), + rawOperationFuture.getInitialFuture(), + new ApiFunction() { @Override - public Operation call() { - com.google.longrunning.Operation op = null; - try { - op = rpc.updateDatabaseDdl(dbName, statements, opId); - } catch (SpannerException e) { - if (e.getErrorCode() == ErrorCode.ALREADY_EXISTS) { - String opName = - OP_NAME_TEMPLATE.instantiate( - "project", - projectId, - "instance", - instanceId, - "database", - databaseId, - "operation", - opId); - op = com.google.longrunning.Operation.newBuilder().setName(opName).build(); - } else { - throw e; - } - } - return Operation.create( - rpc, - op, - new Parser() { - @Override - public Void parseResult(Any response) { - return null; - } - - @Override - public UpdateDatabaseDdlMetadata parseMetadata(Any metadata) { - return unpack(metadata, UpdateDatabaseDdlMetadata.class); - } - }); + public Void apply(OperationSnapshot snapshot) { + return null; } - }; - return runWithRetries(callable); + }, + ProtoOperationTransformers.MetadataTransformer.create(UpdateDatabaseDdlMetadata.class), + new ApiFunction() { + @Override + public Database apply(Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } + }); } @Override @@ -643,26 +629,32 @@ public InstanceConfig fromProto( } @Override - public Operation createInstance(InstanceInfo instance) + public OperationFuture createInstance(InstanceInfo instance) throws SpannerException { String projectName = PROJECT_NAME_TEMPLATE.instantiate("project", projectId); - com.google.longrunning.Operation op = - rpc.createInstance(projectName, instance.getId().getInstance(), instance.toProto()); - return Operation.create( - rpc, - op, - new Parser() { + OperationFuture + rawOperationFuture = + rpc.createInstance(projectName, instance.getId().getInstance(), instance.toProto()); + + return new OperationFutureImpl( + rawOperationFuture.getPollingFuture(), + rawOperationFuture.getInitialFuture(), + new ApiFunction() { @Override - public Instance parseResult(Any response) { + public Instance apply(OperationSnapshot snapshot) { return Instance.fromProto( - unpack(response, com.google.spanner.admin.instance.v1.Instance.class), + ProtoOperationTransformers.ResponseTransformer.create( + com.google.spanner.admin.instance.v1.Instance.class) + .apply(snapshot), InstanceAdminClientImpl.this, dbClient); } - + }, + ProtoOperationTransformers.MetadataTransformer.create(CreateInstanceMetadata.class), + new ApiFunction() { @Override - public CreateInstanceMetadata parseMetadata(Any metadata) { - return unpack(metadata, CreateInstanceMetadata.class); + public Instance apply(Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); } }); } @@ -717,28 +709,34 @@ public Void call() { } @Override - public Operation updateInstance( + public OperationFuture updateInstance( InstanceInfo instance, InstanceInfo.InstanceField... fieldsToUpdate) { FieldMask fieldMask = fieldsToUpdate.length == 0 ? InstanceInfo.InstanceField.toFieldMask(InstanceInfo.InstanceField.values()) : InstanceInfo.InstanceField.toFieldMask(fieldsToUpdate); - com.google.longrunning.Operation op = rpc.updateInstance(instance.toProto(), fieldMask); - return Operation.create( - rpc, - op, - new Parser() { + + OperationFuture + rawOperationFuture = rpc.updateInstance(instance.toProto(), fieldMask); + return new OperationFutureImpl( + rawOperationFuture.getPollingFuture(), + rawOperationFuture.getInitialFuture(), + new ApiFunction() { @Override - public Instance parseResult(Any response) { + public Instance apply(OperationSnapshot snapshot) { return Instance.fromProto( - unpack(response, com.google.spanner.admin.instance.v1.Instance.class), + ProtoOperationTransformers.ResponseTransformer.create( + com.google.spanner.admin.instance.v1.Instance.class) + .apply(snapshot), InstanceAdminClientImpl.this, dbClient); } - + }, + ProtoOperationTransformers.MetadataTransformer.create(UpdateInstanceMetadata.class), + new ApiFunction() { @Override - public UpdateInstanceMetadata parseMetadata(Any metadata) { - return unpack(metadata, UpdateInstanceMetadata.class); + public Instance apply(Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); } }); } @@ -772,7 +770,7 @@ public String getName() { @Override public long executePartitionedUpdate(Statement stmt) { setActive(null); - PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, rpc); + PartitionedDMLTransaction txn = new PartitionedDMLTransaction(this, gapicRpc); return txn.executePartitionedUpdate(stmt); } @@ -814,7 +812,7 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx new Callable() { @Override public CommitResponse call() throws Exception { - return rpc.commit(request, options); + return gapicRpc.commit(request, options); } }); Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); @@ -836,7 +834,7 @@ public ReadContext singleUse() { @Override public ReadContext singleUse(TimestampBound bound) { - return setActive(new SingleReadContext(this, bound, rpc, defaultPrefetchChunks)); + return setActive(new SingleReadContext(this, bound, gapicRpc, defaultPrefetchChunks)); } @Override @@ -846,7 +844,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() { @Override public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - return setActive(new SingleUseReadOnlyTransaction(this, bound, rpc, defaultPrefetchChunks)); + return setActive( + new SingleUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks)); } @Override @@ -856,12 +855,13 @@ public ReadOnlyTransaction readOnlyTransaction() { @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - return setActive(new MultiUseReadOnlyTransaction(this, bound, rpc, defaultPrefetchChunks)); + return setActive( + new MultiUseReadOnlyTransaction(this, bound, gapicRpc, defaultPrefetchChunks)); } @Override public TransactionRunner readWriteTransaction() { - return setActive(new TransactionRunnerImpl(this, rpc, defaultPrefetchChunks)); + return setActive(new TransactionRunnerImpl(this, gapicRpc, defaultPrefetchChunks)); } @Override @@ -878,7 +878,7 @@ public void close() { new Callable() { @Override public Void call() throws Exception { - rpc.deleteSession(name, options); + gapicRpc.deleteSession(name, options); return null; } }); @@ -904,7 +904,7 @@ ByteString beginTransaction() { new Callable() { @Override public Transaction call() throws Exception { - return rpc.beginTransaction(request, options); + return gapicRpc.beginTransaction(request, options); } }); if (txn.getId().isEmpty()) { @@ -919,11 +919,11 @@ public Transaction call() throws Exception { } TransactionContextImpl newTransaction() { - TransactionContextImpl txn = new TransactionContextImpl(this, readyTransactionId, rpc, - defaultPrefetchChunks); + TransactionContextImpl txn = + new TransactionContextImpl(this, readyTransactionId, gapicRpc, defaultPrefetchChunks); return txn; } - + T setActive(@Nullable T ctx) { throwIfTransactionsPending(); @@ -1097,10 +1097,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } SpannerRpc.StreamingCall call = rpc.executeQuery(request.build(), stream.consumer(), session.options); - // We get one message for free. - if (prefetchChunks > 1) { - call.request(prefetchChunks - 1); - } + call.request(prefetchChunks); stream.setCall(call); return stream; } @@ -1207,10 +1204,7 @@ CloseableIterator startStream(@Nullable ByteString resumeToken } SpannerRpc.StreamingCall call = rpc.read(builder.build(), stream.consumer(), session.options); - // We get one message for free. - if (prefetchChunks > 1) { - call.request(prefetchChunks - 1); - } + call.request(prefetchChunks); stream.setCall(call); return stream; } @@ -2452,6 +2446,49 @@ interface CloseableIterator extends Iterator { void close(@Nullable String message); } + private static final class CloseableServerStreamIterator implements CloseableIterator { + + private final ServerStream stream; + private final Iterator iterator; + + public CloseableServerStreamIterator(ServerStream stream) { + this.stream = stream; + this.iterator = stream.iterator(); + } + + @Override + public boolean hasNext() { + try { + return iterator.hasNext(); + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } + } + + @Override + public T next() { + try { + return iterator.next(); + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Not supported: remove."); + } + + @Override + public void close(@Nullable String message) { + try { + stream.cancel(); + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } + } + } + /** Adapts a streaming read/query call into an iterator over partial result sets. */ @VisibleForTesting static class GrpcStreamIterator extends AbstractIterator diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index b5ca59c1b9c1..d0d298705deb 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -16,30 +16,23 @@ package com.google.cloud.spanner; -import com.google.cloud.grpc.GrpcTransportOptions; +import com.google.api.gax.grpc.GrpcInterceptorProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.ServiceDefaults; import com.google.cloud.ServiceOptions; import com.google.cloud.ServiceRpc; import com.google.cloud.TransportOptions; -import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc; -import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.spanner.spi.SpannerRpcFactory; -import com.google.common.base.MoreObjects; +import com.google.cloud.spanner.spi.v1.GapicSpannerRpc; +import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import io.grpc.ClientInterceptor; -import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; -import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import java.net.MalformedURLException; import java.net.URL; -import java.util.List; import java.util.Map; import java.util.Set; -import javax.net.ssl.SSLException; /** Options for the Cloud Spanner service. */ public class SpannerOptions extends ServiceOptions { @@ -52,8 +45,13 @@ public class SpannerOptions extends ServiceOptions { "https://www.googleapis.com/auth/spanner.admin", "https://www.googleapis.com/auth/spanner.data"); private static final int MAX_CHANNELS = 256; - private static final RpcChannelFactory DEFAULT_RPC_CHANNEL_FACTORY = new NettyRpcChannelFactory(); - + private final TransportChannelProvider channelProvider; + private final GrpcInterceptorProvider interceptorProvider; + private final SessionPoolOptions sessionPoolOptions; + private final int prefetchChunks; + private final int numChannels; + private final ImmutableMap sessionLabels; + /** Default implementation of {@code SpannerFactory}. */ private static class DefaultSpannerFactory implements SpannerFactory { private static final DefaultSpannerFactory INSTANCE = new DefaultSpannerFactory(); @@ -70,29 +68,21 @@ private static class DefaultSpannerRpcFactory implements SpannerRpcFactory { @Override public ServiceRpc create(SpannerOptions options) { - return new GrpcSpannerRpc(options); + return new GapicSpannerRpc(options); } } - private final List rpcChannels; - private final SessionPoolOptions sessionPoolOptions; - private final int prefetchChunks; - private final int numChannels; - private final ImmutableMap sessionLabels; - private SpannerOptions(Builder builder) { super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults()); numChannels = builder.numChannels; - String userAgent = getUserAgent(); - RpcChannelFactory defaultRpcChannelFactory = - userAgent == null - ? DEFAULT_RPC_CHANNEL_FACTORY - : new NettyRpcChannelFactory(userAgent); - rpcChannels = - createChannels( - getHost(), - MoreObjects.firstNonNull(builder.rpcChannelFactory, defaultRpcChannelFactory), - numChannels); + Preconditions.checkArgument( + numChannels >= 1 && numChannels <= MAX_CHANNELS, + "Number of channels must fall in the range [1, %s], found: %s", + MAX_CHANNELS, + numChannels); + + channelProvider = builder.channelProvider; + interceptorProvider = builder.interceptorProvider; sessionPoolOptions = builder.sessionPoolOptions != null ? builder.sessionPoolOptions @@ -103,10 +93,11 @@ private SpannerOptions(Builder builder) { /** Builder for {@link SpannerOptions} instances. */ public static class Builder - extends ServiceOptions.Builder< - Spanner, SpannerOptions, SpannerOptions.Builder> { + extends ServiceOptions.Builder { private static final int DEFAULT_PREFETCH_CHUNKS = 4; - private RpcChannelFactory rpcChannelFactory; + private TransportChannelProvider channelProvider; + private GrpcInterceptorProvider interceptorProvider; + /** By default, we create 4 channels per {@link SpannerOptions} */ private int numChannels = 4; @@ -122,6 +113,8 @@ private Builder() {} this.sessionPoolOptions = options.sessionPoolOptions; this.prefetchChunks = options.prefetchChunks; this.sessionLabels = options.sessionLabels; + this.channelProvider = options.channelProvider; + this.interceptorProvider = options.interceptorProvider; } @Override @@ -133,9 +126,21 @@ public Builder setTransportOptions(TransportOptions transportOptions) { return super.setTransportOptions(transportOptions); } - /** Sets the factory for creating gRPC channels. If not set, a default will be used. */ - public Builder setRpcChannelFactory(RpcChannelFactory factory) { - this.rpcChannelFactory = factory; + /** + * Sets the {@code ChannelProvider}. {@link GapicSpannerRpc} would create a default one if none + * is provided. + */ + public Builder setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = channelProvider; + return this; + } + + /** + * Sets the {@code GrpcInterceptorProvider}. {@link GapicSpannerRpc} would create a default one + * if none is provided. + */ + public Builder setInterceptorProvider(GrpcInterceptorProvider interceptorProvider) { + this.interceptorProvider = interceptorProvider; return this; } @@ -196,14 +201,6 @@ public SpannerOptions build() { } } - /** - * Interface for gRPC channel creation. Most users won't need to use this, as the default covers - * typical deployment scenarios. - */ - public interface RpcChannelFactory { - ManagedChannel newChannel(String host, int port); - } - /** Returns default instance of {@code SpannerOptions}. */ public static SpannerOptions getDefaultInstance() { return newBuilder().build(); @@ -213,8 +210,16 @@ public static Builder newBuilder() { return new Builder(); } - public List getRpcChannels() { - return rpcChannels; + public TransportChannelProvider getChannelProvider() { + return channelProvider; + } + + public GrpcInterceptorProvider getInterceptorProvider() { + return interceptorProvider; + } + + public int getNumChannels() { + return numChannels; } public SessionPoolOptions getSessionPoolOptions() { @@ -233,90 +238,12 @@ public static GrpcTransportOptions getDefaultGrpcTransportOptions() { return GrpcTransportOptions.newBuilder().build(); } - /** - * Returns the default RPC channel factory used when none is specified. This may be useful for - * callers that wish to add interceptors to gRPC channels used by the Cloud Spanner client - * library. - */ - public static RpcChannelFactory getDefaultRpcChannelFactory() { - return DEFAULT_RPC_CHANNEL_FACTORY; - } - @Override protected String getDefaultHost() { return DEFAULT_HOST; } - private static List createChannels( - String rootUrl, RpcChannelFactory factory, int numChannels) { - Preconditions.checkArgument( - numChannels >= 1 && numChannels <= MAX_CHANNELS, - "Number of channels must fall in the range [1, %s], found: %s", - MAX_CHANNELS, - numChannels); - ImmutableList.Builder builder = ImmutableList.builder(); - for (int i = 0; i < numChannels; i++) { - builder.add(createChannel(rootUrl, factory)); - } - return builder.build(); - } - - private static ManagedChannel createChannel(String rootUrl, RpcChannelFactory factory) { - URL url; - try { - url = new URL(rootUrl); - } catch (MalformedURLException e) { - throw new IllegalArgumentException("Invalid host: " + rootUrl, e); - } - ManagedChannel channel = - factory.newChannel(url.getHost(), url.getPort() > 0 ? url.getPort() : url.getDefaultPort()); - return channel; - } - - static class NettyRpcChannelFactory implements RpcChannelFactory { - private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; - private static final int MAX_HEADER_LIST_SIZE = 32 * 1024; //bytes - private final String userAgent; - private final List interceptors; - - NettyRpcChannelFactory() { - this(null); - } - - NettyRpcChannelFactory(String userAgent) { - this(userAgent, ImmutableList.of()); - } - - NettyRpcChannelFactory(String userAgent, List interceptors) { - this.userAgent = userAgent; - this.interceptors = interceptors; - } - - @Override - public ManagedChannel newChannel(String host, int port) { - NettyChannelBuilder builder = - NettyChannelBuilder.forAddress(host, port) - .sslContext(newSslContext()) - .intercept(interceptors) - .maxHeaderListSize(MAX_HEADER_LIST_SIZE) - .maxMessageSize(MAX_MESSAGE_SIZE); - if (userAgent != null) { - builder.userAgent(userAgent); - } - return builder.build(); - } - - private static SslContext newSslContext() { - try { - return GrpcSslContexts.forClient().ciphers(null).build(); - } catch (SSLException e) { - throw new RuntimeException("SSL configuration failed: " + e.getMessage(), e); - } - } - } - - private static class SpannerDefaults implements - ServiceDefaults { + private static class SpannerDefaults implements ServiceDefaults { @Override public SpannerFactory getDefaultServiceFactory() { @@ -348,4 +275,15 @@ protected SpannerRpc getSpannerRpcV1() { public Builder toBuilder() { return new Builder(this); } + + public String getEndpoint() { + URL url; + try { + url = new URL(getHost()); + } catch (MalformedURLException e) { + throw new IllegalArgumentException("Invalid host: " + getHost(), e); + } + return String.format( + "%s:%s", url.getHost(), url.getPort() < 0 ? url.getDefaultPort() : url.getPort()); + } } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java new file mode 100644 index 000000000000..4f367772cdcc --- /dev/null +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -0,0 +1,585 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; + +import com.google.api.core.ApiFunction; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.rpc.AlreadyExistsException; +import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.HeaderProvider; +import com.google.api.gax.rpc.OperationCallable; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.StatusCode; +import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.api.gax.rpc.UnaryCallSettings; +import com.google.api.pathtemplate.PathTemplate; +import com.google.cloud.ServiceOptions; +import com.google.cloud.grpc.GrpcTransportOptions; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStub; +import com.google.cloud.spanner.admin.database.v1.stub.DatabaseAdminStubSettings; +import com.google.cloud.spanner.admin.database.v1.stub.GrpcDatabaseAdminStub; +import com.google.cloud.spanner.admin.instance.v1.stub.GrpcInstanceAdminStub; +import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStub; +import com.google.cloud.spanner.admin.instance.v1.stub.InstanceAdminStubSettings; +import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; +import com.google.cloud.spanner.v1.stub.GrpcSpannerStub; +import com.google.cloud.spanner.v1.stub.SpannerStub; +import com.google.cloud.spanner.v1.stub.SpannerStubSettings; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; +import com.google.longrunning.GetOperationRequest; +import com.google.longrunning.Operation; +import com.google.protobuf.Empty; +import com.google.protobuf.FieldMask; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; +import com.google.spanner.admin.database.v1.CreateDatabaseRequest; +import com.google.spanner.admin.database.v1.Database; +import com.google.spanner.admin.database.v1.DropDatabaseRequest; +import com.google.spanner.admin.database.v1.GetDatabaseDdlRequest; +import com.google.spanner.admin.database.v1.GetDatabaseRequest; +import com.google.spanner.admin.database.v1.ListDatabasesRequest; +import com.google.spanner.admin.database.v1.ListDatabasesResponse; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest; +import com.google.spanner.admin.instance.v1.CreateInstanceMetadata; +import com.google.spanner.admin.instance.v1.CreateInstanceRequest; +import com.google.spanner.admin.instance.v1.DeleteInstanceRequest; +import com.google.spanner.admin.instance.v1.GetInstanceConfigRequest; +import com.google.spanner.admin.instance.v1.GetInstanceRequest; +import com.google.spanner.admin.instance.v1.Instance; +import com.google.spanner.admin.instance.v1.InstanceConfig; +import com.google.spanner.admin.instance.v1.ListInstanceConfigsRequest; +import com.google.spanner.admin.instance.v1.ListInstanceConfigsResponse; +import com.google.spanner.admin.instance.v1.ListInstancesRequest; +import com.google.spanner.admin.instance.v1.ListInstancesResponse; +import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata; +import com.google.spanner.admin.instance.v1.UpdateInstanceRequest; +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.CommitRequest; +import com.google.spanner.v1.CommitResponse; +import com.google.spanner.v1.CreateSessionRequest; +import com.google.spanner.v1.DeleteSessionRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.PartialResultSet; +import com.google.spanner.v1.PartitionQueryRequest; +import com.google.spanner.v1.PartitionReadRequest; +import com.google.spanner.v1.PartitionResponse; +import com.google.spanner.v1.ReadRequest; +import com.google.spanner.v1.ResultSet; +import com.google.spanner.v1.RollbackRequest; +import com.google.spanner.v1.Session; +import com.google.spanner.v1.Transaction; +import io.grpc.Context; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import javax.annotation.Nullable; + +/** Implementation of Cloud Spanner remote calls using Gapic libraries. */ +public class GapicSpannerRpc implements SpannerRpc { + + private static final PathTemplate PROJECT_NAME_TEMPLATE = + PathTemplate.create("projects/{project}"); + private static final PathTemplate OPERATION_NAME_TEMPLATE = + PathTemplate.create("{database=projects/*/instances/*/databases/*}/operations/{operation}"); + private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024; + private static final int MAX_METADATA_SIZE = 32 * 1024; // bytes + + private final SpannerStub spannerStub; + private final InstanceAdminStub instanceAdminStub; + private final DatabaseAdminStub databaseAdminStub; + private final String projectId; + private final String projectName; + private final SpannerMetadataProvider metadataProvider; + + public static GapicSpannerRpc create(SpannerOptions options) { + return new GapicSpannerRpc(options); + } + + public GapicSpannerRpc(SpannerOptions options) { + this.projectId = options.getProjectId(); + this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId); + + // create a metadataProvider which combines both internal headers and + // per-method-call extra headers for channelProvider to inject the headers + // for rpc calls + ApiClientHeaderProvider.Builder internalHeaderProviderBuilder = + ApiClientHeaderProvider.newBuilder(); + ApiClientHeaderProvider internalHeaderProvider = + internalHeaderProviderBuilder + .setClientLibToken( + ServiceOptions.getGoogApiClientLibName(), + GaxProperties.getLibraryVersion(options.getClass())) + .setTransportToken( + GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion()) + .build(); + + HeaderProvider mergedHeaderProvider = options.getMergedHeaderProvider(internalHeaderProvider); + this.metadataProvider = + SpannerMetadataProvider.create( + mergedHeaderProvider.getHeaders(), + internalHeaderProviderBuilder.getResourceHeaderKey()); + + // First check if SpannerOptions provides a TransportChannerProvider. Create one + // with information gathered from SpannerOptions if none is provided + TransportChannelProvider channelProvider = + MoreObjects.firstNonNull( + options.getChannelProvider(), + InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(options.getEndpoint()) + .setMaxInboundMessageSize(MAX_MESSAGE_SIZE) + .setMaxInboundMetadataSize(MAX_METADATA_SIZE) + .setPoolSize(options.getNumChannels()) + + // Then check if SpannerOptions provides an InterceptorProvider. Create a default + // SpannerInterceptorProvider if none is provided + .setInterceptorProvider( + MoreObjects.firstNonNull( + options.getInterceptorProvider(), + SpannerInterceptorProvider.createDefault())) + .setHeaderProvider(mergedHeaderProvider) + .setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build()) + .build()); + + CredentialsProvider credentialsProvider = + GrpcTransportOptions.setUpCredentialsProvider(options); + + // Disabling retry for now because spanner handles retry in SpannerImpl. + // We will finally want to improve gax but for smooth transitioning we + // preserve the retry in SpannerImpl + try { + // TODO: bump the version of gax and remove this try-catch block + // applyToAllUnaryMethods does not throw exception in the latest version + this.spannerStub = + GrpcSpannerStub.create( + SpannerStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); + + this.instanceAdminStub = + GrpcInstanceAdminStub.create( + InstanceAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); + this.databaseAdminStub = + GrpcDatabaseAdminStub.create( + DatabaseAdminStubSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(credentialsProvider) + .applyToAllUnaryMethods( + new ApiFunction, Void>() { + @Override + public Void apply(UnaryCallSettings.Builder builder) { + builder.setRetryableCodes(ImmutableSet.of()); + return null; + } + }) + .build()); + } catch (Exception e) { + throw newSpannerException(e); + } + } + + @Override + public Paginated listInstanceConfigs(int pageSize, @Nullable String pageToken) + throws SpannerException { + ListInstanceConfigsRequest.Builder requestBuilder = + ListInstanceConfigsRequest.newBuilder().setParent(projectName).setPageSize(pageSize); + if (pageToken != null) { + requestBuilder.setPageToken(pageToken); + } + ListInstanceConfigsRequest request = requestBuilder.build(); + + GrpcCallContext context = newCallContext(null, projectName); + ListInstanceConfigsResponse response = + get(instanceAdminStub.listInstanceConfigsCallable().futureCall(request, context)); + return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken()); + } + + @Override + public InstanceConfig getInstanceConfig(String instanceConfigName) throws SpannerException { + GetInstanceConfigRequest request = + GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build(); + + GrpcCallContext context = newCallContext(null, projectName); + return get(instanceAdminStub.getInstanceConfigCallable().futureCall(request, context)); + } + + @Override + public Paginated listInstances( + int pageSize, @Nullable String pageToken, @Nullable String filter) throws SpannerException { + ListInstancesRequest.Builder requestBuilder = + ListInstancesRequest.newBuilder().setParent(projectName).setPageSize(pageSize); + if (pageToken != null) { + requestBuilder.setPageToken(pageToken); + } + if (filter != null) { + requestBuilder.setFilter(filter); + } + ListInstancesRequest request = requestBuilder.build(); + + GrpcCallContext context = newCallContext(null, projectName); + ListInstancesResponse response = + get(instanceAdminStub.listInstancesCallable().futureCall(request, context)); + return new Paginated<>(response.getInstancesList(), response.getNextPageToken()); + } + + @Override + public OperationFuture createInstance( + String parent, String instanceId, Instance instance) throws SpannerException { + CreateInstanceRequest request = + CreateInstanceRequest.newBuilder() + .setParent(parent) + .setInstanceId(instanceId) + .setInstance(instance) + .build(); + GrpcCallContext context = newCallContext(null, parent); + return instanceAdminStub.createInstanceOperationCallable().futureCall(request, context); + } + + @Override + public OperationFuture updateInstance( + Instance instance, FieldMask fieldMask) throws SpannerException { + UpdateInstanceRequest request = + UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build(); + GrpcCallContext context = newCallContext(null, instance.getName()); + return instanceAdminStub.updateInstanceOperationCallable().futureCall(request, context); + } + + @Override + public Instance getInstance(String instanceName) throws SpannerException { + GetInstanceRequest request = GetInstanceRequest.newBuilder().setName(instanceName).build(); + + GrpcCallContext context = newCallContext(null, instanceName); + return get(instanceAdminStub.getInstanceCallable().futureCall(request, context)); + } + + @Override + public void deleteInstance(String instanceName) throws SpannerException { + DeleteInstanceRequest request = + DeleteInstanceRequest.newBuilder().setName(instanceName).build(); + + GrpcCallContext context = newCallContext(null, instanceName); + get(instanceAdminStub.deleteInstanceCallable().futureCall(request, context)); + } + + @Override + public Paginated listDatabases( + String instanceName, int pageSize, @Nullable String pageToken) throws SpannerException { + ListDatabasesRequest.Builder requestBuilder = + ListDatabasesRequest.newBuilder().setParent(instanceName).setPageSize(pageSize); + if (pageToken != null) { + requestBuilder.setPageToken(pageToken); + } + ListDatabasesRequest request = requestBuilder.build(); + + GrpcCallContext context = newCallContext(null, instanceName); + ListDatabasesResponse response = + get(databaseAdminStub.listDatabasesCallable().futureCall(request, context)); + return new Paginated<>(response.getDatabasesList(), response.getNextPageToken()); + } + + @Override + public OperationFuture createDatabase( + String instanceName, String createDatabaseStatement, Iterable additionalStatements) + throws SpannerException { + CreateDatabaseRequest request = + CreateDatabaseRequest.newBuilder() + .setParent(instanceName) + .setCreateStatement(createDatabaseStatement) + .addAllExtraStatements(additionalStatements) + .build(); + GrpcCallContext context = newCallContext(null, instanceName); + return databaseAdminStub.createDatabaseOperationCallable().futureCall(request, context); + } + + @Override + public OperationFuture updateDatabaseDdl( + String databaseName, Iterable updateDatabaseStatements, @Nullable String updateId) + throws SpannerException { + UpdateDatabaseDdlRequest request = + UpdateDatabaseDdlRequest.newBuilder() + .setDatabase(databaseName) + .addAllStatements(updateDatabaseStatements) + .setOperationId(MoreObjects.firstNonNull(updateId, "")) + .build(); + GrpcCallContext context = newCallContext(null, databaseName); + OperationCallable callable = + databaseAdminStub.updateDatabaseDdlOperationCallable(); + OperationFuture operationFuture = + callable.futureCall(request, context); + try { + operationFuture.getInitialFuture().get(); + } catch (InterruptedException e) { + throw newSpannerException(e); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof AlreadyExistsException) { + String operationName = + OPERATION_NAME_TEMPLATE.instantiate("database", databaseName, "operation", updateId); + return callable.resumeFutureCall(operationName, context); + } + } + return operationFuture; + } + + @Override + public void dropDatabase(String databaseName) throws SpannerException { + DropDatabaseRequest request = + DropDatabaseRequest.newBuilder().setDatabase(databaseName).build(); + + GrpcCallContext context = newCallContext(null, databaseName); + get(databaseAdminStub.dropDatabaseCallable().futureCall(request, context)); + } + + @Override + public Database getDatabase(String databaseName) throws SpannerException { + GetDatabaseRequest request = GetDatabaseRequest.newBuilder().setName(databaseName).build(); + + GrpcCallContext context = newCallContext(null, databaseName); + return get(databaseAdminStub.getDatabaseCallable().futureCall(request, context)); + } + + @Override + public List getDatabaseDdl(String databaseName) throws SpannerException { + GetDatabaseDdlRequest request = + GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build(); + + GrpcCallContext context = newCallContext(null, databaseName); + return get(databaseAdminStub.getDatabaseDdlCallable().futureCall(request, context)) + .getStatementsList(); + } + + @Override + public Operation getOperation(String name) throws SpannerException { + GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build(); + GrpcCallContext context = newCallContext(null, name); + return get( + databaseAdminStub.getOperationsStub().getOperationCallable().futureCall(request, context)); + } + + @Override + public Session createSession( + String databaseName, @Nullable Map labels, @Nullable Map options) + throws SpannerException { + CreateSessionRequest.Builder requestBuilder = + CreateSessionRequest.newBuilder().setDatabase(databaseName); + if (labels != null && !labels.isEmpty()) { + Session.Builder session = Session.newBuilder().putAllLabels(labels); + requestBuilder.setSession(session); + } + CreateSessionRequest request = requestBuilder.build(); + GrpcCallContext context = newCallContext(options, databaseName); + return get(spannerStub.createSessionCallable().futureCall(request, context)); + } + + @Override + public void deleteSession(String sessionName, @Nullable Map options) + throws SpannerException { + DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); + GrpcCallContext context = newCallContext(options, sessionName); + get(spannerStub.deleteSessionCallable().futureCall(request, context)); + } + + @Override + public StreamingCall read( + ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { + GrpcCallContext context = newCallContext(options, request.getSession()); + SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); + spannerStub.streamingReadCallable().call(request, responseObserver, context); + final StreamController controller = responseObserver.getController(); + return new StreamingCall() { + @Override + public void request(int numMessage) { + controller.request(numMessage); + } + + // TODO(hzyi): streamController currently does not support cancel with message. Add + // this in gax and update this method later + @Override + public void cancel(String message) { + controller.cancel(); + } + }; + } + + @Override + public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map options) { + GrpcCallContext context = newCallContext(options, request.getSession()); + return get(spannerStub.executeSqlCallable().futureCall(request, context)); + } + + @Override + public StreamingCall executeQuery( + ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { + GrpcCallContext context = newCallContext(options, request.getSession()); + SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer); + spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context); + final StreamController controller = responseObserver.getController(); + return new StreamingCall() { + @Override + public void request(int numMessage) { + controller.request(numMessage); + } + + // TODO(hzyi): streamController currently does not support cancel with message. Add + // this in gax and update this method later + @Override + public void cancel(String message) { + controller.cancel(); + } + }; + } + + @Override + public Transaction beginTransaction( + BeginTransactionRequest request, @Nullable Map options) throws SpannerException { + GrpcCallContext context = newCallContext(options, request.getSession()); + return get(spannerStub.beginTransactionCallable().futureCall(request, context)); + } + + @Override + public CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) + throws SpannerException { + GrpcCallContext context = newCallContext(options, commitRequest.getSession()); + return get(spannerStub.commitCallable().futureCall(commitRequest, context)); + } + + @Override + public void rollback(RollbackRequest request, @Nullable Map options) + throws SpannerException { + GrpcCallContext context = newCallContext(options, request.getSession()); + get(spannerStub.rollbackCallable().futureCall(request, context)); + } + + @Override + public PartitionResponse partitionQuery( + PartitionQueryRequest request, @Nullable Map options) throws SpannerException { + GrpcCallContext context = newCallContext(options, request.getSession()); + return get(spannerStub.partitionQueryCallable().futureCall(request, context)); + } + + @Override + public PartitionResponse partitionRead( + PartitionReadRequest request, @Nullable Map options) throws SpannerException { + GrpcCallContext context = newCallContext(options, request.getSession()); + return get(spannerStub.partitionReadCallable().futureCall(request, context)); + } + + /** Gets the result of an async RPC call, handling any exceptions encountered. */ + private static T get(final Future future) throws SpannerException { + final Context context = Context.current(); + try { + return future.get(); + } catch (InterruptedException e) { + // We are the sole consumer of the future, so cancel it. + future.cancel(true); + throw SpannerExceptionFactory.propagateInterrupt(e); + } catch (ExecutionException | CancellationException e) { + throw newSpannerException(context, e); + } + } + + private GrpcCallContext newCallContext(@Nullable Map options, String resource) { + GrpcCallContext context = GrpcCallContext.createDefault(); + if (options != null) { + context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + } + context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName)); + return context; + } + + public void shutdown() { + this.spannerStub.close(); + this.instanceAdminStub.close(); + this.databaseAdminStub.close(); + } + + /** + * A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to + * the {@link ResultStreamConsumer}. + */ + private static class SpannerResponseObserver implements ResponseObserver { + private StreamController controller; + private final ResultStreamConsumer consumer; + + public SpannerResponseObserver(ResultStreamConsumer consumer) { + this.consumer = consumer; + } + + @Override + public void onStart(StreamController controller) { + + // Disable the auto flow control to allow client library + // set the number of messages it prefers to request + controller.disableAutoInboundFlowControl(); + this.controller = controller; + } + + @Override + public void onResponse(PartialResultSet response) { + consumer.onPartialResultSet(response); + } + + @Override + public void onError(Throwable t) { + consumer.onError(newSpannerException(t)); + } + + @Override + public void onComplete() { + consumer.onCompleted(); + } + + StreamController getController() { + return Preconditions.checkNotNull(this.controller); + } + } +} diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java deleted file mode 100644 index 0cbdf614d45e..000000000000 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java +++ /dev/null @@ -1,684 +0,0 @@ -/* - * Copyright 2017 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner.spi.v1; - -import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; - -import com.google.api.gax.core.GaxProperties; -import com.google.api.gax.grpc.GaxGrpcProperties; -import com.google.api.gax.rpc.ApiClientHeaderProvider; -import com.google.api.gax.rpc.HeaderProvider; -import com.google.api.pathtemplate.PathTemplate; -import com.google.cloud.NoCredentials; -import com.google.cloud.ServiceOptions; -import com.google.cloud.spanner.SpannerException; -import com.google.cloud.spanner.SpannerExceptionFactory; -import com.google.cloud.spanner.SpannerOptions; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; -import com.google.longrunning.GetOperationRequest; -import com.google.longrunning.Operation; -import com.google.longrunning.OperationsGrpc; -import com.google.protobuf.FieldMask; -import com.google.spanner.admin.database.v1.CreateDatabaseRequest; -import com.google.spanner.admin.database.v1.Database; -import com.google.spanner.admin.database.v1.DatabaseAdminGrpc; -import com.google.spanner.admin.database.v1.DropDatabaseRequest; -import com.google.spanner.admin.database.v1.GetDatabaseDdlRequest; -import com.google.spanner.admin.database.v1.GetDatabaseRequest; -import com.google.spanner.admin.database.v1.ListDatabasesRequest; -import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest; -import com.google.spanner.admin.instance.v1.CreateInstanceRequest; -import com.google.spanner.admin.instance.v1.DeleteInstanceRequest; -import com.google.spanner.admin.instance.v1.GetInstanceConfigRequest; -import com.google.spanner.admin.instance.v1.GetInstanceRequest; -import com.google.spanner.admin.instance.v1.Instance; -import com.google.spanner.admin.instance.v1.InstanceAdminGrpc; -import com.google.spanner.admin.instance.v1.InstanceConfig; -import com.google.spanner.admin.instance.v1.ListInstanceConfigsRequest; -import com.google.spanner.admin.instance.v1.ListInstanceConfigsResponse; -import com.google.spanner.admin.instance.v1.ListInstancesRequest; -import com.google.spanner.admin.instance.v1.ListInstancesResponse; -import com.google.spanner.admin.instance.v1.UpdateInstanceRequest; -import com.google.spanner.v1.BeginTransactionRequest; -import com.google.spanner.v1.CommitRequest; -import com.google.spanner.v1.CommitResponse; -import com.google.spanner.v1.CreateSessionRequest; -import com.google.spanner.v1.DeleteSessionRequest; -import com.google.spanner.v1.ExecuteSqlRequest; -import com.google.spanner.v1.PartialResultSet; -import com.google.spanner.v1.PartitionQueryRequest; -import com.google.spanner.v1.PartitionReadRequest; -import com.google.spanner.v1.PartitionResponse; -import com.google.spanner.v1.ReadRequest; -import com.google.spanner.v1.ResultSet; -import com.google.spanner.v1.RollbackRequest; -import com.google.spanner.v1.Session; -import com.google.spanner.v1.SpannerGrpc; -import com.google.spanner.v1.Transaction; -import io.grpc.CallCredentials; -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors; -import io.grpc.Context; -import io.grpc.ForwardingClientCall; -import io.grpc.ForwardingClientCallListener; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; -import io.grpc.ServiceDescriptor; -import io.grpc.Status; -import io.grpc.auth.MoreCallCredentials; -import io.grpc.stub.AbstractStub; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientCalls; -import io.grpc.stub.ClientResponseObserver; -import io.opencensus.trace.export.SampledSpanStore; -import io.opencensus.trace.Tracing; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; - -/** Implementation of Cloud Spanner remote calls using gRPC. */ -public class GrpcSpannerRpc implements SpannerRpc { - - static { - setupTracingConfig(); - } - - private static final Logger logger = Logger.getLogger(GrpcSpannerRpc.class.getName()); - - private static final PathTemplate PROJECT_NAME_TEMPLATE = - PathTemplate.create("projects/{project}"); - - private final Random random = new Random(); - private final List channels; - private final String projectId; - private final String projectName; - private final CallCredentials credentials; - private final SpannerMetadataProvider metadataProvider; - - public GrpcSpannerRpc(SpannerOptions options) { - this.projectId = options.getProjectId(); - this.projectName = PROJECT_NAME_TEMPLATE.instantiate("project", this.projectId); - this.credentials = callCredentials(options); - ImmutableList.Builder channelsBuilder = ImmutableList.builder(); - ImmutableList.Builder stubsBuilder = ImmutableList.builder(); - for (Channel channel : options.getRpcChannels()) { - channel = - ClientInterceptors.intercept( - channel, - new LoggingInterceptor(Level.FINER), - WatchdogInterceptor.newDefaultWatchdogInterceptor(), - new SpannerErrorInterceptor()); - channelsBuilder.add(channel); - stubsBuilder.add(withCredentials(SpannerGrpc.newFutureStub(channel), credentials)); - } - this.channels = channelsBuilder.build(); - - ApiClientHeaderProvider.Builder internalHeaderProviderBuilder = - ApiClientHeaderProvider.newBuilder(); - ApiClientHeaderProvider internalHeaderProvider = - internalHeaderProviderBuilder - .setClientLibToken( - ServiceOptions.getGoogApiClientLibName(), - GaxProperties.getLibraryVersion(options.getClass())) - .setTransportToken( - GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion()) - .build(); - - HeaderProvider mergedHeaderProvider = options.getMergedHeaderProvider(internalHeaderProvider); - this.metadataProvider = - SpannerMetadataProvider.create( - mergedHeaderProvider.getHeaders(), - internalHeaderProviderBuilder.getResourceHeaderKey()); - } - - private static CallCredentials callCredentials(SpannerOptions options) { - if (options.getCredentials() == null) { - return null; - } - if (options.getCredentials().equals(NoCredentials.getInstance())) { - return null; - } - return MoreCallCredentials.from(options.getScopedCredentials()); - } - - private > S withCredentials(S stub, CallCredentials credentials) { - if (credentials == null) { - return stub; - } - return stub.withCallCredentials(credentials); - } - - private String projectName() { - return projectName; - } - - @Override - public Paginated listInstanceConfigs(int pageSize, @Nullable String pageToken) - throws SpannerException { - ListInstanceConfigsRequest.Builder request = - ListInstanceConfigsRequest.newBuilder().setParent(projectName()).setPageSize(0); - if (pageToken != null) { - request.setPageToken(pageToken); - } - ListInstanceConfigsResponse response = - get( - doUnaryCall( - InstanceAdminGrpc.getListInstanceConfigsMethod(), - request.build(), - projectName(), - null)); - return new Paginated<>(response.getInstanceConfigsList(), response.getNextPageToken()); - } - - @Override - public InstanceConfig getInstanceConfig(String instanceConfigName) throws SpannerException { - GetInstanceConfigRequest request = - GetInstanceConfigRequest.newBuilder().setName(instanceConfigName).build(); - return get( - doUnaryCall(InstanceAdminGrpc.getGetInstanceConfigMethod(), request, projectName(), null)); - } - - @Override - public Paginated listInstances( - int pageSize, @Nullable String pageToken, @Nullable String filter) throws SpannerException { - ListInstancesRequest.Builder request = - ListInstancesRequest.newBuilder().setParent(projectName()).setPageSize(pageSize); - if (pageToken != null) { - request.setPageToken(pageToken); - } - if (filter != null) { - request.setFilter(filter); - } - ListInstancesResponse response = - get( - doUnaryCall( - InstanceAdminGrpc.getListInstancesMethod(), request.build(), projectName(), null)); - return new Paginated<>(response.getInstancesList(), response.getNextPageToken()); - } - - @Override - public Operation createInstance(String parent, String instanceId, Instance instance) - throws SpannerException { - CreateInstanceRequest request = - CreateInstanceRequest.newBuilder() - .setParent(parent) - .setInstanceId(instanceId) - .setInstance(instance) - .build(); - return get(doUnaryCall(InstanceAdminGrpc.getCreateInstanceMethod(), request, parent, null)); - } - - @Override - public Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException { - UpdateInstanceRequest request = - UpdateInstanceRequest.newBuilder().setInstance(instance).setFieldMask(fieldMask).build(); - return get( - doUnaryCall(InstanceAdminGrpc.getUpdateInstanceMethod(), request, instance.getName(), null)); - } - - @Override - public Instance getInstance(String instanceName) throws SpannerException { - return get( - doUnaryCall( - InstanceAdminGrpc.getGetInstanceMethod(), - GetInstanceRequest.newBuilder().setName(instanceName).build(), - instanceName, - null)); - } - - @Override - public void deleteInstance(String instanceName) throws SpannerException { - get( - doUnaryCall( - InstanceAdminGrpc.getDeleteInstanceMethod(), - DeleteInstanceRequest.newBuilder().setName(instanceName).build(), - instanceName, - null)); - } - - @Override - public Paginated listDatabases( - String instanceName, int pageSize, @Nullable String pageToken) throws SpannerException { - ListDatabasesRequest.Builder builder = - ListDatabasesRequest.newBuilder().setParent(instanceName).setPageSize(pageSize); - if (pageToken != null) { - builder.setPageToken(pageToken); - } - com.google.spanner.admin.database.v1.ListDatabasesResponse response = - get( - doUnaryCall( - DatabaseAdminGrpc.getListDatabasesMethod(), builder.build(), instanceName, null)); - return new Paginated<>(response.getDatabasesList(), response.getNextPageToken()); - } - - @Override - public Operation createDatabase( - String instanceName, String createDatabaseStatement, Iterable additionalStatements) - throws SpannerException { - CreateDatabaseRequest request = - CreateDatabaseRequest.newBuilder() - .setParent(instanceName) - .setCreateStatement(createDatabaseStatement) - .addAllExtraStatements(additionalStatements) - .build(); - return get(doUnaryCall(DatabaseAdminGrpc.getCreateDatabaseMethod(), request, instanceName, null)); - } - - @Override - public Operation updateDatabaseDdl( - String databaseName, Iterable updateStatements, @Nullable String operationId) - throws SpannerException { - UpdateDatabaseDdlRequest request = - UpdateDatabaseDdlRequest.newBuilder() - .setDatabase(databaseName) - .addAllStatements(updateStatements) - .setOperationId(MoreObjects.firstNonNull(operationId, "")) - .build(); - return get( - doUnaryCall(DatabaseAdminGrpc.getUpdateDatabaseDdlMethod(), request, databaseName, null)); - } - - @Override - public void dropDatabase(String databaseName) throws SpannerException { - get( - doUnaryCall( - DatabaseAdminGrpc.getDropDatabaseMethod(), - DropDatabaseRequest.newBuilder().setDatabase(databaseName).build(), - databaseName, - null)); - } - - @Override - public List getDatabaseDdl(String databaseName) throws SpannerException { - GetDatabaseDdlRequest request = - GetDatabaseDdlRequest.newBuilder().setDatabase(databaseName).build(); - return get(doUnaryCall(DatabaseAdminGrpc.getGetDatabaseDdlMethod(), request, databaseName, null)) - .getStatementsList(); - } - - @Override - public Database getDatabase(String databaseName) throws SpannerException { - return get( - doUnaryCall( - DatabaseAdminGrpc.getGetDatabaseMethod(), - GetDatabaseRequest.newBuilder().setName(databaseName).build(), - databaseName, - null)); - } - - @Override - public Operation getOperation(String name) throws SpannerException { - GetOperationRequest request = GetOperationRequest.newBuilder().setName(name).build(); - return get(doUnaryCall(OperationsGrpc.getGetOperationMethod(), request, name, null)); - } - - @Override - public Session createSession( - String databaseName, @Nullable Map labels, @Nullable Map options) { - CreateSessionRequest.Builder request = - CreateSessionRequest.newBuilder().setDatabase(databaseName); - if (labels != null && !labels.isEmpty()) { - Session.Builder session = Session.newBuilder().putAllLabels(labels); - request.setSession(session); - } - return get( - doUnaryCall( - SpannerGrpc.getCreateSessionMethod(), - request.build(), - databaseName, - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public void deleteSession(String sessionName, @Nullable Map options) { - DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build(); - get( - doUnaryCall( - SpannerGrpc.getDeleteSessionMethod(), - request, - sessionName, - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public StreamingCall read( - ReadRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - return doStreamingCall( - SpannerGrpc.getStreamingReadMethod(), - request, - consumer, - request.getSession(), - Option.CHANNEL_HINT.getLong(options)); - } - - @Override - public ResultSet executeQuery( - ExecuteSqlRequest request, @Nullable Map options) { - return get( - doUnaryCall( - SpannerGrpc.METHOD_EXECUTE_SQL, - request, - request.getSession(), - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public StreamingCall executeQuery( - ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map options) { - return doStreamingCall( - SpannerGrpc.getExecuteStreamingSqlMethod(), - request, - consumer, - request.getSession(), - Option.CHANNEL_HINT.getLong(options)); - } - - @Override - public Transaction beginTransaction( - BeginTransactionRequest request, @Nullable Map options) { - return get( - doUnaryCall( - SpannerGrpc.getBeginTransactionMethod(), - request, - request.getSession(), - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public CommitResponse commit(CommitRequest commitRequest, @Nullable Map options) { - return get( - doUnaryCall( - SpannerGrpc.getCommitMethod(), - commitRequest, - commitRequest.getSession(), - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public void rollback(RollbackRequest request, @Nullable Map options) { - get( - doUnaryCall( - SpannerGrpc.getRollbackMethod(), - request, - request.getSession(), - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public PartitionResponse partitionQuery( - PartitionQueryRequest request, @Nullable Map options) - throws SpannerException { - return get( - doUnaryCall( - SpannerGrpc.getPartitionQueryMethod(), - request, - request.getSession(), - Option.CHANNEL_HINT.getLong(options))); - } - - @Override - public PartitionResponse partitionRead( - PartitionReadRequest request, @Nullable Map options) - throws SpannerException { - return get( - doUnaryCall( - SpannerGrpc.getPartitionReadMethod(), - request, - request.getSession(), - Option.CHANNEL_HINT.getLong(options))); - } - - /** Gets the result of an async RPC call, handling any exceptions encountered. */ - private static T get(final Future future) throws SpannerException { - final Context context = Context.current(); - try { - return future.get(); - } catch (InterruptedException e) { - // We are the sole consumer of the future, so cancel it. - future.cancel(true); - throw SpannerExceptionFactory.propagateInterrupt(e); - } catch (ExecutionException | CancellationException e) { - throw newSpannerException(context, e); - } - } - - private Future doUnaryCall( - MethodDescriptor method, - ReqT request, - @Nullable String resource, - @Nullable Long channelHint) { - CallOptions callOptions = - credentials == null - ? CallOptions.DEFAULT - : CallOptions.DEFAULT.withCallCredentials(credentials); - final ClientCall call = - new MetadataClientCall<>( - pick(channelHint, channels).newCall(method, callOptions), - metadataProvider.newMetadata(resource, projectName())); - return ClientCalls.futureUnaryCall(call, request); - } - - private StreamingCall doStreamingCall( - MethodDescriptor method, - T request, - ResultStreamConsumer consumer, - @Nullable String resource, - @Nullable Long channelHint) { - final Context context = Context.current(); - // TODO: Add deadline based on context. - CallOptions callOptions = - credentials == null - ? CallOptions.DEFAULT - : CallOptions.DEFAULT.withCallCredentials(credentials); - final ClientCall call = - new MetadataClientCall<>( - pick(channelHint, channels).newCall(method, callOptions), - metadataProvider.newMetadata(resource, projectName())); - ResultSetStreamObserver observer = new ResultSetStreamObserver(consumer, context, call); - ClientCalls.asyncServerStreamingCall(call, request, observer); - return observer; - } - - @VisibleForTesting - static class MetadataClientCall - extends ForwardingClientCall.SimpleForwardingClientCall { - private final Metadata extraMetadata; - - MetadataClientCall(ClientCall call, Metadata extraMetadata) { - super(call); - this.extraMetadata = extraMetadata; - } - - @Override - public void start(Listener responseListener, Metadata metadata) { - metadata.merge(extraMetadata); - super.start(responseListener, metadata); - } - } - - private T pick(@Nullable Long hint, List elements) { - long hintVal = Math.abs(hint != null ? hint : random.nextLong()); - long index = hintVal % elements.size(); - return elements.get((int) index); - } - - /** - * This is a one time setup for grpcz pages. This adds all of the methods to the Tracing - * environment required to show a consistent set of methods relating to Cloud Bigtable on the - * grpcz page. If HBase artifacts are present, this will add tracing metadata for HBase methods. - * - * TODO: Remove this when we depend on gRPC 1.8 - */ - private static void setupTracingConfig() { - SampledSpanStore store = Tracing.getExportComponent().getSampledSpanStore(); - if (store == null) { - // Tracing implementation is not linked. - return; - } - List descriptors = new ArrayList<>(); - addDescriptor(descriptors, SpannerGrpc.getServiceDescriptor()); - addDescriptor(descriptors, DatabaseAdminGrpc.getServiceDescriptor()); - addDescriptor(descriptors, InstanceAdminGrpc.getServiceDescriptor()); - store.registerSpanNamesForCollection(descriptors); - } - - /** - * Reads a list of {@link MethodDescriptor}s from a {@link ServiceDescriptor} and creates a list - * of Open Census tags. - */ - private static void addDescriptor(List descriptors, ServiceDescriptor serviceDescriptor) { - for (MethodDescriptor method : serviceDescriptor.getMethods()) { - // This is added by a grpc ClientInterceptor - descriptors.add("Sent." + method.getFullMethodName().replace('/', '.')); - } - } - - private static class ResultSetStreamObserver - implements ClientResponseObserver, StreamingCall { - private final ResultStreamConsumer consumer; - private final Context context; - private final ClientCall call; - private volatile ClientCallStreamObserver requestStream; - - public ResultSetStreamObserver( - ResultStreamConsumer consumer, Context context, ClientCall call) { - this.consumer = consumer; - this.context = context; - this.call = call; - } - - @Override - public void beforeStart(final ClientCallStreamObserver requestStream) { - this.requestStream = requestStream; - requestStream.disableAutoInboundFlowControl(); - } - - @Override - public void onNext(PartialResultSet value) { - consumer.onPartialResultSet(value); - } - - @Override - public void onError(Throwable t) { - consumer.onError(newSpannerException(context, t)); - } - - @Override - public void onCompleted() { - consumer.onCompleted(); - } - - @Override - public void request(int numMessages) { - requestStream.request(numMessages); - } - - @Override - public void cancel(@Nullable String message) { - call.cancel(message, null); - } - } - - private static class LoggingInterceptor implements ClientInterceptor { - private final Level level; - - LoggingInterceptor(Level level) { - this.level = level; - } - - private class CallLogger { - private final MethodDescriptor method; - - CallLogger(MethodDescriptor method) { - this.method = method; - } - - void log(String message) { - logger.log( - level, - "{0}[{1}]: {2}", - new Object[] { - method.getFullMethodName(), - Integer.toHexString(System.identityHashCode(this)), - message - }); - } - - void logfmt(String message, Object... params) { - log(String.format(message, params)); - } - } - - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - if (!logger.isLoggable(level)) { - return next.newCall(method, callOptions); - } - - final CallLogger callLogger = new CallLogger(method); - callLogger.log("Start"); - return new ForwardingClientCall.SimpleForwardingClientCall( - next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - super.start( - new ForwardingClientCallListener.SimpleForwardingClientCallListener( - responseListener) { - @Override - public void onMessage(RespT message) { - callLogger.logfmt("Received:\n%s", message); - super.onMessage(message); - } - - @Override - public void onClose(Status status, Metadata trailers) { - callLogger.logfmt("Closed with status %s and trailers %s", status, trailers); - super.onClose(status, trailers); - } - }, - headers); - } - - @Override - public void sendMessage(ReqT message) { - callLogger.logfmt("Send:\n%s", message); - super.sendMessage(message); - } - - @Override - public void cancel(@Nullable String message, @Nullable Throwable cause) { - callLogger.logfmt("Cancelled with message %s", message); - super.cancel(message, cause); - } - }; - } - } -} diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java new file mode 100644 index 000000000000..44571b1a6523 --- /dev/null +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/LoggingInterceptor.java @@ -0,0 +1,108 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** Adds logging to rpc calls */ +class LoggingInterceptor implements ClientInterceptor { + + private final Logger logger; + private final Level level; + + LoggingInterceptor(Logger logger, Level level) { + this.logger = logger; + this.level = level; + } + + private class CallLogger { + + private final MethodDescriptor method; + + CallLogger(MethodDescriptor method) { + this.method = method; + } + + void log(String message) { + logger.log( + level, + "{0}[{1}]: {2}", + new Object[] { + method.getFullMethodName(), Integer.toHexString(System.identityHashCode(this)), message + }); + } + + void logfmt(String message, Object... params) { + log(String.format(message, params)); + } + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + if (!logger.isLoggable(level)) { + return next.newCall(method, callOptions); + } + + final CallLogger callLogger = new CallLogger(method); + callLogger.log("Start"); + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + super.start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + @Override + public void onMessage(RespT message) { + callLogger.logfmt("Received:\n%s", message); + super.onMessage(message); + } + + @Override + public void onClose(Status status, Metadata trailers) { + callLogger.logfmt("Closed with status %s and trailers %s", status, trailers); + super.onClose(status, trailers); + } + }, + headers); + } + + @Override + public void sendMessage(ReqT message) { + callLogger.logfmt("Send:\n%s", message); + super.sendMessage(message); + } + + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + callLogger.logfmt("Cancelled with message %s", message); + super.cancel(message, cause); + } + }; + } +} diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java new file mode 100644 index 000000000000..c51966837d00 --- /dev/null +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerInterceptorProvider.java @@ -0,0 +1,62 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcInterceptorProvider; +import com.google.common.collect.ImmutableList; +import io.grpc.ClientInterceptor; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * For internal use only. An interceptor provider that provides a list of grpc interceptors for + * {@code GapicSpannerRpc} to handle logging and error augmentation by intercepting grpc calls. + */ +@InternalApi("Exposed for testing") +public class SpannerInterceptorProvider implements GrpcInterceptorProvider { + + private static final List defaultInterceptors = + ImmutableList.of( + new SpannerErrorInterceptor(), + new LoggingInterceptor(Logger.getLogger(GapicSpannerRpc.class.getName()), Level.FINER), + WatchdogInterceptor.newDefaultWatchdogInterceptor()); + + private final List clientInterceptors; + + private SpannerInterceptorProvider(List clientInterceptors) { + this.clientInterceptors = clientInterceptors; + } + + public static SpannerInterceptorProvider createDefault() { + return new SpannerInterceptorProvider(defaultInterceptors); + } + + public SpannerInterceptorProvider with(ClientInterceptor clientInterceptor) { + List interceptors = + ImmutableList.builder() + .addAll(this.clientInterceptors) + .add(clientInterceptor) + .build(); + return new SpannerInterceptorProvider(interceptors); + } + + @Override + public List getInterceptors() { + return clientInterceptors; + } +} diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java index f80eb90b12c1..7bbe6a31a1e7 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java @@ -18,16 +18,16 @@ import com.google.common.collect.ImmutableMap; import io.grpc.Metadata; import io.grpc.Metadata.Key; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -/** - * For internal use only. - */ +/** For internal use only. */ class SpannerMetadataProvider { private final Map, String> headers; - private final Key resourceHeaderKey; + private final String resourceHeaderKey; private static final Pattern[] RESOURCE_TOKEN_PATTERNS = { Pattern.compile("^(?projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"), @@ -35,7 +35,7 @@ class SpannerMetadataProvider { }; private SpannerMetadataProvider(Map headers, String resourceHeaderKey) { - this.resourceHeaderKey = Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER); + this.resourceHeaderKey = resourceHeaderKey; this.headers = constructHeadersAsMetadata(headers); } @@ -50,11 +50,21 @@ Metadata newMetadata(String resourceTokenTemplate, String defaultResourceToken) } metadata.put( - resourceHeaderKey, getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)); + Key.of(resourceHeaderKey, Metadata.ASCII_STRING_MARSHALLER), + getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken)); return metadata; } + Map> newExtraHeaders( + String resourceTokenTemplate, String defaultResourceToken) { + return ImmutableMap.>builder() + .put( + resourceHeaderKey, + Arrays.asList(getResourceHeaderValue(resourceTokenTemplate, defaultResourceToken))) + .build(); + } + private Map, String> constructHeadersAsMetadata( Map headers) { ImmutableMap.Builder, String> headersAsMetadataBuilder = diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index c07a80af8ae8..558311c2215c 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -16,15 +16,21 @@ package com.google.cloud.spanner.spi.v1; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.ServiceRpc; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.spi.v1.SpannerRpc.Option; import com.google.common.collect.ImmutableList; import com.google.longrunning.Operation; +import com.google.protobuf.Empty; import com.google.protobuf.FieldMask; +import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import com.google.spanner.admin.database.v1.Database; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import com.google.spanner.admin.instance.v1.CreateInstanceMetadata; import com.google.spanner.admin.instance.v1.Instance; import com.google.spanner.admin.instance.v1.InstanceConfig; +import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CommitResponse; @@ -163,10 +169,11 @@ Paginated listInstanceConfigs(int pageSize, @Nullable String pag Paginated listInstances( int pageSize, @Nullable String pageToken, @Nullable String filter) throws SpannerException; - Operation createInstance(String parent, String instanceId, Instance instance) - throws SpannerException; + OperationFuture createInstance( + String parent, String instanceId, Instance instance) throws SpannerException; - Operation updateInstance(Instance instance, FieldMask fieldMask) throws SpannerException; + OperationFuture updateInstance( + Instance instance, FieldMask fieldMask) throws SpannerException; Instance getInstance(String instanceName) throws SpannerException; @@ -176,11 +183,11 @@ Operation createInstance(String parent, String instanceId, Instance instance) Paginated listDatabases(String instanceName, int pageSize, @Nullable String pageToken) throws SpannerException; - Operation createDatabase( + OperationFuture createDatabase( String instanceName, String createDatabaseStatement, Iterable additionalStatements) throws SpannerException; - Operation updateDatabaseDdl( + OperationFuture updateDatabaseDdl( String databaseName, Iterable updateDatabaseStatements, @Nullable String updateId) throws SpannerException; @@ -218,7 +225,8 @@ PartitionResponse partitionQuery( PartitionQueryRequest request, @Nullable Map options) throws SpannerException; - PartitionResponse partitionRead( - PartitionReadRequest request, @Nullable Map options) + PartitionResponse partitionRead(PartitionReadRequest request, @Nullable Map options) throws SpannerException; + + public void shutdown(); } diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/testing/RemoteSpannerHelper.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/testing/RemoteSpannerHelper.java index 73ae5f05ceaf..09a314464295 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/testing/RemoteSpannerHelper.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/testing/RemoteSpannerHelper.java @@ -16,13 +16,14 @@ package com.google.cloud.spanner.testing; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.BatchClient; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.InstanceId; -import com.google.cloud.spanner.Operation; import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.SpannerOptions; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import java.util.ArrayList; @@ -95,13 +96,18 @@ public String getUniqueDatabaseId() { */ public Database createTestDatabase(Iterable statements) throws SpannerException { String dbId = getUniqueDatabaseId(); - Operation op = - client.getDatabaseAdminClient().createDatabase(instanceId.getInstance(), dbId, statements); - op = op.waitFor(); - Database db = op.getResult(); - logger.log(Level.FINE, "Created test database {0}", db.getId()); - dbs.add(db); - return db; + try { + OperationFuture op = + client + .getDatabaseAdminClient() + .createDatabase(instanceId.getInstance(), dbId, statements); + Database db = op.get(); + logger.log(Level.FINE, "Created test database {0}", db.getId()); + dbs.add(db); + return db; + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } } /** Deletes all the databases created via {@code createTestDatabase}. Shuts down the client. */ diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index f7a0c95acfe3..395e11cbea7b 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; @@ -48,7 +48,7 @@ public final class BatchClientImplTest { private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn"); private static final String TIMESTAMP = "2017-11-15T10:54:20Z"; - @Mock private SpannerRpc rpc; + @Mock private SpannerRpc gapicRpc; @Mock private SpannerOptions spannerOptions; @Captor private ArgumentCaptor> optionsCaptor; @Mock private BatchTransactionId txnID; @@ -59,20 +59,21 @@ public final class BatchClientImplTest { public void setUp() { initMocks(this); DatabaseId db = DatabaseId.of(DB_NAME); - SpannerImpl spanner = new SpannerImpl(rpc, 1, spannerOptions); + SpannerImpl spanner = new SpannerImpl(gapicRpc, 1, spannerOptions); client = new BatchClientImpl(db, spanner); } @Test public void testBatchReadOnlyTxnWithBound() throws Exception { Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build(); - when(rpc.createSession(eq(DB_NAME), (Map) anyMap(), optionsCaptor.capture())) + when(gapicRpc.createSession( + eq(DB_NAME), (Map) anyMap(), optionsCaptor.capture())) .thenReturn(sessionProto); com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); - when(spannerOptions.getSpannerRpcV1()).thenReturn(rpc); - when(rpc.beginTransaction(Mockito.any(), optionsCaptor.capture())) + when(spannerOptions.getSpannerRpcV1()).thenReturn(gapicRpc); + when(gapicRpc.beginTransaction(Mockito.any(), optionsCaptor.capture())) .thenReturn(txnMetadata); BatchReadOnlyTransaction batchTxn = client.batchReadOnlyTransaction(TimestampBound.strong()); diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java index a39630732386..8baa98d7ac32 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseAdminClientImplTest.java @@ -21,11 +21,13 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.protobuf.Any; +import com.google.protobuf.Empty; import com.google.protobuf.Message; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import com.google.spanner.admin.database.v1.Database; @@ -82,45 +84,54 @@ public void getDatabase() { } @Test - public void createDatabase() { + public void createDatabase() throws Exception { + OperationFuture rawOperationFuture = + OperationFutureUtil.immediateOperationFuture( + "createDatabase", getDatabaseProto(), CreateDatabaseMetadata.getDefaultInstance()); when(rpc.createDatabase( INSTANCE_NAME, "CREATE DATABASE `" + DB_ID + "`", Collections.emptyList())) - .thenReturn( - com.google.longrunning.Operation.newBuilder() - .setDone(true) - .setName("my-op") - .setResponse(toAny(getDatabaseProto())) - .build()); - Operation op = + .thenReturn(rawOperationFuture); + OperationFuture op = client.createDatabase(INSTANCE_ID, DB_ID, Collections.emptyList()); assertThat(op.isDone()).isTrue(); - assertThat(op.getResult().getId().getName()).isEqualTo(DB_NAME); + assertThat(op.get().getId().getName()).isEqualTo(DB_NAME); } @Test - public void updateDatabaseDdl() { + public void updateDatabaseDdl() throws Exception { String opName = DB_NAME + "/operations/myop"; String opId = "myop"; List ddl = ImmutableList.of(); - when(rpc.updateDatabaseDdl(DB_NAME, ddl, opId)) - .thenReturn( - com.google.longrunning.Operation.newBuilder().setDone(true).setName(opName).build()); - Operation op = + OperationFuture rawOperationFuture = + OperationFutureUtil.immediateOperationFuture( + opName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance()); + when(rpc.updateDatabaseDdl(DB_NAME, ddl, opId)).thenReturn(rawOperationFuture); + OperationFuture op = client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, opId); assertThat(op.isDone()).isTrue(); assertThat(op.getName()).isEqualTo(opName); } @Test - public void updateDatabaseDdlOpAlreadyExists() { - String opName = DB_NAME + "/operations/myop"; - String opId = "myop"; + public void updateDatabaseDdlOpAlreadyExists() throws Exception { + String originalOpName = DB_NAME + "/operations/originalop"; List ddl = ImmutableList.of(); - when(rpc.updateDatabaseDdl(DB_NAME, ddl, opId)) - .thenThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "")); - Operation op = - client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, opId); - assertThat(op.getName()).isEqualTo(opName); + OperationFuture originalOp = + OperationFutureUtil.immediateOperationFuture( + originalOpName, + Empty.getDefaultInstance(), + UpdateDatabaseDdlMetadata.getDefaultInstance()); + + String newOpName = DB_NAME + "/operations/newop"; + String newOpId = "newop"; + OperationFuture newop = + OperationFutureUtil.immediateOperationFuture( + newOpName, Empty.getDefaultInstance(), UpdateDatabaseDdlMetadata.getDefaultInstance()); + + when(rpc.updateDatabaseDdl(DB_NAME, ddl, newOpId)).thenReturn(originalOp); + OperationFuture op = + client.updateDatabaseDdl(INSTANCE_ID, DB_ID, ddl, newOpId); + assertThat(op.getName()).isEqualTo(originalOpName); } @Test diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java index 422d9b43f525..b49fb1ef5d70 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GceTestEnvConfig.java @@ -18,7 +18,7 @@ import static com.google.common.base.Preconditions.checkState; -import com.google.common.collect.ImmutableList; +import com.google.cloud.spanner.spi.v1.SpannerInterceptorProvider; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -55,10 +55,9 @@ public GceTestEnvConfig() { } options = builder - .setRpcChannelFactory( - new SpannerOptions.NettyRpcChannelFactory( - null, - ImmutableList.of(new GrpcErrorInjector(errorProbability)))) + .setInterceptorProvider( + SpannerInterceptorProvider.createDefault() + .with(new GrpcErrorInjector(errorProbability))) .build(); } diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminClientImplTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminClientImplTest.java index c9b31f9d2264..7fb451f508c5 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminClientImplTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InstanceAdminClientImplTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.cloud.spanner.spi.v1.SpannerRpc.Paginated; import com.google.common.collect.ImmutableList; @@ -100,22 +101,21 @@ private com.google.spanner.admin.instance.v1.Instance getAnotherInstanceProto() } @Test - public void createInstance() { + public void createInstance() throws Exception { + OperationFuture + rawOperationFuture = + OperationFutureUtil.immediateOperationFuture( + "createInstance", getInstanceProto(), CreateInstanceMetadata.getDefaultInstance()); when(rpc.createInstance("projects/" + PROJECT_ID, INSTANCE_ID, getInstanceProto())) - .thenReturn( - com.google.longrunning.Operation.newBuilder() - .setDone(true) - .setName(INSTANCE_NAME + "/operations/op") - .setResponse(DatabaseAdminClientImplTest.toAny(getInstanceProto())) - .build()); - Operation op = + .thenReturn(rawOperationFuture); + OperationFuture op = client.createInstance( InstanceInfo.newBuilder(InstanceId.of(PROJECT_ID, INSTANCE_ID)) .setInstanceConfigId(InstanceConfigId.of(PROJECT_ID, CONFIG_ID)) .setNodeCount(1) .build()); assertThat(op.isDone()).isTrue(); - assertThat(op.getResult().getId().getName()).isEqualTo(INSTANCE_NAME); + assertThat(op.get().getId().getName()).isEqualTo(INSTANCE_NAME); } @Test @@ -131,29 +131,28 @@ public void dropInstance() { } @Test - public void updateInstanceMetadata() { + public void updateInstanceMetadata() throws Exception { com.google.spanner.admin.instance.v1.Instance instance = com.google.spanner.admin.instance.v1.Instance.newBuilder() .setName(INSTANCE_NAME) .setConfig(CONFIG_NAME) .setNodeCount(2) .build(); + OperationFuture + rawOperationFuture = + OperationFutureUtil.immediateOperationFuture( + "updateInstance", getInstanceProto(), UpdateInstanceMetadata.getDefaultInstance()); when(rpc.updateInstance(instance, FieldMask.newBuilder().addPaths("node_count").build())) - .thenReturn( - com.google.longrunning.Operation.newBuilder() - .setDone(true) - .setName(INSTANCE_NAME + "/operations/op") - .setResponse(DatabaseAdminClientImplTest.toAny(instance)) - .build()); + .thenReturn(rawOperationFuture); InstanceInfo instanceInfo = InstanceInfo.newBuilder(InstanceId.of(INSTANCE_NAME)) .setInstanceConfigId(InstanceConfigId.of(CONFIG_NAME)) .setNodeCount(2) .build(); - Operation op = + OperationFuture op = client.updateInstance(instanceInfo, InstanceInfo.InstanceField.NODE_COUNT); assertThat(op.isDone()).isTrue(); - assertThat(op.getResult().getId().getName()).isEqualTo(INSTANCE_NAME); + assertThat(op.get().getId().getName()).isEqualTo(INSTANCE_NAME); } @Test diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestEnv.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestEnv.java index 1bfb6c8372e0..88999969d4c1 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestEnv.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestEnv.java @@ -18,14 +18,14 @@ import static com.google.common.base.Preconditions.checkState; -import com.google.cloud.RetryOption; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.testing.RemoteSpannerHelper; import com.google.common.collect.Iterators; import com.google.spanner.admin.instance.v1.CreateInstanceMetadata; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.junit.rules.ExternalResource; -import org.threeten.bp.Duration; /** * JUnit 4 test rule that provides access to a Cloud Spanner instance to use for tests, and allows @@ -113,9 +113,14 @@ private void initializeInstance(InstanceId instanceId) { .setDisplayName("Test instance") .setInstanceConfigId(configId) .build(); - Operation op = instanceAdminClient.createInstance(instance); - op = op.waitFor(RetryOption.initialRetryDelay(Duration.ofMillis(500L))); - Instance createdInstance = op.getResult(); + OperationFuture op = + instanceAdminClient.createInstance(instance); + Instance createdInstance; + try { + createdInstance = op.get(500L, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw SpannerExceptionFactory.newSpannerException(e); + } logger.log(Level.INFO, "Created test instance: {0}", createdInstance.getId()); } diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OperationFutureUtil.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OperationFutureUtil.java new file mode 100644 index 000000000000..43471b5b700b --- /dev/null +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OperationFutureUtil.java @@ -0,0 +1,275 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationSnapshot; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.common.base.Preconditions; +import com.google.protobuf.Any; +import com.google.protobuf.Message; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +// TODO(hzyi): add a public FakeOperationSnapshot in gax to support testing +class OperationFutureUtil { + + private OperationFutureUtil() { + // Utility class + } + + public static class FakeStatusCode implements StatusCode { + private final Code code; + + public FakeStatusCode(Code code) { + this.code = code; + } + + @Override + public Code getCode() { + return code; + } + + @Override + public Code getTransportCode() { + return getCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + FakeStatusCode that = (FakeStatusCode) o; + + return code == that.code; + } + + @Override + public int hashCode() { + return code != null ? code.hashCode() : 0; + } + + public static FakeStatusCode of(Code code) { + return new FakeStatusCode(code); + } + } + + public static final + OperationSnapshot completedSnapshot( + final String name, final ResponseT response, final MetadataT metadata) { + return new OperationSnapshot() { + @Override + public String getName() { + return name; + } + + @Override + public Object getMetadata() { + return Any.pack(metadata); + } + + @Override + public Object getResponse() { + return Any.pack(response); + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public StatusCode getErrorCode() { + return FakeStatusCode.of(StatusCode.Code.OK); + } + + @Override + public String getErrorMessage() { + return null; + } + }; + } + + /** Already-completed {@code ImmediateRetryingFuture}, useful for testing. */ + public static final class ImmediateRetryingFuture implements RetryingFuture { + + private final ApiFuture immediateFuture; + private ApiFuture attemptFuture; + + ImmediateRetryingFuture(V response) { + this.immediateFuture = ApiFutures.immediateFuture(response); + } + + @Override + public void addListener(Runnable runnable, Executor executor) { + immediateFuture.addListener(runnable, executor); + } + + @Override + public V get(long time, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + return get(); + } + + @Override + public V get() throws ExecutionException, InterruptedException { + return immediateFuture.get(); + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel(boolean b) { + return false; + } + + @Override + public void setAttemptFuture(ApiFuture attemptFuture) { + this.attemptFuture = attemptFuture; + } + + @Override + public ApiFuture getAttemptResult() { + return this.attemptFuture; + } + + @Override + public TimedAttemptSettings getAttemptSettings() { + throw new UnsupportedOperationException("Not implemented: getAttemptSettings()"); + } + + @Override + public Callable getCallable() { + throw new UnsupportedOperationException("Not implemented: getCallable()"); + } + + @Override + public ApiFuture peekAttemptResult() { + return this.attemptFuture; + } + } + + public static final RetryingFuture immediateRetryingFuture( + final ResponseT response) { + return new ImmediateRetryingFuture(response); + } + + public static final + OperationFuture immediateOperationFuture( + final String name, final ResponseT response, final MetadataT metadata) { + return immediateOperationFuture(completedSnapshot(name, response, metadata)); + } + + /** + * Creates an already-completed {@code OperationFuture}, useful for testing. + * + *

{@code completedSnapshot.isDone()} must return true. The snapshot's {@code getResponse()} + * and {@code getMetadata()} must be instances of {@code ResponseT} and {@code MetadataT}, + * respectively. + */ + @SuppressWarnings("unchecked") + public static final + OperationFuture immediateOperationFuture( + final OperationSnapshot completedSnapshot) { + + Preconditions.checkArgument( + completedSnapshot.isDone(), "given snapshot must already be completed"); + final ApiFuture metadataFuture = + ApiFutures.immediateFuture((MetadataT) completedSnapshot.getMetadata()); + final ApiFuture initialFuture = + ApiFutures.immediateFuture(completedSnapshot); + final RetryingFuture pollingFuture = + immediateRetryingFuture(completedSnapshot); + + return new OperationFuture() { + @Override + public String getName() { + return completedSnapshot.getName(); + } + + @Override + public ApiFuture getMetadata() { + return metadataFuture; + } + + @Override + public ApiFuture peekMetadata() { + return metadataFuture; + } + + @Override + public ApiFuture getInitialFuture() { + return initialFuture; + } + + @Override + public RetryingFuture getPollingFuture() { + return pollingFuture; + } + + @Override + public void addListener(Runnable runnable, Executor executor) { + pollingFuture.addListener(runnable, executor); + } + + @Override + public ResponseT get(long time, TimeUnit unit) + throws ExecutionException, InterruptedException, TimeoutException { + return get(); + } + + @Override + public ResponseT get() throws ExecutionException, InterruptedException { + return (ResponseT) pollingFuture.get().getResponse(); + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean cancel(boolean b) { + return false; + } + }; + } +} diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 731cd1270d8d..1cc406b8a851 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -18,13 +18,9 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.cloud.TransportOptions; import java.util.HashMap; import java.util.Map; - -import com.google.cloud.TransportOptions; - -import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -36,26 +32,13 @@ @RunWith(JUnit4.class) public class SpannerOptionsTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - - private static class TestChannelFactory implements SpannerOptions.RpcChannelFactory { - @Override - public ManagedChannel newChannel(String host, int port) { - // Disable SSL to avoid a dependency on ALPN/NPN. - return NettyChannelBuilder.forAddress(host, port).usePlaintext(true).build(); - } - } + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void defaultBuilder() { // We need to set the project id since in test environment we cannot obtain a default project // id. - SpannerOptions options = - SpannerOptions.newBuilder() - .setProjectId("test-project") - .setRpcChannelFactory(new TestChannelFactory()) - .build(); + SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build(); assertThat(options.getHost()).isEqualTo("https://spanner.googleapis.com"); assertThat(options.getPrefetchChunks()).isEqualTo(4); assertThat(options.getSessionLabels()).isNull(); @@ -69,7 +52,6 @@ public void builder() { labels.put("env", "dev"); SpannerOptions options = SpannerOptions.newBuilder() - .setRpcChannelFactory(new TestChannelFactory()) .setHost(host) .setProjectId(projectId) .setPrefetchChunks(2) diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java index af76dc4a5288..04872090bb34 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDatabaseAdminTest.java @@ -19,13 +19,13 @@ import static com.google.cloud.spanner.SpannerMatchers.isSpannerException; import static com.google.common.truth.Truth.assertThat; +import com.google.api.gax.longrunning.OperationFuture; import com.google.api.gax.paging.Page; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.IntegrationTest; import com.google.cloud.spanner.IntegrationTestEnv; -import com.google.cloud.spanner.Operation; import com.google.cloud.spanner.Options; import com.google.cloud.spanner.testing.RemoteSpannerHelper; import com.google.common.collect.ImmutableList; @@ -74,10 +74,9 @@ public void databaseOperations() throws Exception { String dbId = testHelper.getUniqueDatabaseId(); String instanceId = testHelper.getInstanceId().getInstance(); String statement1 = "CREATE TABLE T (\n" + " K STRING(MAX),\n" + ") PRIMARY KEY(K)"; - Operation op = + OperationFuture op = dbAdminClient.createDatabase(instanceId, dbId, ImmutableList.of(statement1)); - op = op.waitFor(); - Database db = op.getResult(); + Database db = op.get(); dbs.add(db); assertThat(db.getId().getDatabase()).isEqualTo(dbId); @@ -95,9 +94,9 @@ public void databaseOperations() throws Exception { assertThat(foundDb).isTrue(); String statement2 = "CREATE TABLE T2 (\n" + " K2 STRING(MAX),\n" + ") PRIMARY KEY(K2)"; - Operation op2 = + OperationFuture op2 = dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), null); - op2.waitFor(); + op2.get(); List statementsInDb = dbAdminClient.getDatabaseDdl(instanceId, dbId); assertThat(statementsInDb).containsExactly(statement1, statement2); @@ -112,19 +111,18 @@ public void updateDdlRetry() throws Exception { String dbId = testHelper.getUniqueDatabaseId(); String instanceId = testHelper.getInstanceId().getInstance(); String statement1 = "CREATE TABLE T (\n" + " K STRING(MAX),\n" + ") PRIMARY KEY(K)"; - Operation op = + OperationFuture op = dbAdminClient.createDatabase(instanceId, dbId, ImmutableList.of(statement1)); - op = op.waitFor(); - Database db = op.getResult(); + Database db = op.get(); dbs.add(db); String statement2 = "CREATE TABLE T2 (\n" + " K2 STRING(MAX),\n" + ") PRIMARY KEY(K2)"; - Operation op1 = + OperationFuture op1 = dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop"); - Operation op2 = + OperationFuture op2 = dbAdminClient.updateDatabaseDdl(instanceId, dbId, ImmutableList.of(statement2), "myop"); - op1 = op1.waitFor(); - op2 = op2.waitFor(); - assertThat(op1.getMetadata()).isEqualTo(op2.getMetadata()); + op1.get(); + op2.get(); + assertThat(op1.getMetadata().get()).isEqualTo(op2.getMetadata().get()); } @Test @@ -132,10 +130,9 @@ public void databaseOperationsViaEntity() throws Exception { String dbId = testHelper.getUniqueDatabaseId(); String instanceId = testHelper.getInstanceId().getInstance(); String statement1 = "CREATE TABLE T (\n" + " K STRING(MAX),\n" + ") PRIMARY KEY(K)"; - Operation op = + OperationFuture op = dbAdminClient.createDatabase(instanceId, dbId, ImmutableList.of(statement1)); - op = op.waitFor(); - Database db = op.getResult(); + Database db = op.get(); dbs.add(db); assertThat(db.getId().getDatabase()).isEqualTo(dbId); @@ -143,8 +140,8 @@ public void databaseOperationsViaEntity() throws Exception { assertThat(db.getId().getDatabase()).isEqualTo(dbId); String statement2 = "CREATE TABLE T2 (\n" + " K2 STRING(MAX),\n" + ") PRIMARY KEY(K2)"; - Operation op2 = db.updateDdl(ImmutableList.of(statement2), null); - op2.waitFor(); + OperationFuture op2 = db.updateDdl(ImmutableList.of(statement2), null); + op2.get(); Iterable statementsInDb = db.getDdl(); assertThat(statementsInDb).containsExactly(statement1, statement2); @@ -164,9 +161,7 @@ public void listPagination() throws Exception { String instanceId = testHelper.getInstanceId().getInstance(); for (String dbId : dbIds) { - dbs.add(dbAdminClient.createDatabase(instanceId, dbId, ImmutableList.of()) - .waitFor() - .getResult()); + dbs.add(dbAdminClient.createDatabase(instanceId, dbId, ImmutableList.of()).get()); } Page page = dbAdminClient.listDatabases(instanceId, Options.pageSize(1)); List dbIdsGot = new ArrayList<>(); diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITInstanceAdminTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITInstanceAdminTest.java index 57814da19bfe..494e80c4003b 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITInstanceAdminTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITInstanceAdminTest.java @@ -18,13 +18,13 @@ import static com.google.common.truth.Truth.assertThat; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.Instance; import com.google.cloud.spanner.InstanceAdminClient; import com.google.cloud.spanner.InstanceConfig; import com.google.cloud.spanner.InstanceInfo; import com.google.cloud.spanner.IntegrationTest; import com.google.cloud.spanner.IntegrationTestEnv; -import com.google.cloud.spanner.Operation; import com.google.cloud.spanner.Options; import com.google.common.collect.Iterators; import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata; @@ -90,9 +90,9 @@ public void updateInstance() throws Exception { .setNodeCount(instance.getNodeCount() + 1) .build(); // Only update display name - Operation op = + OperationFuture op = instanceClient.updateInstance(toUpdate, InstanceInfo.InstanceField.DISPLAY_NAME); - Instance newInstance = op.waitFor().getResult(); + Instance newInstance = op.get(); assertThat(newInstance.getNodeCount()).isEqualTo(instance.getNodeCount()); assertThat(newInstance.getDisplayName()).isEqualTo(newDisplayName); @@ -102,7 +102,7 @@ public void updateInstance() throws Exception { toUpdate = InstanceInfo.newBuilder(instance.getId()).setDisplayName(instance.getDisplayName()).build(); - instanceClient.updateInstance(toUpdate, InstanceInfo.InstanceField.DISPLAY_NAME).waitFor(); + instanceClient.updateInstance(toUpdate, InstanceInfo.InstanceField.DISPLAY_NAME).get(); } @Test @@ -118,9 +118,9 @@ public void updateInstanceViaEntity() throws Exception { .setNodeCount(instance.getNodeCount() + 1) .build(); // Only update display name - Operation op = + OperationFuture op = toUpdate.update(InstanceInfo.InstanceField.DISPLAY_NAME); - Instance newInstance = op.waitFor().getResult(); + Instance newInstance = op.get(); assertThat(newInstance.getNodeCount()).isEqualTo(instance.getNodeCount()); assertThat(newInstance.getDisplayName()).isEqualTo(newDisplayName); @@ -128,6 +128,6 @@ public void updateInstanceViaEntity() throws Exception { assertThat(newInstanceFromGet).isEqualTo(newInstance); toUpdate = newInstance.toBuilder().setDisplayName(instance.getDisplayName()).build(); - toUpdate.update(InstanceInfo.InstanceField.DISPLAY_NAME).waitFor(); + toUpdate.update(InstanceInfo.InstanceField.DISPLAY_NAME).get(); } } diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/RequestMetadataTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/RequestMetadataTest.java deleted file mode 100644 index 3e27aa68e042..000000000000 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/RequestMetadataTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright 2017 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner.spi.v1; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.doNothing; - -import com.google.cloud.spanner.spi.v1.GrpcSpannerRpc.MetadataClientCall; -import io.grpc.ClientCall; -import io.grpc.Metadata; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** Unit tests for {@link GrpcSpannerRpc.MetadataClientCall}. */ -@RunWith(JUnit4.class) -public class RequestMetadataTest { - private static final Metadata.Key HEADER_KEY = - Metadata.Key.of("google-cloud-resource-prefix", Metadata.ASCII_STRING_MARSHALLER); - - private Metadata metadata; - - @Mock - private ClientCall innerCall; - @Mock - private ClientCall.Listener listener; - @Captor - private ArgumentCaptor innerMetadata; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - metadata = new Metadata(); - } - - @Test - public void metadataForwardingTest() { - doNothing() - .when(innerCall) - .start(Mockito.>any(), innerMetadata.capture()); - - Metadata in = new Metadata(); - in.put(HEADER_KEY, "TEST_HEADER"); - MetadataClientCall metadataCall = new MetadataClientCall<>(innerCall, in); - metadataCall.start(listener, metadata); - assertTrue(innerMetadata.getValue().containsKey(HEADER_KEY)); - assertEquals(innerMetadata.getValue().get(HEADER_KEY), "TEST_HEADER"); - } -} diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java index 76f1f56c4d5a..12a9d2850ce4 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProviderTest.java @@ -15,11 +15,14 @@ */ package com.google.cloud.spanner.spi.v1; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.grpc.Metadata; import io.grpc.Metadata.Key; +import java.util.List; import java.util.Map; import org.junit.Test; @@ -67,6 +70,16 @@ public void testGetResourceHeaderValue() { getResourceHeaderValue(metadataProvider, "projects/p/instances/i/databases/d/operations")); } + @Test + public void testNewExtraHeaders() { + SpannerMetadataProvider metadataProvider = + SpannerMetadataProvider.create(ImmutableMap.of(), "header1"); + Map> extraHeaders = metadataProvider.newExtraHeaders(null, "value1"); + assertThat(extraHeaders) + .containsExactlyEntriesIn( + ImmutableMap.>of("header1", ImmutableList.of("value1"))); + } + private String getResourceHeaderValue( SpannerMetadataProvider headerProvider, String resourceTokenTemplate) { Metadata metadata = headerProvider.newMetadata(resourceTokenTemplate, "projects/p"); diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseAdminClientSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseAdminClientSnippets.java index 61c9baa6de7f..d0f5686ae89f 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseAdminClientSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/DatabaseAdminClientSnippets.java @@ -23,16 +23,19 @@ package com.google.cloud.examples.spanner.snippets; import com.google.api.gax.paging.Page; +import com.google.api.gax.longrunning.OperationFuture; import com.google.common.collect.Iterables; import com.google.cloud.spanner.DatabaseAdminClient; import com.google.cloud.spanner.Options; import com.google.cloud.spanner.Database; -import com.google.cloud.spanner.Operation; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.spanner.admin.database.v1.CreateDatabaseMetadata; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; /** * This class contains snippets for {@link DatabaseAdminClient} interface. @@ -53,7 +56,7 @@ public DatabaseAdminClientSnippets(DatabaseAdminClient dbAdminClient) { // [VARIABLE my_database_id] public Database createDatabase(String instanceId, String databaseId) { // [START createDatabase] - Operation op = dbAdminClient + OperationFuture op = dbAdminClient .createDatabase( instanceId, databaseId, @@ -70,7 +73,14 @@ public Database createDatabase(String instanceId, String databaseId) { + " AlbumTitle STRING(MAX)\n" + ") PRIMARY KEY (SingerId, AlbumId),\n" + " INTERLEAVE IN PARENT Singers ON DELETE CASCADE")); - Database db = op.waitFor().getResult(); + Database db; + try { + db = op.get(); + } catch(ExecutionException e) { + throw (SpannerException) e.getCause(); + } catch(InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } // [END createDatabase] return db; } @@ -96,10 +106,16 @@ public Database getDatabase(String instanceId, String databaseId) { // [VARIABLE my_database_id] public void updateDatabaseDdl(String instanceId, String databaseId) { // [START updateDatabaseDdl] - dbAdminClient.updateDatabaseDdl(instanceId, + try { + dbAdminClient.updateDatabaseDdl(instanceId, databaseId, Arrays.asList("ALTER TABLE Albums ADD COLUMN MarketingBudget INT64"), - null).waitFor(); + null).get(); + } catch (ExecutionException e) { + throw (SpannerException) e.getCause(); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } // [END updateDatabaseDdl] } diff --git a/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/InstanceAdminClientSnippets.java b/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/InstanceAdminClientSnippets.java index 501a28fc7280..02aa3e91b1a7 100644 --- a/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/InstanceAdminClientSnippets.java +++ b/google-cloud-examples/src/main/java/com/google/cloud/examples/spanner/snippets/InstanceAdminClientSnippets.java @@ -22,18 +22,21 @@ package com.google.cloud.examples.spanner.snippets; +import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.spanner.Instance; import com.google.cloud.spanner.InstanceAdminClient; import com.google.cloud.spanner.InstanceConfig; import com.google.cloud.spanner.InstanceConfigId; import com.google.cloud.spanner.InstanceId; import com.google.cloud.spanner.InstanceInfo; -import com.google.cloud.spanner.Operation; import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.common.collect.Lists; import com.google.spanner.admin.instance.v1.CreateInstanceMetadata; import com.google.spanner.admin.instance.v1.UpdateInstanceMetadata; import java.util.List; +import java.util.concurrent.ExecutionException; /** * This class contains snippets for {@link InstanceAdminClient} interface. @@ -80,14 +83,20 @@ public void createInstance( final String configId = my_config_id; final String clientProject = my_client_project; - Operation op = + OperationFuture op = instanceAdminClient.createInstance(InstanceInfo .newBuilder(InstanceId.of(clientProject, instanceId)) .setInstanceConfigId(InstanceConfigId.of(clientProject, configId)) .setDisplayName(instanceId) .setNodeCount(1) .build()); - op.waitFor(); + try { + op.get(); + } catch(ExecutionException e) { + throw (SpannerException) e.getCause(); + } catch(InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } // [END instance_admin_client_create_instance] } @@ -144,9 +153,15 @@ public void updateInstance(Instance my_instance, .setNodeCount(instance.getNodeCount() + 1) .build(); // Only update display name - Operation op = + OperationFuture op = instanceAdminClient.updateInstance(toUpdate, InstanceInfo.InstanceField.DISPLAY_NAME); - op.waitFor().getResult(); + try { + op.get(); + } catch(ExecutionException e) { + throw (SpannerException) e.getCause(); + } catch(InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } // [END instance_admin_client_update_instance] } }