From f5613f89c54e421db1d33aaefa33afb34e9bf825 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Sat, 11 Mar 2023 22:32:32 +0100 Subject: [PATCH] Documentation and formatting Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 27 +++++++++------------ RabbitMQ.Stream.Client/Reliable/Consumer.cs | 1 + docs/asciidoc/api.adoc | 10 +++----- 3 files changed, 15 insertions(+), 23 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index afcb54e8..4fa3ce12 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -185,26 +185,18 @@ Message MessageFromSequence(ref ReadOnlySequence unCompressedData, ref int [MethodImpl(MethodImplOptions.AggressiveInlining)] async Task DispatchMessage(Message message, ulong i) { - - /* Unmerged change from project 'RabbitMQ.Stream.Client(net7.0)' - Before: - numRecords -= subEntryChunk.NumRecordsInBatch; - After: - numRecords -= subEntryChunk.NumRecordsInBatch; - */ try { message.MessageOffset = chunk.ChunkId + i; if (MaybeDispatch(message.MessageOffset)) { - if (Token.IsCancellationRequested) + if (!Token.IsCancellationRequested) { - return; + await _config.MessageHandler(this, + new MessageContext(message.MessageOffset, + TimeSpan.FromMilliseconds(chunk.Timestamp)), + message).ConfigureAwait(false); } - - await _config.MessageHandler(this, - new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)), - message).ConfigureAwait(false); } } @@ -223,8 +215,8 @@ await _config.MessageHandler(this, var chunkBuffer = new ReadOnlySequence(chunk.Data); var numRecords = chunk.NumRecords; - var offset = 0; - ulong messageOffset = 0; // it is used to calculate the message offset. + var offset = 0; // it is used to calculate the offset in the chunk. + ulong messageOffset = 0; // it is used to calculate the message offset. It is the chunkId + messageOffset while (numRecords != 0) { // (entryType & 0x80) == 0 is standard entry @@ -236,7 +228,7 @@ await _config.MessageHandler(this, var isSubEntryBatch = (entryType & 0x80) != 0; if (isSubEntryBatch) { - // it means that it is a subentry batch + // it means that it is a sub-entry batch // We continue to read from the stream to decode the subEntryChunk values slice = chunkBuffer.Slice(offset); @@ -279,6 +271,8 @@ private void ProcessChunks() { while (_chunksBuffer.Reader.TryRead(out var chunk)) { + // We send the credit to the server to allow the server to send more messages + // we request the credit before process the check to keep the network busy await _client.Credit(_subscriberId, 1).ConfigureAwait(false); await ParseChunk(chunk).ConfigureAwait(false); } @@ -338,6 +332,7 @@ private async Task Init() // Send the chunk to the _chunksBuffer // in this way the chunks are processed in a separate thread // this wont' block the socket thread + // introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); }, async b => { diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index 2d137c6d..a43ec432 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -27,6 +27,7 @@ public record ConsumerConfig : ReliableConfig /// /// Callback function where the consumer receives the messages. + /// The callback runs in a different Task respect to the socket thread. /// Parameters that will be received by this function: /// /// diff --git a/docs/asciidoc/api.adoc b/docs/asciidoc/api.adoc index 8cb83b56..90e0ed72 100644 --- a/docs/asciidoc/api.adoc +++ b/docs/asciidoc/api.adoc @@ -667,13 +667,9 @@ include::{test-examples}/ConsumerUsage.cs[tag=consumer-creation] The broker start sending messages as soon as the `Consumer` instance is created. -[WARNING] -.Keep the message processing callback as short as possible -==== -The message processing callback should be kept as short as possible to avoid blocking the connection thread. -Not doing so can make the `StreamSystem`, `Producer`, `Consumer` instances sluggish or even block them. -Any long processing should be done in a separate thread (e.g. with an asynchronous `Task.Run(...)`). -==== + +Staring from the 1.3.0 version, the `Consumer#MessageHandler` API runs in a separated `Task` and it is possible to use `async`/`await` in the handler. + The following table sums up the main settings to create a `Consumer` with `ConsumerConfig`: