Skip to content

Commit

Permalink
Kafka commit issues GDATASoftwareAG#773
Browse files Browse the repository at this point in the history
Fix formatting
  • Loading branch information
Philip Stadermann committed Mar 8, 2023
1 parent 1188b95 commit 334a7ef
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 29 deletions.
30 changes: 15 additions & 15 deletions src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisp
private readonly ILogger<KafkaMessageConsumer<TData>> _logger;
private readonly IHostApplicationLifetime _applicationLifetime;
private IConsumer<string?, byte[]>? _consumer;

public KafkaMessageConsumer(ILogger<KafkaMessageConsumer<TData>> logger,
IOptions<KafkaConsumerOptions<TData>> config,
IHostApplicationLifetime applicationLifetime,
Expand Down Expand Up @@ -212,7 +212,7 @@ private async Task<ConsumeResultAndProcessedMessageStatus> SingleMessageHandling
}

#region Commit

private readonly Channel<Task<ConsumeResultAndProcessedMessageStatus>> _processedMessages;
private readonly Timer _timer;
private readonly object _commitLock = new();
Expand All @@ -221,13 +221,13 @@ private async Task<ConsumeResultAndProcessedMessageStatus> SingleMessageHandling
private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
{
RestartCommitTimer();

while (!cancellationToken.IsCancellationRequested)
{
try
{
var result = await PeekAndAwaitProcessedMessages(cancellationToken);

if (IsIrrecoverableFailure(result.ProcessedMessageStatus))
{
_applicationLifetime.StopApplication();
Expand All @@ -241,7 +241,7 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
{
_lastConsumeResultAndProcessedMessageStatus = result;
}

if (result.ConsumeResult.Offset.Value % _options.CommitPeriod == 0)
{
Commit();
Expand All @@ -253,22 +253,22 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
break;
}
}

StopCommitTimer();
}

private async Task<ConsumeResultAndProcessedMessageStatus> PeekAndAwaitProcessedMessages(CancellationToken cancellationToken)
{
await _processedMessages.Reader.WaitToReadAsync(cancellationToken);

if (!_processedMessages.Reader.TryPeek(out var consumeAndProcessTask))
{
throw new InvalidOperationException("Awaited channel data has been removed by another consumer");
}

return await consumeAndProcessTask;
}

private void Commit()
{
lock (_commitLock)
Expand Down Expand Up @@ -303,14 +303,14 @@ private void StopCommitTimer()
{
_timer.Change(Timeout.Infinite, Timeout.Infinite);
}


private void HandleCommitTimer(object? state)
{
Commit();
RestartCommitTimer();
}

private bool IsIrrecoverableFailure(ProcessedMessageStatus status)
{
switch (status)
Expand All @@ -329,12 +329,12 @@ private bool IsIrrecoverableFailure(ProcessedMessageStatus status)
return true;
default:
_logger.LogCritical("Unknown process status {status}", status);
return true;
return true;
}
}

#endregion

public MotorCloudEvent<byte[]> KafkaMessageToCloudEvent(Message<string?, byte[]> msg)
{
return msg.ToMotorCloudEvent(_applicationNameService, _cloudEventFormatter);
Expand All @@ -352,7 +352,7 @@ public IEnumerable<TopicPartitionOffset> Committed()

return _consumer.Committed(TimeSpan.FromSeconds(10));
}

private void Dispose(bool disposing)
{
if (disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,19 +286,19 @@ public async Task Consume_CommitsEveryCommitPeriod()
{
var config = GetConsumerConfig<string>(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1);
config.CommitPeriod = 2;
config.AutoCommitIntervalMs = null;
config.AutoCommitIntervalMs = null;

using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object);
consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel);

var cts = new CancellationTokenSource();
await consumer.StartAsync(cts.Token);
var execution = consumer.ExecuteAsync(cts.Token);

await PublishAndAwaitMessages(_consumedChannel, 3);

await WaitForCommittedOffset(consumer, 3);

cts.Cancel();
await execution;
}
Expand All @@ -324,7 +324,7 @@ private async Task WaitForCommittedOffset<TData>(KafkaMessageConsumer<TData> con
{
var offsets = consumer.Committed();
var offset = offsets.FirstOrDefault()?.Offset;

_output.WriteLine($"Waiting for offset {expectedOffset} got {offset}");
if (offset == expectedOffset)
{
Expand All @@ -334,24 +334,24 @@ private async Task WaitForCommittedOffset<TData>(KafkaMessageConsumer<TData> con
await Task.Delay(10, CancellationToken.None);
}
}

[Fact(Timeout = 50000)]
public async Task Consume_CommitsEveryAutoCommitIntervalMs()
{
var config = GetConsumerConfig<string>(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1);
config.AutoCommitIntervalMs = 1;
config.AutoCommitIntervalMs = 1;

using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object);
consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel);

var cts = new CancellationTokenSource();
await consumer.StartAsync(cts.Token);
var execution = consumer.ExecuteAsync(cts.Token);

await PublishAndAwaitMessages(_consumedChannel, 2);

await WaitForCommittedOffset(consumer, 2);

cts.Cancel();
await execution;
}
Expand All @@ -360,7 +360,7 @@ public async Task Consume_CommitsEveryAutoCommitIntervalMs()
public async Task Consume_CommitsOnShutdown()
{
var config = GetConsumerConfig<string>(_topic, maxConcurrentMessages: 1, retriesOnTemporaryFailure: 1);
config.AutoCommitIntervalMs = null;
config.AutoCommitIntervalMs = null;

using var consumer = GetConsumer(_topic, config, _fakeLifetimeMock.Object);
consumer.ConsumeCallbackAsync = CreateConsumeCallback(ProcessedMessageStatus.Success, _consumedChannel);
Expand All @@ -377,10 +377,10 @@ public async Task Consume_CommitsOnShutdown()

cts.Cancel();
await execution;

await WaitForCommittedOffset(consumer, 2);
}

private Func<MotorCloudEvent<byte[]>, CancellationToken, Task<ProcessedMessageStatus>> CreateConsumeCallback(
ProcessedMessageStatus statusToReturn, Channel<byte[]> channel) => async (data, _) =>
{
Expand All @@ -397,7 +397,7 @@ private Func<MotorCloudEvent<byte[]>, CancellationToken, Task<ProcessedMessageSt
await Task.Delay(TimeSpan.MaxValue, cancellationToken);
return ProcessedMessageStatus.Success;
};

private async Task PublishMessage(string topic, string key, string value)
{
using var producer = new ProducerBuilder<string, byte[]>(GetPublisherConfig<string>(topic)).Build();
Expand Down

0 comments on commit 334a7ef

Please sign in to comment.