Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Deduplicating Producer #234

Merged
merged 7 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.MessagesBufferSize.get -> int
RabbitMQ.Stream.Client.Reliable.ProducerConfig.MessagesBufferSize.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.ProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream) -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.get -> string
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.get -> RabbitMQ.Stream.Client.Reliable.SuperStreamConfig
RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan
Expand Down
11 changes: 10 additions & 1 deletion RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Reliable.DeduplicationProducer
RabbitMQ.Stream.Client.Reliable.DeduplicationProducer.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.DeduplicationProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
RabbitMQ.Stream.Client.Reliable.DeduplicationProducer.IsOpen() -> bool
RabbitMQ.Stream.Client.Reliable.DeduplicationProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.DeduplicationProducerConfig
RabbitMQ.Stream.Client.Reliable.DeduplicationProducerConfig.DeduplicationProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
static RabbitMQ.Stream.Client.Reliable.DeduplicationProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicationProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger<RabbitMQ.Stream.Client.Reliable.Producer> logger = null) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.Reliable.DeduplicationProducer>
14 changes: 11 additions & 3 deletions RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static async Task<IProducer> Create(
ILogger logger = null
)
{
var client = await RoutingHelper<Routing>.LookupLeaderConnection(clientParameters, metaStreamInfo, logger).ConfigureAwait(false);
var client = await RoutingHelper<Routing>.LookupLeaderConnection(clientParameters, metaStreamInfo, logger)
.ConfigureAwait(false);

var producer = new RawProducer((Client)client, config, logger);
await producer.Init().ConfigureAwait(false);
Expand Down Expand Up @@ -224,7 +225,13 @@ private async Task SendMessages(List<(ulong, Message)> messages, bool clearMessa
/// <returns>The last sequence id stored by the producer.</returns>
public async Task<ulong> GetLastPublishingId()
{
var response = await _client.QueryPublisherSequence(_config.Reference, _config.Stream).ConfigureAwait(false);
if (string.IsNullOrWhiteSpace(_config.Reference))
{
return 0;
}

var response = await _client.QueryPublisherSequence(_config.Reference, _config.Stream)
.ConfigureAwait(false);
ClientExceptions.MaybeThrowException(response.ResponseCode,
$"GetLastPublishingId stream: {_config.Stream}, reference: {_config.Reference}");
return response.Sequence;
Expand Down Expand Up @@ -310,7 +317,8 @@ public async Task<ResponseCode> Close()
// in this case we reduce the waiting time
// the producer could be removed because of stream deleted
// so it is not necessary to wait.
var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3)).ConfigureAwait(false);
var closeResponse = await _client.DeletePublisher(_publisherId).WaitAsync(TimeSpan.FromSeconds(3))
.ConfigureAwait(false);
result = closeResponse.ResponseCode;
}
catch (Exception e)
Expand Down
87 changes: 87 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/DeduplicationProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace RabbitMQ.Stream.Client.Reliable;

/// <summary>
/// Configuration for <see cref="DeduplicationProducer"/>.
/// Unlike the plain producer configuration, the reference is a constructor
/// argument and is validated when the configuration is created.
/// </summary>
public record DeduplicationProducerConfig : ProducerConfig
{
    /// <summary>
    /// Creates the configuration and stores <paramref name="reference"/>.
    /// </summary>
    /// <param name="streamSystem">Forwarded to the base configuration.</param>
    /// <param name="stream">Forwarded to the base configuration.</param>
    /// <param name="reference">Producer reference; null, empty and white-space values are rejected.</param>
    /// <exception cref="ArgumentException">
    /// Thrown when <paramref name="reference"/> is null, empty or white space.
    /// </exception>
    public DeduplicationProducerConfig(StreamSystem streamSystem, string stream, string reference)
        : base(streamSystem, stream)
    {
        // Validate and assign in one expression: a usable reference is stored,
        // anything else aborts construction.
        _reference = !string.IsNullOrWhiteSpace(reference)
            ? reference
            : throw new ArgumentException("Reference cannot be null or empty", nameof(reference));
    }
}

