Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: Allow terminating job if submission script is invalid #5849

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
the routines make reference to the suitable plugins for all
plugin-specific operations.
"""
from __future__ import annotations

from collections.abc import Mapping
from logging import LoggerAdapter
import os
Expand All @@ -27,6 +29,7 @@
from aiida.common.datastructures import CalcInfo
from aiida.common.folders import SandboxFolder
from aiida.common.links import LinkType
from aiida.engine.processes.exit_code import ExitCode
from aiida.manage.configuration import get_config_option
from aiida.orm import CalcJobNode, Code, FolderData, Node, PortableCode, RemoteData, load_node
from aiida.orm.utils.log import get_dblogger_extra
Expand Down Expand Up @@ -351,7 +354,7 @@ def upload_calculation(
remotedata.store()


def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str:
def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str | ExitCode:
ltalirz marked this conversation as resolved.
Show resolved Hide resolved
"""Submit a previously uploaded `CalcJob` to the scheduler.

:param calculation: the instance of CalcJobNode to submit.
Expand All @@ -373,10 +376,12 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str:

submit_script_filename = calculation.get_option('submit_script_filename')
workdir = calculation.get_remote_workdir()
job_id = scheduler.submit_from_script(workdir, submit_script_filename)
calculation.set_job_id(job_id)
result = scheduler.submit_from_script(workdir, submit_script_filename)

if isinstance(result, str):
calculation.set_job_id(result)

return job_id
return result


def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
Expand Down
12 changes: 8 additions & 4 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ def parse_scheduler_output(self, retrieved: orm.Node) -> Optional[ExitCode]:
self.logger.warning('could not determine `stderr` filename because `scheduler_stderr` option was not set.')
else:
try:
scheduler_stderr = retrieved.base.repository.get_object_content(filename_stderr)
scheduler_stderr = retrieved.base.repository.get_object_content(filename_stderr, mode='r')
except FileNotFoundError:
scheduler_stderr = None
self.logger.warning(f'could not parse scheduler output: the `{filename_stderr}` file is missing')
Expand All @@ -740,13 +740,17 @@ def parse_scheduler_output(self, retrieved: orm.Node) -> Optional[ExitCode]:
self.logger.warning('could not determine `stdout` filename because `scheduler_stdout` option was not set.')
else:
try:
scheduler_stdout = retrieved.base.repository.get_object_content(filename_stdout)
scheduler_stdout = retrieved.base.repository.get_object_content(filename_stdout, mode='r')
except FileNotFoundError:
scheduler_stdout = None
self.logger.warning(f'could not parse scheduler output: the `{filename_stdout}` file is missing')

try:
exit_code = scheduler.parse_output(detailed_job_info, scheduler_stdout, scheduler_stderr)
exit_code = scheduler.parse_output(
detailed_job_info,
scheduler_stdout or '', # type: ignore[arg-type]
scheduler_stderr or '', # type: ignore[arg-type]
)
except exceptions.FeatureNotAvailable:
self.logger.info(f'`{scheduler.__class__.__name__}` does not implement scheduler output parsing')
return None
Expand Down Expand Up @@ -881,7 +885,7 @@ def presubmit(self, folder: Folder) -> CalcInfo:

# Set resources, also with get_default_mpiprocs_per_machine
resources = self.node.get_option('resources')
scheduler.preprocess_resources(resources, computer.get_default_mpiprocs_per_machine())
scheduler.preprocess_resources(resources or {}, computer.get_default_mpiprocs_per_machine())
job_tmpl.job_resource = scheduler.create_job_resource(**resources) # type: ignore

subst_dict = {'tot_num_mpiprocs': job_tmpl.job_resource.get_tot_num_mpiprocs()}
Expand Down
8 changes: 7 additions & 1 deletion aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,13 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
result = self.submit()

elif self._command == SUBMIT_COMMAND:
await self._launch_task(task_submit_job, node, transport_queue)
result = await self._launch_task(task_submit_job, node, transport_queue)

if isinstance(result, ExitCode):
# The scheduler plugin returned an exit code from ``Scheduler.submit_from_script`` indicating the
# job submission failed due to a non-transient problem and the job should be terminated.
return self.create_state(ProcessState.RUNNING, self.process.terminate, result)

result = self.update()

