Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative: Remove byte array allocations from AmqpReceiver DisposeMessagesAsync #20427

Merged
merged 3 commits into from
Apr 15, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 43 additions & 44 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
Expand Down Expand Up @@ -72,6 +74,8 @@ internal class AmqpReceiver : TransportReceiver
private readonly FaultTolerantAmqpObject<ReceivingAmqpLink> _receiveLink;
private readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> _managementLink;

private const int SizeOfGuidInBytes = 16;

/// <summary>
/// Gets the sequence number of the last peeked message.
/// </summary>
Expand Down Expand Up @@ -407,7 +411,7 @@ await receiver.CompleteInternalAsync(
cancellationToken).ConfigureAwait(false);

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
/// Completes a <see cref="ServiceBusReceivedMessage"/> using a lock token. This will delete the message from the service.
/// </summary>
///
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
Expand All @@ -417,34 +421,39 @@ private async Task CompleteInternalAsync(
TimeSpan timeout)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
await DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Completed,
SessionId).ConfigureAwait(false);
return;
}
await DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
await DisposeMessageAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
}

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
/// Settles a <see cref="ServiceBusReceivedMessage"/> using a lock token.
/// </summary>
///
/// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding messages to complete.</param>
/// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to complete.</param>
/// <param name="outcome"></param>
/// <param name="timeout"></param>
private async Task DisposeMessagesAsync(
IEnumerable<Guid> lockTokens,
private async Task DisposeMessageAsync(
Guid lockToken,
Outcome outcome,
TimeSpan timeout)
{
ThrowIfSessionLockLost();
List<ArraySegment<byte>> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens);

byte[] bufferForLockToken = ArrayPool<byte>.Shared.Rent(SizeOfGuidInBytes);
if (!MemoryMarshal.TryWrite(bufferForLockToken, ref lockToken))
{
lockToken.ToByteArray().AsSpan().CopyTo(bufferForLockToken);
}

ArraySegment<byte> deliveryTag = new ArraySegment<byte>(bufferForLockToken, 0, SizeOfGuidInBytes);
ReceivingAmqpLink receiveLink = null;
try
{
Expand All @@ -463,27 +472,21 @@ private async Task DisposeMessagesAsync(
receiveLink = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
}

var disposeMessageTasks = new Task<Outcome>[deliveryTags.Count];
var i = 0;
foreach (ArraySegment<byte> deliveryTag in deliveryTags)
{
disposeMessageTasks[i++] = receiveLink.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout);
}

Outcome[] outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false);
Outcome outcomeResult = await receiveLink
.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout).ConfigureAwait(false);
Error error = null;
foreach (Outcome item in outcomes)
Outcome disposedOutcome =
outcomeResult.DescriptorCode == Rejected.Code && ((error = ((Rejected)outcomeResult).Error) != null)
? outcomeResult
: null;
if (disposedOutcome != null)
{
Outcome disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) != null) ? item : null;
if (disposedOutcome != null)
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
ThrowLockLostException();
}

throw error.ToMessagingContractException();
ThrowLockLostException();
}

throw error.ToMessagingContractException();
}
}
catch (Exception exception)
Expand All @@ -503,6 +506,10 @@ private async Task DisposeMessagesAsync(

throw;
}
finally
{
ArrayPool<byte>.Shared.Return(bufferForLockToken);
}
}

private void ThrowLockLostException()
Expand Down Expand Up @@ -564,17 +571,16 @@ private Task DeferInternalAsync(
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Defered,
SessionId,
propertiesToModify);
}
return DisposeMessagesAsync(lockTokenGuids, GetDeferOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand Down Expand Up @@ -622,17 +628,16 @@ private Task AbandonInternalAsync(
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Abandoned,
SessionId,
propertiesToModify);
}
return DisposeMessagesAsync(lockTokenGuids, GetAbandonOutcome(propertiesToModify), timeout);
return DisposeMessageAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand Down Expand Up @@ -695,11 +700,10 @@ internal virtual Task DeadLetterInternalAsync(
Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription));

Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Suspended,
SessionId,
Expand All @@ -708,8 +712,8 @@ internal virtual Task DeadLetterInternalAsync(
deadLetterErrorDescription);
}

return DisposeMessagesAsync(
lockTokenGuids,
return DisposeMessageAsync(
lockTokenGuid,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
timeout);
}
Expand Down Expand Up @@ -756,15 +760,15 @@ private static Rejected GetRejectedOutcome(
/// Updates the disposition status of deferred messages.
/// </summary>
///
/// <param name="lockTokens">Message lock tokens to update disposition status.</param>
/// <param name="lockToken">Message lock token to update disposition status.</param>
/// <param name="timeout"></param>
/// <param name="dispositionStatus"></param>
/// <param name="sessionId"></param>
/// <param name="propertiesToModify"></param>
/// <param name="deadLetterReason"></param>
/// <param name="deadLetterDescription"></param>
private async Task DisposeMessageRequestResponseAsync(
Guid[] lockTokens,
Guid lockToken,
TimeSpan timeout,
DispositionStatus dispositionStatus,
string sessionId = null,
Expand All @@ -780,7 +784,7 @@ private async Task DisposeMessageRequestResponseAsync(
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}

amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[]{ lockToken };
amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant();

if (deadLetterReason != null)
Expand Down Expand Up @@ -836,11 +840,6 @@ private static Outcome GetAbandonOutcome(IDictionary<string, object> propertiesT
private static Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, true);

private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
{
return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
}

private static Outcome GetModifiedOutcome(
IDictionary<string, object> propertiesToModify,
bool undeliverableHere)
Expand Down