From e230491256084de5fcf58bea93743b9561d1714b Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 15 Apr 2021 17:49:28 +0200 Subject: [PATCH 1/3] No longer support disposing multiple lock tokens since this was never used internally --- .../src/Amqp/AmqpReceiver.cs | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index 7859db25d9cbc..df800a9fc92fc 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -2,9 +2,11 @@ // Licensed under the MIT License. using System; +using System.Buffers; using System.Collections.Generic; using System.Linq; using System.Runtime.ExceptionServices; +using System.Runtime.InteropServices; using System.Threading; using System.Threading.Tasks; using System.Transactions; @@ -72,6 +74,8 @@ internal class AmqpReceiver : TransportReceiver private readonly FaultTolerantAmqpObject _receiveLink; private readonly FaultTolerantAmqpObject _managementLink; + private const int SizeOfGuidInBytes = 16; + /// /// Gets the sequence number of the last peeked message. /// @@ -417,34 +421,39 @@ private async Task CompleteInternalAsync( TimeSpan timeout) { Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { await DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Completed, SessionId).ConfigureAwait(false); return; } - await DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); + await DisposeMessagesAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); } /// /// Completes a series of using a list of lock tokens. This will delete the message from the service. /// /// - /// An containing the lock tokens of the corresponding messages to complete. + /// The lock token of the corresponding message to complete. /// /// private async Task DisposeMessagesAsync( - IEnumerable lockTokens, + Guid lockToken, Outcome outcome, TimeSpan timeout) { ThrowIfSessionLockLost(); - List> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens); + byte[] bufferForLockToken = ArrayPool.Shared.Rent(SizeOfGuidInBytes); + if (!MemoryMarshal.TryWrite(bufferForLockToken, ref lockToken)) + { + lockToken.ToByteArray().AsSpan().CopyTo(bufferForLockToken); + } + + ArraySegment deliveryTag = new ArraySegment(bufferForLockToken, 0, SizeOfGuidInBytes); ReceivingAmqpLink receiveLink = null; try { @@ -463,27 +472,21 @@ private async Task DisposeMessagesAsync( receiveLink = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false); } - var disposeMessageTasks = new Task[deliveryTags.Count]; - var i = 0; - foreach (ArraySegment deliveryTag in deliveryTags) - { - disposeMessageTasks[i++] = receiveLink.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout); - } - - Outcome[] outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false); + Outcome outcomeResult = await receiveLink + .DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout).ConfigureAwait(false); Error error = null; - foreach (Outcome item in outcomes) + Outcome disposedOutcome = + outcomeResult.DescriptorCode == Rejected.Code && ((error = ((Rejected)outcomeResult).Error) != null) + ? outcomeResult + : null; + if (disposedOutcome != null) { - Outcome disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) != null) ? item : null; - if (disposedOutcome != null) + if (error.Condition.Equals(AmqpErrorCode.NotFound)) { - if (error.Condition.Equals(AmqpErrorCode.NotFound)) - { - ThrowLockLostException(); - } - - throw error.ToMessagingContractException(); + ThrowLockLostException(); } + + throw error.ToMessagingContractException(); } } catch (Exception exception) @@ -503,6 +506,10 @@ private async Task DisposeMessagesAsync( throw; } + finally + { + ArrayPool.Shared.Return(bufferForLockToken); + } } private void ThrowLockLostException() @@ -564,17 +571,16 @@ private Task DeferInternalAsync( IDictionary propertiesToModify = null) { Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { return DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Defered, SessionId, propertiesToModify); } - return DisposeMessagesAsync(lockTokenGuids, GetDeferOutcome(propertiesToModify), timeout); + return DisposeMessagesAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout); } /// @@ -622,17 +628,16 @@ private Task AbandonInternalAsync( IDictionary propertiesToModify = null) { Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { return DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Abandoned, SessionId, propertiesToModify); } - return DisposeMessagesAsync(lockTokenGuids, GetAbandonOutcome(propertiesToModify), timeout); + return DisposeMessagesAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout); } /// @@ -695,11 +700,10 @@ internal virtual Task DeadLetterInternalAsync( Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription)); Guid lockTokenGuid = new Guid(lockToken); - var lockTokenGuids = new[] { lockTokenGuid }; if (_requestResponseLockedMessages.Contains(lockTokenGuid)) { return DisposeMessageRequestResponseAsync( - lockTokenGuids, + lockTokenGuid, timeout, DispositionStatus.Suspended, SessionId, @@ -709,7 +713,7 @@ internal virtual Task DeadLetterInternalAsync( } return DisposeMessagesAsync( - lockTokenGuids, + lockTokenGuid, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription), timeout); } @@ -756,7 +760,7 @@ private static Rejected GetRejectedOutcome( /// Updates the disposition status of deferred messages. /// /// - /// Message lock tokens to update disposition status. + /// Message lock token to update disposition status. /// /// /// @@ -764,7 +768,7 @@ private static Rejected GetRejectedOutcome( /// /// private async Task DisposeMessageRequestResponseAsync( - Guid[] lockTokens, + Guid lockToken, TimeSpan timeout, DispositionStatus dispositionStatus, string sessionId = null, @@ -780,7 +784,7 @@ private async Task DisposeMessageRequestResponseAsync( amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name; } - amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens; + amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[]{ lockToken }; amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant(); if (deadLetterReason != null) @@ -836,11 +840,6 @@ private static Outcome GetAbandonOutcome(IDictionary propertiesT private static Outcome GetDeferOutcome(IDictionary propertiesToModify) => GetModifiedOutcome(propertiesToModify, true); - private static List> ConvertLockTokensToDeliveryTags(IEnumerable lockTokens) - { - return lockTokens.Select(lockToken => new ArraySegment(lockToken.ToByteArray())).ToList(); - } - private static Outcome GetModifiedOutcome( IDictionary propertiesToModify, bool undeliverableHere) From 5ed926097435737b1e2ca30d5737d966139fa52c Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 15 Apr 2021 22:41:24 +0200 Subject: [PATCH 2/3] Rename method --- .../src/Amqp/AmqpReceiver.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index df800a9fc92fc..ed14aa4f27bed 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -430,7 +430,7 @@ await DisposeMessageRequestResponseAsync( SessionId).ConfigureAwait(false); return; } - await DisposeMessagesAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); + await DisposeMessageAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); } /// @@ -440,7 +440,7 @@ await DisposeMessageRequestResponseAsync( /// The lock token of the corresponding message to complete. /// /// - private async Task DisposeMessagesAsync( + private async Task DisposeMessageAsync( Guid lockToken, Outcome outcome, TimeSpan timeout) @@ -580,7 +580,7 @@ private Task DeferInternalAsync( SessionId, propertiesToModify); } - return DisposeMessagesAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout); + return DisposeMessageAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout); } /// @@ -637,7 +637,7 @@ private Task AbandonInternalAsync( SessionId, propertiesToModify); } - return DisposeMessagesAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout); + return DisposeMessageAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout); } /// @@ -712,7 +712,7 @@ internal virtual Task DeadLetterInternalAsync( deadLetterErrorDescription); } - return DisposeMessagesAsync( + return DisposeMessageAsync( lockTokenGuid, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription), timeout); From 0f0ec47baa87ae078ca6fe520f517a89a2cdce53 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 15 Apr 2021 22:41:40 +0200 Subject: [PATCH 3/3] Xml docs --- .../Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index ed14aa4f27bed..bf075eae57ec7 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -411,7 +411,7 @@ await receiver.CompleteInternalAsync( cancellationToken).ConfigureAwait(false); /// - /// Completes a series of using a list of lock tokens. This will delete the message from the service. + /// Completes a using a lock token. This will delete the message from the service. /// /// /// The lockToken of the to complete. @@ -434,10 +434,10 @@ await DisposeMessageRequestResponseAsync( } /// - /// Completes a series of using a list of lock tokens. This will delete the message from the service. + /// Settles a using a lock token. /// /// - /// The lock token of the corresponding message to complete. + /// The lockToken of the to complete. /// /// private async Task DisposeMessageAsync(