Initial prototype for scheduler parsing #3647

Closed · wants to merge 2 commits
17 changes: 12 additions & 5 deletions aiida/engine/processes/calcjobs/tasks.py
@@ -217,18 +217,15 @@ def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancell
max_attempts = TRANSPORT_TASK_MAXIMUM_ATTEMTPS

authinfo = node.computer.get_authinfo(node.user)

scheduler = node.computer.get_scheduler()
@coroutine
def do_retrieve():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)

scheduler.set_transport(transport)
# Perform the job accounting and set it on the node if successful. If the scheduler does not implement this,
# still set the attribute but set it to `None`. This way we can distinguish calculation jobs for which the
# accounting was called but could not be set.
scheduler = node.computer.get_scheduler()
scheduler.set_transport(transport)

try:
detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
except FeatureNotAvailable:
@@ -251,6 +248,16 @@ def do_retrieve():
else:
node.set_state(CalcJobState.PARSING)
logger.info('retrieving CalcJob<{}> successful'.format(node.pk))
# Parse the detailed job info and, depending on scheduler functionality, also the stdout/stderr
# files, and return an exit status and message
detailed_job_info = node.get_detailed_job_info()
if detailed_job_info is not None:
detailed_job_info = detailed_job_info['stdout']
scheduler_stdout = node.get_scheduler_stdout()
scheduler_stderr = node.get_scheduler_stderr()
exit_code = scheduler.parse(detailed_job_info, scheduler_stdout, scheduler_stderr)
node.set_exit_status(exit_code[0].value)
node.set_exit_message(exit_code[1]['exit_message'])
raise Return(result)
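For reviewers, a minimal standalone sketch of the same call sequence (not part of the diff): it replays the parsing on an already stored calculation job. The pk 1234 is a placeholder, the node is assumed to be a finished CalcJobNode with retrieved scheduler output, and only methods that already appear in this PR are used.

from aiida.orm import load_node

node = load_node(1234)  # placeholder pk of a finished CalcJobNode
scheduler = node.computer.get_scheduler()

detailed_job_info = node.get_detailed_job_info()
if detailed_job_info is not None:
    detailed_job_info = detailed_job_info['stdout']

exit_code = scheduler.parse(detailed_job_info, node.get_scheduler_stdout(), node.get_scheduler_stderr())
print(exit_code[0], exit_code[1]['exit_message'])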


73 changes: 72 additions & 1 deletion aiida/schedulers/plugins/slurm.py
@@ -16,6 +16,7 @@
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)
from aiida.schedulers.scheduler import SchedulerExitCodes

# This maps SLURM state codes to our own status list

@@ -159,7 +160,7 @@ class SlurmScheduler(Scheduler):
'AllocCPUS', 'Account', 'AssocID', 'AveCPU', 'AvePages', 'AveRSS', 'AveVMSize', 'Cluster', 'Comment', 'CPUTime',
'CPUTimeRAW', 'DerivedExitCode', 'Elapsed', 'Eligible', 'End', 'ExitCode', 'GID', 'Group', 'JobID', 'JobName',
'MaxRSS', 'MaxRSSNode', 'MaxRSSTask', 'MaxVMSize', 'MaxVMSizeNode', 'MaxVMSizeTask', 'MinCPU', 'MinCPUNode',
'MinCPUTask', 'NCPUS', 'NNodes', 'NodeList', 'NTasks', 'Priority', 'Partition', 'QOSRAW', 'ReqCPUS', 'Reserved',
'MinCPUTask', 'NCPUS', 'NNodes', 'NodeList', 'NTasks', 'Priority', 'Partition', 'QOSRAW', 'Reason', 'ReqCPUS', 'Reserved',
'ResvCPU', 'ResvCPURAW', 'Start', 'State', 'Submit', 'Suspended', 'SystemCPU', 'Timelimit', 'TotalCPU', 'UID',
'User', 'UserCPU'
]
@@ -236,6 +237,76 @@ def _get_detailed_job_info_command(self, job_id):
fields = ','.join(self._detailed_job_info_fields)
return 'sacct --format={} --parsable --jobs={}'.format(fields, job_id)

def parse(self, detailed_job_info, scheduler_stdout, scheduler_stderr):
"""
Parse information related to the scheduler. First the detailed job info is parsed to extract
the job state and reason, which are then mapped onto an exit status and message.

:param detailed_job_info: the string from sacct associated with `get_detailed_job_info`
:param scheduler_stdout: the Slurm scheduler stdout file as a string
:param scheduler_stderr: the Slurm scheduler stderr file as a string
"""
# Check the detailed job information and parse it, falling back to a neutral exit code when it is absent
exit_code_info = (SchedulerExitCodes.NO_ERROR, {'exit_message': '', 'logger_message': ''})
if detailed_job_info is not None:
exit_code_info = self._parse_detailed_job_info(detailed_job_info)
exit_code_file = self._parse_scheduler_files(scheduler_stdout, scheduler_stderr)
# For now we return whichever result has the highest exit status, assuming that the higher the number,
# the more serious the error. Over time we need to establish a feel for how to prioritize these errors.
if exit_code_file[0].value > exit_code_info[0].value:
return exit_code_file
return exit_code_info
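As an aside (editorial illustration, not part of the file), the prioritization rule above can be shown with two hand-built tuples; only the SchedulerExitCodes enum introduced in this PR is assumed.

from aiida.schedulers.scheduler import SchedulerExitCodes

exit_code_info = (SchedulerExitCodes.OUT_OF_MEMORY, {'exit_message': 'Scheduler reported out of memory.', 'logger_message': ''})
exit_code_file = (SchedulerExitCodes.NO_ERROR, {'exit_message': '', 'logger_message': ''})

# The tuple whose enum member has the larger integer value wins, so the out-of-memory code is kept.
chosen = exit_code_file if exit_code_file[0].value > exit_code_info[0].value else exit_code_info
assert chosen is exit_code_info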

def _parse_detailed_job_info(self, detailed_job_info):
"""
Parse the detailed job info and extract the state and reason.

Return the detected exit status and message.

:param detailed_job_info: the string returned from sacct
"""

# The first entry contains the header with the field names, so we skip it
details = detailed_job_info.split('\n')[1:]
# When using --parsable with sacct we get a string in which the fields are separated by |.
# On multi-node runs we typically get multiple entries:
# jobid - contains information about the job for all the nodes
# jobid.batch - contains information from the master node
# jobid.0 - contains information from the master cpu
# We only fetch information from the first entry at this point.
if details:
info = details[0].split('|')
else:
raise ValueError('The supplied job id for the sacct command does not return '
'any valid entries.')

# Compose detailed job info dictionary
detailed_info = dict(zip(self._detailed_job_info_fields, info))
# exit_status = int(detailed_info['ExitCode'].split(':')[0])
# reason = detailed_info['Reason']
state = detailed_info['State']
exit_code = (SchedulerExitCodes.NO_ERROR, {'exit_message': '',
'logger_message': ''})
if 'OUT_OF_MEMORY' in state:
exit_code = (SchedulerExitCodes.OUT_OF_MEMORY,
{'exit_message': 'Scheduler reported out of memory.',
'logger_message': 'Your job used more memory than was requested or than is available on the '
'hardware configuration.'})
return exit_code
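For reference (editorial illustration, not part of the file), this is how a single |-separated sacct record maps onto the field names; the three fields and values are taken from the fixture in test_slurm.py and shortened for readability.

fields = ['AllocCPUS', 'Account', 'AssocID']  # first entries of _detailed_job_info_fields
record = '1|nn9995k|350'
detailed_info = dict(zip(fields, record.split('|')))
assert detailed_info == {'AllocCPUS': '1', 'Account': 'nn9995k', 'AssocID': '350'}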

def _parse_scheduler_files(self, scheduler_stdout, scheduler_stderr):
"""
Parse the scheduler stdout and stderr files.

Return the detected exit status and message. This is currently a placeholder that always reports no error.

:param scheduler_stdout: the Slurm scheduler stdout file as a string
:param scheduler_stderr: the Slurm scheduler stderr file as a string
"""
return (SchedulerExitCodes.NO_ERROR, {'exit_message': '',
'logger_message': ''})
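One possible direction for this placeholder, shown only as an editorial sketch and not as part of the PR: scan the stderr text for an out-of-memory kill message and reuse the exit code already used by the detailed-job-info path. The 'oom-kill' marker is an assumption about the wording of SLURM's stderr message, and the helper name is hypothetical.

from aiida.schedulers.scheduler import SchedulerExitCodes  # already imported at the top of slurm.py

def _example_parse_scheduler_files(scheduler_stdout, scheduler_stderr):
    """Hypothetical helper: map an oom-kill message in stderr onto an exit code."""
    if scheduler_stderr and 'oom-kill' in scheduler_stderr.lower():
        return (SchedulerExitCodes.OUT_OF_MEMORY,
                {'exit_message': 'Scheduler reported out of memory.',
                 'logger_message': 'Detected an oom-kill event in the scheduler stderr.'})
    return (SchedulerExitCodes.NO_ERROR, {'exit_message': '', 'logger_message': ''})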

def _get_submit_script_header(self, job_tmpl):
"""
Return the submit script header, using the parameters from the
27 changes: 27 additions & 0 deletions aiida/schedulers/plugins/test_slurm.py
@@ -14,6 +14,7 @@
import datetime

from aiida.schedulers.plugins.slurm import SlurmScheduler, JobState
from aiida.backends.testbase import AiidaTestCase

# pylint: disable=line-too-long
# job_id, state_raw, annotation, executing_host, username, number_nodes, number_cpus, allocated_machines, partition, time_limit, time_used, dispatch_time, job_name, submission_time
@@ -27,13 +28,39 @@
863553^^^R^^^None^^^rosa1^^^user5^^^1^^^32^^^nid00471^^^normal^^^30:00^^^29:29^^^2013-05-23T11:44:11^^^bash^^^2013-05-23T10:42:11
863554^^^R^^^None^^^rosa1^^^user5^^^1^^^32^^^nid00471^^^normal^^^NOT_SET^^^29:29^^^2013-05-23T11:44:11^^^bash^^^2013-05-23T10:42:11
"""
DICT_DETAILED_JOB_INFO = {
'retval':
0,
'stderr':
'',
'stdout':
'AllocCPUS|Account|AssocID|AveCPU|AvePages|AveRSS|AveVMSize|Cluster|Comment|CPUTime|CPUTimeRAW|DerivedExitCode|Elapsed|Eligible|End|ExitCode|GID|Group|JobID|JobName|MaxRSS|MaxRSSNode|MaxRSSTask|MaxVMSize|MaxVMSizeNode|MaxVMSizeTask|MinCPU|MinCPUNode|MinCPUTask|NCPUS|NNodes|NodeList|NTasks|Priority|Partition|QOSRAW|Reason|ReqCPUS|Reserved|ResvCPU|ResvCPURAW|Start|State|Submit|Suspended|SystemCPU|Timelimit|TotalCPU|UID|User|UserCPU|\n1|nn9995k|350|||||saga||00:00:07|7|0:125|00:00:07|2019-12-12T14:56:37|2019-12-12T14:56:59|0:125|50475|espenfl|161524|aiida-717||||||||||1|1|c5-44||20000|normal|21|None|1|00:00:15|00:00:15|15|2019-12-12T14:56:52|OUT_OF_MEMORY|2019-12-12T14:56:37|00:00:00|00:00.744|00:17:00|00:01.300|50475|espenfl|00:00.556|\n1|nn9995k|350|00:00:00|0|0|275992K|saga||00:00:07|7||00:00:07|2019-12-12T14:56:52|2019-12-12T14:56:59|0:125|||161524.batch|batch|0|c5-44|0|275992K|c5-44|0|00:00:00|c5-44|0|1|1|c5-44|1|||||1||||2019-12-12T14:56:52|OUT_OF_MEMORY|2019-12-12T14:56:52|00:00:00|00:00.321||00:00.606|||00:00.284|\n1|nn9995k|350|00:00:00|0|0|107952K|saga||00:00:07|7||00:00:07|2019-12-12T14:56:52|2019-12-12T14:56:59|0:0|||161524.extern|extern|0|c5-44|0|107952K|c5-44|0|00:00:00|c5-44|0|1|1|c5-44|1|||||1||||2019-12-12T14:56:52|COMPLETED|2019-12-12T14:56:52|00:00:00|00:00:00||00:00:00|||00:00:00|\n1|nn9995k|350|00:00:00|0|0|411308K|saga||00:00:03|3||00:00:03|2019-12-12T14:56:56|2019-12-12T14:56:59|0:125|||161524.0|pmi_proxy|0|c5-44|0|411308K|c5-44|0|00:00:00|c5-44|0|1|1|c5-44|1|||||1||||2019-12-12T14:56:56|OUT_OF_MEMORY|2019-12-12T14:56:56|00:00:00|00:00.422||00:00.693|||00:00.271|\n'
}
JOBS_ON_CLUSTER = 8
JOBS_HELD = 2
JOBS_QUEUED = 2
USERS_RUNNING = ['user5', 'user6']
JOBS_RUNNING = ['862538', '861352', '863553', '863554']


class TestParserJobInfoAndFiles(AiidaTestCase):
"""
Tests to verify that the parsing of the detailed job info and of the
scheduler standard output and error files works.
"""

def test_detailed_job_info_parsing(self):
"""
Test parsing of the detailed job info and that it returns
the correct exit code instance
"""
detailed_job_info = DICT_DETAILED_JOB_INFO['stdout']
scheduler = SlurmScheduler()
exit_code = scheduler.parse(detailed_job_info, '', '')
self.assertEqual(exit_code[0].value, 101)
self.assertEqual(exit_code[1]['exit_message'], 'Scheduler reported out of memory.')
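A companion test one could add (hypothetical, not part of this PR) to pin down the placeholder behaviour when no detailed job info and no scheduler files are available:

    def test_parse_without_input(self):
        """Hypothetical test: without detailed job info or scheduler files, parsing reports no error."""
        scheduler = SlurmScheduler()
        exit_code = scheduler.parse(None, '', '')
        self.assertEqual(exit_code[0].value, 0)
        self.assertEqual(exit_code[1]['exit_message'], '')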


class TestParserSqueue(unittest.TestCase):
"""
Tests to verify that the function _parse_joblist_output behaves correctly
19 changes: 19 additions & 0 deletions aiida/schedulers/scheduler.py
@@ -9,6 +9,7 @@
###########################################################################
"""Implementation of `Scheduler` base class."""
from abc import abstractmethod
from enum import Enum

import aiida.common
from aiida.common.lang import classproperty
@@ -27,6 +28,12 @@ class SchedulerParsingError(SchedulerError):
pass


# Enumeration of the exit codes that scheduler parsing can return
class SchedulerExitCodes(Enum):
NO_ERROR = 0
OUT_OF_MEMORY = 101
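Editorial note: plugins compare these members through their integer values (as in the SLURM parse method earlier in this PR), for example:

assert SchedulerExitCodes.OUT_OF_MEMORY.value > SchedulerExitCodes.NO_ERROR.value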


class Scheduler:
"""
Base class for all schedulers.
@@ -286,6 +293,18 @@ def _get_detailed_job_info_command(self, job_id):
# pylint: disable=no-self-use,not-callable,unused-argument
raise FeatureNotAvailable('Cannot get detailed job info')

def parse(self, detailed_job_info, scheduler_stdout, scheduler_stderr): # pylint: disable=no-self-use, unused-argument
"""
Handle scheduler-related parsing. The details need to be implemented by each scheduler
plugin. If a plugin does not provide its own `parse` method, this default implementation
returns a neutral exit status and empty messages, so parsing always yields a well-defined result.

:param detailed_job_info: any string containing relevant detailed job information to be parsed
:param scheduler_stdout: the scheduler stdout file as a string
:param scheduler_stderr: the scheduler stderr file as a string
"""
return (SchedulerExitCodes.NO_ERROR, {'exit_message': '', 'logger_message': ''})
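A hedged sketch of how a scheduler plugin could override this hook; the class name and the stderr marker are hypothetical, the other abstract Scheduler methods are omitted, and the SLURM plugin earlier in this PR is the actual reference implementation.

class ExampleScheduler(Scheduler):  # other abstract methods omitted for brevity
    """Hypothetical plugin that only customises the parse hook."""

    def parse(self, detailed_job_info, scheduler_stdout, scheduler_stderr):
        if scheduler_stderr and 'out of memory' in scheduler_stderr.lower():
            return (SchedulerExitCodes.OUT_OF_MEMORY,
                    {'exit_message': 'Scheduler reported out of memory.',
                     'logger_message': scheduler_stderr})
        return (SchedulerExitCodes.NO_ERROR, {'exit_message': '', 'logger_message': ''})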

def get_detailed_job_info(self, job_id):
"""Return the detailed job info.
