Skip to content

Commit

Permalink
Add extra logic for the case of no available sessions (#2731)
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon authored Jul 11, 2023
1 parent dad67cc commit 661b40e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,21 @@ public Message[] Receive(int timeoutInMilliseconds)

try
{
if(_subscriptionConfiguration.RequireSession)
if (_subscriptionConfiguration.RequireSession)
{
GetMessageReceiverProvider();
if (_serviceBusReceiver == null)
{
s_logger.LogInformation("Message Gateway: Could not get a lock on a session for {TopicName}",
_topicName);
return messagesToReturn.ToArray();
}
}
messages = _serviceBusReceiver.Receive(_batchSize, TimeSpan.FromMilliseconds(timeoutInMilliseconds)).GetAwaiter().GetResult();
}
catch (Exception e)
{
if (_serviceBusReceiver.IsClosedOrClosing)
if (_serviceBusReceiver is {IsClosedOrClosing: true})
{
s_logger.LogDebug("Message Receiver is closing...");
var message = new Message(new MessageHeader(Guid.NewGuid(), _topicName, MessageType.MT_QUIT), new MessageBody(string.Empty));
Expand All @@ -96,7 +104,8 @@ public Message[] Receive(int timeoutInMilliseconds)
s_logger.LogError(e, "Failing to receive messages.");

//The connection to Azure Service bus may have failed so we re-establish the connection.
GetMessageReceiverProvider();
if(!_subscriptionConfiguration.RequireSession)
GetMessageReceiverProvider();

throw new ChannelFailureException("Failing to receive messages.", e);
}
Expand Down Expand Up @@ -236,7 +245,7 @@ public void Purge()
public void Dispose()
{
s_logger.LogInformation("Disposing the consumer...");
_serviceBusReceiver.Close();
_serviceBusReceiver?.Close();
s_logger.LogInformation("Consumer disposed.");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Azure.Messaging.ServiceBus;
using System;
using Azure.Messaging.ServiceBus;
using Paramore.Brighter.MessagingGateway.AzureServiceBus.ClientProvider;

namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers
Expand All @@ -14,12 +15,29 @@ public ServiceBusReceiverProvider(IServiceBusClientProvider clientProvider)

public IServiceBusReceiverWrapper Get(string topicName, string subscriptionName, ServiceBusReceiveMode receiveMode, bool sessionEnabled)
{
return sessionEnabled ?
new ServiceBusReceiverWrapper(_client.AcceptNextSessionAsync(topicName, subscriptionName,
new ServiceBusSessionReceiverOptions(){ ReceiveMode = receiveMode }).GetAwaiter().GetResult()) :
new ServiceBusReceiverWrapper(_client.CreateReceiver(topicName, subscriptionName,
if (sessionEnabled)
{
try
{
return new ServiceBusReceiverWrapper(_client.AcceptNextSessionAsync(topicName, subscriptionName,
new ServiceBusSessionReceiverOptions() {ReceiveMode = receiveMode}).GetAwaiter().GetResult());
}
catch (ServiceBusException e)
{
if (e.Reason == ServiceBusFailureReason.ServiceTimeout)
{
//No session available
return null;
}

throw;
}
}
else
{
return new ServiceBusReceiverWrapper(_client.CreateReceiver(topicName, subscriptionName,
new ServiceBusReceiverOptions { ReceiveMode = receiveMode, }));

}
}
}
}

0 comments on commit 661b40e

Please sign in to comment.