Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding System.IO.Pipelines to replace Channels and Streams (replaces #949) #1199

Closed
wants to merge 4 commits into from

Conversation

stebet
Copy link
Contributor

@stebet stebet commented Apr 20, 2022

Proposed Changes

This PR adds System.IO.Pipelines to replace using Channels and Streams to receive and send data to/from the client.

  • Removes Channels/Streams and replaces them with PipeReader/PipeWriter, which are now handled by the Connection instead of SocketFrameHandler
    • Simplifies the SocketFrameHandler a lot! Gets rid of extra Read/Write tasks which are now managed by the Pipelines.
    • Does require sync-over-async ugliness to Flush the Pipe when publishes manage to fill the Pipe buffer faster than data gets sent. That is however rare under normal circumstances.
  • Prepares the client to be able to support fully asynchronous operations more easily.
    • This does create a little bit of CPU/Allocation overhead for the async operations under mass publish scenarios (benchmarks incoming), but we would always have run into those issues when making the operations Async anyway or when fixing the potential OOM issues like in Heap size grows when publishing a very large batch of messages #1106 due to having to create "fake" tasks to block while waiting for the buffers to drop below the threshold levels. This overhead should however be negligible under normal scenarios and should actually be a lot better off under fully async scenarios since we are less likely to block threads and create ThreadPool starvation scenarios.
  • Should solve several problems similar to Heap size grows when publishing a very large batch of messages #1106, which stem from the current client being able publish messages faster than the network connection/RabbitMQ server can handle them for async scenarios.
  • Lays groundwork to fix Proposal: Full async channel interface (IModel) #970, 8.0 Proposal (internal): Use IValueTaskSource for RPC continuations awaits #966 and AMQP 0-9-1 Channel (IModel) API with async methods #843

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue Heap size grows when publishing a very large batch of messages #1106)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

Reasoning

I see Pipelines being a perfect fit here. They make processing buffers a lot easier, reduce overhead when deserializing data and make it a lot easier to implement fully async operations. That's also why they are used for the same purpose in Kestrel (the ASP.NET Core server) Kestrel and StackExchange.Redis.

I also created as-of-yet unused WriteAsync methods for that purpose, which would make it easy to implement fully Async publish methods for example.

Benchmark results (to be added)

@stebet stebet marked this pull request as draft April 20, 2022 15:03
@stebet stebet changed the title Adding System.IO.Pipelines to replace Channels (replaces #949) Adding System.IO.Pipelines to replace Channels and Streams (replaces #949) Apr 20, 2022
@stebet
Copy link
Contributor Author

stebet commented Apr 22, 2022

Still ironing out a few things that only seem to manifest in the CI tests (tests all run fine locally), but think they have to do with the fact that synchronous publish now potentially blocks where before it had an unlimited buffer with the UnboundedChannel to write into, so WaitForConfirm tests behave slightly differently with Pipelines.

Copy link
Collaborator

@danielmarbach danielmarbach left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry only had like five minutes today. Hopefully I can spend some more time another day

{
_writeSemaphore.Wait();
WriteSpan(payload);
_frameHandler.FrameWriter.FlushAsync().AsTask().Wait(); // Sync-over-async, not great, not terrible. All we can do for the sync path.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not yet used the pipeline stuff very intensively. I was wondering if we could use AsStream and then use the synchronous stream API here and in the other sync parts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That actually makes some sense. I'll look into that.

@@ -149,7 +199,7 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
{
var cmd = new ConnectionClose(hpe.ShutdownReason.ReplyCode, hpe.ShutdownReason.ReplyText, 0, 0);
_session0.Transmit(ref cmd);
ClosingLoop();
await ClosingLoop();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigureAwait


WriteSpan(payload.Span);
await _frameHandler.FrameWriter.FlushAsync().ConfigureAwait(false);
_writeSemaphore.Release();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you deliberately not doing this in a finally block?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. I'll fix that

@stebet
Copy link
Contributor Author

stebet commented Apr 29, 2022

Yeah I'm working on cleaning it up a bit. Found some test issues and I'm working on a PR to clean those up first to reduce the noise.

@michaelklishin
Copy link
Member

Sorry to drag you into resolving some non-trivial conflicts @stebet. We decided to go ahead
and merge #1202 as it felt like the
right thing to do since we require net6.0 now for 7.0. This obviously affects all pull requests in flight.

This was referenced May 13, 2022
@michaelklishin
Copy link
Member

I reverted #1202 because it makes back and forward-porting impossible unless all maintained branches adopt file-scoped namespaces.

@stebet
Copy link
Contributor Author

stebet commented May 24, 2022

No probs. I'm a bit busy with work and RL these days. Hoping to submit another PR first that cleans up the unit tests a lot and hopefully makes the more reliable and then clean this PR up after that.

@lukebakken
Copy link
Contributor

@stebet I've actually got some time to dedicate to releasing version 7.0 of this library. I'll get the most recent conflicts fixed in this branch, as well as address the most recent review comments. We could discuss the remainder of work here and I could take over. Let me know what you think.

@lukebakken
Copy link
Contributor

Hi @stebet, I'm wondering if you have time to read my comment and respond. @bollhals asked about the status of version 7.0 and one major task is completing this PR.

As I said, I'm happy to take over work as long as your piplines branch is up-to-date in your fork. Thanks!

@stebet
Copy link
Contributor Author

stebet commented Oct 11, 2022

I need to take a little bit closer look at this since I've been out of the loop a bit. I'll take a look in the next days and give you an update.

@lukebakken
Copy link
Contributor

Superseded by #1264

@lukebakken lukebakken closed this Oct 14, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Proposal: Full async channel interface (IModel)
4 participants