diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs index a303dfcb9e203..6c032fcd89e1d 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/api/Azure.Messaging.ServiceBus.netstandard2.0.cs @@ -370,6 +370,7 @@ protected ServiceBusSessionProcessor() { } public virtual int MaxConcurrentSessions { get { throw null; } } public virtual int PrefetchCount { get { throw null; } } public virtual Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } } + public virtual System.TimeSpan? SessionIdleTimeout { get { throw null; } } public event System.Func ProcessErrorAsync { add { } remove { } } public event System.Func ProcessMessageAsync { add { } remove { } } public event System.Func SessionClosingAsync { add { } remove { } } @@ -394,6 +395,7 @@ public ServiceBusSessionProcessorOptions() { } public int MaxConcurrentSessions { get { throw null; } set { } } public int PrefetchCount { get { throw null; } set { } } public Azure.Messaging.ServiceBus.ServiceBusReceiveMode ReceiveMode { get { throw null; } set { } } + public System.TimeSpan? SessionIdleTimeout { get { throw null; } set { } } public System.Collections.Generic.IList SessionIds { get { throw null; } } [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] public override bool Equals(object obj) { throw null; } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs index 73e3cffa949ab..4cbf74505c0cd 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs @@ -110,6 +110,7 @@ public class ServiceBusProcessor : IAsyncDisposable /// /// The maximum number of concurrent calls to the message handler. public virtual int MaxConcurrentCalls { get; } + internal TimeSpan? MaxReceiveWaitTime { get; } /// /// Gets a value that indicates whether the processor should automatically @@ -200,6 +201,7 @@ internal ServiceBusProcessor( PrefetchCount = _options.PrefetchCount; MaxAutoLockRenewalDuration = _options.MaxAutoLockRenewalDuration; MaxConcurrentCalls = _options.MaxConcurrentCalls; + MaxReceiveWaitTime = _options.MaxReceiveWaitTime; MaxConcurrentSessions = maxConcurrentSessions; MaxConcurrentCallsPerSession = maxConcurrentCallsPerSession; _sessionIds = sessionIds ?? Array.Empty(); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs index 6a928c45ff9dd..5359c3c39e572 100755 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessor.cs @@ -83,6 +83,14 @@ public class ServiceBusSessionProcessor : IAsyncDisposable /// public virtual string FullyQualifiedNamespace => _innerProcessor.FullyQualifiedNamespace; + /// + /// Gets the maximum amount of time to wait for a message to be received for the + /// currently active session. After this time has elapsed, the processor will close the session + /// and attempt to process another session. + /// If not specified, the will be used. + /// + public virtual TimeSpan? SessionIdleTimeout => _innerProcessor.MaxReceiveWaitTime; + internal ServiceBusSessionProcessor( ServiceBusConnection connection, string entityPath, diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs index cf2a4fd93ff9d..eee10f8ab6225 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusSessionProcessorOptions.cs @@ -63,31 +63,32 @@ public TimeSpan MaxAutoLockRenewalDuration private TimeSpan _maxAutoRenewDuration = TimeSpan.FromMinutes(5); /// - /// Gets or sets the maximum amount of time to wait for each Receive call using the processor's underlying - /// receiver. + /// Gets or sets the maximum amount of time to wait for a message to be received for the + /// currently active session. After this time has elapsed, the processor will close the session + /// and attempt to process another session. /// If not specified, the will be used. /// /// - /// If no message is returned for a call - /// to Receive, a new session will be requested by the processor. - /// Hence, if this value is set to be too low, it could cause new sessions to be requested - /// more often than necessary. + /// + /// If is populated and is greater or equal to + /// the number of sessions specified in , the session will not be closed when the idle timeout elapses. + /// However, it will still control the amount of time each receive call waits. /// - internal TimeSpan? MaxReceiveWaitTime + public TimeSpan? SessionIdleTimeout { - get => _maxReceiveWaitTime; + get => _sessionIdleTimeout; set { if (value.HasValue) { - Argument.AssertPositive(value.Value, nameof(MaxReceiveWaitTime)); + Argument.AssertPositive(value.Value, nameof(SessionIdleTimeout)); } - _maxReceiveWaitTime = value; + _sessionIdleTimeout = value; } } - private TimeSpan? _maxReceiveWaitTime; + private TimeSpan? _sessionIdleTimeout; /// /// Gets or sets the maximum number of sessions that can be processed concurrently by the processor. @@ -167,7 +168,7 @@ internal ServiceBusProcessorOptions ToProcessorOptions() => PrefetchCount = PrefetchCount, AutoCompleteMessages = AutoCompleteMessages, MaxAutoLockRenewalDuration = MaxAutoLockRenewalDuration, - MaxReceiveWaitTime = MaxReceiveWaitTime + MaxReceiveWaitTime = SessionIdleTimeout }; } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs index e056a8dd7aa5a..be2d8d5797929 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Administration/ServiceBusManagementClientLiveTests.cs @@ -29,20 +29,20 @@ private string GetConnectionString() => TestEnvironment.ServiceBusConnectionString : TestEnvironment.OverrideServiceBusConnectionString; - private ServiceBusAdministrationClient GetClient() => + private ServiceBusAdministrationClient CreateClient() => InstrumentClient( new ServiceBusAdministrationClient( GetConnectionString(), InstrumentClientOptions(new ServiceBusAdministrationClientOptions()))); - private ServiceBusAdministrationClient GetAADClient() => + private ServiceBusAdministrationClient CreateAADClient() => InstrumentClient( new ServiceBusAdministrationClient( TestEnvironment.FullyQualifiedNamespace, GetTokenCredential(), InstrumentClientOptions(new ServiceBusAdministrationClientOptions()))); - private ServiceBusAdministrationClient GetSharedKeyTokenClient() + private ServiceBusAdministrationClient CreateSharedKeyTokenClient() { var properties = ServiceBusConnectionStringProperties.Parse(GetConnectionString()); var credential = new ServiceBusSharedAccessKeyCredential(properties.SharedAccessKeyName, properties.SharedAccessKey); @@ -58,7 +58,7 @@ private ServiceBusAdministrationClient GetSharedKeyTokenClient() public async Task BasicQueueCrudOperations() { var queueName = nameof(BasicQueueCrudOperations).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); var queueOptions = new CreateQueueOptions(queueName) { @@ -169,7 +169,7 @@ await client.GetQueueAsync(queueOptions.Name), public async Task BasicTopicCrudOperations() { var topicName = nameof(BasicTopicCrudOperations).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); var options = new CreateTopicOptions(topicName) { @@ -267,7 +267,7 @@ public async Task BasicSubscriptionCrudOperations() var topicName = nameof(BasicSubscriptionCrudOperations).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); await client.CreateTopicAsync(topicName); @@ -338,7 +338,7 @@ public async Task BasicRuleCrudOperations() { var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); await client.CreateTopicAsync(topicName); var rule1 = new CreateRuleOptions @@ -426,7 +426,7 @@ await client.GetRuleAsync(topicName, subscriptionName, "rule1"), public async Task GetQueueRuntimeInfo() { var queueName = nameof(GetQueueRuntimeInfo).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var mgmtClient = GetClient(); + var mgmtClient = CreateClient(); await using var sbClient = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); QueueProperties queue = await mgmtClient.CreateQueueAsync(queueName); @@ -486,7 +486,7 @@ public async Task GetSubscriptionRuntimeInfoTest() { var topicName = nameof(GetSubscriptionRuntimeInfoTest).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); await using var sbClient = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); await client.CreateTopicAsync(topicName); @@ -578,7 +578,7 @@ public async Task GetTopicRuntimeInfo() { var topicName = nameof(GetTopicRuntimeInfo).ToLower() + Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); await client.CreateTopicAsync(topicName); @@ -620,7 +620,7 @@ public async Task GetTopicRuntimeInfo() [Test] public async Task ThrowsIfEntityDoesNotExist() { - var client = GetClient(); + var client = CreateClient(); Assert.That( async () => @@ -701,7 +701,7 @@ await client.GetTopicAsync(queueName), [Test] public async Task ThrowsIfEntityAlreadyExists() { - var client = GetClient(); + var client = CreateClient(); var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); @@ -737,7 +737,7 @@ public async Task ForwardingEntity() var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var destinationName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var dlqDestinationName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var mgmtClient = GetClient(); + var mgmtClient = CreateClient(); await mgmtClient.CreateQueueAsync(dlqDestinationName); await mgmtClient.CreateQueueAsync( @@ -776,7 +776,7 @@ await mgmtClient.CreateQueueAsync( [Test] public async Task SqlFilterParams() { - var client = GetClient(); + var client = CreateClient(); var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); @@ -815,7 +815,7 @@ public async Task CorrelationFilterProperties() { var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var subscriptionName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetClient(); + var client = CreateClient(); await client.CreateTopicAsync(topicName); await client.CreateSubscriptionAsync(topicName, subscriptionName); @@ -838,7 +838,7 @@ public async Task CorrelationFilterProperties() [Test] public async Task GetNamespaceProperties() { - var client = GetClient(); + var client = CreateClient(); NamespaceProperties nsInfo = await client.GetNamespacePropertiesAsync(); Assert.NotNull(nsInfo); @@ -851,7 +851,7 @@ public async Task AuthenticateWithAAD() { var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetAADClient(); + var client = CreateAADClient(); var queueOptions = new CreateQueueOptions(queueName); QueueProperties createdQueue = await client.CreateQueueAsync(queueOptions); @@ -872,7 +872,7 @@ public async Task AuthenticateWithSharedKeyCredential() { var queueName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); var topicName = Recording.Random.NewGuid().ToString("D").Substring(0, 8); - var client = GetSharedKeyTokenClient(); + var client = CreateSharedKeyTokenClient(); var queueOptions = new CreateQueueOptions(queueName); QueueProperties createdQueue = await client.CreateQueueAsync(queueOptions); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs index d250120aba895..775aea43ea424 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/DiagnosticScopeLiveTests.cs @@ -288,7 +288,7 @@ public async Task SessionProcessorActivities() new ServiceBusSessionProcessorOptions { AutoCompleteMessages = false, - MaxReceiveWaitTime = TimeSpan.FromSeconds(10), + SessionIdleTimeout = TimeSpan.FromSeconds(10), MaxConcurrentSessions = 1 }); TaskCompletionSource tcs = new TaskCompletionSource(); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs index 109739fdbc4bf..e746cb2f02198 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs @@ -37,7 +37,7 @@ public async Task LogsEvents() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { - await using var client = GetNoRetryClient(); + await using var client = CreateNoRetryClient(); _listener.SingleEventById(ServiceBusEventSource.ClientCreateStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.FullyQualifiedNamespace)); var messageCount = 10; @@ -134,7 +134,7 @@ public async Task LogsSessionEvents() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - await using var client = GetNoRetryClient(); + await using var client = CreateNoRetryClient(); _listener.SingleEventById(ServiceBusEventSource.ClientCreateStartEvent, e => e.Payload.Contains(nameof(ServiceBusClient)) && e.Payload.Contains(client.FullyQualifiedNamespace)); var messageCount = 10; @@ -282,7 +282,7 @@ public async Task LogsProcessorEvents() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); await using var processor = client.CreateProcessor(scope.QueueName); @@ -315,7 +315,7 @@ public async Task LogsProcessorExceptionEvent() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); await using var processor = client.CreateProcessor(scope.QueueName); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs index 6c87ed034ad2b..07656677b4645 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs @@ -13,7 +13,7 @@ public abstract class ServiceBusLiveTestBase : ServiceBusTestBase { public ServiceBusTestEnvironment TestEnvironment { get; } = ServiceBusTestEnvironment.Instance; - protected ServiceBusClient GetNoRetryClient() + protected ServiceBusClient CreateNoRetryClient() { var options = new ServiceBusClientOptions @@ -29,7 +29,7 @@ protected ServiceBusClient GetNoRetryClient() options); } - protected ServiceBusClient GetClient(int tryTimeout = 15) + protected ServiceBusClient CreateClient(int tryTimeout = 15) { var retryOptions = new ServiceBusRetryOptions(); if (tryTimeout != default) diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs index 69206d3920310..198f3dc4d5df0 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs @@ -24,7 +24,7 @@ public async Task ProcessMessages(int numThreads, bool autoComplete) enablePartitioning: false, enableSession: false)) { - await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); // use double the number of threads so we can make sure we test that we don't @@ -96,7 +96,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads) enablePartitioning: false, enableSession: false)) { - await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); // use double the number of threads so we can make sure we test that we don't @@ -179,7 +179,7 @@ public async Task AutoLockRenewalWorks(int numThreads) enableSession: false, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); @@ -251,7 +251,7 @@ public async Task MaxAutoLockRenewalDurationRespected(int numThreads, int autoLo enableSession: false, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); @@ -319,7 +319,7 @@ public async Task CanStopProcessingFromHandler(int numThreads) enablePartitioning: false, enableSession: false)) { - await using var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); int numMessages = 100; using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); @@ -356,7 +356,7 @@ Task ProcessMessage(ProcessMessageEventArgs args) } await tcs.Task; - var receiver = GetNoRetryClient().CreateReceiver(scope.QueueName); + var receiver = CreateNoRetryClient().CreateReceiver(scope.QueueName); var receivedMessages = await receiver.ReceiveMessagesAsync(numMessages); // can't assert on the exact amount processed due to threads that // are already in flight when calling StopProcessingAsync, but we can at least verify that there are remaining messages @@ -370,7 +370,7 @@ public async Task OnMessageExceptionHandlerCalled() { var invalidQueueName = "nonexistentqueuename"; var exceptionReceivedHandlerCalled = false; - var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName); TaskCompletionSource taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); processor.ProcessMessageAsync += ProcessMessage; @@ -424,7 +424,7 @@ Task ProcessErrors(ProcessErrorEventArgs args) public async Task StartStopMultipleTimes() { var invalidQueueName = "nonexistentqueuename"; - var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); await using ServiceBusProcessor processor = client.CreateProcessor(invalidQueueName); TaskCompletionSource taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); processor.ProcessMessageAsync += eventArgs => Task.CompletedTask; @@ -454,7 +454,7 @@ public async Task CannotAddHandlerWhileProcessorIsRunning() enablePartitioning: false, enableSession: false)) { - await using var client = GetClient(); + await using var client = CreateClient(); await using var processor = client.CreateProcessor(scope.QueueName); @@ -486,7 +486,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion() enablePartitioning: false, enableSession: false)) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions @@ -525,7 +525,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str enablePartitioning: false, enableSession: false)) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage()); await using var processor = client.CreateProcessor(scope.QueueName, new ServiceBusProcessorOptions diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs index 394858c0ab7cc..16086aecf21ac 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs @@ -144,6 +144,7 @@ public void ProcessorOptionsSetOnClient() Assert.AreEqual(options.PrefetchCount, processor.PrefetchCount); Assert.AreEqual(options.ReceiveMode, processor.ReceiveMode); Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); + Assert.AreEqual(options.MaxReceiveWaitTime, processor.MaxReceiveWaitTime); Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace); Assert.IsFalse(processor.IsClosed); Assert.IsFalse(processor.IsProcessing); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs index b95313c8f7c54..b8094241f6a30 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs @@ -21,7 +21,7 @@ public async Task CannotRemoveHandlersWhileProcessorIsRunning() enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); await using var processor = client.CreateSessionProcessor(scope.QueueName); @@ -62,7 +62,7 @@ public async Task CannotAddHandlersWhileProcessorIsRunning() enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); await using var processor = client.CreateSessionProcessor(scope.QueueName); @@ -103,7 +103,7 @@ public async Task ProcessSessionMessage(int numThreads, bool autoComplete) enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); // send 1 message for each thread and use a different session for each message @@ -193,7 +193,7 @@ public async Task CanDelayProcessingOfSession() enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); // send 1 message for each thread and use a different session for each message @@ -270,7 +270,7 @@ public async Task UserSettlingWithAutoCompleteDoesNotThrow(int numThreads) enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); for (int i = 0; i < numThreads; i++) @@ -348,7 +348,7 @@ public async Task ProcessConsumesAllMessages(int numThreads, bool autoComplete) enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); // send 1 message for each thread and use a different session for each message @@ -370,7 +370,7 @@ public async Task ProcessConsumesAllMessages(int numThreads, bool autoComplete) PrefetchCount = 5 }; - await using ServiceBusSessionProcessor processor = GetNoRetryClient().CreateSessionProcessor(scope.QueueName, options); + await using ServiceBusSessionProcessor processor = CreateNoRetryClient().CreateSessionProcessor(scope.QueueName, options); processor.ProcessMessageAsync += ProcessMessage; processor.ProcessErrorAsync += ExceptionHandler; @@ -416,7 +416,7 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args) if (!autoComplete) { Assert.That(async () => - await GetNoRetryClient().AcceptNextSessionAsync(scope.QueueName), + await CreateNoRetryClient().AcceptNextSessionAsync(scope.QueueName), Throws.Exception); } } @@ -428,7 +428,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulQu await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { var exceptionReceivedHandlerCalled = false; - var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions { @@ -472,7 +472,7 @@ public async Task OnSessionExceptionHandlerCalledWhenRegisteredOnNonSessionfulTo await using (var scope = await ServiceBusScope.CreateWithTopic(enablePartitioning: false, enableSession: false)) { var exceptionReceivedHandlerCalled = false; - var client = new ServiceBusClient(TestEnvironment.ServiceBusConnectionString); + await using var client = CreateClient(); await using var processor = client.CreateSessionProcessor(scope.TopicName, scope.SubscriptionNames.First(), new ServiceBusSessionProcessorOptions { MaxConcurrentSessions = 1 @@ -519,7 +519,7 @@ public async Task ProcessSessionMessageUsingNamedSessionId(int numThreads, bool enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); // send 1 message for each thread and use a different session for each message @@ -595,7 +595,7 @@ public async Task AutoLockRenewalWorks(int numThreads, int maxCallsPerSession) enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); @@ -611,7 +611,8 @@ public async Task AutoLockRenewalWorks(int numThreads, int maxCallsPerSession) { MaxConcurrentSessions = numThreads, AutoCompleteMessages = false, - MaxConcurrentCallsPerSession = maxCallsPerSession + MaxConcurrentCallsPerSession = maxCallsPerSession, + SessionIdleTimeout = TimeSpan.FromSeconds(30) }; await using var processor = client.CreateSessionProcessor(scope.QueueName, options); int messageCt = 0; @@ -672,7 +673,7 @@ public async Task MaxAutoLockRenewalDurationRespected( enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync(); @@ -767,7 +768,7 @@ public async Task StopProcessingDoesNotCancelAutoCompletion() enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage("sessionId")); await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions @@ -788,7 +789,7 @@ Task ProcessMessage(ProcessSessionMessageEventArgs args) await tcs.Task; await processor.StopProcessingAsync(); Assert.That( - async () => await GetNoRetryClient().AcceptNextSessionAsync(scope.QueueName), + async () => await CreateNoRetryClient().AcceptNextSessionAsync(scope.QueueName), Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.ServiceTimeout)); } } @@ -807,7 +808,7 @@ public async Task UserCallbackThrowingCausesMessageToBeAbandonedIfNotSettled(str enableSession: true, lockDuration: TimeSpan.FromSeconds(5))) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage("sessionId")); await using var processor = client.CreateSessionProcessor(scope.QueueName); @@ -862,7 +863,7 @@ Task ExceptionHandler(ProcessErrorEventArgs args) { try { - await GetNoRetryClient().AcceptNextSessionAsync(scope.QueueName); + await CreateNoRetryClient().AcceptNextSessionAsync(scope.QueueName); } catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceTimeout || @@ -896,7 +897,7 @@ public async Task ProcessMessagesFromMultipleNamedSessions( enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); ConcurrentDictionary sessions = new ConcurrentDictionary(); @@ -994,7 +995,7 @@ public async Task SessionLockLostWhenProcessSessionMessages(int numSessions, int enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); ConcurrentDictionary sessions = new ConcurrentDictionary(); @@ -1146,7 +1147,7 @@ public async Task UserErrorHandlerInvokedOnceIfSessionLockLost() enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); ConcurrentDictionary sessions = new ConcurrentDictionary(); @@ -1241,7 +1242,7 @@ public async Task ErrorSourceRespected(ServiceBusErrorSource errorSource) lockDuration: TimeSpan.FromSeconds(5))) { var delayDuration = TimeSpan.FromSeconds(10); - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); ConcurrentDictionary sessions = new ConcurrentDictionary(); @@ -1420,7 +1421,7 @@ public async Task SessionOpenEventDoesNotLoseLock() enableSession: true, lockDuration: lockDuration)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); ConcurrentDictionary sessions = new ConcurrentDictionary(); @@ -1489,7 +1490,7 @@ public async Task MaxCallsPerSessionRespected(int numSessions, int maxConcurrent enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); ConcurrentDictionary sessions = new ConcurrentDictionary(); @@ -1575,7 +1576,7 @@ public async Task StopProcessingDoesNotCloseLink() enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); var sender = client.CreateSender(scope.QueueName); await sender.SendMessageAsync(GetMessage("sessionId")); await using var processor = client.CreateSessionProcessor(scope.QueueName, new ServiceBusSessionProcessorOptions @@ -1600,7 +1601,7 @@ Task ProcessMessage(ProcessSessionMessageEventArgs args) Assert.IsFalse(processor.IsProcessing); Assert.That( - async () => await GetNoRetryClient().AcceptSessionAsync( + async () => await CreateNoRetryClient().AcceptSessionAsync( scope.QueueName, "sessionId"), Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)). diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs index 517a0ec566837..d6b12694a6302 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorTests.cs @@ -159,6 +159,7 @@ public void ProcessorOptionsSetOnClient() Assert.AreEqual(options.PrefetchCount, processor.PrefetchCount); Assert.AreEqual(options.ReceiveMode, processor.ReceiveMode); Assert.AreEqual(options.MaxAutoLockRenewalDuration, processor.MaxAutoLockRenewalDuration); + Assert.AreEqual(options.SessionIdleTimeout, processor.SessionIdleTimeout); Assert.AreEqual(fullyQualifiedNamespace, processor.FullyQualifiedNamespace); Assert.IsFalse(processor.IsClosed); Assert.IsFalse(processor.IsProcessing); @@ -181,15 +182,15 @@ public void ProcessorOptionsValidation() () => options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(-1), Throws.InstanceOf()); Assert.That( - () => options.MaxReceiveWaitTime = TimeSpan.FromSeconds(0), + () => options.SessionIdleTimeout = TimeSpan.FromSeconds(0), Throws.InstanceOf()); Assert.That( - () => options.MaxReceiveWaitTime = TimeSpan.FromSeconds(-1), + () => options.SessionIdleTimeout = TimeSpan.FromSeconds(-1), Throws.InstanceOf()); // should not throw options.PrefetchCount = 0; - options.MaxReceiveWaitTime = TimeSpan.FromSeconds(1); + options.SessionIdleTimeout = TimeSpan.FromSeconds(1); options.MaxAutoLockRenewalDuration = TimeSpan.FromSeconds(0); } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs index 07c1890105f67..ff882fd1d4108 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Receiver/SessionReceiverLiveTests.cs @@ -88,7 +88,7 @@ public async Task LockSameSessionShouldThrow() Assert.That( async () => - await GetNoRetryClient().AcceptSessionAsync( + await CreateNoRetryClient().AcceptSessionAsync( scope.QueueName, sessionId), Throws.InstanceOf().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.SessionCannotBeLocked)); @@ -685,7 +685,7 @@ public async Task ClientThrowsSessionCannotBeLockedWhenSessionLocked() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - await using var client = GetClient(); + await using var client = CreateClient(); ServiceBusSender sender = client.CreateSender(scope.QueueName); var sessionId = "sessionId"; var receiver = await client.AcceptSessionAsync(scope.QueueName, sessionId); diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample02_MessageSettlement.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample02_MessageSettlement.cs index 64a0de0d5e6f9..69e8fa9aab6a2 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample02_MessageSettlement.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample02_MessageSettlement.cs @@ -39,7 +39,7 @@ public async Task CompleteMessage() // complete the message, thereby deleting it from the service await receiver.CompleteMessageAsync(receivedMessage); #endregion - Assert.IsNull(await GetNoRetryClient().CreateReceiver(queueName).ReceiveMessageAsync()); + Assert.IsNull(await CreateNoRetryClient().CreateReceiver(queueName).ReceiveMessageAsync()); } } @@ -71,7 +71,7 @@ public async Task AbandonMessage() // abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers await receiver.AbandonMessageAsync(receivedMessage); #endregion - Assert.IsNotNull(GetNoRetryClient().CreateReceiver(queueName).ReceiveMessageAsync()); + Assert.IsNotNull(CreateNoRetryClient().CreateReceiver(queueName).ReceiveMessageAsync()); } } diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample04_Processor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample04_Processor.cs index 1219f46691fd6..11716f0db252e 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample04_Processor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample04_Processor.cs @@ -17,7 +17,7 @@ public async Task ProcessMessages() { string connectionString = TestEnvironment.ServiceBusConnectionString; string queueName = scope.QueueName; - await using var client = GetClient(); + await using var client = CreateClient(); #region Snippet:ServiceBusProcessMessages //@@ string connectionString = ""; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample05_SessionProcessor.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample05_SessionProcessor.cs index e16244a8cc7ed..ea2f7615cbe71 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample05_SessionProcessor.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample05_SessionProcessor.cs @@ -17,7 +17,7 @@ public async Task ProcessSessionMessages() { string connectionString = TestEnvironment.ServiceBusConnectionString; string queueName = scope.QueueName; - await using var client = GetClient(); + await using var client = CreateClient(); #region Snippet:ServiceBusProcessSessionMessages //@@ string connectionString = ""; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample06_Transactions.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample06_Transactions.cs index 3bc531d6c4b36..56c7305e2bff9 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample06_Transactions.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Samples/Sample06_Transactions.cs @@ -16,7 +16,7 @@ public async Task TransactionalSendAndComplete() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: false)) { - var client = GetClient(); + await using var client = CreateClient(); string queueName = scope.QueueName; #region Snippet:ServiceBusTransactionalSend //@@ string connectionString = ""; @@ -48,7 +48,7 @@ public async Task TransactionalSetSessionState() { await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true)) { - var client = GetClient(); + await using var client = CreateClient(); string queueName = scope.QueueName; #region Snippet:ServiceBusTransactionalSetSessionState //@@ string connectionString = ""; diff --git a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs index e6f17ef326a41..af5ef7b48b55a 100644 --- a/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs +++ b/sdk/servicebus/Azure.Messaging.ServiceBus/tests/Transactions/TransactionLiveTests.cs @@ -94,7 +94,7 @@ public async Task TransactionalSendMultipleSessionsRollback() } Assert.That( async () => - await GetNoRetryClient().AcceptNextSessionAsync(scope.QueueName), Throws.InstanceOf() + await CreateNoRetryClient().AcceptNextSessionAsync(scope.QueueName), Throws.InstanceOf() .And.Property(nameof(ServiceBusException.Reason)) .EqualTo(ServiceBusFailureReason.ServiceTimeout)); };