Skip to content

Commit

Permalink
Fix race condition when cancelling pending HTTP connection attempts (#…
Browse files Browse the repository at this point in the history
…110744)

* Fix race condition when cancelling pending HTTP connection attempts

* Improve comments and naming
  • Loading branch information
MihaZupan authored Dec 18, 2024
1 parent 43f7d32 commit 54527ea
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ private async Task InjectNewHttp11ConnectionAsync(RequestQueue<HttpConnection>.Q
HttpConnection? connection = null;
Exception? connectionException = null;

CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource();
waiter.ConnectionCancellationTokenSource = cts;
CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(waiter);
try
{
connection = await CreateHttp11ConnectionAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,7 @@ private async Task InjectNewHttp2ConnectionAsync(RequestQueue<Http2Connection?>.
Exception? connectionException = null;
HttpConnectionWaiter<Http2Connection?> waiter = queueItem.Waiter;

CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource();
waiter.ConnectionCancellationTokenSource = cts;
CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(waiter);
try
{
(Stream stream, TransportContext? transportContext, Activity? activity, IPEndPoint? remoteEndPoint) = await ConnectAsync(queueItem.Request, true, cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,52 +75,60 @@ internal sealed partial class HttpConnectionPool
// Loop in case we get a 421 and need to send the request to a different authority.
while (true)
{
if (!TryGetHttp3Authority(request, out HttpAuthority? authority, out Exception? reasonException))
HttpConnectionWaiter<Http3Connection?>? http3ConnectionWaiter = null;
try
{
if (reasonException is null)
if (!TryGetHttp3Authority(request, out HttpAuthority? authority, out Exception? reasonException))
{
return null;
if (reasonException is null)
{
return null;
}
ThrowGetVersionException(request, 3, reasonException);
}
ThrowGetVersionException(request, 3, reasonException);
}

long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || Settings._metrics!.RequestsQueueDuration.Enabled ? Stopwatch.GetTimestamp() : 0;
Activity? waitForConnectionActivity = ConnectionSetupDistributedTracing.StartWaitForConnectionActivity(authority);
long queueStartingTimestamp = HttpTelemetry.Log.IsEnabled() || Settings._metrics!.RequestsQueueDuration.Enabled ? Stopwatch.GetTimestamp() : 0;
Activity? waitForConnectionActivity = ConnectionSetupDistributedTracing.StartWaitForConnectionActivity(authority);

if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out HttpConnectionWaiter<Http3Connection?>? http3ConnectionWaiter))
{
try
if (!TryGetPooledHttp3Connection(request, out Http3Connection? connection, out http3ConnectionWaiter))
{
connection = await http3ConnectionWaiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);
try
{
connection = await http3ConnectionWaiter.WaitWithCancellationAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
ConnectionSetupDistributedTracing.ReportError(waitForConnectionActivity, ex);
waitForConnectionActivity?.Stop();
throw;
}
}
catch (Exception ex)

// Request cannot be sent over H/3 connection, try downgrade or report failure.
// Note that if there's an H/3 suitable origin authority but is unavailable or blocked via Alt-Svc, exception is thrown instead.
if (connection is null)
{
ConnectionSetupDistributedTracing.ReportError(waitForConnectionActivity, ex);
waitForConnectionActivity?.Stop();
throw;
return null;
}
}

// Request cannot be sent over H/3 connection, try downgrade or report failure.
// Note that if there's an H/3 suitable origin authority but is unavailable or blocked via Alt-Svc, exception is thrown instead.
if (connection is null)
{
return null;
}
HttpResponseMessage response = await connection.SendAsync(request, queueStartingTimestamp, waitForConnectionActivity, cancellationToken).ConfigureAwait(false);

HttpResponseMessage response = await connection.SendAsync(request, queueStartingTimestamp, waitForConnectionActivity, cancellationToken).ConfigureAwait(false);
// If an Alt-Svc authority returns 421, it means it can't actually handle the request.
// An authority is supposed to be able to handle ALL requests to the origin, so this is a server bug.
// In this case, we blocklist the authority and retry the request at the origin.
if (response.StatusCode == HttpStatusCode.MisdirectedRequest && connection.Authority != _originAuthority)
{
response.Dispose();
BlocklistAuthority(connection.Authority);
continue;
}

// If an Alt-Svc authority returns 421, it means it can't actually handle the request.
// An authority is supposed to be able to handle ALL requests to the origin, so this is a server bug.
// In this case, we blocklist the authority and retry the request at the origin.
if (response.StatusCode == HttpStatusCode.MisdirectedRequest && connection.Authority != _originAuthority)
return response;
}
finally
{
response.Dispose();
BlocklistAuthority(connection.Authority);
continue;
http3ConnectionWaiter?.SetTimeoutToPendingConnectionAttempt(this, cancellationToken.IsCancellationRequested);
}

return response;
}
}

