Skip to content

Commit

Permalink
Change HttpConnectionPool.GetConnectionAsync to do read-ahead outside…
Browse files Browse the repository at this point in the history
… of lock (dotnet#32495) (dotnet#32568)

Minimize the work done inside the lock in order to reduce contention.  This means we need to reacquire the lock if the connection isn't usable, but in the fast path case where the connection is usable, we remove the syscalls from being performed while holding the lock.
  • Loading branch information
stephentoub authored Oct 2, 2018
1 parent d1a3bca commit 7cddd8e
Showing 1 changed file with 74 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,83 +186,92 @@ private static SslClientAuthenticationOptions ConstructSslOptions(HttpConnection
TimeSpan pooledConnectionIdleTimeout = _poolManager.Settings._pooledConnectionIdleTimeout;
DateTimeOffset now = DateTimeOffset.UtcNow;
List<CachedConnection> list = _idleConnections;
lock (SyncObj)

// Try to get a cached connection. If we can and if it's usable, return it. If we can but it's not usable,
// try again. And if we can't because there aren't any valid ones, create a new one and return it.
while (true)
{
// Try to return a cached connection. We need to loop in case the connection
// we get from the list is unusable.
while (list.Count > 0)
CachedConnection cachedConnection;
lock (SyncObj)
{
CachedConnection cachedConnection = list[list.Count - 1];
HttpConnection conn = cachedConnection._connection;

list.RemoveAt(list.Count - 1);
if (cachedConnection.IsUsable(now, pooledConnectionLifetime, pooledConnectionIdleTimeout) &&
!conn.EnsureReadAheadAndPollRead())
if (list.Count > 0)
{
// We found a valid connection. Return it.
if (NetEventSource.IsEnabled) conn.Trace("Found usable connection in pool.");
return new ValueTask<(HttpConnection, HttpResponseMessage)>((conn, null));
// Pop off the next connection to try. We'll test it outside of the lock
// to avoid doing expensive validation while holding the lock.
cachedConnection = list[list.Count - 1];
list.RemoveAt(list.Count - 1);
}

// We got a connection, but it was already closed by the server or the
// server sent unexpected data or the connection is too old. In any case,
// we can't use the connection, so get rid of it and try again.
if (NetEventSource.IsEnabled) conn.Trace("Found invalid connection in pool.");
conn.Dispose();
}

// No valid cached connections, so we need to create a new one. If
// there's no limit on the number of connections associated with this
// pool, or if we haven't reached such a limit, simply create a new
// connection.
if (_associatedConnectionCount < _maxConnections)
{
if (NetEventSource.IsEnabled) Trace("Creating new connection for pool.");
IncrementConnectionCountNoLock();
return WaitForCreatedConnectionAsync(CreateConnectionAsync(request, cancellationToken));
}
else
{
// There is a limit, and we've reached it, which means we need to
// wait for a connection to be returned to the pool or for a connection
// associated with the pool to be dropped before we can create a
// new one. Create a waiter object and register it with the pool; it'll
// be signaled with the created connection when one is returned or
// space is available and the provided creation func has successfully
// created the connection to be used.
if (NetEventSource.IsEnabled) Trace("Limit reached. Waiting to create new connection.");
var waiter = new ConnectionWaiter(this, request, cancellationToken);
EnqueueWaiter(waiter);
if (cancellationToken.CanBeCanceled)
else
{
// If cancellation could be requested, register a callback for it that'll cancel
// the waiter and remove the waiter from the queue. Note that this registration needs
// to happen under the reentrant lock and after enqueueing the waiter.
waiter._cancellationTokenRegistration = cancellationToken.Register(s =>
// No valid cached connections, so we need to create a new one. If
// there's no limit on the number of connections associated with this
// pool, or if we haven't reached such a limit, simply create a new
// connection.
if (_associatedConnectionCount < _maxConnections)
{
var innerWaiter = (ConnectionWaiter)s;
lock (innerWaiter._pool.SyncObj)
if (NetEventSource.IsEnabled) Trace("Creating new connection for pool.");
IncrementConnectionCountNoLock();
return WaitForCreatedConnectionAsync(CreateConnectionAsync(request, cancellationToken));
}
else
{
// There is a limit, and we've reached it, which means we need to
// wait for a connection to be returned to the pool or for a connection
// associated with the pool to be dropped before we can create a
// new one. Create a waiter object and register it with the pool; it'll
// be signaled with the created connection when one is returned or
// space is available and the provided creation func has successfully
// created the connection to be used.
if (NetEventSource.IsEnabled) Trace("Limit reached. Waiting to create new connection.");
var waiter = new ConnectionWaiter(this, request, cancellationToken);
EnqueueWaiter(waiter);
if (cancellationToken.CanBeCanceled)
{
// If it's in the list, remove it and cancel it.
if (innerWaiter._pool.RemoveWaiterForCancellation(innerWaiter))
// If cancellation could be requested, register a callback for it that'll cancel
// the waiter and remove the waiter from the queue. Note that this registration needs
// to happen under the reentrant lock and after enqueueing the waiter.
waiter._cancellationTokenRegistration = cancellationToken.Register(s =>
{
bool canceled = innerWaiter.TrySetCanceled(innerWaiter._cancellationToken);
Debug.Assert(canceled);
}
var innerWaiter = (ConnectionWaiter)s;
lock (innerWaiter._pool.SyncObj)
{
// If it's in the list, remove it and cancel it.
if (innerWaiter._pool.RemoveWaiterForCancellation(innerWaiter))
{
bool canceled = innerWaiter.TrySetCanceled(innerWaiter._cancellationToken);
Debug.Assert(canceled);
}
}
}, waiter);
}
}, waiter);
return new ValueTask<(HttpConnection, HttpResponseMessage)>(waiter.Task);
}

// Note that we don't check for _disposed. We may end up disposing the
// created connection when it's returned, but we don't want to block use
// of the pool if it's already been disposed, as there's a race condition
// between getting a pool and someone disposing of it, and we don't want
// to complicate the logic about trying to get a different pool when the
// retrieved one has been disposed of. In the future we could alternatively
// try returning such connections to whatever pool is currently considered
// current for that endpoint, if there is one.
}
return new ValueTask<(HttpConnection, HttpResponseMessage)>(waiter.Task);
}

// Note that we don't check for _disposed. We may end up disposing the
// created connection when it's returned, but we don't want to block use
// of the pool if it's already been disposed, as there's a race condition
// between getting a pool and someone disposing of it, and we don't want
// to complicate the logic about trying to get a different pool when the
// retrieved one has been disposed of. In the future we could alternatively
// try returning such connections to whatever pool is currently considered
// current for that endpoint, if there is one.
HttpConnection conn = cachedConnection._connection;
if (cachedConnection.IsUsable(now, pooledConnectionLifetime, pooledConnectionIdleTimeout) &&
!conn.EnsureReadAheadAndPollRead())
{
// We found a valid connection. Return it.
if (NetEventSource.IsEnabled) conn.Trace("Found usable connection in pool.");
return new ValueTask<(HttpConnection, HttpResponseMessage)>((conn, null));
}

// We got a connection, but it was already closed by the server or the
// server sent unexpected data or the connection is too old. In any case,
// we can't use the connection, so get rid of it and loop around to try again.
if (NetEventSource.IsEnabled) conn.Trace("Found invalid connection in pool.");
conn.Dispose();
}
}

Expand Down

0 comments on commit 7cddd8e

Please sign in to comment.