Adjustments to cancellation on StreamPipeReader #39228
- Allow CancelPendingRead to yield the ReadAsync without cancelling the underlying Stream.ReadAsync.
}

var reg = new CancellationTokenRegistration();
return new ValueTask<ReadResult>(this, 0);
Might as well add a short field and count up prior to this call, e.g.
_token++;
return new ValueTask<ReadResult>(this, _token);
Then check whether the token passed to GetResult(short token) is the same, and throw if not?
We don’t check the token today in the default pipe, so I’m not going to add it here unless we do it in both. I’d store the token as part of the PipeAwaitable state.
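For reference, the token check suggested above could look roughly like the sketch below. This is not what the PR does (the reply above declines to add it); `VersionedSource` and `Issue` are hypothetical names, and the source completes synchronously to keep the example small.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// Hypothetical stand-in for the reader; it only demonstrates the token check.
public sealed class VersionedSource : IValueTaskSource<int>
{
    private short _token;   // incremented once per issued ValueTask
    private int _result;

    public ValueTask<int> Issue(int result)
    {
        _result = result;
        _token++;
        return new ValueTask<int>(this, _token);
    }

    public int GetResult(short token)
    {
        // A ValueTask handed out by an earlier Issue call carries an older token.
        if (token != _token)
        {
            throw new InvalidOperationException("Stale ValueTask: token does not match.");
        }
        return _result;
    }

    // The sketch completes synchronously, so the status is always Succeeded.
    public ValueTaskSourceStatus GetStatus(short token) => ValueTaskSourceStatus.Succeeded;

    public void OnCompleted(Action<object?> continuation, object? state, short token,
        ValueTaskSourceOnCompletedFlags flags) => continuation(state);
}

public static class TokenDemo
{
    public static void Main()
    {
        var source = new VersionedSource();
        ValueTask<int> first = source.Issue(1);
        ValueTask<int> second = source.Issue(2);

        Console.WriteLine(second.Result); // the current token is accepted

        try
        {
            _ = first.Result; // the stale token is rejected
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

`ManualResetValueTaskSourceCore<T>` in the BCL performs the same kind of version check, so a real implementation would more likely build on that than hand-roll the counter.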
@@ -172,6 +173,15 @@ public override void Complete(Exception exception = null)

_isReaderCompleted = true;

// Make an attempt to cancel any call to Stream.ReadAsync
_completeCts.Cancel();
If we're going to allow PipeReader.Complete() to be called during an ongoing call to PipeReader.ReadAsync(), we should ensure that the ongoing ReadAsync calls complete prior to disposing the inner stream and returning memory pool blocks.
This isn’t trying to guard against that, it’s trying to guard against the fact that we have a dangling ReadAsync to the Stream going on, not misuse of the existing API
> it’s trying to guard against the fact that we have a dangling ReadAsync to the Stream going on
I know. And that dangling Stream.ReadAsync() call must have been made by a dangling PipeReader.ReadAsync() call. If we don't await the call Stream.ReadAsync() here in Complete before disposing the inner Stream, it's just as much of a misuse of the API as not cancelling the read at all and trying to dispose the inner Stream.
No I’m not going to guard against that.
This guard happens when the API is being used properly. The other is just buggy code. While it’s easy to misuse, I’m not adding guards for explicit misuse in this PR.
Never mind. You're thinking of the CancelPendingRead case.
In that case, I'm even more concerned about properly awaiting the completion of ReadAsync(), because as you say there's nothing the caller can do.
Yea but I’m not willing to block so..... 😬. Either we make this change or we don’t. Also, cancellation is best effort in general, so there are no guarantees this even works without this change (but at least you’re stuck in a ReadAsync call). The fact that we don’t always dispose the stream is also a problem.
We probably should block. It should be rare for there to be any ongoing reads at this point, and in practice, Cancel should cause the read to finish very quickly anyway.
I understand that returning the blocks in the background after Complete exits is not an option. CompleteAsync cannot come soon enough.
I’d rather not introduce sync over async.
I’ll open another issue on adding CompleteAsync to both the reader and writer
@@ -91,7 +81,7 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami

private void AdvanceTo(BufferSegment consumedSegment, int consumedIndex, BufferSegment examinedSegment, int examinedIndex)
{
if (consumedSegment == null || examinedSegment == null)
if (consumedSegment == null || examinedSegment == null || !_streamReadTask.IsCompleted)
How is the reader getting a non-null consumedSegment/examinedSegment for an unfinished/canceled read anyway? Can we at least do some validation to verify the caller isn't trying to actually advance the cursor in this case?
Because we return the existing buffer for cancelled reads. You could have buffered data on a previous read then cancelled the next read.
Exactly, and you're just ignoring that without advancing the read head and returning. You should add a test for this.
For what exactly?
Calling AdvanceTo with a real cursor during an ongoing call to InnerStream.ReadAsync. It's super broken right now. If you don't want to provide solid support for it, at least throw in that case.
Just so I’m clear, the scenario is: I have data buffered, I cancel the read, and I call AdvanceTo with a real cursor? Yes, that seems like something that should be handled. I just need to make sure we don’t clear the memory in the pending read.
Yep. That's the scenario I was thinking of.
Sounds good
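The scenario agreed on above (data already buffered, the next read cancelled, then AdvanceTo with a real cursor) can be reproduced with a small sketch. As an assumption, a default `Pipe` stands in for `StreamPipeReader` here, since the point is only the caller-visible contract; `CancelWithBufferedData` and `RunAsync` are hypothetical names.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class CancelWithBufferedData
{
    public static async Task<(bool canceled, long length)> RunAsync()
    {
        // Assumption: a default Pipe stands in for StreamPipeReader.
        var pipe = new Pipe();
        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });

        // First read: examine everything, consume nothing, so 3 bytes stay buffered.
        ReadResult first = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

        // The second read has nothing new to return, so it stays pending until cancelled.
        ValueTask<ReadResult> pending = pipe.Reader.ReadAsync();
        pipe.Reader.CancelPendingRead();
        ReadResult cancelled = await pending;

        // The cancelled result still exposes the buffered bytes...
        bool isCanceled = cancelled.IsCanceled;
        long length = cancelled.Buffer.Length;

        // ...and the caller advances with a real cursor.
        pipe.Reader.AdvanceTo(cancelled.Buffer.End);

        pipe.Reader.Complete();
        pipe.Writer.Complete();
        return (isCanceled, length);
    }

    public static async Task Main()
    {
        var (canceled, length) = await RunAsync();
        Console.WriteLine($"IsCanceled={canceled}, buffered bytes={length}");
    }
}
```

A test along these lines against `PipeReader.Create(stream)` with a stream whose read never completes would cover the case the reviewers are discussing.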
{
bool isCancellationRequested = source.IsCancellationRequested;
if (isCancellationRequested || _bufferedBytes > 0 && (!_examinedEverything || _isStreamCompleted))
if (_awaitable.IsCompleted || (_bufferedBytes > 0 && (_awaitable.IsCompleted || _isStreamCompleted)))
You know _awaitable is not completed in the second half of this condition:
if (_awaitable.IsCompleted || (_bufferedBytes > 0 && (_awaitable.IsCompleted || _isStreamCompleted)))
if (_awaitable.IsCompleted || (_bufferedBytes > 0 && _isStreamCompleted))
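The suggested simplification relies on the identity A || (B && (A || C)) == A || (B && C). A quick exhaustive check, with the three fields modelled as plain booleans (the method and parameter names are hypothetical), might look like this:

```csharp
using System;

public static class ConditionCheck
{
    // Condition as written in the diff.
    public static bool Original(bool awaitableCompleted, bool hasBufferedBytes, bool streamCompleted)
        => awaitableCompleted || (hasBufferedBytes && (awaitableCompleted || streamCompleted));

    // Condition as suggested by the reviewer.
    public static bool Simplified(bool awaitableCompleted, bool hasBufferedBytes, bool streamCompleted)
        => awaitableCompleted || (hasBufferedBytes && streamCompleted);

    // Compares both forms over all eight input combinations.
    public static bool AreEquivalent()
    {
        bool[] values = { false, true };
        foreach (bool a in values)
        foreach (bool b in values)
        foreach (bool c in values)
        {
            if (Original(a, b, c) != Simplified(a, b, c))
            {
                return false;
            }
        }
        return true;
    }

    public static void Main()
    {
        Console.WriteLine(AreEquivalent()); // True
    }
}
```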
ReadOnlySequence<byte> buffer = _readHead == null ? default : GetCurrentReadOnlySequence();

result = new ReadResult(buffer, isCancellationRequested, _isStreamCompleted);
result = GetReadResult();
So how is StreamPipeReader.TryRead() supposed to work? If no one ever calls StreamPipeReader.ReadAsync() it just forever no-ops and returns false, right? That doesn't seem good.
And if you have to call StreamPipeReader.ReadAsync() anyway, why not just await the result? If you try not to await the result of StreamPipeReader.ReadAsync() and then call StreamPipeReader.TryRead() until it returns true, not only is that inefficient, it's likely to be racy, since StreamPipeReader doesn't lock when reading or updating fields like _readHead/Tail.
TryRead is for when you may not have consumed all of the last read?
e.g. You consume the first of 16 requests, or a 4 kB file read up to the first delimiter, line terminator, etc.; move to the start of the loop, TryRead, else await ReadAsync.
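The consumption pattern described in this comment can be sketched as follows. It is only an illustration of the loop shape, assuming newline-delimited input and the public `PipeReader.Create(Stream)` factory; `TryReadLoop` and `CountLinesAsync` are hypothetical names.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class TryReadLoop
{
    public static async Task<int> CountLinesAsync(PipeReader reader)
    {
        int lines = 0;
        while (true)
        {
            // Prefer leftover buffered data; fall back to an asynchronous read.
            if (!reader.TryRead(out ReadResult result))
            {
                result = await reader.ReadAsync();
            }

            ReadOnlySequence<byte> buffer = result.Buffer;
            SequencePosition? newline = buffer.PositionOf((byte)'\n');
            if (newline != null)
            {
                lines++;
                // Consume one line only; the remainder stays buffered for TryRead.
                reader.AdvanceTo(buffer.GetPosition(1, newline.Value));
                continue;
            }

            // No complete line: consume nothing, mark everything as examined.
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                break;
            }
        }

        reader.Complete();
        return lines;
    }

    public static async Task Main()
    {
        var stream = new MemoryStream(Encoding.ASCII.GetBytes("a\nb\nc\n"));
        PipeReader reader = PipeReader.Create(stream);
        Console.WriteLine(await CountLinesAsync(reader));
    }
}
```

Note that the loop never depends on TryRead alone to make progress, which is the usage the rest of this thread argues about.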
I'm pretty sure TryRead is supposed to be a non-blocking, polling-based read API. You could use it to consume the rest of the last read, but that's a very limited subset of its intended functionality.
The behavior mirrors what exists in DefaultPipeReader. It will return false if everything was examined. This PR is not changing that behavior, so focus on the cancellation changes and file other issues for behavior changes outside the scope of this PR.
This absolutely does not mirror the behavior that exists in DefaultPipeReader.
With DefaultPipeReader, you could only call TryRead and still get data. That's not the case with StreamPipeReader today or after this PR.
Because I disagree with the premise; it's very much like IOCP. There's an API to wait for completion and there's an API to poll for completion. The polling API doesn't need to kick off the operation; some other operation can do that. I understand that when there's no proactive operation going on it's not as useful, but I don't agree that it invalidates the contract.
It's why your issue titled "Make StreamPipeReader.TryRead usable" is well intentioned but also a little trolly.
Right, and if data is available, it'll return true.
Except that's not the case. The client could be uploading all the request data it wants, and I'd never see it if I only called TryRead().
This is a regression caused by the introduction of StreamPipeReader, since before the StreamPipeReader there'd always be a reader calling InnerStream.ReadAsync() if the app was actually consuming the body whether or not it was through TryRead() or ReadAsync().
> Except that's not the case. The client could be uploading all the request data it wants, and I'd never see it if I only called TryRead().
That doesn't matter because there's no proactive read going on. So as far as the pipe is concerned, there is no data.
> This is a regression caused by the introduction of StreamPipeReader, since before the StreamPipeReader there'd always be a reader calling InnerStream.ReadAsync() if the app was actually consuming the body whether or not it was through TryRead() or ReadAsync().
It's not a regression, it's a different implementation.
> It's not a regression, it's a different implementation.
With the risk of a consumer that worked fine in one implementation looping infinitely with the other. And we're swapping out or planning to swap out one implementation with the other in a lot of places.
If we're just going to decide to implement two different behaviors, there should be a very good reason for it.
The crazy thing is you already did all the real work to match the existing DefaultPipeReader.TryRead() implementation by implementing CancelPendingRead() the way you did.
> That doesn't matter because there's no proactive read going on. So as far as the pipe is concerned, there is no data.
What about as far as the app is concerned? The Stream, the real source of truth this PipeReader is supposed to represent, might have data available. The StreamPipeReader simply hasn't checked.
This PR is still a draft, and I'm still contemplating changing Kestrel. There are always going to be differences in the implementations. It's just unfortunate that Kestrel depended on so many implementation details, but that's what happens when you only have a single implementation.
// PERF: store InternalTokenSource locally to avoid querying it twice (which acquires a lock)
CancellationTokenSource tokenSource = InternalTokenSource;
if (TryReadInternal(tokenSource, out ReadResult readResult))
if (TryReadInternal(out ReadResult readResult))
Just noticed this. It wasn't even only TryRead that was affected 😨
an easy fix would be to avoid operations while there’s a pending read (return false from TryRead)
That might be a reasonable short-term solution. It would be more correct to be able to access previously unconsumed bytes, if any, but I doubt being unable to do so would break many consumers. I guess we could file a follow-up issue.
_streamReadTask = ReadStreamAsync(cancellationToken);

// Completed the stream read inline because it was synchronous and there was no exception thrown
if (_streamReadTask.IsCompleted && _edi == null)
Is checking _edi here done to avoid calling GetResult?
if (cancellationToken.CanBeCanceled)
{
reg = cancellationToken.UnsafeRegister(state => ((StreamPipeReader)state).Cancel(), this);
cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _completeCts.Token);
Why do we need to create a LinkedTokenSource instead of doing the cancel pattern we had before?
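For context on what the linked source in the diff provides: a token created by `CancellationTokenSource.CreateLinkedTokenSource` fires when either input fires, without cancelling the other input. The sketch below uses hypothetical local names (`completeCts` plays the role of `_completeCts`) and does not reproduce the reader's actual wiring.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    public static (bool linkedCancelled, bool callerCancelled) Run()
    {
        // Hypothetical stand-ins: completeCts for _completeCts, callerCts for the caller's token.
        using var completeCts = new CancellationTokenSource();
        using var callerCts = new CancellationTokenSource();
        using CancellationTokenSource linked =
            CancellationTokenSource.CreateLinkedTokenSource(callerCts.Token, completeCts.Token);

        // Cancelling one input cancels the linked token but leaves the other input alone.
        completeCts.Cancel();
        return (linked.IsCancellationRequested, callerCts.IsCancellationRequested);
    }

    public static void Main()
    {
        var (linkedCancelled, callerCancelled) = Run();
        Console.WriteLine($"linked={linkedCancelled}, caller={callerCancelled}");
    }
}
```

The trade-off the question points at is allocation: a linked source is a new object per read, whereas a token registration can reuse existing state.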
This is part of the fix for Kestrel's handling of cancellation that was resulting in stress failures, right? What's the status of the fix?
This is related, but it shouldn't be hit today in Kestrel. We removed using the StreamPipeReader. The stress fix should come from dotnet/aspnetcore#12081
Will revisit after #39450 is merged
This is driven by this behavior in Kestrel (dotnet/aspnetcore#11804 (comment)) which uses the pipe as an event loop and CancelPendingRead as a way to wake up the loop when no data is being sent. It's basically a synchronization primitive.
PS: I need to add some more tests, but wanted to get some eyes on it to make sure I'm not missing anything obvious.
I also need to handle dispatch to the sync context etc.
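The wake-up pattern mentioned above, where CancelPendingRead is used as a signal and not as an error, can be sketched like this. As an assumption, a default `Pipe` is used in place of Kestrel's transport, and `WakeUpLoop`, `RunLoopAsync`, and `DemoAsync` are hypothetical names.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class WakeUpLoop
{
    public static async Task<(int wakeUps, long bytes)> RunLoopAsync(PipeReader reader)
    {
        int wakeUps = 0;
        long bytes = 0;
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            if (result.IsCanceled)
            {
                // Not an error: someone called CancelPendingRead to wake the loop.
                wakeUps++;
            }

            // Process whatever is buffered, then consume it.
            bytes += result.Buffer.Length;
            reader.AdvanceTo(result.Buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }

        reader.Complete();
        return (wakeUps, bytes);
    }

    public static async Task<(int wakeUps, long bytes)> DemoAsync()
    {
        var pipe = new Pipe();
        var loop = RunLoopAsync(pipe.Reader);

        pipe.Reader.CancelPendingRead();                       // wake the loop with no data
        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });  // then send some data
        pipe.Writer.Complete();

        return await loop;
    }

    public static async Task Main()
    {
        var (wakeUps, bytes) = await DemoAsync();
        Console.WriteLine($"wake-ups={wakeUps}, bytes={bytes}");
    }
}
```

The cancelled read surfaces as `ReadResult.IsCanceled` and not as an exception, which is what lets the loop treat it as a synchronization signal.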