From 06a57736afa153069df07671d2b7d3d2d89ebddf Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Mon, 25 Mar 2024 23:46:33 +0530 Subject: [PATCH 1/6] chore: add multiplexed session implementations for CachedSession/SessionFuture interfaces. --- .../com/google/cloud/spanner/SessionPool.java | 414 +++++++++++++++++- 1 file changed, 413 insertions(+), 1 deletion(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 92ecbc3d55..28cd503a2e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -574,6 +574,16 @@ public PooledSessionFuture replaceSession( } } + class MultiplexedSessionReplacementHandler + implements SessionReplacementHandler { + @Override + public MultiplexedSessionFuture replaceSession( + SessionNotFoundException e, MultiplexedSessionFuture session) { + // TODO arpanmishra add this handling + return null; + } + } + interface SessionNotFoundHandler { /** * Handles the given {@link SessionNotFoundException} by possibly converting it to a different @@ -1420,6 +1430,246 @@ PooledSession get(final boolean eligibleForLongRunning) { } } + class MultiplexedSessionFuture extends SimpleForwardingListenableFuture + implements SessionFuture { + + private volatile CountDownLatch initialized = new CountDownLatch(1); + private final ISpan span; + + @VisibleForTesting + MultiplexedSessionFuture(ListenableFuture delegate, ISpan span) { + super(delegate); + this.span = span; + } + + @Override + public Timestamp write(Iterable mutations) throws SpannerException { + return writeWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + try { + return get().writeWithOptions(mutations, options); + } finally { + close(); + } + } + + @Override + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + try { + return get().writeAtLeastOnceWithOptions(mutations, options); + } finally { + close(); + } + } + + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + try { + return get().batchWriteAtLeastOnce(mutationGroups, options); + } finally { + close(); + } + } + + @Override + public ReadContext singleUse() { + try { + return new AutoClosingReadContext<>( + session -> { + MultiplexedSession ms = session.get(); + return ms.getDelegate().singleUse(); + }, + SessionPool.this, + multiplexedSessionReplacementHandler, + this, + true); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public ReadContext singleUse(final TimestampBound bound) { + try { + return new AutoClosingReadContext<>( + session -> { + CachedSession ps = session.get(); + return ps.getDelegate().singleUse(bound); + }, + SessionPool.this, + multiplexedSessionReplacementHandler, + this, + true); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction() { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession ms = session.get(); + return ms.getDelegate().singleUseReadOnlyTransaction(); + }, + true); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bound) { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession ms = session.get(); + return ms.getDelegate().singleUseReadOnlyTransaction(bound); + }, + true); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction() { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession ms = session.get(); + return ms.getDelegate().readOnlyTransaction(); + }, + false); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) { + return internalReadOnlyTransaction( + session -> { + MultiplexedSession ms = session.get(); + return ms.getDelegate().readOnlyTransaction(bound); + }, + false); + } + + private ReadOnlyTransaction internalReadOnlyTransaction( + Function transactionSupplier, + boolean isSingleUse) { + try { + return new AutoClosingReadTransaction<>( + transactionSupplier, + SessionPool.this, + multiplexedSessionReplacementHandler, + this, + isSingleUse); + } catch (Exception e) { + close(); + throw e; + } + } + + @Override + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return new SessionPoolTransactionRunner<>( + this, multiplexedSessionReplacementHandler, options); + } + + @Override + public TransactionManager transactionManager(TransactionOption... options) { + return new AutoClosingTransactionManager<>( + this, multiplexedSessionReplacementHandler, options); + } + + @Override + public AsyncRunner runAsync(TransactionOption... options) { + return new SessionPoolAsyncRunner(this, multiplexedSessionReplacementHandler, options); + } + + @Override + public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { + return new SessionPoolAsyncTransactionManager<>( + multiplexedSessionReplacementHandler, this, options); + } + + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + try { + return get().executePartitionedUpdate(stmt, options); + } finally { + close(); + } + } + + @Override + public String getName() { + return get().getName(); + } + + @Override + public void prepareReadWriteTransaction() { + get().prepareReadWriteTransaction(); + } + + @Override + public void close() { + try { + asyncClose().get(); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } catch (ExecutionException e) { + throw SpannerExceptionFactory.asSpannerException(e.getCause()); + } + } + + @Override + public ApiFuture asyncClose() { + MultiplexedSession delegate = getOrNull(); + if (delegate != null) { + return delegate.asyncClose(); + } + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + + private MultiplexedSession getOrNull() { + try { + return get(); + } catch (Throwable t) { + return null; + } + } + + @Override + public MultiplexedSession get() { + MultiplexedSession res = null; + try { + res = super.get(); + } catch (Throwable e) { + // ignore the exception as it will be handled by the call to super.get() below. + } + if (res != null) { + res.markBusy(span); + } + initialized.countDown(); + + try { + // TODO arpanmishra@ do we really need this latch? + initialized.await(); + return super.get(); + } catch (ExecutionException e) { + throw SpannerExceptionFactory.newSpannerException(e.getCause()); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); + } + } + } + interface CachedSession extends Session { SessionImpl getDelegate(); @@ -1730,6 +1980,161 @@ public TransactionManager transactionManager(TransactionOption... options) { } } + class MultiplexedSession implements CachedSession { + SessionImpl delegate; + + MultiplexedSession(SessionImpl session) { + this.delegate = session; + } + + @Override + public boolean isAllowReplacing() { + return false; + } + + @Override + public void setAllowReplacing(boolean allowReplacing) {} + + @Override + public void markBusy(ISpan span) { + this.delegate.setCurrentSpan(span); + } + + @Override + public void markUsed() {} + + @Override + public SpannerException setLastException(SpannerException exception) { + return exception; + } + + @Override + public SessionImpl getDelegate() { + return delegate; + } + + @Override + public Timestamp write(Iterable mutations) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public CommitResponse writeWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public ServerStream batchWriteAtLeastOnce( + Iterable mutationGroups, TransactionOption... options) + throws SpannerException { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public ReadContext singleUse() { + return delegate.singleUse(); + } + + @Override + public ReadContext singleUse(TimestampBound bound) { + return delegate.singleUse(bound); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction() { + return delegate.singleUseReadOnlyTransaction(); + } + + @Override + public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { + return delegate.singleUseReadOnlyTransaction(bound); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction() { + return delegate.readOnlyTransaction(); + } + + @Override + public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { + return delegate.readOnlyTransaction(bound); + } + + @Override + public TransactionRunner readWriteTransaction(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public TransactionManager transactionManager(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public AsyncRunner runAsync(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public String getName() { + return delegate.getName(); + } + + @Override + public void prepareReadWriteTransaction() { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.UNIMPLEMENTED, "Unimplemented with Multiplexed Session"); + } + + @Override + public void close() { + synchronized (lock) { + numMultiplexedSessionsReleased++; + } + } + + private final ApiFuture CLOSE_RESULT = + ApiFutures.immediateFuture(Empty.getDefaultInstance()); + + @Override + public ApiFuture asyncClose() { + close(); + return ApiFutures.immediateFuture(Empty.getDefaultInstance()); + } + } + private final class WaiterFuture extends ForwardingListenableFuture { private static final long MAX_SESSION_WAIT_TIMEOUT = 240_000L; private final SettableFuture waiter = SettableFuture.create(); @@ -2162,9 +2567,15 @@ enum Position { @GuardedBy("lock") private long numSessionsAcquired = 0; + @GuardedBy("lock") + private long numMultiplexedSessionsAcquired = 0; + @GuardedBy("lock") private long numSessionsReleased = 0; + @GuardedBy("lock") + private long numMultiplexedSessionsReleased = 0; + @GuardedBy("lock") private long numIdleSessionsRemoved = 0; @@ -2192,7 +2603,8 @@ enum Position { private final CountDownLatch waitOnMinSessionsLatch; private final SessionReplacementHandler pooledSessionReplacementHandler = new PooledSessionReplacementHandler(); - + private final SessionReplacementHandler multiplexedSessionReplacementHandler = + new MultiplexedSessionReplacementHandler(); /** * Create a session pool with the given options and for the given database. It will also start * eagerly creating sessions if {@link SessionPoolOptions#getMinSessions()} is greater than 0. From c6a3ba35e8bacdcc05ff8b83de25a29471882901 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 26 Mar 2024 09:28:34 +0530 Subject: [PATCH 2/6] chore: add comments. --- .../com/google/cloud/spanner/SessionPool.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 28cd503a2e..565c987547 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -1659,8 +1659,9 @@ public MultiplexedSession get() { initialized.countDown(); try { - // TODO arpanmishra@ do we really need this latch? + span.addAnnotation("Awaiting multiplexed session to be initialized"); initialized.await(); + span.addAnnotation("Multiplexed session initialized"); return super.get(); } catch (ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e.getCause()); @@ -1989,11 +1990,16 @@ class MultiplexedSession implements CachedSession { @Override public boolean isAllowReplacing() { + // for multiplexed session there is only 1 session, hence there is nothing that we + // can replace. return false; } @Override - public void setAllowReplacing(boolean allowReplacing) {} + public void setAllowReplacing(boolean allowReplacing) { + // for multiplexed session there is only 1 session, there is nothing that can be replaced. + // hence this is no-op. + } @Override public void markBusy(ISpan span) { @@ -2001,10 +2007,15 @@ public void markBusy(ISpan span) { } @Override - public void markUsed() {} + public void markUsed() { + // no-op for a multiplexed session since we don't track the last-used time + // in case of multiplexed session + } @Override public SpannerException setLastException(SpannerException exception) { + // multiplexed sessions run more than one transaction concurrently. we cannot store the + // exception state as that is not applicable to all transactions running on the session. return exception; } @@ -2125,9 +2136,6 @@ public void close() { } } - private final ApiFuture CLOSE_RESULT = - ApiFutures.immediateFuture(Empty.getDefaultInstance()); - @Override public ApiFuture asyncClose() { close(); From 3166fb976ca6b58e59d841dbe19274b28dd1ac88 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 26 Mar 2024 09:45:44 +0530 Subject: [PATCH 3/6] chore: add session replacement handler for multiplexed session. --- .../java/com/google/cloud/spanner/SessionPool.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 565c987547..3c3dfbc101 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -579,8 +579,15 @@ class MultiplexedSessionReplacementHandler @Override public MultiplexedSessionFuture replaceSession( SessionNotFoundException e, MultiplexedSessionFuture session) { - // TODO arpanmishra add this handling - return null; + /** + * For multiplexed sessions, we would never obtain a {@link SessionNotFoundException}. Hence, + * this method will ideally never be invoked. + */ + logger.log( + Level.WARNING, + String.format( + "Replace session invoked for multiplexed " + "session => %s", session.getName())); + return session; } } From c38107f2a9182ce68f13fe2476deb8745238150d Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Tue, 26 Mar 2024 17:21:16 +0530 Subject: [PATCH 4/6] chore: address comments. --- .../com/google/cloud/spanner/SessionPool.java | 41 ++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 3c3dfbc101..1e0a3a1119 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -574,7 +574,7 @@ public PooledSessionFuture replaceSession( } } - class MultiplexedSessionReplacementHandler + static class MultiplexedSessionReplacementHandler implements SessionReplacementHandler { @Override public MultiplexedSessionFuture replaceSession( @@ -587,7 +587,7 @@ public MultiplexedSessionFuture replaceSession( Level.WARNING, String.format( "Replace session invoked for multiplexed " + "session => %s", session.getName())); - return session; + throw e; } } @@ -1439,8 +1439,6 @@ PooledSession get(final boolean eligibleForLongRunning) { class MultiplexedSessionFuture extends SimpleForwardingListenableFuture implements SessionFuture { - - private volatile CountDownLatch initialized = new CountDownLatch(1); private final ISpan span; @VisibleForTesting @@ -1495,8 +1493,8 @@ public ReadContext singleUse() { try { return new AutoClosingReadContext<>( session -> { - MultiplexedSession ms = session.get(); - return ms.getDelegate().singleUse(); + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUse(); }, SessionPool.this, multiplexedSessionReplacementHandler, @@ -1513,8 +1511,8 @@ public ReadContext singleUse(final TimestampBound bound) { try { return new AutoClosingReadContext<>( session -> { - CachedSession ps = session.get(); - return ps.getDelegate().singleUse(bound); + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUse(bound); }, SessionPool.this, multiplexedSessionReplacementHandler, @@ -1530,8 +1528,8 @@ public ReadContext singleUse(final TimestampBound bound) { public ReadOnlyTransaction singleUseReadOnlyTransaction() { return internalReadOnlyTransaction( session -> { - MultiplexedSession ms = session.get(); - return ms.getDelegate().singleUseReadOnlyTransaction(); + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUseReadOnlyTransaction(); }, true); } @@ -1540,8 +1538,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction() { public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bound) { return internalReadOnlyTransaction( session -> { - MultiplexedSession ms = session.get(); - return ms.getDelegate().singleUseReadOnlyTransaction(bound); + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().singleUseReadOnlyTransaction(bound); }, true); } @@ -1550,8 +1548,8 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(final TimestampBound bou public ReadOnlyTransaction readOnlyTransaction() { return internalReadOnlyTransaction( session -> { - MultiplexedSession ms = session.get(); - return ms.getDelegate().readOnlyTransaction(); + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().readOnlyTransaction(); }, false); } @@ -1560,8 +1558,8 @@ public ReadOnlyTransaction readOnlyTransaction() { public ReadOnlyTransaction readOnlyTransaction(final TimestampBound bound) { return internalReadOnlyTransaction( session -> { - MultiplexedSession ms = session.get(); - return ms.getDelegate().readOnlyTransaction(bound); + MultiplexedSession multiplexedSession = session.get(); + return multiplexedSession.getDelegate().readOnlyTransaction(bound); }, false); } @@ -1648,6 +1646,8 @@ private MultiplexedSession getOrNull() { try { return get(); } catch (Throwable t) { + // this exception will never be thrown for a multiplexed session since the Future + // object is already initialised. return null; } } @@ -1663,12 +1663,7 @@ public MultiplexedSession get() { if (res != null) { res.markBusy(span); } - initialized.countDown(); - try { - span.addAnnotation("Awaiting multiplexed session to be initialized"); - initialized.await(); - span.addAnnotation("Multiplexed session initialized"); return super.get(); } catch (ExecutionException e) { throw SpannerExceptionFactory.newSpannerException(e.getCause()); @@ -1989,7 +1984,7 @@ public TransactionManager transactionManager(TransactionOption... options) { } class MultiplexedSession implements CachedSession { - SessionImpl delegate; + final SessionImpl delegate; MultiplexedSession(SessionImpl session) { this.delegate = session; @@ -2618,7 +2613,7 @@ enum Position { private final CountDownLatch waitOnMinSessionsLatch; private final SessionReplacementHandler pooledSessionReplacementHandler = new PooledSessionReplacementHandler(); - private final SessionReplacementHandler multiplexedSessionReplacementHandler = + private static final SessionReplacementHandler multiplexedSessionReplacementHandler = new MultiplexedSessionReplacementHandler(); /** * Create a session pool with the given options and for the given database. It will also start From c3d8f3e8a97188c2957196695c51642711bed719 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Wed, 27 Mar 2024 08:50:34 +0530 Subject: [PATCH 5/6] chore: fix comments. --- .../com/google/cloud/spanner/SessionPool.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 1e0a3a1119..3bfc075982 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -45,6 +45,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.ObsoleteApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.ServerStream; @@ -586,7 +587,7 @@ public MultiplexedSessionFuture replaceSession( logger.log( Level.WARNING, String.format( - "Replace session invoked for multiplexed " + "session => %s", session.getName())); + "Replace session invoked for multiplexed session => %s", session.getName())); throw e; } } @@ -1645,7 +1646,7 @@ public ApiFuture asyncClose() { private MultiplexedSession getOrNull() { try { return get(); - } catch (Throwable t) { + } catch (Throwable ignore) { // this exception will never be thrown for a multiplexed session since the Future // object is already initialised. return null; @@ -1677,12 +1678,14 @@ interface CachedSession extends Session { SessionImpl getDelegate(); + @ObsoleteApi("This method can be removed once we fully migrate to multiplexed sessions.") void markBusy(ISpan span); void markUsed(); SpannerException setLastException(SpannerException exception); + @ObsoleteApi("This method can be removed once we fully migrate to multiplexed sessions.") boolean isAllowReplacing(); AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options); @@ -1985,6 +1988,7 @@ public TransactionManager transactionManager(TransactionOption... options) { class MultiplexedSession implements CachedSession { final SessionImpl delegate; + private volatile SpannerException lastException; MultiplexedSession(SessionImpl session) { this.delegate = session; @@ -2005,7 +2009,8 @@ public void setAllowReplacing(boolean allowReplacing) { @Override public void markBusy(ISpan span) { - this.delegate.setCurrentSpan(span); + // no-op for a multiplexed session since a new span is already created and set in context + // for every handler invocation. } @Override @@ -2016,8 +2021,7 @@ public void markUsed() { @Override public SpannerException setLastException(SpannerException exception) { - // multiplexed sessions run more than one transaction concurrently. we cannot store the - // exception state as that is not applicable to all transactions running on the session. + this.lastException = exception; return exception; } @@ -2135,6 +2139,16 @@ public void prepareReadWriteTransaction() { public void close() { synchronized (lock) { numMultiplexedSessionsReleased++; + if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) { + // Mark this session pool as no longer valid and then release the session into the pool as + // there is nothing we can do with it anyways. + synchronized (lock) { + SessionPool.this.resourceNotFoundException = + MoreObjects.firstNonNull( + SessionPool.this.resourceNotFoundException, + (ResourceNotFoundException) lastException); + } + } } } From 37ff613c21b13b21fe992a1794e22de64baa6940 Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Thu, 28 Mar 2024 13:58:59 +0530 Subject: [PATCH 6/6] chore: fix comments. --- .../com/google/cloud/spanner/SessionPool.java | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 3bfc075982..a5eee9d0db 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -45,7 +45,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; -import com.google.api.core.ObsoleteApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.rpc.ServerStream; @@ -1678,14 +1677,14 @@ interface CachedSession extends Session { SessionImpl getDelegate(); - @ObsoleteApi("This method can be removed once we fully migrate to multiplexed sessions.") + // TODO This method can be removed once we fully migrate to multiplexed sessions. void markBusy(ISpan span); void markUsed(); SpannerException setLastException(SpannerException exception); - @ObsoleteApi("This method can be removed once we fully migrate to multiplexed sessions.") + // TODO This method can be removed once we fully migrate to multiplexed sessions. boolean isAllowReplacing(); AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options); @@ -2140,14 +2139,10 @@ public void close() { synchronized (lock) { numMultiplexedSessionsReleased++; if (lastException != null && isDatabaseOrInstanceNotFound(lastException)) { - // Mark this session pool as no longer valid and then release the session into the pool as - // there is nothing we can do with it anyways. - synchronized (lock) { - SessionPool.this.resourceNotFoundException = - MoreObjects.firstNonNull( - SessionPool.this.resourceNotFoundException, - (ResourceNotFoundException) lastException); - } + SessionPool.this.resourceNotFoundException = + MoreObjects.firstNonNull( + SessionPool.this.resourceNotFoundException, + (ResourceNotFoundException) lastException); } } }