Skip to content

Commit

Permalink
Implement support for SGE (#610)
Browse files Browse the repository at this point in the history
SGE shares some similarities to PBS/Torque-like launchers, but the
differences are significant enough to warrant their own separate
implementations. Notably, SGE has a qacct utility (similar to SLURM's
sacct) to query for the historical record of a job. Additionally, unique
amongst the launchers, SGE does not allow a way for a user to specify
the number of nodes needed and requires the user to select an
admin-configured parallel environment profile (e.g. mpi or smp).

The changes add a new SGE launcher and SGEQsubBatchSettings to 
enable support for SmartSim applications on SGE machines.

[ committed by @ashao ]
[ reviewed by @al-rigazzi ]
  • Loading branch information
ashao authored Jun 17, 2024
1 parent 34987e7 commit 0956399
Show file tree
Hide file tree
Showing 31 changed files with 1,078 additions and 25 deletions.
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def print_test_configuration() -> None:

def pytest_configure() -> None:
pytest.test_launcher = test_launcher
pytest.wlm_options = ["slurm", "pbs", "lsf", "pals", "dragon"]
pytest.wlm_options = ["slurm", "pbs", "lsf", "pals", "dragon", "sge"]
account = get_account()
pytest.test_account = account
pytest.test_device = test_device
Expand Down
8 changes: 7 additions & 1 deletion doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@ To be released at some future point in time

Description

- New launcher support for SGE (and similar derivatives)
- Fix test outputs being created in incorrect directory
- Improve support for building SmartSim without ML backends
- Update packaging dependency

Detailed Notes

- Ensure ouputs from tests are written to temporary `tests/test_output` directory
- SGE is now a supported launcher for SmartSim. Users can now define
BatchSettings which will be monitored by the TaskManager. Additionally,
if the MPI implementation was built with SGE support, Orchestrators can
use `mpirun` without needing to specify the hosts
([SmartSim-PR610](https://github.com/CrayLabs/SmartSim/pull/610))
- Ensure outputs from tests are written to temporary `tests/test_output` directory
- Fix an error that would prevent ``smart build`` from moving a successfully
compiled RedisAI shared object to the install location expected by SmartSim
if no ML backend installations were found. Previously, this would effectively
Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/_cli/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def _test_tf_install(client: Client, tmp_dir: str, device: Device) -> None:
# do not need the sending connection in this proc anymore
send_conn.close()

proc.join(timeout=120)
proc.join(timeout=600)
if proc.is_alive():
proc.terminate()
raise Exception("Failed to build a simple keras model within 2 minutes")
Expand Down
2 changes: 2 additions & 0 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
LocalLauncher,
LSFLauncher,
PBSLauncher,
SGELauncher,
SlurmLauncher,
)
from ..launcher.launcher import Launcher
Expand Down Expand Up @@ -343,6 +344,7 @@ def init_launcher(self, launcher: str) -> None:
"lsf": LSFLauncher,
"local": LocalLauncher,
"dragon": DragonLauncher,
"sge": SGELauncher,
}

if launcher is not None:
Expand Down
2 changes: 2 additions & 0 deletions smartsim/_core/launcher/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .local.local import LocalLauncher
from .lsf.lsfLauncher import LSFLauncher
from .pbs.pbsLauncher import PBSLauncher
from .sge.sgeLauncher import SGELauncher
from .slurm.slurmLauncher import SlurmLauncher

__all__ = [
Expand All @@ -37,5 +38,6 @@
"LocalLauncher",
"LSFLauncher",
"PBSLauncher",
"SGELauncher",
"SlurmLauncher",
]
2 changes: 1 addition & 1 deletion smartsim/_core/launcher/lsf/lsfCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import typing as t

from ..util.shell import execute_cmd
from ...utils.shell import execute_cmd


def bjobs(args: t.List[str]) -> t.Tuple[str, str]:
Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/launcher/pbs/pbsCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import typing as t

from ..util.shell import execute_cmd
from ...utils.shell import execute_cmd


def qstat(args: t.List[str]) -> t.Tuple[str, str]:
Expand Down
25 changes: 25 additions & 0 deletions smartsim/_core/launcher/sge/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
77 changes: 77 additions & 0 deletions smartsim/_core/launcher/sge/sgeCommands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import typing as t

from ...utils.shell import execute_cmd


def qstat(args: t.List[str]) -> t.Tuple[str, str]:
    """Run the SGE ``qstat`` command with the supplied arguments

    The first element of the result reported by ``execute_cmd`` is discarded.

    :param args: list of command line arguments appended after ``qstat``
    :returns: output and error of qstat
    """
    _, stdout, stderr = execute_cmd(["qstat"] + args)
    return stdout, stderr


def qsub(args: t.List[str]) -> t.Tuple[str, str]:
    """Run the SGE ``qsub`` command with the supplied arguments

    The first element of the result reported by ``execute_cmd`` is discarded.

    :param args: list of command line arguments appended after ``qsub``
    :returns: output and error of qsub
    """
    _, stdout, stderr = execute_cmd(["qsub"] + args)
    return stdout, stderr


def qdel(args: t.List[str]) -> t.Tuple[int, str, str]:
    """Run the SGE ``qdel`` command with the supplied arguments

    Unlike ``qstat`` and ``qsub``, the return code is passed back to the
    caller as well.

    :param args: list of command line arguments appended after ``qdel``
    :return: return code, output, and error of qdel
    """
    status, stdout, stderr = execute_cmd(["qdel"] + args)
    return status, stdout, stderr


def qacct(args: t.List[str]) -> t.Tuple[int, str, str]:
    """Run the SGE ``qacct`` command with the supplied arguments

    Unlike ``qstat`` and ``qsub``, the return code is passed back to the
    caller as well.

    :param args: list of command line arguments appended after ``qacct``
    :return: return code, output, and error of qacct
    """
    status, stdout, stderr = execute_cmd(["qacct"] + args)
    return status, stdout, stderr
184 changes: 184 additions & 0 deletions smartsim/_core/launcher/sge/sgeLauncher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import time
import typing as t

from ....error import LauncherError
from ....log import get_logger
from ....settings import (
MpiexecSettings,
MpirunSettings,
OrterunSettings,
RunSettings,
SettingsBase,
SgeQsubBatchSettings,
)
from ....status import SmartSimStatus
from ...config import CONFIG
from ..launcher import WLMLauncher
from ..step import (
LocalStep,
MpiexecStep,
MpirunStep,
OrterunStep,
SgeQsubBatchStep,
Step,
)
from ..stepInfo import SGEStepInfo, StepInfo
from .sgeCommands import qacct, qdel, qstat
from .sgeParser import parse_qacct_job_output, parse_qstat_jobid_xml

logger = get_logger(__name__)


class SGELauncher(WLMLauncher):
    """This class encapsulates the functionality needed
    to launch jobs on systems that use SGE as a workload manager.
    All WLM launchers are capable of launching managed and unmanaged
    jobs. Managed jobs are queried through interaction with the WLM,
    in this case SGE. Unmanaged jobs are held in the TaskManager
    and are managed through references to their launching process ID
    i.e. a psutil.Popen object
    """

    # init in WLMLauncher, launcher.py

    @property
    def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]:
        """Map each settings type this launcher accepts to its step type

        :return: mapping of settings class to step class
        """
        # RunSettings types supported by this launcher
        return {
            SgeQsubBatchSettings: SgeQsubBatchStep,
            MpiexecSettings: MpiexecStep,
            MpirunSettings: MpirunStep,
            OrterunSettings: OrterunStep,
            RunSettings: LocalStep,
        }

    def run(self, step: Step) -> t.Optional[str]:
        """Run a job step through SGE

        Batch steps are submitted and waited on until the submission
        command returns; every other step is started as a task whose
        output is redirected to the step's output files.

        :param step: a job step instance
        :raises LauncherError: if launch fails or the job id cannot be
                               read from the submission output
        :return: job step id if job is managed
        """
        if not self.task_manager.actively_monitoring:
            self.task_manager.start()

        cmd_list = step.get_launch_cmd()
        step_id: t.Optional[str] = None
        task_id: t.Optional[str] = None
        if isinstance(step, SgeQsubBatchStep):
            # wait for batch step to submit successfully
            return_code, out, err = self.task_manager.start_and_wait(cmd_list, step.cwd)
            if return_code != 0:
                raise LauncherError(f"Qsub batch submission failed\n {out}\n {err}")
            if out:
                # The job id is taken from the third space-separated token
                # of the submission output. Fail with the documented error
                # type rather than an IndexError if the output is too short.
                tokens = out.split(" ")
                if len(tokens) < 3:
                    raise LauncherError(
                        f"Unable to parse job id from qsub output\n {out}"
                    )
                step_id = tokens[2]
                logger.debug(f"Gleaned batch job id: {step_id} for {step.name}")
        else:
            # mpirun/mpiexec/orterun/local don't direct output for us.
            out, err = step.get_output_files()

            # Only the file descriptors are handed to the task manager, so
            # the Python-level handles are closed as soon as the task has
            # been started instead of being left to garbage collection.
            # NOTE(review): assumes start_task has spawned the process by
            # the time it returns -- the previous code relied on the same
            # thing, since the file objects were dropped when run() returned.
            with open(out, "w+", encoding="utf-8") as output, open(
                err, "w+", encoding="utf-8"
            ) as error:
                task_id = self.task_manager.start_task(
                    cmd_list,
                    step.cwd,
                    step.env,
                    out=output.fileno(),
                    err=error.fileno(),
                )

        self.step_mapping.add(step.name, step_id, task_id, step.managed)

        return step_id

    def stop(self, step_name: str) -> StepInfo:
        """Stop/cancel a job step

        :param step_name: name of the job to stop
        :raises LauncherError: if no step info could be retrieved
        :return: update for job due to cancel
        """
        stepmap = self.step_mapping[step_name]
        if stepmap.managed:
            qdel_rc, _, err = qdel([str(stepmap.step_id)])
            if qdel_rc != 0:
                # best effort: a failed qdel is reported but does not abort
                logger.warning(f"Unable to cancel job step {step_name}\n {err}")
            if stepmap.task_id:
                self.task_manager.remove_task(str(stepmap.task_id))
        else:
            self.task_manager.remove_task(str(stepmap.task_id))

        _, step_info = self.get_step_update([step_name])[0]
        if not step_info:
            raise LauncherError(f"Could not get step_info for job step {step_name}")

        step_info.status = (
            SmartSimStatus.STATUS_CANCELLED
        )  # set status to cancelled instead of failed
        return step_info

    def _get_managed_step_update(self, step_ids: t.List[str]) -> t.List[StepInfo]:
        """Get step updates for WLM managed jobs

        ``qstat`` is queried once for all steps. Any step missing from
        its output is looked up in the accounting record via ``qacct``,
        retrying up to ``CONFIG.wlm_trials`` times.

        :param step_ids: list of job step ids
        :return: list of updates for managed jobs
        """
        updates: t.List[StepInfo] = []

        qstat_out, _ = qstat(["-xml"])
        stats = [parse_qstat_jobid_xml(qstat_out, str(step_id)) for step_id in step_ids]

        for stat, step_id in zip(stats, step_ids):
            if stat is None:
                info = SGEStepInfo("NOTFOUND")
                # Attempt to retrieve the historical record. The flag and
                # the job id are passed as two separate arguments so the
                # command receives them as distinct argv entries.
                qacct_args = ["-j", str(step_id)]
                return_code, qacct_output, _ = qacct(qacct_args)
                num_trials = 0
                while return_code != 0 and num_trials < CONFIG.wlm_trials:
                    num_trials += 1
                    time.sleep(CONFIG.jm_interval)
                    return_code, qacct_output, _ = qacct(qacct_args)

                if qacct_output:
                    failed = bool(int(parse_qacct_job_output(qacct_output, "failed")))
                    info.status = (
                        SmartSimStatus.STATUS_FAILED
                        if failed
                        else SmartSimStatus.STATUS_COMPLETED
                    )
                else:  # Assume if qacct did not find it, that the job completed
                    info.status = SmartSimStatus.STATUS_COMPLETED
                # NOTE(review): returncode is reported as 0 in every case,
                # including failed jobs -- confirm whether that is intended.
                info.returncode = 0
            else:
                info = SGEStepInfo(stat)

            updates.append(info)
        return updates

    def __str__(self) -> str:
        return "SGE"
Loading

0 comments on commit 0956399

Please sign in to comment.