Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Event Hubs] Minor Performance Tweaks #26983

Merged
merged 4 commits into from
Feb 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdk/eventhub/Azure.Messaging.EventHubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## 5.7.0-beta.4 (Unreleased)

### Acknowledgments

Thank you to our developer community members who helped to make the Event Hubs client libraries better with their contributions to this release:

- Daniel Marbach _([GitHub](https://github.com/danielmarbach))_

### Features Added

### Breaking Changes
Expand All @@ -10,6 +16,14 @@

### Other Changes

- Attempt to retrieve AMQP objects synchronously before calling `GetOrCreateAsync`.

- Remove LINQ from the `AmqpMessageConverter` in favor of direct looping. _(Based on a community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

- Change the internal batch `AsEnumerable<T>` to `AsList<T>` in order to avoid casting costs and have `Count` available to right-size transform collections. _(Based on a community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

- Use the two item overload when creating a linked token source to avoid allocating an unnecessary array. _([ref](https://github.com/dotnet/runtime/blob/main/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L736-L739))_ _(Based on a community contribution, courtesy of [danielmarbach](https://github.com/danielmarbach))_

## 5.7.0-beta.3 (2022-02-09)

### Features Added
Expand Down
14 changes: 11 additions & 3 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetry

var failedAttemptCount = 0;
var retryDelay = default(TimeSpan?);

var link = default(RequestResponseAmqpLink);
var stopWatch = ValueStopwatch.StartNew();

try
Expand All @@ -244,7 +244,11 @@ public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetry
var token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
using AmqpMessage request = MessageConverter.CreateEventHubPropertiesRequest(EventHubName, token);

RequestResponseAmqpLink link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
if (!ManagementLink.TryGetOpenedObject(out link))
{
link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Send the request and process the response.
Expand Down Expand Up @@ -340,7 +344,11 @@ public override async Task<PartitionProperties> GetPartitionPropertiesAsync(stri
token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
using AmqpMessage request = MessageConverter.CreatePartitionPropertiesRequest(EventHubName, partitionId, token);

link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
if (!ManagementLink.TryGetOpenedObject(out link))
{
link = await ManagementLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime())), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Send the request and process the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,12 @@ public virtual async Task<RequestResponseAmqpLink> OpenManagementLinkAsync(TimeS
EventHubsEventSource.Log.AmqpManagementLinkCreateStart(EventHubName);

var stopWatch = ValueStopwatch.StartNew();
var connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);

if (!ActiveConnection.TryGetOpenedObject(out var connection))
{
connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);
}

var link = await CreateManagementLinkAsync(connection, operationTimeout, linkTimeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);

await OpenAmqpObjectAsync(link, cancellationToken: cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -363,7 +368,11 @@ public virtual async Task<ReceivingAmqpLink> OpenConsumerLinkAsync(string consum

var stopWatch = ValueStopwatch.StartNew();
var consumerEndpoint = new Uri(ServiceEndpoint, string.Format(CultureInfo.InvariantCulture, ConsumerPathSuffixMask, EventHubName, consumerGroup, partitionId));
var connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);

if (!ActiveConnection.TryGetOpenedObject(out var connection))
{
connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);
}

if (string.IsNullOrEmpty(linkIdentifier))
{
Expand Down Expand Up @@ -432,7 +441,11 @@ public virtual async Task<SendingAmqpLink> OpenProducerLinkAsync(string partitio
var stopWatch = ValueStopwatch.StartNew();
var path = (string.IsNullOrEmpty(partitionId)) ? EventHubName : string.Format(CultureInfo.InvariantCulture, PartitionProducerPathSuffixMask, EventHubName, partitionId);
var producerEndpoint = new Uri(ServiceEndpoint, path);
var connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);

if (!ActiveConnection.TryGetOpenedObject(out var connection))
{
connection = await ActiveConnection.GetOrCreateAsync(linkTimeout, cancellationToken).ConfigureAwait(false);
}

if (string.IsNullOrEmpty(linkIdentifier))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,12 @@ public override async Task<IReadOnlyList<EventData>> ReceiveAsync(int maximumEve

EventHubsEventSource.Log.EventReceiveStart(EventHubName, ConsumerGroup, PartitionId, operationId);

link = await ReceiveLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
if (!ReceiveLink.TryGetOpenedObject(out link))
{
link = await ReceiveLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
var messagesReceived = await link.ReceiveMessagesAsync(maximumEventCount, ReceiveBuildBatchInterval, waitTime, cancellationToken).ConfigureAwait(false);

// If no messages were received, then just return the empty set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using Azure.Core;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Producer;
Expand Down Expand Up @@ -102,9 +101,10 @@ public AmqpEventBatch(AmqpMessageConverter messageConverter,
MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
ActiveFeatures = activeFeatures;

// Initialize the size by reserving space for the batch envelope.
// Initialize the size by reserving space for the batch envelope. At this point, the
// set of batch events is empty, so the message returned will only represent the envelope.

using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.PartitionKey);
using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(BatchEvents, options.PartitionKey);
ReservedSize = envelope.SerializedMessageSize;
_sizeBytes = ReservedSize;
}
Expand Down Expand Up @@ -174,22 +174,21 @@ public override void Clear()
}

/// <summary>
/// Represents the batch as an enumerable set of transport-specific
/// representations of an event.
/// Represents the batch as a set of the AMQP-specific representations of an event.
/// </summary>
///
/// <typeparam name="T">The transport-specific event representation being requested.</typeparam>
///
/// <returns>The set of events as an enumerable of the requested type.</returns>
///
public override IEnumerable<T> AsEnumerable<T>()
public override List<T> AsList<T>()
{
if (typeof(T) != typeof(EventData))
{
throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventType, typeof(T).Name));
}

return (IEnumerable<T>)BatchEvents;
return BatchEvents as List<T>;
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.InteropServices;
using System.Runtime.Serialization;
using Azure.Core;
Expand Down Expand Up @@ -80,7 +79,7 @@ public virtual AmqpMessage CreateMessageFromEvent(EventData source,
/// ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source,
public virtual AmqpMessage CreateBatchFromEvents(IReadOnlyCollection<EventData> source,
string partitionKey = null)
{
Argument.AssertNotNull(source, nameof(source));
Expand All @@ -101,7 +100,7 @@ public virtual AmqpMessage CreateBatchFromEvents(IEnumerable<EventData> source,
/// ensuring proper disposal.
/// </remarks>
///
public virtual AmqpMessage CreateBatchFromMessages(IEnumerable<AmqpMessage> source,
public virtual AmqpMessage CreateBatchFromMessages(IReadOnlyCollection<AmqpMessage> source,
string partitionKey = null)
{
Argument.AssertNotNull(source, nameof(source));
Expand Down Expand Up @@ -264,11 +263,18 @@ public virtual PartitionProperties CreatePartitionPropertiesFromResponse(AmqpMes
///
/// <returns>The batch <see cref="AmqpMessage" /> containing the source events.</returns>
///
private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> source,
string partitionKey) =>
BuildAmqpBatchFromMessages(
source.Select(eventData => BuildAmqpMessageFromEvent(eventData, partitionKey)),
partitionKey);
private static AmqpMessage BuildAmqpBatchFromEvents(IReadOnlyCollection<EventData> source,
string partitionKey)
{
var messages = new List<AmqpMessage>(source.Count);

foreach (var eventData in source)
{
messages.Add(BuildAmqpMessageFromEvent(eventData, partitionKey));
}

return BuildAmqpBatchFromMessages(messages, partitionKey);
}

/// <summary>
/// Builds a batch <see cref="AmqpMessage" /> from a set of <see cref="AmqpMessage" />.
Expand All @@ -284,26 +290,43 @@ private static AmqpMessage BuildAmqpBatchFromEvents(IEnumerable<EventData> sourc
/// ensuring proper disposal.
/// </remarks>
///
private static AmqpMessage BuildAmqpBatchFromMessages(IEnumerable<AmqpMessage> source,
private static AmqpMessage BuildAmqpBatchFromMessages(IReadOnlyCollection<AmqpMessage> source,
string partitionKey)
{
AmqpMessage batchEnvelope;

var batchMessages = source.ToList();

if (batchMessages.Count == 1)
if (source.Count == 1)
{
batchEnvelope = batchMessages[0];
switch (source)
{
case List<AmqpMessage> messageList:
batchEnvelope = messageList[0];
break;

case AmqpMessage[] messageArray:
batchEnvelope = messageArray[0];
break;

default:
var enumerator = source.GetEnumerator();
enumerator.MoveNext();

batchEnvelope = enumerator.Current;
break;
}
}
else
{
batchEnvelope = AmqpMessage.Create(batchMessages.Select(message =>
var messageData = new List<Data>(source.Count);

foreach (var message in source)
{
message.Batchable = true;
using var messageStream = message.ToStream();
return new Data { Value = ReadStreamToArraySegment(messageStream) };
}));
messageData.Add(new Data { Value = ReadStreamToArraySegment(messageStream) });
}

batchEnvelope = AmqpMessage.Create(messageData);
batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat;
}

Expand Down Expand Up @@ -858,7 +881,14 @@ private static bool TryGetSequenceBody(AmqpMessage source, out AmqpMessageBody s
return false;
}

sequenceBody = AmqpMessageBody.FromSequence(source.SequenceBody.Select(item => (IList<object>)item.List).ToArray());
var bodyContent = new List<IList<object>>();

foreach (var item in source.SequenceBody)
{
bodyContent.Add((IList<object>)item.List);
}

sequenceBody = AmqpMessageBody.FromSequence(bodyContent);
return true;
}

Expand Down
24 changes: 17 additions & 7 deletions sdk/eventhub/Azure.Messaging.EventHubs/src/Amqp/AmqpProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public AmqpProducer(string eventHubName,
/// <param name="sendOptions">The set of options to consider when sending this batch.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public override async Task SendAsync(IEnumerable<EventData> events,
public override async Task SendAsync(IReadOnlyCollection<EventData> events,
SendEventOptions sendOptions,
CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -230,7 +230,7 @@ public override async Task SendAsync(EventDataBatch eventBatch,

// Make a defensive copy of the messages in the batch.

AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(eventBatch.AsEnumerable<EventData>(), eventBatch.SendOptions?.PartitionKey);
AmqpMessage messageFactory() => MessageConverter.CreateBatchFromEvents(eventBatch.AsList<EventData>(), eventBatch.SendOptions?.PartitionKey);
await SendAsync(messageFactory, eventBatch.SendOptions?.PartitionKey, cancellationToken).ConfigureAwait(false);
}

Expand Down Expand Up @@ -269,7 +269,11 @@ public override async ValueTask<TransportEventBatch> CreateBatchAsync(CreateBatc
{
try
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
if (!SendLink.TryGetOpenedObject(out _))
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

break;
}
catch (Exception ex)
Expand Down Expand Up @@ -344,7 +348,11 @@ public override async ValueTask<PartitionPublishingPropertiesInternal> ReadIniti
{
try
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
if (!SendLink.TryGetOpenedObject(out _))
{
await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

break;
}
catch (Exception ex)
Expand Down Expand Up @@ -436,7 +444,6 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
var failedAttemptCount = 0;
var logPartition = PartitionId ?? partitionKey;
var operationId = Guid.NewGuid().ToString("D", CultureInfo.InvariantCulture);
var stopWatch = ValueStopwatch.StartNew();

TimeSpan? retryDelay;
SendingAmqpLink link;
Expand All @@ -457,7 +464,11 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,

EventHubsEventSource.Log.EventPublishStart(EventHubName, logPartition, operationId);

link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
if (!SendLink.TryGetOpenedObject(out link))
{
link = await SendLink.GetOrCreateAsync(UseMinimum(ConnectionScope.SessionTimeout, tryTimeout), cancellationToken).ConfigureAwait(false);
}

cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

// Validate that the batch of messages is not too large to send. This is done after the link is created to ensure
Expand Down Expand Up @@ -499,7 +510,6 @@ protected virtual async Task SendAsync(Func<AmqpMessage> messageFactory,
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);

tryTimeout = RetryPolicy.CalculateTryTimeout(failedAttemptCount);
stopWatch = ValueStopwatch.StartNew();
}
else if (ex is AmqpException)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ public virtual async IAsyncEnumerable<PartitionEvent> ReadEventsAsync(bool start
var options = readOptions?.Clone() ?? new ReadEventOptions();
var startingPosition = startReadingAtEarliestEvent ? EventPosition.Earliest : EventPosition.Latest;

using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
using var cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, CancellationToken.None);

try
{
Expand Down
Loading