Skip to content

Commit

Permalink
Batch ingestion not correctly handling OperationCancelledException wh…
Browse files Browse the repository at this point in the history
…ich can cause the ingestion to never used incoming context tasks and hang. (#4780)

- Rectored to use BackgroundService
- Fix `catch (OperationCancelledException)` blocks to add `when` guards - only ignore cancellations set by caller
- Overriding Start and Stop to support graceful shutdown and improve intent and readability
- Ensure TrySetException is always set on exception
- Logging improvements around cancellation to inform that tasks got cancelled. Important when shutting down to diagnose where teardown "hangs".
  • Loading branch information
ramonsmits authored Feb 11, 2025
1 parent 1d34d8c commit 92c6864
Show file tree
Hide file tree
Showing 30 changed files with 211 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(AuditThroughputCollectorHostedService)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static ReadOnlyDictionary<string, string> LoadBrokerSettingValues(IEnumerable<Ke
}
} while (await timer.WaitForNextTickAsync(stoppingToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(BrokerThroughputCollectorHostedService)}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ public override async Task Stop(CancellationToken cancellationToken = default)
{
await checkTask;
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
//Swallow
// Even though we are stopping, ONLY swallow when OCE from callee to not hide any ungraceful stop errors
}
finally
{
Expand Down
134 changes: 80 additions & 54 deletions src/ServiceControl.Audit/Auditing/AuditIngestion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
using ServiceControl.Infrastructure;
using Transports;

class AuditIngestion : IHostedService
class AuditIngestion : BackgroundService
{
public AuditIngestion(
Settings settings,
Expand Down Expand Up @@ -51,23 +51,15 @@ public AuditIngestion(

errorHandlingPolicy = new AuditIngestionFaultPolicy(failedImportsStorage, settings.LoggingSettings, OnCriticalError);

watchdog = new Watchdog("audit message ingestion", EnsureStarted, EnsureStopped, ingestionState.ReportError, ingestionState.Clear, settings.TimeToRestartAuditIngestionAfterFailure, logger);

ingestionWorker = Task.Run(() => Loop(), CancellationToken.None);
}

public Task StartAsync(CancellationToken _) => watchdog.Start(() => applicationLifetime.StopApplication());

public async Task StopAsync(CancellationToken cancellationToken)
{
await watchdog.Stop();
channel.Writer.Complete();
await ingestionWorker;

if (transportInfrastructure != null)
{
await transportInfrastructure.Shutdown(cancellationToken);
}
watchdog = new Watchdog(
"audit message ingestion",
EnsureStarted,
EnsureStopped,
ingestionState.ReportError,
ingestionState.Clear,
settings.TimeToRestartAuditIngestionAfterFailure,
logger
);
}

Task OnCriticalError(string failure, Exception exception)
Expand Down Expand Up @@ -132,7 +124,7 @@ async Task EnsureStarted(CancellationToken cancellationToken = default)
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
// ignored
logger.Info("StopReceive cancelled");
}
}

Expand Down Expand Up @@ -170,7 +162,7 @@ async Task EnsureStopped(CancellationToken cancellationToken = default)
}
catch (OperationCanceledException e) when (e.CancellationToken == cancellationToken)
{
// ignored
logger.Info("StopReceive cancelled");
}
finally
{
Expand Down Expand Up @@ -200,57 +192,92 @@ async Task OnMessage(MessageContext messageContext, CancellationToken cancellati
}
}

async Task Loop()
public override async Task StartAsync(CancellationToken cancellationToken)
{
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);
await watchdog.Start(() => applicationLifetime.StopApplication());
await base.StartAsync(cancellationToken);
}

while (await channel.Reader.WaitToReadAsync())
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
// will only enter here if there is something to read.
try
var contexts = new List<MessageContext>(transportSettings.MaxConcurrency.Value);

while (await channel.Reader.WaitToReadAsync(stoppingToken))
{
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
// will only enter here if there is something to read.
try
{
contexts.Add(context);
// as long as there is something to read this will fetch up to MaximumConcurrency items
while (channel.Reader.TryRead(out var context))
{
contexts.Add(context);
}

auditBatchSize.Record(contexts.Count);

using (new DurationRecorder(auditBatchDuration))
{
await auditIngestor.Ingest(contexts);
}

consecutiveBatchFailuresCounter.Record(0);
}
catch (Exception e)
{
// signal all message handling tasks to terminate
foreach (var context in contexts)
{
_ = context.GetTaskCompletionSource().TrySetException(e);
}

if (e is OperationCanceledException && stoppingToken.IsCancellationRequested)
{
logger.Info("Batch cancelled", e);
break;
}

auditBatchSize.Record(contexts.Count);
logger.Info("Ingesting messages failed", e);

using (new DurationRecorder(auditBatchDuration))
// no need to do interlocked increment since this is running sequential
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
}
finally
{
await auditIngestor.Ingest(contexts);
contexts.Clear();
}

consecutiveBatchFailuresCounter.Record(0);
}
catch (OperationCanceledException)
{
//Do nothing as we are shutting down
continue;
}
catch (Exception e) // show must go on
// will fall out here when writer is completed
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
// ExecuteAsync cancelled
}
}

