Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LINQ allocations for all batched sends #26911

Merged
merged 10 commits into from
Feb 14, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public override bool TryAddMessage(ServiceBusMessage message)
{
// Initialize the size by reserving space for the batch envelope taking into account the properties from the first
// message which will be used to populate properties on the batch envelope.
amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(new ServiceBusMessage[] { message }, forceBatch: true);
amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(message, forceBatch: true);
}
else
{
Expand Down Expand Up @@ -154,14 +154,14 @@ public override void Clear()
///
/// <returns>The set of messages as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>()
public override IReadOnlyCollection<T> AsReadOnly<T>()
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
{
if (typeof(T) != typeof(ServiceBusMessage))
{
throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventType, typeof(T).Name));
}

return (IEnumerable<T>)BatchMessages;
return (IReadOnlyCollection<T>) BatchMessages;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ internal static class AmqpMessageConverter
/// <summary>The size, in bytes, to use as a buffer for stream operations.</summary>
private const int StreamBufferSizeInBytes = 512;

public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch = false)
public static AmqpMessage BatchSBMessagesAsAmqpMessage(ServiceBusMessage source, bool forceBatch = false)
{
Argument.AssertNotNull(source, nameof(source));
var batchMessages = new List<AmqpMessage>(1) { SBMessageToAmqpMessage(source) };
return BuildAmqpBatchFromMessages(batchMessages, source, forceBatch);
}

public static AmqpMessage BatchSBMessagesAsAmqpMessage(IReadOnlyCollection<ServiceBusMessage> source, bool forceBatch = false)
{
Argument.AssertNotNull(source, nameof(source));
return BuildAmqpBatchFromMessage(source, forceBatch);
Expand All @@ -46,25 +53,27 @@ public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable<ServiceBusMes
///
/// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
///
private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessage> source, bool forceBatch)
private static AmqpMessage BuildAmqpBatchFromMessage(IReadOnlyCollection<ServiceBusMessage> source, bool forceBatch)
{
AmqpMessage firstAmqpMessage = null;
ServiceBusMessage firstMessage = null;

return BuildAmqpBatchFromMessages(
source.Select(sbMessage =>
var batchMessages = new List<AmqpMessage>(source.Count);
foreach (ServiceBusMessage sbMessage in source)
{
if (firstAmqpMessage == null)
{
if (firstAmqpMessage == null)
{
firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
firstMessage = sbMessage;
return firstAmqpMessage;
}
else
{
return SBMessageToAmqpMessage(sbMessage);
}
}).ToList(), firstMessage, forceBatch);
firstAmqpMessage = SBMessageToAmqpMessage(sbMessage);
firstMessage = sbMessage;
batchMessages.Add(firstAmqpMessage);
}
else
{
batchMessages.Add(SBMessageToAmqpMessage(sbMessage));
}
}

return BuildAmqpBatchFromMessages(batchMessages, firstMessage, forceBatch);
}

/// <summary>
Expand All @@ -78,7 +87,7 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable<ServiceBusMessa
/// <returns>The batch <see cref="AmqpMessage" /> containing the source messages.</returns>
///
private static AmqpMessage BuildAmqpBatchFromMessages(
IList<AmqpMessage> batchMessages,
List<AmqpMessage> batchMessages,
ServiceBusMessage firstMessage,
bool forceBatch)
{
Expand All @@ -90,13 +99,14 @@ private static AmqpMessage BuildAmqpBatchFromMessages(
}
JoshLove-msft marked this conversation as resolved.
Show resolved Hide resolved
else
{
batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
var data = new List<Data>(batchMessages.Count);
foreach (var message in batchMessages)
{
message.Batchable = true;
using var messageStream = message.ToStream();
return new Data { Value = ReadStreamToArraySegment(messageStream) };
}));

