Skip to content

Commit

Permalink
Add ASB Sessions (#2717)
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon authored Jun 28, 2023
1 parent 677d08f commit 7ef1573
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ internal static class ASBConstants
public const string MessageTypeHeaderBagKey = "MessageType";
public const string HandledCountHeaderBagKey = "HandledCount";
public const string ReplyToHeaderBagKey = "ReplyTo";
public const string SessionIdKey = "SessionId";

public static readonly string[] ReservedHeaders =
new[] {LockTokenHeaderBagKey, MessageTypeHeaderBagKey, HandledCountHeaderBagKey, ReplyToHeaderBagKey };
new[] {LockTokenHeaderBagKey, MessageTypeHeaderBagKey, HandledCountHeaderBagKey, ReplyToHeaderBagKey, SessionIdKey };


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public AzureServiceBusConsumer(string topicName, string subscriptionName,
_subscriptionConfiguration = subscriptionConfiguration ?? new AzureServiceBusSubscriptionConfiguration();
_receiveMode = receiveMode;

GetMessageReceiverProvider();
if(!_subscriptionConfiguration.RequireSession)
GetMessageReceiverProvider();
}

/// <summary>
Expand All @@ -78,6 +79,8 @@ public Message[] Receive(int timeoutInMilliseconds)

try
{
if(_subscriptionConfiguration.RequireSession)
GetMessageReceiverProvider();
messages = _serviceBusReceiver.Receive(_batchSize, TimeSpan.FromMilliseconds(timeoutInMilliseconds)).GetAwaiter().GetResult();
}
catch (Exception e)
Expand Down Expand Up @@ -152,6 +155,8 @@ public void Acknowledge(Message message)
lockToken);

_serviceBusReceiver.Complete(lockToken).Wait();
if(_subscriptionConfiguration.RequireSession)
_serviceBusReceiver.Close();
}
catch (AggregateException ex)
{
Expand Down Expand Up @@ -198,6 +203,8 @@ public void Reject(Message message)
s_logger.LogDebug("Dead Lettering Message with Id {Id} Lock Token : {LockToken}", message.Id, lockToken);

_serviceBusReceiver.DeadLetter(lockToken).Wait();
if(_subscriptionConfiguration.RequireSession)
_serviceBusReceiver.Close();
}
catch (Exception ex)
{
Expand Down Expand Up @@ -240,7 +247,7 @@ private void GetMessageReceiverProvider()
_topicName, _subscriptionName, _receiveMode);
try
{
_serviceBusReceiver = _serviceBusReceiverProvider.Get(_topicName, _subscriptionName, _receiveMode);
_serviceBusReceiver = _serviceBusReceiverProvider.Get(_topicName, _subscriptionName, _receiveMode, _subscriptionConfiguration.RequireSession);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private ServiceBusMessage ConvertToServiceBusMessage(Message message)
azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.MessageTypeHeaderBagKey, message.Header.MessageType.ToString());
azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.HandledCountHeaderBagKey, message.Header.HandledCount);
azureServiceBusMessage.ApplicationProperties.Add(ASBConstants.ReplyToHeaderBagKey, message.Header.ReplyTo);

