Skip to content

Commit

Permalink
Implement parse_output for the SlurmScheduler plugin
Browse files Browse the repository at this point in the history
This implements the `Scheduler.parse_output` method that allows parsing
the detailed job info that is retrieved from the scheduler when a job is
finished. For the time being, only the out-of-memory and out-of-walltime
(timeout) errors are detected and the corresponding exit code is returned.
  • Loading branch information
sphuber committed Sep 10, 2020
1 parent e974d3f commit a1b5729
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 8 deletions.
52 changes: 52 additions & 0 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import re

from aiida.common.escaping import escape_for_bash
from aiida.common.lang import type_check
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)

Expand Down Expand Up @@ -752,3 +753,54 @@ def _parse_kill_output(self, retval, stdout, stderr):
)

return True

def parse_output(self, detailed_job_info, stdout, stderr):
    """Parse the output of the scheduler.

    Only the second line of ``detailed_job_info['stdout']`` is inspected: it is split on the pipe character and
    mapped onto the names in ``self._detailed_job_info_fields``. The resulting ``State`` value is then translated
    into an exit code for the states that are currently recognized (``OUT_OF_MEMORY`` and ``TIMEOUT``).

    :param detailed_job_info: dictionary with the output returned by the `Scheduler.get_detailed_job_info` command.
        This should contain the keys `retval`, `stdout` and `stderr` corresponding to the return value, stdout and
        stderr returned by the accounting command executed for a specific job id. Only `stdout` is read here.
    :param stdout: string with the output written by the scheduler to stdout (currently not used)
    :param stderr: string with the output written by the scheduler to stderr (currently not used)
    :return: None if no known problem is detected, otherwise an instance of
        `aiida.engine.processes.exit_code.ExitCode`
    :raises TypeError: if `detailed_job_info` is not a dictionary or its `stdout` value is not a string
    :raises ValueError: if the key `stdout` is missing, if it contains less than two lines, or if the number of
        values on its second line does not match the number of `_detailed_job_info_fields`
    """
    from aiida.engine import CalcJob

    type_check(detailed_job_info, dict)

    try:
        detailed_stdout = detailed_job_info['stdout']
    except KeyError as exception:
        # Chain explicitly so that the traceback marks the `KeyError` as the direct cause of the `ValueError`.
        raise ValueError('the `detailed_job_info` does not contain the required key `stdout`.') from exception

    type_check(detailed_stdout, str)

    # The format of the detailed job info should be a multiline string, where the first line is the header, with
    # the labels of the projected attributes. The following line should be the values of those attributes for the
    # entire job. Any additional lines correspond to those values for any additional tasks that were run.
    lines = detailed_stdout.splitlines()

    try:
        master = lines[1]
    except IndexError as exception:
        raise ValueError('the `detailed_job_info.stdout` contained less than two lines.') from exception

    attributes = master.split('|')

    # Pop the last element if it is empty. This happens if the `master` string just finishes with a pipe
    if not attributes[-1]:
        attributes.pop()

    if len(self._detailed_job_info_fields) != len(attributes):
        raise ValueError(
            'second line in `detailed_job_info.stdout` differs in length with schedulers `_detailed_job_info_fields`'
        )

    data = dict(zip(self._detailed_job_info_fields, attributes))

    if data['State'] == 'OUT_OF_MEMORY':
        return CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY  # pylint: disable=no-member

    if data['State'] == 'TIMEOUT':
        return CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_WALLTIME  # pylint: disable=no-member

    # No recognized failure state: make the "nothing to report" result explicit instead of falling off the end.
    return None
56 changes: 48 additions & 8 deletions tests/schedulers/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import uuid
import datetime

import pytest

from aiida.schedulers.plugins.slurm import SlurmScheduler, JobState
from aiida.schedulers import SchedulerError

