Skip to content

Commit

Permalink
initial commit of applying mypy typehints to step classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona committed Aug 2, 2023
1 parent 8ce3c1d commit 76eb01b
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 101 deletions.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ module = [
"redis.cluster",
"keras",
"torch",
"smartsim._core.launcher.step.*", # hints incomplete
"smartsim.ml.torch.*", # must solve/ignore inheritance issues
]
ignore_missing_imports = true
Expand Down
25 changes: 15 additions & 10 deletions smartsim/_core/launcher/step/alpsStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from ....error import AllocationError
from ....log import get_logger
from .step import Step
from ....settings import AprunSettings
from ....settings import AprunSettings, Singularity

logger = get_logger(__name__)

Expand All @@ -49,21 +49,23 @@ def __init__(self, name: str, cwd: str, run_settings: AprunSettings) -> None:
:type run_settings: AprunSettings
"""
super().__init__(name, cwd, run_settings)
self.alloc = None
self.alloc: t.Optional[str] = None
if not self.run_settings.in_batch:
self._set_alloc()

@property
def run_settings(self) -> AprunSettings:
return self.step_settings
if isinstance(self.step_settings, AprunSettings):
return self.step_settings
raise TypeError("Run settings must be of type AprunSettings")

def get_launch_cmd(self) -> t.List[str]:
"""Get the command to launch this step
:return: launch command
:rtype: list[str]
"""
aprun = self.run_settings.run_command
aprun = self.run_settings.run_command or ""
aprun_cmd = [aprun, "--wdir", self.cwd]

# add env vars and run settings
Expand All @@ -77,12 +79,15 @@ def get_launch_cmd(self) -> t.List[str]:

# Replace the command with the entrypoint wrapper script
bash = shutil.which("bash")
if not bash:
raise RuntimeError("Could not find bash in PATH")
launch_script_path = self.get_colocated_launch_script()
aprun_cmd.extend([bash, launch_script_path])

if self.run_settings.container:
container = self.run_settings.container
if container and isinstance(container, Singularity):
# pylint: disable-next=protected-access
aprun_cmd += self.run_settings.container._container_cmds(self.cwd)
aprun_cmd += container._container_cmds(self.cwd)

aprun_cmd += self._build_exe()

Expand All @@ -99,7 +104,7 @@ def _set_alloc(self) -> None:
:raises AllocationError: allocation not listed or found
"""
if "PBS_JOBID" in os.environ:
self.alloc = os.environ["PBS_JOBID"]
self.alloc = os.environ.get("PBS_JOBID")
logger.debug(
f"Running on PBS allocation {self.alloc} gleaned from user environment"
)
Expand All @@ -125,19 +130,19 @@ def _build_exe(self) -> t.List[str]:

exe = self.run_settings.exe
args = self.run_settings.exe_args
return exe + args
return exe + [args] if isinstance(args, str) else args

def _make_mpmd(self) -> t.List[str]:
"""Build Aprun (MPMD) executable"""

exe = self.run_settings.exe
exe_args = self.run_settings.exe_args
cmd = exe + exe_args
cmd = exe + [exe_args] if isinstance(exe_args, str) else exe_args

