Skip to content

Commit

Permalink
Add filter for deduplication producer (#383)
Browse files Browse the repository at this point in the history
* Add filter for deduplication producer
Fixes: #382
---------

Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Jun 1, 2024
1 parent fc47863 commit dbfcdc9
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public static async Task<DeduplicatingProducer> Create(DeduplicatingProducerConf
MaxInFlight = producerConfig.MaxInFlight,
MessagesBufferSize = producerConfig.MessagesBufferSize,
TimeoutMessageAfter = producerConfig.TimeoutMessageAfter,
Filter = producerConfig.Filter,
Identifier = producerConfig.Identifier,
ResourceAvailableReconnectStrategy = producerConfig.ResourceAvailableReconnectStrategy,

}, logger)
.ConfigureAwait(false)
};
Expand Down
111 changes: 110 additions & 1 deletion Tests/FilterTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,115 @@ async Task SendTo(string state)
await SystemUtils.CleanUpStreamSystem(system, stream).ConfigureAwait(false);
}

[SkippableFact]
public async void FilterShouldReturnOnlyOneChunkWithDeduplication()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
if (!AvailableFeaturesSingleton.Instance.PublishFilter)
{
throw new SkipException("broker does not support filter");
}

var deduplicatingProducer = await DeduplicatingProducer.Create(
new DeduplicatingProducerConfig(system, stream, "my_ref")
{
Filter = new ProducerFilter()
{
// define the producer filter
FilterValue = message => message.ApplicationProperties["state"].ToString(),
}
}
);

const int ToSend = 50;

async Task SendTo(string state, ulong start)
{
for (var i = (0 + start); i < ToSend + start; i++)
{
var message = new Message(Encoding.UTF8.GetBytes($"Message: {i}. State: {state}"))
{
ApplicationProperties = new ApplicationProperties() { ["state"] = state },
Properties = new Properties() { GroupId = $"group_{i}" }
};
await deduplicatingProducer.Send(i, message).ConfigureAwait(false);
}
}

await SendTo("Alabama", 0);
await Task.Delay(TimeSpan.FromSeconds(2)).ConfigureAwait(false);
await SendTo("New York", ToSend);
await Task.Delay(TimeSpan.FromSeconds(2)).ConfigureAwait(false);

var testPassedAlabama = new TaskCompletionSource<int>();
var consumedAlabama = new List<Message>();
var consumerAlabama = await Consumer.Create(new ConsumerConfig(system, stream)
{
OffsetSpec = new OffsetTypeFirst(),

// This is mandatory for enabling the filter
Filter = new ConsumerFilter()
{
Values = new List<string>() { "Alabama" },
PostFilter =
_ =>
true, // we don't apply any post filter here to be sure that the server is doing the filtering
MatchUnfiltered = true
},
MessageHandler = (_, _, _, message) =>
{
consumedAlabama.Add(message);
if (consumedAlabama.Count == ToSend)
{
testPassedAlabama.SetResult(ToSend);
}
return Task.CompletedTask;
}
}).ConfigureAwait(false);
Assert.True(testPassedAlabama.Task.Wait(TimeSpan.FromSeconds(5)));

Assert.Equal(ToSend, consumedAlabama.Count);

// check that only the messages from Alabama were
consumedAlabama.Where(m => m.ApplicationProperties["state"].Equals("Alabama")).ToList().ForEach(m =>
{
Assert.Equal("Alabama", m.ApplicationProperties["state"]);
});

await consumerAlabama.Close().ConfigureAwait(false);
// let's reset
var consumedNY = new List<Message>();

var consumerNY = await Consumer.Create(new ConsumerConfig(system, stream)
{
OffsetSpec = new OffsetTypeFirst(),

// This is mandatory for enabling the filter
Filter = new ConsumerFilter()
{
Values = new List<string>() { "New York" },
PostFilter =
message => message.Properties.GroupId.ToString()!
.Equals("group_55"), // we only want the message with group_55 ignoring the rest
// this filter is client side. We should have two messages with group_55
// One for the standard send and one for the batch send
MatchUnfiltered = true
},
MessageHandler = (_, _, _, message) =>
{
consumedNY.Add(message);
return Task.CompletedTask;
}
}).ConfigureAwait(false);

SystemUtils.Wait(TimeSpan.FromSeconds(2));
Assert.Single(consumedNY);
Assert.Equal("group_55", consumedNY[0].Properties.GroupId!);
await consumerNY.Close().ConfigureAwait(false);
await SystemUtils.CleanUpStreamSystem(system, stream).ConfigureAwait(false);
}

// This test is to test when there are errors on the filter functions
// producer side and consumer side.
// FilterValue and PostFilter are user's functions and can throw exceptions
Expand Down Expand Up @@ -249,7 +358,7 @@ await producer.Send(new Message(Encoding.UTF8.GetBytes("Message: " + i))
OffsetSpec = new OffsetTypeFirst(),
Filter = new ConsumerFilter()
{
Values = new List<string>() { "my_filter" },// at this level we don't care about the filter value
Values = new List<string>() { "my_filter" }, // at this level we don't care about the filter value
PostFilter =
message =>
{
Expand Down

0 comments on commit dbfcdc9

Please sign in to comment.