Skip to content

Commit

Permalink
Made the setting configurable and fixed race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
phillip-haydon committed Aug 22, 2024
1 parent 2435703 commit 8be6fcc
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 72 deletions.
2 changes: 1 addition & 1 deletion Mindscape.Raygun4Net.NetCore.Common/RaygunClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected RaygunClientBase(RaygunSettingsBase settings, HttpClient client, IRayg
{
_client = client ?? DefaultClient;
_settings = settings;
_backgroundMessageProcessor = new ThrottledBackgroundMessageProcessor(settings.BackgroundMessageQueueMax, _settings.BackgroundMessageWorkerCount, Send);
_backgroundMessageProcessor = new ThrottledBackgroundMessageProcessor(settings.BackgroundMessageQueueMax, _settings.BackgroundMessageWorkerCount, _settings.BackgroundMessageWorkerBreakpoint, Send);
_userProvider = userProvider;

_wrapperExceptions.Add(typeof(TargetInvocationException));
Expand Down
14 changes: 11 additions & 3 deletions Mindscape.Raygun4Net.NetCore.Common/RaygunSettingsBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ public RaygunSettingsBase()
public string ApplicationVersion { get; set; }

/// <summary>
/// If set to true will automatically setup handlers to catch Unhandled Exceptions
/// If set to true will automatically set up handlers to catch Unhandled Exceptions
/// </summary>
/// <remarks>
/// Currently defaults to false. This may be change in future releases.
/// Currently defaults to false. This may be changed in future releases.
/// </remarks>
public bool CatchUnhandledExceptions { get; set; } = false;

Expand All @@ -45,13 +45,21 @@ public RaygunSettingsBase()
public int BackgroundMessageQueueMax { get; } = ushort.MaxValue;

/// <summary>
/// Controls the number of background threads used to process the raygun message queue
/// Controls the maximum number of background threads used to process the raygun message queue
/// </summary>
/// <remarks>
/// Defaults to Environment.ProcessorCount * 2 &gt;= 8 ? 8 : Environment.ProcessorCount * 2
/// </remarks>
public int BackgroundMessageWorkerCount { get; set; } = Environment.ProcessorCount * 2 >= 8 ? 8 : Environment.ProcessorCount * 2;

/// <summary>
/// Used to determine how many messages are in the queue before the background processor will add another worker to help process the queue.
/// </summary>
/// <remarks>
/// Defaults to 25, workers will be added for every 25 messages in the queue, until the BackgroundMessageWorkerCount is reached.
/// </remarks>
public int BackgroundMessageWorkerBreakpoint { get; set; } = 25;

/// <summary>
/// A list of Environment Variables to include with the message.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#nullable enable

using System;
using System.Buffers;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
Expand All @@ -11,58 +13,62 @@ namespace Mindscape.Raygun4Net
{
internal sealed class ThrottledBackgroundMessageProcessor : IDisposable
{
private readonly BlockingCollection<Func<Task<RaygunMessage>>> _messageQueue;
private readonly ConcurrentDictionary<Task, CancellationTokenSource> _workerTasks;
// This was a BlockingCollection<T> which used .Take to dequeue items, but since we will have 0 workers when the queue is empty
// we don't need to block the thread waiting for an item to be enqueued. A concurrent queue is more appropriate.
internal readonly ConcurrentQueue<RaygunMessage> _messageQueue;
internal readonly ConcurrentDictionary<Task, CancellationTokenSource> _workerTasks;

private readonly CancellationTokenSource _globalCancellationSource;
private readonly int _maxQueueSize;
private readonly Func<RaygunMessage, CancellationToken, Task> _processCallback;
private readonly int _maxWorkerTasks;
private readonly int _workerQueueBreakpoint;
private readonly object _workerTaskMutex = new();

//TODO: Make QueueSizePerWorker configurable
private const int QueueSizePerWorker = 25;
private bool _drainingQueue;


private volatile bool _isDisposing;
private bool _isDisposing;
private readonly int _drainSize;

public ThrottledBackgroundMessageProcessor(
int maxQueueSize,
int maxWorkerTasks,
Func<RaygunMessage, CancellationToken, Task> onProcessMessageFunc)
public ThrottledBackgroundMessageProcessor(int maxQueueSize,
int maxWorkerTasks,
int workerQueueBreakpoint,
Func<RaygunMessage, CancellationToken, Task> onProcessMessageFunc)
{
_maxQueueSize = maxQueueSize;
_workerQueueBreakpoint = workerQueueBreakpoint <= 0 ? 25 : workerQueueBreakpoint;

// Drain the queue when it reaches 90% of the max size
_drainSize = Math.Max(maxQueueSize / 100 * 90, 1);
_processCallback = onProcessMessageFunc ?? throw new ArgumentNullException(nameof(onProcessMessageFunc));
_maxWorkerTasks = maxWorkerTasks;
_messageQueue = new BlockingCollection<Func<Task<RaygunMessage>>>(maxQueueSize);
_messageQueue = new ConcurrentQueue<RaygunMessage>();
_globalCancellationSource = new CancellationTokenSource();
_workerTasks = new ConcurrentDictionary<Task, CancellationTokenSource>(Environment.ProcessorCount, _maxWorkerTasks);
}

public bool Enqueue(RaygunMessage message)
{
return Enqueue(() => Task.FromResult(message));
}

public bool Enqueue(Func<RaygunMessage> messageFunc)
{
return Enqueue(() => Task.FromResult(messageFunc()));
}

public bool Enqueue(Func<Task<RaygunMessage>> messageFunc)
{
if (_isDisposing)
if (_drainingQueue)
{
return false;
}
if (_messageQueue.Count >= _drainSize)
{
return false;
}

var itemAdded = _messageQueue.TryAdd(messageFunc);
_drainingQueue = false;
}

if (itemAdded)
if (_messageQueue.Count >= _maxQueueSize)
{
// After enqueuing a message, adjust the number of workers in case there's currently 0 workers
// Otherwise there might be none and the message will never be sent...
AdjustWorkers();
_drainingQueue = true;
return false;
}

return itemAdded;
_messageQueue.Enqueue(message);
AdjustWorkers();
return true;
}

/// <summary>
Expand All @@ -83,9 +89,8 @@ private void AdjustWorkers()

// Calculate the desired number of workers based on the queue size, this is so we don't end up creating
// many workers that essentially do nothing, or if there's a small number of errors we don't have too many workers
var queueSize = _messageQueue.Count;
var currentWorkers = _workerTasks.Count;
var desiredWorkers = CalculateDesiredWorkers(queueSize);
var desiredWorkers = CalculateDesiredWorkers(_messageQueue.Count);

if (desiredWorkers > currentWorkers)
{
Expand All @@ -98,12 +103,25 @@ private void AdjustWorkers()
{
RemoveExcessWorkers(currentWorkers - desiredWorkers);
}

if (desiredWorkers == 0 && _messageQueue.Count > 0)
{
// If we have messages to process but no workers, create a worker
CreateWorkerTask();
}
}
finally
{
// Make sure we release the mutex otherwise we'll block the next thread that wants to adjust the number of workers
Monitor.Exit(_workerTaskMutex);
}

// We only want 1 thread adjusting the workers at any given time, but there could be a race condition
// where the queue is empty when we release the mutex, but there are 'completed' tasks, so we need to double-check and adjust.
if (_messageQueue.Count > 0 && _workerTasks.All(x => x.Key.IsCompleted))
{
AdjustWorkers();
}
}

/// <summary>
Expand All @@ -112,15 +130,28 @@ private void AdjustWorkers()
/// </summary>
private void RemoveCompletedTasks()
{
var completedTasks = _workerTasks.Where(kvp => kvp.Key.IsCompleted).ToArray();

foreach (var kvp in completedTasks)
var count = _workerTasks.Count;
var rentedArray = ArrayPool<KeyValuePair<Task, CancellationTokenSource>>.Shared.Rent(count);
var completedCount = 0;

foreach (var kvp in _workerTasks)
{
if (kvp.Key.IsCompleted)
{
rentedArray[completedCount++] = kvp;
}
}

for (var i = 0; i < completedCount; i++)
{
var kvp = rentedArray[i];
if (_workerTasks.TryRemove(kvp.Key, out var cts))
{
cts.Dispose();
}
}

ArrayPool<KeyValuePair<Task, CancellationTokenSource>>.Shared.Return(rentedArray);
}

/// <summary>
Expand All @@ -131,16 +162,31 @@ private void RemoveCompletedTasks()
/// <param name="count">Number of workers to kill off.</param>
private void RemoveExcessWorkers(int count)
{
var excessWorkers = _workerTasks.Take(count).ToArray();
var rentedArray = ArrayPool<KeyValuePair<Task, CancellationTokenSource>>.Shared.Rent(count);
var index = 0;

foreach (var kvp in _workerTasks)
{
if (index == count)
{
break;
}

rentedArray[index++] = kvp;
}

foreach (var kvp in excessWorkers)
for (var i = 0; i < index; i++)
{
var kvp = rentedArray[i];

if (_workerTasks.TryRemove(kvp.Key, out var cts))
{
cts.Cancel();
cts.Dispose();
}
}

ArrayPool<KeyValuePair<Task, CancellationTokenSource>>.Shared.Return(rentedArray);
}

/// <summary>
Expand All @@ -150,8 +196,8 @@ private void RemoveExcessWorkers(int count)
private void CreateWorkerTask()
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(_globalCancellationSource.Token);
var task = Task.Run(() => RaygunMessageWorker(_messageQueue, _processCallback, cts.Token), _globalCancellationSource.Token);

var task = Task.Run(() => RaygunMessageWorker(_messageQueue, _processCallback, cts.Token), cts.Token);
_workerTasks[task] = cts;

// When the worker task completes, adjust the number of workers
Expand All @@ -171,40 +217,36 @@ private int CalculateDesiredWorkers(int queueSize)
return 0;
}

if (queueSize <= QueueSizePerWorker)
if (queueSize <= _workerQueueBreakpoint)
{
return 1;
}

return Math.Min((queueSize + QueueSizePerWorker - 1) / QueueSizePerWorker, _maxWorkerTasks);
return Math.Min((queueSize + _workerQueueBreakpoint - 1) / _workerQueueBreakpoint, _maxWorkerTasks);
}

