From 254c4261bf848fad79cd6061da53f6fe428b67cf Mon Sep 17 00:00:00 2001 From: Ian Cooper Date: Thu, 11 May 2023 12:35:34 -0400 Subject: [PATCH] [Bug] Add finally block to Kafka Offset Commit (#2662) * Allow a consumer to set the rebalancing strategy * Add a try--finally to semaphore release --------- Co-authored-by: Paul Reardon --- .../KafkaMessageConsumer.cs | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs index 53c4ccb1b9..ef19d91dad 100644 --- a/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs +++ b/src/Paramore.Brighter.MessagingGateway.Kafka/KafkaMessageConsumer.cs @@ -436,28 +436,37 @@ private void CommitOffsets() private void CommitAllOffsets(DateTime flushTime) { - var listOffsets = new List(); - var currentOffsetsInBag = _offsetStorage.Count; - for (int i = 0; i < currentOffsetsInBag; i++) + try { - bool hasOffsets = _offsetStorage.TryTake(out var offset); - if (hasOffsets) - listOffsets.Add(offset); - else - break; + + var listOffsets = new List(); + var currentOffsetsInBag = _offsetStorage.Count; + for (int i = 0; i < currentOffsetsInBag; i++) + { + bool hasOffsets = _offsetStorage.TryTake(out var offset); + if (hasOffsets) + listOffsets.Add(offset); + else + break; + + } + + if (s_logger.IsEnabled(LogLevel.Information)) + { + var offsets = listOffsets.Select(tpo => + $"Topic: {tpo.Topic} Partition: {tpo.Partition.Value} Offset: {tpo.Offset.Value}"); + var offsetAsString = string.Join(Environment.NewLine, offsets); + s_logger.LogInformation("Sweeping offsets: {0} {Offset}", Environment.NewLine, offsetAsString); + } + + _consumer.Commit(listOffsets); + _lastFlushAt = flushTime; } - - if (s_logger.IsEnabled(LogLevel.Information)) + finally { - var offsets = listOffsets.Select(tpo => $"Topic: {tpo.Topic} Partition: {tpo.Partition.Value} Offset: {tpo.Offset.Value}"); - var offsetAsString = string.Join(Environment.NewLine, offsets); - s_logger.LogInformation("Sweeping offsets: {0} {Offset}", Environment.NewLine, offsetAsString); + _flushToken.Release(1); } - - _consumer.Commit(listOffsets); - _lastFlushAt = flushTime; - _flushToken.Release(1); } // The batch size has been exceeded, so flush our offsets