diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs index 82458f782338d..47366a2684e0f 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageBatch.cs @@ -102,7 +102,7 @@ public override bool TryAddMessage(ServiceBusMessage message) { // Initialize the size by reserving space for the batch envelope taking into account the properties from the first // message which will be used to populate properties on the batch envelope. - amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(new ServiceBusMessage[] { message }, forceBatch: true); + amqpMessage = AmqpMessageConverter.BatchSBMessagesAsAmqpMessage(message, forceBatch: true); } else { @@ -154,14 +154,14 @@ public override void Clear() /// /// The set of messages as an enumerable of the requested type. /// - public override IEnumerable AsEnumerable() + public override IReadOnlyCollection AsReadOnly() { if (typeof(T) != typeof(ServiceBusMessage)) { throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventType, typeof(T).Name)); } - return (IEnumerable)BatchMessages; + return (IReadOnlyCollection) BatchMessages; } /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs index 7a001436aa6cd..02ea3d4613f63 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpMessageConverter.cs @@ -30,7 +30,14 @@ internal static class AmqpMessageConverter /// The size, in bytes, to use as a buffer for stream operations. private const int StreamBufferSizeInBytes = 512; - public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerable source, bool forceBatch = false) + public static AmqpMessage BatchSBMessagesAsAmqpMessage(ServiceBusMessage source, bool forceBatch = false) + { + Argument.AssertNotNull(source, nameof(source)); + var batchMessages = new List(1) { SBMessageToAmqpMessage(source) }; + return BuildAmqpBatchFromMessages(batchMessages, source, forceBatch); + } + + public static AmqpMessage BatchSBMessagesAsAmqpMessage(IReadOnlyCollection source, bool forceBatch = false) { Argument.AssertNotNull(source, nameof(source)); return BuildAmqpBatchFromMessage(source, forceBatch); @@ -46,25 +53,27 @@ public static AmqpMessage BatchSBMessagesAsAmqpMessage(IEnumerableThe batch containing the source messages. /// - private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerable source, bool forceBatch) + private static AmqpMessage BuildAmqpBatchFromMessage(IReadOnlyCollection source, bool forceBatch) { AmqpMessage firstAmqpMessage = null; ServiceBusMessage firstMessage = null; - return BuildAmqpBatchFromMessages( - source.Select(sbMessage => + var batchMessages = new List(source.Count); + foreach (ServiceBusMessage sbMessage in source) + { + if (firstAmqpMessage == null) { - if (firstAmqpMessage == null) - { - firstAmqpMessage = SBMessageToAmqpMessage(sbMessage); - firstMessage = sbMessage; - return firstAmqpMessage; - } - else - { - return SBMessageToAmqpMessage(sbMessage); - } - }).ToList(), firstMessage, forceBatch); + firstAmqpMessage = SBMessageToAmqpMessage(sbMessage); + firstMessage = sbMessage; + batchMessages.Add(firstAmqpMessage); + } + else + { + batchMessages.Add(SBMessageToAmqpMessage(sbMessage)); + } + } + + return BuildAmqpBatchFromMessages(batchMessages, firstMessage, forceBatch); } /// @@ -78,7 +87,7 @@ private static AmqpMessage BuildAmqpBatchFromMessage(IEnumerableThe batch containing the source messages. /// private static AmqpMessage BuildAmqpBatchFromMessages( - IList batchMessages, + List batchMessages, ServiceBusMessage firstMessage, bool forceBatch) { @@ -90,13 +99,14 @@ private static AmqpMessage BuildAmqpBatchFromMessages( } else { - batchEnvelope = AmqpMessage.Create(batchMessages.Select(message => + var data = new List(batchMessages.Count); + foreach (var message in batchMessages) { message.Batchable = true; using var messageStream = message.ToStream(); - return new Data { Value = ReadStreamToArraySegment(messageStream) }; - })); - + data.Add(new Data { Value = ReadStreamToArraySegment(messageStream) }); + } + batchEnvelope = AmqpMessage.Create(data); batchEnvelope.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs index f7f2f3655678f..4c1bb92469595 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs @@ -217,7 +217,7 @@ await sender.SendBatchInternalAsync( timeout, token).ConfigureAwait(false); }, - (this, messageBatch.AsEnumerable()), + (this, messageBatch.AsReadOnly()), _connectionScope, cancellationToken).ConfigureAwait(false); } @@ -231,11 +231,10 @@ await sender.SendBatchInternalAsync( /// An optional instance to signal the request to cancel the operation. /// internal virtual async Task SendBatchInternalAsync( - IEnumerable messages, + IReadOnlyCollection messages, TimeSpan timeout, CancellationToken cancellationToken) { - var stopWatch = ValueStopwatch.StartNew(); var link = default(SendingAmqpLink); try @@ -302,7 +301,7 @@ internal virtual async Task SendBatchInternalAsync( /// The list of messages to send. /// An optional instance to signal the request to cancel the operation. public override async Task SendAsync( - IReadOnlyList messages, + IReadOnlyCollection messages, CancellationToken cancellationToken) { await _retryPolicy.RunOperation(static async (value, timeout, token) => @@ -376,7 +375,7 @@ private void OnManagementLinkClosed(object managementLink, EventArgs e) => /// /// public override async Task> ScheduleMessagesAsync( - IReadOnlyList messages, + IReadOnlyCollection messages, CancellationToken cancellationToken = default) { return await _retryPolicy.RunOperation(static async (value, timeout, token) => @@ -401,7 +400,7 @@ public override async Task> ScheduleMessagesAsync( /// /// internal async Task> ScheduleMessageInternalAsync( - IReadOnlyList messages, + IReadOnlyCollection messages, TimeSpan timeout, CancellationToken cancellationToken = default) { @@ -418,7 +417,7 @@ internal async Task> ScheduleMessageInternalAsync( request.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = sendLink.Name; } - List entries = new List(); + List entries = new List(messages.Count); foreach (ServiceBusMessage message in messages) { using AmqpMessage amqpMessage = AmqpMessageConverter.SBMessageToAmqpMessage(message); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs index 69c3410ef6556..eb232392bbd2c 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportMessageBatch.cs @@ -61,7 +61,7 @@ internal abstract class TransportMessageBatch : IDisposable /// /// The set of messages as an enumerable of the requested type. /// - public abstract IEnumerable AsEnumerable(); + public abstract IReadOnlyCollection AsReadOnly(); /// /// Performs the task needed to clean up resources used by the . diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs index 0451fea53756d..a4668916cc32e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportSender.cs @@ -57,7 +57,7 @@ public abstract ValueTask CreateMessageBatchAsync( /// The list of messages to send. /// An optional instance to signal the request to cancel the operation. public abstract Task SendAsync( - IReadOnlyList messages, + IReadOnlyCollection messages, CancellationToken cancellationToken); /// @@ -80,7 +80,7 @@ public abstract Task SendBatchAsync( /// /// public abstract Task> ScheduleMessagesAsync( - IReadOnlyList messages, + IReadOnlyCollection messages, CancellationToken cancellationToken = default); /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs index a34a4d2d721e4..b3e59ff2bba51 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/DiagnosticExtensions.cs @@ -4,7 +4,6 @@ using System; using System.Text; using Azure.Core.Pipeline; -using System.Linq; using System.Collections.Generic; namespace Azure.Messaging.ServiceBus.Diagnostics @@ -16,17 +15,22 @@ public static string GetAsciiString(this ArraySegment arraySegment) return arraySegment.Array == null ? string.Empty : Encoding.ASCII.GetString(arraySegment.Array, arraySegment.Offset, arraySegment.Count); } - public static void SetMessageData(this DiagnosticScope scope, IEnumerable messages) + public static void SetMessageData(this DiagnosticScope scope, ServiceBusReceivedMessage message) + { + scope.AddLinkedDiagnostics(message); + } + + public static void SetMessageData(this DiagnosticScope scope, IReadOnlyCollection messages) { scope.AddLinkedDiagnostics(messages); } - public static void SetMessageData(this DiagnosticScope scope, IEnumerable messages) + public static void SetMessageData(this DiagnosticScope scope, IReadOnlyCollection messages) { scope.AddLinkedDiagnostics(messages); } - private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable messages) + private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyCollection messages) { if (scope.IsEnabled) { @@ -37,7 +41,15 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable } } - private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable messages) + private static void AddLinkedDiagnostics(this DiagnosticScope scope, ServiceBusReceivedMessage message) + { + if (scope.IsEnabled) + { + AddLinkedDiagnostics(scope, message.ApplicationProperties); + } + } + + private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyCollection messages) { if (scope.IsEnabled) { @@ -48,6 +60,16 @@ private static void AddLinkedDiagnostics(this DiagnosticScope scope, IEnumerable } } + private static void AddLinkedDiagnostics(this DiagnosticScope scope, IReadOnlyDictionary properties) + { + if (EntityScopeFactory.TryExtractDiagnosticId( + properties, + out string diagnosticId)) + { + scope.AddLink(diagnosticId, null); + } + } + private static void AddLinkedDiagnostics(this DiagnosticScope scope, IDictionary properties) { if (EntityScopeFactory.TryExtractDiagnosticId( diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/EntityScopeFactory.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/EntityScopeFactory.cs index 4ca2df02c8441..889fb8ae81857 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/EntityScopeFactory.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/EntityScopeFactory.cs @@ -36,6 +36,28 @@ public EntityScopeFactory( _fullyQualifiedNamespace = fullyQualifiedNamespace; } + /// + /// Extracts a diagnostic id from a message's properties. + /// + /// + /// The properties holding the diagnostic id. + /// The value of the diagnostics identifier assigned to the event. + /// + /// true if the event was contained the diagnostic id; otherwise, false. + /// + public static bool TryExtractDiagnosticId(IReadOnlyDictionary properties, out string id) + { + id = null; + + if (properties.TryGetValue(DiagnosticProperty.DiagnosticIdAttribute, out var objectId) && objectId is string stringId) + { + id = stringId; + return true; + } + + return false; + } + /// /// Extracts a diagnostic id from a message's properties. /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs index 892398deb2ff5..6a58886416fea 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusModelFactory.cs @@ -504,7 +504,11 @@ public override bool TryAddMessage(ServiceBusMessage message) /// /// The set of events as an enumerable of the requested type. /// - public override IEnumerable AsEnumerable() => (IEnumerable)_backingStore; + public override IReadOnlyCollection AsReadOnly() => _backingStore switch + { + IReadOnlyCollection readOnlyCollection => readOnlyCollection, + _ => new List((IEnumerable) _backingStore) + }; /// /// Performs the task needed to clean up resources used by the . diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs index d24f929889537..cfc84b594d1c0 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs @@ -118,7 +118,7 @@ await RaiseExceptionReceived( protected async Task ProcessOneMessageWithinScopeAsync(ServiceBusReceivedMessage message, string activityName, CancellationToken cancellationToken) { using DiagnosticScope scope = _scopeFactory.CreateScope(activityName, DiagnosticScope.ActivityKind.Consumer); - scope.SetMessageData(new ServiceBusReceivedMessage[] { message }); + scope.SetMessageData(message); scope.Start(); try { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs index e108fec8485a9..844d5bcca3522 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusMessageBatch.cs @@ -140,7 +140,7 @@ internal void Clear() /// /// The set of messages as an enumerable of the requested type. /// - internal IEnumerable AsEnumerable() => _innerBatch.AsEnumerable(); + internal IReadOnlyCollection AsReadOnly() => _innerBatch.AsReadOnly(); /// /// Locks the batch to prevent new messages from being added while a service diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs index d40906b6f43f8..cdb280e129cbf 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Sender/ServiceBusSender.cs @@ -190,26 +190,26 @@ public virtual async Task SendMessagesAsync( Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSender)); _connection.ThrowIfClosed(); - IReadOnlyList messageList = messages switch + IReadOnlyCollection readOnlyCollection = messages switch { - IReadOnlyList alreadyList => alreadyList, - _ => messages.ToList() + IReadOnlyCollection alreadyReadOnlyCollection => alreadyReadOnlyCollection, + _ => messages.ToArray() }; - if (messageList.Count == 0) + if (readOnlyCollection.Count == 0) { return; } cancellationToken.ThrowIfCancellationRequested(); - Logger.SendMessageStart(Identifier, messageCount: messageList.Count); - using DiagnosticScope scope = CreateDiagnosticScope(messages, DiagnosticProperty.SendActivityName); + Logger.SendMessageStart(Identifier, messageCount: readOnlyCollection.Count); + using DiagnosticScope scope = CreateDiagnosticScope(readOnlyCollection, DiagnosticProperty.SendActivityName); scope.Start(); try { await _innerSender.SendAsync( - messageList, + readOnlyCollection, cancellationToken).ConfigureAwait(false); } @@ -223,7 +223,7 @@ await _innerSender.SendAsync( Logger.SendMessageComplete(Identifier); } - private DiagnosticScope CreateDiagnosticScope(IEnumerable messages, string activityName) + private DiagnosticScope CreateDiagnosticScope(IReadOnlyCollection messages, string activityName) { foreach (ServiceBusMessage message in messages) { @@ -325,7 +325,7 @@ public virtual async Task SendMessagesAsync( cancellationToken.ThrowIfCancellationRequested(); Logger.SendMessageStart(Identifier, messageBatch.Count); using DiagnosticScope scope = CreateDiagnosticScope( - messageBatch.AsEnumerable(), + messageBatch.AsReadOnly(), DiagnosticProperty.SendActivityName); scope.Start(); @@ -405,35 +405,35 @@ public virtual async Task> ScheduleMessagesAsync( _connection.ThrowIfClosed(); cancellationToken.ThrowIfCancellationRequested(); - IReadOnlyList messageList = messages switch + IReadOnlyCollection readOnlyCollection = messages switch { - IReadOnlyList alreadyList => alreadyList, - _ => messages.ToList() + IReadOnlyCollection alreadyReadOnlyCollection => alreadyReadOnlyCollection, + _ => messages.ToArray() }; - if (messageList.Count == 0) + if (readOnlyCollection.Count == 0) { return Array.Empty(); } Logger.ScheduleMessagesStart( Identifier, - messageList.Count, + readOnlyCollection.Count, scheduledEnqueueTime.ToString(CultureInfo.InvariantCulture)); using DiagnosticScope scope = CreateDiagnosticScope( - messages, + readOnlyCollection, DiagnosticProperty.ScheduleActivityName); scope.Start(); IReadOnlyList sequenceNumbers = null; try { - foreach (ServiceBusMessage message in messageList) + foreach (ServiceBusMessage message in readOnlyCollection) { message.ScheduledEnqueueTime = scheduledEnqueueTime.UtcDateTime; } - sequenceNumbers = await _innerSender.ScheduleMessagesAsync(messageList, cancellationToken).ConfigureAwait(false); + sequenceNumbers = await _innerSender.ScheduleMessagesAsync(readOnlyCollection, cancellationToken).ConfigureAwait(false); } catch (Exception exception) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs index ef41cdab4c11b..8d7bf1869e269 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpMessageBatchTests.cs @@ -76,7 +76,7 @@ public void TryAddValidatesNotDisposed() var batch = new AmqpMessageBatch(new CreateMessageBatchOptions { MaxSizeInBytes = 25 }); batch.Dispose(); - Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Throws.InstanceOf()); + Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Throws.InstanceOf()); } /// @@ -107,7 +107,7 @@ public void TryAddAcceptsAMessageSmallerThanTheMaximumSize() var batch = new AmqpMessageBatch(options); - Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True); + Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Is.True); } /// @@ -159,33 +159,33 @@ public void TryAddSetsTheCount() for (var index = 0; index < messages.Length; ++index) { - Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted."); + Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Is.True, $"The addition for index: { index } should fit and be accepted."); } Assert.That(batch.Count, Is.EqualTo(messages.Length), "The count should have been set when the batch was updated."); } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// [Test] - public void AsEnumerableValidatesTheTypeParameter() + public void AsReadOnlyValidatesTheTypeParameter() { var options = new CreateMessageBatchOptions { MaxSizeInBytes = 5000 }; var batch = new AmqpMessageBatch(options); - Assert.That(() => batch.AsEnumerable(), Throws.InstanceOf()); + Assert.That(() => batch.AsReadOnly(), Throws.InstanceOf()); } /// - /// Verifies functionality of the + /// Verifies functionality of the /// method. /// /// [Test] - public void AsEnumerableReturnsTheMessages() + public void AsReadOnlyReturnsTheMessages() { var maximumSize = 5000; var options = new CreateMessageBatchOptions { MaxSizeInBytes = maximumSize }; @@ -195,19 +195,17 @@ public void AsEnumerableReturnsTheMessages() for (var index = 0; index < batchMessages.Length; ++index) { - batchMessages[index] = new ServiceBusMessage(new byte[0]); + batchMessages[index] = new ServiceBusMessage(Array.Empty()); batch.TryAddMessage(batchMessages[index]); } - IEnumerable batchEnumerable = batch.AsEnumerable(); - Assert.That(batchEnumerable, Is.Not.Null, "The batch enumerable should have been populated."); - - var batchEnumerableList = batchEnumerable.ToList(); - Assert.That(batchEnumerableList.Count, Is.EqualTo(batch.Count), "The wrong number of messages was in the enumerable."); + var batchReadOnly = batch.AsReadOnly(); + Assert.That(batchReadOnly, Is.Not.Null, "The batch enumerable should have been populated."); + Assert.That(batchReadOnly.Count, Is.EqualTo(batch.Count), "The wrong number of messages was in the enumerable."); for (var index = 0; index < batchMessages.Length; ++index) { - Assert.That(batchEnumerableList.Contains(batchMessages[index]), $"The message at index: { index } was not in the enumerable."); + Assert.That(batchReadOnly.Contains(batchMessages[index]), $"The message at index: { index } was not in the enumerable."); } } @@ -233,7 +231,7 @@ public void ClearClearsTheCount() for (var index = 0; index < messages.Length; ++index) { - Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted."); + Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Is.True, $"The addition for index: { index } should fit and be accepted."); } Assert.That(batch.Count, Is.EqualTo(messages.Length), "The count should have been set when the batch was updated."); @@ -264,7 +262,7 @@ public void ClearClearsTheSize() for (var index = 0; index < messages.Length; ++index) { - Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted."); + Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Is.True, $"The addition for index: { index } should fit and be accepted."); } Assert.That(batch.SizeInBytes, Is.GreaterThan(0), "The size should have been set when the batch was updated."); @@ -295,7 +293,7 @@ public void DisposeClearsTheCount() for (var index = 0; index < messages.Length; ++index) { - Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted."); + Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Is.True, $"The addition for index: { index } should fit and be accepted."); } Assert.That(batch.Count, Is.EqualTo(messages.Length), "The count should have been set when the batch was updated."); @@ -326,7 +324,7 @@ public void DisposeClearsTheSize() for (var index = 0; index < messages.Length; ++index) { - Assert.That(batch.TryAddMessage(new ServiceBusMessage(new byte[0])), Is.True, $"The addition for index: { index } should fit and be accepted."); + Assert.That(batch.TryAddMessage(new ServiceBusMessage(Array.Empty())), Is.True, $"The addition for index: { index } should fit and be accepted."); } Assert.That(batch.SizeInBytes, Is.GreaterThan(0), "The size should have been set when the batch was updated."); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs index d4004199b86be..7d893c87edd7c 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs @@ -161,7 +161,7 @@ public async Task SenderReceiverActivities(bool useSessions) batch.TryAddMessage(ServiceBusTestUtilities.GetMessage(sessionId)); } await sender.SendMessagesAsync(batch); - AssertSendActivities(useSessions, sender, batch.AsEnumerable()); + AssertSendActivities(useSessions, sender, batch.AsReadOnly()); }; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs index 0ef08f0ea3796..b23a9fe816cbd 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs @@ -48,7 +48,7 @@ public async Task LogsEvents() _listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchStartEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchCompleteEvent, e => e.Payload.Contains(sender.Identifier)); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); _listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier)); @@ -163,7 +163,7 @@ public async Task LogsSessionEvents() _listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchStartEvent, e => e.Payload.Contains(sender.Identifier)); _listener.SingleEventById(ServiceBusEventSource.CreateMessageBatchCompleteEvent, e => e.Payload.Contains(sender.Identifier)); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, "sessionId").AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, "sessionId").AsReadOnly(); await sender.SendMessagesAsync(batch); _listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier)); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs index 5ebf3fc647518..8842b58ee9857 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs @@ -88,7 +88,7 @@ public void SendMessageExceptionLogsEvents() mockTransportSender.Setup( sender => sender.SendAsync( - It.IsAny>(), + It.IsAny>(), It.IsAny())) .Throws(new Exception()); @@ -128,6 +128,10 @@ public async Task SendBatchOfMessagesLogsEvents() .Setup(transport => transport.Count) .Returns(3); + mockTransportBatch + .Setup(transport => transport.AsReadOnly()) + .Returns(new List()); + mockTransportSender.Setup( sender => sender.CreateMessageBatchAsync( It.IsAny(), @@ -173,7 +177,7 @@ public async Task ScheduleMessageLogsEvents() }; mockTransportSender.Setup( sender => sender.ScheduleMessagesAsync( - It.IsAny>(), + It.IsAny>(), It.IsAny())) .Returns(Task.FromResult((IReadOnlyList) new List { 1 })); @@ -210,7 +214,7 @@ public void ScheduleMessageExceptionLogsEvents() mockTransportSender.Setup( sender => sender.ScheduleMessagesAsync( - It.IsAny>(), + It.IsAny>(), It.IsAny())) .Throws(new Exception()); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs index 44e6f4d50d21d..208204fa2bf66 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageBatchTests.cs @@ -31,7 +31,7 @@ public void EventDataBatchRespectsTheTryAddCallback() Assert.That(() => batch.TryAddMessage(new ServiceBusMessage(new BinaryData("Too many"))), Is.False, "The batch is full; a second attempt to add a new event should not succeed."); Assert.That(store.Count, Is.EqualTo(eventLimit), "The batch should be at its limit after the failed TryAdd attempts."); - Assert.That(batch.AsEnumerable(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store."); + Assert.That(batch.AsReadOnly(), Is.EquivalentTo(store), "The batch enumerable should reflect the events in the backing store."); } /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs index 8eead15624ee5..56bf46a03adaa 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs @@ -25,7 +25,7 @@ public async Task PeekUsingConnectionStringWithSharedKey() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable sentMessages = ServiceBusTestUtilities.AddMessages(batch, messageCt).AsEnumerable(); + IEnumerable sentMessages = ServiceBusTestUtilities.AddMessages(batch, messageCt).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -85,7 +85,7 @@ public async Task PeekUsingConnectionStringWithSas() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable sentMessages = ServiceBusTestUtilities.AddMessages(batch, messageCt).AsEnumerable(); + IEnumerable sentMessages = ServiceBusTestUtilities.AddMessages(batch, messageCt).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -141,7 +141,7 @@ public async Task ReceiveMessagesWhenQueueEmpty() var messageCount = 2; ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); var receiver = client.CreateReceiver(scope.QueueName); @@ -173,7 +173,7 @@ public async Task CancellingDoesNotLoseMessages(bool prefetch) var messageCount = 10; ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); var receiver = client.CreateReceiver( scope.QueueName, @@ -244,7 +244,7 @@ public async Task ReceiveMessagesInPeekLockMode() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -281,7 +281,7 @@ public async Task CompleteMessages() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -317,7 +317,7 @@ public async Task ServerBusyRespected() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -391,7 +391,7 @@ public async Task AbandonMessages() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -500,7 +500,7 @@ public async Task DeferMessagesList() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -555,7 +555,7 @@ public async Task DeferMessagesArray() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -604,7 +604,7 @@ public async Task DeferMessagesEnumerable() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -688,7 +688,7 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount).AsReadOnly(); await sender.SendMessagesAsync(batch); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index cfafb234d9390..030adb8c841a8 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -30,7 +30,7 @@ public async Task PeekSession(long? sequenceNumber) // send the messages using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); IEnumerable sentMessages = ServiceBusTestUtilities.AddMessages(batch, messageCt, sessionId, sessionId) - .AsEnumerable(); + .AsReadOnly(); await sender.SendMessagesAsync(batch); Dictionary sentMessageIdToMsg = new Dictionary(); @@ -218,7 +218,7 @@ public async Task ReceiveMessagesWhenQueueEmpty() var sessionId = "sessionId1"; ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); ServiceBusReceiver receiver = await client.AcceptNextSessionAsync( @@ -259,7 +259,7 @@ public async Task ReceiveMessagesInPeekLockMode() var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -301,7 +301,7 @@ public async Task ReceiveMessagesInReceiveAndDeleteMode() var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -348,7 +348,7 @@ public async Task CompleteMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -389,7 +389,7 @@ public async Task AbandonMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -446,7 +446,7 @@ public async Task DeadLetterMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -511,7 +511,7 @@ public async Task DeadLetterMessagesSubscription(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); var topicName = scope.TopicName; @@ -587,7 +587,7 @@ public async Task DeferMessages(bool useSpecificSession) var messageCount = 10; var sessionId = "sessionId1"; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, sessionId).AsReadOnly(); await sender.SendMessagesAsync(batch); @@ -955,7 +955,7 @@ public async Task CancellingDoesNotLoseSessionMessages(bool prefetch) var messageCount = 10; ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); - IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, "sessionId").AsEnumerable(); + IEnumerable messages = ServiceBusTestUtilities.AddMessages(batch, messageCount, "sessionId").AsReadOnly(); await sender.SendMessagesAsync(batch); var receiver = await client.AcceptSessionAsync( scope.QueueName, diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs index 8fef13d966ead..73aa1a3637e20 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/SenderTests.cs @@ -160,6 +160,10 @@ public async Task SendBatchManagesLockingTheBatch() .Setup(transport => transport.Count) .Returns(1); + mockTransportBatch + .Setup(transport => transport.AsReadOnly()) + .Returns(new List()); + mockTransportSender .Setup(transport => transport.SendBatchAsync(It.IsAny(), It.IsAny())) .Returns(async () => await Task.WhenAny(completionSource.Task, Task.Delay(Timeout.Infinite, cancellationSource.Token))); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs index dc280c191f774..a1da726b552d2 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Sender/ServiceBusMessageBatchTests.cs @@ -83,20 +83,20 @@ public void TryAddIsDelegatedToTheTransportClient() } /// - /// Verifies property accessors for the + /// Verifies property accessors for the /// method. /// /// [Test] - public void AsEnumerableIsDelegatedToTheTransportClient() + public void AsReadOnlyIsDelegatedToTheTransportClient() { var mockBatch = new MockTransportBatch(); var mockScope = new EntityScopeFactory("mock", "mock"); var batch = new ServiceBusMessageBatch(mockBatch, mockScope); - batch.AsEnumerable(); - Assert.That(mockBatch.AsEnumerableCalledWith, Is.EqualTo(typeof(string)), "The enumerable should delegated the requested type parameter."); + batch.AsReadOnly(); + Assert.That(mockBatch.AsReadOnlyCalledWith, Is.EqualTo(typeof(string)), "The enumerable should delegated the requested type parameter."); } /// @@ -211,7 +211,7 @@ private class MockTransportBatch : TransportMessageBatch { public bool DisposeInvoked = false; public bool ClearInvoked = false; - public Type AsEnumerableCalledWith = null; + public Type AsReadOnlyCalledWith = null; public ServiceBusMessage TryAddCalledWith = null; public override long MaxSizeInBytes { get; } = 200; @@ -226,9 +226,9 @@ public override bool TryAddMessage(ServiceBusMessage message) return true; } - public override IEnumerable AsEnumerable() + public override IReadOnlyCollection AsReadOnly() { - AsEnumerableCalledWith = typeof(T); + AsReadOnlyCalledWith = typeof(T); return default; } }