Skip to content

Commit

Permalink
Introduced ClusterManager. Externalized support for new cluster types.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 9, 2013
1 parent 413d257 commit 17b3986
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 154 deletions.
3 changes: 1 addition & 2 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1092,8 +1092,6 @@ export

# multiprocessing
addprocs,
addprocs_scyld,
addprocs_sge,
fetch,
isready,
yield,
Expand All @@ -1110,6 +1108,7 @@ export
remotecall_wait,
take,
wait,
ClusterManager,

# distributed arrays
distribute,
Expand Down
281 changes: 129 additions & 152 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
## julia starts with one process, and processors can be added using:
## addprocs(n) using exec
## addprocs({"host1","host2",...}) using remote execution
## addprocs_scyld(n) using Scyld ClusterWare
## addprocs_sge(n) using Sun Grid Engine batch queue
##
## remotecall(w, func, args...) -
## tell a worker to call a function on the given arguments.
Expand Down Expand Up @@ -210,7 +208,7 @@ function add_workers(pg::ProcessGroup, w::Array{Any,1})
for i=1:length(w)
send_msg_now(w[i], :join_pgrp, w[i].id, all_locs)
end
:ok
[w[i].id for i in 1:length(w)]
end

myid() = LPROC.id
Expand Down Expand Up @@ -860,7 +858,7 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
end)
end

function disable_parallel_libs()
# Limit BLAS to a single thread by calling blas_set_num_threads(1).
# Called from start_worker and from addprocs.
disable_threaded_libs() = blas_set_num_threads(1)

Expand All @@ -879,7 +877,7 @@ function start_worker(out::IO)
# close STDIN; workers will not use it
#close(STDIN)

disable_parallel_libs()
disable_threaded_libs()

ccall(:jl_install_sigint_handler, Void, ())

Expand All @@ -896,42 +894,86 @@ function start_worker(out::IO)
exit(0)
end


function start_remote_workers(machines, cmds, tunnel=false, sshflags=``)
n = length(cmds)
outs = cell(n)
# Launch `n` workers through the cluster manager stored in `config[:cman]`
# and create a worker connection for each of them.
#
# The manager's `launch_cb(n, config)` must return a tuple
# `(insttype, instances)`; `insttype` tells how each element of `instances`
# is to be interpreted:
#   :io_only      - an IO stream; host and port are read from the stream
#   :io_host      - (io, host); only the port is read from the stream
#   :io_host_port - (io, host, port); nothing is read
#   :host_port    - (host, port); no stream accompanies the worker
#   :cmd          - a command; it is started detached and host and port are
#                   read from its output
# Any other `insttype` raises an error before any worker is created.
#
# Returns a cell array holding the result of `create_worker` for each of the
# first `n` instances.
function start_cluster_workers(n, config)
    w = cell(n)
    cman = config[:cman]

    # Get the cluster manager to launch the instances
    (insttype, instances) = cman.launch_cb(n, config)

    # Pick the function that turns one instance into an (io, host, port)
    # triple. It is bound to a local variable as an anonymous function instead
    # of being written as several same-named inner method definitions in
    # different branches, so only the definition of the branch actually taken
    # can ever be the one that is called.
    if insttype == :io_only
        read_cb_response = function(inst)
            (host, port) = read_worker_host_port(inst)
            (inst, host, port)
        end
    elseif insttype == :io_host
        read_cb_response = function(inst)
            io = inst[1]
            # the hostname printed by the worker is ignored; the one supplied
            # by the cluster manager is used
            (_, port) = read_worker_host_port(io)
            (io, inst[2], port)
        end
    elseif insttype == :io_host_port
        read_cb_response = inst -> (inst[1], inst[2], inst[3])
    elseif insttype == :host_port
        read_cb_response = inst -> (nothing, inst[1], inst[2])
    elseif insttype == :cmd
        read_cb_response = function(inst)
            io,_ = readsfrom(detach(inst))
            io.line_buffered = true
            (host, port) = read_worker_host_port(io)
            (io, host, port)
        end
    else
        error("Unsupported format from Cluster Manager callback")
    end

    for i=1:n
        (io, host, port) = read_cb_response(instances[i])
        w[i] = create_worker(host, port, io, config)
    end
    w
end

# Read lines from `io` until one of them parses as worker connection
# information, and return `(hostname, port)` as produced by
# `parse_connection_info`. Lines for which `parse_connection_info` yields an
# empty hostname are skipped.
#
# Sets `io.line_buffered = true` before reading.
# Raises an error if the stream ends before connection information is seen;
# without this check the loop would keep reading empty strings forever.
function read_worker_host_port(io::IO)
    io.line_buffered = true
    while true
        conninfo = readline(io)
        # readline keeps the trailing newline, so an empty string is taken to
        # mean end of stream rather than a blank line.
        if conninfo == ""
            error("stream closed before the worker reported its connection information")
        end
        private_hostname, port = parse_connection_info(conninfo)
        if private_hostname != ""
            return private_hostname, port
        end
    end
end

function create_worker(hostname, port, stream, config)
tunnel = config[:tunnel]