foreach (var header in message.Header.Bag.Where(h => !ASBConstants.ReservedHeaders.Contains(h.Key)))
{
azureServiceBusMessage.ApplicationProperties.Add(header.Key, header.Value);
Expand All @@ -250,6 +250,8 @@ private ServiceBusMessage ConvertToServiceBusMessage(Message message)
azureServiceBusMessage.CorrelationId = message.Header.CorrelationId.ToString();
azureServiceBusMessage.ContentType = message.Header.ContentType;
azureServiceBusMessage.MessageId = message.Header.Id.ToString();
if (message.Header.Bag.ContainsKey(ASBConstants.SessionIdKey))
azureServiceBusMessage.SessionId = message.Header.Bag[ASBConstants.SessionIdKey].ToString();

return azureServiceBusMessage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ public class AzureServiceBusSubscriptionConfiguration
/// Default is TimeSpan.MaxValue.
/// </summary>
public TimeSpan QueueIdleBeforeDelete { get; set; } = TimeSpan.MaxValue;


/// <summary>
/// Subscription is Session Enabled
/// </summary>
public bool RequireSession { get; set; } = false;

/// <summary>
/// A Sql Filter to apply to the subscription
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ private async Task CreateSubscriptionAsync(string topicName, string subscription
DeadLetteringOnMessageExpiration = subscriptionConfiguration.DeadLetteringOnMessageExpiration,
LockDuration = subscriptionConfiguration.LockDuration,
DefaultMessageTimeToLive = subscriptionConfiguration.DefaultMessageTimeToLive,
AutoDeleteOnIdle = subscriptionConfiguration.QueueIdleBeforeDelete
AutoDeleteOnIdle = subscriptionConfiguration.QueueIdleBeforeDelete,
RequiresSession = subscriptionConfiguration.RequireSession
};

var ruleOptions = string.IsNullOrEmpty(subscriptionConfiguration.SqlFilter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public interface IServiceBusReceiverProvider
/// <param name="topicName">The name of the Topic.</param>
/// <param name="subscriptionName">The name of the Subscription on the Topic.</param>
/// <param name="receiveMode">The Receive Mode.</param>
/// <param name="sessionEnabled">Use Sessions for Processing</param>
/// <returns>A ServiceBusReceiverWrapper.</returns>
IServiceBusReceiverWrapper Get(string topicName, string subscriptionName, ServiceBusReceiveMode receiveMode);
IServiceBusReceiverWrapper Get(string topicName, string subscriptionName, ServiceBusReceiveMode receiveMode, bool sessionEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ public ServiceBusReceiverProvider(IServiceBusClientProvider clientProvider)
_client = clientProvider.GetServiceBusClient();
}

public IServiceBusReceiverWrapper Get(string topicName, string subscriptionName, ServiceBusReceiveMode receiveMode)
public IServiceBusReceiverWrapper Get(string topicName, string subscriptionName, ServiceBusReceiveMode receiveMode, bool sessionEnabled)
{
var messageReceiver = _client.CreateReceiver(topicName, subscriptionName,
new ServiceBusReceiverOptions {ReceiveMode = receiveMode,});
return new ServiceBusReceiverWrapper(messageReceiver);
return sessionEnabled ?
new ServiceBusReceiverWrapper(_client.AcceptNextSessionAsync(topicName, subscriptionName,
new ServiceBusSessionReceiverOptions(){ ReceiveMode = receiveMode }).GetAwaiter().GetResult()) :
new ServiceBusReceiverWrapper(_client.CreateReceiver(topicName, subscriptionName,
new ServiceBusReceiverOptions { ReceiveMode = receiveMode, }));

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public AzureServiceBusConsumerTests()

_messageReceiver = new Mock<IServiceBusReceiverWrapper>();

_mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.ReceiveAndDelete)).Returns(_messageReceiver.Object);
_mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.ReceiveAndDelete, false)).Returns(_messageReceiver.Object);

_azureServiceBusConsumer = new AzureServiceBusConsumer("topic", "subscription", _mockMessageProducer.Object,
_nameSpaceManagerWrapper.Object, _mockMessageReceiver.Object, makeChannels: OnMissingChannel.Create, subscriptionConfiguration: _subConfig);
Expand Down Expand Up @@ -282,7 +282,7 @@ public void When_there_is_an_error_talking_to_servicebus_when_receiving_then_a_C
_messageReceiver.Setup(f => f.Receive(It.IsAny<int>(), It.IsAny<TimeSpan>())).Throws(new Exception());

Assert.Throws<ChannelFailureException>(() => _azureServiceBusConsumer.Receive(400));
_mockMessageReceiver.Verify(x => x.Get(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ServiceBusReceiveMode>()), Times.Exactly(2));
_mockMessageReceiver.Verify(x => x.Get(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ServiceBusReceiveMode>(), false), Times.Exactly(2));
}

[Theory]
Expand Down Expand Up @@ -383,7 +383,7 @@ public void When_ackOnRead_is_Set_and_ack_fails_then_exception_is_thrown()
var message1 = new Mock<IBrokeredMessageWrapper>();
var mockMessageReceiver = new Mock<IServiceBusReceiverProvider>();

mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.PeekLock)).Returns(_messageReceiver.Object);
mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.PeekLock, false)).Returns(_messageReceiver.Object);

var lockToken = Guid.NewGuid().ToString();

Expand Down Expand Up @@ -413,7 +413,7 @@ public void When_ackOnRead_is_Set_and_DeadLetter_fails_then_exception_is_thrown(
var message1 = new Mock<IBrokeredMessageWrapper>();
var mockMessageReceiver = new Mock<IServiceBusReceiverProvider>();

mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.PeekLock)).Returns(_messageReceiver.Object);
mockMessageReceiver.Setup(x => x.Get("topic", "subscription", ServiceBusReceiveMode.PeekLock, false)).Returns(_messageReceiver.Object);

var lockToken = Guid.NewGuid().ToString();

Expand Down

0 comments on commit 7ef1573

Please sign in to comment.