Skip to content

Commit

Permalink
Removed node dependencies from the scheduler parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Espen Flage-Larsen committed Dec 12, 2019
1 parent b639de4 commit b7a754e
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 47 deletions.
2 changes: 0 additions & 2 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ def define(cls, spec):
spec.output(cls.link_label_retrieved, valid_type=orm.FolderData, pass_to_parser=True,
help='Files that are retrieved by the daemon will be stored in this node. By default the stdout and stderr '
'of the scheduler will be added, but one can add more by specifying them in `CalcInfo.retrieve_list`.')
spec.exit_code(0, 'NO_ERROR', message='no errors')
spec.exit_code(50, 'OUT_OF_MEMORY', message='scheduler reports out of memory')

@classproperty
def spec_options(cls): # pylint: disable=no-self-argument
Expand Down
10 changes: 8 additions & 2 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,14 @@ def do_retrieve():
logger.info('retrieving CalcJob<{}> successful'.format(node.pk))
# Parse the detailed job info, possibly also stdout/stderr files,
# dependent on scheduler functionality and return an exit status and a message
exit_code = scheduler.parse(node)
node.set_exit_code(exit_code)
detailed_job_info = node.get_detailed_job_info()
if detailed_job_info is not None:
detailed_job_info = detailed_job_info['stdout']
scheduler_stdout = node.get_scheduler_stdout()
scheduler_stderr = node.get_scheduler_stderr()
exit_code = scheduler.parse(detailed_job_info, scheduler_stdout, scheduler_stdout)
node.set_exit_status(exit_code[0])
node.set_exit_message(exit_code[1]['exit_message'])
raise Return(result)


Expand Down
13 changes: 0 additions & 13 deletions aiida/orm/nodes/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,6 @@ def exit_status(self):
"""
return self.get_attribute(self.EXIT_STATUS_KEY, None)

def set_exit_code(self, exit_code):
    """
    Record the status and message carried by an exit code on this node.

    The first element of ``exit_code`` is forwarded to ``set_exit_status`` and the
    second element to ``set_exit_message``, in that order.

    :param exit_code: a tuple holding the exit status followed by the exit message
    :raises ValueError: if ``exit_code`` is not a tuple
    """
    # Guard clause: anything that is not a tuple is rejected before the node is touched.
    if not isinstance(exit_code, tuple):
        raise ValueError('exit code is not a tuple set with ExitCode')

    self.set_exit_status(exit_code[0])
    self.set_exit_message(exit_code[1])

def set_exit_status(self, status):
"""
Set the exit status of the process
Expand Down
46 changes: 29 additions & 17 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)
from aiida.schedulers.scheduler import SchedulerExitCodes

# This maps SLURM state codes to our own status list

Expand Down Expand Up @@ -236,33 +237,36 @@ def _get_detailed_job_info_command(self, job_id):
fields = ','.join(self._detailed_job_info_fields)
return 'sacct --format={} --parsable --jobs={}'.format(fields, job_id)

def parse(self, node):
def parse(self, detailed_job_info, scheduler_stdout, scheduler_stderr):
"""
Parse information that is related to scheduler. First, we parse the
detailed job info in order to get access to the state and reason which is
further analyzed to set the exit status and message.
:param detailed_job_info: the string from sacct associated with `get_detailed_job_info`
:param scheduler_stdout: the Slurm scheduler stdout file as a string
:param scheduler_stderr: the Slurm scheduler stderr file as a string
"""
# Check the detailed job information and parse that
detailed_job_info = node.get_detailed_job_info()
exit_codes = node.process_class.exit_codes
if detailed_job_info is not None:
exit_code_info = self._parse_detailed_job_info(detailed_job_info['stdout'], exit_codes)
# Locate scheduler files and parse those
scheduler_stdout = node.get_scheduler_stdout()
scheduler_stderr = node.get_scheduler_stderr()
exit_code_file = self._parse_scheduler_files(scheduler_stdout, scheduler_stderr, exit_codes)
exit_code_info = self._parse_detailed_job_info(detailed_job_info)
exit_code_file = self._parse_scheduler_files(scheduler_stdout, scheduler_stderr)
# For now we check which has the highest exit_status, assuming the higher the number, the
# more serious the error
if exit_code_file[0] > exit_code_info[0]:
# more serious the error. Over time we need to establish a feel for how to prioritize these
# errors.
if exit_code_file[0].value > exit_code_info[0].value:
return exit_code_file
return exit_code_info

def _parse_detailed_job_info(self, detailed_job_info, exit_codes):
def _parse_detailed_job_info(self, detailed_job_info):
"""
Parse the detailed job info and extract the state and reason.
Set any detected exit status and message.
:param detailed_job_info: the string returned from sacct
"""

