Skip to content

Commit

Permalink
Merge pull request #21 from cocowalla/buffer-strategy
Browse files Browse the repository at this point in the history
Fulfills #20 - add option to block when the queue is full, instead of dropping events
  • Loading branch information
nblumhardt authored Aug 7, 2017
2 parents 1a6689f + 3e5a186 commit dd2e03e
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 66 deletions.
30 changes: 26 additions & 4 deletions src/Serilog.Sinks.Async/LoggerConfigurationAsyncExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;
using System.ComponentModel;
using Serilog.Configuration;

using Serilog.Sinks.Async;

namespace Serilog
Expand All @@ -16,18 +16,40 @@ public static class LoggerConfigurationAsyncExtensions
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// the thread is unable to process events quickly enough and the queue is filled, subsequent events will be
/// dropped until room is made in the queue.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000)
int bufferSize)
{
return loggerSinkConfiguration.Async(configure, bufferSize, false);
}

/// <summary>
/// Configure a sink to be invoked asynchronously, on a background worker thread.
/// </summary>
/// <param name="loggerSinkConfiguration">The <see cref="LoggerSinkConfiguration"/> being configured.</param>
/// <param name="configure">An action that configures the wrapped sink.</param>
/// <param name="bufferSize">The size of the concurrent queue used to feed the background worker thread. If
/// the thread is unable to process events quickly enough and the queue is filled, depending on
/// <paramref name="blockWhenFull"/> the queue will block or subsequent events will be dropped until
/// room is made in the queue.</param>
/// <param name="blockWhenFull">Block when the queue is full, instead of dropping events.</param>
/// <returns>A <see cref="LoggerConfiguration"/> allowing configuration to continue.</returns>
public static LoggerConfiguration Async(
this LoggerSinkConfiguration loggerSinkConfiguration,
Action<LoggerSinkConfiguration> configure,
int bufferSize = 10000,
bool blockWhenFull = false)
{
return LoggerSinkConfiguration.Wrap(
loggerSinkConfiguration,
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize),
wrappedSink => new BackgroundWorkerSink(wrappedSink, bufferSize, blockWhenFull),
configure);
}

}
}
54 changes: 31 additions & 23 deletions src/Serilog.Sinks.Async/Sinks/Async/BackgroundWorkerSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,63 @@ sealed class BackgroundWorkerSink : ILogEventSink, IDisposable
{
readonly ILogEventSink _pipeline;
readonly int _bufferCapacity;
volatile bool _disposed;
readonly CancellationTokenSource _cancel = new CancellationTokenSource();
readonly bool _blockWhenFull;
readonly BlockingCollection<LogEvent> _queue;
readonly Task _worker;

public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity)
public BackgroundWorkerSink(ILogEventSink pipeline, int bufferCapacity, bool blockWhenFull)
{
if (pipeline == null) throw new ArgumentNullException(nameof(pipeline));
if (bufferCapacity <= 0) throw new ArgumentOutOfRangeException(nameof(bufferCapacity));
_pipeline = pipeline;
_bufferCapacity = bufferCapacity;
_blockWhenFull = blockWhenFull;
_queue = new BlockingCollection<LogEvent>(_bufferCapacity);
_worker = Task.Factory.StartNew(Pump, CancellationToken.None, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
}

public void Emit(LogEvent logEvent)
{
// The disposed check is racy, but only ensures we don't prevent flush from
// completing by pushing more events.
if (!_disposed && !_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
if (this._queue.IsAddingCompleted)
return;

try
{
if (_blockWhenFull)
{
_queue.Add(logEvent);
}
else
{
if (!_queue.TryAdd(logEvent))
SelfLog.WriteLine("{0} unable to enqueue, capacity {1}", typeof(BackgroundWorkerSink), _bufferCapacity);
}
}
catch (InvalidOperationException)
{
// Thrown in the event of a race condition when we try to add another event after
// CompleteAdding has been called
}
}

public void Dispose()
{
_disposed = true;
_cancel.Cancel();
_worker.Wait();
// Prevent any more events from being added
_queue.CompleteAdding();

// Allow queued events to be flushed
_worker.Wait();

(_pipeline as IDisposable)?.Dispose();
// _cancel not disposed, because it will make _cancel.Cancel() non-idempotent
}

void Pump()
{
try
{
try
{
while (true)
{
var next = _queue.Take(_cancel.Token);
_pipeline.Emit(next);
}
}
catch (OperationCanceledException)
foreach (var next in _queue.GetConsumingEnumerable())
{
LogEvent next;
while (_queue.TryTake(out next))
_pipeline.Emit(next);
_pipeline.Emit(next);
}
}
catch (Exception ex)
Expand Down
147 changes: 114 additions & 33 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkSpec.cs
Original file line number Diff line number Diff line change
@@ -1,83 +1,164 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using Serilog.Sinks.Async.Tests;
using Serilog.Sinks.Async.Tests.Support;
using Xunit;

namespace Serilog.Sinks.Async.Tests
{
public class BackgroundWorkerSinkSpec : IDisposable
public class BackgroundWorkerSinkSpec
{
readonly Logger _logger;
readonly MemorySink _innerSink;
readonly BackgroundWorkerSink _sink;

public BackgroundWorkerSinkSpec()
{
_innerSink = new MemorySink();
var logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
_sink = new BackgroundWorkerSink(logger, 10000);
}

public void Dispose()
{
_sink.Dispose();
_logger = new LoggerConfiguration().WriteTo.Sink(_innerSink).CreateLogger();
}

[Fact]
public void WhenCtorWithNullSink_ThenThrows()
{
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000));
Assert.Throws<ArgumentNullException>(() => new BackgroundWorkerSink(null, 10000, false));
}

[Fact]
public async Task WhenEmitSingle_ThenRelaysToInnerSink()
{
var logEvent = CreateEvent();
_sink.Emit(logEvent);
using (var sink = this.CreateSinkWithDefaultOptions())
{
var logEvent = CreateEvent();

sink.Emit(logEvent);

await Task.Delay(TimeSpan.FromSeconds(3));
await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(1, _innerSink.Events.Count);
Assert.Equal(1, _innerSink.Events.Count);
}
}

[Fact]
public async Task WhenInnerEmitThrows_ThenContinuesRelaysToInnerSink()
{
_innerSink.ThrowAfterCollecting = true;

var events = new List<LogEvent>
using (var sink = this.CreateSinkWithDefaultOptions())
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e => _sink.Emit(e));
_innerSink.ThrowAfterCollecting = true;

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e => sink.Emit(e));

await Task.Delay(TimeSpan.FromSeconds(3));
await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(3, _innerSink.Events.Count);
Assert.Equal(3, _innerSink.Events.Count);
}
}

