Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submit commands on another host via ssh #204

Merged
merged 17 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/ood_core/job/adapters/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,15 @@ class Helper
def self.bin_path(cmd, bin_default, bin_overrides)
  name = cmd.to_s
  # An explicit override for this command wins; otherwise the command name
  # is joined onto the default binary directory.
  bin_overrides.fetch(name) do
    Pathname.new(bin_default.to_s).join(name).to_s
  end
end

# Wraps a command so that it is executed on another host via ssh.
#
# NOTE(review): the result is ONE String holding the whole ssh invocation.
# The adapters' `call` methods pass it to Open3.capture3 together with
# separate arguments; confirm that form really launches ssh instead of
# looking for an executable with this literal name. `cmd` is also embedded
# between double quotes without any escaping.
#
# @param cmd [String] the desired command to execute on another host
# @param submit_host [#to_s, nil] where to submit the command; nil or an
#   empty string means the command is not wrapped
# @return [String] cmd unchanged if submit_host is blank, otherwise cmd
#   wrapped in an ssh invocation targeting submit_host
def self.ssh_wrap(cmd, submit_host)
  # Coerce first so a nil submit_host is treated like "" instead of raising
  # NoMethodError on nil.empty?
  host = submit_host.to_s
  return cmd if host.empty?

  "ssh -t -o BatchMode=yes -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no #{host} \"#{cmd}\""
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/ood_core/job/adapters/lsf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Factory
# @option config [#to_s] :serverdir ('') Path to lsf client etc dir
# @option config [#to_s] :cluster ('') name of cluster, if in multi-cluster mode
# @option config [#to_h] :bin_overrides ({}) Optional overrides to LSF client executables
# @option config [#to_s] :submit_host ('') Host to submit commands to
def self.build_lsf(config)
batch = Adapters::Lsf::Batch.new(config.to_h.symbolize_keys)
Adapters::Lsf.new(batch: batch)
Expand Down
7 changes: 4 additions & 3 deletions lib/ood_core/job/adapters/lsf/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
#
# @api private
class OodCore::Job::Adapters::Lsf::Batch
attr_reader :bindir, :libdir, :envdir, :serverdir, :cluster, :bin_overrides
attr_reader :bindir, :libdir, :envdir, :serverdir, :cluster, :bin_overrides, :submit_host

# The root exception class that all LSF-specific exceptions inherit
# from
class Error < StandardError; end

# @param bindir [#to_s] path to LSF installation binaries
def initialize(bindir: "", envdir: "", libdir: "", serverdir: "", cluster: "", bin_overrides: {}, **_)
def initialize(bindir: "", envdir: "", libdir: "", serverdir: "", cluster: "", bin_overrides: {}, submit_host: "", **_)
  # Directory-style settings are stringified and kept as Pathname objects.
  @bindir, @envdir, @libdir, @serverdir =
    [bindir, envdir, libdir, serverdir].map { |dir| Pathname.new(dir.to_s) }

  # Plain settings: names are coerced with #to_s, overrides are stored as given.
  @cluster       = cluster.to_s
  @submit_host   = submit_host.to_s
  @bin_overrides = bin_overrides
end

