Summary of changes in this PR:
  * Added a new global `set_connection_limit!` function for controlling the global connection limit that is applied to all requests.
    This is one way to resolve #1033. Passing `connection_limit` to individual requests now raises a deprecation warning; the
    supported usage is calling `HTTP.set_connection_limit!`, which updates the global value each time it is called (see the
    usage sketch after this list).
  * Added a try-finally in `keepalive!` around our global IO lock usage, just for good housekeeping
  * Refactored `try_with_timeout` to use a `Channel`, a `Timer`, and `Threads.@spawn` instead of the non-threaded versions;
    this is cleaner and avoids `@async` when it isn't needed. Note, however, that StreamRequest.jl now wraps all the actual
    write/read IO operations in a `fetch(@async dostuff())`, because `@async` currently prevents code in that task from
    migrating across threads, which is important for OpenSSL usage, where error handling is done per-thread. I don't love
    the solution, but it seems ok for now (both patterns are sketched after this list).
  * Refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether held in memory
    or written to an IO, so we can log them and report bit-rate calculations in verbose logging (a worked example follows
    this list)
  * Ok, the big one: I rewrote the internal implementation of the ConnectionPool.ConnectionPools.Pod `acquire`/`release`
    functions. Under really heavy workloads, there was a ton of contention on the Pod lock, and I also observed at least one
    "hang" where GDB backtraces seemed to indicate that a task had somehow failed/died/hung while trying to make a new
    connection _while holding the Pod lock_, which meant that no other requests could ever make progress. The new
    implementation includes a lock-free "fastpath": re-using an existing connection doesn't require taking any lock at all.
    It uses a lock-free concurrent stack implementation copied from JuliaConcurrent/ConcurrentCollections.jl (that package
    doesn't seem actively maintained, and it's not much code, so it was simply copied in; a usage sketch of the stack follows
    this list). The rest of the `acquire`/`release` code is now modeled after Base.Event: releasing always acquires the lock,
    and slow-path acquires also take the lock, to ensure fairness and prevent deadlocks.
    I've included benchmark results on a variety of heavy workloads [here](https://everlasting-mahogany-a5f.notion.site/Issue-heavy-load-perf-degradation-1cd275c75037481a9cd6378b8303cfb3)
    that show some great improvements, the bulk of which are attributable to reduced contention when acquiring/releasing
    connections during requests.
    The other key change in this rewrite is that we ensure we _do not_ hold any locks while _making new connections_, both to
    avoid any possibility of the lock getting "stuck" and because holding it isn't necessary: the pod just keeps track of
    counts and doesn't need to know whether a connection has actually been made yet (if connecting fails, the slot is
    immediately released back and the connection retried).
    Overall, the code is also _much_ simpler, which I think is a huge win, because the old code was always pretty scary to
    have to dig into.
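
A minimal usage sketch of the new global limit; the deprecation warning wording in the comment is illustrative, not the exact message emitted:

```julia
using HTTP

# Set the process-wide connection limit once, up front; it applies to all
# subsequent requests instead of being configured per-request.
HTTP.set_connection_limit!(32)

resp = HTTP.get("https://julialang.org")

# The per-request keyword still works but is deprecated and warns, pointing
# at HTTP.set_connection_limit! (exact warning text illustrative):
resp = HTTP.get("https://julialang.org"; connection_limit=16)
```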
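
For reference, the refactored `try_with_timeout` (the real implementation lands in src/Exceptions.jl below) boils down to racing a zero-capacity `Channel` against a `Timer`. A self-contained sketch of the same pattern, with a local `TimeoutError` standing in for HTTP.jl's own exception type:

```julia
# Local stand-in for HTTP.jl's TimeoutError.
struct TimeoutError <: Exception
    timeout::Float64
end

function try_with_timeout(f, timeout)
    ch = Channel(0)  # rendezvous channel: put! blocks until take! (or close)
    # If the timer fires first, close the channel with a TimeoutError so the
    # pending take! below throws it.
    timer = Timer(_ -> close(ch, TimeoutError(timeout)), timeout)
    Threads.@spawn begin
        try
            put!(ch, $f())  # interpolate f into the spawned task
        catch e
            close(ch, CapturedException(e, catch_backtrace()))
        finally
            close(timer)
        end
    end
    return take!(ch)  # either f's result, or throws the closing exception
end

# Usage: the body outlives the 1-second deadline, so take! throws.
try
    try_with_timeout(1.0) do
        sleep(5)
        :done
    end
catch e
    e isa TimeoutError && println("timed out after $(e.timeout)s")
end
```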
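
And a toy illustration of the `fetch(@async dostuff())` trick in StreamRequest.jl (`dostuff` and `run_pinned` are placeholder names, not HTTP.jl functions): plain `@async` tasks are sticky, meaning they never migrate off the thread they start on, unlike `Threads.@spawn` tasks, which is what makes this safe for OpenSSL's per-thread error handling:

```julia
# dostuff stands in for any thread-sensitive IO work, e.g. OpenSSL calls
# whose error state lives in thread-local storage.
run_pinned(dostuff) = fetch(@async dostuff())

result = run_pinned() do
    t1 = Threads.threadid()
    sleep(0.1)              # a yield point where a @spawn-ed task could migrate
    t2 = Threads.threadid()
    @assert t1 == t2        # @async tasks are sticky, so this always holds
    (t1, t2)
end
```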
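
The verbose-logging bandwidth figure (see the MessageRequest.jl diff below) is just bits over seconds; a worked example:

```julia
# 250 MB downloaded in 2.0 seconds:
bytes, elapsed = 250_000_000, 2.0
gbps = (8 * bytes / 1e9) / elapsed  # 8 bits per byte, 1e9 bits per gigabit
# gbps == 1.0, logged as "bandwidth: 1.0 Gbps"
```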
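
Finally, a quick sketch of the copied lock-free (Treiber) stack's contract, as defined in src/concurrentstack.jl below; an empty stack is signaled by `nothing`, which is why pushing `nothing` is disallowed:

```julia
stack = ConcurrentStack{Int}()  # safe to push!/pop! concurrently from many threads
push!(stack, 1)
push!(stack, 2)
pop!(stack)  # 2 -- LIFO order
pop!(stack)  # 1
pop!(stack)  # nothing -- the stack is empty
```
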
quinnj committed Apr 11, 2023
1 parent 75c1b25 commit b0007bb
Showing 12 changed files with 195 additions and 248 deletions.
40 changes: 25 additions & 15 deletions src/ConnectionPool.jl
@@ -19,22 +19,21 @@ remotely closed, a connection will be reused.
"""
module ConnectionPool

export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!
export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit!, set_connection_limit!

using Sockets, LoggingExtras, NetworkOptions
using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake!
using MbedTLS, OpenSSL
using ..IOExtras, ..Conditions, ..Exceptions

const default_connection_limit = Ref(8)
const nolimit = typemax(Int)

set_default_connection_limit!(n) = default_connection_limit[] = n

taskid(t=current_task()) = string(hash(t) & 0xffff, base=16, pad=4)

include("connectionpools.jl")
using .ConnectionPools
set_default_connection_limit!(n) = ConnectionPools.connection_limit[] = n
set_connection_limit!(n) = ConnectionPools.connection_limit[] = n

"""
Connection
@@ -364,15 +363,15 @@ or create a new `Connection` if required.
function newconnection(::Type{T},
host::AbstractString,
port::AbstractString;
connection_limit=default_connection_limit[],
connection_limit=nothing,
forcenew::Bool=false,
idle_timeout=typemax(Int),
require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"),
kw...) where {T <: IO}
return acquire(
getpool(T),
(T, host, port, require_ssl_verification, true);
max_concurrent_connections=Int(connection_limit),
max_concurrent_connections=connection_limit,
forcenew=forcenew,
idle_timeout=Int(idle_timeout)) do
Connection(host, port,
@@ -383,16 +382,21 @@ function newconnection(::Type{T},
end
end

releaseconnection(c::Connection{T}, reuse) where {T} =
function releaseconnection(c::Connection{T}, reuse) where {T}
c.timestamp = time()
release(getpool(T), connectionkey(c), c; return_for_reuse=reuse)
end

function keepalive!(tcp)
Base.iolock_begin()
Base.check_open(tcp)
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
tcp.handle, 1, 1)
Base.uv_error("failed to set keepalive on tcp socket", err)
Base.iolock_end()
try
Base.check_open(tcp)
err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint),
tcp.handle, 1, 1)
Base.uv_error("failed to set keepalive on tcp socket", err)
finally
Base.iolock_end()
end
return
end

Expand Down Expand Up @@ -431,7 +435,7 @@ function getconnection(::Type{TCPSocket},
return if connect_timeout > 0
tcp = Sockets.TCPSocket()
Sockets.connect!(tcp, addr, p)
try_with_timeout(() -> checkconnected(tcp), connect_timeout, () -> close(tcp)) do
try_with_timeout(connect_timeout) do
Sockets.wait_connected(tcp)
keepalive && keepalive!(tcp)
end
@@ -442,6 +446,7 @@
tcp
end
catch e
close(tcp)
lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e
continue
end
Expand Down Expand Up @@ -545,8 +550,13 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
# if the upgrade fails, an error will be thrown and the original c will be closed
# in ConnectionRequest
tls = if readtimeout > 0
try_with_timeout(() -> shouldtimeout(c, readtimeout), readtimeout, () -> close(c)) do
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
try
try_with_timeout(readtimeout) do
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
end
catch
close(c)
rethrow()
end
else
sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...)
43 changes: 12 additions & 31 deletions src/Exceptions.jl
@@ -25,41 +25,22 @@ macro $(:try)(exes...)
end
end # @eval

function try_with_timeout(f, shouldtimeout, delay, iftimeout=() -> nothing)
@assert delay > 0
cond = Condition()
# execute f async
t = @async try
notify(cond, f())
catch e
@debugv 1 "error executing f in try_with_timeout"
isopen(timer) && notify(cond, e, error = true)
end
# start a timer
timer = Timer(delay; interval=delay / 10) do tm
function try_with_timeout(f, timeout)
ch = Channel(0)
timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout)
Threads.@spawn begin
try
if shouldtimeout()
@debugv 1 "❗️ Timeout: $delay"
close(tm)
iftimeout()
notify(cond, TimeoutError(delay), error = true)
end
put!(ch, $f())
catch e
@debugv 1 "callback error in try_with_timeout"
close(tm)
notify(cond, e, error = true)
close(ch, CapturedException(e, catch_backtrace()))
finally
            close(timer)
end
end
try
res = wait(cond)
@debugv 1 "try_with_timeout finished with: $res"
res
catch e
@debugv 1 "try_with_timeout failed with: $e"
rethrow()
finally
close(timer)
end
return take!(ch)
end

abstract type HTTPError <: Exception end
14 changes: 10 additions & 4 deletions src/Streams.jl
@@ -281,7 +281,7 @@ end

@noinline function bufcheck(buf::Base.GenericIOBuffer, n)
requested_buffer_capacity = (buf.append ? buf.size : (buf.ptr - 1)) + n
requested_buffer_capacity > length(buf.data) && throw(ArgumentError("Unable to grow response stream IOBuffer large enough for response body size"))
requested_buffer_capacity > length(buf.data) && throw(ArgumentError("Unable to grow response stream IOBuffer $(length(buf.data)) large enough for response body size: $requested_buffer_capacity"))
end

function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailable(http))
@@ -299,19 +299,25 @@ function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailab
return n
end

Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) = take!(readall!(http, buf))
function Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer())
readall!(http, buf)
return take!(buf)
end

function readall!(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer())
n = 0
if ntoread(http) == unknown_length
while !eof(http)
readbytes!(http, buf)
n += readbytes!(http, buf)
end
else
# even if we know the length, we still need to read until eof
# because Transfer-Encoding: chunked comes in piece-by-piece
while !eof(http)
readbytes!(http, buf, ntoread(http))
end
end
return buf
return n
end

function Base.readuntil(http::Stream, f::Function)::ByteView
5 changes: 4 additions & 1 deletion src/clientlayers/ConnectionRequest.jl
@@ -91,7 +91,7 @@ function connectionlayer(handler)
target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail
end
r = if readtimeout > 0
try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do
try_with_timeout(readtimeout) do
connect_tunnel(io, target_url, req)
end
else
@@ -110,6 +110,9 @@ function connectionlayer(handler)
stream = Stream(req.response, io)
return handler(stream; readtimeout=readtimeout, kw...)
catch e
if e isa TaskFailedException
e = e.task.result
end
@debugv 1 "❗️ ConnectionLayer $e. Closing: $io"
shouldreuse = false
@try Base.IOError close(io)
8 changes: 8 additions & 0 deletions src/clientlayers/MessageRequest.jl
@@ -4,6 +4,7 @@ using URIs
using ..IOExtras, ..Messages, ..Parsers, ..Exceptions
using ..Messages, ..Parsers
using ..Strings: HTTPVersion
using LoggingExtras

export messagelayer

@@ -23,6 +24,7 @@ function messagelayer(handler)
return function(method::String, url::URI, headers, body; copyheaders::Bool=true, response_stream=nothing, http_version=HTTPVersion(1, 1), kw...)
req = Request(method, resource(url), mkreqheaders(headers, copyheaders), body; url=url, version=http_version, responsebody=response_stream)
local resp
start_time = time()
try
resp = handler(req; response_stream=response_stream, kw...)
catch e
@@ -38,6 +40,12 @@ end
write(resp.body, resp.request.context[:response_body])
end
end
if @isdefined(resp)
end_time = time()
bytes = get(resp.request.context, :nbytes, 0)
gbits_per_second = bytes == 0 ? 0 : (((8 * bytes) / 1e9) / (end_time - start_time))
@debugv 1 "Request complete with bandwidth: $(gbits_per_second) Gbps"
end
end
end
end
1 change: 1 addition & 0 deletions src/clientlayers/RetryRequest.jl
@@ -77,6 +77,7 @@ function retrylayer(handler)
end

isrecoverable(ex) = true
isrecoverable(ex::ConnectError) = ex.error isa Sockets.DNSError && ex.error.code == EAI_AGAIN ? false : true
isrecoverable(ex::StatusError) = retryable(ex.status)

function _retry_check(s, ex, req, check)
17 changes: 12 additions & 5 deletions src/clientlayers/StreamRequest.jl
@@ -17,7 +17,11 @@ immediately so that the transmission can be aborted if the `Response` status
indicates that the server does not wish to receive the message body.
[RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5).
"""
function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, kw...)::Response
function streamlayer(stream::Stream; kw...)::Response
return fetch(@async _streamlayer(stream; kw...))
end

function _streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, kw...)::Response
response = stream.message
req = response.request
io = stream.stream
@@ -132,6 +136,7 @@ end
const IOBuffers = Union{IOBuffer, Base.GenericIOBuffer{SubArray{UInt8, 1, Vector{UInt8}, Tuple{UnitRange{Int64}}, true}}}

function readbody!(stream::Stream, res::Response, buf_or_stream)
n = 0
if !iserror(res)
if isbytes(res.body)
if length(res.body) > 0
@@ -144,29 +149,31 @@ function readbody!(stream::Stream, res::Response, buf_or_stream)
# if it's a BufferStream, the response body was gzip encoded
# so using the default write is fastest because it utilizes
# readavailable under the hood, for which BufferStream is optimized
write(body, buf_or_stream)
n = write(body, buf_or_stream)
elseif buf_or_stream isa Stream
# for HTTP.Stream, there's already an optimized read method
# that just needs an IOBuffer to write into
readall!(buf_or_stream, body)
n = readall!(buf_or_stream, body)
else
error("unreachable")
end
else
res.body = read(buf_or_stream)
n = length(res.body)
end
elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream
# optimization for IOBuffer response_stream to avoid temporary allocations
readall!(buf_or_stream, res.body)
n = readall!(buf_or_stream, res.body)
else
write(res.body, buf_or_stream)
n = write(res.body, buf_or_stream)
end
else
# read the response body into the request context so that it can be
# read by the user if they want to or set later if
# we end up not retrying/redirecting/etc.
res.request.context[:response_body] = read(buf_or_stream)
end
res.request.context[:nbytes] = n
end

end # module StreamRequest
3 changes: 1 addition & 2 deletions src/clientlayers/TimeoutRequest.jl
@@ -16,8 +16,7 @@ function timeoutlayer(handler)
# skip
return handler(stream; kw...)
end
io = stream.stream
return try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do
return try_with_timeout(readtimeout) do
handler(stream; kw...)
end
end
35 changes: 35 additions & 0 deletions src/concurrentstack.jl
@@ -0,0 +1,35 @@
mutable struct Node{T}
value::T
@atomic next::Union{Node{T},Nothing}
end

Node{T}(value::T) where {T} = Node{T}(value, nothing)

mutable struct ConcurrentStack{T}
@atomic next::Union{Node{T},Nothing}
end

ConcurrentStack{T}() where {T} = ConcurrentStack{T}(nothing)

function Base.push!(stack::ConcurrentStack{T}, v) where {T}
v === nothing && throw(ArgumentError("cannot push nothing onto a ConcurrentStack"))
v = convert(T, v)
node = Node{T}(v)
next = @atomic stack.next
while true
@atomic node.next = next
next, ok = @atomicreplace(stack.next, next => node)
ok && break
end
return stack
end

function Base.pop!(stack::ConcurrentStack)
while true
node = @atomic stack.next
node === nothing && return nothing
next = @atomic node.next
next, ok = @atomicreplace(stack.next, node => next)
ok && return node.value
end
end