data.Add(new Data { Value = ReadStreamToArraySegment(messageStream) });
}
batchEnvelope = AmqpMessage.Create(data);
batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ await sender.SendBatchInternalAsync(
timeout,
token).ConfigureAwait(false);
},
(this, messageBatch.AsEnumerable<ServiceBusMessage>()),
(this, messageBatch.AsReadOnly<ServiceBusMessage>()),
_connectionScope,
cancellationToken).ConfigureAwait(false);
}
Expand All @@ -231,11 +231,10 @@ await sender.SendBatchInternalAsync(
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
internal virtual async Task SendBatchInternalAsync(
IEnumerable<ServiceBusMessage> messages,
IReadOnlyCollection<ServiceBusMessage> messages,
TimeSpan timeout,
CancellationToken cancellationToken)
{
var stopWatch = ValueStopwatch.StartNew();
var link = default(SendingAmqpLink);

try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ internal abstract class TransportMessageBatch : IDisposable
///
/// <returns>The set of messages as an enumerable of the requested type.</returns>
///
public abstract IEnumerable<T> AsEnumerable<T>();
public abstract IReadOnlyCollection<T> AsReadOnly<T>();

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="TransportMessageBatch" />.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ public override bool TryAddMessage(ServiceBusMessage message)
///
/// <returns>The set of events as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>() => (IEnumerable<T>)_backingStore;
public override IReadOnlyCollection<T> AsReadOnly<T>() => new List<T>((IEnumerable<T>)_backingStore);
danielmarbach marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Performs the task needed to clean up resources used by the <see cref="TransportMessageBatch" />.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ internal void Clear()
///
/// <returns>The set of messages as an enumerable of the requested type.</returns>
///
internal IEnumerable<T> AsEnumerable<T>() => _innerBatch.AsEnumerable<T>();
internal IReadOnlyCollection<T> AsReadOnly<T>() => _innerBatch.AsReadOnly<T>();

/// <summary>
/// Locks the batch to prevent new messages from being added while a service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public virtual async Task SendMessagesAsync(
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Logger.SendMessageStart(Identifier, messageBatch.Count);
using DiagnosticScope scope = CreateDiagnosticScope(
messageBatch.AsEnumerable<ServiceBusMessage>(),
messageBatch.AsReadOnly<ServiceBusMessage>(),
DiagnosticProperty.SendActivityName);
scope.Start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void TryAddValidatesNotDisposed()
var batch = new AmqpMessageBatch(new CreateMessageBatchOptions { MaxSizeInBytes = 25 });
batch.Dispose();

Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Throws.InstanceOf<ObjectDisposedException>());
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Throws.InstanceOf<ObjectDisposedException>());
}

/// <summary>
Expand Down Expand Up @@ -107,7 +107,7 @@ public void TryAddAcceptsAMessageSmallerThanTheMaximumSize()

var batch = new AmqpMessageBatch(options);

Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True);
Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Is.True);
}

/// <summary>
Expand Down Expand Up @@ -159,33 +159,33 @@ public void TryAddSetsTheCount()

for (var index = 0; index < messages.Length; ++index)
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted.");
Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Is.True, $"The addition for index: { index } should fit and be accepted.");
}

Assert.That(batch.Count, Is.EqualTo(messages.Length), "The count should have been set when the batch was updated.");
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpMessageBatch.AsEnumerable{T}" />
/// Verifies functionality of the <see cref="AmqpMessageBatch.AsReadOnly{T}" />
/// method.
/// </summary>
///
[Test]
public void AsEnumerableValidatesTheTypeParameter()
public void AsReadOnlyValidatesTheTypeParameter()
{
var options = new CreateMessageBatchOptions { MaxSizeInBytes = 5000 };

var batch = new AmqpMessageBatch(options);
Assert.That(() => batch.AsEnumerable<AmqpMessage>(), Throws.InstanceOf<FormatException>());
Assert.That(() => batch.AsReadOnly<AmqpMessage>(), Throws.InstanceOf<FormatException>());
}

/// <summary>
/// Verifies functionality of the <see cref="AmqpMessageBatch.AsEnumerable{T}" />
/// Verifies functionality of the <see cref="AmqpMessageBatch.AsReadOnly{T}" />
/// method.
/// </summary>
///
[Test]
public void AsEnumerableReturnsTheMessages()
public void AsReadOnlyReturnsTheMessages()
{
var maximumSize = 5000;
var options = new CreateMessageBatchOptions { MaxSizeInBytes = maximumSize };
Expand All @@ -195,19 +195,17 @@ public void AsEnumerableReturnsTheMessages()

for (var index = 0; index < batchMessages.Length; ++index)
{
batchMessages[index] = new ServiceBusMessage(new byte[0]);
batchMessages[index] = new ServiceBusMessage(Array.Empty<byte>());
batch.TryAddMessage(batchMessages[index]);
}

IEnumerable<ServiceBusMessage> batchEnumerable = batch.AsEnumerable<ServiceBusMessage>();
Assert.That(batchEnumerable, Is.Not.Null, "The batch enumerable should have been populated.");

var batchEnumerableList = batchEnumerable.ToList();
Assert.That(batchEnumerableList.Count, Is.EqualTo(batch.Count), "The wrong number of messages was in the enumerable.");
var batchReadOnly = batch.AsReadOnly<ServiceBusMessage>();
Assert.That(batchReadOnly, Is.Not.Null, "The batch enumerable should have been populated.");
Assert.That(batchReadOnly.Count, Is.EqualTo(batch.Count), "The wrong number of messages was in the enumerable.");

for (var index = 0; index < batchMessages.Length; ++index)
{
Assert.That(batchEnumerableList.Contains(batchMessages[index]), $"The message at index: { index } was not in the enumerable.");
Assert.That(batchReadOnly.Contains(batchMessages[index]), $"The message at index: { index } was not in the enumerable.");
}
}

