From 13a16e41155f202f2a79c6b0d14c2a1b4d589cdb Mon Sep 17 00:00:00 2001 From: Paul Reardon Date: Mon, 10 Jul 2023 22:33:21 +0100 Subject: [PATCH] Add extra logic for the case of no available sessions --- .../AzureServiceBusConsumer.cs | 17 ++++++++--- .../ServiceBusReceiverProvider.cs | 30 +++++++++++++++---- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs index 92542136c9..facc63ec83 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusConsumer.cs @@ -79,13 +79,21 @@ public Message[] Receive(int timeoutInMilliseconds) try { - if(_subscriptionConfiguration.RequireSession) + if (_subscriptionConfiguration.RequireSession) + { GetMessageReceiverProvider(); + if (_serviceBusReceiver == null) + { + s_logger.LogInformation("Message Gateway: Could not get a lock on a session for {TopicName}", + _topicName); + return messagesToReturn.ToArray(); + } + } messages = _serviceBusReceiver.Receive(_batchSize, TimeSpan.FromMilliseconds(timeoutInMilliseconds)).GetAwaiter().GetResult(); } catch (Exception e) { - if (_serviceBusReceiver.IsClosedOrClosing) + if (_serviceBusReceiver is {IsClosedOrClosing: true}) { s_logger.LogDebug("Message Receiver is closing..."); var message = new Message(new MessageHeader(Guid.NewGuid(), _topicName, MessageType.MT_QUIT), new MessageBody(string.Empty)); @@ -96,7 +104,8 @@ public Message[] Receive(int timeoutInMilliseconds) s_logger.LogError(e, "Failing to receive messages."); //The connection to Azure Service bus may have failed so we re-establish the connection. - GetMessageReceiverProvider(); + if(!_subscriptionConfiguration.RequireSession) + GetMessageReceiverProvider(); throw new ChannelFailureException("Failing to receive messages.", e); } @@ -236,7 +245,7 @@ public void Purge() public void Dispose() { s_logger.LogInformation("Disposing the consumer..."); - _serviceBusReceiver.Close(); + _serviceBusReceiver?.Close(); s_logger.LogInformation("Consumer disposed."); } diff --git a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusReceiverProvider.cs b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusReceiverProvider.cs index c4c58dd7b0..8f79a77a9e 100644 --- a/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusReceiverProvider.cs +++ b/src/Paramore.Brighter.MessagingGateway.AzureServiceBus/AzureServiceBusWrappers/ServiceBusReceiverProvider.cs @@ -1,4 +1,5 @@ -using Azure.Messaging.ServiceBus; +using System; +using Azure.Messaging.ServiceBus; using Paramore.Brighter.MessagingGateway.AzureServiceBus.ClientProvider; namespace Paramore.Brighter.MessagingGateway.AzureServiceBus.AzureServiceBusWrappers @@ -14,12 +15,29 @@ public ServiceBusReceiverProvider(IServiceBusClientProvider clientProvider) public IServiceBusReceiverWrapper Get(string topicName, string subscriptionName, ServiceBusReceiveMode receiveMode, bool sessionEnabled) { - return sessionEnabled ? - new ServiceBusReceiverWrapper(_client.AcceptNextSessionAsync(topicName, subscriptionName, - new ServiceBusSessionReceiverOptions(){ ReceiveMode = receiveMode }).GetAwaiter().GetResult()) : - new ServiceBusReceiverWrapper(_client.CreateReceiver(topicName, subscriptionName, + if (sessionEnabled) + { + try + { + return new ServiceBusReceiverWrapper(_client.AcceptNextSessionAsync(topicName, subscriptionName, + new ServiceBusSessionReceiverOptions() {ReceiveMode = receiveMode}).GetAwaiter().GetResult()); + } + catch (ServiceBusException e) + { + if (e.Reason == ServiceBusFailureReason.ServiceTimeout) + { + //No session available + return null; + } + + throw; + } + } + else + { + return new ServiceBusReceiverWrapper(_client.CreateReceiver(topicName, subscriptionName, new ServiceBusReceiverOptions { ReceiveMode = receiveMode, })); - + } } } }