diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 5d0b2d5f89457..d9f1c8af8861b 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -16,7 +16,6 @@ using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Encoding; using Microsoft.Azure.Amqp.Framing; -using Microsoft.Azure.Amqp.Transaction; namespace Azure.Messaging.ServiceBus.Amqp { @@ -275,7 +274,6 @@ private async Task> ReceiveMessagesAsyn CancellationToken cancellationToken) { var link = default(ReceivingAmqpLink); - var amqpMessages = default(IEnumerable); var receivedMessages = new List(); ThrowIfSessionLockLost(); @@ -288,33 +286,41 @@ private async Task> ReceiveMessagesAsyn } cancellationToken.ThrowIfCancellationRequested(); - var messagesReceived = await Task.Factory.FromAsync + var messagesReceived = await Task.Factory + .FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan), IEnumerable> ( - (callback, state) => link.BeginReceiveRemoteMessages( - maxMessages, - TimeSpan.FromMilliseconds(20), - maxWaitTime ?? timeout, - callback, - state), - (asyncResult) => link.EndReceiveMessages(asyncResult, out amqpMessages), - TaskCreationOptions.RunContinuationsAsynchronously - ).ConfigureAwait(false); + static (arguments, callback, state) => + { + var (link, maxMessages, maxWaitTime, timeout) = arguments; + return link.BeginReceiveRemoteMessages( + maxMessages, + TimeSpan.FromMilliseconds(20), + maxWaitTime ?? timeout, + callback, + link); + }, + static asyncResult => + { + var link = (ReceivingAmqpLink)asyncResult.AsyncState; + bool received = link.EndReceiveMessages(asyncResult, out IEnumerable amqpMessages); + return received ? amqpMessages : Enumerable.Empty(); + }, + (link, maxMessages, maxWaitTime, timeout), + default + ).ConfigureAwait(false); cancellationToken.ThrowIfCancellationRequested(); // If event messages were received, then package them for consumption and // return them. - if ((messagesReceived) && (amqpMessages != null)) + foreach (AmqpMessage message in messagesReceived) { - foreach (AmqpMessage message in amqpMessages) + if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete) { - if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete) - { - link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome); - } - receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message)); - message.Dispose(); + link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome); } + receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message)); + message.Dispose(); } return receivedMessages;