Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Commit

Permalink
Amortize StreamPipeReader ReadAsync calls
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams committed Jul 15, 2019
1 parent 929bd81 commit bef7c43
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 30 deletions.
8 changes: 7 additions & 1 deletion src/System.IO.Pipelines/ref/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Configurations>netstandard-Debug;netstandard-Release</Configurations>
<!-- We only plan to use this ref in netcoreapp. For all other netstandard compatible frameworks we should use the lib
asset instead. -->
<PackageTargetFramework Condition="'$(TargetGroup)' == 'netstandard'">netcoreapp2.0</PackageTargetFramework>
<PackageTargetFramework Condition="'$(TargetGroup)' == 'netstandard'">netcoreapp3.0</PackageTargetFramework>
</PropertyGroup>
<ItemGroup>
<Compile Include="System.IO.Pipelines.cs" />
Expand All @@ -16,4 +16,10 @@
<ItemGroup>
<ProjectReference Include="..\..\System.Buffers\ref\System.Buffers.csproj" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'=='netcoreapp'">
<Reference Include="Microsoft.Bcl.AsyncInterfaces" />
</ItemGroup>
<ItemGroup Condition="'$(TargetGroup)'=='netstandard'">
<ProjectReference Include="..\..\Microsoft.Bcl.AsyncInterfaces\ref\Microsoft.Bcl.AsyncInterfaces.csproj" />
</ItemGroup>
</Project>
4 changes: 3 additions & 1 deletion src/System.IO.Pipelines/src/System.IO.Pipelines.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{1032D5F6-5AE7-4002-A0E4-FEBEADFEA977}</ProjectGuid>
<Configurations>netcoreapp-Debug;netcoreapp-Debug;netcoreapp-Release;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
<Configurations>netcoreapp-Debug;netcoreapp-Release;netstandard-Debug;netstandard-Release</Configurations>
<DefineConstants Condition="'$(TargetsNetFx)' == 'true'">$(DefineConstants);netstandard</DefineConstants>
</PropertyGroup>
<ItemGroup>
<Compile Include="$(CommonPath)\CoreLib\System\Threading\Tasks\TaskToApm.cs">
Expand Down Expand Up @@ -57,5 +58,6 @@
<Reference Include="System.Threading.Tasks.Extensions" />
<Reference Include="System.Threading.Tasks" />
<Reference Include="System.Threading.ThreadPool" />
<Reference Include="Microsoft.Bcl.AsyncInterfaces" />
</ItemGroup>
</Project>
215 changes: 187 additions & 28 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace System.IO.Pipelines
{
internal class StreamPipeReader : PipeReader
internal class StreamPipeReader : PipeReader, IValueTaskSource<ReadResult>
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
Expand All @@ -28,11 +30,19 @@ internal class StreamPipeReader : PipeReader
private BufferSegment _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private object _lock = new object();
private readonly object _lock = new object();

// Mutable struct! Don't make this readonly
private BufferSegmentStack _bufferSegmentPool;
private bool _leaveOpen;
private readonly bool _leaveOpen;

// State for async reads
private volatile bool _readInProgress;
private readonly Action _onReadCompleted;
private ManualResetValueTaskSourceCore<ReadResult> _readMrvts;
private ValueTaskAwaiter<int> _readAwaiter;
private CancellationToken _readCancellation;
private CancellationTokenRegistration _readRegistration;

/// <summary>
/// Creates a new StreamPipeReader.
Expand All @@ -53,6 +63,7 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
_leaveOpen = options.LeaveOpen;
_onReadCompleted = new Action(OnReadCompleted);
}

/// <summary>
Expand All @@ -72,11 +83,7 @@ private CancellationTokenSource InternalTokenSource
{
lock (_lock)
{
if (_internalTokenSource == null)
{
_internalTokenSource = new CancellationTokenSource();
}
return _internalTokenSource;
return (_internalTokenSource ??= new CancellationTokenSource());
}
}
}
Expand Down Expand Up @@ -193,39 +200,59 @@ public override void OnWriterCompleted(Action<Exception, object> callback, objec
}

/// <inheritdoc />
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
if (_readInProgress)
{
return readResult;
// Throw if there are overlapping reads; throwing unwrapped as it suggests last read was not awaited
// so we surface it directly rather than wrapped in a Task (as this one will likely also not be awaited).
ThrowConcurrentReadsNotSupported();
}
_readInProgress = true;

if (_isStreamCompleted)
bool isAsync = false;
try
{
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
}

var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}
ThrowIfCompleted();

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
{
return new ValueTask<ReadResult>(readResult);
}

if (_isStreamCompleted)
{
return new ValueTask<ReadResult>(new ReadResult(buffer: default, isCanceled: false, isCompleted: true));
}

var reg = new CancellationTokenRegistration();
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
}

