From 8be6fcc9bdd3b1d380f567c1529b5124dbb7eaa7 Mon Sep 17 00:00:00 2001 From: phillip-haydon Date: Thu, 22 Aug 2024 12:45:49 +1200 Subject: [PATCH] Made the setting configurable and fixed race condition --- .../RaygunClientBase.cs | 2 +- .../RaygunSettingsBase.cs | 14 +- .../ThrottledBackgroundMessageProcessor.cs | 170 ++++++++++++------ ...hrottledBackgroundMessageProcessorTests.cs | 92 ++++++++-- 4 files changed, 206 insertions(+), 72 deletions(-) diff --git a/Mindscape.Raygun4Net.NetCore.Common/RaygunClientBase.cs b/Mindscape.Raygun4Net.NetCore.Common/RaygunClientBase.cs index 6addaab5..3906c496 100644 --- a/Mindscape.Raygun4Net.NetCore.Common/RaygunClientBase.cs +++ b/Mindscape.Raygun4Net.NetCore.Common/RaygunClientBase.cs @@ -124,7 +124,7 @@ protected RaygunClientBase(RaygunSettingsBase settings, HttpClient client, IRayg { _client = client ?? DefaultClient; _settings = settings; - _backgroundMessageProcessor = new ThrottledBackgroundMessageProcessor(settings.BackgroundMessageQueueMax, _settings.BackgroundMessageWorkerCount, Send); + _backgroundMessageProcessor = new ThrottledBackgroundMessageProcessor(settings.BackgroundMessageQueueMax, _settings.BackgroundMessageWorkerCount, _settings.BackgroundMessageWorkerBreakpoint, Send); _userProvider = userProvider; _wrapperExceptions.Add(typeof(TargetInvocationException)); diff --git a/Mindscape.Raygun4Net.NetCore.Common/RaygunSettingsBase.cs b/Mindscape.Raygun4Net.NetCore.Common/RaygunSettingsBase.cs index ac39ed31..f56c1d6e 100644 --- a/Mindscape.Raygun4Net.NetCore.Common/RaygunSettingsBase.cs +++ b/Mindscape.Raygun4Net.NetCore.Common/RaygunSettingsBase.cs @@ -32,10 +32,10 @@ public RaygunSettingsBase() public string ApplicationVersion { get; set; } /// - /// If set to true will automatically setup handlers to catch Unhandled Exceptions + /// If set to true will automatically set up handlers to catch Unhandled Exceptions /// /// - /// Currently defaults to false. This may be change in future releases. + /// Currently defaults to false. This may be changed in future releases. /// public bool CatchUnhandledExceptions { get; set; } = false; @@ -45,13 +45,21 @@ public RaygunSettingsBase() public int BackgroundMessageQueueMax { get; } = ushort.MaxValue; /// - /// Controls the number of background threads used to process the raygun message queue + /// Controls the maximum number of background threads used to process the raygun message queue /// /// /// Defaults to Environment.ProcessorCount * 2 >= 8 ? 8 : Environment.ProcessorCount * 2 /// public int BackgroundMessageWorkerCount { get; set; } = Environment.ProcessorCount * 2 >= 8 ? 8 : Environment.ProcessorCount * 2; + /// + /// Used to determine how many messages are in the queue before the background processor will add another worker to help process the queue. + /// + /// + /// Defaults to 25, workers will be added for every 25 messages in the queue, until the BackgroundMessageWorkerCount is reached. + /// + public int BackgroundMessageWorkerBreakpoint { get; set; } = 25; + /// /// A list of Environment Variables to include with the message. /// diff --git a/Mindscape.Raygun4Net.NetCore.Common/ThrottledBackgroundMessageProcessor.cs b/Mindscape.Raygun4Net.NetCore.Common/ThrottledBackgroundMessageProcessor.cs index e67a8757..cc34b762 100644 --- a/Mindscape.Raygun4Net.NetCore.Common/ThrottledBackgroundMessageProcessor.cs +++ b/Mindscape.Raygun4Net.NetCore.Common/ThrottledBackgroundMessageProcessor.cs @@ -1,7 +1,9 @@ #nullable enable using System; +using System.Buffers; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading; @@ -11,58 +13,62 @@ namespace Mindscape.Raygun4Net { internal sealed class ThrottledBackgroundMessageProcessor : IDisposable { - private readonly BlockingCollection>> _messageQueue; - private readonly ConcurrentDictionary _workerTasks; + // This was a BlockingCollection which used .Take to dequeue items, but since we will have 0 workers when the queue is empty + // we don't need to block the thread waiting for an item to be enqueued. A concurrent queue is more appropriate. + internal readonly ConcurrentQueue _messageQueue; + internal readonly ConcurrentDictionary _workerTasks; + private readonly CancellationTokenSource _globalCancellationSource; + private readonly int _maxQueueSize; private readonly Func _processCallback; private readonly int _maxWorkerTasks; + private readonly int _workerQueueBreakpoint; private readonly object _workerTaskMutex = new(); - //TODO: Make QueueSizePerWorker configurable - private const int QueueSizePerWorker = 25; + private bool _drainingQueue; - private volatile bool _isDisposing; + private bool _isDisposing; + private readonly int _drainSize; - public ThrottledBackgroundMessageProcessor( - int maxQueueSize, - int maxWorkerTasks, - Func onProcessMessageFunc) + public ThrottledBackgroundMessageProcessor(int maxQueueSize, + int maxWorkerTasks, + int workerQueueBreakpoint, + Func onProcessMessageFunc) { + _maxQueueSize = maxQueueSize; + _workerQueueBreakpoint = workerQueueBreakpoint <= 0 ? 25 : workerQueueBreakpoint; + + // Drain the queue when it reaches 90% of the max size + _drainSize = Math.Max(maxQueueSize / 100 * 90, 1); _processCallback = onProcessMessageFunc ?? throw new ArgumentNullException(nameof(onProcessMessageFunc)); _maxWorkerTasks = maxWorkerTasks; - _messageQueue = new BlockingCollection>>(maxQueueSize); + _messageQueue = new ConcurrentQueue(); _globalCancellationSource = new CancellationTokenSource(); _workerTasks = new ConcurrentDictionary(Environment.ProcessorCount, _maxWorkerTasks); } public bool Enqueue(RaygunMessage message) { - return Enqueue(() => Task.FromResult(message)); - } - - public bool Enqueue(Func messageFunc) - { - return Enqueue(() => Task.FromResult(messageFunc())); - } - - public bool Enqueue(Func> messageFunc) - { - if (_isDisposing) + if (_drainingQueue) { - return false; - } + if (_messageQueue.Count >= _drainSize) + { + return false; + } - var itemAdded = _messageQueue.TryAdd(messageFunc); + _drainingQueue = false; + } - if (itemAdded) + if (_messageQueue.Count >= _maxQueueSize) { - // After enqueuing a message, adjust the number of workers in case there's currently 0 workers - // Otherwise there might be none and the message will never be sent... - AdjustWorkers(); + _drainingQueue = true; + return false; } - return itemAdded; + _messageQueue.Enqueue(message); + AdjustWorkers(); + return true; } /// @@ -83,9 +89,8 @@ private void AdjustWorkers() // Calculate the desired number of workers based on the queue size, this is so we don't end up creating // many workers that essentially do nothing, or if there's a small number of errors we don't have too many workers - var queueSize = _messageQueue.Count; var currentWorkers = _workerTasks.Count; - var desiredWorkers = CalculateDesiredWorkers(queueSize); + var desiredWorkers = CalculateDesiredWorkers(_messageQueue.Count); if (desiredWorkers > currentWorkers) { @@ -98,12 +103,25 @@ private void AdjustWorkers() { RemoveExcessWorkers(currentWorkers - desiredWorkers); } + + if (desiredWorkers == 0 && _messageQueue.Count > 0) + { + // If we have messages to process but no workers, create a worker + CreateWorkerTask(); + } } finally { // Make sure we release the mutex otherwise we'll block the next thread that wants to adjust the number of workers Monitor.Exit(_workerTaskMutex); } + + // We only want 1 thread adjusting the workers at any given time, but there could be a race condition + // where the queue is empty when we release the mutex, but there are 'completed' tasks, so we need to double-check and adjust. + if (_messageQueue.Count > 0 && _workerTasks.All(x => x.Key.IsCompleted)) + { + AdjustWorkers(); + } } /// @@ -112,15 +130,28 @@ private void AdjustWorkers() /// private void RemoveCompletedTasks() { - var completedTasks = _workerTasks.Where(kvp => kvp.Key.IsCompleted).ToArray(); - - foreach (var kvp in completedTasks) + var count = _workerTasks.Count; + var rentedArray = ArrayPool>.Shared.Rent(count); + var completedCount = 0; + + foreach (var kvp in _workerTasks) { + if (kvp.Key.IsCompleted) + { + rentedArray[completedCount++] = kvp; + } + } + + for (var i = 0; i < completedCount; i++) + { + var kvp = rentedArray[i]; if (_workerTasks.TryRemove(kvp.Key, out var cts)) { cts.Dispose(); } } + + ArrayPool>.Shared.Return(rentedArray); } /// @@ -131,16 +162,31 @@ private void RemoveCompletedTasks() /// Number of workers to kill off. private void RemoveExcessWorkers(int count) { - var excessWorkers = _workerTasks.Take(count).ToArray(); + var rentedArray = ArrayPool>.Shared.Rent(count); + var index = 0; + + foreach (var kvp in _workerTasks) + { + if (index == count) + { + break; + } + + rentedArray[index++] = kvp; + } - foreach (var kvp in excessWorkers) + for (var i = 0; i < index; i++) { + var kvp = rentedArray[i]; + if (_workerTasks.TryRemove(kvp.Key, out var cts)) { cts.Cancel(); cts.Dispose(); } } + + ArrayPool>.Shared.Return(rentedArray); } /// @@ -150,8 +196,8 @@ private void RemoveExcessWorkers(int count) private void CreateWorkerTask() { var cts = CancellationTokenSource.CreateLinkedTokenSource(_globalCancellationSource.Token); - var task = Task.Run(() => RaygunMessageWorker(_messageQueue, _processCallback, cts.Token), _globalCancellationSource.Token); + var task = Task.Run(() => RaygunMessageWorker(_messageQueue, _processCallback, cts.Token), cts.Token); _workerTasks[task] = cts; // When the worker task completes, adjust the number of workers @@ -171,40 +217,36 @@ private int CalculateDesiredWorkers(int queueSize) return 0; } - if (queueSize <= QueueSizePerWorker) + if (queueSize <= _workerQueueBreakpoint) { return 1; } - return Math.Min((queueSize + QueueSizePerWorker - 1) / QueueSizePerWorker, _maxWorkerTasks); + return Math.Min((queueSize + _workerQueueBreakpoint - 1) / _workerQueueBreakpoint, _maxWorkerTasks); } /// /// Actual task run by the worker. This method will take a message from the queue and process it. /// - private static async Task RaygunMessageWorker(BlockingCollection>> messageQueue, + private static async Task RaygunMessageWorker(ConcurrentQueue messageQueue, Func callback, CancellationToken cancellationToken) { try { - while (!cancellationToken.IsCancellationRequested && !messageQueue.IsCompleted) + while (!cancellationToken.IsCancellationRequested && messageQueue.TryDequeue(out var message)) { - Func> messageFunc; try { - messageFunc = messageQueue.Take(cancellationToken); + await callback(message, cancellationToken); } - catch (InvalidOperationException) when (messageQueue.IsCompleted) + catch (InvalidOperationException) { break; } - - var message = await messageFunc(); - await callback(message, cancellationToken); } } - catch (Exception cancelledEx) when (cancelledEx is ThreadAbortException + catch (Exception cancelledEx) when (cancelledEx is ThreadAbortException or OperationCanceledException or TaskCanceledException) { @@ -225,19 +267,31 @@ public void Dispose() _isDisposing = true; - _messageQueue.CompleteAdding(); - _globalCancellationSource.Cancel(); - - foreach (var kvp in _workerTasks) + try { - kvp.Value.Cancel(); - kvp.Value.Dispose(); - } + foreach (var kvp in _workerTasks) + { + if (_workerTasks.TryRemove(kvp.Key, out var cts)) + { + if (!cts.IsCancellationRequested) + { + cts.Cancel(); + } - Task.WaitAll(_workerTasks.Keys.ToArray(), TimeSpan.FromSeconds(2)); + cts.Dispose(); + } + } - _messageQueue.Dispose(); - _globalCancellationSource.Dispose(); + _globalCancellationSource.Cancel(); + + Task.WaitAll(_workerTasks.Keys.ToArray(), TimeSpan.FromSeconds(2)); + + _globalCancellationSource.Dispose(); + } + catch (Exception ex) + { + Debug.WriteLine($"Exception in ThrottledBackgroundMessageProcessor.Dispose: {ex}"); + } } } } \ No newline at end of file diff --git a/Mindscape.Raygun4Net.NetCore.Tests/ThrottledBackgroundMessageProcessorTests.cs b/Mindscape.Raygun4Net.NetCore.Tests/ThrottledBackgroundMessageProcessorTests.cs index 7b7e0297..c61ab773 100644 --- a/Mindscape.Raygun4Net.NetCore.Tests/ThrottledBackgroundMessageProcessorTests.cs +++ b/Mindscape.Raygun4Net.NetCore.Tests/ThrottledBackgroundMessageProcessorTests.cs @@ -10,7 +10,7 @@ public class ThrottledBackgroundMessageProcessorTests [Test] public void ThrottledBackgroundMessageProcessor_WithQueueSpace_AcceptsMessages() { - var cut = new ThrottledBackgroundMessageProcessor(1, 0, (m, t) => { return Task.CompletedTask; }); + var cut = new ThrottledBackgroundMessageProcessor(1, 0, 25, (m, t) => { return Task.CompletedTask; }); var enqueued = cut.Enqueue(new RaygunMessage()); Assert.That(enqueued, Is.True); @@ -19,7 +19,7 @@ public void ThrottledBackgroundMessageProcessor_WithQueueSpace_AcceptsMessages() [Test] public void ThrottledBackgroundMessageProcessor_WithFullQueue_DropsMessages() { - var cut = new ThrottledBackgroundMessageProcessor(1, 0, (m, t) => { return Task.CompletedTask; }); + var cut = new ThrottledBackgroundMessageProcessor(1, 0, 25, (m, t) => { return Task.CompletedTask; }); cut.Enqueue(new RaygunMessage()); var second = cut.Enqueue(new RaygunMessage()); @@ -32,7 +32,7 @@ public void ThrottledBackgroundMessageProcessor_WithFullQueue_DropsMessages() public void ThrottledBackgroundMessageProcessor_WithNoWorkers_DoesNotProcessMessages() { var processed = false; - var cut = new ThrottledBackgroundMessageProcessor(1, 0, (m, t) => + var cut = new ThrottledBackgroundMessageProcessor(1, 0, 25, (m, t) => { processed = true; return Task.CompletedTask; @@ -51,7 +51,7 @@ public void ThrottledBackgroundMessageProcessor_WithAtLeastOneWorker_DoesProcess { var processed = false; var resetEventSlim = new ManualResetEventSlim(); - var cut = new ThrottledBackgroundMessageProcessor(1, 1, (m, t) => + var cut = new ThrottledBackgroundMessageProcessor(1, 1, 25, (m, t) => { processed = true; resetEventSlim.Set(); @@ -71,7 +71,7 @@ public void ThrottledBackgroundMessageProcessor_WithAtLeastOneWorker_DoesProcess [Test] public void ThrottledBackgroundMessageProcessor_CallingDisposeTwice_DoesNotExplode() { - var cut = new ThrottledBackgroundMessageProcessor(1, 0, (m, t) => { return Task.CompletedTask; }); + var cut = new ThrottledBackgroundMessageProcessor(1, 0, 25, (m, t) => { return Task.CompletedTask; }); Assert.DoesNotThrow(() => { @@ -87,7 +87,7 @@ public void ThrottledBackgroundMessageProcessor_ExceptionInProcess_KillsWorkerTh var secondMessageWasProcessed = false; var resetEventSlim = new ManualResetEventSlim(); - var cut = new ThrottledBackgroundMessageProcessor(1, 1, (m, t) => + var cut = new ThrottledBackgroundMessageProcessor(1, 1, 25, (m, t) => { if (shouldThrow) { @@ -121,17 +121,23 @@ public void ThrottledBackgroundMessageProcessor_CancellationRequested_IsCaughtAn var secondMessageWasProcessed = false; var resetEventSlim = new ManualResetEventSlim(); - var cut = new ThrottledBackgroundMessageProcessor(1, 1, (m, t) => + var cut = new ThrottledBackgroundMessageProcessor(1, 1, 25, (m, t) => { if (shouldThrow) { - resetEventSlim.Set(); - throw new OperationCanceledException("Bad", t); + try + { + throw new OperationCanceledException("Bad", t); + } + finally + { + resetEventSlim.Set(); + } } secondMessageWasProcessed = true; resetEventSlim.Set(); - + return Task.CompletedTask; }); @@ -148,5 +154,71 @@ public void ThrottledBackgroundMessageProcessor_CancellationRequested_IsCaughtAn Assert.That(secondMessageWasProcessed, Is.True); } + + [Test] + public void Things_Throwing() + { + var secondMessageWasProcessed = false; + + for (int i = 0; i < 100; i++) + { + var resetEventSlim = new ManualResetEventSlim(); + + var cut = new ThrottledBackgroundMessageProcessor(1, 1, 25, (m, t) => + { + secondMessageWasProcessed = true; + resetEventSlim.Set(); + + return Task.CompletedTask; + }); + + cut.Enqueue(new RaygunMessage()); + + resetEventSlim.Wait(TimeSpan.FromSeconds(5)); + + cut.Dispose(); + } + + Assert.That(secondMessageWasProcessed, Is.True); + //Assert.That(secondMessageWasProcessed, Is.True); + } + + [Test] + public void Things_Throwing_Many() + { + var secondMessageWasProcessed = false; + for (int j = 0; j < 100; j++) + { + var count = 0; + var resetEventSlim = new ManualResetEventSlim(); + + var cut = new ThrottledBackgroundMessageProcessor(100_000, 8, 25, (m, t) => + { + Interlocked.Increment(ref count); + if (count == 100) + { + secondMessageWasProcessed = true; + resetEventSlim.Set(); + } + + Console.WriteLine($"Sent {count}"); + return Task.CompletedTask; + }); + + + for (int i = 0; i < 100; i++) + { + cut.Enqueue(new RaygunMessage()); + Console.WriteLine(i); + } + + resetEventSlim.Wait(TimeSpan.FromSeconds(10)); + + cut.Dispose(); + } + + Assert.That(secondMessageWasProcessed, Is.True); + //Assert.That(secondMessageWasProcessed, Is.True); + } } } \ No newline at end of file