From 03f9a61d0dd9e710738f5ead666707144556b4fe Mon Sep 17 00:00:00 2001 From: Mauro Servienti Date: Thu, 16 Sep 2021 12:57:46 +0200 Subject: [PATCH] Use a rate limiter and in-memory cache to fix rate exceeded exception when using pub/sub hybrid mode (#998) * Adjust class names to respect conventions Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Guard against negative and zero Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Make sure that clean up cleans CLI tests artifacts Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Allow tests to use a fixed name infrastrucutre Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Configure tests for tests independency Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Set test execution timeout only if needed Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Use fixed name infrastructure for migration tests Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Extend test execution timeout for migration tests Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Do not rely on oder in message dispatcher tests Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Allow tests to append a sequence # to name prefix Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Define rate exceed failing test cases Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Allow custom subscription migration settings Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Introduce the concept of rate limiting Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Define custom rate limiters to throttled APIs Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Use rate limiter when fetching subscriptions Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Cache null topics lookups, use rate limiter if needed Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Extract hybrid mode check, cache subscriptions lookups Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Use user configured message visibility timeout Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Log a warning if using default visibility timeout and migration mode Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * Define required variables for tests needing fixed infrastructure Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-Authored-By: Dennis van der Stelt * typofixes and renamings Co-authored-by: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com> Co-authored-by: Dennis van der Stelt Co-authored-by: Tomek Masternak --- .github/workflows/ci.yml | 4 + README.md | 12 +- .../ConfigureEndpointSqsTransport.cs | 10 +- .../HybridModeRateLimit/TestCase.cs | 28 ++ ...ve_and_non_native_subscribers_in_a_loop.cs | 179 ++++++++++++ ...loop_in_the_context_of_incoming_message.cs | 214 ++++++++++++++ ...ve_and_non_native_subscribers_in_a_loop.cs | 242 +++++++++++++++ ...loop_in_the_context_of_incoming_message.cs | 276 ++++++++++++++++++ .../When_migrating_publisher_first.cs | 14 +- .../When_migrating_subscriber_first.cs | 14 +- .../SetupFixture.cs | 58 ++++ .../TestIndependenceMutator.cs | 22 ++ .../TestIndependenceSkipBehavior.cs | 29 ++ .../UseFixedNamePrefixAttribute.cs | 14 + ...Approvals.ApproveSqsTransport.approved.txt | 14 +- .../Cleanup.cs | 4 + .../MessageDispatcherTests.cs | 25 +- .../SubscriptionManagerTests.cs | 3 +- .../Configure/SettingsKeys.cs | 5 + .../SqsSubscriptionMigrationModeSettings.cs | 65 +++++ .../Configure/SqsTransportInfrastructure.cs | 7 + .../Configure/SqsTransportSettings.cs | 6 +- .../Extensions/SnsClientExtensions.cs | 33 ++- src/NServiceBus.Transport.SQS/Guard.cs | 8 + .../HybridPubSubChecker.cs | 108 +++++++ .../InputQueuePump.cs | 3 +- .../MessageDispatcher.cs | 21 +- .../RateLimiter/AwaitableConstraint.cs | 88 ++++++ .../RateLimiter/DisposableAction.cs | 23 ++ .../RateLimiter/RateLimiter.cs | 22 ++ .../RateLimiter/SizeConstrainedStack.cs | 27 ++ .../SnsListSubscriptionsByTopicRateLimiter.cs | 13 + .../RateLimiter/SnsListTopicsRateLimiter.cs | 13 + .../SubscriptionManager.cs | 6 +- src/NServiceBus.Transport.SQS/TopicCache.cs | 90 +++++- .../TransportConfiguration.cs | 49 ++++ 36 files changed, 1679 insertions(+), 70 deletions(-) create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/TestCase.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceMutator.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceSkipBehavior.cs create mode 100644 src/NServiceBus.Transport.SQS.AcceptanceTests/UseFixedNamePrefixAttribute.cs create mode 100644 src/NServiceBus.Transport.SQS/Configure/SqsSubscriptionMigrationModeSettings.cs create mode 100644 src/NServiceBus.Transport.SQS/HybridPubSubChecker.cs create mode 100644 src/NServiceBus.Transport.SQS/RateLimiter/AwaitableConstraint.cs create mode 100644 src/NServiceBus.Transport.SQS/RateLimiter/DisposableAction.cs create mode 100644 src/NServiceBus.Transport.SQS/RateLimiter/RateLimiter.cs create mode 100644 src/NServiceBus.Transport.SQS/RateLimiter/SizeConstrainedStack.cs create mode 100644 src/NServiceBus.Transport.SQS/RateLimiter/SnsListSubscriptionsByTopicRateLimiter.cs create mode 100644 src/NServiceBus.Transport.SQS/RateLimiter/SnsListTopicsRateLimiter.cs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 33c564460..f24541239 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,6 +72,10 @@ jobs: - name: Run Windows tests if: matrix.name == 'Windows' run: dotnet test src --configuration Release --no-build --logger "GitHubActions;report-warnings=false" + env: + NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix: CiWinATT - name: Run Linux tests if: matrix.name == 'Linux' run: dotnet test src --configuration Release --no-build --framework netcoreapp3.1 --logger "GitHubActions;report-warnings=false" + env: + NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix: CiNixATT diff --git a/README.md b/README.md index 55855845a..91efaf2fc 100644 --- a/README.md +++ b/README.md @@ -29,14 +29,22 @@ The transport can be configured using the following environment variables: The names of queues used by the acceptance tests take the following form: - AT- + AT- Where * `AT` stands for "Acceptance Test" - * `datetime` is a date and time as yyyyMMddHHmmss that uniquely identifies a single test run. For example, when 100 tests are executed in a single test run each queue will have the same datetime timestamp. + * `sanitized-guid` is a GUID converted to a base 64 string from which invalid characters are removed. For example, when 100 tests are executed in a single test run each queue will have the same `sanitized-guid`. * `pre-truncated-queue-name` is the name of the queue, "pre-truncated" (characters are removed from the beginning) so that the entire queue name is 80 characters or less. +_Note:_ + +Some tests generate high load and require a fixed queue naming scheme to prevent policy propagation in the cluster to affect the test execution. The fixed resources prefix can be customized by using the `NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix` environment variable. If not specified an exception will be thrown. For these tests the resources name schema is: + + -- + +Where `optional-test-case-sequence` is an optional integer value specified by tests using the NUnit test case feature to run the same test multiple times with different input values. + This scheme accomplishes the following goals: * Test runs are idempotent - each test run uses its own set of queues diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs index f62c41769..b40beb053 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs @@ -23,7 +23,15 @@ public Task Configure(string endpointName, EndpointConfiguration configuration, ApplyMappingsToSupportMultipleInheritance(endpointName, transportConfig); - settings.TestExecutionTimeout = TimeSpan.FromSeconds(120); + configuration.RegisterComponents(c => c.ConfigureComponent(DependencyLifecycle.SingleInstance)); + configuration.Pipeline.Register("TestIndependenceBehavior", typeof(TestIndependenceSkipBehavior), "Skips messages not created during the current test."); + + if (settings.TestExecutionTimeout == null) + { + //If it's not null it means it has been set to a custom + //value in the test. We don't want to overwrite that + settings.TestExecutionTimeout = TimeSpan.FromSeconds(120); + } return Task.FromResult(0); } diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/TestCase.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/TestCase.cs new file mode 100644 index 000000000..34522b864 --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/TestCase.cs @@ -0,0 +1,28 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit +{ + using System; + + public class TestCase + { + public TestCase(int sequence) => Sequence = sequence; + + public int NumberOfEvents { get; internal set; } + public TimeSpan? TestExecutionTimeout { get; internal set; } + public int MessageVisibilityTimeout { get; internal set; } = DefaultMessageVisibilityTimeout; + public TimeSpan SubscriptionsCacheTTL { get; internal set; } = DefaultSubscriptionCacheTTL; + public TimeSpan NotFoundTopicsCacheTTL { get; internal set; } = DefaultTopicCacheTTL; + public int Sequence { get; } + + public override string ToString() => $"#{Sequence}, " + + $"{nameof(NumberOfEvents)}: {NumberOfEvents}, " + + $"{nameof(MessageVisibilityTimeout)}: {(MessageVisibilityTimeout == DefaultMessageVisibilityTimeout ? "default" : MessageVisibilityTimeout.ToString())}, " + + $"{nameof(TestExecutionTimeout)}: {TestExecutionTimeout?.ToString() ?? "default"}, " + + $"{nameof(SubscriptionsCacheTTL)}: {(SubscriptionsCacheTTL == DefaultSubscriptionCacheTTL ? "default" : SubscriptionsCacheTTL.ToString())}, " + + $"{nameof(NotFoundTopicsCacheTTL)}: {(NotFoundTopicsCacheTTL == DefaultTopicCacheTTL ? "default" : NotFoundTopicsCacheTTL.ToString())}"; + + static TimeSpan DefaultSubscriptionCacheTTL = TimeSpan.FromSeconds(5); + static TimeSpan DefaultTopicCacheTTL = TimeSpan.FromSeconds(5); + static int DefaultMessageVisibilityTimeout = 30; + + } +} diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs new file mode 100644 index 000000000..dc62b2851 --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs @@ -0,0 +1,179 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.Configuration.AdvancedExtensibility; + using NServiceBus.Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using Conventions = AcceptanceTesting.Customization.Conventions; + + public class When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop : NServiceBusAcceptanceTest + { + static TestCase[] TestCases = + { + new TestCase(1){ NumberOfEvents = 1 }, + new TestCase(2){ NumberOfEvents = 100 }, + new TestCase(3){ NumberOfEvents = 300, SubscriptionsCacheTTL = TimeSpan.FromMinutes(1) }, + new TestCase(4){ NumberOfEvents = 1000, TestExecutionTimeout = TimeSpan.FromMinutes(4), SubscriptionsCacheTTL = TimeSpan.FromMinutes(1), NotFoundTopicsCacheTTL = TimeSpan.FromMinutes(1) }, + }; + + [Test, UseFixedNamePrefix, TestCaseSource(nameof(TestCases))] + public async Task Should_not_rate_exceed(TestCase testCase) + { + SetupFixture.AppendSequenceToNamePrefix(testCase.Sequence); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(config => + { +#pragma warning disable CS0618 + var migrationMode = config.ConfigureSqsTransport().EnableMessageDrivenPubSubCompatibilityMode(); + migrationMode.SubscriptionsCacheTTL(testCase.SubscriptionsCacheTTL); + migrationMode.TopicCacheTTL(testCase.NotFoundTopicsCacheTTL); +#pragma warning restore CS0618 + }); + + b.When(c => c.SubscribedMessageDriven && c.SubscribedNative, (session, ctx) => + { + var sw = Stopwatch.StartNew(); + var tasks = new List(); + for (int i = 0; i < testCase.NumberOfEvents; i++) + { + tasks.Add(session.Publish(new MyEvent())); + } + _ = Task.WhenAll(tasks).ContinueWith(t => + { + sw.Stop(); + ctx.PublishTime = sw.Elapsed; + }); + return Task.FromResult(0); + }); + }) + .WithEndpoint(b => + { + b.When((_, ctx) => + { + ctx.SubscribedNative = true; + return Task.FromResult(0); + }); + }) + .WithEndpoint(b => + { + b.When((session, ctx) => session.Subscribe()); + }) + .Done(c => c.NativePubSubSubscriberReceivedEventsCount == testCase.NumberOfEvents + && c.MessageDrivenPubSubSubscriberReceivedEventsCount == testCase.NumberOfEvents) + .Run(testCase.TestExecutionTimeout); + + Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedEventsCount); + Assert.AreEqual(testCase.NumberOfEvents, context.NativePubSubSubscriberReceivedEventsCount); + } + + public class Context : ScenarioContext + { + int nativePubSubSubscriberReceivedEventsCount; + public int NativePubSubSubscriberReceivedEventsCount => nativePubSubSubscriberReceivedEventsCount; + public void IncrementNativePubSubSubscriberReceivedEventsCount() + { + Interlocked.Increment(ref nativePubSubSubscriberReceivedEventsCount); + } + int messageDrivenPubSubSubscriberReceivedEventsCount; + public int MessageDrivenPubSubSubscriberReceivedEventsCount => messageDrivenPubSubSubscriberReceivedEventsCount; + public void IncrementMessageDrivenPubSubSubscriberReceivedEventsCount() + { + Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedEventsCount); + } + public bool SubscribedMessageDriven { get; set; } + public bool SubscribedNative { get; set; } + public TimeSpan PublishTime { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(c => + { + var subscriptionStorage = new TestingInMemorySubscriptionStorage(); + c.UsePersistence().UseStorage(subscriptionStorage); + + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(MessageDrivenPubSubSubscriber)))) + { + context.SubscribedMessageDriven = true; + } + }); + }).IncludeType(); + } + } + + public class NativePubSubSubscriber : EndpointConfigurationBuilder + { + public NativePubSubSubscriber() + { + EndpointSetup(c => { }); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementNativePubSubSubscriberReceivedEventsCount(); + return Task.FromResult(0); + } + } + } + + public class MessageDrivenPubSubSubscriber : EndpointConfigurationBuilder + { + public MessageDrivenPubSubSubscriber() + { + EndpointSetup(c => + { + c.DisableFeature(); + c.GetSettings().Set("NServiceBus.AmazonSQS.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher)))) + }); + }, + metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementMessageDrivenPubSubSubscriberReceivedEventsCount(); + return Task.FromResult(0); + } + } + } + + public class MyEvent : IEvent + { + } + } +} diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs new file mode 100644 index 000000000..f490f28da --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs @@ -0,0 +1,214 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit +{ + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.Configuration.AdvancedExtensibility; + using NServiceBus.Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; + using Conventions = AcceptanceTesting.Customization.Conventions; + + public class When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message : NServiceBusAcceptanceTest + { + static TestCase[] TestCases = + { + new TestCase(1){ NumberOfEvents = 1 }, + new TestCase(2){ NumberOfEvents = 100 }, + new TestCase(3){ NumberOfEvents = 200 }, + new TestCase(4){ NumberOfEvents = 300 }, + new TestCase(5) + { + NumberOfEvents = 1000, + MessageVisibilityTimeout = 180, + TestExecutionTimeout = TimeSpan.FromMinutes(3), + SubscriptionsCacheTTL = TimeSpan.FromMinutes(1), + NotFoundTopicsCacheTTL = TimeSpan.FromMinutes(1), + }, + new TestCase(6) + { + NumberOfEvents = 3000, + MessageVisibilityTimeout = 300, + TestExecutionTimeout = TimeSpan.FromMinutes(7), + SubscriptionsCacheTTL = TimeSpan.FromMinutes(2), + NotFoundTopicsCacheTTL = TimeSpan.FromMinutes(2), + }, + }; + + [Test, UseFixedNamePrefix, TestCaseSource(nameof(TestCases))] + public async Task Should_not_rate_exceed(TestCase testCase) + { + SetupFixture.AppendSequenceToNamePrefix(testCase.Sequence); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(config => + { +#pragma warning disable CS0618 + var migrationMode = config.ConfigureSqsTransport().EnableMessageDrivenPubSubCompatibilityMode(); + migrationMode.SubscriptionsCacheTTL(testCase.SubscriptionsCacheTTL); + migrationMode.TopicCacheTTL(testCase.NotFoundTopicsCacheTTL); + migrationMode.MessageVisibilityTimeout(testCase.MessageVisibilityTimeout); +#pragma warning restore CS0618 + }); + + b.When(c => c.SubscribedMessageDriven && c.SubscribedNative, session => + { + return session.SendLocal(new KickOff { NumberOfEvents = testCase.NumberOfEvents }); + }); + }) + .WithEndpoint(b => + { + b.When((_, ctx) => + { + ctx.SubscribedNative = true; + return Task.FromResult(0); + }); + }) + .WithEndpoint(b => + { + b.When((session, ctx) => session.Subscribe()); + }) + .Done(c => c.NativePubSubSubscriberReceivedEventsCount == testCase.NumberOfEvents + && c.MessageDrivenPubSubSubscriberReceivedEventsCount == testCase.NumberOfEvents) + .Run(testCase.TestExecutionTimeout); + + Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedEventsCount); + Assert.AreEqual(testCase.NumberOfEvents, context.NativePubSubSubscriberReceivedEventsCount); + } + + public class Context : ScenarioContext + { + int nativePubSubSubscriberReceivedEventsCount; + public int NativePubSubSubscriberReceivedEventsCount => nativePubSubSubscriberReceivedEventsCount; + public void IncrementNativePubSubSubscriberReceivedEventsCount() + { + Interlocked.Increment(ref nativePubSubSubscriberReceivedEventsCount); + } + + int messageDrivenPubSubSubscriberReceivedEventsCount; + public int MessageDrivenPubSubSubscriberReceivedEventsCount => messageDrivenPubSubSubscriberReceivedEventsCount; + public void IncrementMessageDrivenPubSubSubscriberReceivedEventsCount() + { + Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedEventsCount); + } + public bool SubscribedMessageDriven { get; set; } + public bool SubscribedNative { get; set; } + public TimeSpan PublishTime { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(c => + { + var subscriptionStorage = new TestingInMemorySubscriptionStorage(); + c.UsePersistence().UseStorage(subscriptionStorage); + + c.OnEndpointSubscribed((s, context) => + { + if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(MessageDrivenPubSubSubscriber)))) + { + context.SubscribedMessageDriven = true; + } + }); + }).IncludeType(); + } + + public class KickOffMessageHandler : IHandleMessages + { + readonly Context testContext; + + public KickOffMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public async Task Handle(KickOff message, IMessageHandlerContext context) + { + var sw = Stopwatch.StartNew(); + var tasks = new List(); + for (int i = 0; i < message.NumberOfEvents; i++) + { + tasks.Add(context.Publish(new MyEvent())); + } + await Task.WhenAll(tasks); + sw.Stop(); + testContext.PublishTime = sw.Elapsed; + } + } + } + + public class NativePubSubSubscriber : EndpointConfigurationBuilder + { + public NativePubSubSubscriber() + { + EndpointSetup(c => { }); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementNativePubSubSubscriberReceivedEventsCount(); + return Task.FromResult(0); + } + } + } + + public class MessageDrivenPubSubSubscriber : EndpointConfigurationBuilder + { + public MessageDrivenPubSubSubscriber() + { + EndpointSetup(c => + { + c.DisableFeature(); + c.GetSettings().Set("NServiceBus.AmazonSQS.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher)))) + }); + }, + metadata => metadata.RegisterPublisherFor(typeof(Publisher))); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementMessageDrivenPubSubSubscriberReceivedEventsCount(); + return Task.FromResult(0); + } + } + } + + public class KickOff : ICommand + { + public int NumberOfEvents { get; set; } + } + + public class MyEvent : IEvent + { + } + } +} diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs new file mode 100644 index 000000000..3f3299114 --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs @@ -0,0 +1,242 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit +{ + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.Configuration.AdvancedExtensibility; + using NServiceBus.Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; + using Conventions = AcceptanceTesting.Customization.Conventions; + + public class When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop : NServiceBusAcceptanceTest + { + static TestCase[] TestCases = + { + new TestCase(1) + { + NumberOfEvents = 1 + }, + new TestCase(2) + { + NumberOfEvents = 100, + SubscriptionsCacheTTL = TimeSpan.FromMinutes(1), + NotFoundTopicsCacheTTL = TimeSpan.FromMinutes(1), + }, + new TestCase(3) + { + NumberOfEvents = 200, + SubscriptionsCacheTTL = TimeSpan.FromMinutes(3), + NotFoundTopicsCacheTTL = TimeSpan.FromMinutes(3), + } + }; + + [Test, UseFixedNamePrefix, TestCaseSource(nameof(TestCases))] + public async Task Should_not_rate_exceed(TestCase testCase) + { + SetupFixture.AppendSequenceToNamePrefix(testCase.Sequence); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.CustomConfig(config => + { +#pragma warning disable CS0618 + var migrationMode = config.ConfigureSqsTransport().EnableMessageDrivenPubSubCompatibilityMode(); + migrationMode.SubscriptionsCacheTTL(testCase.SubscriptionsCacheTTL); + migrationMode.TopicCacheTTL(testCase.NotFoundTopicsCacheTTL); +#pragma warning restore CS0618 + }); + + b.When(c => c.SubscribedMessageDrivenToMyEvent && c.SubscribedMessageDrivenToMySecondEvent && c.SubscribedNative, (session, ctx) => + { + var sw = Stopwatch.StartNew(); + var tasks = new List(); + for (int i = 0; i < testCase.NumberOfEvents; i++) + { + tasks.Add(session.Publish(new MyEvent())); + tasks.Add(session.Publish(new MySecondEvent())); + } + _ = Task.WhenAll(tasks).ContinueWith(t => + { + sw.Stop(); + ctx.PublishTime = sw.Elapsed; + }); + return Task.FromResult(0); + }); + }) + .WithEndpoint(b => + { + b.When((_, ctx) => + { + ctx.SubscribedNative = true; + return Task.FromResult(0); + }); + }) + .WithEndpoint(b => + { + b.When(async (session, ctx) => + { + await session.Subscribe(); + await session.Subscribe(); + }); + }) + .Done(c => c.NativePubSubSubscriberReceivedMyEventCount == testCase.NumberOfEvents + && c.MessageDrivenPubSubSubscriberReceivedMyEventCount == testCase.NumberOfEvents + && c.MessageDrivenPubSubSubscriberReceivedMySecondEventCount == testCase.NumberOfEvents) + .Run(testCase.TestExecutionTimeout); + + Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedMyEventCount); + Assert.AreEqual(testCase.NumberOfEvents, context.NativePubSubSubscriberReceivedMyEventCount); + Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedMySecondEventCount); + } + + public class Context : ScenarioContext + { + int nativePubSubSubscriberReceivedMyEventCount; + internal void IncrementNativePubSubSubscriberReceivedMyEventCount() + { + Interlocked.Increment(ref nativePubSubSubscriberReceivedMyEventCount); + } + public int NativePubSubSubscriberReceivedMyEventCount => nativePubSubSubscriberReceivedMyEventCount; + + int messageDrivenPubSubSubscriberReceivedMyEventCount; + internal void IncrementMessageDrivenPubSubSubscriberReceivedMyEventCount() + { + Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedMyEventCount); + } + public int MessageDrivenPubSubSubscriberReceivedMyEventCount => messageDrivenPubSubSubscriberReceivedMyEventCount; + + int messageDrivenPubSubSubscriberReceivedMySecondEventCount; + internal void IncrementMessageDrivenPubSubSubscriberReceivedMySecondEventCount() + { + Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedMySecondEventCount); + } + public int MessageDrivenPubSubSubscriberReceivedMySecondEventCount => messageDrivenPubSubSubscriberReceivedMySecondEventCount; + public bool SubscribedMessageDrivenToMyEvent { get; set; } + public bool SubscribedMessageDrivenToMySecondEvent { get; set; } + public bool SubscribedNative { get; set; } + public TimeSpan PublishTime { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(c => + { + var subscriptionStorage = new TestingInMemorySubscriptionStorage(); + c.UsePersistence().UseStorage(subscriptionStorage); + + c.OnEndpointSubscribed((s, context) => + { + if (!s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(MessageDrivenPubSubSubscriber)))) + { + return; + } + + if (Type.GetType(s.MessageType) == typeof(MyEvent)) + { + context.SubscribedMessageDrivenToMyEvent = true; + } + + if (Type.GetType(s.MessageType) == typeof(MySecondEvent)) + { + context.SubscribedMessageDrivenToMySecondEvent = true; + } + }); + }).IncludeType(); + } + } + + public class NativePubSubSubscriber : EndpointConfigurationBuilder + { + public NativePubSubSubscriber() + { + EndpointSetup(c => { }); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementNativePubSubSubscriberReceivedMyEventCount(); + return Task.FromResult(0); + } + } + } + + public class MessageDrivenPubSubSubscriber : EndpointConfigurationBuilder + { + public MessageDrivenPubSubSubscriber() + { + EndpointSetup(c => + { + c.DisableFeature(); + c.GetSettings().Set("NServiceBus.AmazonSQS.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher)))), + new PublisherTableEntry(typeof(MySecondEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher)))) + }); + }, + metadata => + { + metadata.RegisterPublisherFor(typeof(Publisher)); + metadata.RegisterPublisherFor(typeof(Publisher)); + }); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementMessageDrivenPubSubSubscriberReceivedMyEventCount(); + return Task.FromResult(0); + } + } + + public class MySecondEventMessageHandler : IHandleMessages + { + Context testContext; + + public MySecondEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MySecondEvent @event, IMessageHandlerContext context) + { + testContext.IncrementMessageDrivenPubSubSubscriberReceivedMySecondEventCount(); + return Task.FromResult(0); + } + } + } + + public class MyEvent : IEvent + { + } + + public class MySecondEvent : IEvent + { + } + } +} diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs new file mode 100644 index 000000000..fce9da552 --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs @@ -0,0 +1,276 @@ +namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit +{ + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.Configuration.AdvancedExtensibility; + using NServiceBus.Features; + using NServiceBus.Routing.MessageDrivenSubscriptions; + using NUnit.Framework; + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; + using Conventions = AcceptanceTesting.Customization.Conventions; + + public class When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message : NServiceBusAcceptanceTest + { + static TestCase[] TestCases = + { + new TestCase(1){ NumberOfEvents = 1 }, + new TestCase(2){ NumberOfEvents = 100, MessageVisibilityTimeout = 60, }, + new TestCase(3) + { + NumberOfEvents = 200, + MessageVisibilityTimeout = 120, + SubscriptionsCacheTTL = TimeSpan.FromMinutes(2), + NotFoundTopicsCacheTTL = TimeSpan.FromSeconds(120) + }, + new TestCase(4) + { + NumberOfEvents = 300, + MessageVisibilityTimeout = 180, + TestExecutionTimeout = TimeSpan.FromMinutes(3), + SubscriptionsCacheTTL = TimeSpan.FromMinutes(2), + NotFoundTopicsCacheTTL = TimeSpan.FromSeconds(120) + }, + new TestCase(5) + { + NumberOfEvents = 1000, + MessageVisibilityTimeout = 360, + SubscriptionsCacheTTL = TimeSpan.FromSeconds(120), + TestExecutionTimeout = TimeSpan.FromMinutes(8), + NotFoundTopicsCacheTTL = TimeSpan.FromSeconds(120) + }, + }; + + [Test, UseFixedNamePrefix, TestCaseSource(nameof(TestCases))] + public async Task Should_not_rate_exceed(TestCase testCase) + { + SetupFixture.AppendSequenceToNamePrefix(testCase.Sequence); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.When(async (session, ctx) => + { + TestContext.WriteLine("Sending subscriptions"); + await Task.WhenAll( + session.Subscribe(), + session.Subscribe() + ); + TestContext.WriteLine("Subscriptions sent"); + }); + }) + .WithEndpoint(b => + { + b.When((_, ctx) => + { + ctx.SubscribedNative = true; + return Task.FromResult(0); + }); + }) + .WithEndpoint(b => + { + b.CustomConfig(config => + { +#pragma warning disable CS0618 + var migrationMode = config.ConfigureSqsTransport().EnableMessageDrivenPubSubCompatibilityMode(); + migrationMode.SubscriptionsCacheTTL(testCase.SubscriptionsCacheTTL); + migrationMode.TopicCacheTTL(testCase.NotFoundTopicsCacheTTL); + migrationMode.MessageVisibilityTimeout(testCase.MessageVisibilityTimeout); +#pragma warning restore CS0618 + }); + + b.When(c => c.SubscribedMessageDrivenToMyEvent && c.SubscribedMessageDrivenToMySecondEvent && c.SubscribedNative, session => + { + return session.SendLocal(new KickOff { NumberOfEvents = testCase.NumberOfEvents }); + }); + }) + .Done(c => c.NativePubSubSubscriberReceivedMyEventCount == testCase.NumberOfEvents + && c.MessageDrivenPubSubSubscriberReceivedMyEventCount == testCase.NumberOfEvents + && c.MessageDrivenPubSubSubscriberReceivedMySecondEventCount == testCase.NumberOfEvents) + .Run(testCase.TestExecutionTimeout); + + Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedMyEventCount); + Assert.AreEqual(testCase.NumberOfEvents, context.NativePubSubSubscriberReceivedMyEventCount); + Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedMySecondEventCount); + } + + public class Context : ScenarioContext + { + int nativePubSubSubscriberReceivedMyEventCount; + internal void IncrementNativePubSubSubscriberReceivedMyEventCount() + { + Interlocked.Increment(ref nativePubSubSubscriberReceivedMyEventCount); + } + public int NativePubSubSubscriberReceivedMyEventCount => nativePubSubSubscriberReceivedMyEventCount; + + int messageDrivenPubSubSubscriberReceivedMyEventCount; + internal void IncrementMessageDrivenPubSubSubscriberReceivedMyEventCount() + { + Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedMyEventCount); + } + public int MessageDrivenPubSubSubscriberReceivedMyEventCount => messageDrivenPubSubSubscriberReceivedMyEventCount; + + int messageDrivenPubSubSubscriberReceivedMySecondEventCount; + internal void IncrementMessageDrivenPubSubSubscriberReceivedMySecondEventCount() + { + Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedMySecondEventCount); + } + public int MessageDrivenPubSubSubscriberReceivedMySecondEventCount => messageDrivenPubSubSubscriberReceivedMySecondEventCount; + + public bool SubscribedMessageDrivenToMyEvent { get; set; } + public bool SubscribedMessageDrivenToMySecondEvent { get; set; } + public bool SubscribedNative { get; set; } + public TimeSpan PublishTime { get; set; } + } + + public class Publisher : EndpointConfigurationBuilder + { + public Publisher() + { + EndpointSetup(c => + { + var subscriptionStorage = new TestingInMemorySubscriptionStorage(); + c.UsePersistence().UseStorage(subscriptionStorage); + + c.OnEndpointSubscribed((s, context) => + { + TestContext.WriteLine($"Received subscription message {s.MessageType} from {s.SubscriberEndpoint}."); + if (!s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(MessageDrivenPubSubSubscriber)))) + { + return; + } + + if (Type.GetType(s.MessageType) == typeof(MyEvent)) + { + context.SubscribedMessageDrivenToMyEvent = true; + } + + if (Type.GetType(s.MessageType) == typeof(MySecondEvent)) + { + context.SubscribedMessageDrivenToMySecondEvent = true; + } + TestContext.WriteLine($"Subscription message processed."); + }); + }).IncludeType(); + } + + public class KickOffMessageHandler : IHandleMessages + { + readonly Context testContext; + + public KickOffMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public async Task Handle(KickOff message, IMessageHandlerContext context) + { + var sw = Stopwatch.StartNew(); + var tasks = new List(); + for (int i = 0; i < message.NumberOfEvents; i++) + { + tasks.Add(context.Publish(new MyEvent())); + tasks.Add(context.Publish(new MySecondEvent())); + } + await Task.WhenAll(tasks); + sw.Stop(); + testContext.PublishTime = sw.Elapsed; + } + } + } + + public class NativePubSubSubscriber : EndpointConfigurationBuilder + { + public NativePubSubSubscriber() + { + EndpointSetup(c => { }); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementNativePubSubSubscriberReceivedMyEventCount(); + return Task.FromResult(0); + } + } + } + + public class MessageDrivenPubSubSubscriber : EndpointConfigurationBuilder + { + public MessageDrivenPubSubSubscriber() + { + EndpointSetup(c => + { + c.DisableFeature(); + c.GetSettings().Set("NServiceBus.AmazonSQS.DisableNativePubSub", true); + c.GetSettings().GetOrCreate().AddOrReplacePublishers("LegacyConfig", new List + { + new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher)))), + new PublisherTableEntry(typeof(MySecondEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher)))) + }); + }, + metadata => + { + metadata.RegisterPublisherFor(typeof(Publisher)); + metadata.RegisterPublisherFor(typeof(Publisher)); + }); + } + + public class MyEventMessageHandler : IHandleMessages + { + Context testContext; + + public MyEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MyEvent @event, IMessageHandlerContext context) + { + testContext.IncrementMessageDrivenPubSubSubscriberReceivedMyEventCount(); + return Task.FromResult(0); + } + } + + public class MySecondEventMessageHandler : IHandleMessages + { + Context testContext; + + public MySecondEventMessageHandler(Context testContext) + { + this.testContext = testContext; + } + + public Task Handle(MySecondEvent @event, IMessageHandlerContext context) + { + testContext.IncrementMessageDrivenPubSubSubscriberReceivedMySecondEventCount(); + return Task.FromResult(0); + } + } + } + + public class KickOff : ICommand + { + public int NumberOfEvents { get; set; } + } + + public class MyEvent : IEvent + { + } + + public class MySecondEvent : IEvent + { + } + } +} diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs index 65898a78b..2de213818 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_publisher_first.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AcceptanceTests.NativePubSub +namespace NServiceBus.AcceptanceTests.NativePubSub { using System; using System.Collections.Generic; @@ -16,7 +16,7 @@ public class When_migrating_publisher_first : NServiceBusAcceptanceTest { static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(Publisher)); - [Test] + [Test, UseFixedNamePrefix] public async Task Should_not_lose_any_events() { var subscriptionStorage = new TestingInMemorySubscriptionStorage(); @@ -49,7 +49,7 @@ public async Task Should_not_lose_any_events() }); }) .Done(c => c.GotTheEvent) - .Run(TimeSpan.FromSeconds(30)); + .Run(TimeSpan.FromSeconds(60)); Assert.True(beforeMigration.GotTheEvent); @@ -83,14 +83,14 @@ public async Task Should_not_lose_any_events() }); }) .Done(c => c.GotTheEvent) - .Run(TimeSpan.FromSeconds(30)); + .Run(TimeSpan.FromSeconds(60)); Assert.True(publisherMigrated.GotTheEvent); //Subscriber migrated and in compatibility mode var subscriberMigratedRunSettings = new RunSettings { - TestExecutionTimeout = TimeSpan.FromSeconds(30) + TestExecutionTimeout = TimeSpan.FromSeconds(60) }; var subscriberMigrated = await Scenario.Define() .WithEndpoint(b => @@ -135,7 +135,7 @@ public async Task Should_not_lose_any_events() }) .WithEndpoint() .Done(c => c.GotTheEvent) - .Run(TimeSpan.FromSeconds(30)); + .Run(TimeSpan.FromSeconds(60)); Assert.True(compatModeDisabled.GotTheEvent); } @@ -175,7 +175,7 @@ public Subscriber() metadata => metadata.RegisterPublisherFor(typeof(Publisher))); } - public class MyHandler : IHandleMessages + public class MyEventMessageHandler : IHandleMessages { public Context Context { get; set; } diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs index e6957729a..c06ddd129 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/When_migrating_subscriber_first.cs @@ -1,4 +1,4 @@ -namespace NServiceBus.AcceptanceTests.NativePubSub +namespace NServiceBus.AcceptanceTests.NativePubSub { using System; using System.Collections.Generic; @@ -16,7 +16,7 @@ public class When_migrating_subscriber_first : NServiceBusAcceptanceTest { static string PublisherEndpoint => Conventions.EndpointNamingConvention(typeof(Publisher)); - [Test] + [Test, UseFixedNamePrefix] public async Task Should_not_lose_any_events() { var subscriptionStorage = new TestingInMemorySubscriptionStorage(); @@ -49,14 +49,14 @@ public async Task Should_not_lose_any_events() }); }) .Done(c => c.GotTheEvent) - .Run(TimeSpan.FromSeconds(30)); + .Run(TimeSpan.FromSeconds(60)); Assert.True(beforeMigration.GotTheEvent); //Subscriber migrated and in compatibility mode. var subscriberMigratedRunSettings = new RunSettings { - TestExecutionTimeout = TimeSpan.FromSeconds(30) + TestExecutionTimeout = TimeSpan.FromSeconds(60) }; var subscriberMigrated = await Scenario.Define() .WithEndpoint(b => @@ -93,7 +93,7 @@ public async Task Should_not_lose_any_events() //Publisher migrated and in compatibility mode var publisherMigratedRunSettings = new RunSettings { - TestExecutionTimeout = TimeSpan.FromSeconds(30) + TestExecutionTimeout = TimeSpan.FromSeconds(60) }; var publisherMigrated = await Scenario.Define() .WithEndpoint(b => @@ -137,7 +137,7 @@ public async Task Should_not_lose_any_events() }) .WithEndpoint() .Done(c => c.GotTheEvent) - .Run(TimeSpan.FromSeconds(30)); + .Run(TimeSpan.FromSeconds(60)); Assert.True(compatModeDisabled.GotTheEvent); } @@ -177,7 +177,7 @@ public Subscriber() metadata => metadata.RegisterPublisherFor(typeof(Publisher))); } - public class MyHandler : IHandleMessages + public class MyEventMessageHandler : IHandleMessages { public Context Context { get; set; } diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/SetupFixture.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/SetupFixture.cs index b9d5f8153..ed4589167 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/SetupFixture.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/SetupFixture.cs @@ -18,6 +18,56 @@ public class SetupFixture /// public static string NamePrefix { get; private set; } + static bool usingFixedNamePrefix; + static string namePrefixBackup; + + static string GetFixedNamePrefix() + { + var fixedNamePrefixKeyName = "NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix"; + var customFixedNamePrefix = EnvironmentHelper.GetEnvironmentVariable(fixedNamePrefixKeyName); + + if (customFixedNamePrefix == null) + { + throw new Exception($"Environment variable '{fixedNamePrefixKeyName}' not set. " + + $"The variable is required by tests bound to a fixed infrastructure. " + + $"Make sure the value doesn't contain any space or dash characher."); + } + + return customFixedNamePrefix; + } + + public static void UseFixedNamePrefix() + { + usingFixedNamePrefix = true; + + namePrefixBackup = NamePrefix; + NamePrefix = GetFixedNamePrefix(); + + TestContext.WriteLine($"Using fixed name prefix: '{NamePrefix}'"); + } + + public static void RestoreNamePrefixToRandomlyGenerated() + { + if (usingFixedNamePrefix) + { + TestContext.WriteLine($"Restoring name prefix from '{NamePrefix}' to '{namePrefixBackup}'"); + NamePrefix = namePrefixBackup; + usingFixedNamePrefix = false; + } + } + + public static void AppendSequenceToNamePrefix(int sequence) + { + var idx = NamePrefix.LastIndexOf('-'); + if (idx >= 0) + { + NamePrefix = NamePrefix.Substring(0, idx); + } + NamePrefix += $"-{sequence}"; + + TestContext.WriteLine($"Sequence #{sequence} appended name prefix: '{NamePrefix}'"); + } + [OneTimeSetUp] public void OneTimeSetUp() { @@ -27,6 +77,7 @@ public void OneTimeSetUp() // us from deleting then creating a queue with the // same name in a 60 second period. NamePrefix = $"AT{Regex.Replace(Convert.ToBase64String(Guid.NewGuid().ToByteArray()), "[/+=]", "").ToUpperInvariant()}"; + TestContext.WriteLine($"Generated name prefix: '{NamePrefix}'"); } [OneTimeTearDown] @@ -42,6 +93,13 @@ public async Task OneTimeTearDown() using (var s3Client = string.IsNullOrEmpty(accessKeyId) ? SqsTransportExtensions.CreateS3Client() : new AmazonS3Client(accessKeyId, secretAccessKey)) { + var idx = NamePrefix.LastIndexOf('-'); + if (idx >= 0) + { + //remove the sequence number before cleaning up + NamePrefix = NamePrefix.Substring(0, idx); + } + await Cleanup.DeleteAllResourcesWithPrefix(sqsClient, snsClient, s3Client, NamePrefix).ConfigureAwait(false); } } diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceMutator.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceMutator.cs new file mode 100644 index 000000000..25ff1da74 --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceMutator.cs @@ -0,0 +1,22 @@ +namespace NServiceBus.AcceptanceTests +{ + using System.Threading.Tasks; + using NServiceBus.AcceptanceTesting; + using NServiceBus.MessageMutator; + + class TestIndependenceMutator : IMutateOutgoingTransportMessages + { + readonly string testRunId; + + public TestIndependenceMutator(ScenarioContext scenarioContext) + { + testRunId = scenarioContext.TestRunId.ToString(); + } + + public Task MutateOutgoing(MutateOutgoingTransportMessageContext context) + { + context.OutgoingHeaders["$AcceptanceTesting.TestRunId"] = testRunId; + return Task.FromResult(0); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceSkipBehavior.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceSkipBehavior.cs new file mode 100644 index 000000000..72d319cca --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/TestIndependenceSkipBehavior.cs @@ -0,0 +1,29 @@ +namespace NServiceBus.AcceptanceTests +{ + using System; + using System.Threading.Tasks; + using NServiceBus.AcceptanceTesting; + using NServiceBus.Pipeline; + using NUnit.Framework; + + class TestIndependenceSkipBehavior : IBehavior + { + readonly string testRunId; + + public TestIndependenceSkipBehavior(ScenarioContext scenarioContext) + { + testRunId = scenarioContext.TestRunId.ToString(); + } + + public Task Invoke(ITransportReceiveContext context, Func next) + { + if (context.Message.Headers.TryGetValue("$AcceptanceTesting.TestRunId", out var runId) && runId != testRunId) + { + TestContext.WriteLine($"Skipping message {context.Message.MessageId} from previous test run"); + return Task.FromResult(0); + } + + return next(context); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/UseFixedNamePrefixAttribute.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/UseFixedNamePrefixAttribute.cs new file mode 100644 index 000000000..8a4d6637b --- /dev/null +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/UseFixedNamePrefixAttribute.cs @@ -0,0 +1,14 @@ +namespace NServiceBus.AcceptanceTests +{ + using NUnit.Framework; + using NUnit.Framework.Interfaces; + using System; + + public class UseFixedNamePrefixAttribute : Attribute, ITestAction + { + public ActionTargets Targets => ActionTargets.Test; + + public void AfterTest(ITest test) => SetupFixture.RestoreNamePrefixToRandomlyGenerated(); + public void BeforeTest(ITest test) => SetupFixture.UseFixedNamePrefix(); + } +} diff --git a/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt b/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt index a92b284c8..81739ac55 100644 --- a/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt +++ b/src/NServiceBus.Transport.SQS.Tests/ApprovalFiles/APIApprovals.ApproveSqsTransport.approved.txt @@ -39,7 +39,7 @@ namespace NServiceBus [System.ObsoleteAttribute("The compatibility mode will be deprecated in the next major version of the transp" + "ort. Switch to native publish/subscribe mode using SNS instead. Will be treated " + "as an error from version 6.0.0. Will be removed in version 7.0.0.", false)] - public static NServiceBus.SubscriptionMigrationModeSettings EnableMessageDrivenPubSubCompatibilityMode(this NServiceBus.TransportExtensions transportExtensions) { } + public static NServiceBus.Transport.SQS.Configure.SqsSubscriptionMigrationModeSettings EnableMessageDrivenPubSubCompatibilityMode(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions EnableV1CompatibilityMode(this NServiceBus.TransportExtensions transportExtensions) { } public static void MapEvent(this NServiceBus.TransportExtensions transportExtensions, string customTopicName) { } public static void MapEvent(this NServiceBus.TransportExtensions transportExtensions, System.Type eventType, string customTopicName) { } @@ -55,4 +55,16 @@ namespace NServiceBus public static NServiceBus.TransportExtensions TopicNamePrefix(this NServiceBus.TransportExtensions transportExtensions, string topicNamePrefix) { } public static NServiceBus.TransportExtensions UnrestrictedDurationDelayedDelivery(this NServiceBus.TransportExtensions transportExtensions) { } } +} +namespace NServiceBus.Transport.SQS.Configure +{ + [System.ObsoleteAttribute("The compatibility mode will be deprecated in the next major version of the transp" + + "ort. Switch to native publish/subscribe mode using SNS instead. Will be treated " + + "as an error from version 6.0.0. Will be removed in version 7.0.0.", false)] + public class SqsSubscriptionMigrationModeSettings : NServiceBus.SubscriptionMigrationModeSettings + { + public NServiceBus.SubscriptionMigrationModeSettings MessageVisibilityTimeout(int timeoutInSeconds) { } + public NServiceBus.SubscriptionMigrationModeSettings SubscriptionsCacheTTL(System.TimeSpan ttl) { } + public NServiceBus.Transport.SQS.Configure.SqsSubscriptionMigrationModeSettings TopicCacheTTL(System.TimeSpan ttl) { } + } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS.Tests/Cleanup.cs b/src/NServiceBus.Transport.SQS.Tests/Cleanup.cs index ef15891fb..2d9cc93bc 100644 --- a/src/NServiceBus.Transport.SQS.Tests/Cleanup.cs +++ b/src/NServiceBus.Transport.SQS.Tests/Cleanup.cs @@ -39,6 +39,7 @@ public async Task DeleteAllQueuesUsedForTests() { await DeleteAllQueuesWithPrefix(sqsClient, "AT"); await DeleteAllQueuesWithPrefix(sqsClient, "TT"); + await DeleteAllQueuesWithPrefix(sqsClient, "cli"); } [Test] @@ -47,6 +48,7 @@ public async Task DeleteAllSubscriptionsUsedForTests() { await DeleteAllSubscriptionsWithPrefix(snsClient, "AT"); await DeleteAllSubscriptionsWithPrefix(snsClient, "TT"); + await DeleteAllSubscriptionsWithPrefix(snsClient, "cli"); } [Test] @@ -55,6 +57,7 @@ public async Task DeleteAllTopicsUsedForTests() { await DeleteAllTopicsWithPrefix(snsClient, "AT"); await DeleteAllTopicsWithPrefix(snsClient, "TT"); + await DeleteAllTopicsWithPrefix(snsClient, "cli"); } [Test] @@ -70,6 +73,7 @@ public async Task DeleteAllResourcesWithPrefix() { await DeleteAllResourcesWithPrefix(sqsClient, snsClient, s3Client, "AT"); await DeleteAllResourcesWithPrefix(sqsClient, snsClient, s3Client, "TT"); + await DeleteAllResourcesWithPrefix(sqsClient, snsClient, s3Client, "cli"); } public static Task DeleteAllResourcesWithPrefix(IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, IAmazonS3 s3Client, string namePrefix) diff --git a/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs b/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs index 1f95ce387..e32f79ea4 100644 --- a/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/MessageDispatcherTests.cs @@ -344,9 +344,11 @@ public async Task Should_not_dispatch_multicast_operation_if_event_type_is_objec [Test] public async Task Should_upload_large_multicast_operations_request_to_s3() { + string keyPrefix = "somePrefix"; + var settings = new SettingsHolder(); var transportExtensions = new TransportExtensions(settings); - transportExtensions.S3("someBucket", "somePrefix"); + transportExtensions.S3("someBucket", keyPrefix); var mockS3Client = new MockS3Client(); var mockSnsClient = new MockSnsClient(); @@ -354,13 +356,16 @@ public async Task Should_upload_large_multicast_operations_request_to_s3() var transportConfiguration = new TransportConfiguration(settings); var dispatcher = new MessageDispatcher(transportConfiguration, mockS3Client, null, mockSnsClient, new QueueCache(null, transportConfiguration), new TopicCache(mockSnsClient, settings.SetupMessageMetadataRegistry(), transportConfiguration)); + var longBodyMessageId = Guid.NewGuid().ToString(); + /* Crazy long message id will cause the message to go over limits because attributes count as well */ + var crazyLongMessageId = new string('x', 256 * 1024); var transportOperations = new TransportOperations( new TransportOperation( - new OutgoingMessage(Guid.NewGuid().ToString(), new Dictionary(), Encoding.Default.GetBytes(new string('x', 256 * 1024))), + new OutgoingMessage(longBodyMessageId, new Dictionary(), Encoding.Default.GetBytes(new string('x', 256 * 1024))), new MulticastAddressTag(typeof(Event)), DispatchConsistency.Isolated), - new TransportOperation( /* Crazy long message id will cause the message to go over limits because attributes count as well */ - new OutgoingMessage(new string('x', 256 * 1024), new Dictionary(), Encoding.Default.GetBytes("{}")), + new TransportOperation( + new OutgoingMessage(crazyLongMessageId, new Dictionary(), Encoding.Default.GetBytes("{}")), new MulticastAddressTag(typeof(AnotherEvent)), DispatchConsistency.Isolated)); @@ -372,13 +377,13 @@ public async Task Should_upload_large_multicast_operations_request_to_s3() Assert.AreEqual(2, mockSnsClient.PublishedEvents.Count); Assert.AreEqual(2, mockS3Client.PutObjectRequestsSent.Count); - var firstUpload = mockS3Client.PutObjectRequestsSent.ElementAt(0); - var secondUpload = mockS3Client.PutObjectRequestsSent.ElementAt(1); + var longBodyMessageUpload = mockS3Client.PutObjectRequestsSent.Single(por => por.Key == $"{keyPrefix}/{longBodyMessageId}"); + var crazyLongMessageUpload = mockS3Client.PutObjectRequestsSent.Single(por => por.Key == $"{keyPrefix}/{crazyLongMessageId}"); - Assert.AreEqual("someBucket", firstUpload.BucketName); - Assert.AreEqual("someBucket", secondUpload.BucketName); - StringAssert.Contains($@"""Body"":"""",""S3BodyKey"":""{firstUpload.Key}", mockSnsClient.PublishedEvents.ElementAt(0).Message); - StringAssert.Contains($@"""Body"":"""",""S3BodyKey"":""{secondUpload.Key}", mockSnsClient.PublishedEvents.ElementAt(1).Message); + Assert.AreEqual("someBucket", longBodyMessageUpload.BucketName); + Assert.AreEqual("someBucket", crazyLongMessageUpload.BucketName); + StringAssert.Contains($@"""Body"":"""",""S3BodyKey"":""{longBodyMessageUpload.Key}", mockSnsClient.PublishedEvents.Single(pr => pr.MessageAttributes[Headers.MessageId].StringValue == longBodyMessageId).Message); + StringAssert.Contains($@"""Body"":"""",""S3BodyKey"":""{crazyLongMessageUpload.Key}", mockSnsClient.PublishedEvents.Single(pr => pr.MessageAttributes[Headers.MessageId].StringValue == crazyLongMessageId).Message); } [Test] diff --git a/src/NServiceBus.Transport.SQS.Tests/SubscriptionManagerTests.cs b/src/NServiceBus.Transport.SQS.Tests/SubscriptionManagerTests.cs index ee72e0c29..349406cf5 100644 --- a/src/NServiceBus.Transport.SQS.Tests/SubscriptionManagerTests.cs +++ b/src/NServiceBus.Transport.SQS.Tests/SubscriptionManagerTests.cs @@ -725,7 +725,8 @@ void EmulateImmediateSettlementOfPolicy(Policy initialPolicy) class TestableSubscriptionManager : SubscriptionManager { - public TestableSubscriptionManager(TransportConfiguration configuration, IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, string queueName, QueueCache queueCache, MessageMetadataRegistry messageMetadataRegistry, TopicCache topicCache) : base(configuration, sqsClient, snsClient, queueName, queueCache, messageMetadataRegistry, topicCache) + public TestableSubscriptionManager(TransportConfiguration configuration, IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, string queueName, QueueCache queueCache, MessageMetadataRegistry messageMetadataRegistry, TopicCache topicCache) + : base(configuration, sqsClient, snsClient, queueName, queueCache, messageMetadataRegistry, topicCache) { } diff --git a/src/NServiceBus.Transport.SQS/Configure/SettingsKeys.cs b/src/NServiceBus.Transport.SQS/Configure/SettingsKeys.cs index e07ec1e01..444c04e48 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SettingsKeys.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SettingsKeys.cs @@ -31,5 +31,10 @@ static class SettingsKeys public const string V1CompatibilityMode = Prefix + nameof(V1CompatibilityMode); public const string DisableNativePubSub = Prefix + nameof(DisableNativePubSub); public const string DisableSubscribeBatchingOnStart = Prefix + nameof(DisableSubscribeBatchingOnStart); + + public const string MessageVisibilityTimeout = Prefix + nameof(MessageVisibilityTimeout); + public const string SubscriptionsCacheTTL = Prefix + nameof(SubscriptionsCacheTTL); + public const string NotFoundTopicsCacheTTL = Prefix + nameof(NotFoundTopicsCacheTTL); + public const string EnableMigrationModeSettingKey = "NServiceBus.Subscriptions.EnableMigrationMode"; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsSubscriptionMigrationModeSettings.cs b/src/NServiceBus.Transport.SQS/Configure/SqsSubscriptionMigrationModeSettings.cs new file mode 100644 index 000000000..b99ada837 --- /dev/null +++ b/src/NServiceBus.Transport.SQS/Configure/SqsSubscriptionMigrationModeSettings.cs @@ -0,0 +1,65 @@ +namespace NServiceBus.Transport.SQS.Configure +{ + using System; + using Settings; + + /// + /// Publish-subscribe migration mode configuration. + /// + [ObsoleteEx( + Message = @"The compatibility mode will be deprecated in the next major version of the transport. Switch to native publish/subscribe mode using SNS instead.", + TreatAsErrorFromVersion = "6.0", + RemoveInVersion = "7.0")] + public class SqsSubscriptionMigrationModeSettings : SubscriptionMigrationModeSettings + { + SettingsHolder settings; + + internal SqsSubscriptionMigrationModeSettings(SettingsHolder settings) : base(settings) + { + this.settings = settings; + } + + /// + /// Overrides the default value of 5 seconds for SNS topic cache. + /// + /// Topic cache TTL. + public SqsSubscriptionMigrationModeSettings TopicCacheTTL(TimeSpan ttl) + { + Guard.AgainstNegativeAndZero(nameof(ttl), ttl); + + settings.Set(SettingsKeys.NotFoundTopicsCacheTTL, ttl); + + return this; + } + + /// + /// Overrides the default value of 5 seconds for SNS topic subscribers cache. + /// + /// Subscription cache TTL. + public SubscriptionMigrationModeSettings SubscriptionsCacheTTL(TimeSpan ttl) + { + Guard.AgainstNegativeAndZero(nameof(ttl), ttl); + + settings.Set(SettingsKeys.SubscriptionsCacheTTL, ttl); + + return this; + } + + /// + /// Overrides the default value of 30 seconds for SQS message visibility timeout. + /// + /// Message visibility timeout. + public SubscriptionMigrationModeSettings MessageVisibilityTimeout(int timeoutInSeconds) + { + //HINT: See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html + if (timeoutInSeconds < 0 || timeoutInSeconds > TimeSpan.FromHours(12).TotalSeconds) + { + throw new ArgumentOutOfRangeException(nameof(timeoutInSeconds)); + } + + settings.Set(SettingsKeys.MessageVisibilityTimeout, timeoutInSeconds); + + return this; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs index a47ad6fcc..8a7860f18 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransportInfrastructure.cs @@ -111,6 +111,13 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() public override TransportSendInfrastructure ConfigureSendInfrastructure() { + if (configuration.UsingMessageDrivenPubSubCompatibilityMode && configuration.UsingDefaultMessageVisibilityTimeout) + { + Logger.Warn("When using message driven pub/sub compatibility mode, " + + "it's suggested to set a custom message visibility timeout. " + + "Refer to the message driven pub/sub compatibility mode documentation for more information."); + } + return new TransportSendInfrastructure( CreateMessageDispatcher, () => Task.FromResult(StartupCheckResult.Success)); diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransportSettings.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransportSettings.cs index a4c56eef2..983540e26 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransportSettings.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransportSettings.cs @@ -21,11 +21,11 @@ public static class SqsTransportSettings Message = @"The compatibility mode will be deprecated in the next major version of the transport. Switch to native publish/subscribe mode using SNS instead.", TreatAsErrorFromVersion = "6.0", RemoveInVersion = "7.0")] - public static SubscriptionMigrationModeSettings EnableMessageDrivenPubSubCompatibilityMode(this TransportExtensions transportExtensions) + public static SqsSubscriptionMigrationModeSettings EnableMessageDrivenPubSubCompatibilityMode(this TransportExtensions transportExtensions) { var settings = transportExtensions.GetSettings(); - settings.Set("NServiceBus.Subscriptions.EnableMigrationMode", true); - return new SubscriptionMigrationModeSettings(settings); + settings.Set(SettingsKeys.EnableMigrationModeSettingKey, true); + return new SqsSubscriptionMigrationModeSettings(settings); } /// diff --git a/src/NServiceBus.Transport.SQS/Extensions/SnsClientExtensions.cs b/src/NServiceBus.Transport.SQS/Extensions/SnsClientExtensions.cs index 77bb759c2..a269f1519 100644 --- a/src/NServiceBus.Transport.SQS/Extensions/SnsClientExtensions.cs +++ b/src/NServiceBus.Transport.SQS/Extensions/SnsClientExtensions.cs @@ -8,25 +8,29 @@ namespace NServiceBus.Transport.SQS.Extensions static class SnsClientExtensions { - public static Task FindMatchingSubscription(this IAmazonSimpleNotificationService snsClient, QueueCache queueCache, TopicCache topicCache, MessageMetadata metadata, string queueName) + public static async Task FindMatchingSubscription(this IAmazonSimpleNotificationService snsClient, QueueCache queueCache, TopicCache topicCache, MessageMetadata metadata, string queueName, SnsListSubscriptionsByTopicRateLimiter snsListSubscriptionsByTopicRateLimiter) { - var topicName = topicCache.GetTopicName(metadata); - return snsClient.FindMatchingSubscription(queueCache, topicName, queueName); + var topic = await topicCache.GetTopic(metadata).ConfigureAwait(false); + return await snsClient.FindMatchingSubscription(queueCache, topic, queueName, snsListSubscriptionsByTopicRateLimiter).ConfigureAwait(false); } - public static async Task FindMatchingSubscription(this IAmazonSimpleNotificationService snsClient, QueueCache queueCache, string topicName, string queueName) + public static async Task FindMatchingSubscription(this IAmazonSimpleNotificationService snsClient, QueueCache queueCache, string topicName, string queueName, SnsListTopicsRateLimiter snsListTopicsRateLimiter, SnsListSubscriptionsByTopicRateLimiter snsListSubscriptionsByTopicRateLimiter) { - var existingTopic = await snsClient.FindTopicAsync(topicName).ConfigureAwait(false); + var existingTopic = await snsListTopicsRateLimiter.Execute(async () => + { + return await snsClient.FindTopicAsync(topicName).ConfigureAwait(false); + }).ConfigureAwait(false); + if (existingTopic == null) { return null; } - return await snsClient.FindMatchingSubscription(queueCache, existingTopic, queueName) + return await snsClient.FindMatchingSubscription(queueCache, existingTopic, queueName, snsListSubscriptionsByTopicRateLimiter) .ConfigureAwait(false); } - public static async Task FindMatchingSubscription(this IAmazonSimpleNotificationService snsClient, QueueCache queueCache, Topic topic, string queueName) + public static async Task FindMatchingSubscription(this IAmazonSimpleNotificationService snsClient, QueueCache queueCache, Topic topic, string queueName, SnsListSubscriptionsByTopicRateLimiter snsListSubscriptionsByTopicRateLimiter = null) { var physicalQueueName = queueCache.GetPhysicalQueueName(queueName); @@ -34,8 +38,19 @@ public static async Task FindMatchingSubscription(this IAmazonSimpleNoti do { - upToAHundredSubscriptions = await snsClient.ListSubscriptionsByTopicAsync(topic.TopicArn, upToAHundredSubscriptions?.NextToken) - .ConfigureAwait(false); + if (snsListSubscriptionsByTopicRateLimiter != null) + { + upToAHundredSubscriptions = await snsListSubscriptionsByTopicRateLimiter.Execute(async () => + { + return await snsClient.ListSubscriptionsByTopicAsync(topic.TopicArn, upToAHundredSubscriptions?.NextToken) + .ConfigureAwait(false); + }).ConfigureAwait(false); + } + else + { + upToAHundredSubscriptions = await snsClient.ListSubscriptionsByTopicAsync(topic.TopicArn, upToAHundredSubscriptions?.NextToken) + .ConfigureAwait(false); + } // ReSharper disable once ForeachCanBePartlyConvertedToQueryUsingAnotherGetEnumerator foreach (var upToAHundredSubscription in upToAHundredSubscriptions.Subscriptions) diff --git a/src/NServiceBus.Transport.SQS/Guard.cs b/src/NServiceBus.Transport.SQS/Guard.cs index 1a491954f..e4d54a2d3 100644 --- a/src/NServiceBus.Transport.SQS/Guard.cs +++ b/src/NServiceBus.Transport.SQS/Guard.cs @@ -19,5 +19,13 @@ public static void AgainstNullAndEmpty(string argumentName, string value) throw new ArgumentNullException(argumentName); } } + + public static void AgainstNegativeAndZero(string argumentName, TimeSpan value) + { + if (value <= TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(argumentName); + } + } } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/HybridPubSubChecker.cs b/src/NServiceBus.Transport.SQS/HybridPubSubChecker.cs new file mode 100644 index 000000000..6bc080cac --- /dev/null +++ b/src/NServiceBus.Transport.SQS/HybridPubSubChecker.cs @@ -0,0 +1,108 @@ +namespace NServiceBus.Transport.SQS +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Threading.Tasks; + using Amazon.SimpleNotificationService; + using NServiceBus.Logging; + using NServiceBus.Transport.SQS.Extensions; + + class HybridPubSubChecker + { + class SubscriptionsCacheItem + { + public bool IsThereAnSnsSubscription { get; set; } + public DateTime Age { get; } = DateTime.Now; + } + + public HybridPubSubChecker(TransportConfiguration configuration) + { + rateLimiter = configuration.SnsListSubscriptionsByTopicRateLimiter; + cacheTTL = configuration.SubscriptionsCacheTTL; + } + + bool TryGetFromCache(string cacheKey, out SubscriptionsCacheItem item) + { + item = null; + if (subscriptionsCache.TryGetValue(cacheKey, out var cacheItem)) + { + Logger.Debug($"Subscription found in cache, key: '{cacheKey}'."); + if (cacheItem.Age.Add(cacheTTL) < DateTime.Now) + { + Logger.Debug($"Removing subscription '{cacheKey}' from cache: TTL expired."); + subscriptionsCache.TryRemove(cacheKey, out _); + } + else + { + item = cacheItem; + } + } + else + { + Logger.Debug($"Subscription not found in cache, key: '{cacheKey}'."); + } + + return item != null; + } + + public async Task PublishUsingMessageDrivenPubSub(UnicastTransportOperation unicastTransportOperation, HashSet messageIdsOfMulticastedEvents, TopicCache topicCache, QueueCache queueCache, IAmazonSimpleNotificationService snsClient) + { + // The following check is required by the message-driven pub/sub hybrid mode in Core + // to allow endpoints to migrate from message-driven pub/sub to native pub/sub + // If the message we're trying to dispatch is a unicast message with a `Publish` intent + // but the subscriber is also subscribed via SNS we don't want to dispatch the message twice + // the subscriber will receive it via SNS and not via a unicast send. + // We can improve the situation a bit by caching the information and thus reduce the amount of times we hit the SNS API. + // We need to think abut what happens in case the destination endpoint unsubscribes from the event. + // these conditions are carefully chosen to only execute the code if really necessary + if (unicastTransportOperation != null + && messageIdsOfMulticastedEvents.Contains(unicastTransportOperation.Message.MessageId) + && unicastTransportOperation.Message.GetMessageIntent() == MessageIntentEnum.Publish + && unicastTransportOperation.Message.Headers.ContainsKey(Headers.EnclosedMessageTypes)) + { + var mostConcreteEnclosedMessageType = unicastTransportOperation.Message.GetEnclosedMessageTypes()[0]; + var existingTopic = await topicCache.GetTopic(mostConcreteEnclosedMessageType).ConfigureAwait(false); + if (existingTopic == null) + { + return true; + } + + var cacheKey = existingTopic.TopicArn + unicastTransportOperation.Destination; + Logger.Debug($"Performing first subscription cache lookup for '{cacheKey}'."); + if (!TryGetFromCache(cacheKey, out var cacheItem)) + { + cacheItem = await rateLimiter.Execute(async () => + { + Logger.Debug($"Performing second subscription cache lookup for '{cacheKey}'."); + if (TryGetFromCache(cacheKey, out var secondAttemptItem)) + { + return secondAttemptItem; + } + + Logger.Debug($"Finding matching subscription for key '{cacheKey}' using SNS API."); + var matchingSubscriptionArn = await snsClient.FindMatchingSubscription(queueCache, existingTopic, unicastTransportOperation.Destination) + .ConfigureAwait(false); + + return new SubscriptionsCacheItem { IsThereAnSnsSubscription = matchingSubscriptionArn != null }; + }).ConfigureAwait(false); + + Logger.Debug($"Adding subscription to cache as '{(cacheItem.IsThereAnSnsSubscription ? "found" : "not found")}', key: '{cacheKey}'."); + _ = subscriptionsCache.TryAdd(cacheKey, cacheItem); + } + + if (cacheItem.IsThereAnSnsSubscription) + { + return false; + } + } + + return true; + } + + SnsListSubscriptionsByTopicRateLimiter rateLimiter; + readonly TimeSpan cacheTTL; + readonly ConcurrentDictionary subscriptionsCache = new ConcurrentDictionary(); + static ILog Logger = LogManager.GetLogger(typeof(HybridPubSubChecker)); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/InputQueuePump.cs b/src/NServiceBus.Transport.SQS/InputQueuePump.cs index 200350e5d..4dc9f475e 100644 --- a/src/NServiceBus.Transport.SQS/InputQueuePump.cs +++ b/src/NServiceBus.Transport.SQS/InputQueuePump.cs @@ -84,7 +84,8 @@ public void Start(int maximumProcessingConcurrency, CancellationToken token) QueueUrl = inputQueueUrl, WaitTimeSeconds = 20, AttributeNames = new List { "SentTimestamp" }, - MessageAttributeNames = new List { "*" } + MessageAttributeNames = new List { "*" }, + VisibilityTimeout = configuration.MessageVisibilityTimeout, }; maxConcurrencySemaphore = new SemaphoreSlim(maxConcurrency); diff --git a/src/NServiceBus.Transport.SQS/MessageDispatcher.cs b/src/NServiceBus.Transport.SQS/MessageDispatcher.cs index c94085d42..6e17d08d6 100644 --- a/src/NServiceBus.Transport.SQS/MessageDispatcher.cs +++ b/src/NServiceBus.Transport.SQS/MessageDispatcher.cs @@ -12,7 +12,6 @@ using Amazon.SQS.Model; using DelayedDelivery; using Extensibility; - using Extensions; using Logging; using SimpleJson; using Transport; @@ -27,6 +26,7 @@ public MessageDispatcher(TransportConfiguration configuration, IAmazonS3 s3Clien this.s3Client = s3Client; this.sqsClient = sqsClient; this.queueCache = queueCache; + hybridPubSubChecker = new HybridPubSubChecker(configuration); serializerStrategy = configuration.UseV1CompatiblePayload ? SimpleJson.PocoJsonSerializerStrategy : ReducedPayloadSerializerStrategy.Instance; } @@ -244,23 +244,9 @@ async Task PrepareMessage(IOutgoingTransportOperation transp { var unicastTransportOperation = transportOperation as UnicastTransportOperation; - // these conditions are carefully chosen to only execute the code if really necessary - if (unicastTransportOperation != null - && messageIdsOfMulticastedEvents.Contains(unicastTransportOperation.Message.MessageId) - && unicastTransportOperation.Message.GetMessageIntent() == MessageIntentEnum.Publish - && unicastTransportOperation.Message.Headers.ContainsKey(Headers.EnclosedMessageTypes)) + if (!await hybridPubSubChecker.PublishUsingMessageDrivenPubSub(unicastTransportOperation, messageIdsOfMulticastedEvents, topicCache, queueCache, snsClient).ConfigureAwait(false)) { - var mostConcreteEnclosedMessageType = unicastTransportOperation.Message.GetEnclosedMessageTypes()[0]; - var existingTopic = await topicCache.GetTopicArn(mostConcreteEnclosedMessageType).ConfigureAwait(false); - if (existingTopic != null) - { - var matchingSubscriptionArn = await snsClient.FindMatchingSubscription(queueCache, existingTopic, unicastTransportOperation.Destination) - .ConfigureAwait(false); - if (matchingSubscriptionArn != null) - { - return null; - } - } + return null; } var delayDeliveryWith = transportOperation.DeliveryConstraints.OfType().SingleOrDefault(); @@ -420,6 +406,7 @@ void ApplyServerSideEncryptionConfiguration(PutObjectRequest putObjectRequest) IAmazonS3 s3Client; QueueCache queueCache; IJsonSerializerStrategy serializerStrategy; + readonly HybridPubSubChecker hybridPubSubChecker; static readonly HashSet emptyHashset = new HashSet(); static ILog Logger = LogManager.GetLogger(typeof(MessageDispatcher)); diff --git a/src/NServiceBus.Transport.SQS/RateLimiter/AwaitableConstraint.cs b/src/NServiceBus.Transport.SQS/RateLimiter/AwaitableConstraint.cs new file mode 100644 index 000000000..a1f20c9bd --- /dev/null +++ b/src/NServiceBus.Transport.SQS/RateLimiter/AwaitableConstraint.cs @@ -0,0 +1,88 @@ +namespace NServiceBus.Transport.SQS +{ + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Threading; + using System.Threading.Tasks; + using NServiceBus.Logging; + + partial class RateLimiter + { + class AwaitableConstraint + { + public AwaitableConstraint(int maxAllowedRequests, TimeSpan timeConstraint, string apiName) + { + if (maxAllowedRequests <= 0) + { + throw new ArgumentException($"{nameof(maxAllowedRequests)} must be greater than 0.", nameof(maxAllowedRequests)); + } + + if (timeConstraint.TotalMilliseconds <= 0) + { + throw new ArgumentException($"{nameof(timeConstraint)} must be greater than 0.", nameof(timeConstraint)); + } + + this.maxAllowedRequests = maxAllowedRequests; + this.timeConstraint = timeConstraint; + this.apiName = apiName; + requestsTimeStamps = new SizeConstrainedStack(this.maxAllowedRequests); + } + + public async Task WaitIfNeeded() + { + await semaphore.WaitAsync().ConfigureAwait(false); + + var requestsCount = 0; + var now = DateTime.Now; + var allocatedTimeLowerBound = now - timeConstraint; + var request = requestsTimeStamps.First; + LinkedListNode lastRequest = null; + while ((request != null) && (request.Value > allocatedTimeLowerBound)) + { + //counting how many requests have already + //been performed within the allocated time + lastRequest = request; + request = request.Next; + requestsCount++; + } + + if (requestsCount < maxAllowedRequests) + { + return new DisposableAction(OnActionDisposed); + } + + Debug.Assert(request == null); + Debug.Assert(lastRequest != null); + var timeToWait = lastRequest.Value.Add(timeConstraint) - now; + try + { + Logger.Info($"Requests threshold of {maxAllowedRequests} requests every {timeConstraint} reached for API '{apiName}'. Waiting {timeToWait}."); + await Task.Delay(timeToWait).ConfigureAwait(false); + } + catch (Exception) + { + _ = semaphore.Release(); + throw; + } + + return new DisposableAction(OnActionDisposed); + } + + void OnActionDisposed() + { + //pushing as time stamp the request completion time + requestsTimeStamps.Push(DateTime.Now); + _ = semaphore.Release(); + } + + readonly SizeConstrainedStack requestsTimeStamps; + + readonly int maxAllowedRequests; + TimeSpan timeConstraint; + readonly string apiName; + readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1); + static ILog Logger = LogManager.GetLogger(typeof(AwaitableConstraint)); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/RateLimiter/DisposableAction.cs b/src/NServiceBus.Transport.SQS/RateLimiter/DisposableAction.cs new file mode 100644 index 000000000..4339a1e30 --- /dev/null +++ b/src/NServiceBus.Transport.SQS/RateLimiter/DisposableAction.cs @@ -0,0 +1,23 @@ +namespace NServiceBus.Transport.SQS +{ + using System; + + partial class RateLimiter + { + class DisposableAction : IDisposable + { + public DisposableAction(Action onDisposedCallback) + { + this.onDisposedCallback = onDisposedCallback; + } + + public void Dispose() + { + onDisposedCallback?.Invoke(); + onDisposedCallback = null; + } + + Action onDisposedCallback; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/RateLimiter/RateLimiter.cs b/src/NServiceBus.Transport.SQS/RateLimiter/RateLimiter.cs new file mode 100644 index 000000000..6dbefc2ef --- /dev/null +++ b/src/NServiceBus.Transport.SQS/RateLimiter/RateLimiter.cs @@ -0,0 +1,22 @@ +namespace NServiceBus.Transport.SQS +{ + using System; + using System.Threading.Tasks; + + //implementation adapted from https://david-desmaisons.github.io/RateLimiter/ + //we couldn't use the OSS package due to dependencies constrains: the above-linked package requires .NET 4.7.2 + abstract partial class RateLimiter + { + protected RateLimiter(int maxAllowedRequests, TimeSpan timeConstraint, string limitedApiName) => awaitableConstraint = new AwaitableConstraint(maxAllowedRequests, timeConstraint, limitedApiName); + + public async Task Execute(Func> taskToExecute) + { + using (await awaitableConstraint.WaitIfNeeded().ConfigureAwait(false)) + { + return await taskToExecute().ConfigureAwait(false); + } + } + + readonly AwaitableConstraint awaitableConstraint; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/RateLimiter/SizeConstrainedStack.cs b/src/NServiceBus.Transport.SQS/RateLimiter/SizeConstrainedStack.cs new file mode 100644 index 000000000..56d21959a --- /dev/null +++ b/src/NServiceBus.Transport.SQS/RateLimiter/SizeConstrainedStack.cs @@ -0,0 +1,27 @@ +namespace NServiceBus.Transport.SQS +{ + using System.Collections.Generic; + + partial class RateLimiter + { + class SizeConstrainedStack : LinkedList + { + public SizeConstrainedStack(int maxSize) + { + this.maxSize = maxSize; + } + + public void Push(T item) + { + AddFirst(item); + + if (Count > maxSize) + { + RemoveLast(); + } + } + + readonly int maxSize; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/RateLimiter/SnsListSubscriptionsByTopicRateLimiter.cs b/src/NServiceBus.Transport.SQS/RateLimiter/SnsListSubscriptionsByTopicRateLimiter.cs new file mode 100644 index 000000000..38444e8be --- /dev/null +++ b/src/NServiceBus.Transport.SQS/RateLimiter/SnsListSubscriptionsByTopicRateLimiter.cs @@ -0,0 +1,13 @@ +namespace NServiceBus.Transport.SQS +{ + using System; + + class SnsListSubscriptionsByTopicRateLimiter : RateLimiter + { + public SnsListSubscriptionsByTopicRateLimiter() + : base(30, TimeSpan.FromSeconds(1), "ListSubscriptionsByTopic") + { + + } + } +} diff --git a/src/NServiceBus.Transport.SQS/RateLimiter/SnsListTopicsRateLimiter.cs b/src/NServiceBus.Transport.SQS/RateLimiter/SnsListTopicsRateLimiter.cs new file mode 100644 index 000000000..2f9c02c07 --- /dev/null +++ b/src/NServiceBus.Transport.SQS/RateLimiter/SnsListTopicsRateLimiter.cs @@ -0,0 +1,13 @@ +namespace NServiceBus.Transport.SQS +{ + using System; + + class SnsListTopicsRateLimiter : RateLimiter + { + public SnsListTopicsRateLimiter() + : base(30, TimeSpan.FromSeconds(1), "ListTopics") + { + + } + } +} diff --git a/src/NServiceBus.Transport.SQS/SubscriptionManager.cs b/src/NServiceBus.Transport.SQS/SubscriptionManager.cs index 5d015248c..9bbe59abe 100644 --- a/src/NServiceBus.Transport.SQS/SubscriptionManager.cs +++ b/src/NServiceBus.Transport.SQS/SubscriptionManager.cs @@ -87,7 +87,7 @@ async Task DeleteSubscription(MessageMetadata metadata) foreach (var mappedTopicName in mappedTopicsNames) { //we skip the topic name generation assuming the topic name is already good - var mappedTypeMatchingSubscription = await snsClient.FindMatchingSubscription(queueCache, mappedTopicName, queueName) + var mappedTypeMatchingSubscription = await snsClient.FindMatchingSubscription(queueCache, mappedTopicName, queueName, configuration.SnsListTopicsRateLimiter, configuration.SnsListSubscriptionsByTopicRateLimiter) .ConfigureAwait(false); if (mappedTypeMatchingSubscription != null) { @@ -107,7 +107,7 @@ async Task DeleteSubscription(MessageMetadata metadata) continue; } - var mappedTypeMatchingSubscription = await snsClient.FindMatchingSubscription(queueCache, topicCache, mappedTypeMetadata, queueName) + var mappedTypeMatchingSubscription = await snsClient.FindMatchingSubscription(queueCache, topicCache, mappedTypeMetadata, queueName, configuration.SnsListSubscriptionsByTopicRateLimiter) .ConfigureAwait(false); if (mappedTypeMatchingSubscription != null) { @@ -117,7 +117,7 @@ async Task DeleteSubscription(MessageMetadata metadata) } } - var matchingSubscriptionArn = await snsClient.FindMatchingSubscription(queueCache, topicCache, metadata, queueName) + var matchingSubscriptionArn = await snsClient.FindMatchingSubscription(queueCache, topicCache, metadata, queueName, configuration.SnsListSubscriptionsByTopicRateLimiter) .ConfigureAwait(false); if (matchingSubscriptionArn != null) { diff --git a/src/NServiceBus.Transport.SQS/TopicCache.cs b/src/NServiceBus.Transport.SQS/TopicCache.cs index 77385cfe1..36e72623f 100644 --- a/src/NServiceBus.Transport.SQS/TopicCache.cs +++ b/src/NServiceBus.Transport.SQS/TopicCache.cs @@ -4,11 +4,20 @@ namespace NServiceBus.Transport.SQS using System.Collections.Concurrent; using System.Threading.Tasks; using Amazon.SimpleNotificationService; + using Amazon.SimpleNotificationService.Model; using Configure; + using NServiceBus.Logging; using Unicast.Messages; class TopicCache { + class TopicCacheItem + { + public Topic Topic { get; set; } + + public DateTime CreatedOn { get; } = DateTime.Now; + } + public TopicCache(IAmazonSimpleNotificationService snsClient, MessageMetadataRegistry messageMetadataRegistry, TransportConfiguration configuration) { this.configuration = configuration; @@ -23,6 +32,11 @@ public TopicCache(IAmazonSimpleNotificationService snsClient, MessageMetadataReg public EventToTopicsMappings CustomEventToTopicsMappings { get; } public Task GetTopicArn(MessageMetadata metadata) + { + return GetAndCacheTopicIfFound(metadata).ContinueWith(t => t.Result?.TopicArn); + } + + public Task GetTopic(MessageMetadata metadata) { return GetAndCacheTopicIfFound(metadata); } @@ -30,10 +44,16 @@ public Task GetTopicArn(MessageMetadata metadata) public Task GetTopicArn(Type eventType) { var metadata = messageMetadataRegistry.GetMessageMetadata(eventType); - return GetAndCacheTopicIfFound(metadata); + return GetAndCacheTopicIfFound(metadata).ContinueWith(t => t.Result?.TopicArn); } public Task GetTopicArn(string messageTypeIdentifier) + { + var metadata = messageMetadataRegistry.GetMessageMetadata(messageTypeIdentifier); + return GetAndCacheTopicIfFound(metadata).ContinueWith(t => t.Result?.TopicArn); + } + + public Task GetTopic(string messageTypeIdentifier) { var metadata = messageMetadataRegistry.GetMessageMetadata(messageTypeIdentifier); return GetAndCacheTopicIfFound(metadata); @@ -49,22 +69,76 @@ public string GetTopicName(MessageMetadata metadata) return topicNameCache.GetOrAdd(metadata.MessageType, configuration.TopicNameGenerator(metadata)); } - async Task GetAndCacheTopicIfFound(MessageMetadata metadata) + bool TryGetTopicFromCache(MessageMetadata metadata, out Topic topic) + { + if (topicCache.TryGetValue(metadata.MessageType, out var topicCacheItem)) + { + if (topicCacheItem.Topic == null && topicCacheItem.CreatedOn.Add(configuration.NotFoundTopicsCacheTTL) < DateTime.Now) + { + Logger.Debug($"Removing topic '' with key '{metadata.MessageType}' from cache: TTL expired."); + _ = topicCache.TryRemove(metadata.MessageType, out _); + } + else + { + Logger.Debug($"Returning topic for '{metadata.MessageType}' from cache. Topic '{topicCacheItem.Topic?.TopicArn ?? ""}'."); + topic = topicCacheItem.Topic; + return true; + } + } + + topic = null; + return false; + } + + async Task GetAndCacheTopicIfFound(MessageMetadata metadata) { - if (topicCache.TryGetValue(metadata.MessageType, out var topic)) + Logger.Debug($"Performing firt Topic cache lookup for '{metadata.MessageType}'."); + if (TryGetTopicFromCache(metadata, out var cachedTopic)) + { + return cachedTopic; + } + + Logger.Debug($"Topic for '{metadata.MessageType}' not found in cache."); + + var foundTopic = await configuration.SnsListTopicsRateLimiter.Execute(async () => + { + /* + * Rate limiter serializes requests, only 1 thread is allowed per + * rate limiter. Before trying to reach out to SNS we do another + * cache lookup + */ + Logger.Debug($"Performing second Topic cache lookup for '{metadata.MessageType}'."); + if (TryGetTopicFromCache(metadata, out var cachedValue)) + { + return cachedValue; + } + + var topicName = GetTopicName(metadata); + Logger.Debug($"Finding topic '{topicName}' using 'ListTopics' SNS API."); + + return await snsClient.FindTopicAsync(topicName).ConfigureAwait(false); + }).ConfigureAwait(false); + + //We cache also null/not found topics, they'll be wiped + //from the cache at lookup time based on the configured TTL + var added = topicCache.TryAdd(metadata.MessageType, new TopicCacheItem() { Topic = foundTopic }); + if (added) + { + Logger.Debug($"Added topic '{foundTopic?.TopicArn ?? ""}' to cache. Cache items count: {topicCache.Count}."); + } + else { - return topic; + Logger.Debug($"Topic already present in cache. Topic '{foundTopic?.TopicArn ?? ""}'. Cache items count: {topicCache.Count}."); } - var topicName = GetTopicName(metadata); - var foundTopic = await snsClient.FindTopicAsync(topicName).ConfigureAwait(false); - return foundTopic != null ? topicCache.GetOrAdd(metadata.MessageType, foundTopic.TopicArn) : null; + return foundTopic; } IAmazonSimpleNotificationService snsClient; MessageMetadataRegistry messageMetadataRegistry; TransportConfiguration configuration; - ConcurrentDictionary topicCache = new ConcurrentDictionary(); + ConcurrentDictionary topicCache = new ConcurrentDictionary(); ConcurrentDictionary topicNameCache = new ConcurrentDictionary(); + static ILog Logger = LogManager.GetLogger(typeof(TopicCache)); } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/TransportConfiguration.cs b/src/NServiceBus.Transport.SQS/TransportConfiguration.cs index 917fb9f68..9efe5a125 100644 --- a/src/NServiceBus.Transport.SQS/TransportConfiguration.cs +++ b/src/NServiceBus.Transport.SQS/TransportConfiguration.cs @@ -329,6 +329,52 @@ public IReadOnlyList NamespaceConditionsForPolicies } } + public int MessageVisibilityTimeout + { + get + { + if (!messageVisibilityTimeout.HasValue) + { + messageVisibilityTimeout = settings.GetOrDefault(SettingsKeys.MessageVisibilityTimeout) ?? 30; + } + + return messageVisibilityTimeout.Value; + } + } + + public bool UsingDefaultMessageVisibilityTimeout => !settings.HasSetting(SettingsKeys.MessageVisibilityTimeout); + public bool UsingMessageDrivenPubSubCompatibilityMode => settings.HasSetting(SettingsKeys.MessageVisibilityTimeout) && settings.Get(SettingsKeys.EnableMigrationModeSettingKey); + + public TimeSpan SubscriptionsCacheTTL + { + get + { + if (!subscriptionsCacheTTL.HasValue) + { + subscriptionsCacheTTL = settings.GetOrDefault(SettingsKeys.SubscriptionsCacheTTL) ?? TimeSpan.FromSeconds(5); + } + + return subscriptionsCacheTTL.Value; + } + } + + public TimeSpan NotFoundTopicsCacheTTL + { + get + { + if (!notFoundTopicsCacheTTL.HasValue) + { + notFoundTopicsCacheTTL = settings.GetOrDefault(SettingsKeys.NotFoundTopicsCacheTTL) ?? TimeSpan.FromSeconds(5); + } + + return notFoundTopicsCacheTTL.Value; + } + } + + public SnsListTopicsRateLimiter SnsListTopicsRateLimiter { get; } = new SnsListTopicsRateLimiter(); + + public SnsListSubscriptionsByTopicRateLimiter SnsListSubscriptionsByTopicRateLimiter { get; } = new SnsListSubscriptionsByTopicRateLimiter(); + public EventToTopicsMappings CustomEventToTopicsMappings => settings.GetOrDefault(); public EventToEventsMappings CustomEventToEventsMappings => settings.GetOrDefault(); @@ -370,6 +416,9 @@ public IReadOnlyList NamespaceConditionsForPolicies bool? preTruncateTopicNames; bool? useV1CompatiblePayload; int? queueDelayTime; + int? messageVisibilityTimeout; + TimeSpan? subscriptionsCacheTTL; + TimeSpan? notFoundTopicsCacheTTL; Func s3ClientFactory; Func sqsClientFactory; Func snsClientFactory;