Skip to content

Commit

Permalink
Remove closure allocation on ReceiveMessagesAsyncInternal
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Mar 27, 2021
1 parent fbcb81b commit eacc71a
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Transaction;

namespace Azure.Messaging.ServiceBus.Amqp
{
Expand Down Expand Up @@ -275,7 +274,6 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
CancellationToken cancellationToken)
{
var link = default(ReceivingAmqpLink);
var amqpMessages = default(IEnumerable<AmqpMessage>);
var receivedMessages = new List<ServiceBusReceivedMessage>();

ThrowIfSessionLockLost();
Expand All @@ -288,33 +286,41 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
}
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

var messagesReceived = await Task.Factory.FromAsync
var messagesReceived = await Task.Factory
.FromAsync<(ReceivingAmqpLink, int, TimeSpan?, TimeSpan), IEnumerable<AmqpMessage>>
(
(callback, state) => link.BeginReceiveRemoteMessages(
maxMessages,
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
state),
(asyncResult) => link.EndReceiveMessages(asyncResult, out amqpMessages),
TaskCreationOptions.RunContinuationsAsynchronously
).ConfigureAwait(false);
static (arguments, callback, state) =>
{
var (link, maxMessages, maxWaitTime, timeout) = arguments;
return link.BeginReceiveRemoteMessages(
maxMessages,
TimeSpan.FromMilliseconds(20),
maxWaitTime ?? timeout,
callback,
link);
},
static asyncResult =>
{
var link = (ReceivingAmqpLink)asyncResult.AsyncState;
bool received = link.EndReceiveMessages(asyncResult, out IEnumerable<AmqpMessage> amqpMessages);
return received ? amqpMessages : Enumerable.Empty<AmqpMessage>();
},
(link, maxMessages, maxWaitTime, timeout),
default
).ConfigureAwait(false);

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
// If event messages were received, then package them for consumption and
// return them.

if ((messagesReceived) && (amqpMessages != null))
foreach (AmqpMessage message in messagesReceived)
{
foreach (AmqpMessage message in amqpMessages)
if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
if (_receiveMode == ServiceBusReceiveMode.ReceiveAndDelete)
{
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
}
receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
message.Dispose();
link.DisposeDelivery(message, true, AmqpConstants.AcceptedOutcome);
}
receivedMessages.Add(AmqpMessageConverter.AmqpMessageToSBMessage(message));
message.Dispose();
}

return receivedMessages;
Expand Down

0 comments on commit eacc71a

Please sign in to comment.