Skip to content

Commit

Permalink
do not lock up addprocs on worker setup timeout
Browse files Browse the repository at this point in the history
This is a corollary to the previous commit in JuliaLang#32290, and implements
the suggestions made there.

It limits the time the master waits for a worker to respond to
`Distributed.worker_timeout()` seconds. Beyond that, it releases the lock
on `rr_ntfy_join` with a special flag `:TIMEDOUT`. The flag is set to
`:ERROR` if any error occurs during worker setup, and to `:OK` when
the master receives a `JoinCompleteMsg` from the worker indicating that
setup has completed.

`addprocs` includes a worker's id in the list of workers it added only if
it has received a `JoinCompleteMsg` from that worker, that is, only when
`rr_ntfy_join` contains `:OK`. Note that a worker that failed to join may
not be dead yet, and it may still be listed in `workers()` until it
actually goes down.
  • Loading branch information
s2maki committed Jun 15, 2019
1 parent 0c9f1f1 commit e908be2
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
10 changes: 8 additions & 2 deletions base/distributed/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,14 @@ function create_worker(manager, wconfig)
# Start a new task to handle inbound messages from connected worker in master.
# Also calls `wait_connected` on TCP streams.
procmsg_task = process_messages(w.r_stream, w.w_stream, false)
timeout = worker_timeout()
Timer(1, 1) do timer
istaskstarted(procmsg_task) && istaskdone(procmsg_task) && put!(rr_ntfy_join, nothing)
timeout -= 1
if timeout <= 0.0
put!(rr_ntfy_join, :TIMEDOUT)
elseif istaskdone(procmsg_task)
put!(rr_ntfy_join, :ERROR)
end
isready(rr_ntfy_join) && close(timer)
end

Expand Down Expand Up @@ -519,7 +525,7 @@ function create_worker(manager, wconfig)
delete!(PGRP.refs, ntfy_oid)
end

return istaskdone(procmsg_task) ? 0 : w.id
return (fetch(rr_ntfy_join.c) === :OK) ? w.id : 0
end


Expand Down
2 changes: 1 addition & 1 deletion base/distributed/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ function handle_msg(msg::JoinCompleteMsg, header, r_stream, w_stream, version)
w.version = version

ntfy_channel = lookup_ref(header.notify_oid)
put!(ntfy_channel, w.id)
put!(ntfy_channel, :OK)

push!(default_worker_pool(), w.id)
end

0 comments on commit e908be2

Please sign in to comment.