# First entry contains the formatting tags, so we skip this
details = detailed_job_info.split('\n')[1:]
# When using --parsable with sacct we get a string which is divided by |,
Expand All @@ -279,21 +283,29 @@ def _parse_detailed_job_info(self, detailed_job_info, exit_codes):

# Compose detailed job info dictionary
detailed_info = dict(zip(self._detailed_job_info_fields, info))
exit_status = int(detailed_info['ExitCode'].split(':')[0])
# exit_status = int(detailed_info['ExitCode'].split(':')[0])
# reason = detailed_info['Reason']
state = detailed_info['State']
reason = detailed_info['Reason']
exit_code = exit_codes.NO_ERROR
exit_code = (SchedulerExitCodes.NO_ERROR, {'exit_message': '',
'logger_message': ''})
if 'OUT_OF_MEMORY' in state:
exit_code = exit_codes.OUT_OF_MEMORY
exit_code = (SchedulerExitCodes.OUT_OF_MEMORY,
{'exit_message': 'Scheduler reported out of memory.',
'logger_message': 'Your job used more memory than you specified, or was available on your '
'hardware configuration.'})
return exit_code

def _parse_scheduler_files(self, scheduler_stdout, scheduler_stderr, exit_codes):
def _parse_scheduler_files(self, scheduler_stdout, scheduler_stderr):
"""
Parse the scheduler stdout and stderr files.
Set any detected exit status and message.
:param scheduler_stdout: the Slurm scheduler stdout file as a string
:param scheduler_stderr: the Slurm scheduler stderr file as a string
"""
return exit_codes.NO_ERROR
return (SchedulerExitCodes.NO_ERROR, {'exit_message': '',
'logger_message': ''})

def _get_submit_script_header(self, job_tmpl):
"""
Expand Down
14 changes: 4 additions & 10 deletions aiida/schedulers/plugins/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,11 @@ def test_detailed_job_info_parsing(self):
Test parsing of the detailed job info and that it returns
the correct exit code instance
"""
from aiida.orm import CalcJobNode
process_type = 'aiida.calculations:{}'.format('templatereplacer')
node = CalcJobNode(process_type=process_type)
node.computer = self.computer
node.set_detailed_job_info(DICT_DETAILED_JOB_INFO)
node.store()
node.seal()
detailed_job_info = DICT_DETAILED_JOB_INFO['stdout']
scheduler = SlurmScheduler()
exit_code = scheduler.parse(node)
self.assertEqual(exit_code.status, 50)
self.assertEqual(exit_code.message, 'scheduler reports out of memory')
exit_code = scheduler.parse(detailed_job_info, '', '')
self.assertEqual(exit_code[0].value, 101)
self.assertEqual(exit_code[1]['exit_message'], 'Scheduler reported out of memory.')


class TestParserSqueue(unittest.TestCase):
Expand Down
15 changes: 12 additions & 3 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
###########################################################################
"""Implementation of `Scheduler` base class."""
from abc import abstractmethod
from enum import Enum

import aiida.common
from aiida.common.lang import classproperty
Expand All @@ -27,6 +28,12 @@ class SchedulerParsingError(SchedulerError):
pass


# Enumeration of the exit statuses that scheduler output parsing can report.
class SchedulerExitCodes(Enum):
    """Exit statuses produced when parsing scheduler output.

    Each member's ``value`` is the integer exit status.
    """

    # No scheduler-related problem was detected.
    NO_ERROR = 0
    # The scheduler reported that the job ran out of memory.
    OUT_OF_MEMORY = 101


class Scheduler:
"""
Base class for all schedulers.
Expand Down Expand Up @@ -286,15 +293,17 @@ def _get_detailed_job_info_command(self, job_id):
# pylint: disable=no-self-use,not-callable,unused-argument
raise FeatureNotAvailable('Cannot get detailed job info')

def parse(self, node): # pylint: disable=no-self-use
def parse(self, detailed_job_info, scheduler_stdout, scheduler_stderr): # pylint: disable=no-self-use, unused-argument
"""
Handle scheduler-related parsing. The details need to be implemented
at the level of each scheduler plugin. If a scheduler does not provide its own
parse function, the exit status is set to zero and the messages are left empty.
:param node: the node that the scheduler is associated with
:param detailed_job_info: any string containing relevant detailed job information to be parsed
:param scheduler_stdout: the scheduler stdout file as a string
:param scheduler_stderr: the scheduler stderr file as a string
"""
return node.process_class.exit_codes.NO_ERROR
return (0, {'exit_message': '', 'logger_message': ''})

def get_detailed_job_info(self, job_id):
"""Return the detailed job info.
Expand Down

0 comments on commit b7a754e

Please sign in to comment.