Expand Down Expand Up @@ -253,8 +261,7 @@ private async Task InjectNewHttp3ConnectionAsync(RequestQueue<Http3Connection?>.
HttpAuthority? authority = null;
HttpConnectionWaiter<Http3Connection?> waiter = queueItem.Waiter;

CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource();
waiter.ConnectionCancellationTokenSource = cts;
CancellationTokenSource cts = GetConnectTimeoutCancellationTokenSource(waiter);
Activity? connectionSetupActivity = null;
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ public async ValueTask<HttpResponseMessage> SendWithVersionDetectionAndRetryAsyn
// We never cancel both attempts at the same time. When downgrade happens, it's possible that both waiters are non-null,
// but in that case http2ConnectionWaiter.ConnectionCancellationTokenSource shall be null.
Debug.Assert(http11ConnectionWaiter is null || http2ConnectionWaiter?.ConnectionCancellationTokenSource is null);
http11ConnectionWaiter?.CancelIfNecessary(this, cancellationToken.IsCancellationRequested);
http2ConnectionWaiter?.CancelIfNecessary(this, cancellationToken.IsCancellationRequested);
http11ConnectionWaiter?.SetTimeoutToPendingConnectionAttempt(this, cancellationToken.IsCancellationRequested);
http2ConnectionWaiter?.SetTimeoutToPendingConnectionAttempt(this, cancellationToken.IsCancellationRequested);
}
}
}
Expand Down Expand Up @@ -827,7 +827,31 @@ private async ValueTask<Stream> EstablishSocksTunnel(HttpRequestMessage request,
return stream;
}

private CancellationTokenSource GetConnectTimeoutCancellationTokenSource() => new CancellationTokenSource(Settings._connectTimeout);
/// <summary>
/// Creates the <see cref="CancellationTokenSource"/> for a new connection attempt, constructed with
/// <c>Settings._connectTimeout</c>, and publishes it on <paramref name="waiter"/> while holding a lock on the waiter.
/// If the waiter's task has already completed by then, the pending-connection timeout is applied immediately.
/// </summary>
/// <typeparam name="T">The connection type the waiter is waiting for.</typeparam>
/// <param name="waiter">The waiter whose <c>ConnectionCancellationTokenSource</c> is assigned.</param>
/// <returns>The newly created <see cref="CancellationTokenSource"/>.</returns>
private CancellationTokenSource GetConnectTimeoutCancellationTokenSource<T>(HttpConnectionWaiter<T> waiter)
where T : HttpConnectionBase?
{
var cts = new CancellationTokenSource(Settings._connectTimeout);

// NOTE(review): the lock presumably pairs with a lock taken inside SetTimeoutToPendingConnectionAttempt,
// so that publishing the CTS and checking for completion are atomic with respect to it — confirm.
lock (waiter)
{
// After a request completes (or is canceled), it will call into SetTimeoutToPendingConnectionAttempt,
// which will no-op if ConnectionCancellationTokenSource is not set, assuming that the connection attempt is done.
// As the initiating request for this connection attempt may complete concurrently at any time,
// there is a race condition where the first call to SetTimeoutToPendingConnectionAttempt may happen
// before we were able to set the CTS, so no timeout will be applied even though the request is already done.
waiter.ConnectionCancellationTokenSource = cts;

// To fix that, we check whether the waiter already completed now that we're holding a lock.
// If it had, call SetTimeoutToPendingConnectionAttempt again now that the CTS is set.
if (waiter.Task.IsCompleted)
{
// The waiter task being canceled is used as the signal that the initiating request was canceled.
waiter.SetTimeoutToPendingConnectionAttempt(this, requestCancelled: waiter.Task.IsCanceled);

// NOTE(review): the CTS reference is cleared again, presumably so that any later
// SetTimeoutToPendingConnectionAttempt call no-ops (see above) instead of applying the timeout twice — confirm.
waiter.ConnectionCancellationTokenSource = null;
}
}

// The same instance is returned even if the reference on the waiter was cleared above.
return cts;
}