def default_env
Expand Down Expand Up @@ -141,6 +141,7 @@ def cluster_args
# Call a forked Lsf command for a given cluster
def call(cmd, *args, env: {}, stdin: "")
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bindir, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.ssh_wrap(cmd, submit_host)
matthu017 marked this conversation as resolved.
Show resolved Hide resolved
args = cluster_args + args
env = default_env.merge(env.to_h)
o, e, s = Open3.capture3(env, cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s)
Expand Down
15 changes: 13 additions & 2 deletions lib/ood_core/job/adapters/pbspro.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ class Factory
# Build the PBS Pro adapter from a configuration
# @param config [#to_h] the configuration for job adapter
# @option config [Object] :host (nil) The batch server host
# @option config [Object] :submit_host ('') The login node where the job is submitted
# @option config [Object] :exec (nil) Path to PBS Pro executables
# @option config [Object] :qstat_factor (nil) Deciding factor on how to
# call qstat for a user
# @option config [#to_h] :bin_overrides ({}) Optional overrides to PBS Pro client executables
def self.build_pbspro(config)
# compact drops nil-valued entries, so the fetch defaults below also apply
# to options that were present but set to nil
c = config.to_h.compact.symbolize_keys
host = c.fetch(:host, nil)
# An empty submit_host is the "no ssh" value (the ssh helper returns the
# command unchanged for it)
submit_host = c.fetch(:submit_host, "")
ericfranz marked this conversation as resolved.
Show resolved Hide resolved
pbs_exec = c.fetch(:exec, nil)
qstat_factor = c.fetch(:qstat_factor, nil)
bin_overrides = c.fetch(:bin_overrides, {})
pbspro = Adapters::PBSPro::Batch.new(host: host, pbs_exec: pbs_exec, bin_overrides: bin_overrides)
pbspro = Adapters::PBSPro::Batch.new(host: host, submit_host: submit_host, pbs_exec: pbs_exec, bin_overrides: bin_overrides)
Adapters::PBSPro.new(pbspro: pbspro, qstat_factor: qstat_factor)
end
end
Expand All @@ -41,6 +43,12 @@ class Batch
# @return [String, nil] the batch server host
attr_reader :host

# The login node to submit the job via ssh
# @example
# my_batch.submit_host #=> "my_batch.server.edu"
# @return [String, nil] the login node
attr_reader :submit_host

# The path containing the PBS executables
# @example
# my_batch.pbs_exec.to_s #=> "/usr/local/pbspro/10.0.0
Expand All @@ -58,9 +66,11 @@ class Batch
class Error < StandardError; end

# @param host [#to_s, nil] the batch server host
# @param submit_host [#to_s, nil] the login node to ssh to
# @param pbs_exec [#to_s, nil] path to pbs executables
def initialize(host: nil, pbs_exec: nil, bin_overrides: {})
def initialize(host: nil, submit_host: "", pbs_exec: nil, bin_overrides: {})
  # Falsy values (nil/false) are stored untouched; anything else is coerced.
  @host          = host ? host.to_s : host
  @submit_host   = submit_host ? submit_host.to_s : submit_host
  @pbs_exec      = pbs_exec ? Pathname.new(pbs_exec.to_s) : pbs_exec
  @bin_overrides = bin_overrides
end
Expand Down Expand Up @@ -159,6 +169,7 @@ def call(cmd, *args, env: {}, stdin: "", chdir: nil)
cmd = cmd.to_s
bindir = (!!pbs_exec) ? pbs_exec.join("bin").to_s : ''
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bindir, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.ssh_wrap(cmd, submit_host)
args = args.map(&:to_s)
env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
env["PBS_DEFAULT"] = host.to_s if host
Expand Down
4 changes: 3 additions & 1 deletion lib/ood_core/job/adapters/sge/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def self.libdrmaa_path=(path)
class OodCore::Job::Adapters::Sge::Batch
using OodCore::Refinements::HashExtensions

attr_reader :bin, :bin_overrides, :conf, :cluster, :helper
attr_reader :bin, :bin_overrides, :conf, :cluster, :helper, :submit_host

require "ood_core/job/adapters/sge/qstat_xml_j_r_listener"
require "ood_core/job/adapters/sge/qstat_xml_r_listener"
Expand All @@ -36,6 +36,7 @@ def initialize(config)
@bin = Pathname.new(config.fetch(:bin, nil).to_s)
@sge_root = Pathname.new(config[:sge_root] || ENV['SGE_ROOT'] || "/var/lib/gridengine")
@bin_overrides = config.fetch(:bin_overrides, {})
@submit_host = config.fetch(:submit_host, "")

