From 8c5436660ca893ddc8d7a351c08c32f43a9abb79 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 6 Apr 2023 08:01:23 +0200 Subject: [PATCH] Transaction.Current may not be set after opening the session (#184) * A quick spike to verify we can float the tx scope Co-authored-by: Szymon Pobiega Co-authored-by: Tomasz Masternak * Add a test --------- Co-authored-by: Szymon Pobiega Co-authored-by: Tomasz Masternak # Conflicts: # src/NServiceBus.TransactionalSession.AcceptanceTests/When_using_outbox.cs # src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingOutboxStorage.cs # src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs # src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs --- .../When_using_outbox.cs | 32 ++++++++++++++++++- .../CustomTestingOutboxStorage.cs | 7 ++++ ...tomTestingPersistenceOpenSessionOptions.cs | 2 ++ .../NonOutboxTransactionalSession.cs | 16 +++++++++- .../OutboxTransactionalSession.cs | 27 ++++++++++++++-- .../TransactionalSessionBase.cs | 27 +++------------- 6 files changed, 83 insertions(+), 28 deletions(-) diff --git a/src/NServiceBus.TransactionalSession.AcceptanceTests/When_using_outbox.cs b/src/NServiceBus.TransactionalSession.AcceptanceTests/When_using_outbox.cs index 2474072..7fc4cd8 100644 --- a/src/NServiceBus.TransactionalSession.AcceptanceTests/When_using_outbox.cs +++ b/src/NServiceBus.TransactionalSession.AcceptanceTests/When_using_outbox.cs @@ -2,6 +2,7 @@ { using System.Threading; using System.Threading.Tasks; + using System.Transactions; using AcceptanceTesting; using Infrastructure; using NUnit.Framework; @@ -74,10 +75,39 @@ public async Task Should_send_immediate_dispatch_messages_even_if_session_is_not Assert.True(result.MessageReceived); } + [Test] + public async Task Should_make_it_possible_float_ambient_transactions() + { + var result = await Scenario.Define() + .WithEndpoint(s => s.When(async (_, ctx) => + { + using var scope = ctx.Builder.CreateChildBuilder(); + using var transactionalSession = scope.Build(); + + await transactionalSession.Open(new CustomTestingPersistenceOpenSessionOptions + { + UseTransactionScope = true + }); + + ctx.AmbientTransactionFoundBeforeAwait = Transaction.Current != null; + + await Task.Yield(); + + ctx.AmbientTransactionFoundAfterAwait = Transaction.Current != null; + })) + .Done(c => c.EndpointsStarted) + .Run(); + + Assert.True(result.AmbientTransactionFoundBeforeAwait, "The ambient transaction was not visible before the await"); + Assert.True(result.AmbientTransactionFoundAfterAwait, "The ambient transaction was not visible after the await"); + } + class Context : ScenarioContext, IInjectBuilder { - public bool MessageReceived { get; set; } + public bool AmbientTransactionFoundBeforeAwait { get; set; } + public bool AmbientTransactionFoundAfterAwait { get; set; } public bool CompleteMessageReceived { get; set; } + public bool MessageReceived { get; set; } public IBuilder Builder { get; set; } } diff --git a/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingOutboxStorage.cs b/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingOutboxStorage.cs index 9e9b115..1389357 100644 --- a/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingOutboxStorage.cs +++ b/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingOutboxStorage.cs @@ -3,8 +3,10 @@ namespace NServiceBus.AcceptanceTesting using System; using System.Collections.Concurrent; using System.Threading.Tasks; + using System.Transactions; using Extensibility; using Outbox; + using TransactionalSession; sealed class CustomTestingOutboxStorage : IOutboxStorage { @@ -25,6 +27,11 @@ public Task Get(string messageId, ContextBag context) public Task BeginTransaction(ContextBag context) { + if (context.TryGet(out CustomTestingPersistenceOpenSessionOptions options) && options.UseTransactionScope) + { + _ = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled); + } + return Task.FromResult(new CustomTestingOutboxTransaction(context)); } diff --git a/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingPersistenceOpenSessionOptions.cs b/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingPersistenceOpenSessionOptions.cs index 4c928b2..ba185e0 100644 --- a/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingPersistenceOpenSessionOptions.cs +++ b/src/NServiceBus.TransactionalSession.CustomTestingPersistence/CustomTestingPersistenceOpenSessionOptions.cs @@ -7,5 +7,7 @@ public class CustomTestingPersistenceOpenSessionOptions : OpenSessionOptions public CustomTestingPersistenceOpenSessionOptions() => Extensions.Set(this); public TaskCompletionSource TransactionCommitTaskCompletionSource { get; set; } + + public bool UseTransactionScope { get; set; } } } \ No newline at end of file diff --git a/src/NServiceBus.TransactionalSession/NonOutboxTransactionalSession.cs b/src/NServiceBus.TransactionalSession/NonOutboxTransactionalSession.cs index c76016d..a6ab3cc 100644 --- a/src/NServiceBus.TransactionalSession/NonOutboxTransactionalSession.cs +++ b/src/NServiceBus.TransactionalSession/NonOutboxTransactionalSession.cs @@ -1,5 +1,6 @@ namespace NServiceBus.TransactionalSession { + using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -26,7 +27,20 @@ protected override async Task CommitInternal(CancellationToken cancellationToken public override async Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default) { - await base.Open(options, cancellationToken).ConfigureAwait(false); + ThrowIfDisposed(); + ThrowIfCommitted(); + + if (IsOpen) + { + throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once."); + } + + this.options = options; + + foreach (var customization in customizations) + { + customization.Apply(this.options); + } await synchronizedStorageSession.Open(null, new TransportTransaction(), Context).ConfigureAwait(false); } diff --git a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs index cd1633a..3e88087 100644 --- a/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs +++ b/src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs @@ -141,11 +141,32 @@ static void SerializeDeliveryConstraint(DeliveryConstraint constraint, Dictionar throw new Exception($"Unknown delivery constraint {constraint.GetType().FullName}"); } - public override async Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default) + public override Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default) { - await base.Open(options, cancellationToken).ConfigureAwait(false); + ThrowIfDisposed(); + ThrowIfCommitted(); - outboxTransaction = await outboxStorage.BeginTransaction(Context).ConfigureAwait(false); + if (IsOpen) + { + throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once."); + } + + this.options = options; + + foreach (var customization in customizations) + { + customization.Apply(this.options); + } + + // Unfortunately this is the only way to make it possible for Transaction.Current to float up to the caller + // to make sure SQLP and NHibernate work with the transaction scope + var outboxTransactionTask = outboxStorage.BeginTransaction(Context); + return OpenInternal(outboxTransactionTask, cancellationToken); + } + + async Task OpenInternal(Task beginTransactionTask, CancellationToken cancellationToken) + { + outboxTransaction = await beginTransactionTask.ConfigureAwait(false); if (!await synchronizedStorageSession.TryOpen(outboxTransaction, Context).ConfigureAwait(false)) { diff --git a/src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs b/src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs index 6598c8a..0e1f3b9 100644 --- a/src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs +++ b/src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs @@ -64,29 +64,10 @@ public async Task Commit(CancellationToken cancellationToken = default) committed = true; } + public abstract Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default); protected abstract Task CommitInternal(CancellationToken cancellationToken = default); - public virtual Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default) - { - ThrowIfDisposed(); - ThrowIfCommitted(); - - if (IsOpen) - { - throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once."); - } - - this.options = options; - - foreach (var customization in customizations) - { - customization.Apply(this.options); - } - - return Task.CompletedTask; - } - public async Task Send(object message, SendOptions sendOptions, CancellationToken cancellationToken = default) { ThrowIfInvalidState(); @@ -119,7 +100,7 @@ public async Task Publish(Action messageConstructor, PublishOptions publis await messageSession.Publish(messageConstructor, publishOptions).ConfigureAwait(false); } - void ThrowIfDisposed() + protected void ThrowIfDisposed() { if (disposed) { @@ -127,7 +108,7 @@ void ThrowIfDisposed() } } - void ThrowIfCommitted() + protected void ThrowIfCommitted() { if (committed) { @@ -170,7 +151,7 @@ protected virtual void Dispose(bool disposing) protected readonly CompletableSynchronizedStorageSessionAdapter synchronizedStorageSession; protected readonly IDispatchMessages dispatcher; - readonly IEnumerable customizations; + protected readonly IEnumerable customizations; protected readonly PendingTransportOperations pendingOperations; protected OpenSessionOptions options; readonly IMessageSession messageSession;