elif self._command == UPDATE_COMMAND:
Expand Down
90 changes: 53 additions & 37 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Implementation of `Scheduler` base class."""
from __future__ import annotations

import abc
from typing import Dict, Type
import typing as t

from aiida.common import exceptions, log
from aiida.common.datastructures import CodeRunMode
from aiida.common.escaping import escape_for_bash
from aiida.common.lang import classproperty
from aiida.schedulers.datastructures import JobResource, JobTemplate
from aiida.engine.processes.exit_code import ExitCode
from aiida.schedulers.datastructures import JobInfo, JobResource, JobTemplate, JobTemplateCodeInfo
from aiida.transports import Transport

__all__ = ('Scheduler', 'SchedulerError', 'SchedulerParsingError')

Expand All @@ -37,26 +42,26 @@ class Scheduler(metaclass=abc.ABCMeta):
# 'can_query_by_user': True if I can pass the 'user' argument to
# get_joblist_command (and in this case, no 'jobs' should be given).
# Otherwise, if False, a list of jobs is passed, and no 'user' is given.
_features: Dict[str, bool] = {}
_features: dict[str, bool] = {}

# The class to be used for the job resource.
_job_resource_class: Type[JobResource] = None # type: ignore
_job_resource_class: t.Type[JobResource] | None = None

def __str__(self):
    """Return the name of the concrete scheduler class as its string form."""
    return type(self).__name__

@classmethod
def preprocess_resources(cls, resources, default_mpiprocs_per_machine=None):
def preprocess_resources(cls, resources: dict[str, t.Any], default_mpiprocs_per_machine: int | None = None):
"""Pre process the resources.

Add the `num_mpiprocs_per_machine` key to the `resources` if it is not already defined and it cannot be deduced
from the `num_machines` and `tot_num_mpiprocs` being defined. The value is also not added if the job resource
class of this scheduler does not accept the `num_mpiprocs_per_machine` keyword. Note that the changes are made
in place to the `resources` argument passed.
"""
num_machines = resources.get('num_machines', None)
tot_num_mpiprocs = resources.get('tot_num_mpiprocs', None)
num_mpiprocs_per_machine = resources.get('num_mpiprocs_per_machine', None)
num_machines: int | None = resources.get('num_machines', None)
tot_num_mpiprocs: int | None = resources.get('tot_num_mpiprocs', None)
num_mpiprocs_per_machine: int | None = resources.get('num_mpiprocs_per_machine', None)

if (
num_mpiprocs_per_machine is None and cls.job_resource_class.accepts_default_mpiprocs_per_machine() # pylint: disable=no-member
Expand All @@ -71,14 +76,13 @@ def validate_resources(cls, **resources):
:param resources: keyword arguments to define the job resources
:raises ValueError: if the resources are invalid or incomplete
"""
assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource)
cls._job_resource_class.validate_resources(**resources)

def __init__(self):
assert self._job_resource_class is not None and issubclass(self._job_resource_class, JobResource)
self._transport = None

if not issubclass(self._job_resource_class, JobResource):
raise RuntimeError('the class attribute `_job_resource_class` is not a subclass of `JobResource`.')

@classmethod
def get_short_doc(cls):
"""Return the first non-empty line of the class docstring, if available."""
Expand Down Expand Up @@ -108,16 +112,17 @@ def logger(self):
raise exceptions.InternalError('No self._logger configured for {}!')

@classproperty
def job_resource_class(cls) -> Type[JobResource]: # pylint: disable=no-self-argument
def job_resource_class(cls) -> t.Type[JobResource]: # pylint: disable=no-self-argument
assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource)
return cls._job_resource_class

@classmethod
def create_job_resource(cls, **kwargs):
"""Create a suitable job resource from the kwargs specified."""
# pylint: disable=not-callable
return cls._job_resource_class(**kwargs)
assert cls._job_resource_class is not None and issubclass(cls._job_resource_class, JobResource)
return cls._job_resource_class(**kwargs) # pylint: disable=not-callable

