Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transaction.Current may not be set after opening the session #185

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using AcceptanceTesting;
using Infrastructure;
using NUnit.Framework;
Expand Down Expand Up @@ -74,10 +75,39 @@ public async Task Should_send_immediate_dispatch_messages_even_if_session_is_not
Assert.True(result.MessageReceived);
}

[Test]
public async Task Should_make_it_possible_float_ambient_transactions()
{
var result = await Scenario.Define<Context>()
.WithEndpoint<AnEndpoint>(s => s.When(async (_, ctx) =>
{
using var scope = ctx.Builder.CreateChildBuilder();
using var transactionalSession = scope.Build<ITransactionalSession>();

await transactionalSession.Open(new CustomTestingPersistenceOpenSessionOptions
{
UseTransactionScope = true
});

ctx.AmbientTransactionFoundBeforeAwait = Transaction.Current != null;

await Task.Yield();

ctx.AmbientTransactionFoundAfterAwait = Transaction.Current != null;
}))
.Done(c => c.EndpointsStarted)
.Run();

Assert.True(result.AmbientTransactionFoundBeforeAwait, "The ambient transaction was not visible before the await");
Assert.True(result.AmbientTransactionFoundAfterAwait, "The ambient transaction was not visible after the await");
}

class Context : ScenarioContext, IInjectBuilder
{
public bool MessageReceived { get; set; }
public bool AmbientTransactionFoundBeforeAwait { get; set; }
public bool AmbientTransactionFoundAfterAwait { get; set; }
public bool CompleteMessageReceived { get; set; }
public bool MessageReceived { get; set; }
public IBuilder Builder { get; set; }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ namespace NServiceBus.AcceptanceTesting
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Transactions;
using Extensibility;
using Outbox;
using TransactionalSession;

sealed class CustomTestingOutboxStorage : IOutboxStorage
{
Expand All @@ -25,6 +27,11 @@ public Task<OutboxMessage> Get(string messageId, ContextBag context)

public Task<OutboxTransaction> BeginTransaction(ContextBag context)
{
if (context.TryGet(out CustomTestingPersistenceOpenSessionOptions options) && options.UseTransactionScope)
{
_ = new TransactionScope(TransactionScopeOption.RequiresNew, TransactionScopeAsyncFlowOption.Enabled);
}

return Task.FromResult<OutboxTransaction>(new CustomTestingOutboxTransaction(context));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ public class CustomTestingPersistenceOpenSessionOptions : OpenSessionOptions
public CustomTestingPersistenceOpenSessionOptions() => Extensions.Set(this);

public TaskCompletionSource<bool> TransactionCommitTaskCompletionSource { get; set; }

public bool UseTransactionScope { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.TransactionalSession
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,7 +27,20 @@ protected override async Task CommitInternal(CancellationToken cancellationToken

public override async Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
{
await base.Open(options, cancellationToken).ConfigureAwait(false);
ThrowIfDisposed();
ThrowIfCommitted();

if (IsOpen)
{
throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once.");
}

this.options = options;

foreach (var customization in customizations)
{
customization.Apply(this.options);
}

await synchronizedStorageSession.Open(null, new TransportTransaction(), Context).ConfigureAwait(false);
}
Expand Down
27 changes: 24 additions & 3 deletions src/NServiceBus.TransactionalSession/OutboxTransactionalSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,32 @@ static void SerializeDeliveryConstraint(DeliveryConstraint constraint, Dictionar
throw new Exception($"Unknown delivery constraint {constraint.GetType().FullName}");
}

public override async Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
public override Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
{
await base.Open(options, cancellationToken).ConfigureAwait(false);
ThrowIfDisposed();
ThrowIfCommitted();

outboxTransaction = await outboxStorage.BeginTransaction(Context).ConfigureAwait(false);
if (IsOpen)
{
throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once.");
}

this.options = options;

foreach (var customization in customizations)
{
customization.Apply(this.options);
}

// Unfortunately this is the only way to make it possible for Transaction.Current to float up to the caller
// to make sure SQLP and NHibernate work with the transaction scope
var outboxTransactionTask = outboxStorage.BeginTransaction(Context);
return OpenInternal(outboxTransactionTask, cancellationToken);
}

async Task OpenInternal(Task<OutboxTransaction> beginTransactionTask, CancellationToken cancellationToken)
{
outboxTransaction = await beginTransactionTask.ConfigureAwait(false);

if (!await synchronizedStorageSession.TryOpen(outboxTransaction, Context).ConfigureAwait(false))
{
Expand Down
27 changes: 4 additions & 23 deletions src/NServiceBus.TransactionalSession/TransactionalSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,10 @@ public async Task Commit(CancellationToken cancellationToken = default)
committed = true;
}

public abstract Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default);

protected abstract Task CommitInternal(CancellationToken cancellationToken = default);

public virtual Task Open(OpenSessionOptions options, CancellationToken cancellationToken = default)
{
ThrowIfDisposed();
ThrowIfCommitted();

if (IsOpen)
{
throw new InvalidOperationException($"This session is already open. {nameof(ITransactionalSession)}.{nameof(ITransactionalSession.Open)} should only be called once.");
}

this.options = options;

foreach (var customization in customizations)
{
customization.Apply(this.options);
}

return Task.CompletedTask;
}

public async Task Send(object message, SendOptions sendOptions, CancellationToken cancellationToken = default)
{
ThrowIfInvalidState();
Expand Down Expand Up @@ -119,15 +100,15 @@ public async Task Publish<T>(Action<T> messageConstructor, PublishOptions publis
await messageSession.Publish(messageConstructor, publishOptions).ConfigureAwait(false);
}

void ThrowIfDisposed()
protected void ThrowIfDisposed()
{
if (disposed)
{
throw new ObjectDisposedException(nameof(Dispose));
}
}

void ThrowIfCommitted()
protected void ThrowIfCommitted()
{
if (committed)
{
Expand Down Expand Up @@ -170,7 +151,7 @@ protected virtual void Dispose(bool disposing)

protected readonly CompletableSynchronizedStorageSessionAdapter synchronizedStorageSession;
protected readonly IDispatchMessages dispatcher;
readonly IEnumerable<IOpenSessionOptionsCustomization> customizations;
protected readonly IEnumerable<IOpenSessionOptionsCustomization> customizations;
protected readonly PendingTransportOperations pendingOperations;
protected OpenSessionOptions options;
readonly IMessageSession messageSession;
Expand Down