Skip to content

Commit

Permalink
[Event Hubs] Minor Performance Tweaks (#26983)
Browse files Browse the repository at this point in the history
* [Event Hubs] Minor Performance Tweaks

The focus of these changes is to apply some of the performance-oriented
tweaks made in Service Bus to the Event Hubs clients.   Included are:

- Attempt to retrieve AMQP objects synchronously before calling `GetOrCreateAsync`

- Remove LINQ from the `AmqpMessageConverter`

- Change the internal batch `AsEnumerable<T>` to `AsList<T>` in order to avoid
  casting costs and have `Count` available to right-size transform collections.

- Use the two item overload when creating a linked token source to avoid
  allocating an unnecessary array.  _([ref](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L736-L739))_
  • Loading branch information
jsquire authored Feb 14, 2022
1 parent e781c3a commit 3bfb0f8
Show file tree
Hide file tree
Showing 23 changed files with 199 additions and 122 deletions.
14 changes: 14 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## 5.7.0-beta.4 (Unreleased)

### Acknowledgments

Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Features Added

### Breaking Changes
Expand All @@ -10,6 +16,14 @@

### Other Changes

- Attempt to retrieve AMQP objects synchronously before calling `GetOrCreateAsync`.

- Remove LINQ from the `AmqpMessageConverter` in favor of direct looping. _(Based on a community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

- Change the internal batch `AsEnumerable<T>` to `AsList<T>` in order to avoid casting costs and have `Count` available to right-size transform collections. _(Based on a community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

- Use the two item overload when creating a linked token source to avoid allocating an unnecessary array. _([ref](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L736-L739))_ _(Based on a community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

## 5.7.0-beta.3 (2022-02-09)

### Features Added
Expand Down
14 changes: 11 additions & 3 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetry

var failedAttemptCount = 0;
var retryDelay = default(TimeSpan?);

var link = default(RequestResponseAmqpLink);
var stopWatch = ValueStopwatch.StartNew();

try
Expand All @@ -244,7 +244,11 @@ public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetry
var token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
using AmqpMessage request = MessageConverter.CreateEventHubPropertiesRequest(EventHubName, token);

RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
if (!ManagementLink.TryGetOpenedObject(out link))
{
link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Send the request and process the response.
Expand Down Expand Up @@ -340,7 +344,11 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
using AmqpMessage request = MessageConverter.CreatePartitionPropertiesRequest(EventHubName, partitionId, token);

link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
if (!ManagementLink.TryGetOpenedObject(out link))
{
link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Send the request and process the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,12 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(TimeS
EventHubsEventSource.Log.AmqpManagementLinkCreateStart(EventHubName);

var stopWatch = ValueStopwatch.StartNew();
var connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);

if (!ActiveConnection.TryGetOpenedObject(out var connection))
{
connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);
}

var link = await CreateManagementLinkAsync(connection, operationTimeout, linkTimeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);

await OpenAmqpObjectAsync(link, cancellationToken: cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -363,7 +368,11 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum

var stopWatch = ValueStopwatch.StartNew();
var consumerEndpoint = new Uri(ServiceEndpoint, string.Format(CultureInfo.InvariantCulture, ConsumerPathSuffixMask, EventHubName, consumerGroup, partitionId));
var connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);

if (!ActiveConnection.TryGetOpenedObject(out var connection))
{
connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);
}

if (string.IsNullOrEmpty(linkIdentifier))
{
Expand Down Expand Up @@ -432,7 +441,11 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
var stopWatch = ValueStopwatch.StartNew();
var path = (string.IsNullOrEmpty(partitionId)) ? EventHubName : string.Format(CultureInfo.InvariantCulture, PartitionProducerPathSuffixMask, EventHubName, partitionId);
var producerEndpoint = new Uri(ServiceEndpoint, path);
var connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);

if (!ActiveConnection.TryGetOpenedObject(out var connection))
{
connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);
}

if (string.IsNullOrEmpty(linkIdentifier))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,12 @@ public override async Task<IReadOnlyList<EventData>> ReceiveAsync(int maximumEve

EventHubsEventSource.Log.EventReceiveStart(EventHubName, ConsumerGroup, PartitionId, operationId);

link = await ReceiveLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
if (!ReceiveLink.TryGetOpenedObject(out link))
{
link = await ReceiveLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var messagesReceived = await link.ReceiveMessagesAsync(maximumEventCount, ReceiveBuildBatchInterval, waitTime, cancellationToken).ConfigureAwait(false);

// If no messages were received, then just return the empty set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Azure.Core;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Producer;
Expand Down Expand Up @@ -102,9 +101,10 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter,
MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
ActiveFeatures = activeFeatures;

// Initialize the size by reserving space for the batch envelope.
// Initialize the size by reserving space for the batch envelope. At this point, the
// set of batch events is empty, so the message returned will only represent the envelope.

using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.PartitionKey);
using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(BatchEvents, options.PartitionKey);
ReservedSize = envelope.SerializedMessageSize;
_sizeBytes = ReservedSize;
}
Expand Down Expand Up @@ -174,22 +174,21 @@ public override void Clear()
}

/// <summary>
/// Represents the batch as an enumerable set of transport-specific
/// representations of an event.
/// Represents the batch as a set of the AMQP-specific representations of an event.
/// </summary>
///
/// <typeparam name="T">The transport-specific event representation being requested.</typeparam>
///
/// <returns>The set of events as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>()
public override IReadOnlyCollection<T> AsReadOnlyCollection<T>()
{
if (typeof(T) != typeof(EventData))
{
throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventType, typeof(T).Name));
}

return (IEnumerable<T>)BatchEvents;
return BatchEvents as IReadOnlyCollection<T>;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Runtime.Serialization;
using Azure.Core;
Expand Down Expand Up @@ -80,7 +79,7 @@ public virtual AmqpMessage CreateMessageFromEvent(EventData source,
/// ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source,
public virtual AmqpMessage CreateBatchFromEvents(IReadOnlyCollection<EventData> source,
string partitionKey = null)
{
Argument.AssertNotNull(source, nameof(source));
Expand All @@ -101,7 +100,7 @@ public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source,
/// ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateBatchFromMessages(IEnumerable<AmqpMessage> source,
public virtual AmqpMessage CreateBatchFromMessages(IReadOnlyCollection<AmqpMessage> source,
string partitionKey = null)
{
Argument.AssertNotNull(source, nameof(source));
Expand Down Expand Up @@ -264,11 +263,18 @@ public virtual PartitionProperties CreatePartitionPropertiesFromResponse(AmqpMes
///
/// <returns>The batch <see cref="AmqpMessage" /> containing the source events.</returns>
///
private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> source,
string partitionKey) =>
BuildAmqpBatchFromMessages(
source.Select(eventData => BuildAmqpMessageFromEvent(eventData, partitionKey)),
partitionKey);
private static AmqpMessage BuildAmqpBatchFromEvents(IReadOnlyCollection<EventData> source,
string partitionKey)
{
var messages = new List<AmqpMessage>(source.Count);

foreach (var eventData in source)
{
messages.Add(BuildAmqpMessageFromEvent(eventData, partitionKey));
}

return BuildAmqpBatchFromMessages(messages, partitionKey);
}

/// <summary>
/// Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="AmqpMessage" />.
Expand All @@ -284,26 +290,43 @@ private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> sourc
/// ensuring proper disposal.
/// </remarks>
///
private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable<AmqpMessage> source,
private static AmqpMessage BuildAmqpBatchFromMessages(IReadOnlyCollection<AmqpMessage> source,
string partitionKey)
{
AmqpMessage batchEnvelope;

var batchMessages = source.ToList();

if (batchMessages.Count == 1)
if (source.Count == 1)
{
batchEnvelope = batchMessages[0];
switch (source)
{
case List<AmqpMessage> messageList:
batchEnvelope = messageList[0];
break;

case AmqpMessage[] messageArray:
batchEnvelope = messageArray[0];
break;

default:
var enumerator = source.GetEnumerator();
enumerator.MoveNext();

batchEnvelope = enumerator.Current;
break;
}
}
else
{
batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
var messageData = new List<Data>(source.Count);

foreach (var message in source)
{
message.Batchable = true;
using var messageStream = message.ToStream();
return new Data { Value = ReadStreamToArraySegment(messageStream) };
}));
messageData.Add(new Data { Value = ReadStreamToArraySegment(messageStream) });
}

batchEnvelope = AmqpMessage.Create(messageData);
batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
}

Expand Down Expand Up @@ -858,7 +881,14 @@ private static bool TryGetSequenceBody(AmqpMessage source, out AmqpMessageBody s
return false;
}

sequenceBody = AmqpMessageBody.FromSequence(source.SequenceBody.Select(item => (IList<object>)item.List).ToArray());
var bodyContent = new List<IList<object>>();

foreach (var item in source.SequenceBody)
{
bodyContent.Add((IList<object>)item.List);
}

sequenceBody = AmqpMessageBody.FromSequence(bodyContent);
return true;
}

Expand Down
24 changes: 17 additions & 7 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public AmqpProducer(string eventHubName,
/// <param name="sendOptions">The set of options to consider when sending this batch.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public override async Task SendAsync(IEnumerable<EventData> events,
public override async Task SendAsync(IReadOnlyCollection<EventData> events,
SendEventOptions sendOptions,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -230,7 +230,7 @@ public override async Task SendAsync(EventDataBatch eventBatch,

// Make a defensive copy of the messages in the batch.

AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(eventBatch.AsEnumerable<EventData>(), eventBatch.SendOptions?.PartitionKey);
AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(eventBatch.AsReadOnlyCollection<EventData>(), eventBatch.SendOptions?.PartitionKey);
await SendAsync(messageFactory, eventBatch.SendOptions?.PartitionKey, cancellationToken).ConfigureAwait(false);
}

Expand Down Expand Up @@ -269,7 +269,11 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
{
try
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
if (!SendLink.TryGetOpenedObject(out _))
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

break;
}
catch (Exception ex)
Expand Down Expand Up @@ -344,7 +348,11 @@ public override async ValueTask<PartitionPublishingPropertiesInternal> ReadIniti
{
try
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
if (!SendLink.TryGetOpenedObject(out _))
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

break;
}
catch (Exception ex)
Expand Down Expand Up @@ -436,7 +444,6 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
var failedAttemptCount = 0;
var logPartition = PartitionId ?? partitionKey;
var operationId = Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture);
var stopWatch = ValueStopwatch.StartNew();

TimeSpan? retryDelay;
SendingAmqpLink link;
Expand All @@ -457,7 +464,11 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,

EventHubsEventSource.Log.EventPublishStart(EventHubName, logPartition, operationId);

link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
if (!SendLink.TryGetOpenedObject(out link))
{
link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Validate that the batch of messages is not too large to send. This is done after the link is created to ensure
Expand Down Expand Up @@ -499,7 +510,6 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);

tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
stopWatch = ValueStopwatch.StartNew();
}
else if (ex is AmqpException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public virtual async IAsyncEnumerable<PartitionEvent> ReadEventsAsync(bool start
var options = readOptions?.Clone() ?? new ReadEventOptions();
var startingPosition = startReadingAtEarliestEvent ? EventPosition.Earliest : EventPosition.Latest;

using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken.None);

try
{
Expand Down
Loading

0 comments on commit 3bfb0f8

Please sign in to comment.