diff --git a/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs b/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs index d476b6d4..3b2c94e0 100644 --- a/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs @@ -50,6 +50,10 @@ public static async Task Create(DeduplicatingProducerConf MaxInFlight = producerConfig.MaxInFlight, MessagesBufferSize = producerConfig.MessagesBufferSize, TimeoutMessageAfter = producerConfig.TimeoutMessageAfter, + Filter = producerConfig.Filter, + Identifier = producerConfig.Identifier, + ResourceAvailableReconnectStrategy = producerConfig.ResourceAvailableReconnectStrategy, + }, logger) .ConfigureAwait(false) }; diff --git a/Tests/FilterTest.cs b/Tests/FilterTest.cs index 9acadab5..4951b46f 100644 --- a/Tests/FilterTest.cs +++ b/Tests/FilterTest.cs @@ -172,6 +172,115 @@ async Task SendTo(string state) await SystemUtils.CleanUpStreamSystem(system, stream).ConfigureAwait(false); } + [SkippableFact] + public async void FilterShouldReturnOnlyOneChunkWithDeduplication() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + if (!AvailableFeaturesSingleton.Instance.PublishFilter) + { + throw new SkipException("broker does not support filter"); + } + + var deduplicatingProducer = await DeduplicatingProducer.Create( + new DeduplicatingProducerConfig(system, stream, "my_ref") + { + Filter = new ProducerFilter() + { + // define the producer filter + FilterValue = message => message.ApplicationProperties["state"].ToString(), + } + } + ); + + const int ToSend = 50; + + async Task SendTo(string state, ulong start) + { + for (var i = (0 + start); i < ToSend + start; i++) + { + var message = new Message(Encoding.UTF8.GetBytes($"Message: {i}. State: {state}")) + { + ApplicationProperties = new ApplicationProperties() { ["state"] = state }, + Properties = new Properties() { GroupId = $"group_{i}" } + }; + await deduplicatingProducer.Send(i, message).ConfigureAwait(false); + } + } + + await SendTo("Alabama", 0); + await Task.Delay(TimeSpan.FromSeconds(2)).ConfigureAwait(false); + await SendTo("New York", ToSend); + await Task.Delay(TimeSpan.FromSeconds(2)).ConfigureAwait(false); + + var testPassedAlabama = new TaskCompletionSource(); + var consumedAlabama = new List(); + var consumerAlabama = await Consumer.Create(new ConsumerConfig(system, stream) + { + OffsetSpec = new OffsetTypeFirst(), + + // This is mandatory for enabling the filter + Filter = new ConsumerFilter() + { + Values = new List() { "Alabama" }, + PostFilter = + _ => + true, // we don't apply any post filter here to be sure that the server is doing the filtering + MatchUnfiltered = true + }, + MessageHandler = (_, _, _, message) => + { + consumedAlabama.Add(message); + if (consumedAlabama.Count == ToSend) + { + testPassedAlabama.SetResult(ToSend); + } + + return Task.CompletedTask; + } + }).ConfigureAwait(false); + Assert.True(testPassedAlabama.Task.Wait(TimeSpan.FromSeconds(5))); + + Assert.Equal(ToSend, consumedAlabama.Count); + + // check that only the messages from Alabama were + consumedAlabama.Where(m => m.ApplicationProperties["state"].Equals("Alabama")).ToList().ForEach(m => + { + Assert.Equal("Alabama", m.ApplicationProperties["state"]); + }); + + await consumerAlabama.Close().ConfigureAwait(false); + // let's reset + var consumedNY = new List(); + + var consumerNY = await Consumer.Create(new ConsumerConfig(system, stream) + { + OffsetSpec = new OffsetTypeFirst(), + + // This is mandatory for enabling the filter + Filter = new ConsumerFilter() + { + Values = new List() { "New York" }, + PostFilter = + message => message.Properties.GroupId.ToString()! + .Equals("group_55"), // we only want the message with group_55 ignoring the rest + // this filter is client side. We should have two messages with group_55 + // One for the standard send and one for the batch send + MatchUnfiltered = true + }, + MessageHandler = (_, _, _, message) => + { + consumedNY.Add(message); + return Task.CompletedTask; + } + }).ConfigureAwait(false); + + SystemUtils.Wait(TimeSpan.FromSeconds(2)); + Assert.Single(consumedNY); + Assert.Equal("group_55", consumedNY[0].Properties.GroupId!); + await consumerNY.Close().ConfigureAwait(false); + await SystemUtils.CleanUpStreamSystem(system, stream).ConfigureAwait(false); + } + // This test is to test when there are errors on the filter functions // producer side and consumer side. // FilterValue and PostFilter are user's functions and can throw exceptions @@ -249,7 +358,7 @@ await producer.Send(new Message(Encoding.UTF8.GetBytes("Message: " + i)) OffsetSpec = new OffsetTypeFirst(), Filter = new ConsumerFilter() { - Values = new List() { "my_filter" },// at this level we don't care about the filter value + Values = new List() { "my_filter" }, // at this level we don't care about the filter value PostFilter = message => {