Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Adjustments to cancellation on StreamPipeReader #39228

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 127 additions & 70 deletions src/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

using System.Buffers;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

namespace System.IO.Pipelines
{
internal class StreamPipeReader : PipeReader
internal class StreamPipeReader : PipeReader, IValueTaskSource<ReadResult>
{
internal const int InitialSegmentPoolSize = 4; // 16K
internal const int MaxSegmentPoolSize = 256; // 1MB
Expand All @@ -18,16 +20,18 @@ internal class StreamPipeReader : PipeReader
private readonly int _minimumReadThreshold;
private readonly MemoryPool<byte> _pool;

private CancellationTokenSource _internalTokenSource;
private bool _isReaderCompleted;
private bool _isStreamCompleted;
private PipeAwaitable _awaitable;
private Task _streamReadTask = Task.CompletedTask;
private ExceptionDispatchInfo _edi;
private readonly CancellationTokenSource _completeCts = new CancellationTokenSource();

private BufferSegment _readHead;
private int _readIndex;

private BufferSegment _readTail;
private long _bufferedBytes;
private bool _examinedEverything;
private object _lock = new object();

// Mutable struct! Don't make this readonly
Expand All @@ -53,6 +57,7 @@ public StreamPipeReader(Stream readingStream, StreamPipeReaderOptions options)
_pool = options.Pool == MemoryPool<byte>.Shared ? null : options.Pool;
_bufferSize = _pool == null ? options.BufferSize : Math.Min(options.BufferSize, _pool.MaxBufferSize);
_leaveOpen = options.LeaveOpen;
_awaitable = new PipeAwaitable(completed: false, useSynchronizationContext: true);
}

/// <summary>
Expand All @@ -66,21 +71,6 @@ public override void AdvanceTo(SequencePosition consumed)
AdvanceTo(consumed, consumed);
}

private CancellationTokenSource InternalTokenSource
{
get
{
lock (_lock)
{
if (_internalTokenSource == null)
{
_internalTokenSource = new CancellationTokenSource();
}
return _internalTokenSource;
}
}
}

/// <inheritdoc />
public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
Expand All @@ -91,7 +81,7 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
if (consumedSegment == null || examinedSegment == null)
if (consumedSegment == null || examinedSegment == null || !_streamReadTask.IsCompleted)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the reader getting a non-null consumedSegment/examinedSegment for an unfinished/canceled read anyway? Can we at least do some validation to verify the caller isn't trying to actual advance the cursor in this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we return the existing buffer for cancelled reads. You could have buffered data on a previous read then cancelled the next read.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, and you're just ignoring that without advancing the read head and returning. You should add a test for this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For what exactly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling AdvanceTo with a real cursor during an ongoing call to InnerStream.ReadAsyc. It's super broken right now. If you don't want to provid solid support it, at least throw in that case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just so I’m clear, the scenario is I have data buffered, and I cancel the read and call Advance with a real cursor? Yes that seems like something that should be handled. I just need to make sure to we don’t clear the memory in the pending read

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. That's the scenario I was thinking of.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

{
return;
}
Expand All @@ -110,13 +100,13 @@ private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferS

Debug.Assert(_bufferedBytes >= 0);

_examinedEverything = false;
var examinedEverything = false;

if (examinedSegment == _readTail)
{
// If we examined everything, we force ReadAsync to actually read from the underlying stream
// instead of returning a ReadResult from TryRead.
_examinedEverything = examinedIndex == _readTail.End;
examinedEverything = examinedIndex == _readTail.End;
}

// Two cases here:
Expand Down Expand Up @@ -154,12 +144,23 @@ private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferS
ReturnSegmentUnsynchronized(returnStart);
returnStart = next;
}

if (examinedEverything)
{
_awaitable.SetUncompleted();
}
}

/// <inheritdoc />
public override void CancelPendingRead()
{
InternalTokenSource.Cancel();
CompletionData completionData;
lock (_lock)
{
_awaitable.Cancel(out completionData);
}

DispatchCompletion(completionData);
}

/// <inheritdoc />
Expand All @@ -172,6 +173,15 @@ public override void Complete(Exception exception = null)