public override async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await watchdog.Stop();
channel.Writer.Complete();
await base.StopAsync(cancellationToken);
}
finally
{
if (transportInfrastructure != null)
{
if (logger.IsInfoEnabled)
try
{
logger.Info("Ingesting messages failed", e);
await transportInfrastructure.Shutdown(cancellationToken);
}

// signal all message handling tasks to terminate
foreach (var context in contexts)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
context.GetTaskCompletionSource().TrySetException(e);
logger.Info("Shutdown cancelled", e);
}

// no need to do interlocked increment since this is running sequential
consecutiveBatchFailuresCounter.Record(consecutiveBatchFailures++);
}
finally
{
contexts.Clear();
}
}
// will fall out here when writer is completed
}

TransportInfrastructure transportInfrastructure;
Expand All @@ -273,7 +300,6 @@ async Task Loop()
readonly Histogram<long> consecutiveBatchFailuresCounter = Telemetry.Meter.CreateHistogram<long>(Telemetry.CreateInstrumentName("ingestion", "consecutive_batch_failures"), unit: "count", description: "Consecutive audit ingestion batch failure");
readonly Histogram<double> ingestionDuration = Telemetry.Meter.CreateHistogram<double>(Telemetry.CreateInstrumentName("ingestion", "duration"), unit: "ms", description: "Average incoming audit message processing duration");
readonly Watchdog watchdog;
readonly Task ingestionWorker;
readonly IHostApplicationLifetime applicationLifetime;

static readonly ILog logger = LogManager.GetLogger<AuditIngestion>();
Expand Down
4 changes: 2 additions & 2 deletions src/ServiceControl.Audit/Auditing/ImportFailedAudits.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ await failedAuditStore.ProcessFailedMessages(
Logger.Debug($"Successfully re-imported failed audit message {transportMessage.Id}.");
}
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (token.IsCancellationRequested)
{
// no-op
Logger.Info("Cancelled", e);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NServiceBus;
using NServiceBus.Logging;
using Settings;

class ImportFailedAuditsCommand : AbstractCommand
{
readonly ILog logger = LogManager.GetLogger<ImportFailedAuditsCommand>();

public override async Task Execute(HostArguments args, Settings settings)
{
settings.IngestAuditMessages = false;
Expand All @@ -37,9 +40,9 @@ public override async Task Execute(HostArguments args, Settings settings)
{
await importer.Run(tokenSource.Token);
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (tokenSource.IsCancellationRequested)
{
// no op
logger.Info("Cancelled", e);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void Start()
await Task.Delay(interval, tokenSource.Token).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
{
//no-op
}
Expand Down
10 changes: 5 additions & 5 deletions src/ServiceControl.Infrastructure/AsyncTimer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ public TimerJob(Func<CancellationToken, Task<TimerJobExecutionResult>> callback,

//Otherwise execute immediately
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// no-op
break;
}
catch (Exception ex)
{
errorCallback(ex);
}
}
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// no-op
}
Expand All @@ -64,7 +64,7 @@ public async Task Stop()
return;
}

tokenSource.Cancel();
await tokenSource.CancelAsync().ConfigureAwait(false);
tokenSource.Dispose();

if (task != null)
Expand All @@ -73,7 +73,7 @@ public async Task Stop()
{
await task.ConfigureAwait(false);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
{
//NOOP
}
Expand Down
6 changes: 3 additions & 3 deletions src/ServiceControl.Infrastructure/ReadOnlyStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio

return Task.CompletedTask;
}
catch (OperationCanceledException e)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled(e.CancellationToken);
}
Expand Down Expand Up @@ -113,7 +113,7 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel

return Task.FromResult(result);
}
catch (OperationCanceledException e)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(e.CancellationToken);
}
Expand All @@ -136,7 +136,7 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken

return new ValueTask<int>(result);
}
catch (OperationCanceledException e)
catch (OperationCanceledException e) when (cancellationToken.IsCancellationRequested)
{
return new ValueTask<int>(Task.FromCanceled<int>(e.CancellationToken));
}
Expand Down
9 changes: 5 additions & 4 deletions src/ServiceControl.Infrastructure/Watchdog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public Task Start(Action onFailedOnStartup)

failedOnStartup ??= false;
}
catch (OperationCanceledException)
catch (OperationCanceledException e) when (!shutdownTokenSource.IsCancellationRequested)
{
//Do not Delay
// Continue, as OCE is not from caller
log.Info("Start cancelled, retrying...", e);
continue;
}
catch (Exception e)
Expand All @@ -81,9 +82,9 @@ public Task Start(Action onFailedOnStartup)
{
await Task.Delay(timeToWaitBetweenStartupAttempts, shutdownTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (shutdownTokenSource.IsCancellationRequested)
{
//Ignore
//Ignore, no need to log cancellation of delay
}
}
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(RemoveExpiredEndpointInstances)} timer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected override async Task ExecuteAsync(CancellationToken cancellationToken)
}
} while (await timer.WaitForNextTickAsync(cancellationToken));
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
logger.LogInformation($"Stopping {nameof(ReportThroughputHostedService)} timer");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
await signal.WaitHandle.WaitOneAsync(cancellationToken);
signal.Reset();
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
break;
}
Expand All @@ -91,7 +91,7 @@ async Task StartDispatcherTask(CancellationToken cancellationToken)
}
while (!cancellationToken.IsCancellationRequested);
}
catch (OperationCanceledException)
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
// ignore
}
Expand Down
Loading

0 comments on commit 92c6864

Please sign in to comment.