Skip to content

Commit

Permalink
fixups
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 18, 2017
1 parent 6b6b4b1 commit 17c83ca
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 16 deletions.
2 changes: 1 addition & 1 deletion base/distributed/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)

let rpid=rpid, wconfig=wconfig
if lazy
# The constructor register the object with a global registery.
# The constructor registers the object with a global registery.
Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig)))
else
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
Expand Down
11 changes: 1 addition & 10 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ include("testenv.jl")
1
end

addprocs_with_testenv(4)
addprocs_with_testenv(4; lazy=false)
@test nprocs() == 5

function reuseport_tests()
Expand Down Expand Up @@ -49,15 +49,6 @@ end
# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
# Force all worker-worker connections to be setup.
wlist = reverse(workers())
@sync for (i,p) in enumerate(wlist)
i==length(wlist) && continue
@async remotecall_fetch(wl -> asyncmap(q->remotecall_fetch(myid, q), wl),
p, wlist[i+1:end])

end

reuseport_tests()
else
info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
Expand Down
9 changes: 4 additions & 5 deletions test/topology.jl
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ remove_workers_and_test()
# test `lazy` connection setup
function def_count_conn()
@everywhere function count_connected_workers()
count(x -> isa(x, Base.Distributed.Worker) && x.state == Base.Distributed.W_CONNECTED,
count(x -> isa(x, Base.Distributed.Worker) && isdefined(x, :r_stream) && isopen(x.r_stream),
Base.Distributed.PGRP.workers)
end
end

addprocs(8)
addprocs_with_testenv(8)
def_count_conn()

# Test for 10 random combinations
Expand All @@ -120,8 +120,7 @@ expected_num_conns = 8
num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
@test num_conns == expected_num_conns

for i, comb in enumerate(combinations)
from,to = comb
for (i, (from,to)) in enumerate(combinations)
remotecall_wait(topid->remotecall_fetch(myid, topid), from, to)
expected_num_conns += 2 # one connection endpoint on both from and to
num_conns = sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers()))
Expand All @@ -130,6 +129,6 @@ end

# With lazy=false, all connections ought to be setup initially itself
rmprocs(workers())
addprocs(8; lazy=false)
addprocs_with_testenv(8; lazy=false)
def_count_conn()
@test sum(asyncmap(p->remotecall_fetch(count_connected_workers,p), workers())) == 64

0 comments on commit 17c83ca

Please sign in to comment.