Performance enhancements for PBSPro support (#95)
* Remove "pbsnodes -a" query that calculated a default node size

Hardcode default node size to 128 and remove warning of missing <nodesize>
attribute.

* Deprecate PBSPro support for <cores> and <nodesize>

Remove calls to set default nodesize for PBSPro.
Remove unused code in PBSPro support.
Fail job submission if <cores> is used but <nodesize> is missing.

* Fix incorrect arguments and error messages

* Increment version and update release notes
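
The new `<cores>` behavior described above (ignore `<cores>` when `<nodes>` is present, fail when `<nodesize>` is missing, otherwise divide cores by node size) can be sketched in isolation. This is a minimal illustration, not the code in `pbsprobatchsystem.rb`: the helper name `full_nodes_for_cores` and its `[nchunks, error]` return shape are assumptions made for this example, and the attributes hash only stands in for `task.attributes`.

```ruby
# Hypothetical helper (not part of this commit) that mirrors the decision
# logic applied to the :cores attribute during job submission.
# Returns [nchunks, error]; error is non-nil when submission must fail.
def full_nodes_for_cores(attributes)
  # <nodes> takes precedence: <cores> is ignored when <nodes> is present
  return [nil, nil] unless attributes[:nodes].nil?

  # There is no default node size any more, so <nodesize> must be supplied
  nodesize = attributes[:nodesize]
  if nodesize.nil?
    msg = "FATAL ERROR: task `#{attributes[:name]}` cannot be submitted " \
          "due to missing <nodesize> information"
    return [nil, msg]
  end

  # Integer division yields the number of full nodes required
  [attributes[:cores] / nodesize, nil]
end

nchunks, error = full_nodes_for_cores({ name: "post", cores: 256, nodesize: 128 })
puts error.nil? ? "full nodes: #{nchunks}" : error
```

Returning the error instead of raising follows the shape of the `return nil, "FATAL ERROR: ..."` line in the diff, where the failure is handed back to the caller of `submit`.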
christopherwharrop-noaa authored Oct 13, 2022
1 parent ee41565 commit 5233399
Showing 3 changed files with 25 additions and 172 deletions.
7 changes: 7 additions & 0 deletions RELEASE_NOTES.md
@@ -1,5 +1,12 @@
# Release Notes

## New for Version 1.3.5

* Remove automatic detection of default nodesize from PBSPro support.
* Deprecate use of <nodesize> tag.
* Deprecate support for <cores> tag for PBSPro.
* Remove unused/dead code from PBSPro support.

## New for Version 1.3.4

* Fix INSTALL script incompatibilities with Bourne shell.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-1.3.4
+1.3.5
188 changes: 17 additions & 171 deletions lib/workflowmgr/pbsprobatchsystem.rb
@@ -28,83 +28,14 @@ class PBSPROBatchSystem < BatchSystem
def initialize(pbspro_root=nil,config)

# Get timeouts from the configuration
@qstat_timeout=config.JobQueueTimeout
@qstat_x_timeout=config.JobAcctTimeout

# Initialize an empty hash for job queue records
@jobqueue={}

# Initialize an empty hash for job completion records
@jobacct={}

# Hours to look back for finished jobs.
@hrsback=1

# Assume the scheduler is up
@schedup=true

# Set default node size to 0
# If a user does not specify one, it will be determined from pbsnodes
@default_node_size=nil

# Try to get a default node size from pbsnodes -a
begin
pbsnodes,errors,exit_status=WorkflowMgr.run4("pbsnodes -a | grep resources_available.ncpus | sort | uniq -c",30)

# Raise SchedulerDown if the pbsnodes failed
raise WorkflowMgr::SchedulerDown,errors unless exit_status==0

rescue Timeout::Error,WorkflowMgr::SchedulerDown
WorkflowMgr.log("#{$!}")
WorkflowMgr.stderr("#{$!}",3)
@schedup=false
raise WorkflowMgr::SchedulerDown
end

if pbsnodes =~ /\s*\d+\s+resources_available.ncpus = (\d+)/
@default_node_size = $1.to_i
end

end


#####################################################
#
# status
#
#####################################################
def status(jobid)

begin

raise WorkflowMgr::SchedulerDown unless @schedup

# Populate the jobs status table if it is empty
refresh_jobqueue if @jobqueue.empty?

# Return the jobqueue record if there is one
return @jobqueue[jobid] if @jobqueue.has_key?(jobid)

# Populate the job accounting log table if it is empty
refresh_jobacct(nil) if @jobacct.empty?

# Return the jobacct record if there is one
return @jobacct[jobid] if @jobacct.has_key?(jobid)

# Try to find the job individually
refresh_jobacct(jobid)

# Return the jobacct record if there is one
return @jobacct[jobid] if @jobacct.has_key?(jobid)

# We didn't find the job, so return an uknown status record
return { :jobid => jobid, :state => "UNKNOWN", :native_state => "Unknown" }

rescue WorkflowMgr::SchedulerDown
@schedup=false
return { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" }
end

end


@@ -157,18 +88,8 @@ def submit(task)
cmd="qsub"
input="#! /bin/sh\n"

# Get the node size
nodesize = @default_node_size
if task.attributes[:nodesize].nil?
WorkflowMgr.stderr("WARNING: <nodesize> attribute not set, using default node size of #{@default_node_size} cores.",1)
WorkflowMgr.log("WARNING: <nodesize> attribute not set, using default node size of #{@default_node_size} cores.")
else
nodesize = task.attributes[:nodesize]
end

# Add Pbspro batch system options translated from the generic options specification
task.attributes.each do |option,value|

if value.is_a?(String)
if value.empty?
WorkflowMgr.stderr("WARNING: <#{option}> has empty content and is ignored", 1)
@@ -182,11 +103,26 @@ def submit(task)
input += "#PBS -q #{value}\n"
when :partition
WorkflowMgr.stderr("WARNING: the <partition> tag is not supported for PBSPro.", 1)
WorkflowMgr.log("WARNING: the <partition> tag is not supported for PBSPro.", 1)
WorkflowMgr.log("WARNING: the <partition> tag is not supported for PBSPro.")
when :nodesize
WorkflowMgr.stderr("WARNING: <nodesize> support is deprecated, please use <nodes> to specify the requested resources", 1)
WorkflowMgr.log("WARNING: <nodesize> support is deprecated, please use <nodes> to specify the requested resources")
when :cores
# Ignore this attribute if the "nodes" attribute is present
next unless task.attributes[:nodes].nil?

# Print deprecation warning
WorkflowMgr.stderr("WARNING: <cores> support is deprecated for PBSPro, please use <nodes> to specify the requested resources", 1)
WorkflowMgr.log("WARNING: <cores> support is deprecated for PBSPro, please use <nodes> to specify the requested resources")

# Get the node size
nodesize = task.attributes[:nodesize]
if nodesize.nil?
WorkflowMgr.stderr("FATAL ERROR: task `#{task.attributes[:name]}` cannot be submitted due to missing <nodesize> information",0)
WorkflowMgr.log("FATAL ERROR: task `#{task.attributes[:name]}` cannot be submitted due to missing <nodesize> information")
return nil, "FATAL ERROR: task `#{task.attributes[:name]}` cannot be submitted due to missing <nodesize> information"
end

# Calculate the number of full nodes required
nchunks = value / nodesize

@@ -306,87 +242,6 @@ def delete(jobid)

private

#####################################################
#
# refresh_jobqueue
#
#####################################################
def refresh_jobqueue

begin

# Get the username of this process
username=Etc.getpwuid(Process.uid).name

# Run qstat to obtain the current status of queued jobs
qstat=""
errors=""
exit_status=0

# Get the list of jobs queued or running for this user
qstat,errors,exit_status=WorkflowMgr.run4("qstat -u #{username} -w",@qstat_timeout)

# Raise SchedulerDown if the qstat failed
raise WorkflowMgr::SchedulerDown,errors unless exit_status==0

# Return if the qstat output is empty
return if qstat.empty?

rescue Timeout::Error,WorkflowMgr::SchedulerDown
WorkflowMgr.log("#{$!}")
WorkflowMgr.stderr("#{$!}",3)
raise WorkflowMgr::SchedulerDown
end

# Initialize an empty job record
record={}

# For each line, find the various attributes and create job records
qstat.each_line { |line|

# Remove leading and trailing white space
line.strip!

# Skip lines that don't start with a jobid number
next unless line=~/^\d+/

# Split the line into fields
fields = line.split(/\s+/)

# Extract the jobid
if fields[0] =~ /^(\d+)\..*/
record[:jobid] = $1
end

# Extract the user
record[:user] = username

# Extract the native state
record[:native_state] = fields[9]

# Compute the state
case record[:native_state]
when /^Q$/,/^H$/,/^W$/,/^S$/,/^T$/,/^M$/
record[:state] = "QUEUED"
when /^B$/,/^R$/,/^E$/
record[:state]="RUNNING"
else
record[:state]="UNKNOWN"
end

# Extract jobname
record[:jobname] = fields[3]

# Extract queue
record[:queue] = fields[2]

record[:cores] = fields[6]

@jobqueue[record[:jobid]] = record

} # qstat.each

end # refresh_job_queue

#####################################################
#
@@ -406,16 +261,7 @@ def refresh_jobacct(jobids)
errors=""
exit_status=0

if jobids.nil?

# Get the list of jobs that have completed within the last hour for this user
joblist,errors,exit_status=WorkflowMgr.run4("qselect -H -u $USER -te.gt.#{(Time.now - 3600).strftime("%Y%m%d%H%M")}")

# Raise SchedulerDown if the qselect failed
raise WorkflowMgr::SchedulerDown,errors unless exit_status==0
joblist = joblist.split.join(" ")

else
unless jobids.nil?
joblist = jobids.join(" ")
end
