diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 98eb481b..cf089bd4 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -658,7 +658,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.MessagesBufferSize.get -> int RabbitMQ.Stream.Client.Reliable.ProducerConfig.MessagesBufferSize.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.ProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream) -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.get -> string -RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.get -> RabbitMQ.Stream.Client.Reliable.SuperStreamConfig RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 3a4252bf..5db01bb0 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -1,3 +1,12 @@ RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken -RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte \ No newline at end of file +RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig +RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void +RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void +static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task \ No newline at end of file diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index a6af1c8d..285e0651 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -59,7 +59,8 @@ public static async Task Create( ILogger logger = null ) { - var client = await RoutingHelper.LookupLeaderConnection(clientParameters, metaStreamInfo, logger).ConfigureAwait(false); + var client = await RoutingHelper.LookupLeaderConnection(clientParameters, metaStreamInfo, logger) + .ConfigureAwait(false); var producer = new RawProducer((Client)client, config, logger); await producer.Init().ConfigureAwait(false); @@ -224,7 +225,13 @@ private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessa /// The last sequence id stored by the producer. public async Task GetLastPublishingId() { - var response = await _client.QueryPublisherSequence(_config.Reference, _config.Stream).ConfigureAwait(false); + if (string.IsNullOrWhiteSpace(_config.Reference)) + { + return 0; + } + + var response = await _client.QueryPublisherSequence(_config.Reference, _config.Stream) + .ConfigureAwait(false); ClientExceptions.MaybeThrowException(response.ResponseCode, $"GetLastPublishingId stream: {_config.Stream}, reference: {_config.Reference}"); return response.Sequence; @@ -310,7 +317,8 @@ public async Task Close() // in this case we reduce the waiting time // the producer could be removed because of stream deleted // so it is not necessary to wait. - var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3)).ConfigureAwait(false); + var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3)) + .ConfigureAwait(false); result = closeResponse.ResponseCode; } catch (Exception e) diff --git a/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs b/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs new file mode 100644 index 00000000..181fa1cc --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs @@ -0,0 +1,88 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2023 VMware, Inc. + +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; + +namespace RabbitMQ.Stream.Client.Reliable; + +public record DeduplicatingProducerConfig : ProducerConfig +{ + public DeduplicatingProducerConfig(StreamSystem streamSystem, string stream, string reference) : base(streamSystem, + stream) + { + if (string.IsNullOrWhiteSpace(reference)) + throw new ArgumentException("Reference cannot be null or empty", nameof(reference)); + _reference = reference; + } +} + +// DeduplicatingProducer is a wrapper around the Producer class +// to handle the deduplication of the messages. +// The deduplication is enabled by setting the reference in the DeduplicationProducerConfig +// and it is mandatory to set the reference. +// This class it to use in an easy way the deduplication feature. +// the low level API is the RawProducer class, this class sets the right parameters to enable the deduplication +// The only api is `Send(ulong publishing, Message message)`. In this case the user has to manage the sequence +// to decide deduplication or not. +// The best way to handle the deduplication is to use a single thread avoiding the id overlaps. + +public class DeduplicatingProducer +{ + private Producer _producer = null!; + + public static async Task Create(DeduplicatingProducerConfig producerConfig, + ILogger logger = null) + { + var x = new DeduplicatingProducer() + { + _producer = await Producer + .Create( + new ProducerConfig(producerConfig.StreamSystem, producerConfig.Stream) + { + _reference = producerConfig.Reference, + ConfirmationHandler = producerConfig.ConfirmationHandler, + ReconnectStrategy = producerConfig.ReconnectStrategy, + ClientProvidedName = producerConfig.ClientProvidedName, + MaxInFlight = producerConfig.MaxInFlight, + MessagesBufferSize = producerConfig.MessagesBufferSize, + TimeoutMessageAfter = producerConfig.TimeoutMessageAfter, + }, logger) + .ConfigureAwait(false) + }; + return x; + } + + private DeduplicatingProducer() + { + } + + // Send a message with a specific publishing id + // the publishing id is used to deduplicate the messages + // the publishing id must be unique and incremental. The publishing ID may have gaps. + // It is important to always increment the ID, otherwise, messages will be discarded by the deduplication algorithm + public async ValueTask Send(ulong publishing, Message message) + { + await _producer.SendInternal(publishing, message).ConfigureAwait(false); + } + + public async Task Close() + { + await _producer.Close().ConfigureAwait(false); + } + + public bool IsOpen() + { + return _producer.IsOpen(); + } + + // Get the last publishing id from the producer/reference + // this is useful to know the last id used to deduplicate the messages + // so it is possible to restart the producer with the last id + public async Task GetLastPublishedId() + { + return await _producer.GetLastPublishingId().ConfigureAwait(false); + } +} diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index de110b5e..d0fb943b 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -18,15 +18,35 @@ public record SuperStreamConfig public Func Routing { get; set; } } +[AttributeUsage(AttributeTargets.Method)] +internal class MyMethodAttribute : Attribute +{ + public string Message { get; } + + public MyMethodAttribute(string message) + { + Message = message; + } +} + public record ProducerConfig : ReliableConfig { private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(3); /// - /// Reference is mostly used for deduplication. - /// In most of the cases reference is not needed. + /// Reference used for deduplication. + /// For the Producer Class, it is not needed to set this value + /// See DeduplicatingProducer for Deduplication Messages where this value is needed. /// - public string Reference { get; set; } + internal string _reference; + + public string Reference + { + get { return _reference; } + [Obsolete("Deprecated. Use ClientProvidedName instead. Se DeduplicatingProducer for Deduplication Messages ", + false)] + set { _reference = value; } + } /// /// Publish confirmation callback.
@@ -113,7 +133,7 @@ public class Producer : ProducerFactory protected override ILogger BaseLogger => _logger; - private Producer(ProducerConfig producerConfig, ILogger logger = null) + private protected Producer(ProducerConfig producerConfig, ILogger logger = null) { _producerConfig = producerConfig; _confirmationPipe = new ConfirmationPipe( @@ -205,11 +225,25 @@ public override async Task Close() /// In case of error the message is considered as timed out, you will receive a confirmation with the status TimedOut. public async ValueTask Send(Message message) { - await SemaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + await SendInternal(Interlocked.Increment(ref _publishingId), message).ConfigureAwait(false); + } + finally + { + SemaphoreSlim.Release(); + } + } - Interlocked.Increment(ref _publishingId); - _confirmationPipe.AddUnConfirmedMessage(_publishingId, message); + internal async Task GetLastPublishingId() + { + return await _producer.GetLastPublishingId().ConfigureAwait(false); + } + + internal async ValueTask SendInternal(ulong publishingId, Message message) + { + _confirmationPipe.AddUnConfirmedMessage(publishingId, message); try { // This flags avoid some race condition, @@ -219,7 +253,7 @@ public async ValueTask Send(Message message) // on the _waitForConfirmation list. The user will get Timeout Error if (!(_inReconnection)) { - await _producer.Send(_publishingId, message).ConfigureAwait(false); + await _producer.Send(publishingId, message).ConfigureAwait(false); } } @@ -230,10 +264,6 @@ public async ValueTask Send(Message message) "Message wont' receive confirmation so you will receive a timeout error", _producerConfig.Stream); } - finally - { - SemaphoreSlim.Release(); - } } /// diff --git a/Tests/DeduplicationProducerTests.cs b/Tests/DeduplicationProducerTests.cs new file mode 100644 index 00000000..a5320094 --- /dev/null +++ b/Tests/DeduplicationProducerTests.cs @@ -0,0 +1,121 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2023 VMware, Inc. + +using System; +using System.Threading.Tasks; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.Reliable; +using Xunit; +using Xunit.Abstractions; + +namespace Tests; + +public class DeduplicationProducerTests +{ + private readonly ITestOutputHelper _testOutputHelper; + + public DeduplicationProducerTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + [Fact] + public async Task ValidateDeduplicationProducer() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + Assert.Throws(() => new DeduplicatingProducerConfig(system, stream, null)); + await Assert.ThrowsAsync(async () => + // reference is white space, not valid + await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, " "))); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + [Fact] + public async Task GetLastIdShouldBeEqualtoTheMessagesSent() + { + // here we create a producer with a reference + // the reference is used to enable the deduplication + // then we query the sequence externally form the producer to be sure that + // the values are the same + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var testPassed = new TaskCompletionSource(); + const ulong TotalMessages = 1000UL; + var p = await DeduplicatingProducer.Create( + new DeduplicatingProducerConfig(system, stream, "my_producer_reference") + { + ConfirmationHandler = async confirmation => + { + if (confirmation.PublishingId == TotalMessages) + testPassed.SetResult(TotalMessages); + await Task.CompletedTask; + }, + }); + for (ulong i = 1; i <= TotalMessages; i++) + { + await p.Send(i, new Message(new byte[10])); + } + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + SystemUtils.Wait(); + Assert.Equal(TotalMessages, await p.GetLastPublishedId()); + await p.Close(); + Assert.False(p.IsOpen()); + + // here we query the sequence externally form the producer to be sure that + // the values are the same + Assert.Equal(TotalMessages, await system.QuerySequence("my_producer_reference", stream)); + await SystemUtils.CleanUpStreamSystem(system, stream); + } + + [Fact] + public async Task DeduplicationInActionSendingTheSameIdMessagesWontStore() + { + // in this test we send the same messages again with the same publishing id + // to see the deduplication in action + + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var testPassed = new TaskCompletionSource(); + const ulong TotalMessages = 1000UL; + var p = await DeduplicatingProducer.Create( + new DeduplicatingProducerConfig(system, stream, "my_producer_reference") + { + ConfirmationHandler = async confirmation => + { + if (confirmation.PublishingId == TotalMessages) + testPassed.SetResult(TotalMessages); + await Task.CompletedTask; + }, + }); + // first send and the messages are stored + for (ulong i = 1; i <= TotalMessages; i++) + { + await p.Send(i, new Message(new byte[10])); + } + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + SystemUtils.Wait(); + Assert.Equal(TotalMessages, await p.GetLastPublishedId()); + + // we send the same messages again with the same publishing id + // so the messages won't be stored due of the deduplication + for (ulong i = 1; i <= TotalMessages; i++) + { + await p.Send(i, new Message(new byte[10])); + } + + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(stream) == (int)TotalMessages); + + // we are out of the deduplication window so the messages will be stored + // we start from the last published id + 1 + await p.Send(await p.GetLastPublishedId() + 1, new Message(new byte[10])); + await p.Send(await p.GetLastPublishedId() + 2, new Message(new byte[10])); + + // the total messages should be the TotalMessages + 2 new messages + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(stream) == (int)TotalMessages + 2); + await p.Close(); + Assert.False(p.IsOpen()); + + await SystemUtils.CleanUpStreamSystem(system, stream); + } +} diff --git a/Tests/MultiThreadTests.cs b/Tests/MultiThreadTests.cs index d3b1cdaf..c26f4a32 100644 --- a/Tests/MultiThreadTests.cs +++ b/Tests/MultiThreadTests.cs @@ -35,6 +35,8 @@ public MultiThreadTests(ITestOutputHelper testOutputHelper) public async Task PublishMessagesInMultiThreads() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + const int TotalMessages = 1000; + const int ThreadNumber = 3; var receivedTask = new TaskCompletionSource(); var confirmed = 0; var error = 0; @@ -53,7 +55,7 @@ public async Task PublishMessagesInMultiThreads() break; } - if (error + confirmed == 802) + if (confirmed == TotalMessages * ThreadNumber) { receivedTask.SetResult(confirmed); } @@ -62,20 +64,20 @@ public async Task PublishMessagesInMultiThreads() } }); - for (var i = 0; i < 2; i++) + for (var i = 0; i < ThreadNumber; i++) { _ = Task.Run(async () => { - for (var j = 0; j < 401; j++) + for (var j = 0; j < TotalMessages; j++) { - await producer.Send(new RabbitMQ.Stream.Client.Message(new byte[3])); + await producer.Send(new Message(new byte[3])); } }); } new Utils(_testOutputHelper).WaitUntilTaskCompletes(receivedTask); - Assert.Equal(802, confirmed); - Assert.Equal(802, receivedTask.Task.Result); + Assert.Equal(TotalMessages * ThreadNumber, confirmed); + Assert.Equal(TotalMessages * ThreadNumber, receivedTask.Task.Result); Assert.Equal(0, error); await system.DeleteStream(stream); } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 940176b7..5791836f 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -28,29 +28,33 @@ public ReliableTests(ITestOutputHelper testOutputHelper) [Fact] public void MessageWithoutConfirmationRaiseTimeout() { - var confirmationTask = new TaskCompletionSource>(); - var l = new List(); - var confirmationPipe = new ConfirmationPipe(confirmation => + var confirmationTask = new TaskCompletionSource(); + var l = new List(); + var confirmationPipe = new ConfirmationPipe(async confirmation => { - l.Add(confirmation); + l.Add(confirmation.Status); if (confirmation.PublishingId == 2) { - confirmationTask.SetResult(l); + await Task.CompletedTask; + confirmationTask.SetResult(2); + } + else + { + await Task.CompletedTask; } - - return Task.CompletedTask; }, TimeSpan.FromSeconds(2), 100 ); confirmationPipe.Start(); - var message = new Message(Encoding.UTF8.GetBytes($"hello")); - confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); - new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + confirmationPipe.AddUnConfirmedMessage(1, new Message(Encoding.UTF8.GetBytes($"hello"))); + confirmationPipe.AddUnConfirmedMessage(2, new List() { new Message(Encoding.UTF8.GetBytes($"hello")) }); + new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + Assert.Equal(2, confirmationTask.Task.Result); + Assert.Equal(2, l.Count); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. - Assert.Equal(ConfirmationStatus.ClientTimeoutError, confirmationTask.Task.Result[0].Status); - Assert.Equal(ConfirmationStatus.ClientTimeoutError, confirmationTask.Task.Result[1].Status); + Assert.Equal(ConfirmationStatus.ClientTimeoutError, l[0]); + Assert.Equal(ConfirmationStatus.ClientTimeoutError, l[1]); confirmationPipe.Stop(); } @@ -235,9 +239,10 @@ public async void HandleChangeStreamConfigurationWithMetaDataUpdate() public async void AutoPublishIdDefaultShouldStartFromTheLast() { // RProducer automatically retrieves the last producer offset. - // see IPublishingIdStrategy implementation // This tests if the the last id stored // A new RProducer should restart from the last offset. + // This test will be removed when Reference will be mandatory + // in the DeduplicationProducer SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var testPassed = new TaskCompletionSource(); @@ -247,8 +252,8 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() var producer = await Producer.Create( new ProducerConfig(system, stream) { + _reference = reference, ClientProvidedName = clientProviderName, - Reference = reference, ConfirmationHandler = confirm => { if (Interlocked.Increment(ref count) != 5) @@ -284,7 +289,7 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() var producerSecond = await Producer.Create( new ProducerConfig(system, stream) { - Reference = reference, + _reference = reference, ClientProvidedName = clientProviderName, ConfirmationHandler = confirm => { diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 0af17632..8b568bb9 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -113,6 +113,12 @@ public static void InitStreamSystemWithRandomStream(out StreamSystem system, out x.Wait(); } + public static async Task CleanUpStreamSystem(StreamSystem system, string stream) + { + await system.DeleteStream(stream); + await system.Close(); + } + public static async Task PublishMessages(StreamSystem system, string stream, int numberOfMessages, ITestOutputHelper testOutputHelper) {