From 1f3b24a6a47b93f16e272ee37eeaaf3a609d56b0 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 20 Apr 2021 22:32:35 +0200 Subject: [PATCH] Switch APIs to use the lock token guid to avoid converting from and to string --- .../src/Amqp/AmqpReceiver.cs | 52 +++++++------- .../src/Core/TransportReceiver.cs | 12 ++-- .../src/Diagnostics/ServiceBusEventSource.cs | 68 +++++++++++++++++-- .../src/Processor/ReceiverManager.cs | 6 +- .../src/Receiver/ServiceBusReceiver.cs | 44 ++++++------ .../ServiceBusManagementClientLiveTests.cs | 8 +-- .../tests/Diagnostics/EventSourceTests.cs | 30 ++++---- .../tests/Message/MessageLiveTests.cs | 4 +- .../tests/Receiver/ReceiverLiveTests.cs | 16 ++--- .../Receiver/SessionReceiverLiveTests.cs | 16 ++--- .../Transactions/TransactionLiveTests.cs | 4 +- 11 files changed, 156 insertions(+), 104 deletions(-) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs index bf075eae57ec7..487321460060f 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs @@ -396,7 +396,7 @@ private async Task> ReceiveMessagesAsyn /// /// A task to be resolved on when the operation has completed. public override async Task CompleteAsync( - string lockToken, + Guid lockToken, CancellationToken cancellationToken = default) => await _retryPolicy.RunOperation( static async (value, timeout, _) => @@ -417,20 +417,19 @@ await receiver.CompleteInternalAsync( /// The lockToken of the to complete. /// private async Task CompleteInternalAsync( - string lockToken, + Guid lockToken, TimeSpan timeout) { - Guid lockTokenGuid = new Guid(lockToken); - if (_requestResponseLockedMessages.Contains(lockTokenGuid)) + if (_requestResponseLockedMessages.Contains(lockToken)) { await DisposeMessageRequestResponseAsync( - lockTokenGuid, + lockToken, timeout, DispositionStatus.Completed, SessionId).ConfigureAwait(false); return; } - await DisposeMessageAsync(lockTokenGuid, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); + await DisposeMessageAsync(lockToken, AmqpConstants.AcceptedOutcome, timeout).ConfigureAwait(false); } /// @@ -532,7 +531,7 @@ private void ThrowLockLostException() /// An optional instance to signal the request to cancel the operation. /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive this message again in the future, you will need to save /// the @@ -543,7 +542,7 @@ private void ThrowLockLostException() /// /// A task to be resolved on when the operation has completed. public override async Task DeferAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) => await _retryPolicy.RunOperation( @@ -566,21 +565,20 @@ await receiver.DeferInternalAsync( /// The properties of the message to modify while deferring the message. /// private Task DeferInternalAsync( - string lockToken, + Guid lockToken, TimeSpan timeout, IDictionary propertiesToModify = null) { - Guid lockTokenGuid = new Guid(lockToken); - if (_requestResponseLockedMessages.Contains(lockTokenGuid)) + if (_requestResponseLockedMessages.Contains(lockToken)) { return DisposeMessageRequestResponseAsync( - lockTokenGuid, + lockToken, timeout, DispositionStatus.Defered, SessionId, propertiesToModify); } - return DisposeMessageAsync(lockTokenGuid, GetDeferOutcome(propertiesToModify), timeout); + return DisposeMessageAsync(lockToken, GetDeferOutcome(propertiesToModify), timeout); } /// @@ -599,7 +597,7 @@ private Task DeferInternalAsync( /// /// A task to be resolved on when the operation has completed. public override async Task AbandonAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) => await _retryPolicy.RunOperation( @@ -623,21 +621,20 @@ await receiver.AbandonInternalAsync( /// /// The properties of the message to modify while abandoning the message. private Task AbandonInternalAsync( - string lockToken, + Guid lockToken, TimeSpan timeout, IDictionary propertiesToModify = null) { - Guid lockTokenGuid = new Guid(lockToken); - if (_requestResponseLockedMessages.Contains(lockTokenGuid)) + if (_requestResponseLockedMessages.Contains(lockToken)) { return DisposeMessageRequestResponseAsync( - lockTokenGuid, + lockToken, timeout, DispositionStatus.Abandoned, SessionId, propertiesToModify); } - return DisposeMessageAsync(lockTokenGuid, GetAbandonOutcome(propertiesToModify), timeout); + return DisposeMessageAsync(lockToken, GetAbandonOutcome(propertiesToModify), timeout); } /// @@ -660,7 +657,7 @@ private Task AbandonInternalAsync( /// /// A task to be resolved on when the operation has completed. public override async Task DeadLetterAsync( - string lockToken, + Guid lockToken, string deadLetterReason, string deadLetterErrorDescription = default, IDictionary propertiesToModify = default, @@ -690,7 +687,7 @@ await receiver.DeadLetterInternalAsync( /// The reason for dead-lettering the message. /// The error description for dead-lettering the message. internal virtual Task DeadLetterInternalAsync( - string lockToken, + Guid lockToken, TimeSpan timeout, IDictionary propertiesToModify, string deadLetterReason, @@ -699,11 +696,10 @@ internal virtual Task DeadLetterInternalAsync( Argument.AssertNotTooLong(deadLetterReason, Constants.MaxDeadLetterReasonLength, nameof(deadLetterReason)); Argument.AssertNotTooLong(deadLetterErrorDescription, Constants.MaxDeadLetterReasonLength, nameof(deadLetterErrorDescription)); - Guid lockTokenGuid = new Guid(lockToken); - if (_requestResponseLockedMessages.Contains(lockTokenGuid)) + if (_requestResponseLockedMessages.Contains(lockToken)) { return DisposeMessageRequestResponseAsync( - lockTokenGuid, + lockToken, timeout, DispositionStatus.Suspended, SessionId, @@ -713,7 +709,7 @@ internal virtual Task DeadLetterInternalAsync( } return DisposeMessageAsync( - lockTokenGuid, + lockToken, GetRejectedOutcome(propertiesToModify, deadLetterReason, deadLetterErrorDescription), timeout); } @@ -993,7 +989,7 @@ private async Task> PeekMessagesInterna /// Lock token associated with the message. /// An optional instance to signal the request to cancel the operation. public override async Task RenewMessageLockAsync( - string lockToken, + Guid lockToken, CancellationToken cancellationToken) { return await _retryPolicy.RunOperation( @@ -1018,7 +1014,7 @@ static async (value, timeout, _) => /// Lock token associated with the message. /// private async Task RenewMessageLockInternalAsync( - string lockToken, + Guid lockToken, TimeSpan timeout) { DateTimeOffset lockedUntil; @@ -1033,7 +1029,7 @@ private async Task RenewMessageLockInternalAsync( { amqpRequestMessage.AmqpMessage.ApplicationProperties.Map[ManagementConstants.Request.AssociatedLinkName] = receiveLink.Name; } - amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new[] { new Guid(lockToken) }; + amqpRequestMessage.Map[ManagementConstants.Properties.LockTokens] = new Guid[] { lockToken }; AmqpResponseMessage amqpResponseMessage = await ExecuteRequest( timeout, diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportReceiver.cs index e7893a1633f62..2c71b69e7a005 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Core/TransportReceiver.cs @@ -81,7 +81,7 @@ public abstract Task> ReceiveMessagesAs /// /// A task to be resolved on when the operation has completed. public abstract Task CompleteAsync( - string lockToken, + Guid lockToken, CancellationToken cancellationToken); /// Indicates that the receiver wants to defer the processing for the message. @@ -91,7 +91,7 @@ public abstract Task CompleteAsync( /// An optional instance to signal the request to cancel the operation. /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive this message again in the future, you will need to save the /// and receive it using ReceiveDeferredMessageBatchAsync(IEnumerable, CancellationToken). @@ -101,7 +101,7 @@ public abstract Task CompleteAsync( /// /// A task to be resolved on when the operation has completed. public abstract Task DeferAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default); @@ -141,7 +141,7 @@ public abstract Task> PeekMessagesAsync /// /// A task to be resolved on when the operation has completed. public abstract Task AbandonAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default); @@ -165,7 +165,7 @@ public abstract Task AbandonAsync( /// /// A task to be resolved on when the operation has completed. public abstract Task DeadLetterAsync( - string lockToken, + Guid lockToken, string deadLetterReason = default, string deadLetterErrorDescription = default, IDictionary propertiesToModify = default, @@ -191,7 +191,7 @@ public abstract Task> ReceiveDeferredMe /// Lock token associated with the message. /// An optional instance to signal the request to cancel the operation. public abstract Task RenewMessageLockAsync( - string lockToken, + Guid lockToken, CancellationToken cancellationToken); /// diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs index 3192a692f0d0e..c088485c096ac 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs @@ -413,8 +413,18 @@ public virtual void CancelScheduledMessagesException(string identifier, string e #endregion #region Settlement + + [NonEvent] + public virtual void CompleteMessageStart(string identifier, int messageCount, Guid lockToken) + { + if (IsEnabled()) + { + CompleteMessageStartCore(identifier, messageCount, lockToken.ToString()); + } + } + [Event(CompleteMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: CompleteAsync start. MessageCount = {1}, LockTokens = {2}")] - public virtual void CompleteMessageStart(string identifier, int messageCount, string lockTokens) + public virtual void CompleteMessageStartCore(string identifier, int messageCount, string lockTokens) { if (IsEnabled()) { @@ -440,8 +450,17 @@ public virtual void CompleteMessageException(string identifier, string exception } } + [NonEvent] + public virtual void DeferMessageStart(string identifier, int messageCount, Guid lockToken) + { + if (IsEnabled()) + { + DeferMessageStartCore(identifier, messageCount, lockToken.ToString()); + } + } + [Event(DeferMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: DeferAsync start. MessageCount = {1}, LockToken = {2}")] - public virtual void DeferMessageStart(string identifier, int messageCount, string lockToken) + public virtual void DeferMessageStartCore(string identifier, int messageCount, string lockToken) { if (IsEnabled()) { @@ -464,8 +483,17 @@ public virtual void DeferMessageException(string identifier, string exception) WriteEvent(DeferMessageExceptionEvent, identifier, exception); } + [NonEvent] + public virtual void AbandonMessageStart(string identifier, int messageCount, Guid lockToken) + { + if (IsEnabled()) + { + AbandonMessageStartCore(identifier, messageCount, lockToken.ToString()); + } + } + [Event(AbandonMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: AbandonAsync start. MessageCount = {1}, LockToken = {2}")] - public virtual void AbandonMessageStart(string identifier, int messageCount, string lockToken) + public virtual void AbandonMessageStartCore(string identifier, int messageCount, string lockToken) { if (IsEnabled()) { @@ -491,8 +519,17 @@ public virtual void AbandonMessageException(string identifier, string exception) } } + [NonEvent] + public virtual void DeadLetterMessageStart(string identifier, int messageCount, Guid lockToken) + { + if (IsEnabled()) + { + DeadLetterMessageStartCore(identifier, messageCount, lockToken.ToString()); + } + } + [Event(DeadLetterMessageStartEvent, Level = EventLevel.Informational, Message = "{0}: DeadLetterAsync start. MessageCount = {1}, LockToken = {2}")] - public virtual void DeadLetterMessageStart(string identifier, int messageCount, string lockToken) + public virtual void DeadLetterMessageStartCore(string identifier, int messageCount, string lockToken) { if (IsEnabled()) { @@ -520,8 +557,18 @@ public virtual void DeadLetterMessageException(string identifier, string excepti #endregion #region Lock renewal + + [NonEvent] + public virtual void RenewMessageLockStart(string identifier, int messageCount, Guid lockToken) + { + if (IsEnabled()) + { + RenewMessageLockStartCore(identifier, messageCount, lockToken.ToString()); + } + } + [Event(RenewMessageLockStartEvent, Level = EventLevel.Informational, Message = "{0}: RenewLockAsync start. MessageCount = {1}, LockToken = {2}")] - public virtual void RenewMessageLockStart(string identifier, int messageCount, string lockToken) + public virtual void RenewMessageLockStartCore(string identifier, int messageCount, string lockToken) { if (IsEnabled()) { @@ -688,8 +735,17 @@ public virtual void StopProcessingException(string identifier, string exception) } } + [NonEvent] + public virtual void ProcessorRenewMessageLockStart(string identifier, int messageCount, Guid lockToken) + { + if (IsEnabled()) + { + ProcessorRenewMessageLockStartCore(identifier, messageCount, lockToken.ToString()); + } + } + [Event(ProcessorRenewMessageLockStartEvent, Level = EventLevel.Informational, Message = "{0}: Processor RenewMessageLock start. MessageCount = {1}, LockToken = {2}")] - public virtual void ProcessorRenewMessageLockStart(string identifier, int messageCount, string lockToken) + public virtual void ProcessorRenewMessageLockStartCore(string identifier, int messageCount, string lockToken) { if (IsEnabled()) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs index a3848a9629075..12789c2fd9da9 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs @@ -175,7 +175,7 @@ private async Task ProcessOneMessage( // as we want in flight auto-completion to be able // to finish await Receiver.CompleteMessageAsync( - message.LockToken, + message.LockTokenGuid, CancellationToken.None) .ConfigureAwait(false); } @@ -212,7 +212,7 @@ await RaiseExceptionReceived( // as we want in flight abandon to be able // to finish even if user stopped processing await Receiver.AbandonMessageAsync( - message.LockToken, + message.LockTokenGuid, cancellationToken: CancellationToken.None) .ConfigureAwait(false); } @@ -256,7 +256,7 @@ private async Task RenewMessageLock( { try { - ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(Processor.Identifier, 1, message.LockToken); + ServiceBusEventSource.Log.ProcessorRenewMessageLockStart(Processor.Identifier, 1, message.LockTokenGuid); TimeSpan delay = CalculateRenewDelay(message.LockedUntil); // We're awaiting the task created by 'ContinueWith' to avoid awaiting the Delay task which may be canceled diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs index 95f4830addb8c..350d4f7ccdb3c 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Receiver/ServiceBusReceiver.cs @@ -506,7 +506,7 @@ public virtual async Task CompleteMessageAsync( CancellationToken cancellationToken = default) { Argument.AssertNotNull(message, nameof(message)); - await CompleteMessageAsync(message.LockToken, cancellationToken).ConfigureAwait(false); + await CompleteMessageAsync(message.LockTokenGuid, cancellationToken).ConfigureAwait(false); } /// @@ -523,7 +523,7 @@ public virtual async Task CompleteMessageAsync( /// /// A task to be resolved on when the operation has completed. internal virtual async Task CompleteMessageAsync( - string lockToken, + Guid lockToken, CancellationToken cancellationToken = default) { Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); @@ -588,7 +588,7 @@ public virtual async Task AbandonMessageAsync( { Argument.AssertNotNull(message, nameof(message)); await AbandonMessageAsync( - message.LockToken, + message.LockTokenGuid, propertiesToModify, cancellationToken).ConfigureAwait(false); } @@ -609,7 +609,7 @@ await AbandonMessageAsync( /// /// A task to be resolved on when the operation has completed. internal virtual async Task AbandonMessageAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) { @@ -676,7 +676,7 @@ public virtual async Task DeadLetterMessageAsync( { Argument.AssertNotNull(message, nameof(message)); await DeadLetterMessageAsync( - lockToken: message.LockToken, + lockToken: message.LockTokenGuid, propertiesToModify: propertiesToModify, cancellationToken: cancellationToken).ConfigureAwait(false); } @@ -698,7 +698,7 @@ await DeadLetterMessageAsync( /// This operation can only be performed when is set to . /// internal virtual async Task DeadLetterMessageAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) => await DeadLetterInternalAsync( @@ -716,7 +716,7 @@ await DeadLetterInternalAsync( /// An optional instance to signal the request to cancel the operation. /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive a message from the dead-letter queue, you will need a new , with the corresponding path. /// You can use EntityNameHelper.FormatDeadLetterPath(string) to help with this. @@ -740,7 +740,7 @@ public virtual async Task DeadLetterMessageAsync( { Argument.AssertNotNull(message, nameof(message)); await DeadLetterMessageAsync( - lockToken: message.LockToken, + lockToken: message.LockTokenGuid, deadLetterReason: deadLetterReason, deadLetterErrorDescription: deadLetterErrorDescription, cancellationToken: cancellationToken).ConfigureAwait(false); @@ -756,14 +756,14 @@ await DeadLetterMessageAsync( /// An optional instance to signal the request to cancel the operation. /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive a message from the dead-letter queue, you will need a new , with the corresponding path. /// You can use EntityNameHelper.FormatDeadLetterPath(string) to help with this. /// This operation can only be performed on messages that were received by this receiver. /// internal virtual async Task DeadLetterMessageAsync( - string lockToken, + Guid lockToken, string deadLetterReason, string deadLetterErrorDescription = null, CancellationToken cancellationToken = default) => @@ -784,14 +784,14 @@ await DeadLetterInternalAsync( /// /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive a message from the dead-letter queue, you will need a new , with the corresponding path. /// You can use EntityNameHelper.FormatDeadLetterPath(string) to help with this. /// This operation can only be performed on messages that were received by this receiver. /// private async Task DeadLetterInternalAsync( - string lockToken, + Guid lockToken, string deadLetterReason = default, string deadLetterErrorDescription = default, IDictionary propertiesToModify = default, @@ -836,7 +836,7 @@ await InnerReceiver.DeadLetterAsync( /// An optional instance to signal the request to cancel the operation. /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive this message again in the future, you will need to save the /// @@ -863,7 +863,7 @@ public virtual async Task DeferMessageAsync( { Argument.AssertNotNull(message, nameof(message)); await DeferMessageAsync( - message.LockToken, + message.LockTokenGuid, propertiesToModify, cancellationToken).ConfigureAwait(false); } @@ -875,7 +875,7 @@ await DeferMessageAsync( /// An optional instance to signal the request to cancel the operation. /// /// - /// A lock token can be found in , + /// A lock token can be found in , /// only when is set to . /// In order to receive this message again in the future, you will need to save the /// @@ -886,7 +886,7 @@ await DeferMessageAsync( /// /// A task to be resolved on when the operation has completed. internal virtual async Task DeferMessageAsync( - string lockToken, + Guid lockToken, IDictionary propertiesToModify = null, CancellationToken cancellationToken = default) { @@ -934,9 +934,9 @@ private void ThrowIfNotPeekLockMode() /// /// Throws an InvalidOperationException when the lock token is empty. /// - private static void ThrowIfLockTokenIsEmpty(string lockToken) + private static void ThrowIfLockTokenIsEmpty(Guid lockToken) { - if (Guid.Parse(lockToken) == Guid.Empty) + if (lockToken == Guid.Empty) { throw new InvalidOperationException(Resources.PeekLockModeRequired); } @@ -953,7 +953,7 @@ private static void ThrowIfLockTokenIsEmpty(string lockToken) /// The deferred message identified by the specified sequence number. Returns null if no message is found. /// Throws if the message has not been deferred. /// - /// + /// public virtual async Task ReceiveDeferredMessageAsync( long sequenceNumber, CancellationToken cancellationToken = default) => @@ -969,7 +969,7 @@ public virtual async Task ReceiveDeferredMessageAsync /// Messages identified by sequence number are returned. Returns null if no messages are found. /// Throws if the messages have not been deferred. /// - /// + /// /// /// The specified sequence number does not correspond to a message that has been deferred. /// The will be set to in this case. @@ -1044,7 +1044,7 @@ public virtual async Task RenewMessageLockAsync( { Argument.AssertNotNull(message, nameof(message)); DateTimeOffset lockedUntil = await RenewMessageLockAsync( - message.LockToken, + message.LockTokenGuid, cancellationToken).ConfigureAwait(false); message.LockedUntil = lockedUntil; } @@ -1063,7 +1063,7 @@ public virtual async Task RenewMessageLockAsync( /// The lockToken of the to renew the lock for. /// An optional instance to signal the request to cancel the operation. internal virtual async Task RenewMessageLockAsync( - string lockToken, + Guid lockToken, CancellationToken cancellationToken = default) { Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusReceiver)); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs index c78c71f813edd..60933cdf3572e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs @@ -460,7 +460,7 @@ public async Task GetQueueRuntimeInfo() ServiceBusReceiver receiver = sbClient.CreateReceiver(queueName); ServiceBusReceivedMessage msg = await receiver.ReceiveMessageAsync(); - await receiver.DeadLetterMessageAsync(msg.LockToken); + await receiver.DeadLetterMessageAsync(msg); List runtimeInfoList = new List(); await foreach (QueueRuntimeProperties queueRuntimeInfo in mgmtClient.GetQueuesRuntimePropertiesAsync()) @@ -528,7 +528,7 @@ public async Task GetSubscriptionRuntimeInfoTest() ServiceBusReceiver receiver = sbClient.CreateReceiver(topicName, subscriptionName); ServiceBusReceivedMessage msg = await receiver.ReceiveMessageAsync(); - await receiver.DeadLetterMessageAsync(msg.LockToken); + await receiver.DeadLetterMessageAsync(msg); List runtimeInfoList = new List(); await foreach (SubscriptionRuntimeProperties subscriptionRuntimeInfo in client.GetSubscriptionsRuntimePropertiesAsync(topicName)) @@ -775,13 +775,13 @@ await mgmtClient.CreateQueueAsync( ServiceBusReceivedMessage msg = await receiver.ReceiveMessageAsync(); Assert.NotNull(msg); Assert.AreEqual("mid", msg.MessageId); - await receiver.DeadLetterMessageAsync(msg.LockToken); + await receiver.DeadLetterMessageAsync(msg); receiver = sbClient.CreateReceiver(dlqDestinationName); msg = await receiver.ReceiveMessageAsync(); Assert.NotNull(msg); Assert.AreEqual("mid", msg.MessageId); - await receiver.CompleteMessageAsync(msg.LockToken); + await receiver.CompleteMessageAsync(msg); await mgmtClient.DeleteQueueAsync(queueName); await mgmtClient.DeleteQueueAsync(destinationName); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs index b219ee1e90810..8a03d3cd5b4b0 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceTests.cs @@ -624,7 +624,7 @@ public async Task CompleteMessageLogsEvents() log => log.CompleteMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -642,7 +642,7 @@ public void CompleteMessageExceptionLogsEvents() mockTransportReceiver.Setup( transportReceiver => transportReceiver.CompleteAsync( - It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new Exception()); var receiver = new ServiceBusReceiver( @@ -665,7 +665,7 @@ public void CompleteMessageExceptionLogsEvents() log => log.CompleteMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -699,7 +699,7 @@ public async Task DeferMessageLogsEvents() log => log.DeferMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -716,7 +716,7 @@ public void DeferMessageExceptionLogsEvents() var mockConnection = GetMockConnection(mockTransportReceiver); mockTransportReceiver.Setup( transportReceiver => transportReceiver.DeferAsync( - It.IsAny(), + It.IsAny(), It.IsAny>(), It.IsAny())) .Throws(new Exception()); @@ -740,7 +740,7 @@ public void DeferMessageExceptionLogsEvents() log => log.DeferMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -774,7 +774,7 @@ public async Task DeadLetterMessageLogsEvents() log => log.DeadLetterMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -791,7 +791,7 @@ public void DeadLetterMessageExceptionLogsEvents() var mockConnection = GetMockConnection(mockTransportReceiver); mockTransportReceiver.Setup( transportReceiver => transportReceiver.DeadLetterAsync( - It.IsAny(), + It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny>(), @@ -817,7 +817,7 @@ public void DeadLetterMessageExceptionLogsEvents() log => log.DeadLetterMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -851,7 +851,7 @@ public async Task AbandonMessageLogsEvents() log => log.AbandonMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -868,7 +868,7 @@ public void AbandonMessageExceptionLogsEvents() var mockConnection = GetMockConnection(mockTransportReceiver); mockTransportReceiver.Setup( transportReceiver => transportReceiver.AbandonAsync( - It.IsAny(), + It.IsAny(), It.IsAny>(), It.IsAny())) .Throws(new Exception()); @@ -892,7 +892,7 @@ public void AbandonMessageExceptionLogsEvents() log => log.AbandonMessageStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -926,7 +926,7 @@ public async Task RenewMessageLockLogsEvents() log => log.RenewMessageLockStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( @@ -943,7 +943,7 @@ public void RenewMessageLockExceptionLogsEvents() var mockConnection = GetMockConnection(mockTransportReceiver); mockTransportReceiver.Setup( transportReceiver => transportReceiver.RenewMessageLockAsync( - It.IsAny(), + It.IsAny(), It.IsAny())) .Throws(new Exception()); var receiver = new ServiceBusReceiver( @@ -966,7 +966,7 @@ public void RenewMessageLockExceptionLogsEvents() log => log.RenewMessageLockStart( receiver.Identifier, 1, - msg.LockToken), + msg.LockTokenGuid), Times.Once); mockLogger .Verify( diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs index cedb299aea255..2a3767a538815 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Message/MessageLiveTests.cs @@ -88,7 +88,7 @@ public async Task CanSendMessageWithMaxSize() await client.CreateSender(scope.QueueName).SendMessageAsync(maxSizeMessage); var receiver = client.CreateReceiver(scope.QueueName); var receivedMaxSizeMessage = await receiver.ReceiveMessageAsync(); - await receiver.CompleteMessageAsync(receivedMaxSizeMessage.LockToken); + await receiver.CompleteMessageAsync(receivedMaxSizeMessage); Assert.AreEqual(maxPayload, receivedMaxSizeMessage.Body.ToArray()); } } @@ -106,7 +106,7 @@ public async Task CanSendNullBodyMessage() var receiver = client.CreateReceiver(scope.QueueName); var receivedMessage = await receiver.ReceiveMessageAsync(); Assert.IsNotNull(receivedMessage); - await receiver.CompleteMessageAsync(receivedMessage.LockToken); + await receiver.CompleteMessageAsync(receivedMessage); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs index cab9f0f308af9..f9344aaa005fc 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/ReceiverLiveTests.cs @@ -272,7 +272,7 @@ public async Task CompleteMessages() remainingMessages--; messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); - await receiver.CompleteMessageAsync(item.LockToken); + await receiver.CompleteMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -308,7 +308,7 @@ public async Task ReceiveIterator() await foreach (var msg in receiver.ReceiveMessagesAsync(cts.Token)) { Assert.AreEqual(messages[ct].MessageId, msg.MessageId); - await receiver.CompleteMessageAsync(msg.LockToken); + await receiver.CompleteMessageAsync(msg, CancellationToken.None); ct++; } } @@ -396,7 +396,7 @@ public async Task DeadLetterMessages() messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); Assert.AreEqual(messageEnum.Current.Body.ToArray(), item.Body.ToArray()); - await receiver.DeadLetterMessageAsync(item.LockToken); + await receiver.DeadLetterMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -416,7 +416,7 @@ public async Task DeadLetterMessages() remainingMessages--; messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); - await deadLetterReceiver.CompleteMessageAsync(item.LockToken); + await deadLetterReceiver.CompleteMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -453,7 +453,7 @@ public async Task DeferMessagesList() messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); sequenceNumbers.Add(item.SequenceNumber); - await receiver.DeferMessageAsync(item.LockToken); + await receiver.DeferMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -507,7 +507,7 @@ public async Task DeferMessagesArray() messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); sequenceNumbers[idx++] = item.SequenceNumber; - await receiver.DeferMessageAsync(item.LockToken); + await receiver.DeferMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -555,7 +555,7 @@ public async Task DeferMessagesEnumerable() messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); sequenceNumbers[idx++] = item.SequenceNumber; - await receiver.DeferMessageAsync(item.LockToken); + await receiver.DeferMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -711,7 +711,7 @@ public async Task RenewMessageLock() Assert.Greater(receivedMessage.LockedUntil, firstLockedUntilUtcTime); // Complete Messages - await receiver.CompleteMessageAsync(receivedMessage.LockToken); + await receiver.CompleteMessageAsync(receivedMessage); Assert.AreEqual(messageCount, receivedMessages.Length); Assert.AreEqual(message.MessageId, receivedMessage.MessageId); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index 25df283c9789c..37361fda01cba 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -366,7 +366,7 @@ public async Task CompleteMessages(bool useSpecificSession) messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); Assert.AreEqual(messageEnum.Current.SessionId, item.SessionId); - await receiver.CompleteMessageAsync(item.LockToken); + await receiver.CompleteMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -464,7 +464,7 @@ public async Task DeadLetterMessages(bool useSpecificSession) messageEnum.MoveNext(); Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); Assert.AreEqual(messageEnum.Current.SessionId, item.SessionId); - await receiver.DeadLetterMessageAsync(item.LockToken, "testReason", "testDescription"); + await receiver.DeadLetterMessageAsync(item, "testReason", "testDescription"); } } Assert.AreEqual(0, remainingMessages); @@ -489,7 +489,7 @@ public async Task DeadLetterMessages(bool useSpecificSession) Assert.AreEqual(messageEnum.Current.SessionId, msg.SessionId); Assert.AreEqual("testReason", msg.DeadLetterReason); Assert.AreEqual("testDescription", msg.DeadLetterErrorDescription); - await deadLetterReceiver.CompleteMessageAsync(msg.LockToken); + await deadLetterReceiver.CompleteMessageAsync(msg); } } Assert.AreEqual(0, remainingMessages); @@ -537,7 +537,7 @@ public async Task DeadLetterMessagesSubscription(bool useSpecificSession) props[AmqpMessageConstants.DeadLetterReasonHeader] = DateTime.UtcNow; props[AmqpMessageConstants.DeadLetterErrorDescriptionHeader] = DateTime.UtcNow; - await receiver.DeadLetterMessageAsync(item.LockToken, props); + await receiver.DeadLetterMessageAsync(item, props); } } Assert.AreEqual(0, remainingMessages); @@ -564,7 +564,7 @@ public async Task DeadLetterMessagesSubscription(bool useSpecificSession) Assert.IsNull(msg.DeadLetterReason); Assert.IsNotNull(msg.ApplicationProperties[AmqpMessageConstants.DeadLetterReasonHeader]); Assert.IsNotNull(msg.ApplicationProperties[AmqpMessageConstants.DeadLetterErrorDescriptionHeader]); - await deadLetterReceiver.CompleteMessageAsync(msg.LockToken); + await deadLetterReceiver.CompleteMessageAsync(msg); } } Assert.AreEqual(0, remainingMessages); @@ -606,7 +606,7 @@ public async Task DeferMessages(bool useSpecificSession) Assert.AreEqual(messageEnum.Current.MessageId, item.MessageId); Assert.AreEqual(messageEnum.Current.SessionId, item.SessionId); sequenceNumbers.Add(item.SequenceNumber); - await receiver.DeferMessageAsync(item.LockToken); + await receiver.DeferMessageAsync(item); } } Assert.AreEqual(0, remainingMessages); @@ -658,7 +658,7 @@ public async Task RenewSessionLock(bool isSessionSpecified) Assert.Greater(receiver.SessionLockedUntil, firstLockedUntilUtcTime); // Complete Messages - await receiver.CompleteMessageAsync(receivedMessage.LockToken); + await receiver.CompleteMessageAsync(receivedMessage); Assert.AreEqual(messageCount, receivedMessages.Length); if (isSessionSpecified) @@ -864,7 +864,7 @@ async Task RenewLock() await foreach (var msg in receiver.ReceiveMessagesAsync(cts.Token)) { Assert.AreEqual(messages[ct].MessageId, msg.MessageId); - await receiver.CompleteMessageAsync(msg.LockToken); + await receiver.CompleteMessageAsync(msg); ct++; if (ct == messageCount) { diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs index 9702f529bc085..76f096b354317 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs @@ -401,7 +401,7 @@ public async Task TransactionRollbackWorksAcrossClientsUsingSameConnectionToSame using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled)) { - await receiver.CompleteMessageAsync(receivedMessage.LockToken); + await receiver.CompleteMessageAsync(receivedMessage); await sender.SendMessageAsync(message2); } @@ -410,7 +410,7 @@ public async Task TransactionRollbackWorksAcrossClientsUsingSameConnectionToSame await Task.Delay(TimeSpan.FromSeconds(2)); // Following should succeed without exceptions - await receiver.CompleteMessageAsync(receivedMessage.LockToken); + await receiver.CompleteMessageAsync(receivedMessage); // Assert that send failed receivedMessage = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));