/// <summary>
/// Actual task run by the worker. This method will take a message from the queue and process it.
/// </summary>
private static async Task RaygunMessageWorker(BlockingCollection<Func<Task<RaygunMessage>>> messageQueue,
private static async Task RaygunMessageWorker(ConcurrentQueue<RaygunMessage> messageQueue,
Func<RaygunMessage, CancellationToken, Task> callback,
CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested && !messageQueue.IsCompleted)
while (!cancellationToken.IsCancellationRequested && messageQueue.TryDequeue(out var message))
{
Func<Task<RaygunMessage>> messageFunc;
try
{
messageFunc = messageQueue.Take(cancellationToken);
await callback(message, cancellationToken);
}
catch (InvalidOperationException) when (messageQueue.IsCompleted)
catch (InvalidOperationException)
{
break;
}

var message = await messageFunc();
await callback(message, cancellationToken);
}
}
catch (Exception cancelledEx) when (cancelledEx is ThreadAbortException
catch (Exception cancelledEx) when (cancelledEx is ThreadAbortException
or OperationCanceledException
or TaskCanceledException)
{
Expand All @@ -225,19 +267,31 @@ public void Dispose()

_isDisposing = true;

_messageQueue.CompleteAdding();
_globalCancellationSource.Cancel();

foreach (var kvp in _workerTasks)
try
{
kvp.Value.Cancel();
kvp.Value.Dispose();
}
foreach (var kvp in _workerTasks)
{
if (_workerTasks.TryRemove(kvp.Key, out var cts))
{
if (!cts.IsCancellationRequested)
{
cts.Cancel();
}

Task.WaitAll(_workerTasks.Keys.ToArray(), TimeSpan.FromSeconds(2));
cts.Dispose();
}
}

_messageQueue.Dispose();
_globalCancellationSource.Dispose();
_globalCancellationSource.Cancel();

Task.WaitAll(_workerTasks.Keys.ToArray(), TimeSpan.FromSeconds(2));

_globalCancellationSource.Dispose();
}
catch (Exception ex)
{
Debug.WriteLine($"Exception in ThrottledBackgroundMessageProcessor.Dispose: {ex}");
}
}
}
}
Loading

0 comments on commit 8be6fcc

Please sign in to comment.