# FIXME: hack as this affects env of the process!
ENV['SGE_ROOT'] = @sge_root.to_s
Expand Down Expand Up @@ -166,6 +167,7 @@ def submit(content, args)
# Call a forked SGE command for a given batch server
def call(cmd, *args, env: {}, stdin: "", chdir: nil)
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.ssh_wrap(cmd, submit_host)
args = args.map(&:to_s)

env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }
Expand Down
12 changes: 10 additions & 2 deletions lib/ood_core/job/adapters/slurm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def self.build_slurm(config)
conf = c.fetch(:conf, nil)
bin = c.fetch(:bin, nil)
bin_overrides = c.fetch(:bin_overrides, {})
slurm = Adapters::Slurm::Batch.new(cluster: cluster, conf: conf, bin: bin, bin_overrides: bin_overrides)
submit_host = c.fetch(:submit_host, "")
slurm = Adapters::Slurm::Batch.new(cluster: cluster, conf: conf, bin: bin, bin_overrides: bin_overrides, submit_host: submit_host)
Adapters::Slurm.new(slurm: slurm)
end
end
Expand Down Expand Up @@ -62,18 +63,24 @@ class Batch
# @return Hash<String, String>
attr_reader :bin_overrides

# The login node where the job is submitted via ssh
# @example owens.osc.edu
# @return [String] The login node
attr_reader :submit_host

# The root exception class that all Slurm-specific exceptions inherit
# from
class Error < StandardError; end

# @param cluster [#to_s, nil] the cluster name
# @param conf [#to_s, nil] path to the slurm conf
# @param bin [#to_s] path to slurm installation binaries
def initialize(cluster: nil, bin: nil, conf: nil, bin_overrides: {})
def initialize(cluster: nil, bin: nil, conf: nil, bin_overrides: {}, submit_host: "")
  # cluster and conf stay nil when not given; bin always becomes a Pathname
  @cluster = cluster && cluster.to_s
  @conf = conf && Pathname.new(conf.to_s)
  @bin = Pathname.new(bin.to_s)
  @bin_overrides = bin_overrides
  # Always stored as a String (nil becomes ""), matching the documented
  # return type of #submit_host and the other adapters, so the ssh helper
  # can safely call #empty? on it
  @submit_host = submit_host.to_s
end

# Get a list of hashes detailing each of the jobs on the batch server
Expand Down Expand Up @@ -275,6 +282,7 @@ def advance_past_squeue_header!(squeue_output)
# Call a forked Slurm command for a given cluster
def call(cmd, *args, env: {}, stdin: "")
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.ssh_wrap(cmd, submit_host)
args = args.map(&:to_s)
args += ["-M", cluster] if cluster
env = env.to_h
Expand Down
4 changes: 3 additions & 1 deletion lib/ood_core/job/adapters/torque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ class Factory
# Build the Torque adapter from a configuration
# @param config [#to_h] the configuration for job adapter
# @option config [#to_s] :host The batch server host
# @option config [#to_s] :submit_host ('') The login node to submit the job via ssh
# @option config [#to_s] :lib ('') Path to torque client libraries
# @option config [#to_s] :bin ('') Path to torque client binaries
# @option config [#to_h] :custom_bin ({}) Optional overrides to Torque client executables
# @raise [ArgumentError] if no :host is configured
def self.build_torque(config)
  c = config.to_h.symbolize_keys
  host = c.fetch(:host) { raise ArgumentError, "No host specified. Missing argument: host" }.to_s
  submit_host = c.fetch(:submit_host, "").to_s
  lib = c.fetch(:lib, "").to_s
  bin = c.fetch(:bin, "").to_s
  custom_bin = c.fetch(:custom_bin, {})
  # Batch#initialize receives the overrides as `bin_overrides:`; a
  # `custom_bin:` keyword would be swallowed by its `**_` catch-all and the
  # configured overrides silently ignored.
  pbs = Adapters::Torque::Batch.new(host: host, submit_host: submit_host, lib: lib, bin: bin, bin_overrides: custom_bin)
  Adapters::Torque.new(pbs: pbs)
