From 85af578dd30a1298e4cd3e54bbedbb929b103c6b Mon Sep 17 00:00:00 2001 From: neuecc Date: Wed, 25 May 2016 21:33:44 +0900 Subject: [PATCH] use dedicated writer thread, 1.3.0 --- .nuget/EtwStream.InProcess.nuspec | 5 +- .nuget/EtwStream.LinqPad.nuspec | 8 +- .nuget/EtwStream.nuspec | 6 +- .nuget/push.bat | 6 +- EtwStream.Core/Properties/AssemblyInfo.cs | 4 +- .../Properties/AssemblyInfo.cs | 4 +- EtwStream.LinqPad/Properties/AssemblyInfo.cs | 4 +- EtwStream.Service/Properties/AssemblyInfo.cs | 4 +- EtwStream/EtwStream.projitems | 1 + EtwStream/Sinks/AsyncFileWriter.cs | 98 +++++++------------ EtwStream/Sinks/DebugSink.cs | 25 +++++ EtwStream/Sinks/EnumerableExtensions.cs | 34 +++++++ EtwStream/Sinks/FileSink.cs | 73 +++++++++----- EtwStream/Sinks/RollingFileSink.cs | 81 ++++++++++++--- EtwStream/Sinks/TraceSink.cs | 24 ++++- LoggerPerformance/Program.cs | 73 +++++++++++++- README.md | 41 +++++++- 17 files changed, 363 insertions(+), 128 deletions(-) create mode 100644 EtwStream/Sinks/EnumerableExtensions.cs diff --git a/.nuget/EtwStream.InProcess.nuspec b/.nuget/EtwStream.InProcess.nuspec index ecccd55..2e19c69 100644 --- a/.nuget/EtwStream.InProcess.nuspec +++ b/.nuget/EtwStream.InProcess.nuspec @@ -2,7 +2,7 @@ EtwStream.InProcess - 1.2.5 + 1.3.0 EtwStream.InProcess neuecc neuecc @@ -10,7 +10,8 @@ Subset of EtwStream, only for InProcess logging, does not dependent TraceEvent. en-US diff --git a/.nuget/EtwStream.LinqPad.nuspec b/.nuget/EtwStream.LinqPad.nuspec index 1e1f1bf..c93e418 100644 --- a/.nuget/EtwStream.LinqPad.nuspec +++ b/.nuget/EtwStream.LinqPad.nuspec @@ -2,7 +2,7 @@ EtwStream.LinqPad - 1.2.5 + 1.3.0 EtwStream.LinqPad neuecc neuecc @@ -10,8 +10,8 @@ Monitoring ETW is very hard. Now LINQPad is log viewer, you can dump ETW stream. en-US @@ -19,7 +19,7 @@ Update TraceEvent 1.0.41 https://github.com/neuecc/EtwStream ETW TraceEvent EventSource LINQPad - + diff --git a/.nuget/EtwStream.nuspec b/.nuget/EtwStream.nuspec index 5989839..69048e6 100644 --- a/.nuget/EtwStream.nuspec +++ b/.nuget/EtwStream.nuspec @@ -2,7 +2,7 @@ EtwStream - 1.2.5 + 1.3.0 EtwStream neuecc neuecc @@ -10,8 +10,8 @@ Logs are event streams. EtwStream provides In-Process and Out-of-Process ObservableEventListener. Everything can compose and output to anywhere by Reactive Extensions. en-US diff --git a/.nuget/push.bat b/.nuget/push.bat index 6333635..beabde4 100644 --- a/.nuget/push.bat +++ b/.nuget/push.bat @@ -1,3 +1,3 @@ -nuget push EtwStream.1.2.5.nupkg -nuget push EtwStream.InProcess.1.2.5.nupkg -nuget push EtwStream.LinqPad.1.2.5.nupkg \ No newline at end of file +nuget push EtwStream.1.3.0.nupkg +nuget push EtwStream.InProcess.1.3.0.nupkg +nuget push EtwStream.LinqPad.1.3.0.nupkg \ No newline at end of file diff --git a/EtwStream.Core/Properties/AssemblyInfo.cs b/EtwStream.Core/Properties/AssemblyInfo.cs index 462b85f..3148df2 100644 --- a/EtwStream.Core/Properties/AssemblyInfo.cs +++ b/EtwStream.Core/Properties/AssemblyInfo.cs @@ -14,5 +14,5 @@ [assembly: Guid("d8e5b849-29cd-4484-b8a8-d53fab9cb794")] -[assembly: AssemblyVersion("1.2.5.0")] -[assembly: AssemblyFileVersion("1.2.5.0")] \ No newline at end of file +[assembly: AssemblyVersion("1.3.0.0")] +[assembly: AssemblyFileVersion("1.3.0.0")] \ No newline at end of file diff --git a/EtwStream.InProcess/Properties/AssemblyInfo.cs b/EtwStream.InProcess/Properties/AssemblyInfo.cs index 83541c4..232bdae 100644 --- a/EtwStream.InProcess/Properties/AssemblyInfo.cs +++ b/EtwStream.InProcess/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.2.5.0")] -[assembly: AssemblyFileVersion("1.2.5.0")] +[assembly: AssemblyVersion("1.3.0.0")] +[assembly: AssemblyFileVersion("1.3.0.0")] diff --git a/EtwStream.LinqPad/Properties/AssemblyInfo.cs b/EtwStream.LinqPad/Properties/AssemblyInfo.cs index b522746..e9a0de5 100644 --- a/EtwStream.LinqPad/Properties/AssemblyInfo.cs +++ b/EtwStream.LinqPad/Properties/AssemblyInfo.cs @@ -14,5 +14,5 @@ [assembly: Guid("7482323f-72a0-4f8d-a3b5-d15822d32627")] -[assembly: AssemblyVersion("1.2.5.0")] -[assembly: AssemblyFileVersion("1.2.5.0")] \ No newline at end of file +[assembly: AssemblyVersion("1.3.0.0")] +[assembly: AssemblyFileVersion("1.3.0.0")] \ No newline at end of file diff --git a/EtwStream.Service/Properties/AssemblyInfo.cs b/EtwStream.Service/Properties/AssemblyInfo.cs index e284aa7..4cf429c 100644 --- a/EtwStream.Service/Properties/AssemblyInfo.cs +++ b/EtwStream.Service/Properties/AssemblyInfo.cs @@ -15,5 +15,5 @@ [assembly: Guid("0d5b543d-5156-43bc-aedb-97d3404a8dae")] -[assembly: AssemblyVersion("1.2.5.0")] -[assembly: AssemblyFileVersion("1.2.5.0")] +[assembly: AssemblyVersion("1.3.0.0")] +[assembly: AssemblyFileVersion("1.3.0.0")] \ No newline at end of file diff --git a/EtwStream/EtwStream.projitems b/EtwStream/EtwStream.projitems index 244b26b..a73239f 100644 --- a/EtwStream/EtwStream.projitems +++ b/EtwStream/EtwStream.projitems @@ -18,6 +18,7 @@ + diff --git a/EtwStream/Sinks/AsyncFileWriter.cs b/EtwStream/Sinks/AsyncFileWriter.cs index ef56f5e..38f9a70 100644 --- a/EtwStream/Sinks/AsyncFileWriter.cs +++ b/EtwStream/Sinks/AsyncFileWriter.cs @@ -14,7 +14,7 @@ namespace EtwStream { internal class AsyncFileWriter { - readonly ConcurrentQueue q = new ConcurrentQueue(); + readonly BlockingCollection q = new BlockingCollection(); readonly object gate = new object(); readonly string sinkName; readonly FileStream fileStream; @@ -23,9 +23,9 @@ internal class AsyncFileWriter readonly bool autoFlush; readonly byte[] newLine; - Task lastQueueWorker; - bool isConsuming = false; + readonly Task processingTask; int isDisposed = 0; + readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); public string FileName { get; private set; } public long CurrentStreamLength { get; private set; } @@ -39,92 +39,73 @@ public AsyncFileWriter(string sinkName, string fileName, Encoding encoding, bool this.FileName = fileName; this.sinkName = sinkName; - this.fileStream = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 4096, true); // useAsync:true + this.fileStream = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 4096, useAsync: false); // useAsync:false, use dedicated processor this.encoding = encoding; this.autoFlush = autoFlush; - this.lastQueueWorker = Task.CompletedTask; + this.newLine = encoding.GetBytes(Environment.NewLine); this.CurrentStreamLength = fileStream.Length; - } - bool SwitchStartConsume() - { - lock (gate) - { - if (isConsuming) - { - return false; - } - else - { - isConsuming = true; - return true; - } - } + this.processingTask = Task.Factory.StartNew(ConsumeQueue, TaskCreationOptions.LongRunning); } - async Task ConsumeQueue() + void ConsumeQueue() { - CONSUME_AGAIN: - while (true) + while (!cancellationTokenSource.IsCancellationRequested) { string nextString; - if (q.TryDequeue(out nextString)) + try { - try + if (q.TryTake(out nextString, Timeout.Infinite, cancellationTokenSource.Token)) { - var bytes = encoding.GetBytes(nextString); - CurrentStreamLength += bytes.Length + newLine.Length; - if (!autoFlush) + try { - await fileStream.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false); - fileStream.Write(newLine, 0, newLine.Length); + var bytes = encoding.GetBytes(nextString); + CurrentStreamLength += bytes.Length + newLine.Length; + if (!autoFlush) + { + fileStream.Write(bytes, 0, bytes.Length); + fileStream.Write(newLine, 0, newLine.Length); + } + else + { + fileStream.Write(bytes, 0, bytes.Length); + fileStream.Write(newLine, 0, newLine.Length); + fileStream.Flush(); + } } - else + catch (Exception ex) { - await fileStream.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false); - fileStream.Write(newLine, 0, newLine.Length); - await fileStream.FlushAsync().ConfigureAwait(false); + EtwStreamEventSource.Log.SinkError(sinkName, "FileStream Write/Flush failed", ex.ToString()); } } - catch (Exception ex) + else { - EtwStreamEventSource.Log.SinkError(sinkName, "FileStream Write/Flush failed", ex.ToString()); + break; } } - else + catch (OperationCanceledException) { break; } - } - lock (gate) - { - // inlock, onNext enqued string and now checking isConsuming - if (q.Count == 0) + catch (ObjectDisposedException) { - isConsuming = false; - } - else - { - goto CONSUME_AGAIN; + break; } } } public void Enqueue(string value) { - q.Enqueue(value); - if (SwitchStartConsume()) - { - lastQueueWorker = ConsumeQueue(); - } + q.Add(value); } - public List Finalize() + public string[] Finalize() { if (Interlocked.Increment(ref isDisposed) == 1) { - this.lastQueueWorker.Wait(); + cancellationTokenSource.Cancel(); + processingTask.Wait(); try { this.fileStream.Close(); @@ -135,14 +116,7 @@ public List Finalize() } // rest line... - var list = new List(); - string r; - while (q.TryDequeue(out r)) - { - list.Add(r); - } - - return list; + return q.ToArray(); } return null; } diff --git a/EtwStream/Sinks/DebugSink.cs b/EtwStream/Sinks/DebugSink.cs index 196e995..8b820fd 100644 --- a/EtwStream/Sinks/DebugSink.cs +++ b/EtwStream/Sinks/DebugSink.cs @@ -31,6 +31,16 @@ public static IDisposable LogToDebug(this IObservable source, Func Debug.WriteLine(messageFormatter(x))); } + public static IDisposable LogToDebug(this IObservable> source) + { + return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(x.EventName + ": " + x.DumpPayloadOrMessage()))); + } + + public static IDisposable LogToDebug(this IObservable> source, Func messageFormatter) + { + return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(messageFormatter(x)))); + } + #endif // EventArgs @@ -45,11 +55,26 @@ public static IDisposable LogToDebug(this IObservable sou return source.Subscribe(x => Debug.WriteLine(messageFormatter(x))); } + public static IDisposable LogToDebug(this IObservable> source) + { + return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(x.EventName + ": " + x.DumpPayloadOrMessage()))); + } + + public static IDisposable LogToDebug(this IObservable> source, Func messageFormatter) + { + return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(messageFormatter(x)))); + } + // String public static IDisposable LogToDebug(this IObservable source) { return source.Subscribe(x => Debug.WriteLine(x)); } + + public static IDisposable LogToDebug(this IObservable> source) + { + return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(x))); + } } } \ No newline at end of file diff --git a/EtwStream/Sinks/EnumerableExtensions.cs b/EtwStream/Sinks/EnumerableExtensions.cs new file mode 100644 index 0000000..a0063cc --- /dev/null +++ b/EtwStream/Sinks/EnumerableExtensions.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace EtwStream +{ + internal static class EnumerableExtensions + { + public static void FastForEach(this IList source, Action action) + { + var l = source as List; + if (l != null) + { + l.ForEach(action); + return; + } + + var a = source as T[]; + if (a != null) + { + for (int i = 0; i < a.Length; i++) + { + action(a[i]); + } + return; + } + + foreach (var item in source) + { + action(item); + } + } + } +} diff --git a/EtwStream/Sinks/FileSink.cs b/EtwStream/Sinks/FileSink.cs index 5321202..82ca567 100644 --- a/EtwStream/Sinks/FileSink.cs +++ b/EtwStream/Sinks/FileSink.cs @@ -21,6 +21,13 @@ public static class FileSink #if TRACE_EVENT + /// + /// Write to text file. + /// + /// Event source. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToFile(this IObservable source, string fileName, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new TraceEventSink(fileName, messageFormatter, encoding, autoFlush); @@ -28,6 +35,13 @@ public static IDisposable LogToFile(this IObservable source, string return sink.CreateLinkedDisposable(subscription); } + /// + /// Write to text file. + /// + /// Event source. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToFile(this IObservable> source, string fileName, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new TraceEventSink(fileName, messageFormatter, encoding, autoFlush); @@ -39,6 +53,13 @@ public static IDisposable LogToFile(this IObservable> source, // EventArgs + /// + /// Write to text file. + /// + /// Event source. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToFile(this IObservable source, string fileName, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new EventWrittenEventArgsSink(fileName, messageFormatter, encoding, autoFlush); @@ -46,6 +67,13 @@ public static IDisposable LogToFile(this IObservable sour return sink.CreateLinkedDisposable(subscription); } + /// + /// Write to text file. + /// + /// Event source. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToFile(this IObservable> source, string fileName, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new EventWrittenEventArgsSink(fileName, messageFormatter, encoding, autoFlush); @@ -55,6 +83,12 @@ public static IDisposable LogToFile(this IObservable + /// Write to text file. + /// + /// Event source. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToFile(this IObservable source, string fileName, Encoding encoding, bool autoFlush) { var sink = new StringSink(fileName, encoding, autoFlush); @@ -62,6 +96,12 @@ public static IDisposable LogToFile(this IObservable source, string file return sink.CreateLinkedDisposable(subscription); } + /// + /// Write to text file. + /// + /// Event source. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToFile(this IObservable> source, string fileName, Encoding encoding, bool autoFlush) { var sink = new StringSink(fileName, encoding, autoFlush); @@ -77,11 +117,13 @@ class TraceEventSink : SinkBase { readonly Func messageFormatter; readonly AsyncFileWriter asyncFileWriter; + readonly Action onNext; public TraceEventSink(string fileName, Func messageFormatter, Encoding encoding, bool autoFlush) { this.asyncFileWriter = new AsyncFileWriter(nameof(FileSink), fileName, encoding, autoFlush); this.messageFormatter = messageFormatter; + this.onNext = OnNext; } public override void OnNext(TraceEvent value) @@ -101,17 +143,7 @@ public override void OnNext(TraceEvent value) public override void OnNext(IList value) { - string v; - try - { - v = string.Join(Environment.NewLine, value.Select(x => messageFormatter(x))); - } - catch (Exception ex) - { - EtwStreamEventSource.Log.SinkError(nameof(FileSink), "messageFormatter convert failed", ex.ToString()); - return; - } - asyncFileWriter.Enqueue(v); + value.FastForEach(onNext); } public override void Dispose() @@ -126,11 +158,13 @@ class EventWrittenEventArgsSink : SinkBase { readonly Func messageFormatter; readonly AsyncFileWriter asyncFileWriter; + readonly Action onNext; public EventWrittenEventArgsSink(string fileName, Func messageFormatter, Encoding encoding, bool autoFlush) { this.asyncFileWriter = new AsyncFileWriter(nameof(FileSink), fileName, encoding, autoFlush); this.messageFormatter = messageFormatter; + this.onNext = OnNext; } public override void OnNext(EventWrittenEventArgs value) @@ -151,17 +185,7 @@ public override void OnNext(EventWrittenEventArgs value) public override void OnNext(IList value) { - string v; - try - { - v = string.Join(Environment.NewLine, value.Select(x => messageFormatter(x))); - } - catch (Exception ex) - { - EtwStreamEventSource.Log.SinkError(nameof(FileSink), "messageFormatter convert failed", ex.ToString()); - return; - } - asyncFileWriter.Enqueue(v); + value.FastForEach(onNext); } public override void Dispose() @@ -173,10 +197,12 @@ public override void Dispose() class StringSink : SinkBase { readonly AsyncFileWriter asyncFileWriter; + readonly Action onNext; public StringSink(string fileName, Encoding encoding, bool autoFlush) { this.asyncFileWriter = new AsyncFileWriter(nameof(FileSink), fileName, encoding, autoFlush); + this.onNext = OnNext; } public override void OnNext(string value) @@ -186,8 +212,7 @@ public override void OnNext(string value) public override void OnNext(IList value) { - string v = string.Join(Environment.NewLine, value); - asyncFileWriter.Enqueue(v); + value.FastForEach(onNext); } public override void Dispose() diff --git a/EtwStream/Sinks/RollingFileSink.cs b/EtwStream/Sinks/RollingFileSink.cs index 6a8060c..8092295 100644 --- a/EtwStream/Sinks/RollingFileSink.cs +++ b/EtwStream/Sinks/RollingFileSink.cs @@ -22,6 +22,16 @@ public static class RollingFileSink #if TRACE_EVENT + /// + /// Write to text file, file is rolled. + /// + /// Event source. + /// Selector of output file name. DateTime is date of file open time, int is number sequence. + /// Pattern of rolling identifier. DateTime is write time of message. If pattern is different roll new file. + /// Size of start next file. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToRollingFile(this IObservable source, Func fileNameSelector, Func timestampPattern, int rollSizeKB, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new TraceEventSink(fileNameSelector, timestampPattern, rollSizeKB, messageFormatter, encoding, autoFlush); @@ -29,6 +39,16 @@ public static IDisposable LogToRollingFile(this IObservable source, return sink.CreateLinkedDisposable(subscription); } + /// + /// Write to text file, file is rolled. + /// + /// Event source. + /// Selector of output file name. DateTime is date of file open time, int is number sequence. + /// Pattern of rolling identifier. DateTime is write time of message. If pattern is different roll new file. + /// Size of start next file. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToRollingFile(this IObservable> source, Func fileNameSelector, Func timestampPattern, int rollSizeKB, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new TraceEventSink(fileNameSelector, timestampPattern, rollSizeKB, messageFormatter, encoding, autoFlush); @@ -40,6 +60,16 @@ public static IDisposable LogToRollingFile(this IObservable> s // EventArgs + /// + /// Write to text file, file is rolled. + /// + /// Event source. + /// Selector of output file name. DateTime is date of file open time, int is number sequence. + /// Pattern of rolling identifier. DateTime is write time of message. If pattern is different roll new file. + /// Size of start next file. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToRollingFile(this IObservable source, Func fileNameSelector, Func timestampPattern, int rollSizeKB, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new EventWrittenEventArgsSink(fileNameSelector, timestampPattern, rollSizeKB, messageFormatter, encoding, autoFlush); @@ -47,6 +77,16 @@ public static IDisposable LogToRollingFile(this IObservable + /// Write to text file, file is rolled. + /// + /// Event source. + /// Selector of output file name. DateTime is date of file open time, int is number sequence. + /// Pattern of rolling identifier. DateTime is write time of message. If pattern is different roll new file. + /// Size of start next file. + /// Converter of message per line. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToRollingFile(this IObservable> source, Func fileNameSelector, Func timestampPattern, int rollSizeKB, Func messageFormatter, Encoding encoding, bool autoFlush) { var sink = new EventWrittenEventArgsSink(fileNameSelector, timestampPattern, rollSizeKB, messageFormatter, encoding, autoFlush); @@ -56,6 +96,15 @@ public static IDisposable LogToRollingFile(this IObservable + /// Write to text file, file is rolled. + /// + /// Event source. + /// Selector of output file name. DateTime is date of file open time, int is number sequence. + /// Pattern of rolling identifier. DateTime is write time of message. If pattern is different roll new file. + /// Size of start next file. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToRollingFile(this IObservable source, Func fileNameSelector, Func timestampPattern, int rollSizeKB, Encoding encoding, bool autoFlush) { var sink = new StringSink(fileNameSelector, timestampPattern, rollSizeKB, encoding, autoFlush); @@ -63,6 +112,15 @@ public static IDisposable LogToRollingFile(this IObservable source, Func return sink.CreateLinkedDisposable(subscription); } + /// + /// Write to text file, file is rolled. + /// + /// Event source. + /// Selector of output file name. DateTime is date of file open time, int is number sequence. + /// Pattern of rolling identifier. DateTime is write time of message. If pattern is different roll new file. + /// Size of start next file. + /// String encoding. + /// If true, call Flush on every write. public static IDisposable LogToRollingFile(this IObservable> source, Func fileNameSelector, Func timestampPattern, int rollSizeKB, Encoding encoding, bool autoFlush) { var sink = new StringSink(fileNameSelector, timestampPattern, rollSizeKB, encoding, autoFlush); @@ -83,6 +141,7 @@ abstract class RollingFileSinkBase : SinkBase readonly Encoding encoding; readonly bool autoFlush; readonly long rollSizeInBytes; + readonly Action onNextCore; string currentTimestampPattern; @@ -102,6 +161,7 @@ public RollingFileSinkBase( this.rollSizeInBytes = rollSizeKB * 1024; this.encoding = encoding; this.autoFlush = autoFlush; + this.onNextCore = OnNextCore; ValidateFileNameSelector(); } @@ -193,7 +253,7 @@ protected void CheckFileRolling() break; } - List safe; + string[] safe; try { safe = disposeTarget?.Finalize(); // block! @@ -246,28 +306,21 @@ static int ExtractCurrentSequence(string fileName) public override void OnNext(T value) { CheckFileRolling(); - - string v; - try - { - v = messageFormatter(value); - } - catch (Exception ex) - { - EtwStreamEventSource.Log.SinkError(nameof(RollingFileSink), "messageFormatter convert failed", ex.ToString()); - return; - } - asyncFileWriter.Enqueue(v); + OnNextCore(value); } public override void OnNext(IList value) { CheckFileRolling(); + value.FastForEach(onNextCore); + } + void OnNextCore(T value) + { string v; try { - v = string.Join(Environment.NewLine, value.Select(x => messageFormatter(x))); + v = messageFormatter(value); } catch (Exception ex) { diff --git a/EtwStream/Sinks/TraceSink.cs b/EtwStream/Sinks/TraceSink.cs index f523881..8832669 100644 --- a/EtwStream/Sinks/TraceSink.cs +++ b/EtwStream/Sinks/TraceSink.cs @@ -31,6 +31,16 @@ public static IDisposable LogToTrace(this IObservable source, Func Trace.WriteLine(messageFormatter(x))); } + public static IDisposable LogToTrace(this IObservable> source) + { + return source.Subscribe(xs => xs.FastForEach(x => Trace.WriteLine(x.EventName + ": " + x.DumpPayloadOrMessage()))); + } + + public static IDisposable LogToTrace(this IObservable> source, Func messageFormatter) + { + return source.Subscribe(xs => xs.FastForEach(x => Trace.WriteLine(messageFormatter(x)))); + } + #endif // EventArgs @@ -45,6 +55,16 @@ public static IDisposable LogToTrace(this IObservable sou return source.Subscribe(x => Trace.WriteLine(messageFormatter(x))); } + public static IDisposable LogToTrace(this IObservable> source) + { + return source.Subscribe(xs => xs.FastForEach(x => Trace.WriteLine(x.EventName + ": " + x.DumpPayloadOrMessage()))); + } + + public static IDisposable LogToTrace(this IObservable> source, Func messageFormatter) + { + return source.Subscribe(xs => xs.FastForEach(x => Trace.WriteLine(messageFormatter(x)))); + } + // String public static IDisposable LogToTrace(this IObservable source) @@ -52,9 +72,9 @@ public static IDisposable LogToTrace(this IObservable source) return source.Subscribe(x => Trace.WriteLine(x)); } - public static Task LogToTraceAsync(this IObservable source) + public static IDisposable LogToTrace(this IObservable> source) { - return source.Do(x => Trace.WriteLine(x)).DefaultIfEmpty().ToTask(); + return source.Subscribe(xs => xs.FastForEach(x => Trace.WriteLine(x))); } } } \ No newline at end of file diff --git a/LoggerPerformance/Program.cs b/LoggerPerformance/Program.cs index 9d8c902..02d7c25 100644 --- a/LoggerPerformance/Program.cs +++ b/LoggerPerformance/Program.cs @@ -11,6 +11,7 @@ using Microsoft.Practices.EnterpriseLibrary.SemanticLogging.Formatters; using Serilog; using Serilog.Sinks; +using System.Reactive.Linq; namespace LoggerPerformance { @@ -18,11 +19,24 @@ class Program { static void Main(string[] args) { - NLoog.Run(); - // Slab.Run(); + //EtwStream.RollCheck(); + + EtwStream.Test2(); + + + //Console.WriteLine("EtwStream"); //EtwStream.Run(); - // Serilooog.Run(); - //EtwStream.Test(); + + //Console.WriteLine("NLog"); + //NLoog.Run(); + + //Console.WriteLine("Slab"); + //Slab.Run(); + + + //Console.WriteLine("Serilooog"); + //Serilooog.Run(); + ////EtwStream.Test(); } static class EtwStream @@ -77,6 +91,57 @@ public static void Test() cts.Cancel(); subscription.Dispose(); } + + public static void Test2() + { + var cts = new CancellationTokenSource(); + + var subscription = ObservableEventListener.FromEventSource(MyEventSource.Log) + .Buffer(TimeSpan.FromSeconds(5), 1000, cts.Token) + .LogToFile("etw.txt", x => DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss,fff") + " " + "Info " + x.Payload[0], Encoding.UTF8, true); + + Observable.Timer(TimeSpan.FromSeconds(0), TimeSpan.FromSeconds(8)) + .Subscribe(_ => + { + loggger.Info("hogehogehogehoge"); + }); + + + + Console.WriteLine("waiting..."); + Console.ReadLine(); + cts.Cancel(); + subscription.Dispose(); + } + + public static void RollCheck() + { + var cts = new CancellationTokenSource(); + var d = ObservableEventListener.FromEventSource(MyEventSource.Log) + .Buffer(TimeSpan.FromSeconds(5), 1000, cts.Token) + //.LogToFile("hoge.txt", x => (string)x.Payload[0], Encoding.UTF8, false); + .LogToRollingFile((dt, i) => $@"EtwStreamLog\RollingCheck{dt.ToString("yyyyMMdd")}-{i}.log", x => x.ToString("yyyyMMdd"), 10000, x => x.DumpPayloadOrMessage(), Encoding.UTF8, true); + var sw = new Stopwatch(); + sw.Start(); + Task.WhenAll(Enumerable.Range(0, 100) + .Select(async (i) => + { + foreach (var j in Enumerable.Range(0, 10000)) + { + await Task.Run(() => + { + MyEventSource.Log.Info($"abc{i}:{j}"); + }); + } + })) + .Wait(); + + cts.Cancel(); + d.Dispose(); + + sw.Stop(); + Console.WriteLine("elapsed {0}", sw.Elapsed); + } } static class Slab diff --git a/README.md b/README.md index 6264ed3..79c9579 100644 --- a/README.md +++ b/README.md @@ -99,7 +99,7 @@ static void Main() // configure log ObservableEventListener.FromTraceEvent("SampleEventSource") .Buffer(TimeSpan.FromSeconds(5), 1000, cts.Token) - .LogToFile("log.txt", x => $"[{DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss")}][{x.Level}]{x.DumpPayload()}", Encoding.UTF8, autoFlush: false) + .LogToFile("log.txt", x => $"[{DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss")}][{x.Level}]{x.DumpPayload()}", Encoding.UTF8, autoFlush: true) .AddTo(container); // Application Running... @@ -112,6 +112,43 @@ static void Main() `Buffer(TimeSpan, int, CancellationToken)` and `TakeUntil(CancellationToken)` is special helper methods of EtwStream. Please use before Subscribe(LogTo) operator. After Subscribe(LogTo), you can use `AddTo` helper method to `SubscriptionContainer`. It enables wait subscription complete with `CancellationToken`. +LogTo and LogToRollingFile example + +```csharp +ObservableEventListener.FromTraceEvent("SampleEventSource") + .Buffer(TimeSpan.FromSeconds(5), 1000, cts.Token) + .LogTo(xs => + { + // LogTo defines multiple output. + + // RollingFile: + // fileNameSelector's DateTime is date of file open time, int is number sequence. + // timestampPattern's DateTime is write time of message. If pattern is different then roll new file. + var d1 = xs.LogToRollingFile( + fileNameSelector: (dt, i) => $@"{dt.ToString("yyyyMMdd")}Log-{i}.log", + timestampPattern: x => x.ToString("yyyyMMdd"), + rollSizeKB: 10000, + messageFormatter: x => x.DumpPayloadOrMessage(), + encoding: Encoding.UTF8, + autoFlush: false); + + var d2 = xs.LogToConsole(); + var d3 = xs.LogToDebug(); + + return new[] { d1, d2, d3 }; // return all subscriptions + }) + .AddTo(container); +``` + +EventWrittenEventArgs and TraceEvent are extended some methos for format message. + +| Method | Description +| -------------------- | --------------------------------------------------------- +| DumpPayload | Convert payloads to human readable message. +| DumpPayloadOrMessage | If message is exists, return formatted message. Otherwise convert payloads to human readable message. +| DumpFormattedMessage | (EventWrittenEventArgs only), return formatted message. +| ToJson | Return json formatted payloads. + EtwStream.Service --- EtwStream.Service is Out-Of-Process worker of EtwStream. It's built on [Topshelf](https://github.com/Topshelf/Topshelf). You can execute direct(for Console Application Viewer) or install Windows Service(EtwStreamService.exe -install). @@ -129,7 +166,7 @@ Configuration is csx. You can write full Rx and C# codes. for example // Output format is Func ObservableEventListener.FromTraceEvent("SampleEventSource") .Buffer(TimeSpan.FromSeconds(5), 1000, EtwStreamService.TerminateToken) - .LogToFile("log.txt", x => $"[{DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss")}][{x.Level}]{x.DumpPayload()}", Encoding.UTF8, autoFlush: false) + .LogToFile("log.txt", x => $"[{DateTime.Now.ToString("yyyy/MM/dd hh:mm:ss")}][{x.Level}]{x.DumpPayload()}", Encoding.UTF8, autoFlush: true) .AddTo(EtwStreamService.Container); ```