_isReaderCompleted = true;

// Make an attempt to cancel any call to Stream.ReadAsync
_completeCts.Cancel();
Copy link
Member

@halter73 halter73 Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to allow PipeReader.Complete() to be called during an ongoing call to PipeReader.ReadAsync(), we should ensure that the ongoing ReadAsync calls complete prior to disposing the inner stream and returning memory pool blocks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn’t trying to guard against that, it’s trying to guard against the fact that we have a dangling ReadAsync to the Stream going on, not misuse of the existing API

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it’s trying to guard against the fact that we have a dangling ReadAsync to the Stream going on

I know. And that dangling Stream.ReadAsync() call must have been made by a dangling PipeReader.ReadAsync() call. If we don't await the call Stream.ReadAsync() here in Complete before disposing the inner Stream, it's just as much of a misuse of the API as not cancelling the read at all and trying to dispose the inner Stream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I’m not going to guard against that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guard happens when the API is being used properly. The other is just buggy code. While it’s easy to misuse I’m not in this PR adding guards for explicit misuse

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. You're thinking of the CancelPendingRead case.

In that case, I'm even more concerned about properly awaiting the completion of ReadAsyc(), because as you say there's nothing the caller can do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea but I’m not willing to block so..... 😬. Either we make this change or we don’t. Also cancellation is a best effort in general so no guarantees this even works without this change (but at least you’re stuck in a ReadAsyc call). The fact that we don’t always dispose the stream is also a problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably should block. It should be rare for there to be any ongoing reads at this point, and in practice, Cancel should cause the read to finish very quickly anyway.

I understand that returning the blocks in the background after Complete exits is not an option. CompleteAsync cannot come soon enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rather not introduce sync over async.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll open another issue on adding CompleteAsync to both the reader and writer


if (!_leaveOpen)
{
InnerStream.Dispose();
}

// Return the memory after potentially disposing the stream
BufferSegment segment = _readHead;
while (segment != null)
{
Expand All @@ -181,10 +191,7 @@ public override void Complete(Exception exception = null)
returnSegment.ResetMemory();
}

if (!_leaveOpen)
{
InnerStream.Dispose();
}
_completeCts.Dispose();
}

/// <inheritdoc />
Expand All @@ -193,39 +200,59 @@ public override void OnWriterCompleted(Action<Exception, object> callback, objec
}

/// <inheritdoc />
public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
// TODO ReadyAsync needs to throw if there are overlapping reads.
ThrowIfCompleted();

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
if (TryReadInternal(out ReadResult readResult))
Copy link
Member

@halter73 halter73 Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed this. It wasn't even only TryRead that was affected 😨

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an easy fix would be to avoid operations while there’s a pending read (return false from TryRead)

Copy link
Member

@halter73 halter73 Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might be a reasonable short-term solution. It would be more correct to able to access previously unconsumed bytes if any, but I doubt being unable to do so would break many consumers. I guess we could file a followup issue.

{
return readResult;
return new ValueTask<ReadResult>(readResult);
}

if (_isStreamCompleted)
{
return new ReadResult(buffer: default, isCanceled: false, isCompleted: true);
return new ValueTask<ReadResult>(new ReadResult(buffer: default, isCanceled: false, isCompleted: true));
}

if (_streamReadTask.IsCompleted)
{
_streamReadTask = ReadStreamAsync(cancellationToken);

// Completed the stream read inline because it was synchronous and there was no exception thrown
if (_streamReadTask.IsCompleted && _edi == null)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is checking _edi here done to avoid calling GetResult?

{
return new ValueTask<ReadResult>(GetReadResult());
}
}

var reg = new CancellationTokenRegistration();
return new ValueTask<ReadResult>(this, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well add a short field and count up prior to this call e.g.

_token++;
return new ValueTask<ReadResult>(this, _token);

Then check if token passed to GetResult(short token) is the same and throw if not?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don’t check the token today in other the default pipe, I’m not going to add it here unless we do it in both. I’d store then token as part of the PipeAwaitable state

}