def get_submit_script(self, job_tmpl):
def get_submit_script(self, job_tmpl: JobTemplate) -> str:
"""Return the submit script as a string.

:parameter job_tmpl: a `aiida.schedulers.datastrutures.JobTemplate` instance.
Expand Down Expand Up @@ -164,14 +169,14 @@ def get_submit_script(self, job_tmpl):
script_lines.append(job_tmpl.append_text)
script_lines.append(empty_line)

footer = self._get_submit_script_footer(job_tmpl) # pylint: disable=assignment-from-none
footer = self._get_submit_script_footer(job_tmpl)
if footer:
script_lines.append(footer)
script_lines.append(empty_line)

return '\n'.join(script_lines)

def _get_submit_script_environment_variables(self, template): # pylint: disable=no-self-use
def _get_submit_script_environment_variables(self, template: JobTemplate) -> str: # pylint: disable=no-self-use
"""Return the part of the submit script header that defines environment variables.

:parameter template: a `aiida.schedulers.datastrutures.JobTemplate` instance.
Expand All @@ -190,21 +195,23 @@ def _get_submit_script_environment_variables(self, template): # pylint: disable
return '\n'.join(lines)

@abc.abstractmethod
def _get_submit_script_header(self, job_tmpl):
def _get_submit_script_header(self, job_tmpl: JobTemplate) -> str:
"""Return the submit script header, using the parameters from the job template.

:param job_tmpl: a `JobTemplate` instance with relevant parameters set.
:return: string with the submission script header.
"""

def _get_submit_script_footer(self, job_tmpl):
def _get_submit_script_footer(self, job_tmpl: JobTemplate) -> str:
"""Return the submit script final part, using the parameters from the job template.

:param job_tmpl: a `JobTemplate` instance with relevant parameters set.
:return: string with the submission script footer.
"""
# pylint: disable=no-self-use,unused-argument
return None
return ''

def _get_run_line(self, codes_info, codes_run_mode):
def _get_run_line(self, codes_info: list[JobTemplateCodeInfo], codes_run_mode: CodeRunMode) -> str:
"""Return a string with the line to execute a specific code with specific arguments.

:parameter codes_info: a list of `aiida.scheduler.datastructures.JobTemplateCodeInfo` objects.
Expand All @@ -215,8 +222,6 @@ def _get_run_line(self, codes_info, codes_run_mode):
to launch the multiple codes.
:return: string with format: [executable] [args] {[ < stdin ]} {[ < stdout ]} {[2>&1 | 2> stderr]}
"""
from aiida.common.datastructures import CodeRunMode

list_of_runlines = []

for code_info in codes_info:
Expand Down Expand Up @@ -259,7 +264,7 @@ def _get_run_line(self, codes_info, codes_run_mode):
raise NotImplementedError('Unrecognized code run mode')

@abc.abstractmethod
def _get_joblist_command(self, jobs=None, user=None):
def _get_joblist_command(self, jobs: list[str] | None = None, user: str | None = None) -> str:
"""Return the command to get the most complete description possible of currently active jobs.

.. note::
Expand All @@ -271,7 +276,7 @@ def _get_joblist_command(self, jobs=None, user=None):
:param user: either None, or a string with the username (to show only jobs of the specific user).
"""

def _get_detailed_job_info_command(self, job_id):
def _get_detailed_job_info_command(self, job_id: str) -> dict[str, t.Any]:
"""Return the command to run to get detailed information for a given job.

This is typically called after the job has finished, to retrieve the most detailed information possible about
Expand All @@ -283,7 +288,7 @@ def _get_detailed_job_info_command(self, job_id):
# pylint: disable=no-self-use,not-callable,unused-argument
raise exceptions.FeatureNotAvailable('Cannot get detailed job info')

def get_detailed_job_info(self, job_id):
def get_detailed_job_info(self, job_id: str) -> dict[str, str | int]:
"""Return the detailed job info.

This will be a dictionary with the return value, stderr and stdout content returned by calling the command that
Expand All @@ -305,13 +310,18 @@ def get_detailed_job_info(self, job_id):
return detailed_job_info

@abc.abstractmethod
def _parse_joblist_output(self, retval, stdout, stderr):
def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list[JobInfo]:
"""Parse the joblist output as returned by executing the command returned by `_get_joblist_command` method.

:return: list of `JobInfo` objects, one of each job each with at least its default params implemented.
"""

