Skip to content

Commit

Permalink
Fix reuse of client port on Linux. Implement for OSX. [ci skip]
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed May 12, 2017
1 parent 7ea1620 commit 557352f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 12 deletions.
32 changes: 23 additions & 9 deletions base/distributed/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -455,31 +455,45 @@ end
# Port number passed to jl_tcp_bind when binding client sockets; initially 0 and
# overwritten in socket_reuse_port() with the port reported by jl_tcp_getsockname.
const client_port = Ref{Cushort}(0)

function socket_reuse_port()
s = TCPSocket()
s = TCPSocket(delay = false)
client_host = Ref{Cuint}(0)
ccall(:jl_tcp_bind, Int32,
(Ptr{Void}, UInt16, UInt32, Cuint),
s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))

# TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
# early access to a socket fd, i.e., before a bind call.

@static if is_linux()
@static if is_linux() || is_apple()
try
rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
if rc > 0 # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
return s
elseif rc < 0
throw(SystemError("setsockopt() SO_REUSEPORT : "))
end
getsockname(s)
catch e
# This is an issue only on systems with lots of client connections, hence delay the warning
nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
# provide a clean new socket
return TCPSocket()
end
end

ccall(:jl_tcp_bind, Int32,
(Ptr{Void}, UInt16, UInt32, Cuint),
s.handle, hton(client_port[]), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))

rport = Ref{Cushort}(0)
raddress = zeros(UInt8, 16)
rfamily = Ref{Cuint}(0)

r = ccall(:jl_tcp_getsockname, Int32,
(Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
s.handle, rport, raddress, rfamily)
Base.uv_error("cannot obtain socket name", r)
if r == 0
port = ntoh(rport[])
else
error("cannot obtain socket name")
end

client_port[] = port

return s
end

Expand Down
13 changes: 10 additions & 3 deletions base/socket.jl
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,17 @@ mutable struct TCPSocket <: LibuvStream
return tcp
end
end
function TCPSocket()
function TCPSocket(; delay=true) # kw arg "delay": if true, libuv delays creation of the socket
# fd till the first bind call

tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
eventloop(), tcp.handle)
if delay
err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
eventloop(), tcp.handle)
else
err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
eventloop(), tcp.handle, 2) # AF_INET is 2
end
uv_error("failed to create tcp socket", err)
tcp.status = StatusInit
return tcp
Expand Down
38 changes: 38 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,44 @@ include("testenv.jl")

addprocs_with_testenv(4)

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, e.g. Linux kernels prior to 3.9 and older versions of OSX.
if is_unix()
    # Run the check on every process returned by procs().
    results = asyncmap(procs()) do pid
        remotecall_fetch(pid) do
            # Port reported by jl_tcp_getsockname for each worker connection
            # held by the process executing this closure.
            ports = []
            for peer in Base.Distributed.PGRP.workers
                # Only Worker entries are inspected; anything else is skipped.
                isa(peer, Base.Distributed.Worker) || continue

                sock = peer.r_stream
                port_ref = Ref{Cushort}(0)
                addr_buf = zeros(UInt8, 16)
                family_ref = Ref{Cuint}(0)

                status = ccall(:jl_tcp_getsockname, Int32,
                               (Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
                               sock.handle, port_ref, addr_buf, family_ref)
                Base.uv_error("cannot obtain socket name", status)
                status == 0 || error("cannot obtain socket name")
                push!(ports, ntoh(port_ref[]))
            end
            @assert length(ports) == nworkers()

            # All connections are expected to share one client port. On a
            # mismatch, warn and return 0 instead of this process's id so the
            # membership check below does not find the id in `results`.
            if any(other -> other != ports[1], ports[2:end])
                warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
                return 0
            end
            return myid()
        end
    end

    # Ensure that the code has indeed been executed on all processes
    @test all(pid -> pid in results, procs())
end

# Record this process's id and pick, at random, the id of one other process.
id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

Expand Down

0 comments on commit 557352f

Please sign in to comment.