end
end
Expand Down
16 changes: 13 additions & 3 deletions lib/ood_core/job/adapters/torque/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ class Batch
# @return [String] the batch server host
attr_reader :host

# The login node where job is submitted via ssh
# @example OSC's owens login node
# my_conn.submit_host #=> "owens.osc.edu"
# @return [String] the login node
attr_reader :submit_host

# The path to the Torque client installation libraries
# @example For Torque 5.0.0
# my_conn.lib.to_s #=> "/usr/local/Torque/5.0.0/lib"
Expand All @@ -32,10 +38,12 @@ class Batch
class Error < StandardError; end

# @param host [#to_s] the batch server host
# @param submit_host [#to_s] the login node
# @param lib [#to_s] path to FFI installation libraries
# @param bin [#to_s] path to FFI installation binaries
def initialize(host:, lib: "", bin: "", bin_overrides: {}, **_)
@host = host.to_s
def initialize(host:, submit_host: "", lib: "", bin: "", bin_overrides: {}, **_)
@host = host.to_s
@submit_host = submit_host.to_s
@lib = Pathname.new(lib.to_s)
@bin = Pathname.new(bin.to_s)
@bin_overrides = bin_overrides
Expand All @@ -44,7 +52,7 @@ def initialize(host:, lib: "", bin: "", bin_overrides: {}, **_)
# Hash representation of this object
# @return [Hash] the host, submit_host, lib and bin values keyed by symbol
def to_h
  {
    host: host,
    submit_host: submit_host,
    lib: lib,
    bin: bin
  }
end

