Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/3 & QUIC: fix abort read on dispose and cancellation #55724

Merged
merged 12 commits into from
Jul 20, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ public async ValueTask DisposeAsync()
private void DisposeSyncHelper()
{
_connection.RemoveStream(_stream);
_connection = null!;
_stream = null!;

_sendBuffer.Dispose();
_recvBuffer.Dispose();
Expand Down Expand Up @@ -1107,7 +1105,10 @@ private void HandleReadResponseContentException(Exception ex, CancellationToken
{
switch (ex)
{
// Peer aborted the stream
case QuicStreamAbortedException _:
// User aborted the stream
case QuicOperationAbortedException _:
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
case QuicConnectionAbortedException _:
// Our connection was reset. Start aborting the connection.
Expand All @@ -1118,11 +1119,11 @@ private void HandleReadResponseContentException(Exception ex, CancellationToken
_connection.Abort(ex);
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
case OperationCanceledException oce when oce.CancellationToken == cancellationToken:
_stream.AbortWrite((long)Http3ErrorCode.RequestCancelled);
_stream.AbortRead((long)Http3ErrorCode.RequestCancelled);
ExceptionDispatchInfo.Throw(ex); // Rethrow.
return; // Never reached.
default:
_stream.AbortWrite((long)Http3ErrorCode.InternalError);
_stream.AbortRead((long)Http3ErrorCode.InternalError);
throw new IOException(SR.net_http_client_execution_error, new HttpRequestException(SR.net_http_client_execution_error, ex));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ await LoopbackServerFactory.CreateClientAndServerAsync(async uri =>
[InlineData("RandomCustomHeader", 12345)]
public async Task GetAsync_LargeHeader_Success(string headerName, int headerValueLength)
{
if (UseVersion == HttpVersion.Version30)
{
// [ActiveIssue("https://github.com/dotnet/runtime/issues/55508")]
return;
}

var rand = new Random(42);
string headerValue = string.Concat(Enumerable.Range(0, headerValueLength).Select(_ => (char)('A' + rand.Next(26))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,186 @@ public async Task Public_Interop_Upgrade_Success(string uri)
}
}

public enum CancellationType
{
Dispose,
CancellationToken
}

[ConditionalTheory(nameof(IsMsQuicSupported))]
[InlineData(CancellationType.Dispose)]
[InlineData(CancellationType.CancellationToken)]
public async Task ResponseCancellation_ServerReceivesCancellation(CancellationType type)
{
if (UseQuicImplementationProvider != QuicImplementationProviders.MsQuic)
{
return;
}

using Http3LoopbackServer server = CreateHttp3LoopbackServer();

using var clientDone = new SemaphoreSlim(0);
using var serverDone = new SemaphoreSlim(0);

Task serverTask = Task.Run(async () =>
{
using Http3LoopbackConnection connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
using Http3LoopbackStream stream = await connection.AcceptRequestStreamAsync();

HttpRequestData request = await stream.ReadRequestDataAsync().ConfigureAwait(false);

int contentLength = 2*1024*1024;
var headers = new List<HttpHeaderData>();
headers.Append(new HttpHeaderData("Content-Length", contentLength.ToString(CultureInfo.InvariantCulture)));

await stream.SendResponseHeadersAsync(HttpStatusCode.OK, headers).ConfigureAwait(false);
await stream.SendDataFrameAsync(new byte[1024]).ConfigureAwait(false);

await clientDone.WaitAsync();

// It is possible that PEER_RECEIVE_ABORTED event will arrive with a significant delay after peer calls AbortReceive
// In that case even with synchronization via semaphores, first writes after peer aborting may "succeed" (get SEND_COMPLETE event)
// We are asserting that PEER_RECEIVE_ABORTED would still arrive eventually

var ex = await Assert.ThrowsAsync<QuicStreamAbortedException>(() => SendDataForever(stream).WaitAsync(TimeSpan.FromSeconds(10)));
Assert.Equal((type == CancellationType.CancellationToken ? 268 : 0xffffffff), ex.ErrorCode);

serverDone.Release();
});
CarnaViire marked this conversation as resolved.
Show resolved Hide resolved

Task clientTask = Task.Run(async () =>
{
using HttpClient client = CreateHttpClient();

using HttpRequestMessage request = new()
{
Method = HttpMethod.Get,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact
};
HttpResponseMessage response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeSpan.FromSeconds(10));

Stream stream = await response.Content.ReadAsStreamAsync();

int bytesRead = await stream.ReadAsync(new byte[1024]);
Assert.Equal(1024, bytesRead);

var cts = new CancellationTokenSource(200);

if (type == CancellationType.Dispose)
{
cts.Token.Register(() => response.Dispose());
}
CancellationToken readCt = type == CancellationType.CancellationToken ? cts.Token : default;

Exception ex = await Assert.ThrowsAnyAsync<Exception>(() => stream.ReadAsync(new byte[1024], cancellationToken: readCt).AsTask());

if (type == CancellationType.CancellationToken)
{
Assert.IsType<OperationCanceledException>(ex);
}
else
{
var ioe = Assert.IsType<IOException>(ex);
var hre = Assert.IsType<HttpRequestException>(ioe.InnerException);
Assert.IsType<QuicOperationAbortedException>(hre.InnerException);
}

clientDone.Release();
await serverDone.WaitAsync();
});

await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

[Fact]
public async Task ResponseCancellation_BothCancellationTokenAndDispose_Success()
{
if (UseQuicImplementationProvider != QuicImplementationProviders.MsQuic)
{
return;
}

using Http3LoopbackServer server = CreateHttp3LoopbackServer();

using var clientDone = new SemaphoreSlim(0);
using var serverDone = new SemaphoreSlim(0);

Task serverTask = Task.Run(async () =>
{
using Http3LoopbackConnection connection = (Http3LoopbackConnection)await server.EstablishGenericConnectionAsync();
using Http3LoopbackStream stream = await connection.AcceptRequestStreamAsync();

HttpRequestData request = await stream.ReadRequestDataAsync().ConfigureAwait(false);

int contentLength = 2*1024*1024;
var headers = new List<HttpHeaderData>();
headers.Append(new HttpHeaderData("Content-Length", contentLength.ToString(CultureInfo.InvariantCulture)));

await stream.SendResponseHeadersAsync(HttpStatusCode.OK, headers).ConfigureAwait(false);
await stream.SendDataFrameAsync(new byte[1024]).ConfigureAwait(false);

await clientDone.WaitAsync();

// It is possible that PEER_RECEIVE_ABORTED event will arrive with a significant delay after peer calls AbortReceive
// In that case even with synchronization via semaphores, first writes after peer aborting may "succeed" (get SEND_COMPLETE event)
// We are asserting that PEER_RECEIVE_ABORTED would still arrive eventually

var ex = await Assert.ThrowsAsync<QuicStreamAbortedException>(() => SendDataForever(stream).WaitAsync(TimeSpan.FromSeconds(10)));
// exact error code depends on who won the race
Assert.True(ex.ErrorCode == 268 /* cancellation */ || ex.ErrorCode == 0xffffffff /* disposal */);
CarnaViire marked this conversation as resolved.
Show resolved Hide resolved

serverDone.Release();
});
CarnaViire marked this conversation as resolved.
Show resolved Hide resolved

Task clientTask = Task.Run(async () =>
{
using HttpClient client = CreateHttpClient();

using HttpRequestMessage request = new()
{
Method = HttpMethod.Get,
RequestUri = server.Address,
Version = HttpVersion30,
VersionPolicy = HttpVersionPolicy.RequestVersionExact
};
HttpResponseMessage response = await client.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).WaitAsync(TimeSpan.FromSeconds(10));

Stream stream = await response.Content.ReadAsStreamAsync();

int bytesRead = await stream.ReadAsync(new byte[1024]);
Assert.Equal(1024, bytesRead);

var cts = new CancellationTokenSource(200);
cts.Token.Register(() => response.Dispose());

Exception ex = await Assert.ThrowsAnyAsync<Exception>(() => stream.ReadAsync(new byte[1024], cancellationToken: cts.Token).AsTask());

// exact exception depends on who won the race
if (ex is not OperationCanceledException and not ObjectDisposedException)
{
var ioe = Assert.IsType<IOException>(ex);
var hre = Assert.IsType<HttpRequestException>(ioe.InnerException);
Assert.IsType<QuicOperationAbortedException>(hre.InnerException);
}

clientDone.Release();
await serverDone.WaitAsync();
});

await new[] { clientTask, serverTask }.WhenAllOrAnyFailed(20_000);
}

private static async Task SendDataForever(Http3LoopbackStream stream)
{
var buf = new byte[100];
while (true)
{
await stream.SendDataFrameAsync(buf);
}
}

/// <summary>
/// These are public interop test servers for various QUIC and HTTP/3 implementations,
/// taken from https://github.com/quicwg/base-drafts/wiki/Implementations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ private void Dispose(bool disposing)

bool callShutdown = false;
bool abortRead = false;
bool completeRead = false;
bool releaseHandles = false;
lock (_state)
{
Expand All @@ -719,9 +720,13 @@ private void Dispose(bool disposing)
callShutdown = true;
}

if (_state.ReadState < ReadState.ReadsCompleted)
// We can enter Aborted state from both AbortRead call (aborts on the wire) and a Cancellation callback (only changes state)
// We need to ensure read is aborted on the wire here. We let msquic handle a second call to abort as a no-op
if (_state.ReadState < ReadState.ReadsCompleted || _state.ReadState == ReadState.Aborted)
{
abortRead = true;
completeRead = _state.ReadState == ReadState.PendingRead;
_state.Stream = null;
_state.ReadState = ReadState.Aborted;
}

Expand Down Expand Up @@ -755,6 +760,12 @@ private void Dispose(bool disposing)
} catch (ObjectDisposedException) { };
}

if (completeRead)
{
_state.ReceiveResettableCompletionSource.CompleteException(
ExceptionDispatchInfo.SetCurrentStackTrace(new QuicOperationAbortedException("Read was canceled")));
}

if (releaseHandles)
{
_state.Cleanup();
Expand Down Expand Up @@ -932,7 +943,7 @@ private static uint HandleEventPeerRecvAborted(State state, ref StreamEvent evt)
shouldComplete = true;
}
state.SendState = SendState.Aborted;
state.SendErrorCode = (long)evt.Data.PeerSendAborted.ErrorCode;
state.SendErrorCode = (long)evt.Data.PeerReceiveAborted.ErrorCode;
}

if (shouldComplete)
Expand Down