Skip to content

Commit

Permalink
Allow multiple topics to call the same endpoint (#763)
Browse files Browse the repository at this point in the history
* Allow multiple topics to call the same endpoint

Using the WithTopic builder pattern allows you to specify multiple
topics that route to a single endpoint. Given the topics are unique,
this should be a valid case.

This change allows for a single endpoint to bind to multiple topics.
Note that this can only be done via the EndpointBuilder and not the
TopicAttribute.

#715

* Let TopicAttribute to be specified multiple times

The endpoint builder allowed for multiple WithTopic calls so the
attribute should match that behavior.
  • Loading branch information
halspang committed Oct 26, 2021
1 parent f325863 commit cc1b097
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 12 deletions.
30 changes: 20 additions & 10 deletions src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,25 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
var dataSource = context.RequestServices.GetRequiredService<EndpointDataSource>();
var subscriptions = dataSource.Endpoints
.OfType<RouteEndpoint>()
.Where(e => e.Metadata.GetMetadata<ITopicMetadata>()?.Name != null) // only endpoints which have TopicAttribute with not null Name.
.Where(e => e.Metadata.GetOrderedMetadata<ITopicMetadata>().Any(t => t.Name != null)) // only endpoints which have TopicAttribute with not null Name.
.SelectMany(e =>
{
var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();
var subs = new List<(string PubsubName, string Name, bool? EnableRawPayload, string Match, int Priority, RoutePattern RoutePattern)>();
for (int i = 0; i < topicMetadata.Count(); i++)
{
subs.Add((topicMetadata[i].PubsubName,
topicMetadata[i].Name,
(topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,
topicMetadata[i].Match,
topicMetadata[i].Priority,
e.RoutePattern));
}
return subs;
})
.Distinct()
.Select(e => (
e.Metadata.GetMetadata<ITopicMetadata>().PubsubName,
e.Metadata.GetMetadata<ITopicMetadata>().Name,
e.Metadata.GetMetadata<IRawTopicMetadata>()?.EnableRawPayload,
e.Metadata.GetMetadata<ITopicMetadata>().Match,
e.Metadata.GetMetadata<ITopicMetadata>().Priority,
e.RoutePattern))
.GroupBy(e => new { e.PubsubName, e.Name })
.Select(e => e.OrderBy(e => e.Priority))
.Select(e =>
Expand All @@ -79,7 +89,7 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
if (logger != null)
{
if (defaultRoutes.Count() > 1)
if (defaultRoutes.Count > 1)
{
logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName);
}
Expand All @@ -105,7 +115,7 @@ private static IEndpointConventionBuilder CreateSubscribeEndPoint(IEndpointRoute
};
// Use the V2 routing rules structure
if (rules.Count() > 0)
if (rules.Count > 0)
{
subscription.Routes = new Routes
{
Expand Down
1 change: 1 addition & 0 deletions src/Dapr.AspNetCore/TopicAttribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Dapr
/// <summary>
/// TopicAttribute describes an endpoint as a subscriber to a topic.
/// </summary>
[AttributeUsage(AttributeTargets.All, AllowMultiple = true)]
public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata
{
/// <summary>
Expand Down
13 changes: 13 additions & 0 deletions test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,19 @@ public void TopicEImportant()
{
}

[Topic("pubsub", "F")]
[Topic("pubsub", "F.1", true)]
[HttpPost("/multiTopicAttr")]
public void MultipleTopics()
{
}

[Topic("pubsub", "splitTopicAttr", true)]
[HttpPost("/splitTopics")]
public void SplitTopic()
{
}

[Topic("pubsub", "register-user")]
[HttpPost("/register-user")]
public ActionResult<UserInfo> RegisterUser(UserInfo user)
Expand Down
4 changes: 3 additions & 1 deletion test/Dapr.AspNetCore.IntegrationTest.App/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
endpoints.MapSubscribeHandler();
endpoints.MapControllers();
endpoints.MapPost("/topic-a", context => Task.CompletedTask).WithTopic("testpubsub", "A");
endpoints.MapPost("/topic-a", context => Task.CompletedTask).WithTopic("testpubsub", "A").WithTopic("testpubsub", "A.1");
endpoints.MapPost("/splitTopics", context => Task.CompletedTask).WithTopic("pubsub", "splitTopicBuilder");
endpoints.MapPost("/routingwithstateentry/{widget}", async context =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public async Task SubscribeEndpoint_ReportsTopics()
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);

json.ValueKind.Should().Be(JsonValueKind.Array);
json.GetArrayLength().Should().Be(7);
json.GetArrayLength().Should().Be(12);
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match)>();
foreach (var element in json.EnumerateArray())
{
Expand Down Expand Up @@ -66,6 +66,7 @@ public async Task SubscribeEndpoint_ReportsTopics()
}

subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty));
subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty));
subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty));
Expand All @@ -74,6 +75,10 @@ public async Task SubscribeEndpoint_ReportsTopics()
subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\""));
subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\""));
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty));
subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty));

// Test priority route sorting
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
Expand Down

0 comments on commit cc1b097

Please sign in to comment.