Skip to content

Commit

Permalink
Merge pull request #385 from Particular/backport-fix
Browse files Browse the repository at this point in the history
Make sure PendingTransportOperations clearing doesn't leak state from previous executions
  • Loading branch information
danielmarbach authored Jun 28, 2022
2 parents 0c4ee85 + 75681e7 commit fbf954a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace NServiceBus.Persistence.CosmosDB.Tests
{
using System;
using System.Collections.Generic;
using NUnit.Framework;
using Routing;
using Transport;

[TestFixture]
public class PendingTransportOperationsExtensionsTests
{
[Test]
public void Should_clear_existing_operations()
{
var operations = new PendingTransportOperations();
operations.Add(new TransportOperation(
new OutgoingMessage("", new Dictionary<string, string>(), Array.Empty<byte>()),
new UnicastAddressTag("someQueue")));

operations.Clear();

Assert.That(operations.Operations, Is.Empty);
}

[Test]
public void Should_support_adding_after_clearing()
{
var operations = new PendingTransportOperations();
operations.Add(new TransportOperation(
new OutgoingMessage("1", new Dictionary<string, string>(), Array.Empty<byte>()),
new UnicastAddressTag("someQueue")));

operations.Clear();

operations.Add(new TransportOperation(
new OutgoingMessage("2", new Dictionary<string, string>(), Array.Empty<byte>()),
new UnicastAddressTag("someQueue")));

operations.Clear();

operations.Add(new TransportOperation(
new OutgoingMessage("3", new Dictionary<string, string>(), Array.Empty<byte>()),
new UnicastAddressTag("someQueue")));

Assert.That(operations.Operations, Has.Length.EqualTo(1));
}
}
}
16 changes: 1 addition & 15 deletions src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
namespace NServiceBus.Persistence.CosmosDB
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;
using System.Threading.Tasks;
using DelayedDelivery;
using DeliveryConstraints;
Expand Down Expand Up @@ -37,16 +34,6 @@ internal LogicalOutboxBehavior()
/// <remarks>Can be renamed back to LogicalOutboxBehavior once the type is gone from the public API.</remarks>
class OutboxBehavior : IBehavior<IIncomingLogicalMessageContext, IIncomingLogicalMessageContext>
{
static OutboxBehavior()
{
var field = typeof(PendingTransportOperations).GetField("operations", BindingFlags.NonPublic | BindingFlags.Instance);
var targetExp = Expression.Parameter(typeof(PendingTransportOperations), "target");
var fieldExp = Expression.Field(targetExp, field);
var assignExp = Expression.Assign(fieldExp, Expression.Constant(new ConcurrentStack<TransportOperation>()));

setter = Expression.Lambda<Action<PendingTransportOperations>>(assignExp, targetExp).Compile();
}

internal OutboxBehavior(ContainerHolderResolver containerHolderResolver, JsonSerializer serializer)
{
this.containerHolderResolver = containerHolderResolver;
Expand Down Expand Up @@ -103,7 +90,7 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
outboxTransaction.SuppressStoreAndCommit = true;

var pendingTransportOperations = context.Extensions.Get<PendingTransportOperations>();
setter(pendingTransportOperations);
pendingTransportOperations.Clear();

foreach (var operation in outboxRecord.TransportOperations)
{
Expand Down Expand Up @@ -160,7 +147,6 @@ static AddressTag DeserializeRoutingStrategy(Dictionary<string, string> options)
}

readonly JsonSerializer serializer;
static Action<PendingTransportOperations> setter;
readonly ContainerHolderResolver containerHolderResolver;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NServiceBus.Persistence.CosmosDB
{
using System;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Reflection;
using Transport;

static class PendingTransportOperationsExtensions
{
static PendingTransportOperationsExtensions()
{
var field = typeof(PendingTransportOperations).GetField("operations",
BindingFlags.NonPublic | BindingFlags.Instance);
var targetExp = Expression.Parameter(typeof(PendingTransportOperations), "target");
var fieldExp = Expression.Field(targetExp, field);
getter = Expression
.Lambda<Func<PendingTransportOperations, ConcurrentStack<TransportOperation>>>(fieldExp, targetExp)
.Compile();
}

public static void Clear(this PendingTransportOperations operations)
{
var collection = getter(operations);
collection.Clear();
}

static readonly Func<PendingTransportOperations, ConcurrentStack<TransportOperation>> getter;
}
}

0 comments on commit fbf954a

Please sign in to comment.