Skip to content

Commit

Permalink
added client caching
Browse files Browse the repository at this point in the history
  • Loading branch information
mizrael committed Jul 9, 2024
1 parent d167de4 commit 371ee28
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/AzureStorageTopics.Function/host.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
},
"extensions": {
"storageTopics": {
"cacheClient": true,
"topics": {
"MyTopic": {
"subscriptions": [
Expand Down
4 changes: 3 additions & 1 deletion src/AzureStorageTopics/AzureStorageTopicsStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public void Configure(IWebJobsBuilder builder)
builder.Services
.Configure<TopicsConfig>(config.GetSection("AzureFunctionsJobHost:extensions:storageTopics"))

Check warning on line 21 in src/AzureStorageTopics/AzureStorageTopicsStartup.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 21 in src/AzureStorageTopics/AzureStorageTopicsStartup.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
.AddSingleton<IValidateOptions<TopicsConfig>, TopicsConfigValidator>()
.AddSingleton<ISubscriptionFactory, SubscriptionFactory>()
.AddSingleton<IConnectionStringProvider>(ctx =>
{
var config = ctx.GetRequiredService<IConfiguration>();
Expand All @@ -28,7 +29,8 @@ public void Configure(IWebJobsBuilder builder)
.AddSingleton<ISubscriptionsProvider, SubscriptionsProvider>(ctx =>
{
var topicsConfig = ctx.GetRequiredService<IOptions<TopicsConfig>>();
return new SubscriptionsProvider(topicsConfig.Value);
var factory = ctx.GetRequiredService<ISubscriptionFactory>();
return new SubscriptionsProvider(topicsConfig.Value, factory);
});

builder.AddExtension<StorageTopicConfigProvider>();
Expand Down
11 changes: 11 additions & 0 deletions src/AzureStorageTopics/ISubscriptionFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Azure.Storage.Queues;
using System.Threading;
using System.Threading.Tasks;

namespace AzureStorageTopics
{
public interface ISubscriptionFactory
{
ValueTask<QueueClient> CreateAsync(string topicName, string subscriptionName, string connectionString, bool useCache = true, CancellationToken cancellationToken = default);
}
}
49 changes: 49 additions & 0 deletions src/AzureStorageTopics/SubscriptionFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using Azure.Storage.Queues;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace AzureStorageTopics
{
internal sealed class SubscriptionFactory : ISubscriptionFactory
{
private static readonly ConcurrentDictionary<string, Task<QueueClient>> _cache = new ConcurrentDictionary<string, Task<QueueClient>>();

public async ValueTask<QueueClient> CreateAsync(
string topicName,
string subscriptionName,
string connectionString,
bool useCache = true,
CancellationToken cancellationToken = default)
{
string queueName = BuildQueueName(topicName, subscriptionName);

if (useCache)
{
var clientFactory = _cache.GetOrAdd(queueName, _ => CreateClient(connectionString, queueName, cancellationToken));
var client = await clientFactory.ConfigureAwait(false);
return client;
}

var queueClient = await CreateClient(connectionString, queueName, cancellationToken).ConfigureAwait(false);

return queueClient;
}

private static async Task<QueueClient> CreateClient(string connectionString, string queueName, CancellationToken cancellationToken)
{
var queueClient = new QueueClient(
connectionString,
queueName);
await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken)
.ConfigureAwait(false);
return queueClient;
}

private static string BuildQueueName(string topicName, string subscriptionName)
{
return $"{topicName}-{subscriptionName}".ToLower();
}
}
}
26 changes: 16 additions & 10 deletions src/AzureStorageTopics/SubscriptionsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ namespace AzureStorageTopics
internal sealed class SubscriptionsProvider : ISubscriptionsProvider
{
private readonly TopicsConfig _topicsConfig;
private readonly ISubscriptionFactory _subscriptionFactory;

public SubscriptionsProvider(TopicsConfig topicsConfig)
public SubscriptionsProvider(TopicsConfig topicsConfig, ISubscriptionFactory subscriptionFactory)
{
_topicsConfig = topicsConfig ?? throw new System.ArgumentNullException(nameof(topicsConfig));
_subscriptionFactory = subscriptionFactory ?? throw new System.ArgumentNullException(nameof(subscriptionFactory));
}

public async ValueTask<IEnumerable<QueueClient>> GetSubscriptionsAsync(
Expand All @@ -29,21 +31,25 @@ public async ValueTask<IEnumerable<QueueClient>> GetSubscriptionsAsync(
throw new System.ArgumentException($"'{nameof(connectionString)}' cannot be null or whitespace.", nameof(connectionString));
}

if (!_topicsConfig.Topics.TryGetValue(topicName, out var config))
throw new KeyNotFoundException($"invalid topic name: {topicName}");
if (_topicsConfig.Topics == null || !_topicsConfig.Topics.TryGetValue(topicName, out var config) || config == null)
{
throw new KeyNotFoundException($"invalid topic name: {topicName}");
}

if (config.Subscriptions is null)
if (config.Subscriptions is null || config.Subscriptions.Length == 0)
{
throw new System.InvalidOperationException($"no subscriptions found for topic: {topicName}");
}

var queues = new List<QueueClient>(config.Subscriptions.Length);
foreach(var subscription in config.Subscriptions)
{
var queueName = $"{topicName}-{subscription.Name}".ToLower();
var queueClient = new QueueClient(
connectionString,
queueName);
await queueClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken)
.ConfigureAwait(false);
var queueClient = await _subscriptionFactory.CreateAsync(
topicName,
subscription.Name,
connectionString,
useCache: _topicsConfig.CacheClient,
cancellationToken: cancellationToken).ConfigureAwait(false);
queues.Add(queueClient);
}

Expand Down
10 changes: 9 additions & 1 deletion src/AzureStorageTopics/TopicsConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ namespace AzureStorageTopics
{
internal sealed class TopicsConfig
{
public Dictionary<string, TopicConfig> Topics { get; set; }
public TopicsConfig()
{
Topics = new Dictionary<string, TopicConfig>();
CacheClient = true;
}

public Dictionary<string, TopicConfig> Topics { get; }

public bool CacheClient { get; set; }
}
}
8 changes: 4 additions & 4 deletions src/AzureStorageTopics/TopicsConfigValidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ public ValidateOptionsResult Validate(string? name, TopicsConfig options)
return ValidateOptionsResult.Fail($"The topic '{key}' does not contain any subscriptions.");
}

var topicsDict = new HashSet<string>();
var subsDic = new HashSet<string>();
foreach (var subscription in topic.Subscriptions)
{
if (topicsDict.Contains(subscription.Name))
if (subsDic.Contains(subscription.Name))
{
return ValidateOptionsResult.Fail($"The topic '{key}' contains a duplicate subscription '{subscription}'.");
return ValidateOptionsResult.Fail($"The topic '{key}' contains a duplicate subscription '{subscription.Name}'.");
}

topicsDict.Add(subscription.Name);
subsDic.Add(subscription.Name);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using NSubstitute;
using AzureStorageTopics;
using Azure.Storage.Queues;
using NSubstitute.Extensions;
using NSubstitute;

namespace AzureStorageTopics.Tests
{
Expand Down
50 changes: 50 additions & 0 deletions tests/AzureStorageTopics.Tests/SubscriptionsProviderTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using FluentAssertions;
using NSubstitute;

namespace AzureStorageTopics.Tests
{
public class SubscriptionsProviderTests
{
[Fact]
public async Task GetSubscriptionsAsync_should_fail_when_topic_name_null()
{
var factory = Substitute.For<ISubscriptionFactory>();
var sut = new SubscriptionsProvider(new TopicsConfig(), factory);
Func<Task> act = async () => await sut.GetSubscriptionsAsync(null, "connectionString");

Check warning on line 13 in tests/AzureStorageTopics.Tests/SubscriptionsProviderTests.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
await act.Should().ThrowAsync<ArgumentException>()
.WithMessage("'topicName' cannot be null or whitespace. (Parameter 'topicName')");
}

[Fact]
public async Task GetSubscriptionsAsync_should_fail_when_connection_string_null()
{
var factory = Substitute.For<ISubscriptionFactory>();
var sut = new SubscriptionsProvider(new TopicsConfig(), factory);
Func<Task> act = async () => await sut.GetSubscriptionsAsync("topicName", null);

Check warning on line 23 in tests/AzureStorageTopics.Tests/SubscriptionsProviderTests.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
await act.Should().ThrowAsync<ArgumentException>()
.WithMessage("'connectionString' cannot be null or whitespace. (Parameter 'connectionString')");
}

[Fact]
public async Task GetSubscriptionsAsync_should_fail_when_invalid_topic_name()
{
var factory = Substitute.For<ISubscriptionFactory>();
var sut = new SubscriptionsProvider(new TopicsConfig(), factory);
Func<Task> act = async () => await sut.GetSubscriptionsAsync("topicName", "connectionString");
await act.Should().ThrowAsync<KeyNotFoundException>()
.WithMessage("invalid topic name: topicName");
}

[Fact]
public async Task GetSubscriptionsAsync_should_fail_when_no_subscriptions()
{
var config = new TopicsConfig();
config.Topics["topicName"] = new TopicConfig();
var factory = Substitute.For<ISubscriptionFactory>();
var sut = new SubscriptionsProvider(config, factory);
Func<Task> act = async () => await sut.GetSubscriptionsAsync("topicName", "connectionString");
await act.Should().ThrowAsync<InvalidOperationException>()
.WithMessage("no subscriptions found for topic: topicName");
}
}
}
75 changes: 75 additions & 0 deletions tests/AzureStorageTopics.Tests/TopicsConfigValidatorTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
using FluentAssertions;

namespace AzureStorageTopics.Tests
{
public class TopicsConfigValidatorTests
{
[Fact]
public void Validate_should_fail_when_input_null()
{
var sut = new TopicsConfigValidator();
var result = sut.Validate(null, null);

Check warning on line 11 in tests/AzureStorageTopics.Tests/TopicsConfigValidatorTests.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.

Check warning on line 11 in tests/AzureStorageTopics.Tests/TopicsConfigValidatorTests.cs

View workflow job for this annotation

GitHub Actions / build

Cannot convert null literal to non-nullable reference type.
result.Failed.Should().BeTrue();
result.FailureMessage.Should().Be("The configuration is null.");
}

[Fact]
public void Validate_should_fail_when_no_topics_provided()
{
var config = new TopicsConfig();
var sut = new TopicsConfigValidator();
var result = sut.Validate(null, config);
result.Failed.Should().BeTrue();
result.FailureMessage.Should().Be("The configuration does not contain any topics.");
}

[Fact]
public void Validate_should_fail_when_no_subscriptions_provided()
{
var config = new TopicsConfig();
config.Topics["topic1"] = new TopicConfig();

var sut = new TopicsConfigValidator();
var result = sut.Validate(null, config);
result.Failed.Should().BeTrue();
result.FailureMessage.Should().Be("The topic 'topic1' does not contain any subscriptions.");
}

[Fact]
public void Validate_should_fail_when_subscriptions_duplicated()
{
var config = new TopicsConfig();
config.Topics["topic1"] = new TopicConfig()
{
Subscriptions = new[]
{
new SubscriptionConfig() { Name = "sub1" },
new SubscriptionConfig() { Name = "sub1" }
}
};

var sut = new TopicsConfigValidator();
var result = sut.Validate(null, config);
result.Failed.Should().BeTrue();
result.FailureMessage.Should().Be("The topic 'topic1' contains a duplicate subscription 'sub1'.");
}

[Fact]
public void Validate_should_succeed_when_input_valid()
{
var config = new TopicsConfig();
config.Topics["topic1"] = new TopicConfig()
{
Subscriptions = new[]
{
new SubscriptionConfig() { Name = "sub1" },
new SubscriptionConfig() { Name = "sub2" }
}
};

var sut = new TopicsConfigValidator();
var result = sut.Validate(null, config);
result.Failed.Should().BeFalse();
}
}
}

0 comments on commit 371ee28

Please sign in to comment.