diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs index dc2c529bb2aa..b467d4cb42d5 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs @@ -287,7 +287,7 @@ Task ISpannerTransaction.ExecuteMutationsAsync( _mutations.AddRange(mutations); } taskCompletionSource.SetResult(mutations.Count); - return taskCompletionSource.Task; + return Task.FromResult(taskCompletionSource.Task.Result); }, "SpannerTransaction.ExecuteMutations", SpannerConnection.Logger); } @@ -362,8 +362,7 @@ Task> ISpannerTransaction.ExecuteBatchDmlAsync(ExecuteBatchDml request.Seqno = Interlocked.Increment(ref _lastDmlSequenceNumber); return ExecuteHelper.WithErrorTranslationAndProfiling(async () => { - var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteBatchDmlSettings, timeoutSeconds, cancellationToken); - ExecuteBatchDmlResponse response = await _session.ExecuteBatchDmlAsync(request, callSettings).ConfigureAwait(false); + var response = await _session.ExecuteWithTransactionSelector(AsyncWork, request.Transaction).ConfigureAwait(false); IEnumerable result = response.ResultSets.Select(rs => rs.Stats.RowCountExact); // Work around an issue with the emulator, which can return an ExecuteBatchDmlResponse without populating a status. // TODO: Remove this when the emulator has been fixed, although it does no harm if it stays longer than strictly necessary. @@ -381,6 +380,13 @@ Task> ISpannerTransaction.ExecuteBatchDmlAsync(ExecuteBatchDml throw new SpannerBatchNonQueryException(response.Status, result); } }, "SpannerTransaction.ExecuteBatchDml", SpannerConnection.Logger); + + async Task AsyncWork(TransactionSelector transaction) + { + request.Transaction = transaction; + var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteBatchDmlSettings, timeoutSeconds, cancellationToken); + return await _session.ExecuteBatchDmlAsync(request, callSettings).ConfigureAwait(false); + } } /// @@ -511,18 +517,18 @@ private void CheckCompatibleMode(TransactionMode mode) switch (mode) { case TransactionMode.ReadOnly: - { - GaxPreconditions.CheckState( - Mode == TransactionMode.ReadOnly || Mode == TransactionMode.ReadWrite, - "You can only execute reads on a ReadWrite or ReadOnly Transaction!"); - } + { + GaxPreconditions.CheckState( + Mode == TransactionMode.ReadOnly || Mode == TransactionMode.ReadWrite, + "You can only execute reads on a ReadWrite or ReadOnly Transaction!"); + } break; case TransactionMode.ReadWrite: - { - GaxPreconditions.CheckState( - Mode == TransactionMode.ReadWrite, - "You can only execute read/write commands on a ReadWrite Transaction!"); - } + { + GaxPreconditions.CheckState( + Mode == TransactionMode.ReadWrite, + "You can only execute read/write commands on a ReadWrite Transaction!"); + } break; default: throw new ArgumentOutOfRangeException(nameof(mode), mode, null); diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs index 71f823aa295f..904bef95dc24 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/PooledSession.cs @@ -18,7 +18,7 @@ using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using System; -using System.Runtime.CompilerServices; +using System.Linq; using System.Threading; using System.Threading.Tasks; using static Google.Cloud.Spanner.V1.TransactionOptions; @@ -42,7 +42,7 @@ namespace Google.Cloud.Spanner.V1 /// public sealed class PooledSession : IDisposable { - private readonly Session _session; + internal Session Session { get; } /// /// The name of the session. This is never null. @@ -52,23 +52,22 @@ public sealed class PooledSession : IDisposable /// /// The ID of the transaction. May be null. /// - public ByteString TransactionId { get; } + public ByteString TransactionId { get; set; } /// - /// The mode of the transaction. (Always None iff TransactionId is null.) + /// The options of the transaction. /// - internal ModeOneofCase TransactionMode { get; } + public TransactionOptions TransactionOptions { get; set; } /// - /// The read timestamp of the transaction. (Always null if - /// ReturnReadTimestamp = false or if TransactionMode != ReadOnly.) + /// The read timestamp of the transaction. (Always null if ReturnReadTimestamp = false) /// public Timestamp ReadTimestamp { get; } /// /// Indicates whether the server has told us that the session has expired. /// - internal bool ServerExpired => _session.Expired; + internal bool ServerExpired => Session.Expired; /// /// The time (in ticks since 0001-01-01T00:00:00Z) at which to refresh this session. @@ -84,6 +83,11 @@ public sealed class PooledSession : IDisposable /// private readonly DateTime _evictionTime; + /// + /// Create a single instance of the semaphore with a value of 1. It means only one thread will be allowed access at a time. + /// + private static readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); + // Just for convenience... private SpannerClient Client => _pool.Client; @@ -91,20 +95,16 @@ public sealed class PooledSession : IDisposable private int _disposed; private int _committedOrRolledBack; - private PooledSession(SessionPool.ISessionPool pool, SessionName sessionName, ByteString transactionId, ModeOneofCase transactionMode, Timestamp readTimestamp, DateTime evictionTime, long refreshTicks) + private PooledSession(SessionPool.ISessionPool pool, SessionName sessionName, Timestamp readTimestamp, DateTime evictionTime, long refreshTicks) { - GaxPreconditions.CheckArgument( - (transactionId == null) == (transactionMode == ModeOneofCase.None), - nameof(transactionMode), - "Transaction mode and ID don't match."); _pool = pool; SessionName = GaxPreconditions.CheckNotNull(sessionName, nameof(sessionName)); - TransactionId = transactionId; - TransactionMode = transactionMode; ReadTimestamp = readTimestamp; - _session = new Session { SessionName = SessionName }; + Session = new Session { SessionName = SessionName }; _evictionTime = evictionTime; _refreshTicks = refreshTicks; + TransactionOptions = null; + TransactionId = null; } /// @@ -116,7 +116,7 @@ internal static PooledSession FromSessionName(SessionPool.ISessionPool pool, Ses var now = pool.Clock.GetCurrentDateTimeUtc(); var refreshDelay = options.SessionRefreshJitter.GetDelay(options.IdleSessionRefreshDelay); var evictionDelay = options.SessionEvictionJitter.GetDelay(options.PoolEvictionDelay); - return new PooledSession(pool, sessionName, transactionId: null, ModeOneofCase.None, readTimestamp: null, now + evictionDelay, now.Ticks + refreshDelay.Ticks); + return new PooledSession(pool, sessionName, readTimestamp: null, now + evictionDelay, now.Ticks + refreshDelay.Ticks); } /// @@ -126,16 +126,28 @@ internal static PooledSession FromSessionName(SessionPool.ISessionPool pool, Ses private PooledSession AfterReset() { MarkAsDisposed(); - return new PooledSession(_pool, SessionName, null, ModeOneofCase.None, null, _evictionTime, RefreshTicks); + return new PooledSession(_pool, SessionName, null, _evictionTime, RefreshTicks) + { + TransactionOptions = null, + }; } - internal PooledSession WithTransaction(ByteString transactionId, ModeOneofCase transactionMode, Timestamp readTimestamp = null) + /// + /// TODO: Add summary. + /// + /// The transaction id to which session needs to be associated with. + /// The read time stamp. + /// TODO: Add summary. + internal PooledSession WithTransaction(ByteString transactionId, Timestamp readTimestamp = null) { MarkAsDisposed(); - return new PooledSession(_pool, SessionName, transactionId, transactionMode, readTimestamp, _evictionTime, _refreshTicks); + return new PooledSession(_pool, SessionName, readTimestamp, _evictionTime, _refreshTicks) + { + TransactionId = transactionId, + }; } - /// + /// //TODO: Update Comment /// Always returns a new instance of . The new instance can: /// 1. represent the same session as this one, but will have a fresh transaction of the /// same type as this did. @@ -153,13 +165,13 @@ internal PooledSession WithTransaction(ByteString transactionId, ModeOneofCase t /// are retried with the same session, because after each abort the sessions' lock priority increments. /// /// A new instance of . - /// If this + /// If transaction mode of this /// is . public Task WithFreshTransactionOrNewAsync(TransactionOptions transactionOptions, CancellationToken cancellationToken) { CheckNotDisposed(); GaxPreconditions.CheckNotNull(transactionOptions, nameof(transactionOptions)); - GaxPreconditions.CheckArgument(transactionOptions.ModeCase == TransactionMode, nameof(transactionOptions), $"{nameof(TransactionOptions)} should be of the same type as this session's {nameof(TransactionMode)} which is {TransactionMode}"); + GaxPreconditions.CheckArgument(transactionOptions.ModeCase == TransactionOptions.ModeCase, nameof(transactionOptions), $"{nameof(TransactionOptions)} should be of the same type as this session's {nameof(TransactionOptions)} which is {TransactionOptions.ModeCase}"); // Calling AfterReset() will mark this instance as disposed. // The pool will take care of releasing back to the pool if needed. @@ -222,7 +234,7 @@ public void ReleaseToPool(bool forceDelete) { // A read/write transaction that hasn't been committed or rolled back might have taken out a database lock: roll it // back as part of releasing the session. (We don't block on the rollback happening though.) - ByteString transactionToRollback = TransactionMode == ModeOneofCase.ReadWrite && !IsCommittedOrRolledBack() ? TransactionId : null; + ByteString transactionToRollback = TransactionOptions?.ModeCase == ModeOneofCase.ReadWrite && !IsCommittedOrRolledBack() ? TransactionId : null; _pool.Release(AfterReset(), transactionToRollback, forceDelete || ServerExpired || ShouldBeEvicted); } else @@ -242,7 +254,7 @@ public void ReleaseToPool(bool forceDelete) /// Note that we don't attempt to rollback a transaction that is being detached, or attempt to delete the session, /// under the assumption that it will be reused across processes. /// If there's a process capable of knowing when all other processes are done using the session, then that process could call - /// (or an overload) to create an instance + /// (or an overload) to create an instance /// of representing the shared transaction and then call passing true /// to force session deletion and clean up resources. /// Else, the application can rely on Spaner service garbage collection to clean up this session once it becomes stale. @@ -270,9 +282,16 @@ public async Task CommitAsync(CommitRequest request, CallSetting { CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); - GaxPreconditions.CheckState(TransactionId != null, "Cannot commit a PooledSession with no associated transaction"); + var transactionId = TransactionId; + if (transactionId == null) + { + var beginRequest = new BeginTransactionRequest { Options = TransactionOptions }; + var transaction = await BeginTransactionAsync(beginRequest, null).ConfigureAwait(false); // TODO: Probably we'll need to pass callSettings for BeginTransaction. + transactionId = transaction.Id; + } + request.SessionAsSessionName = SessionName; - request.TransactionId = TransactionId; + request.TransactionId = transactionId; var response = await RecordSuccessAndExpiredSessions(Client.CommitAsync(request, callSettings)).ConfigureAwait(false); MarkAsCommittedOrRolledBack(); @@ -367,14 +386,11 @@ internal ReliableStreamReader ExecuteReadOrQueryStreamReader(ReadOrQueryRequest { CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); - if (TransactionId != null) - { - request.Transaction = new TransactionSelector { Id = TransactionId }; - } request.SessionAsSessionName = SessionName; SpannerClientImpl.ApplyResourcePrefixHeaderFromSession(ref callSettings, request.Session); - ResultStream stream = new ResultStream(Client, request, _session, callSettings); + // ExecuteWithTransactionSelector will be invoked in the MoveNext() method of ResultStream. + ResultStream stream = new ResultStream(Client, request, this, callSettings); return new ReliableStreamReader(stream, Client.Settings.Logger); } @@ -390,11 +406,14 @@ public Task ExecuteSqlAsync(ExecuteSqlRequest request, CallSettings c CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); request.SessionAsSessionName = SessionName; - if (TransactionId != null) + + return ExecuteWithTransactionSelector(AsyncWork, request.Transaction); + + async Task AsyncWork(TransactionSelector transaction) { - request.Transaction = new TransactionSelector { Id = TransactionId }; + request.Transaction = transaction; + return await RecordSuccessAndExpiredSessions(Client.ExecuteSqlAsync(request, callSettings)).ConfigureAwait(false); } - return RecordSuccessAndExpiredSessions(Client.ExecuteSqlAsync(request, callSettings)); } /// @@ -409,11 +428,13 @@ public Task ExecuteBatchDmlAsync(ExecuteBatchDmlRequest CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); request.SessionAsSessionName = SessionName; - if (TransactionId != null) + return ExecuteWithTransactionSelector(AsyncWork, request.Transaction); + + async Task AsyncWork(TransactionSelector transaction) { - request.Transaction = new TransactionSelector { Id = TransactionId }; + request.Transaction = transaction; + return await RecordSuccessAndExpiredSessions(Client.ExecuteBatchDmlAsync(request, callSettings)).ConfigureAwait(false); } - return RecordSuccessAndExpiredSessions(Client.ExecuteBatchDmlAsync(request, callSettings)); } /// @@ -421,30 +442,40 @@ public Task ExecuteBatchDmlAsync(ExecuteBatchDmlRequest /// /// /// This method does not affect of this object. Instead, typical usage will be to call this method followed - /// by to create a new using the transaction. + /// by to create a new using the transaction. /// /// The begin-transaction request. Must not be null. The request will be modified with session details /// from this object. /// If not null, applies overrides to this RPC call. /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. - internal Task BeginTransactionAsync(BeginTransactionRequest request, CallSettings callSettings) + public Task BeginTransactionAsync(BeginTransactionRequest request, CallSettings callSettings) { CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); + if (TransactionOptions == null) + { + TransactionOptions = request.Options; + } + else if (TransactionOptions.ModeCase != request.Options.ModeCase) + { + // May be this check is not necessary but it's good to have. + throw new InvalidOperationException("BeginTransactionRequest and Session's TransactionOptions should have same mode."); + } + request.SessionAsSessionName = SessionName; return RecordSuccessAndExpiredSessions(Client.BeginTransactionAsync(request, callSettings)); } private async Task RecordSuccessAndExpiredSessions(Task task) { - var result = await task.WithSessionExpiryChecking(_session).ConfigureAwait(false); + var result = await task.WithSessionExpiryChecking(Session).ConfigureAwait(false); UpdateRefreshTime(); return result; } private async Task RecordSuccessAndExpiredSessions(Task task) { - await task.WithSessionExpiryChecking(_session).ConfigureAwait(false); + await task.WithSessionExpiryChecking(Session).ConfigureAwait(false); UpdateRefreshTime(); } @@ -467,5 +498,95 @@ private void CheckNotDisposed() throw new ObjectDisposedException($"PooledSession for {SessionName} has been disposed, and cannot be reused."); } } + + /// + /// TODO: Add Summary. + /// + /// The typr of result to be retunred. + /// Async task that will assign appropriate transaction selector + /// to the request and then execute the request. + /// The cancellation token. + /// The current transaction selector of request that needs to be updated. + /// + public async Task ExecuteWithTransactionSelector(Func> asyncWork, TransactionSelector currentTransactionSelector, CancellationToken cancellationToken = default) + { + GaxPreconditions.CheckNotNull(asyncWork, nameof(asyncWork)); + + // If transaction options is null it means tramsaction is not required + // to execute this operation. Which means It should have been invoked from Read Only Transaction. + // If current tramsaction selector if request is of type single use, the no need to inline begin a new transaction. + if (TransactionOptions == null || + currentTransactionSelector?.SelectorCase == TransactionSelector.SelectorOneofCase.SingleUse) + { + return await asyncWork(currentTransactionSelector).ConfigureAwait(false); + } + + // Transaction has been already started by an earlier operation. + if (TransactionId != null) + { + return await asyncWork(new TransactionSelector { Id = TransactionId }).ConfigureAwait(false); + } + + // Null Transaction Id means a new trasaction needs to inline begin/started. + // Only one thread should inline begin the transction. + + var enteredSemaphoreSlim = false; // Intially false for all threads. + if (TransactionId == null) + { + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); // All except one thrad will wait here for semaphore to release. + enteredSemaphoreSlim = true; // True for first thread trying to inline begin. + } + + // Transaction was aready inline begin by another thread in parallel while this was on hold. + if (TransactionId != null) + { + return await asyncWork(new TransactionSelector { Id = TransactionId }).ConfigureAwait(false); + } + + TResult result = default; + try + { + result = await asyncWork(new TransactionSelector { Begin = TransactionOptions }).ConfigureAwait(false); + + if (TransactionId == null) + { + if (result is PartialResultSet) + { + TransactionId = ((PartialResultSet) (object) result).Metadata.Transaction.Id; + } + else if (result is ExecuteBatchDmlResponse) + { + TransactionId = ((ExecuteBatchDmlResponse) (object) result).ResultSets.FirstOrDefault().Metadata.Transaction.Id; + } + else + { + throw new InvalidOperationException("Invalid response type."); + } + } + + return result; + } + catch + { + if (result != null && result is ExecuteBatchDmlResponse) + { + TransactionId = ((ExecuteBatchDmlResponse) (object) result).ResultSets?.FirstOrDefault().Metadata.Transaction.Id; + throw; + } + + //Retry if inline fails. + var request = new BeginTransactionRequest { Options = TransactionOptions }; + var transaction = await BeginTransactionAsync(request, null).ConfigureAwait(false); // TODO: Probably we'll need to pass callSettings for BeginTransaction. + TransactionId = transaction.Id; + return await asyncWork(new TransactionSelector { Id = TransactionId }).ConfigureAwait(false); + } + finally + { + if (enteredSemaphoreSlim) + { + _semaphoreSlim.Release(); + } + } + } } } diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs index 18a250f1452a..88938f80da6f 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ResultStream.cs @@ -19,6 +19,7 @@ using Grpc.Core; using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -50,7 +51,7 @@ internal sealed class ResultStream : IAsyncStreamReader, IDisp private readonly LinkedList _buffer; private readonly SpannerClient _client; private readonly ReadOrQueryRequest _request; - private readonly Session _session; + private readonly PooledSession _session; private readonly CallSettings _callSettings; private readonly RetrySettings _retrySettings; private readonly int _maxBufferSize; @@ -66,7 +67,7 @@ internal sealed class ResultStream : IAsyncStreamReader, IDisp /// /// Constructor for normal usage, with default buffer size, backoff settings and jitter. /// - internal ResultStream(SpannerClient client, ReadOrQueryRequest request, Session session, CallSettings callSettings) + internal ResultStream(SpannerClient client, ReadOrQueryRequest request, PooledSession session, CallSettings callSettings) : this(client, request, session, callSettings, DefaultMaxBufferSize, s_defaultRetrySettings) { } @@ -77,7 +78,7 @@ internal ResultStream(SpannerClient client, ReadOrQueryRequest request, Session internal ResultStream( SpannerClient client, ReadOrQueryRequest request, - Session session, + PooledSession session, CallSettings callSettings, int maxBufferSize, RetrySettings retrySettings) @@ -103,9 +104,17 @@ public void Dispose() public async Task MoveNext(CancellationToken cancellationToken) { - var value = await ComputeNextAsync(cancellationToken).ConfigureAwait(false); + // AFTER ILB CONFIRM THIS BEHAVIOUR. IF THE TRANACTION IS NULL THEN EXECUTE THE READ/SELECT QUERY USING NULL TRANSACTION. + var value = await _session.ExecuteWithTransactionSelector(AsynWork, _request.Transaction).ConfigureAwait(false); + Current = value; return value != null; + + async Task AsynWork(TransactionSelector transaction) + { + _request.Transaction = transaction; + return await ComputeNextAsync(cancellationToken).ConfigureAwait(false); + } } // See https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java#L2674 @@ -139,7 +148,7 @@ private async Task ComputeNextAsync(CancellationToken cancella } bool hasNext = await _grpcCall.ResponseStream .MoveNext(cancellationToken) - .WithSessionExpiryChecking(_session) + .WithSessionExpiryChecking(_session.Session) .ConfigureAwait(false); retryState.Reset(); diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SessionPool.TargetedSessionPool.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SessionPool.TargetedSessionPool.cs index 7bb030789d18..f06b282bc705 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SessionPool.TargetedSessionPool.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/SessionPool.TargetedSessionPool.cs @@ -43,8 +43,7 @@ internal sealed class TargetedSessionPool : SessionPoolBase // Mutable state, which should be accessed within the lock - private readonly ConcurrentStack _readOnlySessions = new ConcurrentStack(); - private readonly ConcurrentStack _readWriteSessions = new ConcurrentStack(); + private readonly ConcurrentStack _sessions = new ConcurrentStack(); private readonly ConcurrentQueue> _pendingAcquisitions = new ConcurrentQueue>(); @@ -101,11 +100,6 @@ internal sealed class TargetedSessionPool : SessionPoolBase /// internal int LiveOrRequestedSessionCount => Interlocked.CompareExchange(ref _liveOrRequestedSessionCount, 0, 0); - // Statistics maintained purely for diagnostic purposes. This lets us evaluate - // how effective transaction pre-warming is. - private long _rwTransactionRequests; - private long _rwTransactionRequestsPrewarmed; - internal TargetedSessionPool(SessionPool parent, SessionPoolSegmentKey key, bool acquireSessionsImmediately) : base(parent) { _segmentKey = GaxPreconditions.CheckNotNull(key, nameof(key)); @@ -161,8 +155,7 @@ public async Task AcquireSessionAsync(TransactionOptions transact private async Task AcquireSessionImplAsync(TransactionOptions transactionOptions, CancellationToken cancellationToken) { - var transactionMode = transactionOptions?.ModeCase ?? ModeOneofCase.None; - var sessionAcquisitionTask = GetSessionAcquisitionTask(transactionMode, cancellationToken); + var sessionAcquisitionTask = GetSessionAcquisitionTask(cancellationToken); // We've either fetched a task from the pool, or registered that a caller is waiting for one. // We may want to start creation tasks, either to replenish the pool or (if there were no pool entries) @@ -175,74 +168,17 @@ private async Task AcquireSessionImplAsync(TransactionOptions tra // We do this when a session is released, and in the maintenance task. // These happen frequently enough that we shouldn't need to worry about them here. - // Update statistics for prewarming - only after we've already acquired the session. - if (transactionMode == ModeOneofCase.ReadWrite) - { - Interlocked.Increment(ref _rwTransactionRequests); - if (session.TransactionMode == transactionMode) - { - Interlocked.Increment(ref _rwTransactionRequestsPrewarmed); - } - } - - // If we've already got the right transaction mode, we're done. - if (session.TransactionMode == transactionMode) - { - return session; - } - // Otherwise, we may need to forget an existing transaction, or request a new one. - else - { - // If we asked for a session with no transaction but we got one *with* a tranasction, - // we don't need to perform any RPCs - but we do need to return a PooledSession with - // no transaction ID. - if (transactionMode == ModeOneofCase.None) - { - return session.WithTransaction(null, ModeOneofCase.None); - } - else - { - bool success = false; - try - { - session = await BeginTransactionAsync(session, transactionOptions, cancellationToken).ConfigureAwait(false); - success = true; - return session; - } - finally - { - // If we succeeded in getting a session but not a transaction, we can reuse the session later, but still fail this call. - // It counts as "inactive" because the failure will decrement the active session count already. - // Note that the only way success is false is if we're throwing an exception, so we'll never release it - // *and* then return it. - if (!success) - { - ReleaseInactiveSession(session, maybeCreateReadWriteTransaction: false); - } - } - } - } + // We'll set the required ransaction option to the session before returning it. + session.TransactionOptions = transactionOptions ?? new TransactionOptions { ReadOnly = new Types.ReadOnly() }; + return session; } - private Task GetSessionAcquisitionTask(ModeOneofCase transactionMode, CancellationToken cancellationToken) + private Task GetSessionAcquisitionTask(CancellationToken cancellationToken) { - // Three scenarios for initial session picking: - // - No transaction options: take a read-only session as-is - // - Read/write transaction options: take a read/write session as-is - // - Other options (a non-single-use read-only bound, or PDML): take a read-only session, but fetch a transaction with it before returning it. - // If there's no session of the appropriate type, take the other kind instead - at the cost of either wasting an existing read/write - // transaction, or having to acquire a read/write transaction. - ConcurrentStack preferredStack = _readOnlySessions; - ConcurrentStack alternateStack = _readWriteSessions; - if (transactionMode == ModeOneofCase.ReadWrite) - { - preferredStack = _readWriteSessions; - alternateStack = _readOnlySessions; - } lock (_lock) { // First try the pool. - if (preferredStack.TryPop(out var session) || alternateStack.TryPop(out session)) + if (_sessions.TryPop(out var session)) { // Slight inefficiency wrapping this in a task, but it makes the implementation simpler. return Task.FromResult(session); @@ -273,7 +209,7 @@ async Task NurseAndRetryAsync() await GetNursePoolBackToHealthTask(cancellationToken).ConfigureAwait(false); // If we reached this point the nursing task succeeded and the pool is healthy. - var acquisitionTask = GetSessionAcquisitionTask(transactionMode, cancellationToken); + var acquisitionTask = GetSessionAcquisitionTask(cancellationToken); // Although the pool is back to being healthy, it might be depleted, since for making it healthy // we only created the last failed batch worth of sessions. Let's try and make it ready again. StartSessionCreationTasksIfNecessary(); @@ -291,11 +227,7 @@ private void EvictSession(PooledSession session) /// Release a session back to the pool (or refresh) but don't change the number of active sessions. /// /// The session to stack. Should be "active" (i.e. not disposed) - /// Whether to allow the session to go through a cycle of acquiring a read/write transaction. - /// This is true unless we've just come from attempting to create a read/write transaction, in which case either we succeeded (no need - /// to create a new one) or failed (in which case we should just keep it read-only). - /// - private void ReleaseInactiveSession(PooledSession session, bool maybeCreateReadWriteTransaction) + private void ReleaseInactiveSession(PooledSession session) { if (Shutdown) { @@ -312,15 +244,13 @@ private void ReleaseInactiveSession(PooledSession session, bool maybeCreateReadW return; } - // There are a couple of cases where we need to take an action outside the lock after breaking + // There is a case where we need to take an action outside the lock after breaking // out of the loop. It's simplest to remember that in a delegate. Action outsideLockAction = null; // We need to atomically (within the lock) decide between: // - Adding the session to a pool stack (adding performed within the lock) // - Providing the session to a waiting caller (setting the result peformed outside the lock) - // - If it's currently not got a transaction but we need more read/write transactions, starting a transaction - // In the last case, we will come back to this code to make another decision later. while (true) { TaskCompletionSource pendingAquisition; @@ -329,43 +259,9 @@ private void ReleaseInactiveSession(PooledSession session, bool maybeCreateReadW // Only add a session to a stack if there are no pending acquisitions. if (!_pendingAcquisitions.TryDequeue(out pendingAquisition)) { - // Options: - // - Decide to create a new read/write transaction (will get back here later) - // - Push the current session as read-only or read/write depending on its mode - ConcurrentStack stack; - - // If the session already has a read/write transaction, add it to the read/write pool immediately. - // Otherwise, work out whether we *want* it to be read/write. - if (session.TransactionMode == ModeOneofCase.ReadWrite) - { - stack = _readWriteSessions; - } - else - { - var readCount = _readOnlySessions.Count; - var writeCount = _readWriteSessions.Count; - // Avoid division by zero by including the new session in the denominator. - var writeProportion = writeCount / (writeCount + readCount + 1.0); - bool createReadWriteTransaction = maybeCreateReadWriteTransaction && writeProportion < Options.WriteSessionsFraction; - if (createReadWriteTransaction) - { - // Exit the loop, and acquire a read/write transaction - outsideLockAction = () => Parent.ConsumeBackgroundTask(TryCreateReadWriteTransactionAndReturnToPool(session), "transaction creation"); - break; - } - else - { - // At this point we didn't already have a r/w transaction, and we don't want to - // create one, so add it to the pool of read-only sessions. - stack = _readOnlySessions; - } - } - // We definitely have a stack now, so add the session to it, and - // potentially release tasks waiting for the pool to reach minimum size. - stack.Push(session); + _sessions.Push(session); - int poolSize = _readOnlySessions.Count + _readWriteSessions.Count; - if (poolSize >= Options.MinimumPooledSessions && _minimumSizeWaiters.Count > 0) + if (_sessions.Count >= Options.MinimumPooledSessions && _minimumSizeWaiters.Count > 0) { var minimumSizeWaiters = _minimumSizeWaiters.ToList(); outsideLockAction = () => minimumSizeWaiters.ForEach(tcs => tcs.TrySetResult(0)); @@ -398,7 +294,8 @@ private async Task RefreshAsync(PooledSession session) try { var callSettings = Client.Settings.ExecuteSqlSettings.WithExpiration(Expiration.FromTimeout(Options.Timeout)); - await session.ExecuteSqlAsync(new ExecuteSqlRequest { Sql = "SELECT 1" }, callSettings).ConfigureAwait(false); + var result = await session.ExecuteSqlAsync(new ExecuteSqlRequest { Sql = "SELECT 1" }, callSettings).ConfigureAwait(false); + session.TransactionId = result.Metadata.Transaction.Id; } catch (RpcException e) { @@ -411,52 +308,7 @@ private async Task RefreshAsync(PooledSession session) Interlocked.Decrement(ref _inFlightSessionCreationCount); } // We now definitely don't have a transaction. - ReleaseInactiveSession(session.WithTransaction(null, ModeOneofCase.None), maybeCreateReadWriteTransaction: true); - } - - private async Task TryCreateReadWriteTransactionAndReturnToPool(PooledSession session) - { - try - { - session = await BeginTransactionAsync(session, s_readWriteOptions, CancellationToken.None).ConfigureAwait(false); - } - catch (RpcException e) - { - // Failed to create a read/write transaction; release this back to the pool, but making - // sure we don't come back here. - Parent._logger.Warn("Failed to create read/write transaction for pooled session", e); - } - ReleaseInactiveSession(session, maybeCreateReadWriteTransaction: false); - } - - private async Task BeginTransactionAsync(PooledSession session, TransactionOptions options, CancellationToken cancellationToken, bool isSessionAcquired = false) - { - // While we're creating a transaction, it's as if we're preparing a new session - it's a period of time - // where there's already an RPC in flight, and when it completes a session will be available. - // But if that session is being held by calling code already, and we are here because client code - // requested a transaction refresh then we can't count the session as if it were being prepared - // because after the transaction is refreshed the session won't be release back to the pool, it will still - // be held by calling code. - if (!isSessionAcquired) - { - Interlocked.Increment(ref _inFlightSessionCreationCount); - } - var request = new BeginTransactionRequest { Options = options }; - try - { - var callSettings = Client.Settings.BeginTransactionSettings - .WithExpiration(Expiration.FromTimeout(Options.Timeout)) - .WithCancellationToken(cancellationToken); - var transaction = await session.BeginTransactionAsync(request, callSettings).ConfigureAwait(false); - return session.WithTransaction(transaction.Id, options.ModeCase, transaction.ReadTimestamp); - } - finally - { - if (!isSessionAcquired) - { - Interlocked.Decrement(ref _inFlightSessionCreationCount); - } - } + ReleaseInactiveSession(session.WithTransaction(null)); } public override Task WithFreshTransactionOrNewAsync(PooledSession session, TransactionOptions transactionOptions, CancellationToken cancellationToken) @@ -465,25 +317,10 @@ public override Task WithFreshTransactionOrNewAsync(PooledSession { // Let's just release it back to the pool, that will handle the refreshing etc. session.ReleaseToPool(false); + return AcquireSessionAsync(transactionOptions, cancellationToken); } - else - { - try - { - // Let's try to begin a new transaction for this same session. - return BeginTransactionAsync(session, transactionOptions, cancellationToken, true); - } - catch(RpcException e) - { - Parent._logger.Warn("Failed to create transaction for acquired session", e); - // Failed to create the transaction for the session. - // Release the session back to the pool. - // We'll try to acquire a new session now. - session.ReleaseToPool(false); - } - } - // If we are here we need to acquire a new session. - return AcquireSessionAsync(transactionOptions, cancellationToken); + + return Task.FromResult(session.WithTransaction(null, session.ReadTimestamp)); } /// @@ -517,7 +354,7 @@ private async Task ReleaseAsync(PooledSession session, ByteString transactionId, } else { - ReleaseInactiveSession(session, maybeCreateReadWriteTransaction: true); + ReleaseInactiveSession(session); } } @@ -544,8 +381,7 @@ private void EvictAndRefreshSessions() LinkedList staleSessions = new LinkedList(); lock (_lock) { - RemoveStaleOrExpiredItemsFromStack(_readOnlySessions); - RemoveStaleOrExpiredItemsFromStack(_readWriteSessions); + RemoveStaleOrExpiredItemsFromStack(_sessions); } foreach (var session in sessionsToEvict) @@ -625,7 +461,7 @@ async Task PrepareNewSessionsAsync() var sessions = await CreatePooledSessionsAsync(CancellationToken.None).ConfigureAwait(false); foreach (var session in sessions) { - ReleaseInactiveSession(session, maybeCreateReadWriteTransaction: true); + ReleaseInactiveSession(session); } } // Note: we expect this to always actually be an RpcException, but we don't want to end up unhealthy @@ -651,7 +487,7 @@ async Task PrepareNewSessionsAsync() minimumSizeWaiters.ForEach(tcs => tcs.TrySetException(e)); } } - } + } private Task GetNursePoolBackToHealthTask(CancellationToken cancellationToken) { @@ -678,7 +514,7 @@ private Task GetNursePoolBackToHealthTask(CancellationToken cancellationToken) var sessions = await CreatePooledSessionsAsync(CancellationToken.None).ConfigureAwait(false); foreach (var session in sessions) { - ReleaseInactiveSession(session, maybeCreateReadWriteTransaction: true); + ReleaseInactiveSession(session); } // We have succesfully nursed the pool back to health. @@ -706,7 +542,7 @@ private Task GetNursePoolBackToHealthTask(CancellationToken cancellationToken) // Whatever happened we are done nursing the pool back to health. Interlocked.CompareExchange(ref _nurseBackToHealthTask, null, newTcs); } - }), "nurse pool back to health" ); + }), "nurse pool back to health"); // Use the cancellation token now, if the caller cancels, the task they are waiting on will be canceled, // but not the nursing. @@ -763,7 +599,7 @@ private async Task> CreatePooledSessionsAsync(CancellationT success = true; actualCreatedSessions = batchSessionCreateResponse.Session.Count; - foreach(var sessionProto in batchSessionCreateResponse.Session) + foreach (var sessionProto in batchSessionCreateResponse.Session) { pooledSessions.Add(PooledSession.FromSessionName(this, sessionProto.SessionName)); } @@ -782,7 +618,7 @@ private async Task> CreatePooledSessionsAsync(CancellationT } } } - catch(Exception e) + catch (Exception e) { Parent._logger.Warn(() => $"Failed to batch create sessions for {_segmentKey}", e); throw; @@ -820,8 +656,6 @@ private bool ShouldCreateMoreSessions(out int batchSize) { if (Healthy) { - int poolSize = _readWriteSessions.Count + _readOnlySessions.Count; - // Determine how many more sessions to create. // We want to make sure that if all existing and new requests for session creation succeed: // - All queuing callers will have a session @@ -835,7 +669,7 @@ private bool ShouldCreateMoreSessions(out int batchSize) // How many more sessions do we need to create in order to get the pool to the minimum size, after satisfying // pending callers, assuming that all the in-flight session requests succeed, and there are no more requests? - int newSessionsToSatisfyPool = (_pendingAcquisitions.Count + Options.MinimumPooledSessions) - (poolSize + InFlightSessionCreationCount); + int newSessionsToSatisfyPool = (_pendingAcquisitions.Count + Options.MinimumPooledSessions) - (_sessions.Count + InFlightSessionCreationCount); // How many more sessions *can* we create without going over the maximum number of active sessions? int maxPossibleNewSessions = Options.MaximumActiveSessions - LiveOrRequestedSessionCount; @@ -858,12 +692,9 @@ internal SessionPoolSegmentStatistics GetStatisticsSnapshot() return new SessionPoolSegmentStatistics( _segmentKey, ActiveSessionCount, - _readOnlySessions.Count, - _readWriteSessions.Count, + _sessions.Count, InFlightSessionCreationCount, _pendingAcquisitions.Count, - Interlocked.CompareExchange(ref _rwTransactionRequests, 0L, 0L), - Interlocked.CompareExchange(ref _rwTransactionRequestsPrewarmed, 0L, 0L), Healthy, Shutdown); } @@ -889,7 +720,7 @@ internal async Task WhenPoolReady(CancellationToken cancellationToken = default) { throw new RpcException(new Status(StatusCode.Unknown, "Session pool was unhealthy")); } - if (_readOnlySessions.Count + _readWriteSessions.Count >= Options.MinimumPooledSessions) + if (_sessions.Count >= Options.MinimumPooledSessions) { return; } @@ -936,8 +767,7 @@ private async Task ExecuteShutdownAsync(TaskCompletionSource tcsToSignal) { lock (_lock) { - if (_readWriteSessions.Count == 0 && - _readOnlySessions.Count == 0 && + if (_sessions.Count == 0 && ActiveSessionCount == 0 && InFlightSessionCreationCount == 0 && _pendingAcquisitions.Count == 0 && @@ -959,11 +789,7 @@ private async Task ExecuteShutdownAsync(TaskCompletionSource tcsToSignal) List> pendingAcquisitionsToCancel = new List>(); lock (_lock) { - while (_readOnlySessions.TryPop(out var session)) - { - sessionsToDelete.Add(session); - } - while (_readWriteSessions.TryPop(out var session)) + while (_sessions.TryPop(out var session)) { sessionsToDelete.Add(session); }