private async Task ReadStreamAsync(CancellationToken cancellationToken)
{
CancellationTokenSource cts = null;
CancellationToken effectiveToken;

if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _completeCts.Token);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to create a LinkedTokenSource instead of doing the cancel pattern we had before?

effectiveToken = cts.Token;
}
else
{
effectiveToken = _completeCts.Token;
}

using (reg)
using (cts)
{
var isCanceled = false;
try
{
AllocateReadTail();

Memory<byte> buffer = _readTail.AvailableMemory.Slice(_readTail.End);

int length = await InnerStream.ReadAsync(buffer, tokenSource.Token).ConfigureAwait(false);
int length = await InnerStream.ReadAsync(buffer, effectiveToken).ConfigureAwait(false);

Debug.Assert(length + _readTail.End <= _readTail.AvailableMemory.Length);

Expand All @@ -237,32 +264,20 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancella
_isStreamCompleted = true;
}
}
catch (OperationCanceledException)
catch (Exception ex)
{
ClearCancellationToken();

if (tokenSource.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
// Catch cancellation and translate it into setting isCanceled = true
isCanceled = true;
}
else
{
throw;
}

_edi = ExceptionDispatchInfo.Capture(ex);
}

return new ReadResult(GetCurrentReadOnlySequence(), isCanceled, _isStreamCompleted);
}
}

private void ClearCancellationToken()
{
CompletionData completionData;

lock (_lock)
{
_internalTokenSource = null;
_awaitable.Complete(out completionData);
}

DispatchCompletion(completionData);
}

private void ThrowIfCompleted()
Expand All @@ -277,22 +292,14 @@ public override bool TryRead(out ReadResult result)
{
ThrowIfCompleted();

return TryReadInternal(InternalTokenSource, out result);
return TryReadInternal(out result);
}