Expand Down Expand Up @@ -198,8 +200,7 @@ def test_submit_script(self):
self.assertTrue('#SBATCH --time=1-00:00:00' in submit_script_text)
self.assertTrue('#SBATCH --nodes=1' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_bad_shebang(self):
"""Test that first line of submit script is as expected."""
Expand Down Expand Up @@ -257,8 +258,7 @@ def test_submit_script_with_num_cores_per_machine(self): # pylint: disable=inva
self.assertTrue('#SBATCH --ntasks-per-node=2' in submit_script_text)
self.assertTrue('#SBATCH --cpus-per-task=12' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_with_num_cores_per_mpiproc(self): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -290,8 +290,7 @@ def test_submit_script_with_num_cores_per_mpiproc(self): # pylint: disable=inva
self.assertTrue('#SBATCH --ntasks-per-node=1' in submit_script_text)
self.assertTrue('#SBATCH --cpus-per-task=24' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_with_num_cores_per_machine_and_mpiproc1(self): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -326,8 +325,7 @@ def test_submit_script_with_num_cores_per_machine_and_mpiproc1(self): # pylint:
self.assertTrue('#SBATCH --ntasks-per-node=1' in submit_script_text)
self.assertTrue('#SBATCH --cpus-per-task=24' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_with_num_cores_per_machine_and_mpiproc2(self): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -367,3 +365,45 @@ def test_joblist_multi(self):
command = scheduler._get_joblist_command(jobs=['123', '456']) # pylint: disable=protected-access
self.assertIn('123,456', command)
self.assertNotIn('456,456', command)


def test_parse_out_of_memory():
    """Test that `parse_output` returns the `ERROR_SCHEDULER_OUT_OF_MEMORY` code for a job that failed due to OOM."""
    from aiida.engine import CalcJob

    # Two pipe-delimited lines: the first takes the place of the header, the second carries the values for the job
    # with `OUT_OF_MEMORY` in one of its columns. The continuation line is deliberately kept flush-left so that no
    # leading whitespace ends up inside the string.
    accounting_output = """||||||||||||||||||||||||||||||||||||||||||||||||||
|||||||||||||||||||||||||||||||||||||||||OUT_OF_MEMORY|||||||||"""
    detailed_job_info = {'retval': 0, 'stderr': '', 'stdout': accounting_output}

    slurm = SlurmScheduler()
    exit_code = slurm.parse_output(detailed_job_info, '', '')

    expected = CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY  # pylint: disable=no-member
    assert exit_code == expected


@pytest.mark.parametrize(
    ('detailed_job_info', 'expected'),
    (
        ('string', TypeError),  # Not a dictionary
        ({'stderr': ''}, ValueError),  # Key `stdout` missing
        ({'stdout': None}, TypeError),  # `stdout` is not a string
        ({'stdout': ''}, ValueError),  # `stdout` does not contain at least two lines
        ({'stdout': 'Header\nValue'}, ValueError),  # `stdout` second line contains too few elements separated by pipe
    ),
)  # yapf: disable
def test_parse_output_invalid(detailed_job_info, expected):
    """Test that `SlurmScheduler.parse_output` raises the expected exception type for each invalid input."""
    slurm = SlurmScheduler()

    # The scheduler stdout and stderr are irrelevant here, so empty strings are passed for both.
    with pytest.raises(expected):
        slurm.parse_output(detailed_job_info, '', '')


def test_parse_output_valid():
    """Test that `SlurmScheduler.parse_output` returns `None` for well-formed input without a known error state."""
    field_count = len(SlurmScheduler._detailed_job_info_fields)  # pylint: disable=protected-access

    # One pipe per expected field: the second line then consists solely of empty values, one for each field.
    values_line = '|' * field_count
    detailed_job_info = {'stdout': 'Header\n' + values_line}

    slurm = SlurmScheduler()
    result = slurm.parse_output(detailed_job_info, '', '')

    assert result is None

0 comments on commit a1b5729

Please sign in to comment.