Remove support for Slurm heterogeneous jobs.
This change was previously committed in the wrong order and was reverted.

This commit reverts that revert, reinstating the removal.
christopherwharrop-noaa committed Apr 16, 2019
Parent: 3ea7403 · Commit: 6a7e0e5
Showing 1 changed file with 54 additions and 124 deletions.
lib/workflowmgr/slurmbatchsystem.rb: 54 additions, 124 deletions
@@ -37,9 +37,6 @@ def initialize(slurm_root=nil)
# Assume the scheduler is up
@schedup=true

# Set heterogeneous job support to nil (it will be set once in submit)
@heterogeneous_job_support

end


@@ -120,136 +117,81 @@ def status(jobid)
#####################################################
def submit(task)

# Check if heterogeneous jobs are supported
if @heterogeneous_job_support.nil?

# Get version of sbatch being used
version,errors,exit_status=WorkflowMgr.run4("sbatch --version",30)

# Raise SchedulerDown if the command failed
raise WorkflowMgr::SchedulerDown,errors unless exit_status==0

# Get first four digits of version as an integer
@version = version.gsub(/[slurm.\s]/,"")[0..3].to_i

# Check for heterogeneous job support
@heterogeneous_job_support = false
if @version >= 1808
@heterogeneous_job_support = true
end

end

# Initialize the submit command
cmd="sbatch"
input="#! /bin/sh\n"

per_pack_group_input=""
pack_group_nodes=Array.new

# Add Slurm batch system options translated from the generic options specification
task.attributes.each do |option,value|
if value.is_a?(String)
if value.empty?
WorkflowMgr.stderr("WARNING: <#{option}> has empty content and is ignored", 1)
next
end
end
case option
when :account
per_pack_group_input += "#SBATCH --account #{value}\n"
input += "#SBATCH --account #{value}\n"
when :queue
per_pack_group_input += "#SBATCH --qos #{value}\n"
input += "#SBATCH --qos #{value}\n"
when :partition
per_pack_group_input += "#SBATCH --partition #{value.gsub(":",",")}\n"
input += "#SBATCH --partition #{value.gsub(":",",")}\n"
when :cores
# Ignore this attribute if the "nodes" attribute is present
next unless task.attributes[:nodes].nil?
if @heterogeneous_job_support
pack_group_nodes << "#SBATCH --ntasks=#{value}\n"
else
pack_group_nodes = ["#SBATCH --ntasks=#{value}\n"]
end
input += "#SBATCH --ntasks=#{value}\n"
when :nodes
# Make sure exclusive access to nodes is enforced
# per_pack_group_input += "#SBATCH --exclusive\n"

if @heterogeneous_job_support

first_spec = true
nodespecs=value.split("+")
nodespecs.each { |nodespec|
resources=nodespec.split(":")
nnodes=resources.shift.to_i
ppn=0
resources.each { |resource|
case resource
when /ppn=(\d+)/
ppn=$1.to_i
when /tpp=(\d+)/
tpp=$1.to_i
end
}

# Request for this resource
pack_group_nodes << "#SBATCH --ntasks=#{nnodes*ppn} --tasks-per-node=#{ppn}\n"

first_spec = false
# Rocoto does not currently support Slurm heterogeneous jobs. However, it does
# support requests for non-uniform processor geometries. To accomplish this,
# Rocoto will use sbatch to submit a job with the smallest uniform resource
# request that can accommodate the non-uniform request. It is up to the user to
# use the appropriate tools to manipulate the host file and use the appropriate
# MPI launcher command to specify the desired processor layout for the executable
# in the job script.

# Get the total nodes and max ppn requested
maxppn=1
nnodes=0
nodespecs=value.split("+")
nodespecs.each { |nodespec|
resources=nodespec.split(":")
nnodes+=resources.shift.to_i
ppn=0
resources.each { |resource|
case resource
when /ppn=(\d+)/
ppn=$1.to_i
when /tpp=(\d+)/
tpp=$1.to_i
end
}

else

# This version of SLURM (< version 18.08) does not support submission of jobs
# (via sbatch) with non-uniform processor geometries. SLURM refers to these as
# "heterogenous jobs". To work around this, we will use sbatch to submit a job
# with the smallest uniform resource request that can accommodate the
# heterogeneous request. It is up to the user to use the appropriate host file
# manipulation and/or MPI launcher command to specify the desired processor layout
# for the executable in the job script.

# Get the total nodes and max ppn requested
maxppn=1
nnodes=0
nodespecs=value.split("+")
nodespecs.each { |nodespec|
resources=nodespec.split(":")
nnodes+=resources.shift.to_i
ppn=0
resources.each { |resource|
case resource
when /ppn=(\d+)/
ppn=$1.to_i
when /tpp=(\d+)/
tpp=$1.to_i
end
}
maxppn=ppn if ppn > maxppn
}

# Request total number of nodes
node_input = "#SBATCH --nodes=#{nnodes}-#{nnodes}\n"

# Request max tasks per node
node_input += "#SBATCH --tasks-per-node=#{maxppn}\n"

pack_group_nodes = [ node_input ] # ensure only one "pack group"

# Print a warning if multiple nodespecs are specified
if nodespecs.size > 1
WorkflowMgr.stderr("WARNING: SLURM < 18.08 does not support requests for non-uniform task geometries",1)
WorkflowMgr.stderr("WARNING: during batch job submission. You must use the -m option of the srun command",1)
WorkflowMgr.stderr("WARNING: in your script to launch your code with an arbitrary distribution of tasks",1)
WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details",1)
WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}'",1)
WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use",1)
WorkflowMgr.stderr("WARNING: <nodes>#{nnodes}:ppn=#{maxppn}</nodes> in your workflow to eliminate this warning message.",1)
end

maxppn=ppn if ppn > maxppn
}

# Request total number of nodes
input += "#SBATCH --nodes=#{nnodes}-#{nnodes}\n"

# Request max tasks per node
input += "#SBATCH --tasks-per-node=#{maxppn}\n"

# Print a warning if multiple nodespecs are specified
if nodespecs.size > 1
WorkflowMgr.stderr("WARNING: Rocoto does not support Slurm's heterogeneous job feature. However, Rocoto", 1)
WorkflowMgr.stderr("WARNING: does support Slurm requests for non-uniform task geometries. It does this by", 1)
WorkflowMgr.stderr("WARNING: converting non-uniform requests into the smallest uniform task geometry", 1)
WorkflowMgr.stderr("WARNING: request that can accommodate the non-uniform one during batch job submission.", 1)
WorkflowMgr.stderr("WARNING: It is up to the user to use the -m option of the srun command, or other", 1)
WorkflowMgr.stderr("WARNING: appropriate tools, in the job script to launch executables with an arbitrary", 1)
WorkflowMgr.stderr("WARNING: distribution of tasks.", 1)
WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details", 1)
WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}' for this job", 1)
WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use", 1)
WorkflowMgr.stderr("WARNING: <nodes>#{nnodes}:ppn=#{maxppn}</nodes> in your workflow to eliminate this warning message.", 1)
end

when :walltime
# Make sure format is dd-hh:mm:ss if days are included
per_pack_group_input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n"
input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n"
when :memory
m=/^([\.\d]+)([\D]*)$/.match(value)
amount=m[1].to_f
@@ -267,7 +209,7 @@ def submit(task)
amount=(amount / 1024.0 / 1024.0).ceil
end
if amount > 0
per_pack_group_input += "#SBATCH --mem=#{amount}\n"
input += "#SBATCH --mem=#{amount}\n"
end
when :stdout
input += "#SBATCH -o #{value}\n"
@@ -281,18 +223,7 @@ def submit(task)
end

task.each_native do |value|
per_pack_group_input += "#SBATCH #{value}\n"
end

first=true
pack_group_nodes.each do |this_group_nodes|
if first
first=false
else
input += "\n#SBATCH packjob\n\n"
end
input += per_pack_group_input
input += this_group_nodes
input += "#SBATCH #{value}\n"
end

# Add export commands to pass environment vars to the job
@@ -303,7 +234,6 @@ def submit(task)
}
input += varinput
end
input+="set -x\n"

# Add the command to execute
input += task.attributes[:command]
@@ -313,7 +243,7 @@ def submit(task)
tf.write(input)
tf.flush()

WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input {{#{input}}}",4)
WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4)

# Run the submit command
output=`#{cmd} < #{tf.path} 2>&1`.chomp()
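
The non-uniform-to-uniform conversion retained by this commit can be sketched standalone. The sample request "2:ppn=12+3:ppn=24" below is hypothetical; the parsing mirrors the :nodes case in the diff, which captures but does not use tpp when computing the geometry:

    # Collapse a non-uniform <nodes> request into the smallest uniform
    # request that sbatch will accept without heterogeneous job support.
    value = "2:ppn=12+3:ppn=24"   # hypothetical: 2 nodes x 12 tasks + 3 nodes x 24 tasks

    maxppn = 1
    nnodes = 0
    value.split("+").each { |nodespec|
      resources = nodespec.split(":")
      nnodes += resources.shift.to_i
      ppn = 0
      resources.each { |resource|
        ppn = $1.to_i if resource =~ /ppn=(\d+)/
      }
      maxppn = ppn if ppn > maxppn
    }

    puts "#SBATCH --nodes=#{nnodes}-#{nnodes}"    # => #SBATCH --nodes=5-5
    puts "#SBATCH --tasks-per-node=#{maxppn}"     # => #SBATCH --tasks-per-node=24

The --nodes=5-5 form pins both the minimum and maximum node count, so Slurm allocates exactly the requested total.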
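
Per the warning text, a job built from such a converted request still needs the user to impose the original layout at launch time. Below is a minimal sketch of a job-script body doing that, assembled as a Ruby string the way submit builds input; SLURM_HOSTFILE and srun -m arbitrary are the mechanism from the linked Slurm FAQ, while the hostfile path and executable name are placeholders:

    # Assemble a hypothetical batch script that launches with an arbitrary
    # task distribution (https://slurm.schedmd.com/faq.html#arbitrary).
    input  = "#! /bin/sh\n"
    input += "#SBATCH --nodes=5-5\n"
    input += "#SBATCH --tasks-per-node=24\n"
    input += "export SLURM_HOSTFILE=./hostfile\n"   # one host name per task, in rank order
    input += "srun -m arbitrary ./my_executable\n"  # realizes the 2x12 + 3x24 layout
    puts input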

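
The :walltime case normalizes a value with a leading days field into Slurm's d-hh:mm:ss form. Two worked examples of that sub, with illustrative values:

    # Only a four-field d:hh:mm:ss value is rewritten; plain hh:mm:ss passes through.
    "1:12:30:00".sub(/^(\d+):(\d+:\d+:\d+)$/, '\1-\2')   #=> "1-12:30:00"
    "12:30:00".sub(/^(\d+):(\d+:\d+:\d+)$/, '\1-\2')     #=> "12:30:00"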
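
For context on the removed branch: the deleted version probe reduced the sbatch version banner to a four-digit integer, treating 1808 (Slurm 18.08) and newer as supporting heterogeneous (pack) jobs. Assuming a banner like "slurm 18.08.0":

    # Strip the letters of "slurm", dots, and whitespace, then keep the
    # first four digits: "slurm 18.08.0" -> "18080"[0..3] -> 1808.
    "slurm 18.08.0".gsub(/[slurm.\s]/, "")[0..3].to_i   #=> 1808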