Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
use dedicated writer thread, 1.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed May 25, 2016
1 parent d392e2c commit 85af578
Show file tree
Hide file tree
Showing 17 changed files with 363 additions and 128 deletions.
5 changes: 3 additions & 2 deletions .nuget/EtwStream.InProcess.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata>
<id>EtwStream.InProcess</id>
<version>1.2.5</version>
<version>1.3.0</version>
<title>EtwStream.InProcess</title>
<authors>neuecc</authors>
<owners>neuecc</owners>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Subset of EtwStream, only for InProcess logging, does not dependent TraceEvent.</description>
<releaseNotes>
<![CDATA[
Create directory if path does not exists at using FileSink.
FileSink uses dedicated writer thread.
Fix write empty line when message is buffered.
]]>
</releaseNotes>
<language>en-US</language>
Expand Down
8 changes: 4 additions & 4 deletions .nuget/EtwStream.LinqPad.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata>
<id>EtwStream.LinqPad</id>
<version>1.2.5</version>
<version>1.3.0</version>
<title>EtwStream.LinqPad</title>
<authors>neuecc</authors>
<owners>neuecc</owners>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Monitoring ETW is very hard. Now LINQPad is log viewer, you can dump ETW stream.</description>
<releaseNotes>
<![CDATA[
Create directory if path does not exists at using FileSink.
Update TraceEvent 1.0.41
FileSink uses dedicated writer thread.
Fix write empty line when message is buffered.
]]>
</releaseNotes>
<language>en-US</language>
<licenseUrl>http://opensource.org/licenses/MIT</licenseUrl>
<projectUrl>https://github.com/neuecc/EtwStream</projectUrl>
<tags>ETW TraceEvent EventSource LINQPad</tags>
<dependencies>
<dependency id="EtwStream" version="1.2.4" />
<dependency id="EtwStream" version="1.3.0" />
</dependencies>
</metadata>
<files>
Expand Down
6 changes: 3 additions & 3 deletions .nuget/EtwStream.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
<package xmlns="http://schemas.microsoft.com/packaging/2010/07/nuspec.xsd">
<metadata>
<id>EtwStream</id>
<version>1.2.5</version>
<version>1.3.0</version>
<title>EtwStream</title>
<authors>neuecc</authors>
<owners>neuecc</owners>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>Logs are event streams. EtwStream provides In-Process and Out-of-Process ObservableEventListener. Everything can compose and output to anywhere by Reactive Extensions.</description>
<releaseNotes>
<![CDATA[
Create directory if path does not exists at using FileSink.
Update TraceEvent 1.0.41
FileSink uses dedicated writer thread.
Fix write empty line when message is buffered.
]]>
</releaseNotes>
<language>en-US</language>
Expand Down
6 changes: 3 additions & 3 deletions .nuget/push.bat
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
nuget push EtwStream.1.2.5.nupkg
nuget push EtwStream.InProcess.1.2.5.nupkg
nuget push EtwStream.LinqPad.1.2.5.nupkg
nuget push EtwStream.1.3.0.nupkg
nuget push EtwStream.InProcess.1.3.0.nupkg
nuget push EtwStream.LinqPad.1.3.0.nupkg
4 changes: 2 additions & 2 deletions EtwStream.Core/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

[assembly: Guid("d8e5b849-29cd-4484-b8a8-d53fab9cb794")]

[assembly: AssemblyVersion("1.2.5.0")]
[assembly: AssemblyFileVersion("1.2.5.0")]
[assembly: AssemblyVersion("1.3.0.0")]
[assembly: AssemblyFileVersion("1.3.0.0")]
4 changes: 2 additions & 2 deletions EtwStream.InProcess/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.2.5.0")]
[assembly: AssemblyFileVersion("1.2.5.0")]
[assembly: AssemblyVersion("1.3.0.0")]
[assembly: AssemblyFileVersion("1.3.0.0")]
4 changes: 2 additions & 2 deletions EtwStream.LinqPad/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@

[assembly: Guid("7482323f-72a0-4f8d-a3b5-d15822d32627")]

[assembly: AssemblyVersion("1.2.5.0")]
[assembly: AssemblyFileVersion("1.2.5.0")]
[assembly: AssemblyVersion("1.3.0.0")]
[assembly: AssemblyFileVersion("1.3.0.0")]
4 changes: 2 additions & 2 deletions EtwStream.Service/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@

[assembly: Guid("0d5b543d-5156-43bc-aedb-97d3404a8dae")]

