Skip to content

Commit

Permalink
Harden ChildProcessReader against IOException and fix handling of can…
Browse files Browse the repository at this point in the history
…cellation.

Closes #138.
  • Loading branch information
alexrp committed Jan 8, 2024
1 parent 9b2f156 commit 9897234
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 28 deletions.
15 changes: 9 additions & 6 deletions src/core/Processes/ChildProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,20 @@ internal ChildProcess(ChildProcessBuilder builder)
_in = new(_process.StandardInput);

var tasks = new List<Task>(2);
var cancellationToken = builder.CancellationToken;

if (redirectOut)
tasks.Add((_out = new(_process.StandardOutput, builder.StandardOutBufferSize)).Completion);
tasks.Add(
(_out = new(_process.StandardOutput, builder.StandardOutBufferSize, cancellationToken)).Completion);

if (redirectError)
tasks.Add((_error = new(_process.StandardError, builder.StandardErrorBufferSize)).Completion);
tasks.Add(
(_error = new(_process.StandardError, builder.StandardErrorBufferSize, cancellationToken)).Completion);

// We register the cancellation callback here, after it has started, so that we do not potentially kill the
// process prior to or during startup.
ctr = builder.CancellationToken.UnsafeRegister(
static (state, token) => Unsafe.As<ChildProcess>(state!)._completion.TrySetCanceled(token), this);
// We register the cancellation callback here, after the process has started, so that we do not potentially kill
// the process prior to or during startup.
ctr = cancellationToken.UnsafeRegister(
static (@this, token) => Unsafe.As<ChildProcess>(@this!)._completion.TrySetCanceled(token), this);

Completion = Task.Run(async () =>
{
Expand Down
57 changes: 35 additions & 22 deletions src/core/Processes/ChildProcessReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,50 @@ public sealed class ChildProcessReader

private readonly Pipe _pipe;

internal ChildProcessReader(StreamReader reader, int bufferSize)
internal ChildProcessReader(StreamReader reader, int bufferSize, CancellationToken cancellationToken)
{
_pipe = new(new(pauseWriterThreshold: bufferSize, useSynchronizationContext: false));
Stream = new SynchronizedStream(_pipe.Reader.AsStream());
Encoding = reader.CurrentEncoding;
TextReader = new SynchronizedTextReader(new StreamReader(Stream, Encoding, false, ReadBufferSize));
TextReader = new SynchronizedTextReader(
new StreamReader(Stream, Encoding, detectEncodingFromByteOrderMarks: false, ReadBufferSize));

var readStream = reader.BaseStream;
var writeStream = _pipe.Writer.AsStream();

Completion = Task.Run(async () =>
{
var array = ArrayPool<byte>.Shared.Rent(ReadBufferSize);
try
Completion = Task.Run(
async () =>
{
int read;
var array = ArrayPool<byte>.Shared.Rent(ReadBufferSize);
// TODO: Review exceptions that can be thrown here.
while ((read = await readStream.ReadAsync(array).ConfigureAwait(false)) != 0)
await writeStream.WriteAsync(array.AsMemory(..read)).ConfigureAwait(false);
}
finally
{
ArrayPool<byte>.Shared.Return(array);
// Users of the Stream and TextReader properties might block forever if we do not signal completion on
// the write end of the pipe. We do not signal completion on the read end of the pipe since we want
// users to be able to read all buffered data after the process exits.
await _pipe.Writer.CompleteAsync().ConfigureAwait(false);
}
});
try
{
int read;
while ((read = await readStream.ReadAsync(array, cancellationToken).ConfigureAwait(false)) != 0)
await writeStream.WriteAsync(array.AsMemory(..read), cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// The user requested cancellation of the process. We need to pass a cancellation token and handle
// this case explicitly because, if the user is not actually reading the output we are buffering
// here, the WriteAsync() call above may end up blocking forever, meaning we would never loop around
// to the next ReadAsync() call that is expected to fail.
}
catch (IOException)
{
// The child process either exited or closed the pipe. Either way, treat it as EOF.
}
finally
{
ArrayPool<byte>.Shared.Return(array);
// Users of the Stream and TextReader properties might block forever if we do not signal completion
// on the write end of the pipe. We do not signal completion on the read end of the pipe since we
// want users to be able to read all buffered data after the process exits.
await _pipe.Writer.CompleteAsync().ConfigureAwait(false);
}
},
CancellationToken.None);
}
}

0 comments on commit 9897234

Please sign in to comment.