diff --git a/base/distributed/managers.jl b/base/distributed/managers.jl
index 8654218f5b5a3..1aaa39273da96 100644
--- a/base/distributed/managers.jl
+++ b/base/distributed/managers.jl
@@ -455,16 +455,15 @@ end
 const client_port = Ref{Cushort}(0)
 
+# Return a TCPSocket for use as a client socket. On Linux and macOS, SO_REUSEPORT
+# is set on the socket and it is bound to the port stored in `client_port`, so
+# that successive sockets created here share a single local port.
 function socket_reuse_port()
-    s = TCPSocket()
+    # delay = false creates the socket fd immediately, so that SO_REUSEPORT can
+    # be set on it before bind() is called.
+    s = TCPSocket(delay = false)
     client_host = Ref{Cuint}(0)
-    ccall(:jl_tcp_bind, Int32,
-            (Ptr{Void}, UInt16, UInt32, Cuint),
-            s.handle, hton(client_port.x), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
-
-    # TODO: Support OSX and change the above code to call setsockopt before bind once libuv provides
-    # early access to a socket fd, i.e., before a bind call.
 
-    @static if is_linux()
+    @static if is_linux() || is_apple()
         try
             rc = ccall(:jl_tcp_reuseport, Int32, (Ptr{Void},), s.handle)
             if rc > 0  # SO_REUSEPORT is unsupported, just return the ephemerally bound socket
@@ -472,7 +471,6 @@ function socket_reuse_port()
             elseif rc < 0
                 throw(SystemError("setsockopt() SO_REUSEPORT : "))
             end
-            getsockname(s)
         catch e
             # This is an issue only on systems with lots of client connections, hence delay the warning
             nworkers() > 128 && warn_once("Error trying to reuse client port number, falling back to plain socket : ", e)
@@ -480,6 +478,36 @@ function socket_reuse_port()
             return TCPSocket()
         end
     end
+
+    # SO_REUSEPORT is only set in the platform-specific block above. Elsewhere a
+    # second bind to the remembered port would collide with the first socket, so
+    # hand back the unbound socket and let the OS pick a port at connect time.
+    @static if !(is_linux() || is_apple())
+        return s
+    end
+
+    # Bind only after SO_REUSEPORT has been set. client_port[] is 0 until the
+    # first successful bind, which lets the OS choose the port.
+    ccall(:jl_tcp_bind, Int32,
+            (Ptr{Void}, UInt16, UInt32, Cuint),
+            s.handle, hton(client_port[]), hton(UInt32(0)), 0) < 0 && throw(SystemError("bind() : "))
+
+    rport = Ref{Cushort}(0)
+    raddress = zeros(UInt8, 16)
+    rfamily = Ref{Cuint}(0)
+
+    r = ccall(:jl_tcp_getsockname, Int32,
+                (Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
+                s.handle, rport, raddress, rfamily)
+    Base.uv_error("cannot obtain socket name", r)
+    if r == 0
+        port = ntoh(rport[])
+    else
+        error("cannot obtain socket name")
+    end
+
+    # Remember the bound port so that subsequent sockets bind to the same one.
+    client_port[] = port
+
     return s
 end
 
diff --git a/base/socket.jl b/base/socket.jl
index 7ca828205c46c..21513336c7cc3 100644
--- a/base/socket.jl
+++ b/base/socket.jl
@@ -282,10 +282,23 @@ mutable struct TCPSocket <: LibuvStream
         return tcp
     end
 end
-function TCPSocket()
+"""
+    TCPSocket(; delay=true)
+
+Open a TCP socket using libuv. If `delay` is true, libuv delays creation of the
+socket fd till the first bind call. If `delay` is false, the fd is created
+immediately as an IPv4 (`AF_INET`) socket, e.g. so that socket options can be
+set before binding.
+"""
+function TCPSocket(; delay=true)
     tcp = TCPSocket(Libc.malloc(_sizeof_uv_tcp), StatusUninit)
-    err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
-                eventloop(), tcp.handle)
+    if delay
+        err = ccall(:uv_tcp_init, Cint, (Ptr{Void}, Ptr{Void}),
+                    eventloop(), tcp.handle)
+    else
+        err = ccall(:uv_tcp_init_ex, Cint, (Ptr{Void}, Ptr{Void}, Cuint),
+                    eventloop(), tcp.handle, 2) # AF_INET is 2
+    end
     uv_error("failed to create tcp socket", err)
     tcp.status = StatusInit
     return tcp
diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl
index 1f7bdfb1a20f5..1dcff921faca7 100644
--- a/test/distributed_exec.jl
+++ b/test/distributed_exec.jl
@@ -12,6 +12,44 @@ include("testenv.jl")
 
 addprocs_with_testenv(4)
 
+# Test that the client port is reused. SO_REUSEPORT may not be supported on
+# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
+if is_unix()
+    # Run the test on all workers.
+    results = asyncmap(procs()) do p
+        remotecall_fetch(p) do
+            client_ports = UInt16[]
+            for w in Base.Distributed.PGRP.workers
+                if isa(w, Base.Distributed.Worker)
+                    s = w.r_stream
+                    rport = Ref{Cushort}(0)
+                    raddress = zeros(UInt8, 16)
+                    rfamily = Ref{Cuint}(0)
+
+                    r = ccall(:jl_tcp_getsockname, Int32,
+                                (Ptr{Void}, Ref{Cushort}, Ptr{Void}, Ref{Cuint}),
+                                s.handle, rport, raddress, rfamily)
+                    Base.uv_error("cannot obtain socket name", r)
+                    if r == 0
+                        push!(client_ports, ntoh(rport[]))
+                    else
+                        error("cannot obtain socket name")
+                    end
+                end
+            end
+            @assert length(client_ports) == nworkers()
+            if !all(i -> i == client_ports[1], client_ports[2:end])
+                warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
+                return 0
+            end
+            return myid()
+        end
+    end
+
+    # Ensure that the code has indeed been executed on all processes
+    @test all(p -> p in results, procs())
+end
+
 id_me = myid()
 id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]
 