diff --git a/base/distributed/process_messages.jl b/base/distributed/process_messages.jl index af04aa47a966a..f1d28ac52ac70 100644 --- a/base/distributed/process_messages.jl +++ b/base/distributed/process_messages.jl @@ -317,7 +317,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy - # The constructor register the object with a global registery. + # The constructor registers the object with a global registery. Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig))) else t = @async connect_to_peer(cluster_manager, rpid, wconfig) diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index 938d7f238e333..d5c78a0a307df 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -10,7 +10,7 @@ include("testenv.jl") 1 end -addprocs_with_testenv(4) +addprocs_with_testenv(4; lazy=false) @test nprocs() == 5 function reuseport_tests() @@ -49,15 +49,6 @@ end # Test that the client port is reused. SO_REUSEPORT may not be supported on # all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX if ccall(:jl_has_so_reuseport, Int32, ()) == 1 - # Force all worker-worker connections to be setup. - wlist = reverse(workers()) - @sync for (i,p) in enumerate(wlist) - i==length(wlist) && continue - @async remotecall_fetch(wl -> asyncmap(q->remotecall_fetch(myid, q), wl), - p, wlist[i+1:end]) - - end - reuseport_tests() else info("SO_REUSEPORT is unsupported, skipping reuseport tests.") diff --git a/test/topology.jl b/test/topology.jl index 572c9f210f6dc..b6d223b3f7fe8 100644 --- a/test/topology.jl +++ b/test/topology.jl @@ -94,12 +94,12 @@ remove_workers_and_test() # test `lazy` connection setup function def_count_conn() @everywhere function count_connected_workers() - count(x -> isa(x, Base.Distributed.Worker) && x.state == Base.Distributed.W_CONNECTED, + count(x -> isa(x, Base.Distributed.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream), Base.Distributed.PGRP.workers) end end -addprocs(8) +addprocs_with_testenv(8) def_count_conn() # Test for 10 random combinations @@ -120,8 +120,7 @@ expected_num_conns = 8 num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) @test num_conns == expected_num_conns -for i, comb in enumerate(combinations) - from,to = comb +for (i, (from,to)) in enumerate(combinations) remotecall_wait(topid->remotecall_fetch(myid, topid), from, to) expected_num_conns += 2 # one connection endpoint on both from and to num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) @@ -130,6 +129,6 @@ end # With lazy=false, all connections ought to be setup initially itself rmprocs(workers()) -addprocs(8; lazy=false) +addprocs_with_testenv(8; lazy=false) def_count_conn() @test sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) == 64