using (reg)
{
var isCanceled = false;
try
{
AllocateReadTail();

Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);

int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
ValueTask<int> resultTask = InnerStream.ReadAsync(buffer, tokenSource.Token);
int length;
if (resultTask.IsCompletedSuccessfully)
{
length = resultTask.Result;
}
else
{
isAsync = true;
// Need to go async
return CompleteReadAsync(resultTask, cancellationToken, reg);
}

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

Expand All @@ -252,8 +279,27 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
}

}
finally
{
if (!isAsync)
{
reg.Dispose();
}
}

return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
return new ValueTask<ReadResult>(new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted));
}
catch (Exception ex)
{
return new ValueTask<ReadResult>(Task.FromException<ReadResult>(ex));
}
finally
{
if (!isAsync)
{
Debug.Assert(_readInProgress);
_readInProgress = false;
}
}
}

Expand All @@ -275,6 +321,11 @@ private void ThrowIfCompleted()

public override bool TryRead(out ReadResult result)
{
if (_readInProgress)
{
ThrowConcurrentReadsNotSupported();
}

ThrowIfCompleted();

return TryReadInternal(InternalTokenSource, out result);
Expand Down Expand Up @@ -362,5 +413,113 @@ private void Cancel()
{
InternalTokenSource.Cancel();
}

static void ThrowConcurrentReadsNotSupported()
{
throw new InvalidOperationException($"Concurrent reads are not supported; await the {nameof(ValueTask<ReadResult>)} before starting next read.");
}

private ValueTask<ReadResult> CompleteReadAsync(ValueTask<int> task, CancellationToken cancellationToken, CancellationTokenRegistration reg)
{
Debug.Assert(_readInProgress, "Read not in progress");

_readCancellation = cancellationToken;
_readRegistration = reg;

_readAwaiter = task.GetAwaiter();

return new ValueTask<ReadResult>(this, _readMrvts.Version);
}

private void OnReadCompleted()
{
try
{
int length = _readAwaiter.GetResult();

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

_readTail.End += length;
_bufferedBytes += length;

if (length == 0)
{
_isStreamCompleted = true;
}

_readMrvts.SetResult(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: false, _isStreamCompleted));
}
catch (OperationCanceledException oce)
{
// Get the source before clearing (and replacing)
CancellationTokenSource tokenSource = InternalTokenSource;
ClearCancellationToken();
if (tokenSource.IsCancellationRequested && !_readCancellation.IsCancellationRequested)
{
// Catch cancellation and translate it into setting isCanceled = true
_readMrvts.SetResult(new ReadResult(GetCurrentReadOnlySequence(), isCanceled: true, _isStreamCompleted));
}
else
{
_readMrvts.SetException(oce);
}
}
catch (Exception ex)
{
_readMrvts.SetException(ex);
}
finally
{
_readRegistration.Dispose();
_readRegistration = default;
}
}

ReadResult IValueTaskSource<ReadResult>.GetResult(short token)
{
ValidateReading();
ReadResult result = _readMrvts.GetResult(token);

_readCancellation = default;
_readAwaiter = default;
_readMrvts.Reset();

Debug.Assert(_readInProgress);
_readInProgress = false;

return result;
}

ValueTaskSourceStatus IValueTaskSource<ReadResult>.GetStatus(short token)
=> _readMrvts.GetStatus(token);

void IValueTaskSource<ReadResult>.OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
ValidateReading();
_readMrvts.OnCompleted(continuation, state, token, flags);

if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
{
_readAwaiter.OnCompleted(_onReadCompleted);
}
else
{
_readAwaiter.UnsafeOnCompleted(_onReadCompleted);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ValidateReading()
{
if (!_readInProgress)
{
ThrowReadNotInProgress();
}

static void ThrowReadNotInProgress()
{
throw new InvalidOperationException("Read not in progress");
}
}
}
}
21 changes: 21 additions & 0 deletions src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,27 @@ async Task DoAsyncWrites(PipeWriter writer, int[] bufferSizes)
pipe.Reader.Complete();
}

[Fact]
public void ConcurrentReadsThrow()
{
var pipe = new Pipe();
var options = new StreamPipeReaderOptions(bufferSize: 4096);
PipeReader reader = PipeReader.Create(pipe.Reader.AsStream(), options);

ValueTask<ReadResult> valueTask = reader.ReadAsync();

Assert.False(valueTask.IsCompleted);

Assert.Throws<InvalidOperationException>(() => reader.ReadAsync());
Assert.Throws<InvalidOperationException>(() => reader.TryRead(out _));

Assert.False(valueTask.IsCompleted);

reader.Complete();

pipe.Reader.Complete();
}

[Theory]
[MemberData(nameof(ReadSettings))]
public async Task ReadWithDifferentSettings(int bytesInBuffer, int bufferSize, int minimumReadSize, int[] readBufferSizes)
Expand Down

0 comments on commit bef7c43

Please sign in to comment.