Skip to content

Commit

Permalink
Kafka commit issues GDATASoftwareAG#773
Browse files Browse the repository at this point in the history
* Implementation
* Tests
  • Loading branch information
Philip Stadermann committed Mar 8, 2023
1 parent 87c93ee commit 1188b95
Show file tree
Hide file tree
Showing 2 changed files with 336 additions and 96 deletions.
215 changes: 169 additions & 46 deletions src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using CloudNative.CloudEvents;
using Confluent.Kafka;
Expand All @@ -18,6 +19,9 @@

namespace Motor.Extensions.Hosting.Kafka;

public record ConsumeResultAndProcessedMessageStatus(ConsumeResult<string?, byte[]> ConsumeResult,
ProcessedMessageStatus ProcessedMessageStatus);

public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisposable where TData : notnull
{
private readonly IApplicationNameService _applicationNameService;
Expand All @@ -28,8 +32,7 @@ public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisp
private readonly ILogger<KafkaMessageConsumer<TData>> _logger;
private readonly IHostApplicationLifetime _applicationLifetime;
private IConsumer<string?, byte[]>? _consumer;
private readonly SemaphoreSlim _messageSemaphore;


public KafkaMessageConsumer(ILogger<KafkaMessageConsumer<TData>> logger,
IOptions<KafkaConsumerOptions<TData>> config,
IHostApplicationLifetime applicationLifetime,
Expand All @@ -39,14 +42,16 @@ public KafkaMessageConsumer(ILogger<KafkaMessageConsumer<TData>> logger,
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_applicationLifetime = applicationLifetime;
_applicationNameService = applicationNameService ?? throw new ArgumentNullException(nameof(config));
_applicationNameService = applicationNameService ?? throw new ArgumentNullException(nameof(applicationNameService));
_cloudEventFormatter = cloudEventFormatter;
_options = config.Value ?? throw new ArgumentNullException(nameof(config));
_consumerLagSummary = metricsFactory?.CreateSummary("consumer_lag_distribution",
"Contains a summary of current consumer lag of each partition", new[] { "topic", "partition" });
_consumerLagGauge = metricsFactory?.CreateGauge("consumer_lag",
"Contains current number consumer lag of each partition", false, "topic", "partition");
_messageSemaphore = new SemaphoreSlim(config.Value.MaxConcurrentMessages);

_processedMessages = Channel.CreateBounded<Task<ConsumeResultAndProcessedMessageStatus>>(_options.MaxConcurrentMessages);
_timer = new Timer(HandleCommitTimer);
}

public Func<MotorCloudEvent<byte[]>, CancellationToken, Task<ProcessedMessageStatus>>? ConsumeCallbackAsync
Expand Down Expand Up @@ -75,31 +80,43 @@ public async Task ExecuteAsync(CancellationToken token = default)
{
await Task.Run(async () =>
{
while (!token.IsCancellationRequested)
var committer = ExecuteCommitLoopAsync(token);
try
{
await _messageSemaphore.WaitAsync(token);
try
while (!token.IsCancellationRequested)
{
var msg = _consumer?.Consume(token);
if (msg is { IsPartitionEOF: false })
try
{
SingleMessageHandlingAsync(msg, token);
if (!await _processedMessages.Writer.WaitToWriteAsync(token))
{
break;
}
var msg = _consumer?.Consume(token);
if (msg is { IsPartitionEOF: false })
{
await _processedMessages.Writer.WriteAsync(SingleMessageHandlingAsync(msg, token), token);
}
else
{
_logger.LogDebug(LogEvents.NoMessageReceived, "No messages received");
}
}
else
catch (Exception e) when (e is not OperationCanceledException or ChannelClosedException)
{
_logger.LogDebug(LogEvents.NoMessageReceived, "No messages received");
_logger.LogError(LogEvents.MessageReceivedFailure, e, "Failed to receive message.");
}
}
catch (OperationCanceledException)
{
_logger.LogInformation(LogEvents.TerminatingKafkaListener, "Terminating Kafka listener...");
break;
}
catch (Exception e)
{
_logger.LogError(LogEvents.MessageReceivedFailure, e, "Failed to receive message.");
}
await committer;
}
catch (Exception e) when (e is OperationCanceledException or ChannelClosedException)
{
}
Commit();
_logger.LogInformation(LogEvents.TerminatingKafkaListener, "Terminating Kafka listener...");
}, token).ConfigureAwait(false);
}

Expand Down Expand Up @@ -167,7 +184,7 @@ private void WriteStatistics(string json)
}
}