private static Exception CreateConnectTimeoutException(OperationCanceledException oce)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public bool TrySignal(T connection)
}
}

public void CancelIfNecessary(HttpConnectionPool pool, bool requestCancelled)
public void SetTimeoutToPendingConnectionAttempt(HttpConnectionPool pool, bool requestCancelled)
{
int timeout = GlobalHttpSettings.SocketsHttpHandler.PendingConnectionTimeoutOnRequestCompletion;
if (ConnectionCancellationTokenSource is null ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,68 @@ await RemoteExecutor.Invoke(static async (versionString, timoutStr) =>
}, UseVersion.ToString(), timeout.ToString()).DisposeAsync();
}

// Verifies that every connection attempt started through ConnectCallback eventually has its
// cancellation token signaled after the initiating requests have been canceled by the client timeout.
// The ConnectCallback never connects: it only waits for its cancellation token, so an attempt can
// only finish by being canceled or by hitting the WaitAsync bound below.
[OuterLoop("We wait for PendingConnectionTimeout which defaults to 5 seconds.")]
[Fact]
public async Task PendingConnectionTimeout_SignalsAllConnectionAttempts()
{
if (UseVersion == HttpVersion.Version30)
{
// HTTP3 does not support ConnectCallback
return;
}

// Number of ConnectCallback invocations currently in flight (updated with Interlocked).
int pendingConnectionAttempts = 0;
// Set when a callback's wait hit the WaitAsync bound, i.e. its cancellation token was not signaled in time.
bool connectionAttemptTimedOut = false;

using var handler = new SocketsHttpHandler
{
ConnectCallback = async (context, cancellation) =>
{
Interlocked.Increment(ref pendingConnectionAttempts);
try
{
// Wait indefinitely for the token to be canceled; WaitAsync bounds the wait and
// throws TimeoutException if the cancellation does not arrive within PassingTestTimeout.
await Assert.ThrowsAsync<TaskCanceledException>(() => Task.Delay(-1, cancellation)).WaitAsync(TestHelper.PassingTestTimeout);
// The token is canceled at this point, so this throws and the callback never returns a stream.
cancellation.ThrowIfCancellationRequested();
throw new UnreachableException();
}
catch (TimeoutException)
{
// Record the failure so the test thread can assert on it, then let the attempt fail.
connectionAttemptTimedOut = true;
throw;
}
finally
{
Interlocked.Decrement(ref pendingConnectionAttempts);
}
}
};

using HttpClient client = CreateHttpClient(handler);
// Each request is expected to be canceled by this client-side timeout, since no connection ever completes.
client.Timeout = TimeSpan.FromSeconds(2);

// Many of these requests should trigger new connection attempts, and all of those should eventually be cleaned up.
await Parallel.ForAsync(0, 100, async (_, _) =>
{
await Assert.ThrowsAnyAsync<TaskCanceledException>(() => client.GetAsync("https://dummy"));
});

Stopwatch stopwatch = Stopwatch.StartNew();

// Poll until every in-flight callback has exited, failing fast if any of them timed out.
while (Volatile.Read(ref pendingConnectionAttempts) > 0)
{
Assert.False(connectionAttemptTimedOut);

// Upper bound on the polling loop, so a stuck attempt fails the test instead of hanging it.
if (stopwatch.Elapsed > 2 * TestHelper.PassingTestTimeout)
{
Assert.Fail("Connection attempts took too long to get cleaned up");
}

await Task.Delay(100);
}

// Final check covers a timeout recorded by the last callback to exit.
Assert.False(connectionAttemptTimedOut);
}

private sealed class SetTcsContent : StreamContent
{
private readonly TaskCompletionSource<bool> _tcs;
Expand Down

0 comments on commit 54527ea

Please sign in to comment.