diff --git a/conftest.py b/conftest.py index b0457522c..991c0d17b 100644 --- a/conftest.py +++ b/conftest.py @@ -120,7 +120,7 @@ def print_test_configuration() -> None: def pytest_configure() -> None: pytest.test_launcher = test_launcher - pytest.wlm_options = ["slurm", "pbs", "lsf", "pals", "dragon"] + pytest.wlm_options = ["slurm", "pbs", "lsf", "pals", "dragon", "sge"] account = get_account() pytest.test_account = account pytest.test_device = test_device diff --git a/doc/changelog.md b/doc/changelog.md index 9b0f8f085..adbcb57b5 100644 --- a/doc/changelog.md +++ b/doc/changelog.md @@ -15,13 +15,19 @@ To be released at some future point in time Description +- New launcher support for SGE (and similar derivatives) - Fix test outputs being created in incorrect directory - Improve support for building SmartSim without ML backends - Update packaging dependency Detailed Notes -- Ensure ouputs from tests are written to temporary `tests/test_output` directory +- SGE is now a supported launcher for SmartSim. Users can now define + BatchSettings which will be monitored by the TaskManager. Additionally, + if the MPI implementation was built with SGE support, Orchestrators can + use `mpirun` without needing to specify the hosts + ([SmartSim-PR610](https://github.com/CrayLabs/SmartSim/pull/610)) +- Ensure outputs from tests are written to temporary `tests/test_output` directory - Fix an error that would prevent ``smart build`` from moving a successfully compiled RedisAI shared object to the install location expected by SmartSim if no ML backend installations were found. 
Previously, this would effectively diff --git a/smartsim/_core/_cli/validate.py b/smartsim/_core/_cli/validate.py index 96d46d6ee..6d7c72f17 100644 --- a/smartsim/_core/_cli/validate.py +++ b/smartsim/_core/_cli/validate.py @@ -215,7 +215,7 @@ def _test_tf_install(client: Client, tmp_dir: str, device: Device) -> None: # do not need the sending connection in this proc anymore send_conn.close() - proc.join(timeout=120) + proc.join(timeout=600) if proc.is_alive(): proc.terminate() - raise Exception("Failed to build a simple keras model within 2 minutes") + raise Exception("Failed to build a simple keras model within 10 minutes") diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py index 43a218545..0b943ee90 100644 --- a/smartsim/_core/control/controller.py +++ b/smartsim/_core/control/controller.py @@ -72,6 +72,7 @@ LocalLauncher, LSFLauncher, PBSLauncher, + SGELauncher, SlurmLauncher, ) from ..launcher.launcher import Launcher @@ -343,6 +344,7 @@ def init_launcher(self, launcher: str) -> None: "lsf": LSFLauncher, "local": LocalLauncher, "dragon": DragonLauncher, + "sge": SGELauncher, } if launcher is not None: diff --git a/smartsim/_core/launcher/__init__.py b/smartsim/_core/launcher/__init__.py index d78909641..c6584ee3d 100644 --- a/smartsim/_core/launcher/__init__.py +++ b/smartsim/_core/launcher/__init__.py @@ -29,6 +29,7 @@ from .local.local import LocalLauncher from .lsf.lsfLauncher import LSFLauncher from .pbs.pbsLauncher import PBSLauncher +from .sge.sgeLauncher import SGELauncher from .slurm.slurmLauncher import SlurmLauncher __all__ = [ @@ -37,5 +38,6 @@ "LocalLauncher", "LSFLauncher", "PBSLauncher", + "SGELauncher", "SlurmLauncher", ] diff --git a/smartsim/_core/launcher/lsf/lsfCommands.py b/smartsim/_core/launcher/lsf/lsfCommands.py index cb92587c1..0b98abf58 100644 --- a/smartsim/_core/launcher/lsf/lsfCommands.py +++ b/smartsim/_core/launcher/lsf/lsfCommands.py @@ -26,7 +26,7 @@ import typing as t -from ..util.shell import execute_cmd +from ...utils.shell import execute_cmd def bjobs(args: 
t.List[str]) -> t.Tuple[str, str]: diff --git a/smartsim/_core/launcher/pbs/pbsCommands.py b/smartsim/_core/launcher/pbs/pbsCommands.py index 989af93be..2a8fcf872 100644 --- a/smartsim/_core/launcher/pbs/pbsCommands.py +++ b/smartsim/_core/launcher/pbs/pbsCommands.py @@ -26,7 +26,7 @@ import typing as t -from ..util.shell import execute_cmd +from ...utils.shell import execute_cmd def qstat(args: t.List[str]) -> t.Tuple[str, str]: diff --git a/smartsim/_core/launcher/sge/__init__.py b/smartsim/_core/launcher/sge/__init__.py new file mode 100644 index 000000000..efe03908e --- /dev/null +++ b/smartsim/_core/launcher/sge/__init__.py @@ -0,0 +1,25 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/smartsim/_core/launcher/sge/sgeCommands.py b/smartsim/_core/launcher/sge/sgeCommands.py new file mode 100644 index 000000000..a284ee8db --- /dev/null +++ b/smartsim/_core/launcher/sge/sgeCommands.py @@ -0,0 +1,77 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import typing as t + +from ...utils.shell import execute_cmd + + +def qstat(args: t.List[str]) -> t.Tuple[str, str]: + """Calls SGE qstat with args + + :param args: List of command arguments + :returns: Output and error of qstat + """ + cmd = ["qstat"] + args + _, out, error = execute_cmd(cmd) + return out, error + + +def qsub(args: t.List[str]) -> t.Tuple[str, str]: + """Calls SGE qsub with args + + :param args: List of command arguments + :returns: Output and error of qsub + """ + cmd = ["qsub"] + args + _, out, error = execute_cmd(cmd) + return out, error + + +def qdel(args: t.List[str]) -> t.Tuple[int, str, str]: + """Calls SGE qdel with args. + + returncode is also supplied in this function. + + :param args: list of command arguments + :return: return code, output, and error + """ + cmd = ["qdel"] + args + returncode, out, error = execute_cmd(cmd) + return returncode, out, error + + +def qacct(args: t.List[str]) -> t.Tuple[int, str, str]: + """Calls SGE qacct with args. + + returncode is also supplied in this function. 
+ + :param args: list of command arguments + :return: output and error + """ + cmd = ["qacct"] + args + returncode, out, error = execute_cmd(cmd) + return returncode, out, error diff --git a/smartsim/_core/launcher/sge/sgeLauncher.py b/smartsim/_core/launcher/sge/sgeLauncher.py new file mode 100644 index 000000000..af600cf1d --- /dev/null +++ b/smartsim/_core/launcher/sge/sgeLauncher.py @@ -0,0 +1,184 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +import time +import typing as t + +from ....error import LauncherError +from ....log import get_logger +from ....settings import ( + MpiexecSettings, + MpirunSettings, + OrterunSettings, + RunSettings, + SettingsBase, + SgeQsubBatchSettings, +) +from ....status import SmartSimStatus +from ...config import CONFIG +from ..launcher import WLMLauncher +from ..step import ( + LocalStep, + MpiexecStep, + MpirunStep, + OrterunStep, + SgeQsubBatchStep, + Step, +) +from ..stepInfo import SGEStepInfo, StepInfo +from .sgeCommands import qacct, qdel, qstat +from .sgeParser import parse_qacct_job_output, parse_qstat_jobid_xml + +logger = get_logger(__name__) + + +class SGELauncher(WLMLauncher): + """This class encapsulates the functionality needed + to launch jobs on systems that use SGE as a workload manager. + + All WLM launchers are capable of launching managed and unmanaged + jobs. Managed jobs are queried through interaction with the WLM, + in this case SGE. Unmanaged jobs are held in the TaskManager + and are managed through references to their launching process ID + i.e. 
a psutil.Popen object + """ + + # init in WLMLauncher, launcher.py + + @property + def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]: + # RunSettings types supported by this launcher + return { + SgeQsubBatchSettings: SgeQsubBatchStep, + MpiexecSettings: MpiexecStep, + MpirunSettings: MpirunStep, + OrterunSettings: OrterunStep, + RunSettings: LocalStep, + } + + def run(self, step: Step) -> t.Optional[str]: + """Run a job step through SGE + + :param step: a job step instance + :raises LauncherError: if launch fails + :return: job step id if job is managed + """ + if not self.task_manager.actively_monitoring: + self.task_manager.start() + + cmd_list = step.get_launch_cmd() + step_id: t.Optional[str] = None + task_id: t.Optional[str] = None + if isinstance(step, SgeQsubBatchStep): + # wait for batch step to submit successfully + return_code, out, err = self.task_manager.start_and_wait(cmd_list, step.cwd) + if return_code != 0: + raise LauncherError(f"Qsub batch submission failed\n {out}\n {err}") + if out: + step_id = out.split(" ")[2] + logger.debug(f"Gleaned batch job id: {step_id} for {step.name}") + else: + # aprun/local doesn't direct output for us. 
+ out, err = step.get_output_files() + + # pylint: disable-next=consider-using-with + output = open(out, "w+", encoding="utf-8") + # pylint: disable-next=consider-using-with + error = open(err, "w+", encoding="utf-8") + task_id = self.task_manager.start_task( + cmd_list, step.cwd, step.env, out=output.fileno(), err=error.fileno() + ) + + self.step_mapping.add(step.name, step_id, task_id, step.managed) + + return step_id + + def stop(self, step_name: str) -> StepInfo: + """Stop/cancel a job step + + :param step_name: name of the job to stop + :return: update for job due to cancel + """ + stepmap = self.step_mapping[step_name] + if stepmap.managed: + qdel_rc, _, err = qdel([str(stepmap.step_id)]) + if qdel_rc != 0: + logger.warning(f"Unable to cancel job step {step_name}\n {err}") + if stepmap.task_id: + self.task_manager.remove_task(str(stepmap.task_id)) + else: + self.task_manager.remove_task(str(stepmap.task_id)) + + _, step_info = self.get_step_update([step_name])[0] + if not step_info: + raise LauncherError(f"Could not get step_info for job step {step_name}") + + step_info.status = ( + SmartSimStatus.STATUS_CANCELLED + ) # set status to cancelled instead of failed + return step_info + + def _get_managed_step_update(self, step_ids: t.List[str]) -> t.List[StepInfo]: + """Get step updates for WLM managed jobs + + :param step_ids: list of job step ids + :return: list of updates for managed jobs + """ + updates: t.List[StepInfo] = [] + + qstat_out, _ = qstat(["-xml"]) + stats = [parse_qstat_jobid_xml(qstat_out, str(step_id)) for step_id in step_ids] + + for stat, step_id in zip(stats, step_ids): + if stat is None: + info = SGEStepInfo("NOTFOUND") + # Attempt to retrieve the historical record + return_code, qacct_output, _ = qacct(["-j", str(step_id)]) + num_trials = 0 + while return_code != 0 and num_trials < CONFIG.wlm_trials: + num_trials += 1 + time.sleep(CONFIG.jm_interval) + return_code, qacct_output, _ = qacct(["-j", str(step_id)]) + + if qacct_output: + failed = 
bool(int(parse_qacct_job_output(qacct_output, "failed"))) + if failed: + info.status = SmartSimStatus.STATUS_FAILED + info.returncode = 1 + else: + info.status = SmartSimStatus.STATUS_COMPLETED + info.returncode = 0 + else: # Assume if qacct did not find it, that the job completed + info.status = SmartSimStatus.STATUS_COMPLETED + info.returncode = 0 + else: + info = SGEStepInfo(stat) + + updates.append(info) + return updates + + def __str__(self) -> str: + return "SGE" diff --git a/smartsim/_core/launcher/sge/sgeParser.py b/smartsim/_core/launcher/sge/sgeParser.py new file mode 100644 index 000000000..0ee5d5c67 --- /dev/null +++ b/smartsim/_core/launcher/sge/sgeParser.py @@ -0,0 +1,92 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import typing as t +import xml.etree.ElementTree as ET + + +def parse_qsub(output: str) -> str: + """Parse qsub output and return job id. For SGE, the + output is the job id itself. + + :param output: stdout of qsub command + :returns: job id + """ + return output + + +def parse_qsub_error(output: str) -> str: + """Parse and return error output of a failed qsub command. + + :param output: stderr of qsub command + :returns: error message + """ + # look for error first + for line in output.split("\n"): + if line.startswith("qsub:"): + error = line.split(":", 1)[1] + return error.strip() + # if no error line, take the first non-empty line + if stripped := output.strip(): + return stripped.split("\n")[0].strip() + # if neither, present a base error message + base_err = "SGE run error" + return base_err + + +def parse_qstat_jobid_xml(output: str, job_id: str) -> t.Optional[str]: + """Parse and return output of the qstat command run with XML options + to obtain job status. 
+ + :param output: output of the qstat command in XML format + :param job_id: allocation id or job step id + :return: status + """ + + root = ET.fromstring(output) + for job_list in root.findall(".//job_list"): + job_state = job_list.find("state") + # not None construct is needed here, since element with no + # children returns 0, interpreted as False + if (job_number := job_list.find("JB_job_number")) is not None: + if job_number.text == job_id and (job_state is not None): + return job_state.text + + return None + + +def parse_qacct_job_output(output: str, field_name: str) -> t.Union[str, int]: + """Parse the output from qacct for a single job + + :param output: The raw text output from qacct + :param field_name: The name of the field to extract + """ + + for line in output.splitlines(): + if field_name in line: + return line.split()[1] + + return 1 diff --git a/smartsim/_core/launcher/slurm/slurmCommands.py b/smartsim/_core/launcher/slurm/slurmCommands.py index 839826297..e72a87af4 100644 --- a/smartsim/_core/launcher/slurm/slurmCommands.py +++ b/smartsim/_core/launcher/slurm/slurmCommands.py @@ -29,7 +29,7 @@ from ....error import LauncherError from ....log import get_logger from ...utils.helpers import expand_exe_path -from ..util.shell import execute_cmd +from ...utils.shell import execute_cmd logger = get_logger(__name__) diff --git a/smartsim/_core/launcher/step/__init__.py b/smartsim/_core/launcher/step/__init__.py index c492f3e97..8331a18bf 100644 --- a/smartsim/_core/launcher/step/__init__.py +++ b/smartsim/_core/launcher/step/__init__.py @@ -30,5 +30,6 @@ from .lsfStep import BsubBatchStep, JsrunStep from .mpiStep import MpiexecStep, MpirunStep, OrterunStep from .pbsStep import QsubBatchStep +from .sgeStep import SgeQsubBatchStep from .slurmStep import SbatchStep, SrunStep from .step import Step diff --git a/smartsim/_core/launcher/step/mpiStep.py b/smartsim/_core/launcher/step/mpiStep.py index 767486462..9ae3af2fc 100644 --- 
a/smartsim/_core/launcher/step/mpiStep.py +++ b/smartsim/_core/launcher/step/mpiStep.py @@ -54,7 +54,7 @@ def __init__(self, name: str, cwd: str, run_settings: RunSettings) -> None: self._set_alloc() self.run_settings = run_settings - _supported_launchers = ["PBS", "SLURM", "LSB"] + _supported_launchers = ["PBS", "SLURM", "LSB", "SGE"] @proxyable_launch_cmd def get_launch_cmd(self) -> t.List[str]: @@ -102,7 +102,10 @@ def _set_alloc(self) -> None: environment_keys = os.environ.keys() for launcher in self._supported_launchers: - jobid_field = f"{launcher.upper()}_JOBID" + if launcher == "SGE": + jobid_field = "JOB_ID" + else: + jobid_field = f"{launcher.upper()}_JOBID" if jobid_field in environment_keys: self.alloc = os.environ[jobid_field] logger.debug(f"Running on allocation {self.alloc} from {jobid_field}") diff --git a/smartsim/_core/launcher/step/sgeStep.py b/smartsim/_core/launcher/step/sgeStep.py new file mode 100644 index 000000000..2406b19da --- /dev/null +++ b/smartsim/_core/launcher/step/sgeStep.py @@ -0,0 +1,95 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import typing as t + +from ....log import get_logger +from ....settings import SgeQsubBatchSettings +from .step import Step + +logger = get_logger(__name__) + + +class SgeQsubBatchStep(Step): + def __init__( + self, name: str, cwd: str, batch_settings: SgeQsubBatchSettings + ) -> None: + """Initialize a Sun Grid Engine qsub step + + :param name: name of the entity to launch + :param cwd: path to launch dir + :param batch_settings: batch settings for entity + """ + super().__init__(name, cwd, batch_settings) + self.step_cmds: t.List[t.List[str]] = [] + self.managed = True + self.batch_settings = batch_settings + + def get_launch_cmd(self) -> t.List[str]: + """Get the launch command for the batch + + :return: launch command for the batch + """ + script = self._write_script() + return [self.batch_settings.batch_cmd, script] + + def add_to_batch(self, step: Step) -> None: + """Add a job step to this batch + + :param step: a job step instance e.g. 
MpirunStep + """ + launch_cmd = step.get_launch_cmd() + self.step_cmds.append(launch_cmd) + logger.debug(f"Added step command to batch for {step.name}") + + def _write_script(self) -> str: + """Write the batch script + + :return: batch script path after writing + """ + batch_script = self.get_step_file(ending=".sh") + output, error = self.get_output_files() + with open(batch_script, "w", encoding="utf-8") as script_file: + script_file.write(f"{self.batch_settings.shebang}\n\n") + script_file.write(f"#$ -o {output}\n") + script_file.write(f"#$ -e {error}\n") + script_file.write(f"#$ -N {self.name}\n") + script_file.write("#$ -V\n") + + # add additional qsub options + for opt in self.batch_settings.format_batch_args(): + script_file.write(f"#$ {opt}\n") + + for cmd in self.batch_settings.preamble: + script_file.write(f"{cmd}\n") + + for i, step_cmd in enumerate(self.step_cmds): + script_file.write("\n") + script_file.write(f"{' '.join((step_cmd))} &\n") + if i == len(self.step_cmds) - 1: + script_file.write("\n") + script_file.write("wait\n") + return batch_script diff --git a/smartsim/_core/launcher/stepInfo.py b/smartsim/_core/launcher/stepInfo.py index 875eb0322..b68527cb3 100644 --- a/smartsim/_core/launcher/stepInfo.py +++ b/smartsim/_core/launcher/stepInfo.py @@ -151,7 +151,7 @@ def __init__( class PBSStepInfo(StepInfo): # cov-pbs @property def mapping(self) -> t.Dict[str, SmartSimStatus]: - # pylint: disable=line-too-long + # pylint: disable-next=line-too-long # see http://nusc.nsu.ru/wiki/lib/exe/fetch.php/doc/pbs/PBSReferenceGuide19.2.1.pdf#M11.9.90788.PBSHeading1.81.Job.States return { "R": SmartSimStatus.STATUS_RUNNING, @@ -201,7 +201,7 @@ def __init__( class LSFBatchStepInfo(StepInfo): # cov-lsf @property def mapping(self) -> t.Dict[str, SmartSimStatus]: - # pylint: disable=line-too-long + # pylint: disable-next=line-too-long # see https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=execution-about-job-states return { "RUN": 
SmartSimStatus.STATUS_RUNNING, @@ -239,7 +239,7 @@ def __init__( class LSFJsrunStepInfo(StepInfo): # cov-lsf @property def mapping(self) -> t.Dict[str, SmartSimStatus]: - # pylint: disable=line-too-long + # pylint: disable-next=line-too-long # see https://www.ibm.com/docs/en/spectrum-lsf/10.1.0?topic=execution-about-job-states return { "Killed": SmartSimStatus.STATUS_COMPLETED, @@ -270,3 +270,77 @@ def __init__( super().__init__( smartsim_status, status, returncode, output=output, error=error ) + + +class SGEStepInfo(StepInfo): # cov-pbs + @property + def mapping(self) -> t.Dict[str, SmartSimStatus]: + # pylint: disable-next=line-too-long + # see https://manpages.ubuntu.com/manpages/jammy/man5/sge_status.5.html + return { + # Running states + "r": SmartSimStatus.STATUS_RUNNING, + "hr": SmartSimStatus.STATUS_RUNNING, + "t": SmartSimStatus.STATUS_RUNNING, + "Rr": SmartSimStatus.STATUS_RUNNING, + "Rt": SmartSimStatus.STATUS_RUNNING, + # Queued states + "qw": SmartSimStatus.STATUS_QUEUED, + "Rq": SmartSimStatus.STATUS_QUEUED, + "hqw": SmartSimStatus.STATUS_QUEUED, + "hRwq": SmartSimStatus.STATUS_QUEUED, + # Paused states + "s": SmartSimStatus.STATUS_PAUSED, + "ts": SmartSimStatus.STATUS_PAUSED, + "S": SmartSimStatus.STATUS_PAUSED, + "tS": SmartSimStatus.STATUS_PAUSED, + "T": SmartSimStatus.STATUS_PAUSED, + "tT": SmartSimStatus.STATUS_PAUSED, + "Rs": SmartSimStatus.STATUS_PAUSED, + "Rts": SmartSimStatus.STATUS_PAUSED, + "RS": SmartSimStatus.STATUS_PAUSED, + "RtS": SmartSimStatus.STATUS_PAUSED, + "RT": SmartSimStatus.STATUS_PAUSED, + "RtT": SmartSimStatus.STATUS_PAUSED, + # Failed states + "Eqw": SmartSimStatus.STATUS_FAILED, + "Ehqw": SmartSimStatus.STATUS_FAILED, + "EhRqw": SmartSimStatus.STATUS_FAILED, + # Finished states + "z": SmartSimStatus.STATUS_COMPLETED, + # Cancelled + "dr": SmartSimStatus.STATUS_CANCELLED, + "dt": SmartSimStatus.STATUS_CANCELLED, + "dRr": SmartSimStatus.STATUS_CANCELLED, + "dRt": SmartSimStatus.STATUS_CANCELLED, + "ds": 
SmartSimStatus.STATUS_CANCELLED, + "dS": SmartSimStatus.STATUS_CANCELLED, + "dT": SmartSimStatus.STATUS_CANCELLED, + "dRs": SmartSimStatus.STATUS_CANCELLED, + "dRS": SmartSimStatus.STATUS_CANCELLED, + "dRT": SmartSimStatus.STATUS_CANCELLED, + } + + def __init__( + self, + status: str = "", + returncode: t.Optional[int] = None, + output: t.Optional[str] = None, + error: t.Optional[str] = None, + ) -> None: + if status == "NOTFOUND": + if returncode is not None: + smartsim_status = ( + SmartSimStatus.STATUS_COMPLETED + if returncode == 0 + else SmartSimStatus.STATUS_FAILED + ) + else: + # if SGE job history is not available, and job is not in queue + smartsim_status = SmartSimStatus.STATUS_COMPLETED + returncode = 0 + else: + smartsim_status = self._get_smartsim_status(status) + super().__init__( + smartsim_status, status, returncode, output=output, error=error + ) diff --git a/smartsim/_core/launcher/taskManager.py b/smartsim/_core/launcher/taskManager.py index 60f097da6..1bc26d043 100644 --- a/smartsim/_core/launcher/taskManager.py +++ b/smartsim/_core/launcher/taskManager.py @@ -36,7 +36,7 @@ from ...error import LauncherError from ...log import ContextThread, get_logger from ..utils.helpers import check_dev_log_level -from .util.shell import execute_async_cmd, execute_cmd +from ..utils.shell import execute_async_cmd, execute_cmd logger = get_logger(__name__) VERBOSE_TM = check_dev_log_level() # pylint: disable=invalid-name diff --git a/smartsim/_core/utils/redis.py b/smartsim/_core/utils/redis.py index 7fa59ad83..76ff45cd5 100644 --- a/smartsim/_core/utils/redis.py +++ b/smartsim/_core/utils/redis.py @@ -39,8 +39,8 @@ from ...error import SSInternalError from ...log import get_logger from ..config import CONFIG -from ..launcher.util.shell import execute_cmd from .network import get_ip_from_host +from .shell import execute_cmd logging.getLogger("rediscluster").setLevel(logging.WARNING) logger = get_logger(__name__) diff --git 
a/smartsim/_core/launcher/util/shell.py b/smartsim/_core/utils/shell.py similarity index 97% rename from smartsim/_core/launcher/util/shell.py rename to smartsim/_core/utils/shell.py index a2b5bc76b..4cfe2998c 100644 --- a/smartsim/_core/launcher/util/shell.py +++ b/smartsim/_core/utils/shell.py @@ -30,9 +30,9 @@ import psutil -from ....error import ShellError -from ....log import get_logger -from ...utils.helpers import check_dev_log_level +from ...error import ShellError +from ...log import get_logger +from .helpers import check_dev_log_level logger = get_logger(__name__) VERBOSE_SHELL = check_dev_log_level() diff --git a/smartsim/database/orchestrator.py b/smartsim/database/orchestrator.py index f6ce0310f..6323e440b 100644 --- a/smartsim/database/orchestrator.py +++ b/smartsim/database/orchestrator.py @@ -28,6 +28,7 @@ import itertools import os.path as osp +import shutil import sys import typing as t from os import environ, getcwd, getenv @@ -41,6 +42,7 @@ from .._core.utils import db_is_active from .._core.utils.helpers import is_valid_cmd, unpack_db_identifier from .._core.utils.network import get_ip_from_host +from .._core.utils.shell import execute_cmd from ..entity import DBNode, EntityList, TelemetryConfiguration from ..error import ( SmartSimError, @@ -75,6 +77,7 @@ "pals": ["mpiexec"], "lsf": ["jsrun"], "local": [""], + "sge": ["mpirun", "mpiexec", "orterun"], } @@ -280,14 +283,35 @@ def __init__( ) if hosts: self.set_hosts(hosts) - elif not hosts and self.run_command == "mpirun": - raise SmartSimError( - "hosts argument is required when launching Orchestrator with mpirun" - ) + elif not hosts: + mpilike = self.run_command in ["mpirun", "mpiexec", "orterun"] + if mpilike and not self._mpi_has_sge_support(): + raise SmartSimError( + ( + "hosts argument required when launching " + "Orchestrator with mpirun" + ) + ) self._reserved_run_args: t.Dict[t.Type[RunSettings], t.List[str]] = {} self._reserved_batch_args: t.Dict[t.Type[BatchSettings], t.List[str]] = {} 
self._fill_reserved() + def _mpi_has_sge_support(self) -> bool: + """Check if MPI command supports SGE + + If the run command is mpirun, mpiexec, or orterun, there is a possibility + that the user is using OpenMPI with SGE grid support. In this case, hosts + do not need to be set. + + :returns: bool + """ + + if self.run_command in ["mpirun", "orterun", "mpiexec"]: + if shutil.which("ompi_info"): + _, output, _ = execute_cmd(["ompi_info"]) + return "gridengine" in output + return False + @property def db_identifier(self) -> str: """Return the DB identifier, which is common to a DB and all of its nodes diff --git a/smartsim/error/__init__.py b/smartsim/error/__init__.py index 3a40548e7..c7122fe42 100644 --- a/smartsim/error/__init__.py +++ b/smartsim/error/__init__.py @@ -28,6 +28,7 @@ AllocationError, EntityExistsError, LauncherError, + LauncherUnsupportedFeature, ParameterWriterError, ShellError, SmartSimError, diff --git a/smartsim/error/errors.py b/smartsim/error/errors.py index 333258a34..0cb38d7e6 100644 --- a/smartsim/error/errors.py +++ b/smartsim/error/errors.py @@ -108,6 +108,10 @@ class LauncherError(SSInternalError): """Raised when there is an error in the launcher""" +class LauncherUnsupportedFeature(LauncherError): + """Raised when the launcher does not support a given method""" + + class AllocationError(LauncherError): """Raised when there is a problem with the user WLM allocation""" diff --git a/smartsim/experiment.py b/smartsim/experiment.py index 6b9d6a1fb..607a90ae1 100644 --- a/smartsim/experiment.py +++ b/smartsim/experiment.py @@ -144,7 +144,7 @@ def __init__( :param name: name for the ``Experiment`` :param exp_path: path to location of ``Experiment`` directory :param launcher: type of launcher being used, options are "slurm", "pbs", - "lsf", or "local". If set to "auto", + "lsf", "sge", or "local". If set to "auto", an attempt will be made to find an available launcher on the system. 
""" diff --git a/smartsim/settings/__init__.py b/smartsim/settings/__init__.py index 6e8f0bc96..8052121e2 100644 --- a/smartsim/settings/__init__.py +++ b/smartsim/settings/__init__.py @@ -32,6 +32,7 @@ from .mpiSettings import MpiexecSettings, MpirunSettings, OrterunSettings from .palsSettings import PalsMpiexecSettings from .pbsSettings import QsubBatchSettings +from .sgeSettings import SgeQsubBatchSettings from .slurmSettings import SbatchSettings, SrunSettings __all__ = [ @@ -45,6 +46,7 @@ "RunSettings", "SettingsBase", "SbatchSettings", + "SgeQsubBatchSettings", "SrunSettings", "PalsMpiexecSettings", "DragonRunSettings", diff --git a/smartsim/settings/base.py b/smartsim/settings/base.py index 6373b52fd..da3edb491 100644 --- a/smartsim/settings/base.py +++ b/smartsim/settings/base.py @@ -594,9 +594,13 @@ def __init__( self._batch_cmd = batch_cmd self.batch_args = batch_args or {} self._preamble: t.List[str] = [] - self.set_nodes(kwargs.get("nodes", None)) + nodes = kwargs.get("nodes", None) + if nodes: + self.set_nodes(nodes) + queue = kwargs.get("queue", None) + if queue: + self.set_queue(queue) self.set_walltime(kwargs.get("time", None)) - self.set_queue(kwargs.get("queue", None)) self.set_account(kwargs.get("account", None)) @property diff --git a/smartsim/settings/settings.py b/smartsim/settings/settings.py index 5f7fc3fe2..5afd0e192 100644 --- a/smartsim/settings/settings.py +++ b/smartsim/settings/settings.py @@ -41,6 +41,7 @@ QsubBatchSettings, RunSettings, SbatchSettings, + SgeQsubBatchSettings, SrunSettings, base, ) @@ -78,6 +79,7 @@ def create_batch_settings( "slurm": SbatchSettings, "lsf": BsubBatchSettings, "pals": QsubBatchSettings, + "sge": SgeQsubBatchSettings, } if launcher in ["auto", "dragon"]: @@ -153,6 +155,7 @@ def create_run_settings( "pbs": ["aprun", "mpirun", "mpiexec"], "pals": ["mpiexec"], "lsf": ["jsrun", "mpirun", "mpiexec"], + "sge": ["mpirun", "mpiexec"], "local": [""], } diff --git a/smartsim/settings/sgeSettings.py 
b/smartsim/settings/sgeSettings.py new file mode 100644 index 000000000..a5cd3f2b0 --- /dev/null +++ b/smartsim/settings/sgeSettings.py @@ -0,0 +1,293 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +import typing as t + +from ..error import LauncherUnsupportedFeature, SSConfigError +from ..log import get_logger +from .base import BatchSettings + +logger = get_logger(__name__) + + +class SgeQsubBatchSettings(BatchSettings): + def __init__( + self, + time: t.Optional[str] = None, + ncpus: t.Optional[int] = None, + pe_type: t.Optional[str] = None, + account: t.Optional[str] = None, + shebang: str = "#!/bin/bash -l", + resources: t.Optional[t.Dict[str, t.Union[str, int]]] = None, + batch_args: t.Optional[t.Dict[str, t.Optional[str]]] = None, + **kwargs: t.Any, + ): + """Specify SGE batch parameters for a job + + :param time: walltime for batch job + :param ncpus: number of cpus per node + :param pe_type: type of parallel environment + :param account: account for batch launch + :param shebang: shebang (shell) used to interpret the batch script + :param resources: overrides for resource arguments + :param batch_args: overrides for SGE batch arguments + """ + + if "nodes" in kwargs: + kwargs["nodes"] = 0 + + self.resources = resources or {} + if ncpus: + self.set_ncpus(ncpus) + if pe_type: + self.set_pe_type(pe_type) + self.set_shebang(shebang) + + # time, queue, nodes, and account set in parent class init + super().__init__( + "qsub", + batch_args=batch_args, + account=account, + time=time, + **kwargs, + ) + + self._context_variables: t.List[str] = [] + self._env_vars: t.Dict[str, str] = {} + + @property + def resources(self) -> t.Dict[str, t.Union[str, int]]: + return self._resources.copy() + + @resources.setter + def resources(self, resources: t.Dict[str, t.Union[str, int]]) -> None: + self._sanity_check_resources(resources) + self._resources = resources.copy() + + def set_hostlist(self, host_list: t.Union[str, t.List[str]]) -> None: + raise LauncherUnsupportedFeature( + "SGE does not support requesting specific hosts in batch jobs" + ) + + def set_queue(self, queue: str) -> None: + raise LauncherUnsupportedFeature("SGE does not support specifying queues") + + def set_shebang(self, shebang:
str) -> None: + """Set the shebang (shell) for the batch job + + :param shebang: The shebang used to interpret the rest of script + (e.g. #!/bin/bash) + """ + self.shebang = shebang + + def set_walltime(self, walltime: str) -> None: + """Set the walltime of the job + + format = "HH:MM:SS" + + If a walltime argument is provided in + ``SgeQsubBatchSettings.resources``, then + this value will be overridden + + :param walltime: wall time + """ + if walltime: + self.set_resource("h_rt", walltime) + + def set_nodes(self, num_nodes: t.Optional[int]) -> None: + """Set the number of nodes, invalid for SGE + + :param num_nodes: Number of nodes, any integer other than 0 is invalid + """ + if num_nodes: + raise LauncherUnsupportedFeature( + "SGE does not support setting the number of nodes" + ) + + def set_ncpus(self, num_cpus: t.Union[int, str]) -> None: + """Set the number of cpus obtained in each node. + + :param num_cpus: number of cpus per node in select + """ + self.set_resource("ncpus", int(num_cpus)) + + def set_ngpus(self, num_gpus: t.Union[int, str]) -> None: + """Set the number of GPUs obtained in each node. + + :param num_gpus: number of GPUs per node in select + """ + self.set_resource("gpu", num_gpus) + + def set_account(self, account: str) -> None: + """Set the account for this batch job + + :param account: account id + """ + if account: + self.batch_args["A"] = str(account) + + def set_project(self, project: str) -> None: + """Set the project for this batch job + + :param project: project id + """ + if project: + self.batch_args["P"] = str(project) + + def update_context_variables( + self, + action: t.Literal["ac", "sc", "dc"], + var_name: str, + value: t.Optional[t.Union[int, str]] = None, + ) -> None: + """ + Add, set, or delete context variables + + Configure any context variables using SGE's -ac, -sc, and -dc + qsub switches.
These modifications are appended each time this + method is called, so the order does matter + + :param action: Add, set, or delete a context variable (ac, sc, or dc) + :param var_name: The name of the variable to set + :param value: The value of the variable + """ + if action not in ["ac", "sc", "dc"]: + raise ValueError("The action argument must be ac, sc, or dc") + if action == "dc" and value: + raise SSConfigError("When using the 'dc' action, value should not be set") + + command = f"-{action} {var_name}" + if value: + command += f"={value}" + self._context_variables.append(command) + + def set_hyperthreading(self, enable: bool = True) -> None: + """Enable or disable hyperthreading + + :param enable: Enable (True) or disable (False) hyperthreading + """ + self.set_resource("threads", int(enable)) + + def set_memory_per_pe(self, memory_spec: str) -> None: + """Set the amount of memory per processing element + + :param memory_spec: The amount of memory per PE (e.g. 2G) + """ + self.set_resource("mem", memory_spec) + + def set_pe_type(self, pe_type: str) -> None: + """Set the parallel environment + + :param pe_type: parallel environment identifier (e.g. mpi or smp) + """ + if pe_type: + self.set_resource("pe_type", pe_type) + + def set_threads_per_pe(self, threads_per_core: int) -> None: + """Sets the number of threads per processing element + + :param threads_per_core: Number of threads per core + """ + + self._env_vars["OMP_NUM_THREADS"] = str(threads_per_core) + + def set_resource(self, resource_name: str, value: t.Union[str, int]) -> None: + """Set a resource value for the SGE batch + + If a select statement is provided, the nodes and ncpus + arguments will be overridden. Likewise for Walltime + + :param resource_name: name of resource, e.g.
walltime + :param value: value + """ + updated_dict = self.resources + updated_dict.update({resource_name: value}) + self._sanity_check_resources(updated_dict) + self.resources = updated_dict + + def format_batch_args(self) -> t.List[str]: + """Get the formatted batch arguments for a preview + + :return: batch arguments for SGE + :raises ValueError: if options are supplied without values + """ + opts = self._create_resource_list() + for opt, value in self.batch_args.items(): + prefix = "-" + if not value: + raise ValueError("SGE options without values are not allowed") + opts += [" ".join((prefix + opt, str(value)))] + return opts + + def _sanity_check_resources( + self, resources: t.Optional[t.Dict[str, t.Union[str, int]]] = None + ) -> None: + """Check that resources are correctly formatted""" + # Note: isinstance check here to avoid collision with default + checked_resources = resources if isinstance(resources, dict) else self.resources + + for key, value in checked_resources.items(): + if not isinstance(key, str): + raise TypeError( + f"The type of {key=} is {type(key)}. Only int and str " + "are allowed." + ) + if not isinstance(value, (str, int)): + raise TypeError( + f"The value associated with {key=} is {type(value)}. Only int " + "and str are allowed." + ) + + def _create_resource_list(self) -> t.List[str]: + self._sanity_check_resources() + res = [] + + # Pop off some specific keywords that need to be treated separately + resources = self.resources # Note this is a copy so not modifying original + + # Construct the configuration of the parallel environment + ncpus = resources.pop("ncpus", None) + pe_type = resources.pop("pe_type", None) + if (pe_type is None and ncpus) or (pe_type and ncpus is None): + msg = f"{ncpus=} and {pe_type=} must both be set. " + msg += "Call set_ncpus and/or set_pe_type." 
+ raise SSConfigError(msg) + + if pe_type and ncpus: + res += [f"-pe {pe_type} {ncpus}"] + + # Deal with context variables + for context_variable in self._context_variables: + res += [context_variable] + + # All other "standard" resource specs + for resource, value in resources.items(): + res += [f"-l {resource}={value}"] + + # Set any environment variables + for key, value in self._env_vars.items(): + res += [f"-v {key}={value}"] + return res diff --git a/smartsim/status.py b/smartsim/status.py index e42ef3191..e0d950619 100644 --- a/smartsim/status.py +++ b/smartsim/status.py @@ -35,6 +35,7 @@ class SmartSimStatus(Enum): STATUS_NEW = "New" STATUS_PAUSED = "Paused" STATUS_NEVER_STARTED = "NeverStarted" + STATUS_QUEUED = "Queued" TERMINAL_STATUSES = { diff --git a/tests/test_batch_settings.py b/tests/test_batch_settings.py index db269a9b5..c4f365c39 100644 --- a/tests/test_batch_settings.py +++ b/tests/test_batch_settings.py @@ -64,7 +64,7 @@ def test_create_sbatch(): assert isinstance(slurm_batch, SbatchSettings) assert slurm_batch.batch_args["partition"] == "default" args = slurm_batch.format_batch_args() - assert args == [ + expected_args = [ "--exclusive", "--oversubscribe", "--nodes=1", @@ -72,6 +72,8 @@ def test_create_sbatch(): "--partition=default", "--account=myproject", ] + assert all(arg in expected_args for arg in args) + assert len(expected_args) == len(args) def test_create_bsub(): diff --git a/tests/test_sge_batch_settings.py b/tests/test_sge_batch_settings.py new file mode 100644 index 000000000..fa40b4b00 --- /dev/null +++ b/tests/test_sge_batch_settings.py @@ -0,0 +1,158 @@ +# BSD 2-Clause License +# +# Copyright (c) 2021-2024, Hewlett Packard Enterprise +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. 
Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ +import os.path as osp + +import pytest + +from smartsim import Experiment +from smartsim._core.launcher.sge.sgeParser import parse_qstat_jobid_xml +from smartsim.error import SSConfigError +from smartsim.settings import SgeQsubBatchSettings +from smartsim.settings.mpiSettings import _BaseMPISettings + +# The tests in this file belong to the group_b group +pytestmark = pytest.mark.group_b + +qstat_example = """ + + + + 1387693 + 3.50000 + test_1 + user1 + r + 2024-06-06T04:04:21 + example_node1 + 1600 + + + + + 1387695 + 3.48917 + test_2 + user1 + qw + 2024-05-20T16:47:46 + + 1600 + + + +""" + + +@pytest.mark.parametrize("pe_type", ["mpi", "smp"]) +def test_pe_config(pe_type): + settings = SgeQsubBatchSettings(ncpus=8, pe_type=pe_type) + assert settings._create_resource_list() == [f"-pe {pe_type} 8"] + + +def test_walltime(): + settings = SgeQsubBatchSettings(time="01:00:00") + assert settings._create_resource_list() == [ + f"-l h_rt=01:00:00", + ] + + +def test_ngpus(): + settings = SgeQsubBatchSettings() + settings.set_ngpus(1) + assert settings._create_resource_list() == [f"-l gpu=1"] + + +def test_account(): + settings = SgeQsubBatchSettings(account="foo") + assert settings.format_batch_args() == ["-A foo"] + + +def test_project(): + settings = SgeQsubBatchSettings() + settings.set_project("foo") + assert settings.format_batch_args() == ["-P foo"] + + +def test_update_context_variables(): + settings = SgeQsubBatchSettings() + settings.update_context_variables("ac", "foo") + settings.update_context_variables("sc", "foo", "bar") + settings.update_context_variables("dc", "foo") + assert settings._create_resource_list() == ["-ac foo", "-sc foo=bar", "-dc foo"] + + +def test_invalid_dc_and_value_update_context_variables(): + settings = SgeQsubBatchSettings() + with pytest.raises(SSConfigError): + settings.update_context_variables("dc", "foo", "bar") + + +@pytest.mark.parametrize("enable", [True, False]) +def test_set_hyperthreading(enable): + settings = 
SgeQsubBatchSettings() + settings.set_hyperthreading(enable) + assert settings._create_resource_list() == [f"-l threads={int(enable)}"] + + +def test_default_set_hyperthreading(): + settings = SgeQsubBatchSettings() + settings.set_hyperthreading() + assert settings._create_resource_list() == ["-l threads=1"] + + +def test_resources_is_a_copy(): + settings = SgeQsubBatchSettings() + resources = settings.resources + assert resources is not settings._resources + + +def test_resources_not_set_on_error(): + settings = SgeQsubBatchSettings() + unaltered_resources = settings.resources + with pytest.raises(TypeError): + settings.resources = {"meep": Exception} + + assert unaltered_resources == settings.resources + + +def test_qstat_jobid_xml(): + assert parse_qstat_jobid_xml(qstat_example, "1387693") == "r" + assert parse_qstat_jobid_xml(qstat_example, "1387695") == "qw" + assert parse_qstat_jobid_xml(qstat_example, "9999999") is None + + +def test_sge_launcher_defaults(monkeypatch, fileutils): + + stub_path = osp.join("mpi_impl_stubs", "openmpi4") + stub_path = fileutils.get_test_dir_path(stub_path) + monkeypatch.setenv("PATH", stub_path, prepend=":") + exp = Experiment("test_sge_run_settings", launcher="sge") + + bs = exp.create_batch_settings() + assert isinstance(bs, SgeQsubBatchSettings) + rs = exp.create_run_settings("echo") + assert isinstance(rs, _BaseMPISettings) diff --git a/tests/test_shell_util.py b/tests/test_shell_util.py index 24f6b023c..2c4e19001 100644 --- a/tests/test_shell_util.py +++ b/tests/test_shell_util.py @@ -28,7 +28,7 @@ import psutil import pytest -from smartsim._core.launcher.util.shell import * +from smartsim._core.utils.shell import * # The tests in this file belong to the group_b group pytestmark = pytest.mark.group_b