private async Task SingleMessageHandlingAsync(ConsumeResult<string?, byte[]> msg, CancellationToken token)
private async Task<ConsumeResultAndProcessedMessageStatus> SingleMessageHandlingAsync(ConsumeResult<string?, byte[]> msg, CancellationToken token)
{
try
{
Expand All @@ -180,61 +197,167 @@ private async Task SingleMessageHandlingAsync(ConsumeResult<string?, byte[]> msg
.HandleResult<ProcessedMessageStatus>(status => status == ProcessedMessageStatus.TemporaryFailure)
.WaitAndRetryAsync(_options.RetriesOnTemporaryFailure,
retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
var status = await retryPolicy.ExecuteAsync(() => ConsumeCallbackAsync!.Invoke(cloudEvent, token));
HandleMessageStatus(msg, status);
var status = await retryPolicy.ExecuteAsync(
(cancellationToken) => ConsumeCallbackAsync!.Invoke(cloudEvent, cancellationToken), token);
return new ConsumeResultAndProcessedMessageStatus(msg, status);
}
catch (Exception e)
{
_logger.LogCritical(LogEvents.MessageHandlingUnexpectedException, e,
"Unexpected exception in message handling");
_applicationLifetime.StopApplication();
}

return new ConsumeResultAndProcessedMessageStatus(msg, ProcessedMessageStatus.CriticalFailure);
}

#region Commit

private readonly Channel<Task<ConsumeResultAndProcessedMessageStatus>> _processedMessages;
private readonly Timer _timer;
private readonly object _commitLock = new();
private ConsumeResultAndProcessedMessageStatus? _lastConsumeResultAndProcessedMessageStatus;

private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
{
RestartCommitTimer();

while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await PeekAndAwaitProcessedMessages(cancellationToken);

if (IsIrrecoverableFailure(result.ProcessedMessageStatus))
{
_applicationLifetime.StopApplication();
break;
}

// Remove message from channel, when Task is successfully completed
await _processedMessages.Reader.ReadAsync(cancellationToken);

lock (_commitLock)
{
_lastConsumeResultAndProcessedMessageStatus = result;
}

if (result.ConsumeResult.Offset.Value % _options.CommitPeriod == 0)
{
Commit();
RestartCommitTimer();
}
}
catch (Exception e) when (e is OperationCanceledException or ChannelClosedException)
{
break;
}
}

StopCommitTimer();
}

private async Task<ConsumeResultAndProcessedMessageStatus> PeekAndAwaitProcessedMessages(CancellationToken cancellationToken)
{
await _processedMessages.Reader.WaitToReadAsync(cancellationToken);

if (!_processedMessages.Reader.TryPeek(out var consumeAndProcessTask))
{
throw new InvalidOperationException("Awaited channel data has been removed by another consumer");
}

return await consumeAndProcessTask;
}

private void Commit()
{
lock (_commitLock)
{
if (_lastConsumeResultAndProcessedMessageStatus == null)
{
return;
}

try
{
_consumer?.Commit(_lastConsumeResultAndProcessedMessageStatus.ConsumeResult);
_lastConsumeResultAndProcessedMessageStatus = null;
}
catch (KafkaException e)
{
_logger.LogError(LogEvents.CommitError, e, "Commit error: {Reason}", e.Error.Reason);
}
}
}

private void RestartCommitTimer()
{
var autoCommitIntervalMs = _options.AutoCommitIntervalMs;
if (autoCommitIntervalMs != null)
{
_timer.Change(autoCommitIntervalMs.Value, Timeout.Infinite);
}
}

private void HandleMessageStatus(ConsumeResult<string?, byte[]> msg, ProcessedMessageStatus? status)
private void StopCommitTimer()
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
}


private void HandleCommitTimer(object? state)
{
Commit();
RestartCommitTimer();
}

private bool IsIrrecoverableFailure(ProcessedMessageStatus status)
{
switch (status)
{
case ProcessedMessageStatus.Success:
case ProcessedMessageStatus.InvalidInput:
case ProcessedMessageStatus.Failure:
if (msg.Offset.Value % _options.CommitPeriod == 0)
{
try
{
_consumer?.Commit(msg);
}
catch (KafkaException e)
{
_logger.LogError(LogEvents.CommitError, e, "Commit error: {Reason}", e.Error.Reason);
}
}
_messageSemaphore.Release();
break;
return false;
case ProcessedMessageStatus.TemporaryFailure:
_logger.LogWarning(LogEvents.FailureDespiteRetrying,
_logger.LogCritical(LogEvents.FailureDespiteRetrying,
"Message consume fails despite retrying");
_applicationLifetime.StopApplication();
break;
return true;
case ProcessedMessageStatus.CriticalFailure:
_logger.LogWarning(LogEvents.CriticalFailureOnConsume,
_logger.LogCritical(LogEvents.CriticalFailureOnConsume,
"Message consume fails with critical failure");
_applicationLifetime.StopApplication();
break;
return true;
default:
throw new ArgumentOutOfRangeException(nameof(status), status, "Unhandled ProcessedMessageStatus");
_logger.LogCritical("Unknown process status {status}", status);
return true;
}
}


#endregion

public MotorCloudEvent<byte[]> KafkaMessageToCloudEvent(Message<string?, byte[]> msg)
{
return msg.ToMotorCloudEvent(_applicationNameService, _cloudEventFormatter);
}

/// <remarks>
/// For testing.
/// </remarks>
public IEnumerable<TopicPartitionOffset> Committed()
{
if (_consumer == null)
{
throw new InvalidOperationException("Consumer is not initialized");
}

return _consumer.Committed(TimeSpan.FromSeconds(10));
}

private void Dispose(bool disposing)
{
if (disposing)
{
_timer.Dispose();
_consumer?.Dispose();
}
}
Expand Down
Loading

0 comments on commit 1188b95

Please sign in to comment.