// DeduplicationProducer is a wrapper around the Producer class
// that handles the deduplication of messages.
// Deduplication is enabled by setting the reference in the DeduplicationProducerConfig;
// setting the reference is mandatory.
// This class is meant to make the deduplication feature easy to use.
// The low-level API is the RawProducer class; this class sets the right parameters to enable deduplication.
// The only send API is `Send(ulong publishing, Message message)`, so the user has to manage the sequence
// of publishing ids and thereby decides whether a message is deduplicated or not.
// The best way to handle deduplication is to publish from a single thread, which avoids overlapping ids.

/// <summary>
/// Wrapper around <see cref="Producer"/> that always publishes with a
/// caller-supplied publishing id. Instances are obtained through
/// <see cref="Create"/>; the constructor is private.
/// </summary>
public class DeduplicationProducer
{
    // The wrapped producer; every public member delegates to it.
    private readonly Producer _producer;

    private DeduplicationProducer(Producer producer)
    {
        _producer = producer;
    }

    /// <summary>
    /// Creates the underlying <see cref="Producer"/> from the given configuration
    /// and wraps it.
    /// </summary>
    /// <param name="producerConfig">Configuration carrying the mandatory reference.</param>
    /// <param name="logger">Optional logger forwarded to <see cref="Producer"/>.</param>
    /// <returns>A ready-to-use <see cref="DeduplicationProducer"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="producerConfig"/> is null.
    /// </exception>
    public static async Task<DeduplicationProducer> Create(DeduplicationProducerConfig producerConfig,
        ILogger<Producer> logger = null)
    {
        // Fail with a descriptive exception instead of a NullReferenceException
        // on the first property access below.
        if (producerConfig is null)
        {
            throw new ArgumentNullException(nameof(producerConfig));
        }

        // Copy the settings into a plain ProducerConfig. The reference is written
        // through the backing field rather than the Reference property setter.
        var innerConfig = new ProducerConfig(producerConfig.StreamSystem, producerConfig.Stream)
        {
            _reference = producerConfig.Reference,
            ConfirmationHandler = producerConfig.ConfirmationHandler,
            ReconnectStrategy = producerConfig.ReconnectStrategy,
            ClientProvidedName = producerConfig.ClientProvidedName,
            MaxInFlight = producerConfig.MaxInFlight,
            MessagesBufferSize = producerConfig.MessagesBufferSize,
            TimeoutMessageAfter = producerConfig.TimeoutMessageAfter,
        };

        var producer = await Producer.Create(innerConfig, logger).ConfigureAwait(false);
        return new DeduplicationProducer(producer);
    }

    /// <summary>
    /// Sends a message with a specific publishing id.
    /// The publishing id is what messages are deduplicated on: it must be unique
    /// and incremental. Gaps are accepted; what matters is that it increases.
    /// </summary>
    /// <param name="publishing">Publishing id chosen by the caller.</param>
    /// <param name="message">Message to send.</param>
    public async ValueTask Send(ulong publishing, Message message)
    {
        await _producer.SendInternal(publishing, message).ConfigureAwait(false);
    }

    /// <summary>
    /// Closes the wrapped producer.
    /// </summary>
    public async Task Close()
    {
        await _producer.Close().ConfigureAwait(false);
    }

    /// <summary>
    /// Reports whether the wrapped producer is open.
    /// </summary>
    public bool IsOpen()
    {
        return _producer.IsOpen();
    }

    /// <summary>
    /// Gets the last publishing id for this producer/reference.
    /// Useful to know the last id used for deduplication, so the producer can be
    /// restarted from that id.
    /// </summary>
    public async Task<ulong> GetLastPublishedId()
    {
        return await _producer.GetLastPublishingId().ConfigureAwait(false);
    }
}
54 changes: 45 additions & 9 deletions RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,35 @@ public record SuperStreamConfig
public Func<Message, string> Routing { get; set; }
}

/// <summary>
/// Method-level attribute that carries a free-form text message.
/// </summary>
[AttributeUsage(AttributeTargets.Method)]
internal class MyMethodAttribute : Attribute
{
    /// <summary>
    /// Creates the attribute with the given message.
    /// </summary>
    /// <param name="message">Text exposed through <see cref="Message"/>.</param>
    public MyMethodAttribute(string message) => Message = message;