private bool TryReadInternal(CancellationTokenSource source, out ReadResult result)
private bool TryReadInternal(out ReadResult result)
{
bool isCancellationRequested = source.IsCancellationRequested;
if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
if (_awaitable.IsCompleted || (_bufferedBytes > 0 && (_awaitable.IsCompleted || _isStreamCompleted)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know _awaitable is not completed in the second half of this condition:

Suggested change
if (_awaitable.IsCompleted || (_bufferedBytes > 0 && (_awaitable.IsCompleted || _isStreamCompleted)))
if (_awaitable.IsCompleted || (_bufferedBytes > 0 && _isStreamCompleted))

{
if (isCancellationRequested)
{
ClearCancellationToken();
}

ReadOnlySequence<byte> buffer = _readHead == null ? default : GetCurrentReadOnlySequence();

result = new ReadResult(buffer, isCancellationRequested, _isStreamCompleted);
result = GetReadResult();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So how is StreamPipeReader.TryRead() supposed to work? If no one ever calls StreamPipeReader.ReadAsync() it just forever no-ops and returns false, right? That doesn't seem good.

And if you have to call StreamPipeReader.ReadAsync() anyway, why not just await the result? If you try not to await the result StreamPipeReader.ReadAsync() and then call StreamPipeReader.TryRead() until it returns true, not only is that inefficient, it's likely to be racy since StreamPipeReader doesn't lock when reading or updating fields like _readHead/Tail.

Copy link
Member

@benaadams benaadams Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TryRead is when you may have not consumed all of the last read?

e.g. You consume the first of 16 requests, or 4kB file read up to first delimiter, line terminator etc; move to start of loop, TryRead, else await ReadAsync

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure TryRead is supposed to be a non-blocking, polling-based read API. You could use it to consume the rest of the last read, but that's a very limited subset of its intended functionality.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior is mirroring what exists in DefaultPipeReader. It will return false if the everything was examined. This PR is not changing that behavior so focus on the cancellation changes and file other issues for behavior changes outside the scope of this PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This absolutely does not mirror the behavior that exists in DefaultPipeReader.

With DefaultPipeReader, you could only call TryRead and still get data. That's not the case with StreamPipeReader today or after this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I disagree with the premise, it's very much like IOCP. There's an API to wait for completion and there's an API to poll for completion. The polling API doesn't need to kick off the operation, some other operation can do that, now I understand that when there's no proactive operation going on it's not as useful but I don't agree that it invalidates the contract.

It's why your issue titled "Make StreamPipeReader.TryRead usable" is well intentioned but also a little trolly.

Copy link
Member

@halter73 halter73 Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, and if data is available, it'll return true.

Except that's not the case. The client could be uploading all the request data it wants, and I'd never see it if I only called TryRead().

This is a regression caused by the introduction of StreamPipeReader, since before the StreamPipeReader there'd always be a reader calling InnerStream.ReadAsync() if the app was actually consuming the body whether or not it was through TryRead() or ReadAsync().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except that's not the case. The client could be uploading all the request data it wants, and I'd never see it if I only called TryRead().

That doesn't matter because there's no proactive read going on. So as far as the pipe is concerned, there is no data.

This is a regression caused by the introduction of StreamPipeReader, since before the StreamPipeReader there'd always be a reader calling InnerStream.ReadAsync() if the app was actually consuming the body whether or not it was through TryRead() or ReadAsync().

It's not a regression, it's a different implementation.

Copy link
Member

@halter73 halter73 Jul 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a regression, it's a different implementation.

With the risk of a consumer that worked fine in one implementation looping infinitely with the other. And we're swapping out or planning to swap out one implementation with the other in a lot of places.

If we're just going to decide to implement two different behaviors, there should be a very good reason for it.

The crazy thing is you already did all the real work to match the existing DefaultPipeReader.TryRead() implementation by implementing CancelPendingRead() the way you did.

That doesn't matter because there's no proactive read going on. So as far as the pipe is concerned, there is no data.

What about as far as the app is concerned? The Stream, the real source of truth this PipeReader is supposed to represent, might have data available. The StreamPipeReader simply hasn't checked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is still a draft, I'll still contemplating changing Kestrel. There are always going to be differences in the implementations. It's just unfortunate that Kestrel depended on so many implementation details but that's what happens when you only have a single implementation.

return true;
}

Expand Down Expand Up @@ -358,9 +365,59 @@ private void ReturnSegmentUnsynchronized(BufferSegment segment)
}
}

private void Cancel()
private ReadResult GetReadResult()
{
var isCancellationRequested = _awaitable.ObserveCancellation();

ReadOnlySequence<byte> buffer = _readHead == null ? default : GetCurrentReadOnlySequence();

return new ReadResult(buffer, isCancellationRequested, _isStreamCompleted);
}

public ReadResult GetResult(short token)
{
ExceptionDispatchInfo edi = _edi;
_edi = null;
edi?.Throw();

return GetReadResult();
}

public ValueTaskSourceStatus GetStatus(short token)
{
InternalTokenSource.Cancel();
if (_awaitable.IsCompleted)
{
if (_edi != null)
{
return ValueTaskSourceStatus.Faulted;
}

return ValueTaskSourceStatus.Succeeded;
}
return ValueTaskSourceStatus.Pending;
}

public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
{
CompletionData completionData;
bool doubleCompletion;

lock (_lock)
{
_awaitable.OnCompleted(continuation, state, flags, out completionData, out doubleCompletion);
}

DispatchCompletion(completionData);
}

private static void DispatchCompletion(in CompletionData completionData)
{
if (completionData.Completion is null)
{
return;
}

PipeScheduler.ThreadPool.UnsafeSchedule(completionData.Completion, completionData.CompletionState);
}
}
}
9 changes: 6 additions & 3 deletions src/System.IO.Pipelines/tests/StreamPipeReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,16 @@ public async Task ReadCanBeCanceledViaCancelPendingReadWhenReadIsAsync()
PipeReader reader = PipeReader.Create(stream);

ValueTask<ReadResult> task = reader.ReadAsync();

reader.CancelPendingRead();
ReadResult readResult = await task;
Assert.True(readResult.IsCanceled);
reader.AdvanceTo(readResult.Buffer.End);

stream.WaitForReadTask.TrySetResult(null);

ReadResult readResult = await task;
Assert.True(readResult.IsCanceled);
readResult = await reader.ReadAsync();
Assert.True(readResult.IsCompleted);

reader.Complete();
}

Expand Down