-
Notifications
You must be signed in to change notification settings - Fork 29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fulfills #20 - add option to block when the queue is full, instead of dropping events #21
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,55 +12,63 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable | |
{ | ||
readonly ILogEventSink _pipeline; | ||
readonly int _bufferCapacity; | ||
volatile bool _disposed; | ||
readonly CancellationTokenSource _cancel = new CancellationTokenSource(); | ||
readonly bool _blockWhenFull; | ||
readonly BlockingCollection<LogEvent> _queue; | ||
readonly Task _worker; | ||
|
||
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity) | ||
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull) | ||
{ | ||
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline)); | ||
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity)); | ||
_pipeline = pipeline; | ||
_bufferCapacity = bufferCapacity; | ||
_blockWhenFull = blockWhenFull; | ||
_queue = new BlockingCollection<LogEvent>(_bufferCapacity); | ||
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); | ||
} | ||
|
||
public void Emit(LogEvent logEvent) | ||
{ | ||
// The disposed check is racy, but only ensures we don't prevent flush from | ||
// completing by pushing more events. | ||
if (!_disposed && !_queue.TryAdd(logEvent)) | ||
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity); | ||
if (this._queue.IsAddingCompleted) | ||
return; | ||
|
||
try | ||
{ | ||
if (_blockWhenFull) | ||
{ | ||
_queue.Add(logEvent); | ||
} | ||
else | ||
{ | ||
if (!_queue.TryAdd(logEvent)) | ||
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity); | ||
} | ||
} | ||
catch (InvalidOperationException) | ||
{ | ||
// Thrown in the event of a race condition when we try to add another event after | ||
// CompleteAdding has been called | ||
} | ||
} | ||
|
||
public void Dispose() | ||
{ | ||
_disposed = true; | ||
_cancel.Cancel(); | ||
_worker.Wait(); | ||
// Prevent any more events from being added | ||
_queue.CompleteAdding(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure why this wasn't used originally - looks like a good change to make. Can we grab some benchmark output from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, it's late now so will add something tomorrow There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue with this one is that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @skomis-mm yes, you're right - it's probably best to just do away with the disposed check in |
||
|
||
// Allow queued events to be flushed | ||
_worker.Wait(); | ||
|
||
(_pipeline as IDisposable)?.Dispose(); | ||
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent | ||
} | ||
|
||
void Pump() | ||
{ | ||
try | ||
{ | ||
try | ||
{ | ||
while (true) | ||
{ | ||
var next = _queue.Take(_cancel.Token); | ||
_pipeline.Emit(next); | ||
} | ||
} | ||
catch (OperationCanceledException) | ||
foreach (var next in _queue.GetConsumingEnumerable()) | ||
{ | ||
LogEvent next; | ||
while (_queue.TryTake(out next)) | ||
_pipeline.Emit(next); | ||
_pipeline.Emit(next); | ||
} | ||
} | ||
catch (Exception ex) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,27 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Diagnostics; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
using Serilog.Core; | ||
using Serilog.Events; | ||
using Serilog.Parsing; | ||
using Serilog.Sinks.Async.Tests; | ||
using Serilog.Sinks.Async.Tests.Support; | ||
using Xunit; | ||
|
||
namespace Serilog.Sinks.Async.Tests | ||
{ | ||
public class BackgroundWorkerSinkSpec : IDisposable | ||
{ | ||
readonly Logger _logger; | ||
readonly MemorySink _innerSink; | ||
readonly BackgroundWorkerSink _sink; | ||
BackgroundWorkerSink _sink; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since some tests now use var sink = CreateWithDefaultOptions(); in each test that needs it, where the method would just be a shortcut to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually,
This adds quite a bit of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I think I'd still probably prefer the |
||
|
||
public BackgroundWorkerSinkSpec() | ||
{ | ||
_innerSink = new MemorySink(); | ||
var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger(); | ||
_sink = new BackgroundWorkerSink(logger, 10000); | ||
_logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger(); | ||
_sink = new BackgroundWorkerSink(_logger, 10000, false); | ||
} | ||
|
||
public void Dispose() | ||
|
@@ -31,7 +32,7 @@ public void Dispose() | |
[Fact] | ||
public void WhenCtorWithNullSink_ThenThrows() | ||
{ | ||
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000)); | ||
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false)); | ||
} | ||
|
||
[Fact] | ||
|
@@ -80,6 +81,77 @@ public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink() | |
Assert.Equal(3, _innerSink.Events.Count); | ||
} | ||
|
||
[Fact] | ||
public async Task WhenQueueFull_ThenDropsEvents() | ||
{ | ||
_sink = new BackgroundWorkerSink(_logger, 1, false); | ||
|
||
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity | ||
// after the first event is popped | ||
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); | ||
|
||
var events = new List<LogEvent> | ||
{ | ||
CreateEvent(), | ||
CreateEvent(), | ||
CreateEvent(), | ||
CreateEvent(), | ||
CreateEvent() | ||
}; | ||
events.ForEach(e => | ||
{ | ||
var sw = Stopwatch.StartNew(); | ||
_sink.Emit(e); | ||
sw.Stop(); | ||
|
||
Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full"); | ||
}); | ||
|
||
// If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take | ||
// at least 15 seconds to process | ||
await Task.Delay(TimeSpan.FromSeconds(2)); | ||
|
||
// Events should be dropped | ||
Assert.Equal(2, _innerSink.Events.Count); | ||
} | ||
|
||
[Fact] | ||
public async Task WhenQueueFull_ThenBlocks() | ||
{ | ||
_sink = new BackgroundWorkerSink(_logger, 1, true); | ||
|
||
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity | ||
// after the first event is popped | ||
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300); | ||
|
||
var events = new List<LogEvent> | ||
{ | ||
CreateEvent(), | ||
CreateEvent(), | ||
CreateEvent() | ||
}; | ||
|
||
int i = 0; | ||
events.ForEach(e => | ||
{ | ||
var sw = Stopwatch.StartNew(); | ||
_sink.Emit(e); | ||
sw.Stop(); | ||
|
||
// Emit should return immediately the first time, since the queue is not yet full. On | ||
// subsequent calls, the queue should be full, so we should be blocked | ||
if (i > 0) | ||
{ | ||
Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full"); | ||
} | ||
}); | ||
|
||
await Task.Delay(TimeSpan.FromSeconds(2)); | ||
|
||
// No events should be dropped | ||
Assert.Equal(3, _innerSink.Events.Count); | ||
} | ||
|
||
private static LogEvent CreateEvent() | ||
{ | ||
return new LogEvent(DateTimeOffset.MaxValue, LogEventLevel.Error, null, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is used by quite a lot of downstream packages, we'll need to follow the old pattern of adding an overload with the old parameter set, marked
[EditorBrowsable(EditorBrowsableState.Never)]
with no default values applied.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I figured using a default would preserve current behaviour for existing users. Wasn't aware of that attribute