[Fact]
public async Task WhenEmitMultipleTimes_ThenRelaysToInnerSink()
{
var events = new List<LogEvent>
using (var sink = this.CreateSinkWithDefaultOptions())
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e => { sink.Emit(e); });

await Task.Delay(TimeSpan.FromSeconds(3));

Assert.Equal(3, _innerSink.Events.Count);
}
}

events.ForEach(e => { _sink.Emit(e); });
[Fact]
public async Task WhenQueueFull_ThenDropsEvents()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, false))
{
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent(),
CreateEvent()
};
events.ForEach(e =>
{
var sw = Stopwatch.StartNew();
sink.Emit(e);
sw.Stop();

Assert.True(sw.ElapsedMilliseconds < 200, "Should not block the caller when the queue is full");
});

// If we *weren't* dropped events, the delay in the inner sink would mean the 5 events would take
// at least 15 seconds to process
await Task.Delay(TimeSpan.FromSeconds(2));

// Events should be dropped
Assert.Equal(2, _innerSink.Events.Count);
}
}

await Task.Delay(TimeSpan.FromSeconds(3));
[Fact]
public async Task WhenQueueFull_ThenBlocks()
{
using (var sink = new BackgroundWorkerSink(_logger, 1, true))
{
// Cause a delay when emmitting to the inner sink, allowing us to fill the queue to capacity
// after the first event is popped
_innerSink.DelayEmit = TimeSpan.FromMilliseconds(300);

var events = new List<LogEvent>
{
CreateEvent(),
CreateEvent(),
CreateEvent()
};

int i = 0;
events.ForEach(e =>
{
var sw = Stopwatch.StartNew();
sink.Emit(e);
sw.Stop();

// Emit should return immediately the first time, since the queue is not yet full. On
// subsequent calls, the queue should be full, so we should be blocked
if (i > 0)
{
Assert.True(sw.ElapsedMilliseconds > 200, "Should block the caller when the queue is full");
}
});

await Task.Delay(TimeSpan.FromSeconds(2));

// No events should be dropped
Assert.Equal(3, _innerSink.Events.Count);
}
}

Assert.Equal(3, _innerSink.Events.Count);
private BackgroundWorkerSink CreateSinkWithDefaultOptions()
{
return new BackgroundWorkerSink(_logger, 10000, false);
}

private static LogEvent CreateEvent()
Expand Down
7 changes: 1 addition & 6 deletions test/Serilog.Sinks.Async.Tests/BackgroundWorkerSinkTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
using System;
using System.Threading;
using Serilog.Core;
using Serilog.Events;
using Serilog.Parsing;
using Serilog.Sinks.Async.Tests.Support;
using Serilog.Sinks.Async.Tests.Support;
using Xunit;

namespace Serilog.Sinks.Async.Tests
Expand Down
5 changes: 5 additions & 0 deletions test/Serilog.Sinks.Async.Tests/Support/MemorySink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@
using Serilog.Core;
using System.Collections.Concurrent;
using System;
using System.Threading.Tasks;

namespace Serilog.Sinks.Async.Tests.Support
{
public class MemorySink : ILogEventSink
{
public ConcurrentBag<LogEvent> Events { get; } = new ConcurrentBag<LogEvent>();
public bool ThrowAfterCollecting { get; set; }
public TimeSpan? DelayEmit { get; set; }

public void Emit(LogEvent logEvent)
{
if (DelayEmit.HasValue)
Task.Delay(DelayEmit.Value).Wait();

Events.Add(logEvent);

if (ThrowAfterCollecting)
Expand Down

0 comments on commit dd2e03e

Please sign in to comment.