# The comparison operator
Expand Down Expand Up @@ -448,6 +456,7 @@ def qsub_submit(script, queue, headers, resources, envvars)
"LD_LIBRARY_PATH" => "#{lib}:#{ENV['LD_LIBRARY_PATH']}"
}
cmd = OodCore::Job::Adapters::Helper.bin_path('qsub', bin, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.ssh_wrap(cmd, submit_host)
o, e, s = Open3.capture3(env, cmd, *params)
raise Error, e unless s.success?
o.chomp
Expand All @@ -456,6 +465,7 @@ def qsub_submit(script, queue, headers, resources, envvars)
# Call a forked PBS command for a given host
def call(cmd, *args, env: {}, stdin: "", chdir: nil)
cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
cmd = OodCore::Job::Adapters::Helper.ssh_wrap(cmd, submit_host)
args = args.map(&:to_s)
env = env.to_h.each_with_object({}) {|(k,v), h| h[k.to_s] = v.to_s}.merge({
"PBS_DEFAULT" => host,
Expand Down
20 changes: 20 additions & 0 deletions spec/job/adapters/helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,24 @@
end
end
end

describe "#ssh_wrap" do
  let(:cmd) { "sbatch" }
  # Evaluated lazily, so each context only has to supply its own submit_host.
  let(:wrapped) { helper.ssh_wrap(cmd, submit_host) }

  context "submit_host: empty" do
    let(:submit_host) { "" }

    it "returns the command" do
      expect(wrapped).to eq("sbatch")
    end
  end

  context "submit_host: owens.osc.edu" do
    let(:submit_host) { "owens.osc.edu" }

    it "returns the ssh wrapped command" do
      expect(wrapped).to eq("ssh -t -o BatchMode=yes -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no owens.osc.edu \"sbatch\"")
    end
  end
end
end
26 changes: 26 additions & 0 deletions spec/job/adapters/lsf/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,30 @@
end
end
end


describe "setting submit_host" do
  let(:script) { OodCore::Job::Script.new(content: "echo 'hi'") }

  # Stub process execution so the examples only inspect the command that
  # would have been run.
  before do
    allow(Open3).to receive(:capture3).and_return(["job.123", "", double("success?" => true)])
  end

  context "when calling without submit_host" do
    let(:batch) { OodCore::Job::Adapters::Lsf::Batch.new }

    it "uses the correct command" do
      batch.submit_string(str: script.content)

      expect(Open3).to have_received(:capture3).with(anything, "bsub", any_args)
    end
  end

  context "when calling with submit_host" do
    let(:batch) { OodCore::Job::Adapters::Lsf::Batch.new(submit_host: 'owens.osc.edu') }

    it "uses ssh wrapper" do
      batch.submit_string(str: script.content)

      expect(Open3).to have_received(:capture3).with(anything, "ssh -t -o BatchMode=yes -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no owens.osc.edu \"bsub\"", any_args)
    end
  end
end
end
24 changes: 24 additions & 0 deletions spec/job/adapters/pbspro_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,30 @@ def build_script(opts = {})
end
end

# Verifies which executable string the PBS Pro adapter hands to
# Open3.capture3 depending on whether a submit_host is configured.
describe "setting submit_host" do
  let(:script) { OodCore::Job::Script.new(content: "echo 'hi'") }

  context "when calling without submit_host" do
    it "uses the correct command" do
      batch = OodCore::Job::Adapters::PBSPro::Batch.new(host: "owens.osc.edu", pbs_exec: nil, bin_overrides: {}, submit_host: "")
      # Stub process execution; only the command passed in is inspected
      allow(Open3).to receive(:capture3).and_return(["job.123", "", double("success?" => true)])

      OodCore::Job::Adapters::PBSPro.new(pbspro: batch, qstat_factor: nil).submit script
      expect(Open3).to have_received(:capture3).with(anything, "qsub", any_args)
    end
  end

  context "when calling with submit_host" do
    it "uses ssh wrapper" do
      batch = OodCore::Job::Adapters::PBSPro::Batch.new(host: "owens.osc.edu", pbs_exec: nil, bin_overrides: {}, submit_host: "owens.osc.edu")
      # Stub process execution; only the command passed in is inspected
      allow(Open3).to receive(:capture3).and_return(["job.123", "", double("success?" => true)])

      OodCore::Job::Adapters::PBSPro.new(pbspro: batch, qstat_factor: nil).submit script
      expect(Open3).to have_received(:capture3).with(anything, "ssh -t -o BatchMode=yes -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no owens.osc.edu \"qsub\"", any_args)
    end
  end
end

describe "#directive_prefix" do
context "when called" do
it "does not raise an error" do
Expand Down
24 changes: 24 additions & 0 deletions spec/job/adapters/sge/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,28 @@ def load_resource_file(file_name)
end
end
end

describe "setting submit_host" do
  let(:script) { OodCore::Job::Script.new(content: "echo 'hi'") }
  # Canned [stdout, stderr, status] triple returned by the stubbed Open3.capture3.
  let(:qsub_result) { ["Your job 123", "", double("success?" => true)] }

  context "when calling without submit_host" do
    it "uses the correct command" do
      # The batch is built before Open3 is stubbed.
      local_batch = described_class.new(submit_host: "")
      allow(Open3).to receive(:capture3).and_return(qsub_result)

      OodCore::Job::Adapters::Sge.new(batch: local_batch).submit(script)

      expect(Open3).to have_received(:capture3).with(anything, "qsub", any_args)
    end
  end

  context "when calling with submit_host" do
    it "uses ssh wrapper" do
      # The batch is built before Open3 is stubbed.
      remote_batch = described_class.new(submit_host: "owens.osc.edu")
      allow(Open3).to receive(:capture3).and_return(qsub_result)

      OodCore::Job::Adapters::Sge.new(batch: remote_batch).submit(script)

      expect(Open3).to have_received(:capture3).with(anything, "ssh -t -o BatchMode=yes -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no owens.osc.edu \"qsub\"", any_args)
    end
  end
end
end
Loading