Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Scheduler.parse_output for the SlurmScheduler plugin #3931

Merged
merged 1 commit into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import re

from aiida.common.escaping import escape_for_bash
from aiida.common.lang import type_check
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)

Expand Down Expand Up @@ -752,3 +753,54 @@ def _parse_kill_output(self, retval, stdout, stderr):
)

return True

def parse_output(self, detailed_job_info, stdout, stderr): # pylint: disable=inconsistent-return-statements
"""Parse the output of the scheduler.

:param detailed_job_info: dictionary with the output returned by the `Scheduler.get_detailed_job_info` command.
This should contain the keys `retval`, `stdout` and `stderr` corresponding to the return value, stdout and
stderr returned by the accounting command executed for a specific job id.
:param stdout: string with the output written by the scheduler to stdout
:param stderr: string with the output written by the scheduler to stderr
:return: None or an instance of `aiida.engine.processes.exit_code.ExitCode`
:raises TypeError or ValueError: if the passed arguments have incorrect type or value
"""
from aiida.engine import CalcJob

type_check(detailed_job_info, dict)

try:
detailed_stdout = detailed_job_info['stdout']
except KeyError:
raise ValueError('the `detailed_job_info` does not contain the required key `stdout`.')

type_check(detailed_stdout, str)

# The format of the detailed job info should be a multiline string, where the first line is the header, with
# the labels of the projected attributes. The following line should be the values of those attributes for the
# entire job. Any additional lines correspond to those values for any additional tasks that were run.
lines = detailed_stdout.splitlines()

try:
master = lines[1]
except IndexError:
raise ValueError('the `detailed_job_info.stdout` contained less than two lines.')

attributes = master.split('|')

# Pop the last element if it is empty. This happens if the `master` string just finishes with a pipe
if not attributes[-1]:
attributes.pop()

if len(self._detailed_job_info_fields) != len(attributes):
raise ValueError(
'second line in `detailed_job_info.stdout` differs in length with schedulers `_detailed_job_info_fields'
)

data = dict(zip(self._detailed_job_info_fields, attributes))

if data['State'] == 'OUT_OF_MEMORY':
return CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY # pylint: disable=no-member

if data['State'] == 'TIMEOUT':
return CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_WALLTIME # pylint: disable=no-member
56 changes: 48 additions & 8 deletions tests/schedulers/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import uuid
import datetime

import pytest

from aiida.schedulers.plugins.slurm import SlurmScheduler, JobState
from aiida.schedulers import SchedulerError

Expand Down Expand Up @@ -198,8 +200,7 @@ def test_submit_script(self):
self.assertTrue('#SBATCH --time=1-00:00:00' in submit_script_text)
self.assertTrue('#SBATCH --nodes=1' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_bad_shebang(self):
"""Test that first line of submit script is as expected."""
Expand Down Expand Up @@ -257,8 +258,7 @@ def test_submit_script_with_num_cores_per_machine(self): # pylint: disable=inva
self.assertTrue('#SBATCH --ntasks-per-node=2' in submit_script_text)
self.assertTrue('#SBATCH --cpus-per-task=12' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_with_num_cores_per_mpiproc(self): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -290,8 +290,7 @@ def test_submit_script_with_num_cores_per_mpiproc(self): # pylint: disable=inva
self.assertTrue('#SBATCH --ntasks-per-node=1' in submit_script_text)
self.assertTrue('#SBATCH --cpus-per-task=24' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_with_num_cores_per_machine_and_mpiproc1(self): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -326,8 +325,7 @@ def test_submit_script_with_num_cores_per_machine_and_mpiproc1(self): # pylint:
self.assertTrue('#SBATCH --ntasks-per-node=1' in submit_script_text)
self.assertTrue('#SBATCH --cpus-per-task=24' in submit_script_text)

self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + \
" < 'aiida.in'" in submit_script_text)
self.assertTrue("'mpirun' '-np' '23' 'pw.x' '-npool' '1'" + " < 'aiida.in'" in submit_script_text)

def test_submit_script_with_num_cores_per_machine_and_mpiproc2(self): # pylint: disable=invalid-name
"""
Expand Down Expand Up @@ -367,3 +365,45 @@ def test_joblist_multi(self):
command = scheduler._get_joblist_command(jobs=['123', '456']) # pylint: disable=protected-access
self.assertIn('123,456', command)
self.assertNotIn('456,456', command)


def test_parse_out_of_memory():
"""Test that for job that failed due to OOM `parse_output` return the `ERROR_SCHEDULER_OUT_OF_MEMORY` code."""
from aiida.engine import CalcJob

scheduler = SlurmScheduler()
stdout = ''
stderr = ''
detailed_job_info = {
'retval': 0,
'stderr': '',
'stdout': """||||||||||||||||||||||||||||||||||||||||||||||||||
|||||||||||||||||||||||||||||||||||||||||OUT_OF_MEMORY|||||||||"""
} # yapf: disable

exit_code = scheduler.parse_output(detailed_job_info, stdout, stderr)
assert exit_code == CalcJob.exit_codes.ERROR_SCHEDULER_OUT_OF_MEMORY # pylint: disable=no-member


@pytest.mark.parametrize('detailed_job_info, expected', [
('string', TypeError), # Not a dictionary
({'stderr': ''}, ValueError), # Key `stdout` missing
({'stdout': None}, TypeError), # `stdout` is not a string
({'stdout': ''}, ValueError), # `stdout` does not contain at least two lines
({'stdout': 'Header\nValue'}, ValueError), # `stdout` second line contains too few elements separated by pipe
]) # yapf: disable
def test_parse_output_invalid(detailed_job_info, expected):
"""Test `SlurmScheduler.parse_output` for various invalid arguments."""
scheduler = SlurmScheduler()

with pytest.raises(expected):
scheduler.parse_output(detailed_job_info, '', '')


def test_parse_output_valid():
"""Test `SlurmScheduler.parse_output` for valid arguments."""
number_of_fields = len(SlurmScheduler._detailed_job_info_fields) # pylint: disable=protected-access
detailed_job_info = {'stdout': 'Header\n{}'.format('|' * number_of_fields)}
scheduler = SlurmScheduler()

assert scheduler.parse_output(detailed_job_info, '', '') is None