From c99189c50661aaa4325d2469b26e20f412587ed9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Mon, 29 Jan 2024 12:20:15 +0100 Subject: [PATCH] chore: add support for virtual threads to Connection API (#2789) Adds support for using virtual threads in the Connection API. Virtual threads can be enabled for two things: 1. As the StatementExecutor thread for each connection. 2. As the gRPC transport thread pool. Both options can (for now) only be set in the Connection API. Setting any of these options only has any effect if the application is running on Java 21 or higher. --- .../google/cloud/spanner/SpannerOptions.java | 25 ++++- .../cloud/spanner/ThreadFactoryUtil.java | 99 +++++++++++++++++++ .../spanner/connection/ChecksumResultSet.java | 9 +- .../spanner/connection/ConnectionImpl.java | 7 +- .../spanner/connection/ConnectionOptions.java | 43 ++++++++ .../connection/ConnectionSpannerOptions.java | 48 +++++++++ .../connection/ReadWriteTransaction.java | 9 +- .../cloud/spanner/connection/SpannerPool.java | 13 ++- .../spanner/connection/StatementExecutor.java | 46 +++++---- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 10 +- .../cloud/spanner/SessionPoolStressTest.java | 2 + .../cloud/spanner/ThreadFactoryUtilTest.java | 76 ++++++++++++++ .../connection/ConnectionOptionsTest.java | 52 ++++++++++ .../connection/SavepointMockServerTest.java | 43 ++++++-- .../connection/it/ITTransactionRetryTest.java | 5 +- 15 files changed, 444 insertions(+), 43 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/ThreadFactoryUtil.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionSpannerOptions.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/ThreadFactoryUtilTest.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index 877ea72e467..0d9f3b85c19 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -140,6 +140,7 @@ public class SpannerOptions extends ServiceOptions { private final boolean leaderAwareRoutingEnabled; private final boolean attemptDirectPath; private final DirectedReadOptions directedReadOptions; + private final boolean useVirtualThreads; /** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */ public interface CallCredentialsProvider { @@ -580,9 +581,9 @@ public static CloseableExecutorProvider createAsyncExecutorProvider( return FixedCloseableExecutorProvider.create(executor); } - private SpannerOptions(Builder builder) { + protected SpannerOptions(Builder builder) { super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults()); - numChannels = builder.numChannels; + numChannels = builder.numChannels == null ? DEFAULT_CHANNELS : builder.numChannels; Preconditions.checkArgument( numChannels >= 1 && numChannels <= MAX_CHANNELS, "Number of channels must fall in the range [1, %s], found: %s", @@ -631,6 +632,7 @@ private SpannerOptions(Builder builder) { leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled; attemptDirectPath = builder.attemptDirectPath; directedReadOptions = builder.directedReadOptions; + useVirtualThreads = builder.useVirtualThreads; } /** @@ -734,12 +736,13 @@ public static class Builder private boolean leaderAwareRoutingEnabled = true; private boolean attemptDirectPath = true; private DirectedReadOptions directedReadOptions; + private boolean useVirtualThreads = false; private static String createCustomClientLibToken(String token) { return token + " " + ServiceOptions.getGoogApiClientLibName(); } - private Builder() { + protected Builder() { // Manually set retry and polling settings that work. OperationTimedPollAlgorithm longRunningPollingAlgorithm = OperationTimedPollAlgorithm.create( @@ -795,6 +798,7 @@ private Builder() { this.interceptorProvider = options.interceptorProvider; this.attemptDirectPath = options.attemptDirectPath; this.directedReadOptions = options.directedReadOptions; + this.useVirtualThreads = options.useVirtualThreads; } @Override @@ -1263,6 +1267,16 @@ public Builder disableDirectPath() { return this; } + /** + * Enables/disables the use of virtual threads for the gRPC executor. Setting this option only + * has any effect on Java 21 and higher. In all other cases, the option will be ignored. + */ + @BetaApi + protected Builder setUseVirtualThreads(boolean useVirtualThreads) { + this.useVirtualThreads = useVirtualThreads; + return this; + } + @SuppressWarnings("rawtypes") @Override public SpannerOptions build() { @@ -1412,6 +1426,11 @@ public boolean isAttemptDirectPath() { return attemptDirectPath; } + @BetaApi + public boolean isUseVirtualThreads() { + return useVirtualThreads; + } + /** Returns the default query options to use for the specific database. */ public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) { // Use the specific query options for the database if any have been specified. These have diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ThreadFactoryUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ThreadFactoryUtil.java new file mode 100644 index 00000000000..67f4d3230d4 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ThreadFactoryUtil.java @@ -0,0 +1,99 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.api.core.InternalApi; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import javax.annotation.Nullable; + +/** Utility class for creating a thread factory for daemon or virtual threads. */ +@InternalApi +public class ThreadFactoryUtil { + + /** + * Tries to create a thread factory for virtual threads, and otherwise falls back to creating a + * platform thread factory that creates daemon threads. Virtual threads are supported from JDK21. + * + * @param baseNameFormat the base name format for the threads, '-%d' will be appended to the + * actual thread name format + * @param tryVirtualThreads whether to try to use virtual threads if available or not + * @return a {@link ThreadFactory} that produces virtual threads (Java 21 or higher) or platform + * daemon threads + */ + @InternalApi + public static ThreadFactory createVirtualOrPlatformDaemonThreadFactory( + String baseNameFormat, boolean tryVirtualThreads) { + ThreadFactory virtualThreadFactory = + tryVirtualThreads ? tryCreateVirtualThreadFactory(baseNameFormat) : null; + if (virtualThreadFactory != null) { + return virtualThreadFactory; + } + + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(baseNameFormat + "-%d").build(); + } + + /** + * Tries to create a {@link ThreadFactory} that creates virtual threads. Returns null if virtual + * threads are not supported on this JVM. + */ + @InternalApi + @Nullable + public static ThreadFactory tryCreateVirtualThreadFactory(String baseNameFormat) { + try { + Class threadBuilderClass = Class.forName("java.lang.Thread$Builder"); + Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual"); + Object virtualBuilder = ofVirtualMethod.invoke(null); + Method nameMethod = threadBuilderClass.getDeclaredMethod("name", String.class, long.class); + virtualBuilder = nameMethod.invoke(virtualBuilder, baseNameFormat + "-", 0); + Method factoryMethod = threadBuilderClass.getDeclaredMethod("factory"); + return (ThreadFactory) factoryMethod.invoke(virtualBuilder); + } catch (ClassNotFoundException | NoSuchMethodException ignore) { + return null; + } catch (InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + /** + * Tries to create an {@link ExecutorService} that creates a new virtual thread for each task that + * it runs. Creating a new virtual thread is the recommended way to create executors using virtual + * threads, instead of creating a pool of virtual threads. Returns null if virtual threads are not + * supported on this JVM. + */ + @InternalApi + @Nullable + public static ExecutorService tryCreateVirtualThreadPerTaskExecutor(String baseNameFormat) { + ThreadFactory factory = tryCreateVirtualThreadFactory(baseNameFormat); + if (factory != null) { + try { + Method newThreadPerTaskExecutorMethod = + Executors.class.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class); + return (ExecutorService) newThreadPerTaskExecutorMethod.invoke(null, factory); + } catch (NoSuchMethodException ignore) { + return null; + } catch (InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + return null; + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java index 24389546669..dc373cf03bd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ChecksumResultSet.java @@ -39,6 +39,7 @@ import java.math.BigDecimal; import java.util.Objects; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicLong; /** * {@link ResultSet} implementation that keeps a running checksum that can be used to determine @@ -66,7 +67,7 @@ @VisibleForTesting class ChecksumResultSet extends ReplaceableForwardingResultSet implements RetriableStatement { private final ReadWriteTransaction transaction; - private volatile long numberOfNextCalls; + private final AtomicLong numberOfNextCalls = new AtomicLong(); private final ParsedStatement statement; private final AnalyzeMode analyzeMode; private final QueryOption[] options; @@ -103,7 +104,7 @@ public Boolean call() { if (res) { checksumCalculator.calculateNextChecksum(getCurrentRowAsStruct()); } - numberOfNextCalls++; + numberOfNextCalls.incrementAndGet(); return res; } } @@ -142,7 +143,7 @@ public void retry(AbortedException aborted) throws AbortedException { DirectExecuteResultSet.ofResultSet( transaction.internalExecuteQuery(statement, analyzeMode, options)); boolean next = true; - while (counter < numberOfNextCalls && next) { + while (counter < numberOfNextCalls.get() && next) { transaction .getStatementExecutor() .invokeInterceptors( @@ -169,7 +170,7 @@ public void retry(AbortedException aborted) throws AbortedException { // Check that we have the same number of rows and the same checksum. HashCode newChecksum = newChecksumCalculator.getChecksum(); HashCode currentChecksum = checksumCalculator.getChecksum(); - if (counter == numberOfNextCalls && Objects.equals(newChecksum, currentChecksum)) { + if (counter == numberOfNextCalls.get() && Objects.equals(newChecksum, currentChecksum)) { // Checksum is ok, we only need to replace the delegate result set if it's still open. if (isClosed()) { resultSet.close(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java index 9135fc44a54..3e524e4fe92 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java @@ -248,7 +248,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) { Preconditions.checkNotNull(options); this.leakedException = options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null; - this.statementExecutor = new StatementExecutor(options.getStatementExecutionInterceptors()); + this.statementExecutor = + new StatementExecutor( + options.isUseVirtualThreads(), options.getStatementExecutionInterceptors()); this.spannerPool = SpannerPool.INSTANCE; this.options = options; this.spanner = spannerPool.getSpanner(options, this); @@ -283,7 +285,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) { BatchClient batchClient) { this.leakedException = options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null; - this.statementExecutor = new StatementExecutor(Collections.emptyList()); + this.statementExecutor = + new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList()); this.spannerPool = Preconditions.checkNotNull(spannerPool); this.options = Preconditions.checkNotNull(options); this.spanner = spannerPool.getSpanner(options, this); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java index e8c2855af50..268661aef9c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java @@ -167,6 +167,8 @@ public String[] getValidValues() { static final boolean DEFAULT_AUTOCOMMIT = true; static final boolean DEFAULT_READONLY = false; static final boolean DEFAULT_RETRY_ABORTS_INTERNALLY = true; + static final boolean DEFAULT_USE_VIRTUAL_THREADS = false; + static final boolean DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS = false; private static final String DEFAULT_CREDENTIALS = null; private static final String DEFAULT_OAUTH_TOKEN = null; private static final String DEFAULT_MIN_SESSIONS = null; @@ -204,6 +206,11 @@ public String[] getValidValues() { public static final String ROUTE_TO_LEADER_PROPERTY_NAME = "routeToLeader"; /** Name of the 'retry aborts internally' connection property. */ public static final String RETRY_ABORTS_INTERNALLY_PROPERTY_NAME = "retryAbortsInternally"; + /** Name of the property to enable/disable virtual threads for the statement executor. */ + public static final String USE_VIRTUAL_THREADS_PROPERTY_NAME = "useVirtualThreads"; + /** Name of the property to enable/disable virtual threads for gRPC transport. */ + public static final String USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME = + "useVirtualGrpcTransportThreads"; /** Name of the 'credentials' connection property. */ public static final String CREDENTIALS_PROPERTY_NAME = "credentials"; /** Name of the 'encodedCredentials' connection property. */ @@ -293,6 +300,16 @@ private static String generateGuardedConnectionPropertyError( RETRY_ABORTS_INTERNALLY_PROPERTY_NAME, "Should the connection automatically retry Aborted errors (true/false)", DEFAULT_RETRY_ABORTS_INTERNALLY), + ConnectionProperty.createBooleanProperty( + USE_VIRTUAL_THREADS_PROPERTY_NAME, + "Use a virtual thread instead of a platform thread for each connection (true/false). " + + "This option only has any effect if the application is running on Java 21 or higher. In all other cases, the option is ignored.", + DEFAULT_USE_VIRTUAL_THREADS), + ConnectionProperty.createBooleanProperty( + USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME, + "Use a virtual thread instead of a platform thread for the gRPC executor (true/false). " + + "This option only has any effect if the application is running on Java 21 or higher. In all other cases, the option is ignored.", + DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS), ConnectionProperty.createStringProperty( CREDENTIALS_PROPERTY_NAME, "The location of the credentials file to use for this connection. If neither this property or encoded credentials are set, the connection will use the default Google Cloud credentials for the runtime environment."), @@ -672,6 +689,8 @@ public static Builder newBuilder() { private final boolean readOnly; private final boolean routeToLeader; private final boolean retryAbortsInternally; + private final boolean useVirtualThreads; + private final boolean useVirtualGrpcTransportThreads; private final List statementExecutionInterceptors; private final SpannerOptionsConfigurator configurator; @@ -771,6 +790,8 @@ private ConnectionOptions(Builder builder) { this.readOnly = parseReadOnly(this.uri); this.routeToLeader = parseRouteToLeader(this.uri); this.retryAbortsInternally = parseRetryAbortsInternally(this.uri); + this.useVirtualThreads = parseUseVirtualThreads(this.uri); + this.useVirtualGrpcTransportThreads = parseUseVirtualGrpcTransportThreads(this.uri); this.statementExecutionInterceptors = Collections.unmodifiableList(builder.statementExecutionInterceptors); this.configurator = builder.configurator; @@ -873,6 +894,18 @@ static boolean parseRetryAbortsInternally(String uri) { return value != null ? Boolean.parseBoolean(value) : DEFAULT_RETRY_ABORTS_INTERNALLY; } + @VisibleForTesting + static boolean parseUseVirtualThreads(String uri) { + String value = parseUriProperty(uri, USE_VIRTUAL_THREADS_PROPERTY_NAME); + return value != null ? Boolean.parseBoolean(value) : DEFAULT_USE_VIRTUAL_THREADS; + } + + @VisibleForTesting + static boolean parseUseVirtualGrpcTransportThreads(String uri) { + String value = parseUriProperty(uri, USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME); + return value != null ? Boolean.parseBoolean(value) : DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS; + } + @VisibleForTesting static @Nullable String parseCredentials(String uri) { String value = parseUriProperty(uri, CREDENTIALS_PROPERTY_NAME); @@ -1293,6 +1326,16 @@ public boolean isRetryAbortsInternally() { return retryAbortsInternally; } + /** Whether connections should use virtual threads for connection executors. */ + public boolean isUseVirtualThreads() { + return useVirtualThreads; + } + + /** Whether virtual threads should be used for gRPC transport. */ + public boolean isUseVirtualGrpcTransportThreads() { + return useVirtualGrpcTransportThreads; + } + /** Any warnings that were generated while creating the {@link ConnectionOptions} instance. */ @Nullable public String getWarnings() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionSpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionSpannerOptions.java new file mode 100644 index 00000000000..f9d310b69e3 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionSpannerOptions.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.connection; + +import com.google.cloud.spanner.SpannerOptions; + +/** + * This class is used for building {@link SpannerOptions} for {@link Connection} instances. It gives + * access to (experimental) properties that are not public in the standard {@link SpannerOptions} + * implementation. + */ +class ConnectionSpannerOptions extends SpannerOptions { + public static Builder newBuilder() { + return new Builder(); + } + + static class Builder extends SpannerOptions.Builder { + Builder() {} + + @Override + protected SpannerOptions.Builder setUseVirtualThreads(boolean useVirtualThreads) { + return super.setUseVirtualThreads(useVirtualThreads); + } + + @Override + public ConnectionSpannerOptions build() { + return new ConnectionSpannerOptions(this); + } + } + + ConnectionSpannerOptions(Builder builder) { + super(builder); + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java index a618851ad85..e1fb87e4ade 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java @@ -62,6 +62,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; @@ -80,6 +81,7 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction { private static final String MAX_INTERNAL_RETRIES_EXCEEDED = "Internal transaction retry maximum exceeded"; private static final int MAX_INTERNAL_RETRIES = 50; + private final ReentrantLock abortedLock = new ReentrantLock(); private final long transactionId; private final DatabaseClient dbClient; private final TransactionOption[] transactionOptions; @@ -100,7 +102,6 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction { private final List statements = new ArrayList<>(); private final List mutations = new ArrayList<>(); private Timestamp transactionStarted; - final Object abortedLock = new Object(); private static final class RollbackToSavepointException extends Exception {} @@ -772,7 +773,8 @@ public ApiFuture commitAsync(CallType callType) { */ T runWithRetry(Callable callable) throws SpannerException { while (true) { - synchronized (abortedLock) { + abortedLock.lock(); + try { checkAborted(); try { checkRolledBackToSavepoint(); @@ -784,6 +786,8 @@ T runWithRetry(Callable callable) throws SpannerException { } catch (Exception e) { throw SpannerExceptionFactory.asSpannerException(e); } + } finally { + abortedLock.unlock(); } } } @@ -870,6 +874,7 @@ private void handleAborted(AbortedException aborted) { long delay = aborted.getRetryDelayInMillis(); try { if (delay > 0L) { + //noinspection BusyWait Thread.sleep(delay); } } catch (InterruptedException ie) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java index df7190f11a1..2a5a805c2c7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SpannerPool.java @@ -22,7 +22,6 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.SpannerExceptionFactory; -import com.google.cloud.spanner.SpannerOptions; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.MoreObjects; @@ -155,6 +154,7 @@ static class SpannerPoolKey { private final String userAgent; private final String databaseRole; private final boolean routeToLeader; + private final boolean useVirtualGrpcTransportThreads; @VisibleForTesting static SpannerPoolKey of(ConnectionOptions options) { @@ -181,6 +181,7 @@ private SpannerPoolKey(ConnectionOptions options) throws IOException { this.usePlainText = options.isUsePlainText(); this.userAgent = options.getUserAgent(); this.routeToLeader = options.isRouteToLeader(); + this.useVirtualGrpcTransportThreads = options.isUseVirtualGrpcTransportThreads(); } @Override @@ -197,7 +198,9 @@ public boolean equals(Object o) { && Objects.equals(this.databaseRole, other.databaseRole) && Objects.equals(this.usePlainText, other.usePlainText) && Objects.equals(this.userAgent, other.userAgent) - && Objects.equals(this.routeToLeader, other.routeToLeader); + && Objects.equals(this.routeToLeader, other.routeToLeader) + && Objects.equals( + this.useVirtualGrpcTransportThreads, other.useVirtualGrpcTransportThreads); } @Override @@ -211,7 +214,8 @@ public int hashCode() { this.usePlainText, this.databaseRole, this.userAgent, - this.routeToLeader); + this.routeToLeader, + this.useVirtualGrpcTransportThreads); } } @@ -332,8 +336,9 @@ private void initialize() { @VisibleForTesting Spanner createSpanner(SpannerPoolKey key, ConnectionOptions options) { - SpannerOptions.Builder builder = SpannerOptions.newBuilder(); + ConnectionSpannerOptions.Builder builder = ConnectionSpannerOptions.newBuilder(); builder + .setUseVirtualThreads(key.useVirtualGrpcTransportThreads) .setClientLibToken(MoreObjects.firstNonNull(key.userAgent, CONNECTION_API_CLIENT_LIB_TOKEN)) .setHost(key.host) .setProjectId(key.projectId) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java index 438fc749fdc..f3c93c6dd1c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementExecutor.java @@ -16,15 +16,17 @@ package com.google.cloud.spanner.connection; +import static com.google.cloud.spanner.connection.ConnectionOptions.DEFAULT_USE_VIRTUAL_THREADS; + import com.google.api.core.ApiFuture; import com.google.api.core.ListenableFutureToApiFuture; +import com.google.cloud.spanner.ThreadFactoryUtil; import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement; import com.google.cloud.spanner.connection.ReadOnlyStalenessUtil.DurationValueGetter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.Duration; import java.util.Collections; import java.util.List; @@ -130,37 +132,45 @@ org.threeten.bp.Duration asDuration() { } /** - * Use a {@link ThreadFactory} that produces daemon threads and sets recognizable name on the + * Use a {@link ThreadFactory} that produces daemon or virtual threads and sets a recognizable + * name on the threads. + */ + private static final ThreadFactory DEFAULT_VIRTUAL_THREAD_FACTORY = + ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("connection-executor", true); + /** + * Use a {@link ThreadFactory} that produces daemon threads and sets a recognizable name on the * threads. */ - private static final ThreadFactory THREAD_FACTORY = - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("connection-executor-%d") - .setThreadFactory(MoreExecutors.platformThreadFactory()) - .build(); + private static final ThreadFactory DEFAULT_DAEMON_THREAD_FACTORY = + ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory("connection-executor", false); /** Creates an {@link ExecutorService} for a {@link StatementExecutor}. */ - private static ListeningExecutorService createExecutorService() { + private static ListeningExecutorService createExecutorService(boolean useVirtualThreads) { return MoreExecutors.listeningDecorator( new ThreadPoolExecutor( - 1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), THREAD_FACTORY)); + 1, + 1, + 0L, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(), + useVirtualThreads ? DEFAULT_VIRTUAL_THREAD_FACTORY : DEFAULT_DAEMON_THREAD_FACTORY)); } - private ListeningExecutorService executor = createExecutorService(); + private final ListeningExecutorService executor; /** * Interceptors that should be invoked before or after a statement is executed can be registered - * for a connection. This are added to this list. The interceptors are intended for test usage. + * for a connection. These are added to this list. The interceptors are intended for test usage. */ private final List interceptors; @VisibleForTesting StatementExecutor() { - this.interceptors = Collections.emptyList(); + this(DEFAULT_USE_VIRTUAL_THREADS, Collections.emptyList()); } - StatementExecutor(List interceptors) { + StatementExecutor(boolean useVirtualThreads, List interceptors) { + this.executor = createExecutorService(useVirtualThreads); this.interceptors = Collections.unmodifiableList(interceptors); } @@ -168,15 +178,11 @@ void shutdown() { executor.shutdown(); } - void awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - executor.awaitTermination(timeout, unit); - } - /** * Shutdown this executor now and do not wait for any statement that is being executed to finish. */ - List shutdownNow() { - return executor.shutdownNow(); + void shutdownNow() { + executor.shutdownNow(); } /** Execute a statement on this {@link StatementExecutor}. */ diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index a65e533338f..a8618af5c79 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner.spi.v1; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import static com.google.cloud.spanner.ThreadFactoryUtil.tryCreateVirtualThreadPerTaskExecutor; import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; @@ -202,6 +203,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -351,7 +353,13 @@ public GapicSpannerRpc(final SpannerOptions options) { options.isAttemptDirectPath() && !Objects.equals( options.getScopedCredentials(), NoCredentials.getInstance())); - + if (options.isUseVirtualThreads()) { + ExecutorService executor = + tryCreateVirtualThreadPerTaskExecutor("spanner-virtual-grpc-executor"); + if (executor != null) { + defaultChannelProviderBuilder.setExecutor(executor); + } + } // If it is enabled in options uses the channel pool provided by the gRPC-GCP extension. maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java index 61d37145aa4..535423bb9e4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java @@ -294,6 +294,8 @@ public void stressTest() throws Exception { () -> { while (!stopMaintenance.get()) { runMaintenanceLoop(clock, pool, 1); + // Sleep 1ms between maintenance loops to prevent the long-running session remover + // from stealing all sessions before they can be used. Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.MILLISECONDS); } }) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ThreadFactoryUtilTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ThreadFactoryUtilTest.java new file mode 100644 index 00000000000..faf70ff577a --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ThreadFactoryUtilTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.ThreadFactoryUtil.createVirtualOrPlatformDaemonThreadFactory; +import static com.google.cloud.spanner.ThreadFactoryUtil.tryCreateVirtualThreadFactory; +import static com.google.cloud.spanner.ThreadFactoryUtil.tryCreateVirtualThreadPerTaskExecutor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ThreadFactoryUtilTest { + + @Test + public void testCreateThreadFactory() throws Exception { + ThreadFactory threadFactory = createVirtualOrPlatformDaemonThreadFactory("test-thread", true); + assertNotNull(threadFactory); + SettableFuture future = SettableFuture.create(); + Thread thread = threadFactory.newThread(() -> future.set(true)); + assertNotNull(thread); + // Virtual threads are by definition always daemon threads. + assertTrue(thread.isDaemon()); + thread.start(); + assertTrue(future.get(1L, TimeUnit.SECONDS)); + + if (isJava21OrHigher()) { + ThreadFactory virtualFactory = tryCreateVirtualThreadFactory("test-thread"); + assertNotNull(virtualFactory); + assertEquals(virtualFactory.getClass(), threadFactory.getClass()); + } else { + assertNull(tryCreateVirtualThreadFactory("test-thread")); + } + } + + @Test + public void testTryCreateVirtualThreadPerTaskExecutor() { + if (isJava21OrHigher()) { + assertNotNull(tryCreateVirtualThreadPerTaskExecutor("test-virtual-thread")); + } else { + assertNull(tryCreateVirtualThreadPerTaskExecutor("test-virtual-thread")); + } + } + + private static boolean isJava21OrHigher() { + String[] versionElements = System.getProperty("java.version").split("\\."); + int majorVersion = Integer.parseInt(versionElements[0]); + // Java 1.8 (Java 8) and lower used the format 1.8 etc. + // Java 9 and higher use the format 9.x + if (majorVersion == 1) { + majorVersion = Integer.parseInt(versionElements[1]); + } + return majorVersion >= 21; + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java index ca0b779981f..bdca80a214d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionOptionsTest.java @@ -1035,4 +1035,56 @@ public void testExternalChannelProvider_WithoutEnablingProperty() throws Throwab .build()); assertEquals(ErrorCode.FAILED_PRECONDITION, exception.getErrorCode()); } + + @Test + public void testUseVirtualThreads() { + assertTrue( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?useVirtualThreads=true") + .setCredentials(NoCredentials.getInstance()) + .build() + .isUseVirtualThreads()); + assertFalse( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?useVirtualThreads=false") + .setCredentials(NoCredentials.getInstance()) + .build() + .isUseVirtualThreads()); + assertEquals( + ConnectionOptions.DEFAULT_USE_VIRTUAL_THREADS, + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database") + .setCredentials(NoCredentials.getInstance()) + .build() + .isUseVirtualThreads()); + } + + @Test + public void testUseVirtualGrpcTransportThreads() { + assertTrue( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?useVirtualGrpcTransportThreads=true") + .setCredentials(NoCredentials.getInstance()) + .build() + .isUseVirtualGrpcTransportThreads()); + assertFalse( + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database?useVirtualGrpcTransportThreads=false") + .setCredentials(NoCredentials.getInstance()) + .build() + .isUseVirtualGrpcTransportThreads()); + assertEquals( + ConnectionOptions.DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS, + ConnectionOptions.newBuilder() + .setUri( + "cloudspanner:/projects/test-project-123/instances/test-instance/databases/test-database") + .setCredentials(NoCredentials.getInstance()) + .build() + .isUseVirtualThreads()); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java index 537617f4a0c..e2ab63b3213 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SavepointMockServerTest.java @@ -28,6 +28,7 @@ import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; +import com.google.cloud.spanner.connection.ITAbstractSpannerTest.ITConnection; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.protobuf.AbstractMessage; @@ -36,6 +37,7 @@ import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.RollbackRequest; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import org.junit.After; @@ -49,22 +51,51 @@ @RunWith(Parameterized.class) public class SavepointMockServerTest extends AbstractMockServerTest { - @Parameters(name = "dialect = {0}") - public static Object[] data() { - return Dialect.values(); + // This test uses both platform threads and virtual threads (when available). We use specifically + // this test for testing virtual threads, because it relies heavily on the internal checksum retry + // strategy. This is the only significant calculation that is executed by the StatementExecutor + // thread, meaning that this shows that using a virtual thread for those calculations also works. + @Parameters(name = "dialect = {0}, useVirtualThreads = {1}") + public static Collection data() { + ImmutableList.Builder builder = ImmutableList.builder(); + for (Dialect dialect : Dialect.values()) { + for (boolean useVirtualThreads : new boolean[] {true, false}) { + builder.add(new Object[] {dialect, useVirtualThreads}); + } + } + return builder.build(); } - @Parameter public Dialect dialect; + @Parameter(0) + public Dialect dialect; + + @Parameter(1) + public boolean useVirtualThreads; + + private Dialect currentDialect; @Before public void setupDialect() { - mockSpanner.putStatementResult(StatementResult.detectDialectResult(dialect)); + if (currentDialect != dialect) { + // Reset the dialect result. + SpannerPool.closeSpannerPool(); + mockSpanner.putStatementResult(StatementResult.detectDialectResult(dialect)); + currentDialect = dialect; + } } @After public void clearRequests() { mockSpanner.clearRequests(); - SpannerPool.closeSpannerPool(); + } + + @Override + public ITConnection createConnection() { + return createConnection( + ";useVirtualThreads=" + + useVirtualThreads + + ";useVirtualGrpcTransportThreads=" + + useVirtualThreads); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java index 1d7de23cb40..0cf3abda6bf 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/it/ITTransactionRetryTest.java @@ -59,7 +59,10 @@ public class ITTransactionRetryTest extends ITAbstractSpannerTest { @Override protected void appendConnectionUri(StringBuilder uri) { - uri.append(";autocommit=false;retryAbortsInternally=true"); + // This test uses virtual threads when available to verify that the checksum calculation can + // reliably be executed by virtual threads. + uri.append( + ";autocommit=false;retryAbortsInternally=true;useVirtualThreads=true;useVirtualGrpcTransportThreads=true"); } @Override