Skip to content

Commit

Permalink
Documentation and formatting
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored and lukebakken committed Mar 13, 2023
1 parent dec0031 commit f5613f8
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 23 deletions.
27 changes: 11 additions & 16 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,26 +185,18 @@ Message MessageFromSequence(ref ReadOnlySequence<byte> unCompressedData, ref int
[MethodImpl(MethodImplOptions.AggressiveInlining)]
async Task DispatchMessage(Message message, ulong i)
{

/* Unmerged change from project 'RabbitMQ.Stream.Client(net7.0)'
Before:
numRecords -= subEntryChunk.NumRecordsInBatch;
After:
numRecords -= subEntryChunk.NumRecordsInBatch;
*/
try
{
message.MessageOffset = chunk.ChunkId + i;
if (MaybeDispatch(message.MessageOffset))
{
if (Token.IsCancellationRequested)
if (!Token.IsCancellationRequested)
{
return;
await _config.MessageHandler(this,
new MessageContext(message.MessageOffset,
TimeSpan.FromMilliseconds(chunk.Timestamp)),
message).ConfigureAwait(false);
}

await _config.MessageHandler(this,
new MessageContext(message.MessageOffset, TimeSpan.FromMilliseconds(chunk.Timestamp)),
message).ConfigureAwait(false);
}
}

Expand All @@ -223,8 +215,8 @@ await _config.MessageHandler(this,
var chunkBuffer = new ReadOnlySequence<byte>(chunk.Data);

var numRecords = chunk.NumRecords;
var offset = 0;
ulong messageOffset = 0; // it is used to calculate the message offset.
var offset = 0; // it is used to calculate the offset in the chunk.
ulong messageOffset = 0; // it is used to calculate the message offset. It is the chunkId + messageOffset
while (numRecords != 0)
{
// (entryType & 0x80) == 0 is standard entry
Expand All @@ -236,7 +228,7 @@ await _config.MessageHandler(this,
var isSubEntryBatch = (entryType & 0x80) != 0;
if (isSubEntryBatch)
{
// it means that it is a subentry batch
// it means that it is a sub-entry batch
// We continue to read from the stream to decode the subEntryChunk values
slice = chunkBuffer.Slice(offset);

Expand Down Expand Up @@ -279,6 +271,8 @@ private void ProcessChunks()
{
while (_chunksBuffer.Reader.TryRead(out var chunk))
{
// We send the credit to the server to allow the server to send more messages
// we request the credit before process the check to keep the network busy
await _client.Credit(_subscriberId, 1).ConfigureAwait(false);
await ParseChunk(chunk).ConfigureAwait(false);
}
Expand Down Expand Up @@ -338,6 +332,7 @@ private async Task Init()
// Send the chunk to the _chunksBuffer
// in this way the chunks are processed in a separate thread
// this wont' block the socket thread
// introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
}, async b =>
{
Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public record ConsumerConfig : ReliableConfig

/// <summary>
/// Callback function where the consumer receives the messages.
/// The callback runs in a different Task respect to the socket thread.
/// Parameters that will be received by this function:
/// <list type="bullet">
/// <item>
Expand Down
10 changes: 3 additions & 7 deletions docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -667,13 +667,9 @@ include::{test-examples}/ConsumerUsage.cs[tag=consumer-creation]

The broker start sending messages as soon as the `Consumer` instance is created.

[WARNING]
.Keep the message processing callback as short as possible
====
The message processing callback should be kept as short as possible to avoid blocking the connection thread.
Not doing so can make the `StreamSystem`, `Producer`, `Consumer` instances sluggish or even block them.
Any long processing should be done in a separate thread (e.g. with an asynchronous `Task.Run(...)`).
====

Staring from the 1.3.0 version, the `Consumer#MessageHandler` API runs in a separated `Task` and it is possible to use `async`/`await` in the handler.


The following table sums up the main settings to create a `Consumer` with `ConsumerConfig`:

Expand Down

0 comments on commit f5613f8

Please sign in to comment.