def get_jobs(self, jobs=None, user=None, as_dict=False):
def get_jobs(
self,
jobs: list[str] | None = None,
user: str | None = None,
as_dict: bool = False,
) -> list[JobInfo] | dict[str, JobInfo]:
"""Return the list of currently active jobs.

.. note:: typically, only either jobs or user can be specified. See also comments in `_get_joblist_command`.
Expand Down Expand Up @@ -342,15 +352,15 @@ def transport(self):

return self._transport

def set_transport(self, transport):
def set_transport(self, transport: Transport):
"""Set the transport to be used to query the machine or to submit scripts.

This class assumes that the transport is open and active.
"""
self._transport = transport

@abc.abstractmethod
def _get_submit_command(self, submit_script):
def _get_submit_command(self, submit_script: str) -> str:
"""Return the string to execute to submit a given script.

.. warning:: the `submit_script` should already have been bash-escaped
Expand All @@ -360,13 +370,14 @@ def _get_submit_command(self, submit_script):
"""

@abc.abstractmethod
def _parse_submit_output(self, retval, stdout, stderr):
def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str | ExitCode:
"""Parse the output of the submit command returned by calling the `_get_submit_command` command.

:return: a string with the job ID.
:return: a string with the job ID or an exit code if the submission failed because the submission script is
invalid and the job should be terminated.
"""

def submit_from_script(self, working_directory, submit_script):
def submit_from_script(self, working_directory: str, submit_script: str) -> str | ExitCode:
"""Submit the submission script to the scheduler.

:return: return a string with the job ID in a valid format to be used for querying.
Expand All @@ -375,7 +386,7 @@ def submit_from_script(self, working_directory, submit_script):
result = self.transport.exec_command_wait(self._get_submit_command(escape_for_bash(submit_script)))
return self._parse_submit_output(*result)

def kill(self, jobid):
def kill(self, jobid: str) -> bool:
"""Kill a remote job and parse the return value of the scheduler to check if the command succeeded.

..note::
Expand All @@ -390,17 +401,22 @@ def kill(self, jobid):
return self._parse_kill_output(retval, stdout, stderr)

@abc.abstractmethod
def _get_kill_command(self, jobid):
def _get_kill_command(self, jobid: str) -> str:
"""Return the command to kill the job with specified jobid."""

@abc.abstractmethod
def _parse_kill_output(self, retval, stdout, stderr):
def _parse_kill_output(self, retval: int, stdout: str, stderr: str) -> bool:
"""Parse the output of the kill command.

:return: True if everything seems ok, False otherwise.
"""

def parse_output(self, detailed_job_info=None, stdout=None, stderr=None):
def parse_output(
self,
detailed_job_info: dict[str, str | int] | None = None,
stdout: str | None = None,
stderr: str | None = None
) -> ExitCode | None:
"""Parse the output of the scheduler.

:param detailed_job_info: dictionary with the output returned by the `Scheduler.get_detailed_job_info` command.
Expand Down
20 changes: 20 additions & 0 deletions tests/engine/processes/calcjobs/test_calc_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,26 @@ def test_monitor_result_action_disable_self(get_calcjob_builder, entry_points, c
assert len([record for record in caplog.records if 'Disable self.' in record.message]) == 1


def test_submit_return_exit_code(get_calcjob_builder, monkeypatch):
    """Test that a job is terminated if ``Scheduler.submit_from_script`` returns an exit code.

    To simulate this situation we monkeypatch ``DirectScheduler._parse_submit_output`` because that is the method that
    is called internally by ``Scheduler.submit_from_script`` and it returns its result, and the ``DirectScheduler`` is
    the plugin that is used by the localhost computer used in the inputs for this calcjob.
    """
    from aiida.schedulers.plugins.direct import DirectScheduler

    # Force the scheduler's submit-output parser to report a non-transient failure via an exit code.
    monkeypatch.setattr(
        DirectScheduler, '_parse_submit_output', lambda self, *args: ExitCode(418)  # pylint: disable=unused-argument
    )

    _, node = launch.run_get_node(get_calcjob_builder())

    # The calcjob should have been terminated with exactly the exit code returned by the scheduler plugin.
    assert node.is_failed, (node.process_state, node.exit_status)
    assert node.exit_status == 418


class TestImport:
"""Test the functionality to import existing calculations completed outside of AiiDA."""

Expand Down