
Add cancellation token during subscribe #206

Merged: 5 commits from add_cancellation_token merged into main on Jan 10, 2023

Conversation


@Gsantomaggio Gsantomaggio commented Jan 3, 2023

Fixes #205

This way, the thread is not locked indefinitely, and the call:

_config.MessageHandler(this,
    new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
    message).Wait(_token);

waits on the cancellation token, so there is no thread-pool exhaustion.
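A rough sketch of the idea (simplified, not the exact diff from this PR; _token is assumed to be the consumer's cancellation token, cancelled when the consumer is closed):

try
{
    // Waiting with the consumer's token means closing the consumer cancels the wait
    // instead of leaving the dispatch thread blocked forever on a handler task.
    _config.MessageHandler(this,
            new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
            message)
        .Wait(_token);
}
catch (OperationCanceledException)
{
    // The consumer is shutting down; stop dispatching further messages.
}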

@ricardSiliuk @ricsiLT it should not change anything. Please feel free to add any comments.

@jonnepmyra do you have a chance to test it?

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
@Gsantomaggio Gsantomaggio marked this pull request as draft January 3, 2023 20:50

codecov bot commented Jan 3, 2023

Codecov Report

Base: 92.17% // Head: 91.98% // Decreases project coverage by -0.19% ⚠️

Coverage data is based on head (1944a18) compared to base (ce2e847).
Patch coverage: 65.38% of modified lines in pull request are covered.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #206      +/-   ##
==========================================
- Coverage   92.17%   91.98%   -0.20%     
==========================================
  Files          92       92              
  Lines        7966     7985      +19     
  Branches      640      641       +1     
==========================================
+ Hits         7343     7345       +2     
- Misses        492      507      +15     
- Partials      131      133       +2     
Impacted Files Coverage Δ
RabbitMQ.Stream.Client/RawConsumer.cs 84.16% <65.38%> (-4.46%) ⬇️
RabbitMQ.Stream.Client/WireFormatting.cs 71.18% <0.00%> (-3.39%) ⬇️
RabbitMQ.Stream.Client/Reliable/ReliableBase.cs 96.47% <0.00%> (+1.17%) ⬆️


Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>

jonnepmyra commented Jan 4, 2023

Hi!

Thank you for the quick response and patch, @Gsantomaggio!

We've been testing it during the day with various workloads and it's definitely a lot better!

At first we could still see about 0.5% of all closed connections get deadlocked with an error:
Error removing the consumer id: 0 from the server

... but after tweaking our consumer message handlers that contained TaskCompletionSources to use TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously), it works better and we're no longer able to reproduce the problem!!

The reason why I'm mentioning the TaskCompletionSource tweak in our application code is that, even if it works well now, and we're super grateful for this being fixed, the code still seems to be able to deadlock under certain conditions, but I might be wrong...
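For reference, a minimal sketch of the kind of tweak meant here (the handler body and the Enqueue/SetResult plumbing are hypothetical application code, not part of this PR):

using System.Threading.Tasks;

// Created with RunContinuationsAsynchronously so that SetResult() schedules awaiting
// continuations on the thread pool instead of running them inline on the thread
// that completes the source (which could otherwise be the consumer's dispatch thread).
var processed = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

MessageHandler = async (consumer, ctx, message) =>
{
    Enqueue(message);       // hypothetical: hand the message off to a processing pipeline
    await processed.Task;   // resumed when the pipeline calls processed.SetResult(true)
};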

The Wait() in:

_config.MessageHandler(this,
new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
message).Wait(_token);

feels like some sort of a flaw (as mentioned before in #106 (comment)), but on the other hand, I have no better suggestion at the moment if WaitAsync() leads to thread starvation.
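For comparison, the non-blocking shape being alluded to would look roughly like this (a sketch only, not what this PR implements; Task.WaitAsync(CancellationToken) requires .NET 6 or later):

// Awaiting keeps the thread free, but it requires the surrounding dispatch code
// to be async, which is the difficulty discussed in the following comments.
await _config.MessageHandler(this,
        new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
        message)
    .WaitAsync(_token);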

Thanks again!

@Gsantomaggio (Member Author)

@jonnepmyra let me clarify some points:

Parameters of type `SequenceReader<byte>` cannot be declared in async local functions
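
To illustrate that constraint (an example for context, not code from this PR): SequenceReader<byte> is a ref struct, and ref structs cannot be used in async methods or async lambdas, so the chunk-parsing code cannot simply be made async:

using System.Buffers;
using System.Threading.Tasks;

class ParsingExample
{
    // Does not compile: a ref struct (SequenceReader<byte>) cannot be a parameter
    // or local of an async method.
    // async Task ParseChunkAsync(SequenceReader<byte> reader) { await Task.Yield(); }

    // The parsing must stay synchronous, hence the blocking Wait(_token) on the
    // handler task instead of an await.
    void ParseChunk(SequenceReader<byte> reader)
    {
        // synchronous parsing of the chunk
    }
}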

I have a few questions:

  • code still seems to be able to deadlock

    • Have you tried without your consumer code to reproduce the lock?
    • What do you mean by deadlock? Do you still have a memory leak?
    • Have you tried the last commit 9d34170 ?

Thank you for trying the PR. :)

p.s. If you want, you can find me at https://rabbitmq-slack.herokuapp.com/

@jonnepmyra (Contributor)

Hi!

Thanks for taking the time to explain.

  • Have you tried without your consumer code to reproduce the lock?

Yes, and I have not been able to reproduce it.

  • What do you mean by deadlock? Do you still have a memory leak?

The memory leak seems to be fixed now; no connections are left unclosed after tweaking the application-specific consumer code!

  • Have you tried the last commit 9d34170 ?

Yes, it seems fine

Cheers!

@Gsantomaggio (Member Author)

Great! We will merge it soon!
Ty

@Gsantomaggio Gsantomaggio marked this pull request as ready for review January 5, 2023 13:58
@Gsantomaggio (Member Author)

Let's wait for some feedback from @ricardSiliuk @ricsiLT; then we can merge and release.

@Gsantomaggio (Member Author)

I did some memory/thread tests with millions of messages:

// Background task that prints, once per second, the process thread count and the
// number of busy thread-pool workers (max minus available).
Task.Run(() =>
{
    while (true)
    {
        ThreadPool.GetMaxThreads(out var max, out _);
        ThreadPool.GetAvailableThreads(out var available, out _);
        var running = max - available;
        Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} - Threads: {System.Diagnostics.Process.GetCurrentProcess().Threads.Count} " +
            $"-  {running}");
        Thread.Sleep(1000);
    }
});
....



// Test consumer: logs progress every 10,000 messages and awaits a small delay
// to simulate asynchronous work in the handler.
MessageHandler = async (consumer, ctx, message) =>
{
    if (++consumed % 10_000 == 0)
    {
        var timeMilliseconds = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        Console.WriteLine(
            $"Client Consumed {consumed} messages in {timeMilliseconds - startTimestamp} ms stream: {Stream}");
    }

    await Task.Delay(1);
}

Threads are stable:

11:02:35.849 - Threads: 23 -  4
11:02:36.849 - Threads: 23 -  4
.....
11:05:27.923 - Threads: 23 -  4
11:05:28.924 - Threads: 23 -  4

Memory is stable:

[memory usage graph]

@Gsantomaggio Gsantomaggio merged commit f6efcc8 into main Jan 10, 2023
@Gsantomaggio Gsantomaggio deleted the add_cancellation_token branch January 10, 2023 08:58
@Gsantomaggio Gsantomaggio added this to the 1.0.1 milestone Jan 10, 2023
Successfully merging this pull request may close these issues:

Unclosed connections (memoryleak)