diff --git a/Brighter.sln b/Brighter.sln index 3f87ccf87..0403e9fd2 100644 --- a/Brighter.sln +++ b/Brighter.sln @@ -329,6 +329,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.ServiceAc EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Sqlite.Dapper", "src\Paramore.Brighter.Sqlite.Dapper\Paramore.Brighter.Sqlite.Dapper.csproj", "{3384FBF0-5DCB-452D-8288-FAD1D0023089}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Locking.Azure", "Paramore.Brighter.Locking.Azure\Paramore.Brighter.Locking.Azure.csproj", "{021F3B51-A640-4C0D-9B47-FB4E32DF6715}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1875,6 +1877,18 @@ Global {3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|Mixed Platforms.Build.0 = Release|Any CPU {3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.ActiveCfg = Release|Any CPU {3384FBF0-5DCB-452D-8288-FAD1D0023089}.Release|x86.Build.0 = Release|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Any CPU.Build.0 = Debug|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.ActiveCfg = Debug|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Debug|x86.Build.0 = Debug|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.ActiveCfg = Release|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Any CPU.Build.0 = Release|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.ActiveCfg = Release|Any CPU + {021F3B51-A640-4C0D-9B47-FB4E32DF6715}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Paramore.Brighter.Locking.Azure/AzureBlobLockingProvider.cs b/Paramore.Brighter.Locking.Azure/AzureBlobLockingProvider.cs new file mode 100644 index 000000000..94c3d1d82 --- /dev/null +++ b/Paramore.Brighter.Locking.Azure/AzureBlobLockingProvider.cs @@ -0,0 +1,106 @@ +using Azure; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Specialized; +using Microsoft.Extensions.Logging; +using Paramore.Brighter.Logging; + +namespace Paramore.Brighter.Locking.Azure; + +public class AzureBlobLockingProvider(AzureBlobLockingProviderOptions options) : IDistributedLock +{ + private readonly BlobContainerClient _containerClient = new BlobContainerClient(options.BlobContainerUri, options.TokenCredential); + private readonly ILogger _logger = ApplicationLogging.CreateLogger(); + + private readonly Dictionary _leases = new Dictionary(); + + public async Task ObtainLockAsync(string resource, CancellationToken cancellationToken) + { + var client = GetBlobClient(resource); + + // Write if does not exist + if (!await client.ExistsAsync(cancellationToken)) + { + await using var emptyStream = new MemoryStream(); + await using var writer = new StreamWriter(emptyStream); + await writer.WriteAsync(string.Empty); + await writer.FlushAsync(cancellationToken); + emptyStream.Position = 0; + await client.UploadAsync(emptyStream, cancellationToken: cancellationToken); + } + + try + { + var response = await client.GetBlobLeaseClient().AcquireAsync(options.LeaseValidity, cancellationToken: cancellationToken); + _leases.Add(NormaliseResourceName(resource), response.Value.LeaseId); + return true; + } + catch (RequestFailedException e) + { + _logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource); + return false; + } + } + + public bool ObtainLock(string resource) + { + var client = GetBlobClient(resource); + + // Write if does not exist + if (!client.Exists()) + { + using var emptyStream = new MemoryStream(); + using var writer = new StreamWriter(emptyStream); + writer.Write(string.Empty); + writer.Flush(); + emptyStream.Position = 0; + client.Upload(emptyStream); + } + + try + { + var response = client.GetBlobLeaseClient().Acquire(options.LeaseValidity); + _leases.Add(NormaliseResourceName(resource), response.Value.LeaseId); + return true; + } + catch (RequestFailedException e) + { + _logger.LogInformation("Could not Acquire Lease on Blob {LockResourceName}", resource); + return false; + } + } + + public async Task ReleaseLockAsync(string resource, CancellationToken cancellationToken) + { + var client = GetBlobLeaseClientForResource(resource); + if(client == null) + return; + await client.ReleaseAsync(cancellationToken: cancellationToken); + _leases.Remove(NormaliseResourceName(resource)); + } + + public void ReleaseLock(string resource) + { + var client = GetBlobLeaseClientForResource(resource); + if(client == null) + return; + client.Release(); + _leases.Remove(NormaliseResourceName(resource)); + } + + private BlobLeaseClient? GetBlobLeaseClientForResource(string resource) + { + if (_leases.ContainsKey(NormaliseResourceName(resource))) + return GetBlobClient(resource).GetBlobLeaseClient(_leases[NormaliseResourceName(resource)]); + + _logger.LogInformation("No lock found for {LockResourceName}", resource); + return null; + } + + private BlobClient GetBlobClient(string resource) + { + var storageLocation = options.StorageLocationFunc.Invoke(NormaliseResourceName(resource)); + return _containerClient.GetBlobClient(storageLocation); + } + + private static string NormaliseResourceName(string resourceName) => resourceName.ToLower(); +} diff --git a/Paramore.Brighter.Locking.Azure/AzureBlobLockingProviderOptions.cs b/Paramore.Brighter.Locking.Azure/AzureBlobLockingProviderOptions.cs new file mode 100644 index 000000000..df012ca18 --- /dev/null +++ b/Paramore.Brighter.Locking.Azure/AzureBlobLockingProviderOptions.cs @@ -0,0 +1,29 @@ +using Azure.Core; + +namespace Paramore.Brighter.Locking.Azure; + +public class AzureBlobLockingProviderOptions( + Uri blobContainerUri, + TokenCredential tokenCredential + ) +{ + /// + /// The URI of the blob container + /// + public Uri BlobContainerUri { get; init; } = blobContainerUri; + + /// + /// The Credential to use when writing blobs + /// + public TokenCredential TokenCredential { get; init; } = tokenCredential; + + /// + /// The amount of time before the lease automatically expires + /// + public TimeSpan LeaseValidity { get; init; } = TimeSpan.FromMinutes(1); + + /// + /// The function to provide the location to store the locks inside of the Blob container + /// + public Func StorageLocationFunc = (resource) => $"lock-{resource}"; +} diff --git a/Paramore.Brighter.Locking.Azure/Paramore.Brighter.Locking.Azure.csproj b/Paramore.Brighter.Locking.Azure/Paramore.Brighter.Locking.Azure.csproj new file mode 100644 index 000000000..df89beb58 --- /dev/null +++ b/Paramore.Brighter.Locking.Azure/Paramore.Brighter.Locking.Azure.csproj @@ -0,0 +1,17 @@ + + + + net8.0 + enable + enable + + + + + + + + + + + diff --git a/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs b/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs index ae3da8c64..d6553e6e0 100644 --- a/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs +++ b/src/Paramore.Brighter.Extensions.DependencyInjection/ServiceCollectionExtensions.cs @@ -206,6 +206,10 @@ public static IBrighterBuilder UseExternalBus( brighterBuilder.Services.Add(asyncOutboxdescriptor); } } + + // If no distributed locking service is added, then add the in memory variant + var distributedLock = busConfiguration.DistributedLock ?? new InMemoryLock(); + brighterBuilder.Services.AddSingleton(distributedLock); if (busConfiguration.UseRpc) { diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs index f35c6abac..4545426df 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs @@ -1,7 +1,6 @@ using System; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Paramore.Brighter.Logging; @@ -15,17 +14,20 @@ public class TimedOutboxArchiver : IHostedService, IDisp private static readonly ILogger s_logger = ApplicationLogging.CreateLogger(); private readonly IAmAnOutbox _outbox; private readonly IAmAnArchiveProvider _archiveProvider; + private readonly IDistributedLock _distributedLock; private Timer _timer; - private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); + private const string LockingResourceName = "Archiver"; public TimedOutboxArchiver( IAmAnOutbox outbox, IAmAnArchiveProvider archiveProvider, + IDistributedLock distributedLock, TimedOutboxArchiverOptions options) { _outbox = outbox; _archiveProvider = archiveProvider; + _distributedLock = distributedLock; _options = options; } @@ -33,7 +35,8 @@ public Task StartAsync(CancellationToken cancellationToken) { s_logger.LogInformation("Outbox Archiver Service is starting."); - _timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval)); + _timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, + TimeSpan.FromSeconds(_options.TimerInterval)); return Task.CompletedTask; } @@ -54,7 +57,7 @@ public void Dispose() private async Task Archive(object state, CancellationToken cancellationToken) { - if (await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken)) + if (await _distributedLock.ObtainLockAsync(LockingResourceName, cancellationToken)) { s_logger.LogInformation("Outbox Archiver looking for messages to Archive"); try @@ -72,7 +75,7 @@ private async Task Archive(object state, CancellationToken cancellationToken) } finally { - _semaphore.Release(); + await _distributedLock.ReleaseLockAsync(LockingResourceName, cancellationToken); } s_logger.LogInformation("Outbox Sweeper sleeping"); diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs index 4fd3e7e21..575d3ca43 100644 --- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs +++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs @@ -14,15 +14,18 @@ namespace Paramore.Brighter.Extensions.Hosting public class TimedOutboxSweeper : IHostedService, IDisposable { private readonly IServiceScopeFactory _serviceScopeFactory; + private readonly IDistributedLock _distributedLock; private readonly TimedOutboxSweeperOptions _options; private static readonly ILogger s_logger= ApplicationLogging.CreateLogger(); private Timer _timer; - //private Timer _timer; - - public TimedOutboxSweeper (IServiceScopeFactory serviceScopeFactory, TimedOutboxSweeperOptions options) + private const string LockingResourceName = "OutboxSweeper"; + + public TimedOutboxSweeper(IServiceScopeFactory serviceScopeFactory, IDistributedLock distributedLock, + TimedOutboxSweeperOptions options) { _serviceScopeFactory = serviceScopeFactory; + _distributedLock = distributedLock; _options = options; } @@ -42,36 +45,44 @@ public Task StartAsync(CancellationToken cancellationToken) private void OnElapsed(object sender, ElapsedEventArgs elapsedEventArgs) { - s_logger.LogInformation("Outbox Sweeper looking for unsent messages"); - - var scope = _serviceScopeFactory.CreateScope(); - try + if (_distributedLock.ObtainLock(LockingResourceName)) { - IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService(); + s_logger.LogInformation("Outbox Sweeper looking for unsent messages"); - var outBoxSweeper = new OutboxSweeper( - millisecondsSinceSent: _options.MinimumMessageAge, - commandProcessor: commandProcessor, - _options.BatchSize, - _options.UseBulk, - _options.Args); + var scope = _serviceScopeFactory.CreateScope(); + try + { + IAmACommandProcessor commandProcessor = scope.ServiceProvider.GetService(); - if (_options.UseBulk) - outBoxSweeper.SweepAsyncOutbox(); - else - outBoxSweeper.Sweep(); - } - catch (Exception e) - { - s_logger.LogError(e, "Error while sweeping the outbox."); - throw; + var outBoxSweeper = new OutboxSweeper( + millisecondsSinceSent: _options.MinimumMessageAge, + commandProcessor: commandProcessor, + _options.BatchSize, + _options.UseBulk, + _options.Args); + + if (_options.UseBulk) + outBoxSweeper.SweepAsyncOutbox(); + else + outBoxSweeper.Sweep(); + } + catch (Exception e) + { + s_logger.LogError(e, "Error while sweeping the outbox."); + throw; + } + finally + { + _distributedLock.ReleaseLock(LockingResourceName); + scope.Dispose(); + ((Timer)sender).Enabled = true; + } } - finally + else { - scope.Dispose(); - ((Timer)sender).Enabled = true; + s_logger.LogWarning("Outbox Sweeper is still running - abandoning attempt."); } - + s_logger.LogInformation("Outbox Sweeper sleeping"); } diff --git a/src/Paramore.Brighter/ExternalBusConfiguration.cs b/src/Paramore.Brighter/ExternalBusConfiguration.cs index 3ce8456c3..219224f34 100644 --- a/src/Paramore.Brighter/ExternalBusConfiguration.cs +++ b/src/Paramore.Brighter/ExternalBusConfiguration.cs @@ -137,6 +137,11 @@ public class ExternalBusConfiguration : IAmExternalBusConfiguration /// The Outbox we wish to use for messaging /// public IAmAnOutbox Outbox { get; set; } + + /// + /// The Distributed Locking Service + /// + public IDistributedLock DistributedLock { get; set; } /// /// The maximum amount of messages to deposit into the outbox in one transmissions. diff --git a/src/Paramore.Brighter/IDistributedLock.cs b/src/Paramore.Brighter/IDistributedLock.cs new file mode 100644 index 000000000..ae9b5c390 --- /dev/null +++ b/src/Paramore.Brighter/IDistributedLock.cs @@ -0,0 +1,37 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Paramore.Brighter; + +public interface IDistributedLock +{ + /// + /// Attempt to obtain a lock on a resource + /// + /// The name of the resource to Lock + /// The Cancellation Token + /// True if the lock was obtained + Task ObtainLockAsync(string resource, CancellationToken cancellationToken); + + /// + /// Attempt to obtain a lock on a resource + /// + /// The name of the resource to Lock + /// True if the lock was obtained + bool ObtainLock(string resource); + + /// + /// Release a lock + /// + /// + /// + /// Awaitable Task + Task ReleaseLockAsync(string resource, CancellationToken cancellationToken); + + /// + /// Release a lock + /// + /// + /// Awaitable Task + void ReleaseLock(string resource); +} diff --git a/src/Paramore.Brighter/InMemoryLock.cs b/src/Paramore.Brighter/InMemoryLock.cs new file mode 100644 index 000000000..2b09883c2 --- /dev/null +++ b/src/Paramore.Brighter/InMemoryLock.cs @@ -0,0 +1,42 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Paramore.Brighter; + +public class InMemoryLock : IDistributedLock +{ + private readonly Dictionary _semaphores = new Dictionary(); + + public async Task ObtainLockAsync(string resource, CancellationToken cancellationToken) + { + var normalisedResourceName = resource.ToLower(); + if(!_semaphores.ContainsKey(normalisedResourceName)) + _semaphores.Add(normalisedResourceName, new SemaphoreSlim(1,1)); + + return await _semaphores[normalisedResourceName].WaitAsync(TimeSpan.Zero, cancellationToken); + } + + public bool ObtainLock(string resource) + { + var normalisedResourceName = resource.ToLower(); + if(!_semaphores.ContainsKey(normalisedResourceName)) + _semaphores.Add(normalisedResourceName, new SemaphoreSlim(1,1)); + + return _semaphores[normalisedResourceName].Wait(TimeSpan.Zero); + } + + public Task ReleaseLockAsync(string resource, CancellationToken cancellationToken) + { + ReleaseLock(resource); + return Task.CompletedTask; + } + + public void ReleaseLock(string resource) + { + var normalisedResourceName = resource.ToLower(); + if(_semaphores.TryGetValue(normalisedResourceName, out SemaphoreSlim semaphore)) + semaphore.Release(); + } +} diff --git a/tests/Paramore.Brighter.Azure.Tests/AzureBlobLockingProviderTests.cs b/tests/Paramore.Brighter.Azure.Tests/AzureBlobLockingProviderTests.cs new file mode 100644 index 000000000..dc67cd4c7 --- /dev/null +++ b/tests/Paramore.Brighter.Azure.Tests/AzureBlobLockingProviderTests.cs @@ -0,0 +1,45 @@ +using Azure.Identity; +using Paramore.Brighter.Locking.Azure; + +namespace Paramore.Brighter.Azure.Tests; + +public class AzureBlobLockingProviderTests +{ + private IDistributedLock _blobLocking; + + public AzureBlobLockingProviderTests() + { + var options = new AzureBlobLockingProviderOptions( + new Uri("https://brighterarchivertest.blob.core.windows.net/locking"), new AzureCliCredential()); + + _blobLocking = new AzureBlobLockingProvider(options); + } + + [Test] + public async Task GivenAnAzureBlobLockingProvider_WhenLockIsCalled_ItCanOnlyBeObtainedOnce() + { + var resourceName = $"TestLock-{Guid.NewGuid()}"; + + var firstLock = await _blobLocking.ObtainLockAsync(resourceName, CancellationToken.None); + var secondLock = await _blobLocking.ObtainLockAsync(resourceName, CancellationToken.None); + + Assert.That(firstLock, Is.True); + Assert.That(secondLock, Is.False, "A Lock should not be able to be acquired"); + } + + [Test] + public async Task GivenAnAzureBlobLockingProviderWithALockedBlob_WhenReleaseLockIsCalled_ItCanOnlyBeLockedAgain() + { + var resourceName = $"TestLock-{Guid.NewGuid()}"; + + var firstLock = await _blobLocking.ObtainLockAsync(resourceName, CancellationToken.None); + await _blobLocking.ReleaseLockAsync(resourceName, CancellationToken.None); + var secondLock = await _blobLocking.ObtainLockAsync(resourceName, CancellationToken.None); + var thirdLock = await _blobLocking.ObtainLockAsync(resourceName, CancellationToken.None); + + Assert.That(firstLock, Is.True); + Assert.That(secondLock, Is.True, "A Lock should be able to be acquired"); + Assert.That(thirdLock, Is.False, "A Lock should not be able to be acquired"); + } + +} diff --git a/tests/Paramore.Brighter.Azure.Tests/Paramore.Brighter.Azure.Tests.csproj b/tests/Paramore.Brighter.Azure.Tests/Paramore.Brighter.Azure.Tests.csproj index bb4ac00d8..4d2147722 100644 --- a/tests/Paramore.Brighter.Azure.Tests/Paramore.Brighter.Azure.Tests.csproj +++ b/tests/Paramore.Brighter.Azure.Tests/Paramore.Brighter.Azure.Tests.csproj @@ -25,6 +25,7 @@ + diff --git a/tests/Paramore.Brighter.Core.Tests/Locking/InMemoryLockingProviderTests.cs b/tests/Paramore.Brighter.Core.Tests/Locking/InMemoryLockingProviderTests.cs new file mode 100644 index 000000000..22101ffc3 --- /dev/null +++ b/tests/Paramore.Brighter.Core.Tests/Locking/InMemoryLockingProviderTests.cs @@ -0,0 +1,39 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Paramore.Brighter.Core.Tests.Locking; + +public class InMemoryLockingProviderTests +{ + private IDistributedLock _locking = new InMemoryLock(); + + + [Fact] + public async Task GivenAnInMemoryLockingProvider_WhenLockIsCalled_ItCanOnlyBeObtainedOnce() + { + var resourceName = $"TestLock-{Guid.NewGuid()}"; + + var firstLock = await _locking.ObtainLockAsync(resourceName, CancellationToken.None); + var secondLock = await _locking.ObtainLockAsync(resourceName, CancellationToken.None); + + Assert.True(firstLock); + Assert.False(secondLock, "A Lock should not be able to be acquired"); + } + + [Fact] + public async Task GivenAnAzureBlobLockingProviderWithALockedBlob_WhenReleaseLockIsCalled_ItCanOnlyBeLockedAgain() + { + var resourceName = $"TestLock-{Guid.NewGuid()}"; + + var firstLock = await _locking.ObtainLockAsync(resourceName, CancellationToken.None); + await _locking.ReleaseLockAsync(resourceName, CancellationToken.None); + var secondLock = await _locking.ObtainLockAsync(resourceName, CancellationToken.None); + var thirdLock = await _locking.ObtainLockAsync(resourceName, CancellationToken.None); + + Assert.True(firstLock); + Assert.True(secondLock, "A Lock should be able to be acquired"); + Assert.False(thirdLock, "A Lock should not be able to be acquired"); + } +}