Expand All @@ -233,7 +231,7 @@ public void ClearClearsTheCount()

for (var index = 0; index < messages.Length; ++index)
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted.");
Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Is.True, $"The addition for index: { index } should fit and be accepted.");
}

Assert.That(batch.Count, Is.EqualTo(messages.Length), "The count should have been set when the batch was updated.");
Expand Down Expand Up @@ -264,7 +262,7 @@ public void ClearClearsTheSize()

for (var index = 0; index < messages.Length; ++index)
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted.");
Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Is.True, $"The addition for index: { index } should fit and be accepted.");
}

Assert.That(batch.SizeInBytes, Is.GreaterThan(0), "The size should have been set when the batch was updated.");
Expand Down Expand Up @@ -295,7 +293,7 @@ public void DisposeClearsTheCount()

for (var index = 0; index < messages.Length; ++index)
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted.");
Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Is.True, $"The addition for index: { index } should fit and be accepted.");
}

Assert.That(batch.Count, Is.EqualTo(messages.Length), "The count should have been set when the batch was updated.");
Expand Down Expand Up @@ -326,7 +324,7 @@ public void DisposeClearsTheSize()

for (var index = 0; index < messages.Length; ++index)
{
Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted.");
Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty<byte>())), Is.True, $"The addition for index: { index } should fit and be accepted.");
}

Assert.That(batch.SizeInBytes, Is.GreaterThan(0), "The size should have been set when the batch was updated.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public async Task SenderReceiverActivities(bool useSessions)
batch.TryAddMessage(ServiceBusTestUtilities.GetMessage(sessionId));
}
await sender.SendMessagesAsync(batch);
AssertSendActivities(useSessions, sender, batch.AsEnumerable<ServiceBusMessage>());
AssertSendActivities(useSessions, sender, batch.AsReadOnly<ServiceBusMessage>());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public async Task LogsEvents()
_listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchCompleteEvent, e => e.Payload.Contains(sender.Identifier));

IEnumerable<ServiceBusMessage> messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable<ServiceBusMessage>();
IEnumerable<ServiceBusMessage> messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly<ServiceBusMessage>();

await sender.SendMessagesAsync(batch);
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier));
Expand Down Expand Up @@ -163,7 +163,7 @@ public async Task LogsSessionEvents()
_listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchStartEvent, e => e.Payload.Contains(sender.Identifier));
_listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchCompleteEvent, e => e.Payload.Contains(sender.Identifier));

IEnumerable<ServiceBusMessage> messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, "sessionId").AsEnumerable<ServiceBusMessage>();
IEnumerable<ServiceBusMessage> messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, "sessionId").AsReadOnly<ServiceBusMessage>();

await sender.SendMessagesAsync(batch);
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public async Task SendBatchOfMessagesLogsEvents()
.Setup(transport => transport.Count)
.Returns(3);

mockTransportBatch
.Setup(transport => transport.AsReadOnly<ServiceBusMessage>())
.Returns(new List<ServiceBusMessage>());

mockTransportSender.Setup(
sender => sender.CreateMessageBatchAsync(
It.IsAny<CreateMessageBatchOptions>(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void EventDataBatchRespectsTheTryAddCallback()
Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; a second attempt to add a new event should not succeed.");

Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit after the failed TryAdd attempts.");
Assert.That(batch.AsEnumerable<ServiceBusMessage>(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store.");
Assert.That(batch.AsReadOnly<ServiceBusMessage>(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store.");
}

/// <summary>
Expand Down
Loading