diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 7859db25d9cbc..bf075eae57ec7 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -2,9 +2,11 @@ // Licensed under the MIT License. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Runtime.ExceptionServices; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using System.Transactions; @@ -72,6 +74,8 @@ internal class AmqpReceiver : TransportReceiver private readonly FaultTolerantAmqpObject _receiveLink; private readonly FaultTolerantAmqpObject _managementLink; + private const int SizeOfGuidInBytes = 16; + /// /// Gets the sequence number of the last peeked message. /// @@ -407,7 +411,7 @@ await receiver.CompleteInternalAsync( cancellationToken).ConfigureAwait(false); /// - /// Completes a series of using a list of lock tokens. This will delete the message from the service. + /// Completes a using a lock token. This will delete the message from the service. /// /// /// The lockToken of the to complete. @@ -417,34 +421,39 @@ private async Task CompleteInternalAsync( TimeSpan timeout) { Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { await DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Completed, SessionId).ConfigureAwait(false); return; } - await DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); + await DisposeMessageAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); } /// - /// Completes a series of using a list of lock tokens. This will delete the message from the service. + /// Settles a using a lock token. /// /// - /// An containing the lock tokens of the corresponding messages to complete. + /// The lockToken of the to complete. /// /// - private async Task DisposeMessagesAsync( - IEnumerable lockTokens, + private async Task DisposeMessageAsync( + Guid lockToken, Outcome outcome, TimeSpan timeout) { ThrowIfSessionLockLost(); - List> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens); + byte[] bufferForLockToken = ArrayPool.Shared.Rent(SizeOfGuidInBytes); + if (!MemoryMarshal.TryWrite(bufferForLockToken, ref lockToken)) + { + lockToken.ToByteArray().AsSpan().CopyTo(bufferForLockToken); + } + + ArraySegment deliveryTag = new ArraySegment(bufferForLockToken, 0, SizeOfGuidInBytes); ReceivingAmqpLink receiveLink = null; try { @@ -463,27 +472,21 @@ private async Task DisposeMessagesAsync( receiveLink = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false); } - var disposeMessageTasks = new Task[deliveryTags.Count]; - var i = 0; - foreach (ArraySegment deliveryTag in deliveryTags) - { - disposeMessageTasks[i++] = receiveLink.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout); - } - - Outcome[] outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false); + Outcome outcomeResult = await receiveLink + .DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout).ConfigureAwait(false); Error error = null; - foreach (Outcome item in outcomes) + Outcome disposedOutcome = + outcomeResult.DescriptorCode == Rejected.Code && ((error = ((Rejected)outcomeResult).Error) != null) + ? outcomeResult + : null; + if (disposedOutcome != null) { - Outcome disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) != null) ? item : null; - if (disposedOutcome != null) + if (error.Condition.Equals(AmqpErrorCode.NotFound)) { - if (error.Condition.Equals(AmqpErrorCode.NotFound)) - { - ThrowLockLostException(); - } - - throw error.ToMessagingContractException(); + ThrowLockLostException(); } + + throw error.ToMessagingContractException(); } } catch (Exception exception) @@ -503,6 +506,10 @@ private async Task DisposeMessagesAsync( throw; } + finally + { + ArrayPool.Shared.Return(bufferForLockToken); + } } private void ThrowLockLostException() @@ -564,17 +571,16 @@ private Task DeferInternalAsync( IDictionary propertiesToModify = null) { Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { return DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Defered, SessionId, propertiesToModify); } - return DisposeMessagesAsync(lockTokenGuids, GetDeferOutcome(propertiesToModify), timeout); + return DisposeMessageAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout); } /// @@ -622,17 +628,16 @@ private Task AbandonInternalAsync( IDictionary propertiesToModify = null) { Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { return DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Abandoned, SessionId, propertiesToModify); } - return DisposeMessagesAsync(lockTokenGuids, GetAbandonOutcome(propertiesToModify), timeout); + return DisposeMessageAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout); } /// @@ -695,11 +700,10 @@ internal virtual Task DeadLetterInternalAsync( Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription)); Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { return DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Suspended, SessionId, @@ -708,8 +712,8 @@ internal virtual Task DeadLetterInternalAsync( deadLetterErrorDescription); } - return DisposeMessagesAsync( - lockTokenGuids, + return DisposeMessageAsync( + lockTokenGuid, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription), timeout); } @@ -756,7 +760,7 @@ private static Rejected GetRejectedOutcome( /// Updates the disposition status of deferred messages. /// /// - /// Message lock tokens to update disposition status. + /// Message lock token to update disposition status. /// /// /// @@ -764,7 +768,7 @@ private static Rejected GetRejectedOutcome( /// /// private async Task DisposeMessageRequestResponseAsync( - Guid[] lockTokens, + Guid lockToken, TimeSpan timeout, DispositionStatus dispositionStatus, string sessionId = null, @@ -780,7 +784,7 @@ private async Task DisposeMessageRequestResponseAsync( amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name; } - amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens; + amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[]{ lockToken }; amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant(); if (deadLetterReason != null) @@ -836,11 +840,6 @@ private static Outcome GetAbandonOutcome(IDictionary propertiesT private static Outcome GetDeferOutcome(IDictionary propertiesToModify) => GetModifiedOutcome(propertiesToModify, true); - private static List> ConvertLockTokensToDeliveryTags(IEnumerable lockTokens) - { - return lockTokens.Select(lockToken => new ArraySegment(lockToken.ToByteArray())).ToList(); - } - private static Outcome GetModifiedOutcome( IDictionary propertiesToModify, bool undeliverableHere)