Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reliable Producer #104

Merged
merged 24 commits into from
Apr 28, 2022
Merged

Reliable Producer #104

merged 24 commits into from
Apr 28, 2022

Conversation

Gsantomaggio
Copy link
Member

@Gsantomaggio Gsantomaggio commented Apr 11, 2022

Reliable Producer

part of #92

RProducer:

  • is able to reconnect and invalidate old messages.
  • trace the sent and confirmed messages in an internal waitForConfirmation list (as the go/java clients do) ex:
  • invalidate the messages without confirmation ( in 2 seconds )
 var p = await ReliableProducer.CreateRProducer(new ReliableProducerConfig()
            {
                StreamSystem = system,
                Stream = stream,
                ConfirmationHandler = confirmation =>
                {
                    if (confirmation.Status == ConfirmationStatus.Confirmed)
                    {
 
                        countConf++;
                    }
                    else
                    {
                        countError++;
                    }
           

Reconnection

By default it uses an BackOffReconnectStrategy to reconnect the client, but the user can implement another behaviour just implement the IReconnectStrategy interface.

var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
            {
                ReconnectStrategy = myReconnectStrategy            

How to test:

public async Task TestRProducer()
    {
        var config = new StreamSystemConfig
        {
        
        };
        const string stream = "mystream-r";
        var system = await StreamSystem.Create(config);
        try
        {
            await system.DeleteStream(stream);
        }
        catch (Exception e)
        {
            // ignored 
        }

        await system.CreateStream(new StreamSpec(stream)
        {
            MaxLengthBytes = 2073741824
        });
        var countConf = 0;
        var countError = 0;
        var gc = 0;
        const int totalMessages = 10_000_000;
        var run = Task.Run(async () =>
        {
            var p = await ReliableProducer.CreateReliableProducer(new ReliableProducerConfig()
            {
                StreamSystem = system,
                Stream = stream,
                Reference = "myref",
                ConfirmationHandler = confirmation =>
                {
                    
                    gc++;
                   if (confirmation.Status == ConfirmationStatus.Confirmed)
                    {
 
                        countConf++;
                    }
                    else
                    {
                        countError++;
                    }

                    if ( gc % 50_000 == 0)
                    {
                        Console.WriteLine(
                            $"-Partial confirmed: {countConf} - TimeOutError: {countError} - total: {countError + countConf}");
                    }
                    
                    if ( gc  == totalMessages)
                    {
                        Console.WriteLine(
                            $"-END confirmed: {countConf} - TimeOutError: {countError} - total: {countError + countConf}");
                    }

                    return Task.CompletedTask;
                }
            });
            var start = DateTime.Now;
            for (var i = 0; i < totalMessages; i++)
            {
                if ((i +1) % 100_000 == 0)
                {
                    Console.WriteLine($"sent {i + 1}");
                }

                try
                {
                    await p.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Error {e}");
                }
            }

            Console.WriteLine($"End...Done {DateTime.Now - start}");
            Thread.Sleep(5000);
            // await p.Close();
        });


        for (var i = 0; i < 50; i++)
        {
            Thread.Sleep(5000);
            try
            {
                Console.WriteLine($"Killing all the connections...");
                var r = HttpKillConnections();
                r.Wait();
            }
            catch (Exception e)
            {
                Console.WriteLine($"fail to kill {e}");
            }
        }

        run.Wait();
        Thread.Sleep(50000);
        Console.WriteLine("Done");
    }

Handle Metadata update:

MetadataUpdate is raised when the stream is deleted or changes the topology.

The ReliableProducer is able to:

  • Detect the event
  • reconnect the client in case the stream still exists
  • close the producer and the TCP connection if the stream does/t exist anymore.

See here

How to test:

See the HandleDeleteStreamWithMetaDataUpdate

it is enough to remove a stream when the producer is running.

See also the HandleChangeStreamConfigurationWithMetaDataUpdate

Auto Generate Sequence

ReliableProducer Automatically retrieves the last stored PublishingID and It continues from there.

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio marked this pull request as draft April 11, 2022 14:00
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@codecov-commenter
Copy link

codecov-commenter commented Apr 14, 2022

Codecov Report

Merging #104 (31b930d) into main (705daa7) will increase coverage by 0.60%.
The diff coverage is 94.63%.

@@            Coverage Diff             @@
##             main     #104      +/-   ##
==========================================
+ Coverage   91.25%   91.85%   +0.60%     
==========================================
  Files          69       72       +3     
  Lines        4743     5252     +509     
  Branches      276      315      +39     
==========================================
+ Hits         4328     4824     +496     
- Misses        353      360       +7     
- Partials       62       68       +6     
Impacted Files Coverage Δ
RabbitMQ.Stream.Client/Connection.cs 93.51% <ø> (ø)
RabbitMQ.Stream.Client/Producer.cs 75.00% <70.58%> (-0.65%) ⬇️
...abbitMQ.Stream.Client/Reliable/ReliableProducer.cs 87.56% <87.56%> (ø)
RabbitMQ.Stream.Client/Client.cs 91.14% <100.00%> (+1.22%) ⬆️
...abbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs 100.00% <100.00%> (ø)
RabbitMQ.Stream.Client/StreamSystem.cs 93.24% <100.00%> (+1.90%) ⬆️
Tests/ReliableTests.cs 100.00% <100.00%> (ø)
Tests/SystemTests.cs 100.00% <100.00%> (ø)
Tests/Utils.cs 91.50% <100.00%> (+0.60%) ⬆️
... and 4 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 705daa7...31b930d. Read the comment docs.

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio marked this pull request as ready for review April 19, 2022 15:53
RProducer will get the last SequenceID by the reference name. If it can't get it
Default value is 0

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio
Copy link
Member Author

Hi @simone-fariselli,
I know you are interested in it. If you have a chance try it and let me know :)! Thanks!

change the test.
Add the isInReconnection to avoid some race codition

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Copy link
Contributor

@lukebakken lukebakken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just making some comments. More tomorrow!

RabbitMQ.Stream.Client/Producer.cs Show resolved Hide resolved
RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs Outdated Show resolved Hide resolved
RabbitMQ.Stream.Client/Reliable/RProducer.cs Outdated Show resolved Hide resolved
RabbitMQ.Stream.Client/Reliable/RProducer.cs Outdated Show resolved Hide resolved
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@simone-fariselli
Copy link
Contributor

Hi @Gsantomaggio,
I tried it and it properly reconnects and gives timeout errors on not confirmed messages. ;)
Thanks!

lukebakken
lukebakken previously approved these changes Apr 21, 2022
@lukebakken
Copy link
Contributor

Thank you @simone-fariselli we appreciate you testing this PR.

for TPL.

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@lukebakken lukebakken added this to the v1.0.0-beta.6 milestone Apr 27, 2022
@Gsantomaggio Gsantomaggio merged commit 33e832c into main Apr 28, 2022
@Gsantomaggio Gsantomaggio deleted the rproducer branch April 28, 2022 14:57
@lukebakken lukebakken modified the milestones: v1.0.0-beta.6, v1.0.0-rc.1 May 17, 2022
fortinjose916 added a commit to fortinjose916/stream-dotnet-client that referenced this pull request Nov 11, 2022
- Implement Reliable  Producer
- See rabbitmq/rabbitmq-stream-dotnet-client#104 

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>

Co-authored-by: Luke Bakken <luke@bakken.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants