Skip to content

Commit

Permalink
Address PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Apr 27, 2023
1 parent 6fed5e3 commit 309f8dd
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/src/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ If `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.Co

### `observelayers`

If `true`, enables the `HTTP.observe_layer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`.
If `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`.

### `basicauth`

Expand Down
19 changes: 12 additions & 7 deletions src/Connections.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ const nolimit = typemax(Int)

taskid(t=current_task()) = string(hash(t) & 0xffff, base=16, pad=4)

const default_connection_limit = Ref(16)
const default_connection_limit = Ref{Int}()

function __init__()
default_connection_limit[] = Threads.nthreads() * 4
# default connection limit is 4x the number of threads
# this was chosen after some empircal benchmarking on aws/azure machines
# where, for non-data-intensive workloads, having at least 4x ensured
# there was no artificial restriction on overall throughput
default_connection_limit[] = max(16, Threads.nthreads() * 4)
nosslcontext[] = OpenSSL.SSLContext(OpenSSL.TLSClientMethod())
TCP_POOL[] = CPool{Sockets.TCPSocket}(default_connection_limit[])
MBEDTLS_POOL[] = CPool{MbedTLS.SSLContext}(default_connection_limit[])
Expand All @@ -53,6 +57,7 @@ Fields:
- `port::String`, exactly as specified in the URI (i.e. may be empty).
- `idle_timeout`, No. of seconds to maintain connection after last request/response.
- `require_ssl_verification`, whether ssl verification is required for an ssl connection
- `keepalive`, whether the tcp socket should have keepalive enabled
- `peerip`, remote IP adress (used for debug/log messages).
- `peerport`, remote TCP port number (used for debug/log messages).
- `localport`, local TCP port number (used for debug messages).
Expand Down Expand Up @@ -89,8 +94,8 @@ end
Used for "hashing" a Connection object on just the key properties necessary for determining
connection re-useability. That is, when a new request calls `newconnection`, we take the
request parameters of what socket type, the host and port, and if ssl
verification is required, and if an existing Connection was already created with the exact
request parameters of host and port, and if ssl verification is required, if keepalive is enabled,
and if an existing Connection was already created with the exact
same parameters, we can re-use it (as long as it's not already being used, obviously).
"""
connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection)
Expand Down Expand Up @@ -346,7 +351,7 @@ and defaults to the `HTTP.default_connection_limit` value.
A pool can be passed to any of the `HTTP.request` methods via the `pool` keyword argument.
"""
mutable struct Pool
struct Pool
lock::ReentrantLock
tcp::CPool{Sockets.TCPSocket}
mbedtls::CPool{MbedTLS.SSLContext}
Expand Down Expand Up @@ -413,7 +418,7 @@ function closeall(pool::Union{Nothing, Pool}=nothing)
end

function connection_isvalid(c, idle_timeout)
check = isopen(c) && (time() - c.timestamp) <= idle_timeout
check = isopen(c) && inactiveseconds(c) <= idle_timeout
check || close(c)
return check
end
Expand Down Expand Up @@ -628,7 +633,7 @@ function sslupgrade(::Type{IOType}, c::Connection{T},
# success, now we turn it into a new Connection
conn = Connection(host, "", 0, require_ssl_verification, keepalive, tls)
# release the "old" one, but don't return the connection since we're hijacking the socket
release(getpool(pool, T), connectionkey(c), nothing)
release(getpool(pool, T), connectionkey(c))
# and return the new one
return acquire(() -> conn, getpool(pool, IOType), connectionkey(conn); forcenew=true)
end
Expand Down
10 changes: 5 additions & 5 deletions src/HTTP.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ getrequest(r::Request) = r
getrequest(s::Stream) = s.message.request

# Wraps client-side "layer" to track the amount of time spent in it as a request is processed.
function observe_layer(f)
function observelayer(req_or_stream; kw...)
function observelayer(f)
function observation(req_or_stream; kw...)
req = getrequest(req_or_stream)
nm = nameof(f)
cntnm = Symbol(nm, "_count")
Expand Down Expand Up @@ -158,7 +158,7 @@ Supported optional keyword arguments:
- `logerrors = false`, if `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be
logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or
monitoring requests where there's worry of certain errors happening but ignored because of retries.
- `observelayers = false`, if `true`, enables the `HTTP.observe_layer` to wrap each client-side "layer" to track the amount of
- `observelayers = false`, if `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of
time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries
or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored
in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`
Expand Down Expand Up @@ -420,7 +420,7 @@ function stack(
requestlayers=(),
streamlayers=())

obs = observelayers ? observe_layer : identity
obs = observelayers ? observelayer : identity
# stream layers
if streamlayers isa NamedTuple
inner_stream_layers = haskey(streamlayers, :last) ? streamlayers.last : ()
Expand Down Expand Up @@ -602,7 +602,7 @@ write(socket, frame)
"""
function openraw(method::Union{String,Symbol}, url, headers=Header[]; kw...)::Tuple{IO, Response}
socketready = Channel{Tuple{IO, Response}}(0)
@async HTTP.open(method, url, headers; kw...) do http
Threads.@spawn HTTP.open(method, url, headers; kw...) do http
HTTP.startread(http)
socket = http.stream
put!(socketready, (socket, http.message))
Expand Down
2 changes: 1 addition & 1 deletion src/Servers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ Whatever you type on the client will be displayed on the server and vis-versa.
using HTTP
function chat(io::HTTP.Stream)
@async while !eof(io)
Threads.@spawn while !eof(io)
write(stdout, readavailable(io), "\\n")
end
while isopen(io)
Expand Down
2 changes: 1 addition & 1 deletion src/clientlayers/MessageRequest.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Construct a [`Request`](@ref) object from method, url, headers, and body.
Hard-coded as the first layer in the request pipeline.
"""
function messagelayer(handler)
return function(method::String, url::URI, headers, body; copyheaders::Bool=true, response_stream=nothing, http_version=HTTPVersion(1, 1), verbose=DEBUG_LEVEL[], kw...)
return function makerequest(method::String, url::URI, headers, body; copyheaders::Bool=true, response_stream=nothing, http_version=HTTPVersion(1, 1), verbose=DEBUG_LEVEL[], kw...)
req = Request(method, resource(url), mkreqheaders(headers, copyheaders), body; url=url, version=http_version, responsebody=response_stream)
local resp
start_time = time()
Expand Down

0 comments on commit 309f8dd

Please sign in to comment.