s = split(hostname,'@')
if length(s) > 1
user = s[1]
hostname = s[2]
else
if haskey(ENV, "USER")
user = ENV["USER"]
hostname = s[1]
end

if tunnel
w[i] = Worker(hostname, port, user, sshflags)
else
w[i] = Worker(hostname, port)
elseif tunnel
error("USER must be specified either in the environment or as part of the hostname when tunnel option is used.")
end
let wrker = w[i]
hostname = s[1]
end

if tunnel
sshflags = config[:sshflags]
w = Worker(hostname, port, user, sshflags)
else
w = Worker(hostname, port)
end

if isa(stream, AsyncStream)
let wrker = w
# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if nread>0
Expand All @@ -950,6 +992,7 @@ function start_remote_workers(machines, cmds, tunnel=false, sshflags=``)
w
end


function parse_connection_info(str)
m = match(r"^julia_worker:(\d+)#(.*)", str)
if m != nothing
Expand Down Expand Up @@ -977,135 +1020,69 @@ function ssh_tunnel(user, host, port, sshflags)
localp
end

#function worker_ssh_cmd(host, key)
# `ssh -i $key -n $host "sh -l -c \"cd $JULIA_HOME && ./julia-release-basic --worker\""`
#end

# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Start one worker per entry of `machines` over ssh and add them to PGRP.
#
# Keywords:
#   tunnel   - forwarded to start_remote_workers
#   dir      - directory the remote shell changes into before launching
#   exename  - executable started with `--worker` on the remote side
#   sshflags - extra flags spliced into the ssh command line; also forwarded
#              to start_remote_workers
function addprocs(machines::AbstractVector;
                  tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``)
    # detached ssh launch command for a single machine spec
    launch_cmd = m->detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename --worker\""`)
    cmds = map(launch_cmd, machines)
    started = start_remote_workers(machines, cmds, tunnel, sshflags)
    add_workers(PGRP, started)
end

#function addprocs_ssh(machines, keys)
# if !(isa(keys, Array)) && isa(machines,Array)
# key = keys
# keys = [ key for x = 1:length(machines)]
# cmdargs = { {machines[x],keys[x]} for x = 1:length(machines)}
# else
# cmdargs = {{machines,keys}}
# end #if/else
# add_workers(PGRP, start_remote_workers(machines, map(x->worker_ssh_cmd(x[1],x[2]), cmdargs)))
#end
# Abstract supertype for cluster managers.
# addprocs uses the value of its `cman` keyword when it is a ClusterManager,
# and start_cluster_workers calls the manager's `launch_cb(n, config)` field
# to launch the worker instances. RegularCluster is a subtype defined in this file.
abstract ClusterManager

# Build the command that starts a worker from JULIA_HOME, passing the
# global `bind_addr` via --bind-to.
function worker_local_cmd()
    `$JULIA_HOME/julia-release-basic --bind-to $bind_addr --worker`
end

# Start `np` workers with worker_local_cmd(), each labelled "localhost",
# and add them to PGRP.
function addprocs(np::Integer)
    disable_parallel_libs()
    hosts = { "localhost" for i=1:np }
    cmds = { worker_local_cmd() for i=1:np }
    add_workers(PGRP, start_remote_workers(hosts, cmds))
end

function start_scyld_workers(np::Integer)
home = JULIA_HOME
beomap_cmd = `beomap --no-local --np $np`
out,beomap_proc = readsfrom(beomap_cmd)
wait(beomap_proc)
if !success(beomap_proc)
error("node availability inaccessible (could not run beomap)")
end
nodes = split(chomp(readline(out)),':')
outs = cell(np)
for (i,node) in enumerate(nodes)
cmd = detach(`bpsh $node sh -l -c "cd $home && ./julia-release-basic --worker"`)
outs[i],_ = readsfrom(cmd)
outs[i].line_buffered = true
end
workers = cell(np)
for (i,stream) in enumerate(outs)
local hostname::String, port::Int16
stream.line_buffered = true
while true
conninfo = readline(stream)
hostname, port = parse_connection_info(conninfo)
if hostname != ""
break
end
end
workers[i] = Worker(hostname, port)
let worker = workers[i]
# redirect console output from workers to the client's stdout:
start_reading(stream,function(stream::AsyncStream,nread::Int)
if(nread>0)
try
line = readbytes(stream.buffer, nread)
print("\tFrom worker $(worker.id):\t",line)
catch err
println(STDERR,"\tError parsing reply from worker $(worker.id):\t",err)
return false
end
end
true
end)
# Launch callback used by RegularCluster.
#
# Reads :dir, :exename, :exeflags and :cman from `config` (and :sshflags when
# the manager's `ssh` field is true) and returns `(insttype, instances)`:
#
#   ssh == true  - starts one detached ssh process for each of the first `n`
#                  entries of `cman.machines`, waits for each worker to print
#                  its connection information, and returns
#                  (:io_host_port, outs) where every element of `outs` is
#                  (io, machine, port).
#   ssh == false - launches nothing itself; returns (:cmd, cmds) with `n`
#                  copies of the local worker command.
#
# Raises an error if ssh launch is requested with fewer than `n` machines.
function launch_procs(n::Integer, config::Dict)
    dir = config[:dir]
    exename = config[:exename]
    exeflags = config[:exeflags]

    cman = config[:cman]
    if cman.ssh
        # Check up front, so that no ssh process is spawned when the request
        # cannot be satisfied.
        if length(cman.machines) < n
            error("launch_procs: $n workers requested but only $(length(cman.machines)) machines specified")
        end

        sshflags = config[:sshflags]
        outs=cell(n)
        for i in 1:n
            m = cman.machines[i]
            cmd = detach(`ssh -n $sshflags $m "sh -l -c \"cd $dir && $exename $exeflags\""`)
            io,_ = readsfrom(cmd)
            io.line_buffered = true

            # The port is passed on exactly as parsed, as in the other launch
            # paths; it is not narrowed to Int16, which cannot represent port
            # numbers above 32767.
            (_, port) = read_worker_host_port(io)

            # We ignore the hostname printed by the worker, since the worker may be behind a NATed interface,
            # we just use the hostname specified by the caller as part of the machine name
            outs[i] = (io, m, port)
        end
        return (:io_host_port, outs)

    else
        worker_local_cmd = `$(dir)/$(exename) --bind-to $bind_addr $exeflags`
        return (:cmd, {worker_local_cmd for i in 1:n})
    end
end

function addprocs_scyld(np::Integer)
disable_parallel_libs()
add_workers(PGRP, start_scyld_workers(np))
# Cluster manager used by addprocs when no other manager is supplied.
# The constructor takes keywords only and always installs launch_procs as
# the launch callback.
immutable RegularCluster <: ClusterManager
    launch_cb::Function     # launch callback; set to launch_procs by the constructor
    ssh::Bool               # launch_procs starts workers over ssh when true
    machines                # indexed by launch_procs when ssh is true

    function RegularCluster(; ssh=false, machines=[])
        new(launch_procs, ssh, machines)
    end
end

function start_sge_workers(n)
home = JULIA_HOME
sgedir = joinpath(pwd(),"SGE")
run(`mkdir -p $sgedir`)
qsub_cmd = `echo $home/julia-release-basic --worker` |> `qsub -N JULIA -terse -cwd -j y -o $sgedir -t 1:$n`
out,qsub_proc = readsfrom(qsub_cmd)
if !success(qsub_proc)
error("batch queue not available (could not run qsub)")
end
id = chomp(split(readline(out),'.')[1])
println("job id is $id")
print("waiting for job to start");
workers = cell(n)
for i=1:n
# wait for each output stream file to get created
fname = "$sgedir/JULIA.o$(id).$(i)"
local fl, hostname, port
fexists = false
sleep(0.5)
while !fexists
try
fl = open(fname)
try
conninfo = readline(fl)
hostname, port = parse_connection_info(conninfo)
finally
close(fl)
end
fexists = (hostname != "")
catch
print(".");
sleep(0.5)
end
# start and connect to processes via SSH.
# optionally through an SSH tunnel.
# the tunnel is only used from the head (process 1); the nodes are assumed
# to be mutually reachable without a tunnel, as is often the case in a cluster.
# Add worker processes and return the result of add_workers.
#
# `instances` is either a vector of machine specifications or a worker count.
#   - vector and no `cman`: a RegularCluster with ssh=true is created for the
#     machines and one worker is started per entry.
#   - otherwise: `cman` is used if it is a ClusterManager, else a default
#     RegularCluster(); `instances` is passed on as the worker count.
#
# Keywords `tunnel`, `dir`, `exename` and `sshflags` are stored in the config
# dictionary handed to start_cluster_workers, together with
# :exeflags => ` --worker ` and the selected manager under :cman.
#
# Calls disable_threaded_libs() before launching any worker.
function addprocs(instances::Union(AbstractVector, Integer);
                  tunnel=false, dir=JULIA_HOME, exename="./julia-release-basic", sshflags::Cmd=``, cman=nothing)

    config={:dir=>dir, :exename=>exename, :exeflags=>` --worker `, :tunnel=>tunnel, :sshflags=>sshflags}
    disable_threaded_libs()

    if isa(instances, AbstractVector) && (cman == nothing)
        config[:cman] = RegularCluster(ssh=true, machines=instances)
        n = length(instances)
    else
        config[:cman] = isa(cman, ClusterManager) ? cman : RegularCluster()
        # NOTE(review): when `instances` is a vector and `cman` is supplied,
        # the vector itself is passed on as the worker count, exactly as
        # before - confirm what is intended for that combination.
        n = instances
    end
    return add_workers(PGRP, start_cluster_workers(n, config))
end

# Start `n` workers with start_sge_workers and add them to PGRP.
function addprocs_sge(n)
    started = start_sge_workers(n)
    add_workers(PGRP, started)
end

## higher-level functions: spawn, pmap, pfor, etc. ##

Expand Down

0 comments on commit 17b3986

Please sign in to comment.