Skip to content

Commit

Permalink
chore: add support for virtual threads to Connection API (#2789)
Browse files Browse the repository at this point in the history
Adds support for using virtual threads in the Connection API. Virtual
threads can be enabled for two things:
1. As the StatementExecutor thread for each connection.
2. As the gRPC transport thread pool.

Both options can (for now) only be set in the Connection API.

Setting any of these options only has any effect if the application is
running on Java 21 or higher.
  • Loading branch information
olavloite authored Jan 29, 2024
1 parent 340ba13 commit c99189c
Show file tree
Hide file tree
Showing 15 changed files with 444 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final boolean leaderAwareRoutingEnabled;
private final boolean attemptDirectPath;
private final DirectedReadOptions directedReadOptions;
private final boolean useVirtualThreads;

/** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */
public interface CallCredentialsProvider {
Expand Down Expand Up @@ -580,9 +581,9 @@ public static CloseableExecutorProvider createAsyncExecutorProvider(
return FixedCloseableExecutorProvider.create(executor);
}

private SpannerOptions(Builder builder) {
protected SpannerOptions(Builder builder) {
super(SpannerFactory.class, SpannerRpcFactory.class, builder, new SpannerDefaults());
numChannels = builder.numChannels;
numChannels = builder.numChannels == null ? DEFAULT_CHANNELS : builder.numChannels;
Preconditions.checkArgument(
numChannels >= 1 && numChannels <= MAX_CHANNELS,
"Number of channels must fall in the range [1, %s], found: %s",
Expand Down Expand Up @@ -631,6 +632,7 @@ private SpannerOptions(Builder builder) {
leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled;
attemptDirectPath = builder.attemptDirectPath;
directedReadOptions = builder.directedReadOptions;
useVirtualThreads = builder.useVirtualThreads;
}

/**
Expand Down Expand Up @@ -734,12 +736,13 @@ public static class Builder
private boolean leaderAwareRoutingEnabled = true;
private boolean attemptDirectPath = true;
private DirectedReadOptions directedReadOptions;
private boolean useVirtualThreads = false;

private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
}

private Builder() {
protected Builder() {
// Manually set retry and polling settings that work.
OperationTimedPollAlgorithm longRunningPollingAlgorithm =
OperationTimedPollAlgorithm.create(
Expand Down Expand Up @@ -795,6 +798,7 @@ private Builder() {
this.interceptorProvider = options.interceptorProvider;
this.attemptDirectPath = options.attemptDirectPath;
this.directedReadOptions = options.directedReadOptions;
this.useVirtualThreads = options.useVirtualThreads;
}

@Override
Expand Down Expand Up @@ -1263,6 +1267,16 @@ public Builder disableDirectPath() {
return this;
}

/**
* Enables/disables the use of virtual threads for the gRPC executor. Setting this option only
* has any effect on Java 21 and higher. In all other cases, the option will be ignored.
*/
@BetaApi
protected Builder setUseVirtualThreads(boolean useVirtualThreads) {
this.useVirtualThreads = useVirtualThreads;
return this;
}

@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
Expand Down Expand Up @@ -1412,6 +1426,11 @@ public boolean isAttemptDirectPath() {
return attemptDirectPath;
}

@BetaApi
public boolean isUseVirtualThreads() {
return useVirtualThreads;
}

/** Returns the default query options to use for the specific database. */
public QueryOptions getDefaultQueryOptions(DatabaseId databaseId) {
// Use the specific query options for the database if any have been specified. These have
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.InternalApi;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;

/** Utility class for creating a thread factory for daemon or virtual threads. */
@InternalApi
public class ThreadFactoryUtil {

/**
* Tries to create a thread factory for virtual threads, and otherwise falls back to creating a
* platform thread factory that creates daemon threads. Virtual threads are supported from JDK21.
*
* @param baseNameFormat the base name format for the threads, '-%d' will be appended to the
* actual thread name format
* @param tryVirtualThreads whether to try to use virtual threads if available or not
* @return a {@link ThreadFactory} that produces virtual threads (Java 21 or higher) or platform
* daemon threads
*/
@InternalApi
public static ThreadFactory createVirtualOrPlatformDaemonThreadFactory(
String baseNameFormat, boolean tryVirtualThreads) {
ThreadFactory virtualThreadFactory =
tryVirtualThreads ? tryCreateVirtualThreadFactory(baseNameFormat) : null;
if (virtualThreadFactory != null) {
return virtualThreadFactory;
}

return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(baseNameFormat + "-%d").build();
}

/**
* Tries to create a {@link ThreadFactory} that creates virtual threads. Returns null if virtual
* threads are not supported on this JVM.
*/
@InternalApi
@Nullable
public static ThreadFactory tryCreateVirtualThreadFactory(String baseNameFormat) {
try {
Class<?> threadBuilderClass = Class.forName("java.lang.Thread$Builder");
Method ofVirtualMethod = Thread.class.getDeclaredMethod("ofVirtual");
Object virtualBuilder = ofVirtualMethod.invoke(null);
Method nameMethod = threadBuilderClass.getDeclaredMethod("name", String.class, long.class);
virtualBuilder = nameMethod.invoke(virtualBuilder, baseNameFormat + "-", 0);
Method factoryMethod = threadBuilderClass.getDeclaredMethod("factory");
return (ThreadFactory) factoryMethod.invoke(virtualBuilder);
} catch (ClassNotFoundException | NoSuchMethodException ignore) {
return null;
} catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

/**
* Tries to create an {@link ExecutorService} that creates a new virtual thread for each task that
* it runs. Creating a new virtual thread is the recommended way to create executors using virtual
* threads, instead of creating a pool of virtual threads. Returns null if virtual threads are not
* supported on this JVM.
*/
@InternalApi
@Nullable
public static ExecutorService tryCreateVirtualThreadPerTaskExecutor(String baseNameFormat) {
ThreadFactory factory = tryCreateVirtualThreadFactory(baseNameFormat);
if (factory != null) {
try {
Method newThreadPerTaskExecutorMethod =
Executors.class.getDeclaredMethod("newThreadPerTaskExecutor", ThreadFactory.class);
return (ExecutorService) newThreadPerTaskExecutorMethod.invoke(null, factory);
} catch (NoSuchMethodException ignore) {
return null;
} catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.math.BigDecimal;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/**
* {@link ResultSet} implementation that keeps a running checksum that can be used to determine
Expand Down Expand Up @@ -66,7 +67,7 @@
@VisibleForTesting
class ChecksumResultSet extends ReplaceableForwardingResultSet implements RetriableStatement {
private final ReadWriteTransaction transaction;
private volatile long numberOfNextCalls;
private final AtomicLong numberOfNextCalls = new AtomicLong();
private final ParsedStatement statement;
private final AnalyzeMode analyzeMode;
private final QueryOption[] options;
Expand Down Expand Up @@ -103,7 +104,7 @@ public Boolean call() {
if (res) {
checksumCalculator.calculateNextChecksum(getCurrentRowAsStruct());
}
numberOfNextCalls++;
numberOfNextCalls.incrementAndGet();
return res;
}
}
Expand Down Expand Up @@ -142,7 +143,7 @@ public void retry(AbortedException aborted) throws AbortedException {
DirectExecuteResultSet.ofResultSet(
transaction.internalExecuteQuery(statement, analyzeMode, options));
boolean next = true;
while (counter < numberOfNextCalls && next) {
while (counter < numberOfNextCalls.get() && next) {
transaction
.getStatementExecutor()
.invokeInterceptors(
Expand All @@ -169,7 +170,7 @@ public void retry(AbortedException aborted) throws AbortedException {
// Check that we have the same number of rows and the same checksum.
HashCode newChecksum = newChecksumCalculator.getChecksum();
HashCode currentChecksum = checksumCalculator.getChecksum();
if (counter == numberOfNextCalls && Objects.equals(newChecksum, currentChecksum)) {
if (counter == numberOfNextCalls.get() && Objects.equals(newChecksum, currentChecksum)) {
// Checksum is ok, we only need to replace the delegate result set if it's still open.
if (isClosed()) {
resultSet.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
Preconditions.checkNotNull(options);
this.leakedException =
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
this.statementExecutor = new StatementExecutor(options.getStatementExecutionInterceptors());
this.statementExecutor =
new StatementExecutor(
options.isUseVirtualThreads(), options.getStatementExecutionInterceptors());
this.spannerPool = SpannerPool.INSTANCE;
this.options = options;
this.spanner = spannerPool.getSpanner(options, this);
Expand Down Expand Up @@ -283,7 +285,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
BatchClient batchClient) {
this.leakedException =
options.isTrackConnectionLeaks() ? new LeakedConnectionException() : null;
this.statementExecutor = new StatementExecutor(Collections.emptyList());
this.statementExecutor =
new StatementExecutor(options.isUseVirtualThreads(), Collections.emptyList());
this.spannerPool = Preconditions.checkNotNull(spannerPool);
this.options = Preconditions.checkNotNull(options);
this.spanner = spannerPool.getSpanner(options, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ public String[] getValidValues() {
static final boolean DEFAULT_AUTOCOMMIT = true;
static final boolean DEFAULT_READONLY = false;
static final boolean DEFAULT_RETRY_ABORTS_INTERNALLY = true;
static final boolean DEFAULT_USE_VIRTUAL_THREADS = false;
static final boolean DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS = false;
private static final String DEFAULT_CREDENTIALS = null;
private static final String DEFAULT_OAUTH_TOKEN = null;
private static final String DEFAULT_MIN_SESSIONS = null;
Expand Down Expand Up @@ -204,6 +206,11 @@ public String[] getValidValues() {
public static final String ROUTE_TO_LEADER_PROPERTY_NAME = "routeToLeader";
/** Name of the 'retry aborts internally' connection property. */
public static final String RETRY_ABORTS_INTERNALLY_PROPERTY_NAME = "retryAbortsInternally";
/** Name of the property to enable/disable virtual threads for the statement executor. */
public static final String USE_VIRTUAL_THREADS_PROPERTY_NAME = "useVirtualThreads";
/** Name of the property to enable/disable virtual threads for gRPC transport. */
public static final String USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME =
"useVirtualGrpcTransportThreads";
/** Name of the 'credentials' connection property. */
public static final String CREDENTIALS_PROPERTY_NAME = "credentials";
/** Name of the 'encodedCredentials' connection property. */
Expand Down Expand Up @@ -293,6 +300,16 @@ private static String generateGuardedConnectionPropertyError(
RETRY_ABORTS_INTERNALLY_PROPERTY_NAME,
"Should the connection automatically retry Aborted errors (true/false)",
DEFAULT_RETRY_ABORTS_INTERNALLY),
ConnectionProperty.createBooleanProperty(
USE_VIRTUAL_THREADS_PROPERTY_NAME,
"Use a virtual thread instead of a platform thread for each connection (true/false). "
+ "This option only has any effect if the application is running on Java 21 or higher. In all other cases, the option is ignored.",
DEFAULT_USE_VIRTUAL_THREADS),
ConnectionProperty.createBooleanProperty(
USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME,
"Use a virtual thread instead of a platform thread for the gRPC executor (true/false). "
+ "This option only has any effect if the application is running on Java 21 or higher. In all other cases, the option is ignored.",
DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS),
ConnectionProperty.createStringProperty(
CREDENTIALS_PROPERTY_NAME,
"The location of the credentials file to use for this connection. If neither this property or encoded credentials are set, the connection will use the default Google Cloud credentials for the runtime environment."),
Expand Down Expand Up @@ -672,6 +689,8 @@ public static Builder newBuilder() {
private final boolean readOnly;
private final boolean routeToLeader;
private final boolean retryAbortsInternally;
private final boolean useVirtualThreads;
private final boolean useVirtualGrpcTransportThreads;
private final List<StatementExecutionInterceptor> statementExecutionInterceptors;
private final SpannerOptionsConfigurator configurator;

Expand Down Expand Up @@ -771,6 +790,8 @@ private ConnectionOptions(Builder builder) {
this.readOnly = parseReadOnly(this.uri);
this.routeToLeader = parseRouteToLeader(this.uri);
this.retryAbortsInternally = parseRetryAbortsInternally(this.uri);
this.useVirtualThreads = parseUseVirtualThreads(this.uri);
this.useVirtualGrpcTransportThreads = parseUseVirtualGrpcTransportThreads(this.uri);
this.statementExecutionInterceptors =
Collections.unmodifiableList(builder.statementExecutionInterceptors);
this.configurator = builder.configurator;
Expand Down Expand Up @@ -873,6 +894,18 @@ static boolean parseRetryAbortsInternally(String uri) {
return value != null ? Boolean.parseBoolean(value) : DEFAULT_RETRY_ABORTS_INTERNALLY;
}

@VisibleForTesting
static boolean parseUseVirtualThreads(String uri) {
String value = parseUriProperty(uri, USE_VIRTUAL_THREADS_PROPERTY_NAME);
return value != null ? Boolean.parseBoolean(value) : DEFAULT_USE_VIRTUAL_THREADS;
}

@VisibleForTesting
static boolean parseUseVirtualGrpcTransportThreads(String uri) {
String value = parseUriProperty(uri, USE_VIRTUAL_GRPC_TRANSPORT_THREADS_PROPERTY_NAME);
return value != null ? Boolean.parseBoolean(value) : DEFAULT_USE_VIRTUAL_GRPC_TRANSPORT_THREADS;
}

@VisibleForTesting
static @Nullable String parseCredentials(String uri) {
String value = parseUriProperty(uri, CREDENTIALS_PROPERTY_NAME);
Expand Down Expand Up @@ -1293,6 +1326,16 @@ public boolean isRetryAbortsInternally() {
return retryAbortsInternally;
}

/** Whether connections should use virtual threads for connection executors. */
public boolean isUseVirtualThreads() {
return useVirtualThreads;
}

/** Whether virtual threads should be used for gRPC transport. */
public boolean isUseVirtualGrpcTransportThreads() {
return useVirtualGrpcTransportThreads;
}

/** Any warnings that were generated while creating the {@link ConnectionOptions} instance. */
@Nullable
public String getWarnings() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner.connection;

import com.google.cloud.spanner.SpannerOptions;

/**
* This class is used for building {@link SpannerOptions} for {@link Connection} instances. It gives
* access to (experimental) properties that are not public in the standard {@link SpannerOptions}
* implementation.
*/
class ConnectionSpannerOptions extends SpannerOptions {
public static Builder newBuilder() {
return new Builder();
}

static class Builder extends SpannerOptions.Builder {
Builder() {}

@Override
protected SpannerOptions.Builder setUseVirtualThreads(boolean useVirtualThreads) {
return super.setUseVirtualThreads(useVirtualThreads);
}

@Override
public ConnectionSpannerOptions build() {
return new ConnectionSpannerOptions(this);
}
}

ConnectionSpannerOptions(Builder builder) {
super(builder);
}
}
Loading

0 comments on commit c99189c

Please sign in to comment.