for mpmd in self.run_settings.mpmd:
cmd += [" : "]
cmd += mpmd.format_run_args()
cmd += mpmd.exe
cmd += mpmd.exe_args
cmd += [mpmd.exe_args] if isinstance(mpmd.exe_args, str) else mpmd.exe_args
cmd = sh_split(" ".join(cmd))
return cmd
12 changes: 7 additions & 5 deletions smartsim/_core/launcher/step/cobaltStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def __init__(

@property
def batch_settings(self) -> CobaltBatchSettings:
return self.step_settings
if isinstance(self.step_settings, CobaltBatchSettings):
return self.step_settings
raise TypeError("Batch settings must be of type CobaltBatchSettings")

def get_launch_cmd(self) -> t.List[str]:
"""Get the launch command for the batch
Expand Down Expand Up @@ -97,12 +99,12 @@ def _write_script(self) -> str:
script_file.write(f"#COBALT {opt}\n")

# pylint: disable-next=protected-access
for cmd in self.batch_settings._preamble:
script_file.write(f"{cmd}\n")
for preamble_cmd in self.batch_settings._preamble:
script_file.write(f"{preamble_cmd}\n")

for i, cmd in enumerate(self.step_cmds):
for i, step_cmd in enumerate(self.step_cmds):
script_file.write("\n")
script_file.write(f"{' '.join((cmd))} &\n")
script_file.write(f"{' '.join((step_cmd))} &\n")
if i == len(self.step_cmds) - 1:
script_file.write("\n")
script_file.write("wait\n")
Expand Down
12 changes: 8 additions & 4 deletions smartsim/_core/launcher/step/localStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from .step import Step
from ....settings.base import RunSettings
from ....settings import Singularity


class LocalStep(Step):
Expand All @@ -39,7 +40,9 @@ def __init__(self, name: str, cwd: str, run_settings: RunSettings):

@property
def run_settings(self) -> RunSettings:
return self.step_settings
if isinstance(self.step_settings, RunSettings):
return self.step_settings
raise TypeError("Run settings must be of type RunSettings")

def get_launch_cmd(self) -> t.List[str]:
cmd = []
Expand All @@ -59,9 +62,10 @@ def get_launch_cmd(self) -> t.List[str]:
launch_script_path = self.get_colocated_launch_script()
cmd.extend([bash, launch_script_path])

if self.run_settings.container:
container = self.run_settings.container
if container and isinstance(container, Singularity):
# pylint: disable-next=protected-access
cmd += self.run_settings.container._container_cmds(self.cwd)
cmd += container._container_cmds(self.cwd)

# build executable
cmd.extend(self.run_settings.exe)
Expand All @@ -73,5 +77,5 @@ def _set_env(self) -> t.Dict[str, str]:
env = os.environ.copy()
if self.run_settings.env_vars:
for k, v in self.run_settings.env_vars.items():
env[k] = v
env[k] = v or ""
return env
112 changes: 68 additions & 44 deletions smartsim/_core/launcher/step/lsfStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from ....error import AllocationError
from ....log import get_logger
from .step import Step
from ....settings import BsubBatchSettings
from ....settings import BsubBatchSettings, JsrunSettings
from ....settings.base import RunSettings

logger = get_logger(__name__)
Expand All @@ -49,12 +49,14 @@ def __init__(self, name: str, cwd: str, batch_settings: BsubBatchSettings) -> No
:type batch_settings: BsubBatchSettings
"""
super().__init__(name, cwd, batch_settings)
self.step_cmds = []
self.step_cmds: t.List[t.List[str]] = []
self.managed = True

@property
def batch_settings(self) -> BsubBatchSettings:
return self.step_settings
if isinstance(self.step_settings, BsubBatchSettings):
return self.step_settings
raise TypeError("Batch settings must be of type BsubBatchSettings")

def get_launch_cmd(self) -> t.List[str]:
"""Get the launch command for the batch
Expand Down Expand Up @@ -123,14 +125,16 @@ def __init__(self, name: str, cwd: str, run_settings: RunSettings):
:type run_settings: RunSettings
"""
super().__init__(name, cwd, run_settings)
self.alloc = None
self.alloc: t.Optional[str] = None
self.managed = True
if not self.run_settings.in_batch:
self._set_alloc()

@property
def run_settings(self) -> RunSettings:
return self.step_settings
if isinstance(self.step_settings, RunSettings):
return self.step_settings
raise TypeError("Batch settings must be of type RunSettings")

def get_output_files(self) -> t.Tuple[str, str]:
"""Return two paths to error and output files based on cwd"""
Expand All @@ -144,7 +148,10 @@ def get_output_files(self) -> t.Tuple[str, str]:
# --stdio_stdout (similarly for error). This in turn, will be processed
# by jsrun, replacing each occurrence of "%t" with the task number and
# writing output to "entity_name_0.out", "entity_name_1.out"...
if self.run_settings.individual_suffix:
if (
isinstance(self.run_settings, JsrunSettings)
and self.run_settings.individual_suffix
):
partitioned_output = output.rpartition(".")
output_prefix = partitioned_output[0] + self.run_settings.individual_suffix
output_suffix = partitioned_output[-1]
Expand All @@ -167,7 +174,6 @@ def get_launch_cmd(self) -> t.List[str]:
output, error = self.get_output_files()

jsrun_cmd = [
jsrun,
"--chdir",
self.cwd,
"--stdio_stdout",
Expand All @@ -176,6 +182,9 @@ def get_launch_cmd(self) -> t.List[str]:
error,
]

if jsrun:
jsrun_cmd.insert(0, jsrun) # only use jsrun if it's set on run settings

if self.run_settings.env_vars:
env_var_str_list = self.run_settings.format_env_vars()
jsrun_cmd.extend(env_var_str_list)
Expand All @@ -189,6 +198,8 @@ def get_launch_cmd(self) -> t.List[str]:

# Replace the command with the entrypoint wrapper script
bash = shutil.which("bash")
if not bash:
raise RuntimeError("Could not find bash in PATH")
launch_script_path = self.get_colocated_launch_script()
jsrun_cmd.extend([bash, launch_script_path])

Expand Down Expand Up @@ -219,34 +230,40 @@ def _build_exe(self) -> t.List[str]:
"""
exe = self.run_settings.exe
args = self.run_settings.exe_args
if self.run_settings.mpmd:

if hasattr(self.run_settings, "mpmd") and self.run_settings.mpmd:
erf_file = self.get_step_file(ending=".mpmd")
self._make_mpmd()
mp_cmd = ["--erf_input", erf_file]
return mp_cmd

cmd = exe + args
cmd = exe + [args] if isinstance(args, str) else args
return cmd

def _make_mpmd(self) -> None:
"""Build LSF's Explicit Resource File"""
erf_file_path = self.get_step_file(ending=".mpmd")

if not isinstance(self.run_settings, JsrunSettings):
logger.warning("Attempt _make_mpmd on non-mpmd run settings")
return

# Find launch_distribution command
preamble_lines = self.run_settings.mpmd_preamble_lines.copy()
distr_line = None
for line in self.run_settings.mpmd_preamble_lines:
if line.lstrip(" ").startswith("launch_distribution"):
distr_line = line
preamble_lines.remove(line)
if not distr_line:

if hasattr(self.run_settings, "mpmd") and not distr_line:
for jrs in self.run_settings.mpmd:
if "launch_distribution" in jrs.run_args.keys():
distr_line = (
"launch_distribution : " + jrs.run_args["launch_distribution"]
f"launch_distribution : {jrs.run_args['launch_distribution']}"
)
elif "d" in jrs.run_args.keys():
distr_line = "launch_distribution : " + jrs.run_args["d"]
distr_line = f"launch_distribution : {jrs.run_args['d']}"
if distr_line:
break
if not distr_line:
Expand All @@ -259,41 +276,48 @@ def _make_mpmd(self) -> None:
erf_file.write("\n")

# First we list the apps
for app_id, jrs in enumerate(self.run_settings.mpmd):
job_rs = " ".join(jrs.exe + jrs.exe_args)
erf_file.write(f"app {app_id} : {job_rs}\n")
if hasattr(self.run_settings, "mpmd") and self.run_settings.mpmd:
for app_id, jrs in enumerate(self.run_settings.mpmd):
# pylint: ignore-next=protected-access
job_rs = " ".join(jrs.exe + jrs._exe_args)
erf_file.write(f"app {app_id} : {job_rs}\n")
erf_file.write("\n")

# Then we list the resources
for app_id, jrs in enumerate(self.run_settings.mpmd):
rs_line = ""
if "rank" in jrs.erf_sets.keys():
rs_line += "rank: " + jrs.erf_sets["rank"] + ": "
elif "rank_count" in jrs.erf_sets.keys():
rs_line += jrs.erf_sets["rank_count"] + ": "
else:
rs_line += "1 : "

rs_line += "{ "
if "host" in jrs.erf_sets.keys():
rs_line += "host: " + jrs.erf_sets["host"] + "; "
else:
rs_line += "host: *;"

if "cpu" in jrs.erf_sets.keys():
rs_line += "cpu: " + jrs.erf_sets["cpu"]
else:
rs_line += "cpu: * "

if "gpu" in jrs.erf_sets.keys():
rs_line += "; gpu: " + jrs.erf_sets["gpu"]

if "memory" in jrs.erf_sets.keys():
rs_line += "; memory: " + jrs.erf_sets["memory"]

rs_line += "}: app " + str(app_id) + "\n"

erf_file.write(rs_line)
if hasattr(self.run_settings, "mpmd") and self.run_settings.mpmd:
for app_id, jrs in enumerate(self.run_settings.mpmd):
if not isinstance(jrs, JsrunSettings):
logger.warning("Attempt _make_mpmd on non-mpmd run settings")
continue

rs_line = ""
if "rank" in jrs.erf_sets.keys():
rs_line += "rank: " + jrs.erf_sets["rank"] + ": "
elif "rank_count" in jrs.erf_sets.keys():
rs_line += jrs.erf_sets["rank_count"] + ": "
else:
rs_line += "1 : "

rs_line += "{ "
if "host" in jrs.erf_sets.keys():
rs_line += "host: " + jrs.erf_sets["host"] + "; "
else:
rs_line += "host: *;"

if "cpu" in jrs.erf_sets.keys():
rs_line += "cpu: " + jrs.erf_sets["cpu"]
else:
rs_line += "cpu: * "

if "gpu" in jrs.erf_sets.keys():
rs_line += "; gpu: " + jrs.erf_sets["gpu"]

if "memory" in jrs.erf_sets.keys():
rs_line += "; memory: " + jrs.erf_sets["memory"]

rs_line += "}: app " + str(app_id) + "\n"

erf_file.write(rs_line)

with open(erf_file_path, encoding="utf-8") as erf_file:
erf_file.flush()
Expand Down
Loading

0 comments on commit 76eb01b

Please sign in to comment.