Skip to content

Commit

Permalink
disable_nagle moved to Sockets, split and documented (see #31842 ) (#…
Browse files Browse the repository at this point in the history
…31924)

disable_nagle was split into nagle (which enables or disables Nagle's 
algorithm) and quickack (which enables or disables TCP_QUICKACK on Linux 
systems).
  • Loading branch information
mateuszbaran authored and JeffBezanson committed Jun 6, 2019
1 parent 5776993 commit 7b59c72
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 18 deletions.
15 changes: 2 additions & 13 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
print(out, '\n')
flush(out)

disable_nagle(sock)
Sockets.nagle(sock, false)
Sockets.quickack(sock, true)

if ccall(:jl_running_on_valgrind,Cint,()) != 0
println(out, "PID = $(getpid())")
Expand Down Expand Up @@ -1180,18 +1181,6 @@ function interrupt(pids::AbstractVector=workers())
end
end


function disable_nagle(sock)
# disable nagle on all OSes
ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, 1)
@static if Sys.islinux()
# tcp_quickack is a linux only option
if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, 1) < 0
@warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
end
end
end

wp_bind_addr(p::LocalProcess) = p.bind_addr
wp_bind_addr(p) = p.config.bind_addr

Expand Down
7 changes: 4 additions & 3 deletions stdlib/Distributed/src/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ precompile(Tuple{typeof(Distributed.launch), Distributed.LocalManager, Base.Dict
precompile(Tuple{typeof(Distributed.start_worker), Base.PipeEndpoint, String})
precompile(Tuple{typeof(Distributed.socket_reuse_port)})
precompile(Tuple{typeof(Distributed.flush_gc_msgs)})
precompile(Tuple{typeof(Distributed.disable_nagle), Sockets.TCPServer})
precompile(Tuple{typeof(Sockets.nagle), Sockets.TCPServer, Bool})
precompile(Tuple{typeof(Sockets.quickack), Sockets.TCPServer, Bool})
precompile(Tuple{typeof(Distributed.next_tunnel_port)})
precompile(Tuple{typeof(Base._delete!), Base.Dict{Int64, Union{Distributed.Worker, Distributed.LocalProcess}}, Int64})
precompile(Tuple{typeof(Distributed.send_msg_), Distributed.Worker, Distributed.MsgHeader, Distributed.JoinPGRPMsg, Bool})
Expand Down Expand Up @@ -85,7 +86,8 @@ precompile(Tuple{typeof(Distributed.process_hdr), Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Distributed.deserialize_msg), Distributed.ClusterSerializer{Sockets.TCPSocket}})
precompile(Tuple{typeof(Distributed.null_id), Distributed.RRID})
precompile(Tuple{typeof(Distributed.deliver_result), Sockets.TCPSocket, Symbol, Distributed.RRID, Distributed.RemoteException})
precompile(Tuple{typeof(Distributed.disable_nagle), Sockets.TCPSocket})
precompile(Tuple{typeof(Sockets.nagle), Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Sockets.quickack), Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Distributed.message_handler_loop), Sockets.TCPSocket, Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Distributed.process_tcp_streams), Sockets.TCPSocket, Sockets.TCPSocket, Bool})
precompile(Tuple{Type{Distributed.JoinPGRPMsg}, Int64, Array{Union{Tuple{Any, Int64}, Tuple{Tuple{}, Any, Bool}}, 1}, Symbol, Bool})
Expand Down Expand Up @@ -126,7 +128,6 @@ precompile(Tuple{typeof(Distributed.deregister_worker), Distributed.ProcessGroup
precompile(Tuple{typeof(Distributed.process_hdr), Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Distributed.null_id), Distributed.RRID})
precompile(Tuple{typeof(Distributed.deliver_result), Sockets.TCPSocket, Symbol, Distributed.RRID, Distributed.RemoteException})
precompile(Tuple{typeof(Distributed.disable_nagle), Sockets.TCPSocket})
precompile(Tuple{typeof(Distributed.message_handler_loop), Sockets.TCPSocket, Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Distributed.process_tcp_streams), Sockets.TCPSocket, Sockets.TCPSocket, Bool})
precompile(Tuple{typeof(Serialization.deserialize), Distributed.ClusterSerializer{Sockets.TCPSocket}, Type{Union}})
Expand Down
6 changes: 4 additions & 2 deletions stdlib/Distributed/src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bo
end

function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
disable_nagle(r_stream)
Sockets.nagle(r_stream, false)
Sockets.quickack(r_stream, true)
wait_connected(r_stream)
if r_stream != w_stream
disable_nagle(w_stream)
Sockets.nagle(w_stream, false)
Sockets.quickack(w_stream, true)
wait_connected(w_stream)
end
message_handler_loop(r_stream, w_stream, incoming)
Expand Down
2 changes: 2 additions & 0 deletions stdlib/Sockets/docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ Sockets.send
Sockets.recv
Sockets.recvfrom
Sockets.setopt
Sockets.nagle
Sockets.quickack
```

```@meta
Expand Down
25 changes: 25 additions & 0 deletions stdlib/Sockets/src/Sockets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,31 @@ function connect(sock::LibuvStream, args...)
return sock
end

"""
nagle(socket::Union{TCPServer, TCPSocket}, enable::Bool)
Enables or disables Nagle's algorithm on a given TCP server or socket.
"""
function nagle(sock::Union{TCPServer, TCPSocket}, enable::Bool)
# disable or enable Nagle's algorithm on all OSes
ccall(:uv_tcp_nodelay, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(!enable))
end

"""
quickack(socket::Union{TCPServer, TCPSocket}, enable::Bool)
On Linux systems, the TCP_QUICKACK is disabled or enabled on `socket`.
"""
function quickack(sock::Union{TCPServer, TCPSocket}, enable::Bool)
@static if Sys.islinux()
# tcp_quickack is a linux only option
if ccall(:jl_tcp_quickack, Cint, (Ptr{Cvoid}, Cint), sock.handle, Cint(enable)) < 0
@warn "Networking unoptimized ( Error enabling TCP_QUICKACK : $(Libc.strerror(Libc.errno())) )" maxlog=1
end
end
end


##

const BACKLOG_DEFAULT = 511
Expand Down

1 comment on commit 7b59c72

@nanosoldier
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing the daily benchmark build, I will reply here when finished:

@nanosoldier runbenchmarks(ALL, isdaily = true)

Please sign in to comment.