From 121e814bfeec8ac0a45da2c4b5c87189366e1aec Mon Sep 17 00:00:00 2001 From: Ben Arthur Date: Sat, 29 Dec 2018 07:48:42 -0600 Subject: [PATCH] cluster manager fixes (#30172) * kill workers which don't launch properly * don't emit spurious error messages * document how to asynchronously launch workers --- stdlib/Distributed/src/cluster.jl | 51 ++++++++++++++++++--- stdlib/Distributed/test/distributed_exec.jl | 2 +- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 747919e56b07e..9ac30c94f0748 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -241,6 +241,11 @@ function redirect_worker_output(ident, stream) end end +struct LaunchWorkerError <: Exception + msg::String +end + +Base.showerror(io::IO, e::LaunchWorkerError) = print(io, e.msg) # The default TCP transport relies on the worker listening on a free # port available and printing its bind address and port. @@ -272,7 +277,7 @@ function read_worker_host_port(io::IO) conninfo = fetch(readtask) if isempty(conninfo) && !isopen(io) - error("Unable to read host:port string from worker. Launch command exited with error?") + throw(LaunchWorkerError("Unable to read host:port string from worker. Launch command exited with error?")) end ntries -= 1 @@ -286,13 +291,13 @@ function read_worker_host_port(io::IO) end close(io) if ntries > 0 - error("Timed out waiting to read host:port string from worker.") + throw(LaunchWorkerError("Timed out waiting to read host:port string from worker.")) else - error("Unexpected output from worker launch command. Host:port string not found.") + throw(LaunchWorkerError("Unexpected output from worker launch command. Host:port string not found.")) end finally for line in leader - println("\tFrom failed worker startup:\t", line) + println("\tFrom worker startup:\t", line) end end end @@ -354,6 +359,34 @@ the package `ClusterManagers.jl`. The number of seconds a newly launched worker waits for connection establishment from the master can be specified via variable `JULIA_WORKER_TIMEOUT` in the worker process's environment. Relevant only when using TCP/IP as transport. + +To launch workers without blocking the REPL, or the containing function +if launching workers programmatically, execute `addprocs` in its own task. + +# Examples + +``` +# On busy clusters, call `addprocs` asynchronously +t = @async addprocs(...) +``` + +``` +# Utilize workers as and when they come online +if nprocs() > 1 # Ensure at least one new worker is available + .... # perform distributed execution +end +``` + +``` +# Retrieve newly launched worker IDs, or any error messages +if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block + if nworkers() == N + new_pids = fetch(t) + else + fetch(t) + end + end +``` """ function addprocs(manager::ClusterManager; kwargs...) init_multi() @@ -499,9 +532,13 @@ function create_worker(manager, wconfig) local r_s, w_s try (r_s, w_s) = connect(manager, w.id, wconfig) - catch - deregister_worker(w.id) - rethrow() + catch ex + try + deregister_worker(w.id) + kill(manager, w.id, wconfig) + finally + rethrow(ex) + end end w = Worker(w.id, r_s, w_s, manager; config=wconfig) diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index 3b4bd58eb2f1a..4c8c467673360 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -1149,7 +1149,7 @@ for (addp_testf, expected_errstr, env) in testruns old_stdout = stdout stdout_out, stdout_in = redirect_stdout() stdout_txt = @async filter!(readlines(stdout_out)) do s - return !startswith(s, "\tFrom failed worker startup:\t") + return !startswith(s, "\tFrom worker startup:\t") end try withenv(env...) do