diff --git a/.kokoro/build.sh b/.kokoro/build.sh
index 96e009fb69a..d3eaf9922bb 100755
--- a/.kokoro/build.sh
+++ b/.kokoro/build.sh
@@ -109,6 +109,21 @@ integration-directpath-enabled)
verify
RETURN_CODE=$?
;;
+integration-multiplexed-sessions-enabled)
+ mvn -B ${INTEGRATION_TEST_ARGS} \
+ -ntp \
+ -Penable-integration-tests \
+ -Djava.net.preferIPv4Stack=true \
+ -DtrimStackTrace=false \
+ -Dclirr.skip=true \
+ -Denforcer.skip=true \
+ -Dmaven.main.skip=true \
+ -Dspanner.gce.config.project_id=gcloud-devel \
+ -Dspanner.testenv.instance=projects/gcloud-devel/instances/java-client-integration-tests-multiplexed-sessions \
+ -fae \
+ verify
+ RETURN_CODE=$?
+ ;;
integration-cloud-devel)
mvn -B ${INTEGRATION_TEST_ARGS} \
-ntp \
diff --git a/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg b/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg
new file mode 100644
index 00000000000..0acb1a445b0
--- /dev/null
+++ b/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg
@@ -0,0 +1,38 @@
+# Format: //devtools/kokoro/config/proto/build.proto
+
+# Configure the docker image for kokoro-trampoline.
+env_vars: {
+ key: "TRAMPOLINE_IMAGE"
+ value: "gcr.io/cloud-devrel-kokoro-resources/java8"
+}
+
+env_vars: {
+ key: "JOB_TYPE"
+ value: "integration-multiplexed-sessions-enabled"
+}
+
+# TODO: remove this after we've migrated all tests and scripts
+env_vars: {
+ key: "GCLOUD_PROJECT"
+ value: "gcloud-devel"
+}
+
+env_vars: {
+ key: "GOOGLE_CLOUD_PROJECT"
+ value: "gcloud-devel"
+}
+
+env_vars: {
+ key: "GOOGLE_APPLICATION_CREDENTIALS"
+ value: "secret_manager/java-it-service-account"
+}
+
+env_vars: {
+ key: "SECRET_MANAGER_KEYS"
+ value: "java-it-service-account"
+}
+
+env_vars: {
+ key: "GOOGLE_CLOUD_SPANNER_ENABLE_MULTIPLEXED_SESSIONS"
+ value: "true"
+}
\ No newline at end of file
diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml
index 4b10a3bf2c9..fd0477c6716 100644
--- a/google-cloud-spanner/clirr-ignored-differences.xml
+++ b/google-cloud-spanner/clirr-ignored-differences.xml
@@ -20,6 +20,12 @@
8001com/google/cloud/spanner/connection/StatementParser
+
+
+ 7002
+ com/google/cloud/spanner/Session
+ void prepareReadWriteTransaction()
+ 7002com/google/cloud/spanner/SpannerOptions
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java
index 98c40b49ccc..61d13db8d82 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Session.java
@@ -18,7 +18,6 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.InternalApi;
-import com.google.cloud.spanner.Options.TransactionOption;
import com.google.protobuf.Empty;
/**
@@ -48,16 +47,6 @@ public interface Session extends DatabaseClient, AutoCloseable {
/** Returns the resource name associated with this session. */
String getName();
- /**
- * Prepares a transaction for use by a subsequent {@link
- * DatabaseClient#readWriteTransaction(TransactionOption...)} or {@link #write(Iterable)} call. It
- * is not necessary to call this method before running a transaction or performing a write, but
- * doing so may allow one round trip of the protocol to be performed in advance; calling this
- * method on an idle session that is expected to execute a transaction or write in the near future
- * may reduce the latency of the subsequent transaction/write.
- */
- void prepareReadWriteTransaction();
-
@Override
void close();
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
index f3723618b9e..26d6e962116 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java
@@ -151,8 +151,8 @@ public void run() {
}
/**
- * Callback interface to be used for BatchCreateSessions. When sessions become available or
- * session creation fails, one of the callback methods will be called.
+ * Callback interface to be used for Sessions. When sessions become available or session creation
+ * fails, one of the callback methods will be called.
*/
interface SessionConsumer {
/** Called when a session has been created and is ready for use. */
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
index 8c4a0068599..436c670d58c 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java
@@ -100,7 +100,6 @@ interface SessionTransaction {
private final String name;
private final DatabaseId databaseId;
private SessionTransaction activeTransaction;
- ByteString readyTransactionId;
private final Map options;
private volatile Instant lastUseTime;
@Nullable private final Instant createTime;
@@ -378,12 +377,6 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
return new AsyncTransactionManagerImpl(this, currentSpan, options);
}
- @Override
- public void prepareReadWriteTransaction() {
- setActive(null);
- readyTransactionId = beginTransaction(true);
- }
-
@Override
public ApiFuture asyncClose() {
return spanner.getRpc().asyncDeleteSession(name, options);
@@ -402,20 +395,6 @@ public void close() {
}
}
- ByteString beginTransaction(boolean routeToLeader) {
- try {
- return beginTransactionAsync(routeToLeader).get();
- } catch (ExecutionException e) {
- throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
- } catch (InterruptedException e) {
- throw SpannerExceptionFactory.propagateInterrupt(e);
- }
- }
-
- ApiFuture beginTransactionAsync(boolean routeToLeader) {
- return beginTransactionAsync(Options.fromTransactionOptions(), routeToLeader);
- }
-
ApiFuture beginTransactionAsync(Options transactionOptions, boolean routeToLeader) {
final SettableApiFuture res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
@@ -463,7 +442,7 @@ TransactionContextImpl newTransaction(Options options) {
return TransactionContextImpl.newBuilder()
.setSession(this)
.setOptions(options)
- .setTransactionId(readyTransactionId)
+ .setTransactionId(null)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
@@ -484,17 +463,12 @@ T setActive(@Nullable T ctx) {
activeTransaction.invalidate();
}
activeTransaction = ctx;
- readyTransactionId = null;
if (activeTransaction != null) {
activeTransaction.setSpan(currentSpan);
}
return ctx;
}
- boolean hasReadyTransaction() {
- return readyTransactionId != null;
- }
-
TraceWrapper getTracer() {
return tracer;
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
index 9dd44cf01b9..f4c04e4c78b 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java
@@ -104,6 +104,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -146,6 +147,7 @@ void maybeWaitOnMinSessions() {
}
private abstract static class CachedResultSetSupplier implements Supplier {
+
private ResultSet cached;
abstract ResultSet load();
@@ -1155,6 +1157,46 @@ private PooledSessionFuture createPooledSessionFuture(
return new PooledSessionFuture(future, span);
}
+ /** Wrapper class for the {@link SessionFuture} implementations. */
+ interface SessionFutureWrapper {
+
+ /** Method to resolve {@link SessionFuture} implementation for different use-cases. */
+ T get();
+ }
+
+ class PooledSessionFutureWrapper implements SessionFutureWrapper {
+ PooledSessionFuture pooledSessionFuture;
+
+ public PooledSessionFutureWrapper(PooledSessionFuture pooledSessionFuture) {
+ this.pooledSessionFuture = pooledSessionFuture;
+ }
+
+ @Override
+ public PooledSessionFuture get() {
+ return this.pooledSessionFuture;
+ }
+ }
+
+ class MultiplexedSessionFutureWrapper implements SessionFutureWrapper {
+ SettableApiFuture multiplexedSessionSettableApiFuture;
+
+ public MultiplexedSessionFutureWrapper(
+ SettableApiFuture multiplexedSessionSettableApiFuture) {
+ this.multiplexedSessionSettableApiFuture = multiplexedSessionSettableApiFuture;
+ }
+
+ @Override
+ public MultiplexedSessionFuture get() {
+ try {
+ return this.multiplexedSessionSettableApiFuture.get();
+ } catch (InterruptedException interruptedException) {
+ throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
+ } catch (ExecutionException executionException) {
+ throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
+ }
+ }
+ }
+
interface SessionFuture extends Session {
/**
@@ -1363,11 +1405,6 @@ public String getName() {
return get().getName();
}
- @Override
- public void prepareReadWriteTransaction() {
- get().prepareReadWriteTransaction();
- }
-
@Override
public void close() {
try {
@@ -1440,12 +1477,9 @@ PooledSession get(final boolean eligibleForLongRunning) {
class MultiplexedSessionFuture extends SimpleForwardingListenableFuture
implements SessionFuture {
- private final ISpan span;
-
@VisibleForTesting
- MultiplexedSessionFuture(ListenableFuture delegate, ISpan span) {
+ MultiplexedSessionFuture(ListenableFuture delegate) {
super(delegate);
- this.span = span;
}
@Override
@@ -1618,11 +1652,6 @@ public String getName() {
return get().getName();
}
- @Override
- public void prepareReadWriteTransaction() {
- get().prepareReadWriteTransaction();
- }
-
@Override
public void close() {
try {
@@ -1655,15 +1684,6 @@ private MultiplexedSession getOrNull() {
@Override
public MultiplexedSession get() {
- MultiplexedSession res = null;
- try {
- res = super.get();
- } catch (Throwable e) {
- // ignore the exception as it will be handled by the call to super.get() below.
- }
- if (res != null) {
- res.markBusy(span);
- }
try {
return super.get();
} catch (ExecutionException e) {
@@ -1901,12 +1921,6 @@ public String getName() {
return delegate.getName();
}
- @Override
- public void prepareReadWriteTransaction() {
- markUsed();
- delegate.prepareReadWriteTransaction();
- }
-
private void keepAlive() {
markUsed();
final ISpan previousSpan = delegate.getCurrentSpan();
@@ -2129,12 +2143,6 @@ public String getName() {
return delegate.getName();
}
- @Override
- public void prepareReadWriteTransaction() {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session");
- }
-
@Override
public void close() {
synchronized (lock) {
@@ -2253,6 +2261,9 @@ private PooledSession pollUninterruptiblyWithTimeout(
*/
final class PoolMaintainer {
+ // Delay post which the maintainer will retry creating/replacing the current multiplexed session
+ private final Duration multiplexedSessionCreationRetryDelay = Duration.ofMinutes(10);
+
// Length of the window in millis over which we keep track of maximum number of concurrent
// sessions in use.
private final Duration windowLength = Duration.ofMillis(TimeUnit.MINUTES.toMillis(10));
@@ -2276,6 +2287,8 @@ final class PoolMaintainer {
*/
@VisibleForTesting Instant lastExecutionTime;
+ @VisibleForTesting Instant multiplexedSessionReplacementAttemptTime;
+
/**
* The previous numSessionsAcquired seen by the maintainer. This is used to calculate the
* transactions per second, which again is used to determine whether to randomize the order of
@@ -2293,6 +2306,8 @@ final class PoolMaintainer {
void init() {
lastExecutionTime = clock.instant();
+ multiplexedSessionReplacementAttemptTime = clock.instant();
+
// Scheduled pool maintenance worker.
synchronized (lock) {
scheduledFuture =
@@ -2334,6 +2349,7 @@ void maintainPool() {
this.prevNumSessionsAcquired = SessionPool.this.numSessionsAcquired;
}
Instant currTime = clock.instant();
+ maintainMultiplexedSession(currTime);
removeIdleSessions(currTime);
// Now go over all the remaining sessions and see if they need to be kept alive explicitly.
keepAliveSessions(currTime);
@@ -2502,6 +2518,46 @@ private void removeLongRunningSessions(
}
}
}
+
+ void maintainMultiplexedSession(Instant currentTime) {
+ try {
+ if (options.getUseMultiplexedSession()) {
+ synchronized (lock) {
+ if (getMultiplexedSession().isDone()
+ && getMultiplexedSession().get() != null
+ && isMultiplexedSessionStale(currentTime)) {
+ final Instant minExecutionTime =
+ multiplexedSessionReplacementAttemptTime.plus(
+ multiplexedSessionCreationRetryDelay);
+ if (currentTime.isBefore(minExecutionTime)) {
+ return;
+ }
+
+ /*
+ This will attempt to create a new multiplexed session. if successfully created then
+ the existing session will be replaced. Note that there maybe active transactions
+ running on the stale session. Hence, it is important that we only replace the reference
+ and not invoke a DeleteSession RPC.
+ */
+ maybeCreateMultiplexedSession(multiplexedMaintainerConsumer);
+
+ // update this only after we have attempted to replace the multiplexed session
+ multiplexedSessionReplacementAttemptTime = currentTime;
+ }
+ }
+ }
+ } catch (final Throwable t) {
+ logger.log(Level.WARNING, "Failed to maintain multiplexed session", t);
+ }
+ }
+
+ boolean isMultiplexedSessionStale(Instant currentTime) {
+ final CachedSession session = getMultiplexedSession().get();
+ final Duration durationFromCreationTime =
+ Duration.between(session.getDelegate().getCreateTime(), currentTime);
+ return durationFromCreationTime.compareTo(options.getMultiplexedSessionMaintenanceDuration())
+ > 0;
+ }
}
enum Position {
@@ -2578,6 +2634,9 @@ enum Position {
@GuardedBy("lock")
private int numSessionsBeingCreated = 0;
+ @GuardedBy("lock")
+ private boolean multiplexedSessionBeingCreated = false;
+
@GuardedBy("lock")
private int numSessionsInUse = 0;
@@ -2607,6 +2666,10 @@ enum Position {
private AtomicLong numWaiterTimeouts = new AtomicLong();
+ private final AtomicReference>
+ currentMultiplexedSessionReference = new AtomicReference<>(SettableApiFuture.create());
+ MultiplexedSessionFutureWrapper wrappedMultiplexedSessionFuture = null;
+
@GuardedBy("lock")
private final Set allSessions = new HashSet<>();
@@ -2615,9 +2678,16 @@ enum Position {
final Set checkedOutSessions = new HashSet<>();
private final SessionConsumer sessionConsumer = new SessionConsumerImpl();
+
+ private final MultiplexedSessionInitializationConsumer multiplexedSessionInitializationConsumer =
+ new MultiplexedSessionInitializationConsumer();
+ private final MultiplexedSessionMaintainerConsumer multiplexedMaintainerConsumer =
+ new MultiplexedSessionMaintainerConsumer();
+
@VisibleForTesting Function idleSessionRemovedListener;
@VisibleForTesting Function longRunningSessionRemovedListener;
+ @VisibleForTesting Function multiplexedSessionRemovedListener;
private final CountDownLatch waitOnMinSessionsLatch;
private final SessionReplacementHandler pooledSessionReplacementHandler =
new PooledSessionReplacementHandler();
@@ -2861,6 +2931,9 @@ private void initPool() {
if (options.getMinSessions() > 0) {
createSessions(options.getMinSessions(), true);
}
+ if (options.getUseMultiplexedSession()) {
+ maybeCreateMultiplexedSession(multiplexedSessionInitializationConsumer);
+ }
}
}
@@ -2922,6 +2995,38 @@ boolean isValid() {
}
}
+ /**
+ * Returns a multiplexed session. The method fallbacks to a regular session if {@link
+ * SessionPoolOptions#useMultiplexedSession} is not set.
+ */
+ SessionFutureWrapper getMultiplexedSessionWithFallback() throws SpannerException {
+ if (options.getUseMultiplexedSession()) {
+ try {
+ SessionFutureWrapper sessionFuture = getWrappedMultiplexedSessionFuture();
+ incrementNumSessionsInUse(true);
+ return sessionFuture;
+ } catch (Throwable t) {
+ ISpan span = tracer.getCurrentSpan();
+ span.addAnnotation("No multiplexed session available.");
+ throw SpannerExceptionFactory.asSpannerException(t.getCause());
+ }
+ } else {
+ return new PooledSessionFutureWrapper(getSession());
+ }
+ }
+
+ SessionFutureWrapper getWrappedMultiplexedSessionFuture() {
+ return wrappedMultiplexedSessionFuture;
+ }
+
+ /**
+ * This method is a blocking method. It will block until the underlying {@code
+ * SettableApiFuture} is resolved.
+ */
+ MultiplexedSessionFuture getMultiplexedSession() {
+ return (MultiplexedSessionFuture) getWrappedMultiplexedSessionFuture().get();
+ }
+
/**
* Returns a session to be used for requests to spanner. This method is always non-blocking and
* returns a {@link PooledSessionFuture}. In case the pool is exhausted and {@link
@@ -3325,6 +3430,20 @@ private boolean canCreateSession() {
}
}
+ private void maybeCreateMultiplexedSession(SessionConsumer sessionConsumer) {
+ synchronized (lock) {
+ if (!multiplexedSessionBeingCreated) {
+ logger.log(Level.FINE, String.format("Creating multiplexed sessions"));
+ try {
+ multiplexedSessionBeingCreated = true;
+ sessionClient.createMultiplexedSession(sessionConsumer);
+ } catch (Throwable ignore) {
+ // such an exception will never be thrown. the exception will be passed onto the consumer.
+ }
+ }
+ }
+ }
+
private void createSessions(final int sessionCount, boolean distributeOverChannels) {
logger.log(Level.FINE, String.format("Creating %d sessions", sessionCount));
synchronized (lock) {
@@ -3347,6 +3466,103 @@ private void createSessions(final int sessionCount, boolean distributeOverChanne
}
}
+ /**
+ * Callback interface which is invoked when a multiplexed session is being replaced by the
+ * background maintenance thread. When a multiplexed session creation fails during background
+ * thread, it would simply log the exception and retry the session creation in the next background
+ * thread invocation.
+ *
+ *
This consumer is not used when the multiplexed session is getting initialized for the first
+ * time during application startup. We instead use {@link
+ * MultiplexedSessionInitializationConsumer} for the first time when multiplexed session is
+ * getting created.
+ */
+ class MultiplexedSessionMaintainerConsumer implements SessionConsumer {
+ @Override
+ public void onSessionReady(SessionImpl sessionImpl) {
+ final SettableFuture settableFuture = SettableFuture.create();
+ final MultiplexedSession newSession = new MultiplexedSession(sessionImpl);
+ settableFuture.set(newSession);
+
+ synchronized (lock) {
+ MultiplexedSession oldSession = null;
+ if (currentMultiplexedSessionReference.get().isDone()) {
+ oldSession = getMultiplexedSession().get();
+ }
+ SettableApiFuture settableApiFuture = SettableApiFuture.create();
+ settableApiFuture.set(new MultiplexedSessionFuture(settableFuture));
+ currentMultiplexedSessionReference.set(settableApiFuture);
+ wrappedMultiplexedSessionFuture = new MultiplexedSessionFutureWrapper(settableApiFuture);
+ if (oldSession != null) {
+ logger.log(
+ Level.INFO,
+ String.format(
+ "Removed Multiplexed Session => %s created at => %s and",
+ oldSession.getName(), oldSession.getDelegate().getCreateTime()));
+ if (multiplexedSessionRemovedListener != null) {
+ multiplexedSessionRemovedListener.apply(oldSession);
+ }
+ }
+ multiplexedSessionBeingCreated = false;
+ }
+ }
+
+ /**
+ * Method which logs the exception so that session creation can be re-attempted in the next
+ * background thread invocation.
+ */
+ @Override
+ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
+ synchronized (lock) {
+ multiplexedSessionBeingCreated = false;
+ wrappedMultiplexedSessionFuture =
+ new MultiplexedSessionFutureWrapper(currentMultiplexedSessionReference.get());
+ }
+ logger.log(
+ Level.WARNING,
+ String.format(
+ "Failed to create multiplexed session. "
+ + "Pending replacing stale multiplexed session",
+ t));
+ }
+ }
+
+ /**
+ * Callback interface which is invoked when a multiplexed session is getting initialised for the
+ * first time when a session is getting created.
+ */
+ class MultiplexedSessionInitializationConsumer implements SessionConsumer {
+ @Override
+ public void onSessionReady(SessionImpl sessionImpl) {
+ final SettableFuture settableFuture = SettableFuture.create();
+ final MultiplexedSession newSession = new MultiplexedSession(sessionImpl);
+ settableFuture.set(newSession);
+
+ synchronized (lock) {
+ SettableApiFuture settableApiFuture =
+ currentMultiplexedSessionReference.get();
+ settableApiFuture.set(new MultiplexedSessionFuture(settableFuture));
+ wrappedMultiplexedSessionFuture = new MultiplexedSessionFutureWrapper(settableApiFuture);
+ multiplexedSessionBeingCreated = false;
+ }
+ }
+
+ /**
+ * When a multiplexed session fails during initialization we would like all pending threads to
+ * receive the exception and throw the error. This is done because at the time of start up there
+ * is no other multiplexed session which could have been assigned to the pending requests.
+ */
+ @Override
+ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
+ synchronized (lock) {
+ multiplexedSessionBeingCreated = false;
+ wrappedMultiplexedSessionFuture =
+ new MultiplexedSessionFutureWrapper(currentMultiplexedSessionReference.get());
+ currentMultiplexedSessionReference.get().setException(newSpannerException(t));
+ }
+ }
+ }
+
/**
* {@link SessionConsumer} that receives the created sessions from a {@link SessionClient} and
* releases these into the pool. The session pool only needs one instance of this, as all sessions
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
index 92b1baacc03..139dce8f0f0 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java
@@ -16,6 +16,8 @@
package com.google.cloud.spanner;
+import com.google.api.core.BetaApi;
+import com.google.api.core.InternalApi;
import com.google.cloud.spanner.SessionPool.Position;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -69,7 +71,6 @@ public class SessionPoolOptions {
/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
- private final Duration waitForMultiplexedSession;
private final boolean useMultiplexedSession;
private final Duration multiplexedSessionMaintenanceDuration;
@@ -99,7 +100,6 @@ private SessionPoolOptions(Builder builder) {
this.poolMaintainerClock = builder.poolMaintainerClock;
this.useMultiplexedSession = builder.useMultiplexedSession;
this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration;
- this.waitForMultiplexedSession = builder.waitForMultiplexedSession;
}
@Override
@@ -133,8 +133,8 @@ public boolean equals(Object o) {
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock)
&& Objects.equals(this.useMultiplexedSession, other.useMultiplexedSession)
&& Objects.equals(
- this.multiplexedSessionMaintenanceDuration, other.multiplexedSessionMaintenanceDuration)
- && Objects.equals(this.waitForMultiplexedSession, other.waitForMultiplexedSession);
+ this.multiplexedSessionMaintenanceDuration,
+ other.multiplexedSessionMaintenanceDuration);
}
@Override
@@ -161,8 +161,7 @@ public int hashCode() {
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock,
this.useMultiplexedSession,
- this.multiplexedSessionMaintenanceDuration,
- this.waitForMultiplexedSession);
+ this.multiplexedSessionMaintenanceDuration);
}
public Builder toBuilder() {
@@ -293,10 +292,6 @@ Duration getMultiplexedSessionMaintenanceDuration() {
return multiplexedSessionMaintenanceDuration;
}
- Duration getWaitForMultiplexedSession() {
- return waitForMultiplexedSession;
- }
-
public static Builder newBuilder() {
return new Builder();
}
@@ -493,9 +488,9 @@ public static class Builder {
*/
private long randomizePositionQPSThreshold = 0L;
- private boolean useMultiplexedSession = false;
+ private boolean useMultiplexedSession = getUseMultiplexedSessionFromEnvVariable();
+
private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7);
- private Duration waitForMultiplexedSession = Duration.ofSeconds(10);
private Clock poolMaintainerClock;
private static Position getReleaseToPositionFromSystemProperty() {
@@ -511,6 +506,18 @@ private static Position getReleaseToPositionFromSystemProperty() {
return Position.FIRST;
}
+ /**
+ * This environment is only added to support internal spanner testing. Support for it can be
+ * removed in the future. Use {@link SessionPoolOptions#useMultiplexedSession} instead to use
+ * multiplexed sessions.
+ */
+ @InternalApi
+ @BetaApi
+ private static boolean getUseMultiplexedSessionFromEnvVariable() {
+ return Boolean.parseBoolean(
+ System.getenv("GOOGLE_CLOUD_SPANNER_ENABLE_MULTIPLEXED_SESSIONS"));
+ }
+
public Builder() {}
private Builder(SessionPoolOptions options) {
@@ -721,24 +728,6 @@ Builder setMultiplexedSessionMaintenanceDuration(
return this;
}
- /**
- * This option is only used when {@link SessionPoolOptions#useMultiplexedSession} is set to
- * true. If greater than zero, calls to {@link Spanner#getDatabaseClient(DatabaseId)} will block
- * for up to the given duration while waiting for the multiplexed session to be created. The
- * default value for this is 10 seconds.
- *
- *
If this is set to null or zero, the client does not wait for the session to be created,
- * which means that the first read requests could see more latency, as they will need to wait
- * until the multiplexed session has been created.
- *
- *
Note that we would need to use the option {@link SessionPoolOptions#waitForMinSessions} if
- * we want a similar blocking behavior for the other sessions within the session pool.
- */
- Builder setWaitForMultiplexedSession(Duration waitForMultiplexedSession) {
- this.waitForMultiplexedSession = waitForMultiplexedSession;
- return this;
- }
-
/**
* Sets whether the client should automatically execute a background query to detect the dialect
* that is used by the database or not. Set this option to true if you do not know what the
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java
index cbfa4bbb609..743d21b587c 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java
@@ -29,6 +29,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.protobuf.Empty;
+import com.google.protobuf.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
@@ -121,6 +122,44 @@ public CommitResponse writeWithOptions(
return session;
}
+ SessionImpl buildMockMultiplexedSession(ReadContext context, Timestamp creationTime) {
+ SpannerImpl spanner = mock(SpannerImpl.class);
+ Map options = new HashMap<>();
+ final SessionImpl session =
+ new SessionImpl(
+ spanner,
+ "projects/dummy/instances/dummy/databases/dummy/sessions/session" + sessionIndex,
+ creationTime,
+ true,
+ options) {
+ @Override
+ public ReadContext singleUse(TimestampBound bound) {
+ // The below stubs are added so that we can mock keep-alive.
+ return context;
+ }
+
+ @Override
+ public ApiFuture asyncClose() {
+ return ApiFutures.immediateFuture(Empty.getDefaultInstance());
+ }
+
+ @Override
+ public CommitResponse writeAtLeastOnceWithOptions(
+ Iterable mutations, TransactionOption... transactionOptions)
+ throws SpannerException {
+ return new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance());
+ }
+
+ @Override
+ public CommitResponse writeWithOptions(
+ Iterable mutations, TransactionOption... options) throws SpannerException {
+ return new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance());
+ }
+ };
+ sessionIndex++;
+ return session;
+ }
+
void runMaintenanceLoop(FakeClock clock, SessionPool pool, long numCycles) {
for (int i = 0; i < numCycles; i++) {
pool.poolMaintainer.maintainPool();
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java
new file mode 100644
index 00000000000..2c3ada10803
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionMaintainerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.SessionPool.MultiplexedSession;
+import com.google.cloud.spanner.SessionPool.MultiplexedSessionInitializationConsumer;
+import com.google.cloud.spanner.SessionPool.MultiplexedSessionMaintainerConsumer;
+import com.google.cloud.spanner.SessionPool.Position;
+import com.google.cloud.spanner.SessionPool.SessionFutureWrapper;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.threeten.bp.Duration;
+import org.threeten.bp.Instant;
+
+@RunWith(JUnit4.class)
+public class MultiplexedSessionMaintainerTest extends BaseSessionPoolTest {
+
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private @Mock SpannerImpl client;
+ private @Mock SessionClient sessionClient;
+ private @Mock SpannerOptions spannerOptions;
+ private DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused");
+ private SessionPoolOptions options;
+ private FakeClock clock = new FakeClock();
+ private List multiplexedSessionsRemoved = new ArrayList<>();
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+ when(client.getOptions()).thenReturn(spannerOptions);
+ when(client.getSessionClient(db)).thenReturn(sessionClient);
+ when(sessionClient.getSpanner()).thenReturn(client);
+ when(spannerOptions.getNumChannels()).thenReturn(4);
+ when(spannerOptions.getDatabaseRole()).thenReturn("role");
+ options =
+ SessionPoolOptions.newBuilder()
+ .setMinSessions(1)
+ .setMaxIdleSessions(1)
+ .setMaxSessions(5)
+ .setIncStep(1)
+ .setKeepAliveIntervalMinutes(2)
+ .setUseMultiplexedSession(true)
+ .build();
+ multiplexedSessionsRemoved.clear();
+ }
+
+ @Test
+ public void testMaintainMultiplexedSession_whenNewSessionCreated_assertThatStaleSessionIsRemoved()
+ throws InterruptedException {
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionInitializationConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class);
+ ReadContext mockContext = mock(ReadContext.class);
+ Timestamp timestamp =
+ Timestamp.ofTimeSecondsAndNanos(
+ Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
+ consumer.onSessionReady(
+ setupMockSession(
+ buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class));
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionMaintainerConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionMaintainerConsumer.class);
+ ReadContext mockContext = mock(ReadContext.class);
+ Timestamp timestamp =
+ Timestamp.ofTimeSecondsAndNanos(
+ Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
+ consumer.onSessionReady(
+ setupMockSession(
+ buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class));
+
+ SessionPool pool = createPool();
+
+ // Run one maintenance loop.
+ SessionFutureWrapper session1 = pool.getMultiplexedSessionWithFallback();
+ runMaintenanceLoop(clock, pool, 1);
+ assertTrue(multiplexedSessionsRemoved.isEmpty());
+
+ // Advance clock by 8 days
+ clock.currentTimeMillis.addAndGet(Duration.ofDays(8).toMillis());
+
+ // Run second maintenance loop. the first session would now be stale since it has now existed
+ // for more than 7 days.
+ runMaintenanceLoop(clock, pool, 1);
+ SessionFutureWrapper session2 = pool.getMultiplexedSessionWithFallback();
+ assertNotEquals(session1.get().getName(), session2.get().getName());
+ assertEquals(1, multiplexedSessionsRemoved.size());
+ assertTrue(multiplexedSessionsRemoved.contains(session1.get().get()));
+
+ // Advance clock by 8 days
+ clock.currentTimeMillis.addAndGet(Duration.ofDays(8).toMillis());
+
+ // Run third maintenance loop. the second session would now be stale since it has now existed
+ // for more than 7 days
+ runMaintenanceLoop(clock, pool, 1);
+
+ SessionFutureWrapper session3 = pool.getMultiplexedSessionWithFallback();
+ assertNotEquals(session2.get().getName(), session3.get().getName());
+ assertEquals(2, multiplexedSessionsRemoved.size());
+ assertTrue(multiplexedSessionsRemoved.contains(session2.get().get()));
+ }
+
+ @Test
+ public void
+ testMaintainMultiplexedSession_whenMultiplexedSessionNotStale_assertThatSessionIsNotRemoved() {
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionInitializationConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class);
+ ReadContext mockContext = mock(ReadContext.class);
+ Timestamp timestamp =
+ Timestamp.ofTimeSecondsAndNanos(
+ Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
+ consumer.onSessionReady(
+ setupMockSession(
+ buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class));
+ SessionPool pool = createPool();
+
+ // Run one maintenance loop.
+ SessionFutureWrapper session1 = pool.getMultiplexedSessionWithFallback();
+ runMaintenanceLoop(clock, pool, 1);
+ assertTrue(multiplexedSessionsRemoved.isEmpty());
+
+ // Advance clock by 4 days
+ clock.currentTimeMillis.addAndGet(Duration.ofDays(4).toMillis());
+ // Run one maintenance loop. the first session would not be stale yet since it has now existed
+ // for less than 7 days.
+ runMaintenanceLoop(clock, pool, 1);
+ SessionFutureWrapper session2 = pool.getMultiplexedSessionWithFallback();
+ assertTrue(multiplexedSessionsRemoved.isEmpty());
+ assertEquals(session1.get().getName(), session2.get().getName());
+ }
+
+ @Test
+ public void
+ testMaintainMultiplexedSession_whenMultiplexedSessionCreationFailed_testRetryAfterDelay() {
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionInitializationConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class);
+ ReadContext mockContext = mock(ReadContext.class);
+ Timestamp timestamp =
+ Timestamp.ofTimeSecondsAndNanos(
+ Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
+ consumer.onSessionReady(
+ setupMockSession(
+ buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class));
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionMaintainerConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionMaintainerConsumer.class);
+ consumer.onSessionCreateFailure(
+ SpannerExceptionFactory.newSpannerException(ErrorCode.DEADLINE_EXCEEDED, ""), 1);
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class));
+ SessionPool pool = createPool();
+
+ // Advance clock by 8 days
+ clock.currentTimeMillis.addAndGet(Duration.ofDays(8).toMillis());
+
+ // Run one maintenance loop. Attempt replacing stale session should fail.
+ SessionFutureWrapper session1 = pool.getMultiplexedSessionWithFallback();
+ runMaintenanceLoop(clock, pool, 1);
+ assertTrue(multiplexedSessionsRemoved.isEmpty());
+ verify(sessionClient, times(1))
+ .createMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class));
+
+ // Advance clock by 10s and now mock session creation to be successful.
+ clock.currentTimeMillis.addAndGet(Duration.ofSeconds(10).toMillis());
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionMaintainerConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionMaintainerConsumer.class);
+ ReadContext mockContext = mock(ReadContext.class);
+ Timestamp timestamp =
+ Timestamp.ofTimeSecondsAndNanos(
+ Instant.ofEpochMilli(clock.currentTimeMillis.get()).getEpochSecond(), 0);
+ consumer.onSessionReady(
+ setupMockSession(
+ buildMockMultiplexedSession(mockContext, timestamp.toProto()), mockContext));
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class));
+ // Run one maintenance loop. Attempt should be ignored as it has not been 10 minutes since last
+ // attempt.
+ runMaintenanceLoop(clock, pool, 1);
+ SessionFutureWrapper session2 = pool.getMultiplexedSessionWithFallback();
+ assertTrue(multiplexedSessionsRemoved.isEmpty());
+ assertEquals(session1.get().getName(), session2.get().getName());
+ verify(sessionClient, times(1))
+ .createMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class));
+
+ // Advance clock by 15 minutes
+ clock.currentTimeMillis.addAndGet(Duration.ofMinutes(15).toMillis());
+ // Run one maintenance loop. Attempt should succeed since its already more than 10 minutes since
+ // the last attempt.
+ runMaintenanceLoop(clock, pool, 1);
+ SessionFutureWrapper session3 = pool.getMultiplexedSessionWithFallback();
+ assertTrue(multiplexedSessionsRemoved.contains(session1.get().get()));
+ assertNotEquals(session1.get().getName(), session3.get().getName());
+ verify(sessionClient, times(2))
+ .createMultiplexedSession(any(MultiplexedSessionMaintainerConsumer.class));
+ }
+
+ private SessionImpl setupMockSession(final SessionImpl session, final ReadContext mockContext) {
+ final ResultSet mockResult = mock(ResultSet.class);
+ when(mockContext.executeQuery(any(Statement.class))).thenAnswer(invocation -> mockResult);
+ when(mockResult.next()).thenReturn(true);
+ return session;
+ }
+
+ private SessionPool createPool() {
+ // Allow sessions to be added to the head of the pool in all cases in this test, as it is
+ // otherwise impossible to know which session exactly is getting pinged at what point in time.
+ SessionPool pool =
+ SessionPool.createPool(
+ options,
+ new TestExecutorFactory(),
+ client.getSessionClient(db),
+ clock,
+ Position.FIRST,
+ new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer("")),
+ OpenTelemetry.noop());
+ pool.multiplexedSessionRemovedListener =
+ input -> {
+ multiplexedSessionsRemoved.add(input);
+ return null;
+ };
+ return pool;
+ }
+}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java
new file mode 100644
index 00000000000..d2aae568451
--- /dev/null
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionPoolTest.java
@@ -0,0 +1,173 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.spanner;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import com.google.cloud.spanner.SessionPool.MultiplexedSessionFuture;
+import com.google.cloud.spanner.SessionPool.MultiplexedSessionInitializationConsumer;
+import com.google.cloud.spanner.SessionPool.SessionFutureWrapper;
+import com.google.cloud.spanner.SpannerImpl.ClosedException;
+import io.opencensus.trace.Tracing;
+import io.opentelemetry.api.OpenTelemetry;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.threeten.bp.Duration;
+
+/**
+ * Tests for {@link com.google.cloud.spanner.SessionPool.MultiplexedSession} component within the
+ * {@link SessionPool} class.
+ */
+public class MultiplexedSessionPoolTest extends BaseSessionPoolTest {
+
+ @Mock SpannerImpl client;
+ @Mock SessionClient sessionClient;
+ @Mock SpannerOptions spannerOptions;
+ private final DatabaseId db = DatabaseId.of("projects/p/instances/i/databases/unused");
+ private final TraceWrapper tracer =
+ new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""));
+ SessionPoolOptions options;
+ SessionPool pool;
+
+ private SessionPool createPool() {
+ return SessionPool.createPool(
+ options,
+ new TestExecutorFactory(),
+ client.getSessionClient(db),
+ tracer,
+ OpenTelemetry.noop());
+ }
+
+ @Before
+ public void setUp() {
+ initMocks(this);
+ SpannerOptions.resetActiveTracingFramework();
+ SpannerOptions.enableOpenTelemetryTraces();
+ when(client.getOptions()).thenReturn(spannerOptions);
+ when(client.getSessionClient(db)).thenReturn(sessionClient);
+ when(sessionClient.getSpanner()).thenReturn(client);
+ when(spannerOptions.getNumChannels()).thenReturn(4);
+ when(spannerOptions.getDatabaseRole()).thenReturn("role");
+ options =
+ SessionPoolOptions.newBuilder()
+ .setMinSessions(2)
+ .setMaxSessions(2)
+ .setUseMultiplexedSession(true)
+ .build();
+ }
+
+ @Test
+ public void testGetMultiplexedSession_whenSessionInitializationSucceeded_assertSessionReturned() {
+ setupMockMultiplexedSessionCreation();
+
+ pool = createPool();
+ assertTrue(pool.isValid());
+
+ // create 5 requests which require a session
+ for (int i = 0; i < 5; i++) {
+ // checking out a multiplexed session
+ SessionFutureWrapper multiplexedSessionFuture = pool.getMultiplexedSessionWithFallback();
+ assertNotNull(multiplexedSessionFuture.get());
+ }
+ verify(sessionClient, times(1))
+ .createMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class));
+ }
+
+ @Test
+ public void testGetMultiplexedSession_whenClosedPool_assertSessionReturned() {
+ setupMockMultiplexedSessionCreation();
+
+ pool = createPool();
+ assertTrue(pool.isValid());
+ closePoolWithStacktrace();
+
+ // checking out a multiplexed session does not throw error even if pool is closed
+ MultiplexedSessionFuture multiplexedSessionFuture =
+ (MultiplexedSessionFuture) pool.getMultiplexedSessionWithFallback().get();
+ assertNotNull(multiplexedSessionFuture);
+
+ // checking out a regular session throws error.
+ IllegalStateException e = assertThrows(IllegalStateException.class, () -> pool.getSession());
+ assertThat(e.getCause()).isInstanceOf(ClosedException.class);
+ StringWriter sw = new StringWriter();
+ e.getCause().printStackTrace(new PrintWriter(sw));
+ assertThat(sw.toString()).contains("closePoolWithStacktrace");
+ }
+
+ private void closePoolWithStacktrace() {
+ pool.closeAsync(new SpannerImpl.ClosedException());
+ }
+
+ @Test
+ public void testGetMultiplexedSession_whenSessionCreationFailed_assertErrorForWaiters() {
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionInitializationConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class);
+ consumer.onSessionCreateFailure(
+ SpannerExceptionFactory.newSpannerException(ErrorCode.DEADLINE_EXCEEDED, ""), 1);
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class));
+ options =
+ options
+ .toBuilder()
+ .setMinSessions(2)
+ .setUseMultiplexedSession(true)
+ .setAcquireSessionTimeout(
+ Duration.ofMillis(50)) // block for a max of 50 ms for session to be available
+ .build();
+ pool = createPool();
+
+ // create 5 requests which require a session
+ for (int i = 0; i < 5; i++) {
+ SpannerException e =
+ assertThrows(
+ SpannerException.class, () -> pool.getMultiplexedSessionWithFallback().get());
+ assertEquals(ErrorCode.DEADLINE_EXCEEDED, e.getErrorCode());
+ }
+ // assert that all 5 requests failed with exception
+ assertEquals(0, pool.getNumWaiterTimeouts());
+ assertEquals(0, pool.getNumberOfSessionsInPool());
+ }
+
+ private void setupMockMultiplexedSessionCreation() {
+ doAnswer(
+ invocation -> {
+ MultiplexedSessionInitializationConsumer consumer =
+ invocation.getArgument(0, MultiplexedSessionInitializationConsumer.class);
+ consumer.onSessionReady(mockSession());
+ return null;
+ })
+ .when(sessionClient)
+ .createMultiplexedSession(any(MultiplexedSessionInitializationConsumer.class));
+ }
+}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
index ff50bf52e70..8236e509008 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java
@@ -375,20 +375,6 @@ public void singleUseContextClosesTransaction() {
assertThat(e.getMessage()).contains("invalidated");
}
- @Test
- public void prepareClosesOldSingleUseContext() {
- ReadContext ctx = session.singleUse(TimestampBound.strong());
-
- Mockito.when(rpc.beginTransaction(Mockito.any(), Mockito.eq(options), eq(false)))
- .thenReturn(Transaction.newBuilder().setId(ByteString.copyFromUtf8("t1")).build());
- session.prepareReadWriteTransaction();
- IllegalStateException e =
- assertThrows(
- IllegalStateException.class,
- () -> ctx.read("Dummy", KeySet.all(), Collections.singletonList("C")));
- assertThat(e.getMessage()).contains("invalidated");
- }
-
private static ResultSetMetadata newMetadata(Type type) {
return ResultSetMetadata.newBuilder().setRowType(type.toProto().getStructType()).build();
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java
index 9e029e931d6..1f4d591488b 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolOptionsTest.java
@@ -284,24 +284,4 @@ public void testMultiplexedSessionMaintenanceDuration() {
.build()
.getMultiplexedSessionMaintenanceDuration());
}
-
- @Test
- public void testWaitForMultiplexedSession() {
- assertEquals(
- Duration.ofSeconds(10),
- SessionPoolOptions.newBuilder().build().getWaitForMultiplexedSession());
- assertEquals(
- Duration.ofSeconds(20),
- SessionPoolOptions.newBuilder()
- .setWaitForMultiplexedSession(Duration.ofSeconds(20))
- .build()
- .getWaitForMultiplexedSession());
- assertEquals(
- Duration.ofSeconds(10),
- SessionPoolOptions.newBuilder()
- .setWaitForMultiplexedSession(Duration.ofSeconds(2))
- .setWaitForMultiplexedSession(Duration.ofSeconds(10))
- .build()
- .getWaitForMultiplexedSession());
- }
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java
index bb263f04e5f..02502399e10 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolStressTest.java
@@ -32,7 +32,6 @@
import com.google.cloud.spanner.SessionPoolOptions.InactiveTransactionRemovalOptions;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.opencensus.trace.Tracing;
import io.opentelemetry.api.OpenTelemetry;
@@ -161,21 +160,6 @@ public ApiFuture asyncClose() {
}
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
}
-
- @Override
- public void prepareReadWriteTransaction() {
- if (random.nextInt(100) < 10) {
- expireSession(this);
- throw SpannerExceptionFactoryTest.newSessionNotFoundException(this.getName());
- }
- String name = this.getName();
- synchronized (lock) {
- if (sessions.put(name, true)) {
- setFailed();
- }
- this.readyTransactionId = ByteString.copyFromUtf8("foo");
- }
- }
};
sessionIndex++;
return session;
@@ -192,18 +176,9 @@ private void setupSession(final SessionImpl session, final ReadContext mockConte
when(mockResult.next()).thenReturn(true);
}
- private void expireSession(Session session) {
- String name = session.getName();
- synchronized (lock) {
- sessions.remove(name);
- expiredSessions.add(name);
- }
- }
-
private void resetTransaction(SessionImpl session) {
String name = session.getName();
synchronized (lock) {
- session.readyTransactionId = null;
sessions.put(name, false);
}
}
diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
index 548852f0f79..a391a66ec83 100644
--- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
+++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java
@@ -139,6 +139,9 @@ public class SessionPoolTest extends BaseSessionPoolTest {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
@Parameter public int minSessions;
+ @Parameter(1)
+ public boolean useMultiplexed;
+
@Mock SpannerImpl client;
@Mock SessionClient sessionClient;
@Mock SpannerOptions spannerOptions;
@@ -151,9 +154,14 @@ public class SessionPoolTest extends BaseSessionPoolTest {
private final TraceWrapper tracer =
new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""));
- @Parameters(name = "min sessions = {0}")
+ @Parameters(name = "min sessions = {0}, use multiplexed = {1}")
public static Collection