Skip to content

Commit

Permalink
Use a rate limiter and in-memory cache to fix rate exceeded exception…
Browse files Browse the repository at this point in the history
… when using pub/sub hybrid mode (#998)

* Adjust class names to respect conventions

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Guard against negative and zero

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Make sure that clean up cleans CLI tests artifacts

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Allow tests to use a fixed name infrastrucutre

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Configure tests for tests independency

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Set test execution timeout only if needed

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Use fixed name infrastructure for migration tests

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Extend test execution timeout for migration tests

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Do not rely on oder in message dispatcher tests

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Allow tests to append a sequence # to name prefix

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Define rate exceed failing test cases

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Allow custom subscription migration settings

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Introduce the concept of rate limiting

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Define custom rate limiters to throttled APIs

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Use rate limiter when fetching subscriptions

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Cache null topics lookups, use rate limiter if needed

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Extract hybrid mode check, cache subscriptions lookups

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Use user configured message visibility timeout

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Log a warning if using default visibility timeout and migration mode

Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* Define required variables for tests needing fixed infrastructure


Co-Authored-By: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-Authored-By: Dennis van der Stelt <dvdstelt@gmail.com>

* typofixes and renamings

Co-authored-by: Tomasz Masternak <1092707+tmasternak@users.noreply.github.com>
Co-authored-by: Dennis van der Stelt <dvdstelt@gmail.com>
Co-authored-by: Tomek Masternak <tomasz.masternak@particular.net>
  • Loading branch information
4 people authored Sep 16, 2021
1 parent 907c410 commit 03f9a61
Show file tree
Hide file tree
Showing 36 changed files with 1,679 additions and 70 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ jobs:
- name: Run Windows tests
if: matrix.name == 'Windows'
run: dotnet test src --configuration Release --no-build --logger "GitHubActions;report-warnings=false"
env:
NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix: CiWinATT
- name: Run Linux tests
if: matrix.name == 'Linux'
run: dotnet test src --configuration Release --no-build --framework netcoreapp3.1 --logger "GitHubActions;report-warnings=false"
env:
NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix: CiNixATT
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,22 @@ The transport can be configured using the following environment variables:

The names of queues used by the acceptance tests take the following form:

AT<datetime>-<pre-truncated-queue-name>
AT<sanitized-guid>-<pre-truncated-queue-name>

Where

* `AT` stands for "Acceptance Test"
* `datetime` is a date and time as yyyyMMddHHmmss that uniquely identifies a single test run. For example, when 100 tests are executed in a single test run each queue will have the same datetime timestamp.
* `sanitized-guid` is a GUID converted to a base 64 string from which invalid characters are removed. For example, when 100 tests are executed in a single test run each queue will have the same `sanitized-guid`.
* `pre-truncated-queue-name` is the name of the queue, "pre-truncated" (characters are removed from the beginning) so that the entire queue name is 80 characters or less.

_Note:_

Some tests generate high load and require a fixed queue naming scheme to prevent policy propagation in the cluster to affect the test execution. The fixed resources prefix can be customized by using the `NServiceBus_AmazonSQS_AT_CustomFixedNamePrefix` environment variable. If not specified an exception will be thrown. For these tests the resources name schema is:

<name-prefix>-<optional-test-case-sequence>-<pre-truncated-queue-name>

Where `optional-test-case-sequence` is an optional integer value specified by tests using the NUnit test case feature to run the same test multiple times with different input values.

This scheme accomplishes the following goals:

* Test runs are idempotent - each test run uses its own set of queues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,15 @@ public Task Configure(string endpointName, EndpointConfiguration configuration,

ApplyMappingsToSupportMultipleInheritance(endpointName, transportConfig);

settings.TestExecutionTimeout = TimeSpan.FromSeconds(120);
configuration.RegisterComponents(c => c.ConfigureComponent<TestIndependenceMutator>(DependencyLifecycle.SingleInstance));
configuration.Pipeline.Register("TestIndependenceBehavior", typeof(TestIndependenceSkipBehavior), "Skips messages not created during the current test.");

if (settings.TestExecutionTimeout == null)
{
//If it's not null it means it has been set to a custom
//value in the test. We don't want to overwrite that
settings.TestExecutionTimeout = TimeSpan.FromSeconds(120);
}

return Task.FromResult(0);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit
{
using System;

public class TestCase
{
public TestCase(int sequence) => Sequence = sequence;

public int NumberOfEvents { get; internal set; }
public TimeSpan? TestExecutionTimeout { get; internal set; }
public int MessageVisibilityTimeout { get; internal set; } = DefaultMessageVisibilityTimeout;
public TimeSpan SubscriptionsCacheTTL { get; internal set; } = DefaultSubscriptionCacheTTL;
public TimeSpan NotFoundTopicsCacheTTL { get; internal set; } = DefaultTopicCacheTTL;
public int Sequence { get; }

public override string ToString() => $"#{Sequence}, " +
$"{nameof(NumberOfEvents)}: {NumberOfEvents}, " +
$"{nameof(MessageVisibilityTimeout)}: {(MessageVisibilityTimeout == DefaultMessageVisibilityTimeout ? "default" : MessageVisibilityTimeout.ToString())}, " +
$"{nameof(TestExecutionTimeout)}: {TestExecutionTimeout?.ToString() ?? "default"}, " +
$"{nameof(SubscriptionsCacheTTL)}: {(SubscriptionsCacheTTL == DefaultSubscriptionCacheTTL ? "default" : SubscriptionsCacheTTL.ToString())}, " +
$"{nameof(NotFoundTopicsCacheTTL)}: {(NotFoundTopicsCacheTTL == DefaultTopicCacheTTL ? "default" : NotFoundTopicsCacheTTL.ToString())}";

static TimeSpan DefaultSubscriptionCacheTTL = TimeSpan.FromSeconds(5);
static TimeSpan DefaultTopicCacheTTL = TimeSpan.FromSeconds(5);
static int DefaultMessageVisibilityTimeout = 30;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
namespace NServiceBus.AcceptanceTests.NativePubSub.HybridModeRateLimit
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.Configuration.AdvancedExtensibility;
using NServiceBus.Features;
using NServiceBus.Routing.MessageDrivenSubscriptions;
using NUnit.Framework;
using Conventions = AcceptanceTesting.Customization.Conventions;

public class When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop : NServiceBusAcceptanceTest
{
static TestCase[] TestCases =
{
new TestCase(1){ NumberOfEvents = 1 },
new TestCase(2){ NumberOfEvents = 100 },
new TestCase(3){ NumberOfEvents = 300, SubscriptionsCacheTTL = TimeSpan.FromMinutes(1) },
new TestCase(4){ NumberOfEvents = 1000, TestExecutionTimeout = TimeSpan.FromMinutes(4), SubscriptionsCacheTTL = TimeSpan.FromMinutes(1), NotFoundTopicsCacheTTL = TimeSpan.FromMinutes(1) },
};

[Test, UseFixedNamePrefix, TestCaseSource(nameof(TestCases))]
public async Task Should_not_rate_exceed(TestCase testCase)
{
SetupFixture.AppendSequenceToNamePrefix(testCase.Sequence);

var context = await Scenario.Define<Context>()
.WithEndpoint<Publisher>(b =>
{
b.CustomConfig(config =>
{
#pragma warning disable CS0618
var migrationMode = config.ConfigureSqsTransport().EnableMessageDrivenPubSubCompatibilityMode();
migrationMode.SubscriptionsCacheTTL(testCase.SubscriptionsCacheTTL);
migrationMode.TopicCacheTTL(testCase.NotFoundTopicsCacheTTL);
#pragma warning restore CS0618
});

b.When(c => c.SubscribedMessageDriven && c.SubscribedNative, (session, ctx) =>
{
var sw = Stopwatch.StartNew();
var tasks = new List<Task>();
for (int i = 0; i < testCase.NumberOfEvents; i++)
{
tasks.Add(session.Publish(new MyEvent()));
}
_ = Task.WhenAll(tasks).ContinueWith(t =>
{
sw.Stop();
ctx.PublishTime = sw.Elapsed;
});
return Task.FromResult(0);
});
})
.WithEndpoint<NativePubSubSubscriber>(b =>
{
b.When((_, ctx) =>
{
ctx.SubscribedNative = true;
return Task.FromResult(0);
});
})
.WithEndpoint<MessageDrivenPubSubSubscriber>(b =>
{
b.When((session, ctx) => session.Subscribe<MyEvent>());
})
.Done(c => c.NativePubSubSubscriberReceivedEventsCount == testCase.NumberOfEvents
&& c.MessageDrivenPubSubSubscriberReceivedEventsCount == testCase.NumberOfEvents)
.Run(testCase.TestExecutionTimeout);

Assert.AreEqual(testCase.NumberOfEvents, context.MessageDrivenPubSubSubscriberReceivedEventsCount);
Assert.AreEqual(testCase.NumberOfEvents, context.NativePubSubSubscriberReceivedEventsCount);
}

public class Context : ScenarioContext
{
int nativePubSubSubscriberReceivedEventsCount;
public int NativePubSubSubscriberReceivedEventsCount => nativePubSubSubscriberReceivedEventsCount;
public void IncrementNativePubSubSubscriberReceivedEventsCount()
{
Interlocked.Increment(ref nativePubSubSubscriberReceivedEventsCount);
}
int messageDrivenPubSubSubscriberReceivedEventsCount;
public int MessageDrivenPubSubSubscriberReceivedEventsCount => messageDrivenPubSubSubscriberReceivedEventsCount;
public void IncrementMessageDrivenPubSubSubscriberReceivedEventsCount()
{
Interlocked.Increment(ref messageDrivenPubSubSubscriberReceivedEventsCount);
}
public bool SubscribedMessageDriven { get; set; }
public bool SubscribedNative { get; set; }
public TimeSpan PublishTime { get; set; }
}

public class Publisher : EndpointConfigurationBuilder
{
public Publisher()
{
EndpointSetup<DefaultPublisher>(c =>
{
var subscriptionStorage = new TestingInMemorySubscriptionStorage();
c.UsePersistence<TestingInMemoryPersistence, StorageType.Subscriptions>().UseStorage(subscriptionStorage);

c.OnEndpointSubscribed<Context>((s, context) =>
{
if (s.SubscriberEndpoint.Contains(Conventions.EndpointNamingConvention(typeof(MessageDrivenPubSubSubscriber))))
{
context.SubscribedMessageDriven = true;
}
});
}).IncludeType<TestingInMemorySubscriptionPersistence>();
}
}

public class NativePubSubSubscriber : EndpointConfigurationBuilder
{
public NativePubSubSubscriber()
{
EndpointSetup<DefaultServer>(c => { });
}

public class MyEventMessageHandler : IHandleMessages<MyEvent>
{
Context testContext;

public MyEventMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(MyEvent @event, IMessageHandlerContext context)
{
testContext.IncrementNativePubSubSubscriberReceivedEventsCount();
return Task.FromResult(0);
}
}
}

public class MessageDrivenPubSubSubscriber : EndpointConfigurationBuilder
{
public MessageDrivenPubSubSubscriber()
{
EndpointSetup<DefaultServer>(c =>
{
c.DisableFeature<AutoSubscribe>();
c.GetSettings().Set("NServiceBus.AmazonSQS.DisableNativePubSub", true);
c.GetSettings().GetOrCreate<Publishers>().AddOrReplacePublishers("LegacyConfig", new List<PublisherTableEntry>
{
new PublisherTableEntry(typeof(MyEvent), PublisherAddress.CreateFromEndpointName(Conventions.EndpointNamingConvention(typeof(Publisher))))
});
},
metadata => metadata.RegisterPublisherFor<MyEvent>(typeof(Publisher)));
}

public class MyEventMessageHandler : IHandleMessages<MyEvent>
{
Context testContext;

public MyEventMessageHandler(Context testContext)
{
this.testContext = testContext;
}

public Task Handle(MyEvent @event, IMessageHandlerContext context)
{
testContext.IncrementMessageDrivenPubSubSubscriberReceivedEventsCount();
return Task.FromResult(0);
}
}
}

public class MyEvent : IEvent
{
}
}
}
Loading

0 comments on commit 03f9a61

Please sign in to comment.