Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative: Remove byte array allocations from AmqpReceiver DisposeMessagesAsync #20427

Merged
merged 3 commits into from
Apr 15, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 39 additions & 40 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// Licensed under the MIT License.

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
Expand Down Expand Up @@ -72,6 +74,8 @@ internal class AmqpReceiver : TransportReceiver
private readonly FaultTolerantAmqpObject<ReceivingAmqpLink> _receiveLink;
private readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> _managementLink;

private const int SizeOfGuidInBytes = 16;

/// <summary>
/// Gets the sequence number of the last peeked message.
/// </summary>
Expand Down Expand Up @@ -417,34 +421,39 @@ private async Task CompleteInternalAsync(
TimeSpan timeout)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
await DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Completed,
SessionId).ConfigureAwait(false);
return;
}
await DisposeMessagesAsync(lockTokenGuids, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
await DisposeMessagesAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false);
}

/// <summary>
/// Completes a series of <see cref="ServiceBusMessage"/> using a list of lock tokens. This will delete the message from the service.
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
///
/// <param name="lockTokens">An <see cref="IEnumerable{T}"/> containing the lock tokens of the corresponding messages to complete.</param>
/// <param name="lockToken">The lock token of the corresponding message to complete.</param>
/// <param name="outcome"></param>
/// <param name="timeout"></param>
private async Task DisposeMessagesAsync(
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved
IEnumerable<Guid> lockTokens,
Guid lockToken,
Outcome outcome,
TimeSpan timeout)
{
ThrowIfSessionLockLost();
List<ArraySegment<byte>> deliveryTags = ConvertLockTokensToDeliveryTags(lockTokens);

byte[] bufferForLockToken = ArrayPool<byte>.Shared.Rent(SizeOfGuidInBytes);
if (!MemoryMarshal.TryWrite(bufferForLockToken, ref lockToken))
{
lockToken.ToByteArray().AsSpan().CopyTo(bufferForLockToken);
}

ArraySegment<byte> deliveryTag = new ArraySegment<byte>(bufferForLockToken, 0, SizeOfGuidInBytes);
ReceivingAmqpLink receiveLink = null;
try
{
Expand All @@ -463,27 +472,21 @@ private async Task DisposeMessagesAsync(
receiveLink = await _receiveLink.GetOrCreateAsync(timeout).ConfigureAwait(false);
}

var disposeMessageTasks = new Task<Outcome>[deliveryTags.Count];
var i = 0;
foreach (ArraySegment<byte> deliveryTag in deliveryTags)
{
disposeMessageTasks[i++] = receiveLink.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout);
}

Outcome[] outcomes = await Task.WhenAll(disposeMessageTasks).ConfigureAwait(false);
Outcome outcomeResult = await receiveLink
.DisposeMessageAsync(deliveryTag, transactionId, outcome, true, timeout).ConfigureAwait(false);
Error error = null;
foreach (Outcome item in outcomes)
Outcome disposedOutcome =
outcomeResult.DescriptorCode == Rejected.Code && ((error = ((Rejected)outcomeResult).Error) != null)
? outcomeResult
: null;
if (disposedOutcome != null)
{
Outcome disposedOutcome = item.DescriptorCode == Rejected.Code && ((error = ((Rejected)item).Error) != null) ? item : null;
if (disposedOutcome != null)
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
if (error.Condition.Equals(AmqpErrorCode.NotFound))
{
ThrowLockLostException();
}

throw error.ToMessagingContractException();
ThrowLockLostException();
}

throw error.ToMessagingContractException();
}
}
catch (Exception exception)
Expand All @@ -503,6 +506,10 @@ private async Task DisposeMessagesAsync(

throw;
}
finally
{
ArrayPool<byte>.Shared.Return(bufferForLockToken);
}
}

private void ThrowLockLostException()
Expand Down Expand Up @@ -564,17 +571,16 @@ private Task DeferInternalAsync(
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Defered,
SessionId,
propertiesToModify);
}
return DisposeMessagesAsync(lockTokenGuids, GetDeferOutcome(propertiesToModify), timeout);
return DisposeMessagesAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand Down Expand Up @@ -622,17 +628,16 @@ private Task AbandonInternalAsync(
IDictionary<string, object> propertiesToModify = null)
{
Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Abandoned,
SessionId,
propertiesToModify);
}
return DisposeMessagesAsync(lockTokenGuids, GetAbandonOutcome(propertiesToModify), timeout);
return DisposeMessagesAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout);
}

/// <summary>
Expand Down Expand Up @@ -695,11 +700,10 @@ internal virtual Task DeadLetterInternalAsync(
Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription));

Guid lockTokenGuid = new Guid(lockToken);
var lockTokenGuids = new[] { lockTokenGuid };
if (_requestResponseLockedMessages.Contains(lockTokenGuid))
{
return DisposeMessageRequestResponseAsync(
lockTokenGuids,
lockTokenGuid,
timeout,
DispositionStatus.Suspended,
SessionId,
Expand All @@ -709,7 +713,7 @@ internal virtual Task DeadLetterInternalAsync(
}

return DisposeMessagesAsync(
lockTokenGuids,
lockTokenGuid,
GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription),
timeout);
}
Expand Down Expand Up @@ -756,15 +760,15 @@ private static Rejected GetRejectedOutcome(
/// Updates the disposition status of deferred messages.
/// </summary>
///
/// <param name="lockTokens">Message lock tokens to update disposition status.</param>
/// <param name="lockToken">Message lock token to update disposition status.</param>
/// <param name="timeout"></param>
/// <param name="dispositionStatus"></param>
/// <param name="sessionId"></param>
/// <param name="propertiesToModify"></param>
/// <param name="deadLetterReason"></param>
/// <param name="deadLetterDescription"></param>
private async Task DisposeMessageRequestResponseAsync(
Guid[] lockTokens,
Guid lockToken,
TimeSpan timeout,
DispositionStatus dispositionStatus,
string sessionId = null,
Expand All @@ -780,7 +784,7 @@ private async Task DisposeMessageRequestResponseAsync(
amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name;
}

amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = lockTokens;
amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[]{ lockToken };
amqpRequestMessage.Map[ManagementConstants.Properties.DispositionStatus] = dispositionStatus.ToString().ToLowerInvariant();

if (deadLetterReason != null)
Expand Down Expand Up @@ -836,11 +840,6 @@ private static Outcome GetAbandonOutcome(IDictionary<string, object> propertiesT
private static Outcome GetDeferOutcome(IDictionary<string, object> propertiesToModify) =>
GetModifiedOutcome(propertiesToModify, true);

private static List<ArraySegment<byte>> ConvertLockTokensToDeliveryTags(IEnumerable<Guid> lockTokens)
{
return lockTokens.Select(lockToken => new ArraySegment<byte>(lockToken.ToByteArray())).ToList();
}

private static Outcome GetModifiedOutcome(
IDictionary<string, object> propertiesToModify,
bool undeliverableHere)
Expand Down