This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

Adjustments to cancellation on StreamPipeReader #39228

Closed
wants to merge 2 commits into from


Member

@davidfowl davidfowl commented Jul 5, 2019

  • Allow CancelPendingRead to yield the ReadAsync without cancelling the underlying Stream.ReadAsync.
  • The biggest complication this introduces is calling Complete after CancelPendingRead when the underlying Stream.ReadAsync hasn't completed yet. We attempt to cancel the read using a token during the call to Complete in hopes that it'll end the call to ReadAsync.

This is driven by this behavior in Kestrel (dotnet/aspnetcore#11804 (comment)) which uses the pipe as an event loop and CancelPendingRead as a way to wake up the loop when no data is being sent. It's basically a synchronization primitive.

PS: I need to add some more tests, but wanted to get some eyes on it to make sure I'm not missing anything obvious.
I also need to handle dispatch to the sync context etc.

- Allow CancelPendingRead to yield the ReadAsync without cancelling the underlying Stream.ReadAsync.
}

var reg = new CancellationTokenRegistration();
return new ValueTask<ReadResult>(this, 0);
Member

Might as well add a short field and count up prior to this call e.g.

_token++;
return new ValueTask<ReadResult>(this, _token);

Then check if token passed to GetResult(short token) is the same and throw if not?
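For illustration only (this is not code from the PR): the version-check idea can be sketched with the BCL helper `ManualResetValueTaskSourceCore<T>`, which already bumps a version on `Reset` and rejects a stale token in `GetResult`. The suggestion above keeps its own `_token` field instead; the variable names below are hypothetical.

```csharp
using System;
using System.Threading.Tasks.Sources;

// ManualResetValueTaskSourceCore<T> tracks a version that callers hand back as the token.
var core = new ManualResetValueTaskSourceCore<int>();

short firstToken = core.Version;               // token issued with the first operation
core.SetResult(42);
int firstResult = core.GetResult(firstToken);  // token matches the current version

core.Reset();                                  // begin a new operation; the version changes
short secondToken = core.Version;

bool staleTokenRejected = false;
try
{
    core.GetResult(firstToken);                // stale token from the previous operation
}
catch (InvalidOperationException)
{
    staleTokenRejected = true;
}

Console.WriteLine($"result={firstResult}, stale token rejected={staleTokenRejected}");
```

The check catches a `ValueTask` that is awaited twice or kept after the source has been reused.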

Member Author

We don’t check the token today in the default pipe, so I’m not going to add it here unless we do it in both. I’d store the token as part of the PipeAwaitable state.

@@ -172,6 +173,15 @@ public override void Complete(Exception exception = null)

_isReaderCompleted = true;

// Make an attempt to cancel any call to Stream.ReadAsync
_completeCts.Cancel();
Member

@halter73 halter73 Jul 5, 2019

If we're going to allow PipeReader.Complete() to be called during an ongoing call to PipeReader.ReadAsync(), we should ensure that the ongoing ReadAsync calls complete prior to disposing the inner stream and returning memory pool blocks.

Member Author

This isn’t trying to guard against that, it’s trying to guard against the fact that we have a dangling ReadAsync to the Stream going on, not misuse of the existing API

Member

> it’s trying to guard against the fact that we have a dangling ReadAsync to the Stream going on

I know. And that dangling Stream.ReadAsync() call must have been made by a dangling PipeReader.ReadAsync() call. If we don't await the call to Stream.ReadAsync() here in Complete before disposing the inner Stream, it's just as much of a misuse of the API as not cancelling the read at all and trying to dispose the inner Stream.
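A minimal sketch of the ordering being asked for, under stated assumptions: `Task.Delay` with an infinite timeout stands in for a `Stream.ReadAsync` call that only ends when cancelled, and a `MemoryStream` stands in for the inner stream. It is not the StreamPipeReader implementation.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

using var completeCts = new CancellationTokenSource();
var innerStream = new MemoryStream();   // hypothetical stand-in for the wrapped Stream

// Stand-in for a Stream.ReadAsync call that is still in flight.
Task pendingRead = Task.Delay(Timeout.Infinite, completeCts.Token);

// Step 1: request cancellation of the in-flight read (best effort).
completeCts.Cancel();

// Step 2: wait until the read has actually finished.
bool readObservedCancellation = false;
try
{
    await pendingRead;
}
catch (OperationCanceledException)
{
    readObservedCancellation = true;
}

// Step 3: only now dispose the stream (and, in a real reader, return pooled buffers).
innerStream.Dispose();

Console.WriteLine($"read cancelled before dispose: {readObservedCancellation}");
```

Inside a synchronous `Complete`, the `await` in step 2 would have to become a blocking wait, which is the trade-off debated in the replies.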

Member Author

No I’m not going to guard against that.

Member Author

This guard happens when the API is being used properly. The other is just buggy code. While it’s easy to misuse I’m not in this PR adding guards for explicit misuse

Member

Never mind. You're thinking of the CancelPendingRead case.

In that case, I'm even more concerned about properly awaiting the completion of ReadAsync(), because as you say there's nothing the caller can do.

Member Author

Yea but I’m not willing to block so..... 😬. Either we make this change or we don’t. Also cancellation is a best effort in general so no guarantees this even works without this change (but at least you’re stuck in a ReadAsync call). The fact that we don’t always dispose the stream is also a problem.

Member

We probably should block. It should be rare for there to be any ongoing reads at this point, and in practice, Cancel should cause the read to finish very quickly anyway.

I understand that returning the blocks in the background after Complete exits is not an option. CompleteAsync cannot come soon enough.

Member Author

I'd rather not introduce sync over async.

Member Author

I’ll open another issue on adding CompleteAsync to both the reader and writer

@@ -91,7 +81,7 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
- if (consumedSegment == null || examinedSegment == null)
+ if (consumedSegment == null || examinedSegment == null || !_streamReadTask.IsCompleted)
Member

How is the reader getting a non-null consumedSegment/examinedSegment for an unfinished/canceled read anyway? Can we at least do some validation to verify the caller isn't trying to actually advance the cursor in this case?

Member Author

Because we return the existing buffer for cancelled reads. You could have buffered data on a previous read then cancelled the next read.

Member

Exactly, and you're just ignoring that without advancing the read head and returning. You should add a test for this.

Member Author

For what exactly?

Member

Calling AdvanceTo with a real cursor during an ongoing call to InnerStream.ReadAsync. It's super broken right now. If you don't want to provide solid support for it, at least throw in that case.

Member Author

Just so I’m clear, the scenario is I have data buffered, and I cancel the read and call Advance with a real cursor? Yes, that seems like something that should be handled. I just need to make sure we don’t clear the memory in the pending read.

Member

Yep. That's the scenario I was thinking of.

Member Author

Sounds good

{
bool isCancellationRequested = source.IsCancellationRequested;
- if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
+ if (_awaitable.IsCompleted || (_bufferedBytes > 0 && (_awaitable.IsCompleted || _isStreamCompleted)))
Member

You know _awaitable is not completed in the second half of this condition:

Suggested change
- if (_awaitable.IsCompleted || (_bufferedBytes > 0 && (_awaitable.IsCompleted || _isStreamCompleted)))
+ if (_awaitable.IsCompleted || (_bufferedBytes > 0 && _isStreamCompleted))
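As a quick sanity check on the suggested simplification, the two conditions can be compared by brute force over every combination of inputs. The three booleans below are hypothetical stand-ins for `_awaitable.IsCompleted`, `_bufferedBytes > 0`, and `_isStreamCompleted`.

```csharp
using System;

bool allEqual = true;

// Enumerate all 8 combinations of the three inputs.
foreach (bool awaitableCompleted in new[] { false, true })
foreach (bool hasBufferedBytes in new[] { false, true })
foreach (bool streamCompleted in new[] { false, true })
{
    // Condition as written in the PR.
    bool original = awaitableCompleted
        || (hasBufferedBytes && (awaitableCompleted || streamCompleted));

    // Condition from the suggested change.
    bool suggested = awaitableCompleted
        || (hasBufferedBytes && streamCompleted);

    if (original != suggested)
    {
        allEqual = false;
    }
}

Console.WriteLine(allEqual); // prints "True"
```

The inner `awaitableCompleted` term can only matter when the outer one is false, so dropping it changes nothing.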

ReadOnlySequence<byte> buffer = _readHead == null ? default : GetCurrentReadOnlySequence();

- result = new ReadResult(buffer, isCancellationRequested, _isStreamCompleted);
+ result = GetReadResult();
Member

So how is StreamPipeReader.TryRead() supposed to work? If no one ever calls StreamPipeReader.ReadAsync() it just forever no-ops and returns false, right? That doesn't seem good.

And if you have to call StreamPipeReader.ReadAsync() anyway, why not just await the result? If you try not to await the result of StreamPipeReader.ReadAsync() and then call StreamPipeReader.TryRead() until it returns true, not only is that inefficient, it's likely to be racy since StreamPipeReader doesn't lock when reading or updating fields like _readHead/Tail.

Member

@benaadams benaadams Jul 5, 2019

TryRead is when you may have not consumed all of the last read?

e.g. You consume the first of 16 requests, or 4kB file read up to first delimiter, line terminator etc; move to start of loop, TryRead, else await ReadAsync

Member

I'm pretty sure TryRead is supposed to be a non-blocking, polling-based read API. You could use it to consume the rest of the last read, but that's a very limited subset of its intended functionality.

Member Author

The behavior is mirroring what exists in DefaultPipeReader. It will return false if everything was examined. This PR is not changing that behavior, so focus on the cancellation changes and file other issues for behavior changes outside the scope of this PR.

Member

This absolutely does not mirror the behavior that exists in DefaultPipeReader.

With DefaultPipeReader, you could only call TryRead and still get data. That's not the case with StreamPipeReader today or after this PR.

Member Author

Because I disagree with the premise, it's very much like IOCP. There's an API to wait for completion and there's an API to poll for completion. The polling API doesn't need to kick off the operation, some other operation can do that, now I understand that when there's no proactive operation going on it's not as useful but I don't agree that it invalidates the contract.

It's why your issue titled "Make StreamPipeReader.TryRead usable" is well intentioned but also a little trolly.
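To make the analogy concrete, here is a rough sketch of "poll for completion" versus "wait for completion" using only a `TaskCompletionSource<int>`. The names are hypothetical and this is not how StreamPipeReader is implemented; the point is only that the polling call reports on an operation and never starts one.

```csharp
using System;
using System.Threading.Tasks;

// Represents an operation that some other code path has already started.
var pendingOperation = new TaskCompletionSource<int>();

// Polling API: returns a result only if the operation has already completed.
bool TryGetResult(out int bytesRead)
{
    if (pendingOperation.Task.IsCompletedSuccessfully)
    {
        bytesRead = pendingOperation.Task.Result;
        return true;
    }

    bytesRead = 0;
    return false;
}

bool beforeCompletion = TryGetResult(out _);              // nothing has completed yet
pendingOperation.SetResult(128);                          // the operation finishes elsewhere
bool afterCompletion = TryGetResult(out int polledBytes); // polling now sees the result

// Waiting API: awaits the same operation instead of polling it.
int awaitedBytes = await pendingOperation.Task;

Console.WriteLine($"{beforeCompletion} {afterCompletion} {polledBytes} {awaitedBytes}");
```

If nothing ever starts the operation, the polling call keeps returning false, which is the case the objection in this thread is about.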

Member

@halter73 halter73 Jul 5, 2019

> Right, and if data is available, it'll return true.

Except that's not the case. The client could be uploading all the request data it wants, and I'd never see it if I only called TryRead().

This is a regression caused by the introduction of StreamPipeReader, since before the StreamPipeReader there'd always be a reader calling InnerStream.ReadAsync() if the app was actually consuming the body whether or not it was through TryRead() or ReadAsync().

Member Author

> Except that's not the case. The client could be uploading all the request data it wants, and I'd never see it if I only called TryRead().

That doesn't matter because there's no proactive read going on. So as far as the pipe is concerned, there is no data.

> This is a regression caused by the introduction of StreamPipeReader, since before the StreamPipeReader there'd always be a reader calling InnerStream.ReadAsync() if the app was actually consuming the body whether or not it was through TryRead() or ReadAsync().

It's not a regression, it's a different implementation.

Member

@halter73 halter73 Jul 5, 2019

> It's not a regression, it's a different implementation.

With the risk of a consumer that worked fine in one implementation looping infinitely with the other. And we're swapping out or planning to swap out one implementation with the other in a lot of places.

If we're just going to decide to implement two different behaviors, there should be a very good reason for it.

The crazy thing is you already did all the real work to match the existing DefaultPipeReader.TryRead() implementation by implementing CancelPendingRead() the way you did.

> That doesn't matter because there's no proactive read going on. So as far as the pipe is concerned, there is no data.

What about as far as the app is concerned? The Stream, the real source of truth this PipeReader is supposed to represent, might have data available. The StreamPipeReader simply hasn't checked.

Member Author

This PR is still a draft, and I'm still contemplating changing Kestrel. There are always going to be differences in the implementations. It's just unfortunate that Kestrel depended on so many implementation details, but that's what happens when you only have a single implementation.

// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
- if (TryReadInternal(tokenSource, out ReadResult readResult))
+ if (TryReadInternal(out ReadResult readResult))
Member

@halter73 halter73 Jul 5, 2019

Just noticed this. It wasn't even only TryRead that was affected 😨

Member Author

An easy fix would be to avoid operations while there’s a pending read (return false from TryRead).

Member

@halter73 halter73 Jul 5, 2019

That might be a reasonable short-term solution. It would be more correct to be able to access previously unconsumed bytes, if any, but I doubt being unable to do so would break many consumers. I guess we could file a follow-up issue.

_streamReadTask = ReadStreamAsync(cancellationToken);

// Completed the stream read inline because it was synchronous and there was no exception thrown
if (_streamReadTask.IsCompleted && _edi == null)

Is checking _edi here done to avoid calling GetResult?

if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _completeCts.Token);

Why do we need to create a LinkedTokenSource instead of doing the cancel pattern we had before?
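For context on what a linked source provides, here is a small sketch with hypothetical names (`callerCts` for the token passed to ReadAsync, `completeCts` mirroring the `_completeCts` field in the diff). The linked token fires when either source is cancelled, so one token handed to `Stream.ReadAsync` can cover both cases. It does not answer whether that is preferable to the earlier registration-based pattern.

```csharp
using System;
using System.Threading;

using var callerCts = new CancellationTokenSource();    // caller's ReadAsync token
using var completeCts = new CancellationTokenSource();  // cancelled when the reader completes

// One token that observes cancellation from either source.
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
    callerCts.Token, completeCts.Token);

bool beforeCancel = linkedCts.Token.IsCancellationRequested;

// Cancelling only the "complete" source still cancels the linked token.
completeCts.Cancel();

bool afterCancel = linkedCts.Token.IsCancellationRequested;
bool callerUntouched = !callerCts.IsCancellationRequested;

Console.WriteLine($"{beforeCancel} {afterCancel} {callerUntouched}");
```

The cost is an extra `CancellationTokenSource` allocation for each read that takes this path, which is one reason to ask the question above.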

@stephentoub
Member

This is part of the fix for kestrel's handling of cancellation that was resulting in stress failures, right? What's the status of the fix?

@jkotalik

This is related, but it shouldn't be hit today in Kestrel. We removed using the StreamPipeReader. The stress fix should come from dotnet/aspnetcore#12081

@davidfowl
Member Author

Will revisit after #39450 is merged

@davidfowl davidfowl closed this Jul 22, 2019
@stephentoub stephentoub deleted the davidfowl/stream-pipereader-cancellation branch July 23, 2019 14:55
@karelz karelz added this to the 5.0 milestone Aug 3, 2019
6 participants