Skip to content

Commit

Permalink
Fix a potential race condition in FileSubscriber (#5032)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismaelhamed authored May 22, 2021
1 parent 5294e9a commit 61d08b7
Showing 1 changed file with 35 additions and 38 deletions.
73 changes: 35 additions & 38 deletions src/core/Akka.Streams/Implementation/IO/FileSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
using Akka.IO;
using Akka.Streams.Actors;
using Akka.Streams.IO;
using Akka.Util;

namespace Akka.Streams.Implementation.IO
{
Expand All @@ -35,20 +34,20 @@ internal class FileSubscriber : ActorSubscriber
/// <exception cref="ArgumentException">TBD</exception>
/// <returns>TBD</returns>
public static Props Props(
FileInfo f,
TaskCompletionSource<IOResult> completionPromise,
int bufferSize,
long startPosition,
FileInfo f,
TaskCompletionSource<IOResult> completionPromise,
int bufferSize,
long startPosition,
FileMode fileMode,
bool autoFlush = false,
object flushCommand = null)
{
if(bufferSize <= 0)
if (bufferSize <= 0)
throw new ArgumentException($"bufferSize must be > 0 (was {bufferSize})", nameof(bufferSize));
if(startPosition < 0)
if (startPosition < 0)
throw new ArgumentException($"startPosition must be >= 0 (was {startPosition})", nameof(startPosition));

return Actor.Props.Create(()=> new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand))
return Actor.Props.Create(() => new FileSubscriber(f, completionPromise, bufferSize, startPosition, fileMode, autoFlush, flushCommand))
.WithDeploy(Deploy.Local);
}

Expand All @@ -74,12 +73,12 @@ public static Props Props(
/// <param name="autoFlush"></param>
/// <param name="flushCommand"></param>
public FileSubscriber(
FileInfo f,
TaskCompletionSource<IOResult> completionPromise,
int bufferSize,
long startPosition,
FileInfo f,
TaskCompletionSource<IOResult> completionPromise,
int bufferSize,
long startPosition,
FileMode fileMode,
bool autoFlush,
bool autoFlush,
object flushCommand)
{
_f = f;
Expand Down Expand Up @@ -111,7 +110,7 @@ protected override void PreStart()
}
catch (Exception ex)
{
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
CloseAndComplete(IOResult.Failed(_bytesWritten, ex));
Cancel();
}
}
Expand All @@ -128,32 +127,23 @@ protected override bool Receive(object message)
case OnNext next:
try
{
var byteString = (ByteString) next.Element;
var byteString = (ByteString)next.Element;
var bytes = byteString.ToArray();
try
{
_chan.Write(bytes, 0, bytes.Length);
_bytesWritten += bytes.Length;
if (_autoFlush)
_chan.Flush(true);
}
catch (Exception ex)
{
_log.Error(ex, $"Tearing down FileSink({_f.FullName}) due to write error.");
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
Context.Stop(Self);
}
_chan.Write(bytes, 0, bytes.Length);
_bytesWritten += bytes.Length;
if (_autoFlush)
_chan.Flush(true);
}
catch (Exception ex)
{
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
CloseAndComplete(IOResult.Failed(_bytesWritten, ex));
Cancel();
}
return true;

case OnError error:
_log.Error(error.Cause, $"Tearing down FileSink({_f.FullName}) due to upstream error");
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, error.Cause));
_log.Error(error.Cause, "Tearing down FileSink({0}) due to upstream error", _f.FullName);
CloseAndComplete(IOResult.Failed(_bytesWritten, error.Cause));
Context.Stop(Self);
return true;

Expand All @@ -164,8 +154,8 @@ protected override bool Receive(object message)
}
catch (Exception ex)
{
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
}
CloseAndComplete(IOResult.Failed(_bytesWritten, ex));
}
Context.Stop(Self);
return true;

Expand All @@ -176,8 +166,8 @@ protected override bool Receive(object message)
}
catch (Exception ex)
{
_log.Error(ex, $"Tearing down FileSink({_f.FullName}). File flush failed.");
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
_log.Error(ex, "Tearing down FileSink({0}). File flush failed.", _f.FullName);
CloseAndComplete(IOResult.Failed(_bytesWritten, ex));
Context.Stop(Self);
}
return true;
Expand All @@ -190,18 +180,25 @@ protected override bool Receive(object message)
/// TBD
/// </summary>
protected override void PostStop()
{
CloseAndComplete(IOResult.Success(_bytesWritten));
base.PostStop();
}

private void CloseAndComplete(IOResult result)
{
try
{
// close the channel/file before completing the promise, allowing the
// file to be deleted, which would not work (on some systems) if the
// file is still open for writing
_chan?.Dispose();
_completionPromise.TrySetResult(result);
}
catch (Exception ex)
{
_completionPromise.TrySetResult(IOResult.Failed(_bytesWritten, ex));
}

_completionPromise.TrySetResult(IOResult.Success(_bytesWritten));
base.PostStop();
}
}
}

0 comments on commit 61d08b7

Please sign in to comment.