    /// <summary>
    /// The message supplied to the constructor.
    /// </summary>
    public string Message { get; }
}

public record ProducerConfig : ReliableConfig
{
private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(3);

/// <summary>
/// Reference is mostly used for deduplication.
/// In most of the cases reference is not needed.
/// Reference used for deduplication.
/// For the Producer Class it is not needed to set this value.
Gsantomaggio marked this conversation as resolved.
Show resolved Hide resolved
/// See DeduplicationProducer for Deduplication Messages where this value is needed.
Gsantomaggio marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public string Reference { get; set; }
internal string _reference;

/// <summary>
/// Producer reference, backed by the <c>_reference</c> field.
/// Reading is supported; the setter is marked obsolete (as a warning, not an error).
/// </summary>
public string Reference
{
    get => _reference;
    [Obsolete("Deprecated. Use ClientProvidedName instead. See DeduplicationProducer for Deduplication Messages",
        false)]
    set => _reference = value;
}

/// <summary>
/// Publish confirmation callback.<br/>
Expand Down Expand Up @@ -113,7 +133,7 @@ public class Producer : ProducerFactory

protected override ILogger BaseLogger => _logger;

private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
private protected Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
{
_producerConfig = producerConfig;
_confirmationPipe = new ConfirmationPipe(
Expand Down Expand Up @@ -205,11 +225,27 @@ public override async Task Close()
/// In case of error the message is considered as timed out, you will receive a confirmation with the status TimedOut.
public async ValueTask Send(Message message)
{
    // The id increment and the hand-off to SendInternal both happen while
    // the semaphore is held.
    await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
    try
    {
        var nextPublishingId = Interlocked.Increment(ref _publishingId);
        await SendInternal(nextPublishingId, message).ConfigureAwait(false);
    }
    finally
    {
        // Released on every path, including when SendInternal throws.
        SemaphoreSlim.Release();
    }
}

Interlocked.Increment(ref _publishingId);
_confirmationPipe.AddUnConfirmedMessage(_publishingId, message);
/// <summary>
/// Asks the wrapped <c>_producer</c> for its last publishing id and returns
/// the value unchanged.
/// </summary>
internal async Task<ulong> GetLastPublishingId()
{
    var lastPublishingId = await _producer.GetLastPublishingId().ConfigureAwait(false);
    return lastPublishingId;
}

internal async ValueTask SendInternal(ulong publishingId, Message message)
{
// await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
// Interlocked.Increment(ref _publishingId);
_confirmationPipe.AddUnConfirmedMessage(publishingId, message);
try
{
// This flags avoid some race condition,
Expand All @@ -219,7 +255,7 @@ public async ValueTask Send(Message message)
// on the _waitForConfirmation list. The user will get Timeout Error
if (!(_inReconnection))
{
await _producer.Send(_publishingId, message).ConfigureAwait(false);
await _producer.Send(publishingId, message).ConfigureAwait(false);
}
}

Expand All @@ -232,7 +268,7 @@ public async ValueTask Send(Message message)
}
finally
{
SemaphoreSlim.Release();
// SemaphoreSlim.Release();
}
Gsantomaggio marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down
131 changes: 131 additions & 0 deletions Tests/DeduplicationProducerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2023 VMware, Inc.

/* Unmerged change from project 'Tests(net7.0)'
Before:
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
After:
using System.Threading.Tasks;
*/
Gsantomaggio marked this conversation as resolved.
Show resolved Hide resolved

using System;
using System.Threading.Tasks;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;
using Xunit;
using Xunit.Abstractions;

namespace Tests;

/// <summary>
/// Tests for <see cref="DeduplicationProducer"/> and
/// <see cref="DeduplicationProducerConfig"/>: argument validation, last
/// publishing id tracking, and deduplication of re-sent publishing ids.
/// </summary>
public class DeduplicationProducerTests
{
    private readonly ITestOutputHelper _testOutputHelper;

    public DeduplicationProducerTests(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    /// <summary>
    /// A null or white-space reference must be rejected with an ArgumentException.
    /// </summary>
    [Fact]
    public async Task ValidateDeduplicationProducer()
    {
        SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
        Assert.Throws<ArgumentException>(() => new DeduplicationProducerConfig(system, stream, null));
        await Assert.ThrowsAsync<ArgumentException>(async () =>
            // reference is white space, not valid
            await DeduplicationProducer.Create(new DeduplicationProducerConfig(system, stream, " ")));
        await SystemUtils.CleanUpStreamSystem(system, stream);
    }

    /// <summary>
    /// After sending ids 1..TotalMessages the last published id must equal
    /// TotalMessages, both through the producer and through the stream system.
    /// </summary>
    [Fact]
    public async Task GetLastIdShouldBeEqualtoTheMessagesSent()
    {
        // here we create a producer with a reference
        // the reference is used to enable the deduplication
        // then we query the sequence externally from the producer to be sure that
        // the values are the same
        SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
        var testPassed = new TaskCompletionSource<ulong>();
        const ulong TotalMessages = 1000UL;
        var p = await DeduplicationProducer.Create(
            new DeduplicationProducerConfig(system, stream, "my_producer_reference")
            {
                ConfirmationHandler = async confirmation =>
                {
                    if (confirmation.PublishingId == TotalMessages)
                    {
                        // TrySetResult: a repeated confirmation of the last id
                        // must not throw InvalidOperationException in the handler.
                        testPassed.TrySetResult(TotalMessages);
                    }

                    await Task.CompletedTask;
                },
            });
        for (ulong i = 1; i <= TotalMessages; i++)
        {
            await p.Send(i, new Message(new byte[10]));
        }

        new Utils<ulong>(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
        SystemUtils.Wait();
        Assert.Equal(TotalMessages, await p.GetLastPublishedId());
        await p.Close();
        Assert.False(p.IsOpen());

        // here we query the sequence externally from the producer to be sure that
        // the values are the same
        Assert.Equal(TotalMessages, await system.QuerySequence("my_producer_reference", stream));
        await SystemUtils.CleanUpStreamSystem(system, stream);
    }

    /// <summary>
    /// Re-sending already used publishing ids must not add messages to the stream;
    /// ids above the last published one must.
    /// </summary>
    [Fact]
    public async Task DeduplicationInActionSendingTheSameIdMessagesWontStore()
    {
        // in this test we send the same messages again with the same publishing id
        // to see the deduplication in action

        SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
        var testPassed = new TaskCompletionSource<ulong>();
        const ulong TotalMessages = 1000UL;
        var p = await DeduplicationProducer.Create(
            new DeduplicationProducerConfig(system, stream, "my_producer_reference")
            {
                ConfirmationHandler = async confirmation =>
                {
                    if (confirmation.PublishingId == TotalMessages)
                    {
                        // The same ids are sent twice below, so this branch may be
                        // reached more than once; TrySetResult tolerates that,
                        // whereas SetResult would throw on the second call.
                        testPassed.TrySetResult(TotalMessages);
                    }

                    await Task.CompletedTask;
                },
            });
        // first send and the messages are stored
        for (ulong i = 1; i <= TotalMessages; i++)
        {
            await p.Send(i, new Message(new byte[10]));
        }

        new Utils<ulong>(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
        SystemUtils.Wait();
        Assert.Equal(TotalMessages, await p.GetLastPublishedId());

        // we send the same messages again with the same publishing id
        // so the messages won't be stored because of the deduplication
        for (ulong i = 1; i <= TotalMessages; i++)
        {
            await p.Send(i, new Message(new byte[10]));
        }

        SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(stream) == (int)TotalMessages);

        // we are out of the deduplication window so the messages will be stored
        // we start from the last published id + 1
        // The last id is read once so the two new ids do not depend on whether
        // the first of them has already been recorded when the second is computed.
        var lastPublishedId = await p.GetLastPublishedId();
        await p.Send(lastPublishedId + 1, new Message(new byte[10]));
        await p.Send(lastPublishedId + 2, new Message(new byte[10]));

        // the total messages should be the TotalMessages + 2 new messages
        SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(stream) == (int)TotalMessages + 2);
        await p.Close();
        Assert.False(p.IsOpen());

        await SystemUtils.CleanUpStreamSystem(system, stream);
    }
}
Loading