[assembly: AssemblyVersion("1.2.5.0")]
[assembly: AssemblyFileVersion("1.2.5.0")]
[assembly: AssemblyVersion("1.3.0.0")]
[assembly: AssemblyFileVersion("1.3.0.0")]
1 change: 1 addition & 0 deletions EtwStream/EtwStream.projitems
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
<Compile Include="$(MSBuildThisFileDirectory)Sinks\AsyncFileWriter.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Sinks\ConsoleSink.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Sinks\DebugSink.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Sinks\EnumerableExtensions.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Sinks\FileSink.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Sinks\RollingFileSink.cs" />
<Compile Include="$(MSBuildThisFileDirectory)Sinks\SinkBase.cs" />
Expand Down
98 changes: 36 additions & 62 deletions EtwStream/Sinks/AsyncFileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace EtwStream
{
internal class AsyncFileWriter
{
readonly ConcurrentQueue<string> q = new ConcurrentQueue<string>();
readonly BlockingCollection<string> q = new BlockingCollection<string>();
readonly object gate = new object();
readonly string sinkName;
readonly FileStream fileStream;
Expand All @@ -23,9 +23,9 @@ internal class AsyncFileWriter
readonly bool autoFlush;
readonly byte[] newLine;

Task lastQueueWorker;
bool isConsuming = false;
readonly Task processingTask;
int isDisposed = 0;
readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

public string FileName { get; private set; }
public long CurrentStreamLength { get; private set; }
Expand All @@ -39,92 +39,73 @@ public AsyncFileWriter(string sinkName, string fileName, Encoding encoding, bool

this.FileName = fileName;
this.sinkName = sinkName;
this.fileStream = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 4096, true); // useAsync:true
this.fileStream = new FileStream(fileName, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 4096, useAsync: false); // useAsync:false, use dedicated processor
this.encoding = encoding;
this.autoFlush = autoFlush;
this.lastQueueWorker = Task.CompletedTask;

this.newLine = encoding.GetBytes(Environment.NewLine);
this.CurrentStreamLength = fileStream.Length;
}

bool SwitchStartConsume()
{
lock (gate)
{
if (isConsuming)
{
return false;
}
else
{
isConsuming = true;
return true;
}
}
this.processingTask = Task.Factory.StartNew(ConsumeQueue, TaskCreationOptions.LongRunning);
}

async Task ConsumeQueue()
void ConsumeQueue()
{
CONSUME_AGAIN:
while (true)
while (!cancellationTokenSource.IsCancellationRequested)
{
string nextString;
if (q.TryDequeue(out nextString))
try
{
try
if (q.TryTake(out nextString, Timeout.Infinite, cancellationTokenSource.Token))
{
var bytes = encoding.GetBytes(nextString);
CurrentStreamLength += bytes.Length + newLine.Length;
if (!autoFlush)
try
{
await fileStream.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false);
fileStream.Write(newLine, 0, newLine.Length);
var bytes = encoding.GetBytes(nextString);
CurrentStreamLength += bytes.Length + newLine.Length;
if (!autoFlush)
{
fileStream.Write(bytes, 0, bytes.Length);
fileStream.Write(newLine, 0, newLine.Length);
}
else
{
fileStream.Write(bytes, 0, bytes.Length);
fileStream.Write(newLine, 0, newLine.Length);
fileStream.Flush();
}
}
else
catch (Exception ex)
{
await fileStream.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false);
fileStream.Write(newLine, 0, newLine.Length);
await fileStream.FlushAsync().ConfigureAwait(false);
EtwStreamEventSource.Log.SinkError(sinkName, "FileStream Write/Flush failed", ex.ToString());
}
}
catch (Exception ex)
else
{
EtwStreamEventSource.Log.SinkError(sinkName, "FileStream Write/Flush failed", ex.ToString());
break;
}
}
else
catch (OperationCanceledException)
{
break;
}
}
lock (gate)
{
// inlock, onNext enqued string and now checking isConsuming
if (q.Count == 0)
catch (ObjectDisposedException)
{
isConsuming = false;
}
else
{
goto CONSUME_AGAIN;
break;
}
}
}

public void Enqueue(string value)
{
q.Enqueue(value);
if (SwitchStartConsume())
{
lastQueueWorker = ConsumeQueue();
}
q.Add(value);
}

public List<string> Finalize()
public string[] Finalize()
{
if (Interlocked.Increment(ref isDisposed) == 1)
{
this.lastQueueWorker.Wait();
cancellationTokenSource.Cancel();
processingTask.Wait();
try
{
this.fileStream.Close();
Expand All @@ -135,14 +116,7 @@ public List<string> Finalize()
}

// rest line...
var list = new List<string>();
string r;
while (q.TryDequeue(out r))
{
list.Add(r);
}

return list;
return q.ToArray();
}
return null;
}
Expand Down
25 changes: 25 additions & 0 deletions EtwStream/Sinks/DebugSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ public static IDisposable LogToDebug(this IObservable<TraceEvent> source, Func<T
return source.Subscribe(x => Debug.WriteLine(messageFormatter(x)));
}

public static IDisposable LogToDebug(this IObservable<IList<TraceEvent>> source)
{
return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(x.EventName + ": " + x.DumpPayloadOrMessage())));
}

public static IDisposable LogToDebug(this IObservable<IList<TraceEvent>> source, Func<TraceEvent, string> messageFormatter)
{
return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(messageFormatter(x))));
}

#endif

// EventArgs
Expand All @@ -45,11 +55,26 @@ public static IDisposable LogToDebug(this IObservable<EventWrittenEventArgs> sou
return source.Subscribe(x => Debug.WriteLine(messageFormatter(x)));
}

public static IDisposable LogToDebug(this IObservable<IList<EventWrittenEventArgs>> source)
{
return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(x.EventName + ": " + x.DumpPayloadOrMessage())));
}

public static IDisposable LogToDebug(this IObservable<IList<EventWrittenEventArgs>> source, Func<EventWrittenEventArgs, string> messageFormatter)
{
return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(messageFormatter(x))));
}

// String

public static IDisposable LogToDebug(this IObservable<string> source)
{
return source.Subscribe(x => Debug.WriteLine(x));
}

public static IDisposable LogToDebug(this IObservable<IList<string>> source)
{
return source.Subscribe(xs => xs.FastForEach(x => Debug.WriteLine(x)));
}
}
}
34 changes: 34 additions & 0 deletions EtwStream/Sinks/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace EtwStream
{
internal static class EnumerableExtensions
{
public static void FastForEach<T>(this IList<T> source, Action<T> action)
{
var l = source as List<T>;
if (l != null)
{
l.ForEach(action);
return;
}

var a = source as T[];
if (a != null)
{
for (int i = 0; i < a.Length; i++)
{
action(a[i]);
}
return;
}

foreach (var item in source)
{
action(item);
}
}
}
}
Loading

0 comments on commit 85af578

Please sign in to comment.