Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Treat DragonSteps as a Managed Steps #6

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,8 @@ def start(
The controller will start the job-manager thread upon
execution of all jobs.
"""

if isinstance(self._launcher, DragonLauncher):
dragon_server_path = CONFIG.dragon_server_path
if dragon_server_path is not None:
dragon_server_paths = dragon_server_path.split(":")
if len(dragon_server_paths) > 1:
msg = (
"Multiple dragon servers not supported, "
"will connect to (or create) first server in list."
)
logger.warning(msg)
self._launcher.connect_to_dragon(dragon_server_paths[0])
else:
dragon_path = osp.join(exp_path, CONFIG.dragon_default_subdir)
self._launcher.connect_to_dragon(dragon_path)
if not self._launcher.is_connected:
raise LauncherError("Could not connect to Dragon server")
self._launcher.connect_to_dragon(exp_path)

self._jobs.kill_on_interrupt = kill_on_interrupt
# register custom signal handler for ^C (SIGINT)
Expand Down
14 changes: 10 additions & 4 deletions smartsim/_core/entrypoints/telemetrymonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import json
import logging
import os
import os.path
import pathlib
import signal
import sys
Expand Down Expand Up @@ -349,9 +350,15 @@ def init_launcher(self, launcher: str) -> Launcher:

raise ValueError("Launcher type not supported: " + launcher)

def set_launcher(self, launcher_type: str) -> None:
def set_launcher(
self, launcher_type: str, exp_dir: t.Union[str, "os.PathLike[str]"]
) -> None:
"""Set the launcher for the experiment"""
self._launcher = self.init_launcher(launcher_type)

if isinstance(self._launcher, DragonLauncher):
self._launcher.connect_to_dragon(exp_dir)

self.job_manager.set_launcher(self._launcher)
self.job_manager.start()

Expand All @@ -372,16 +379,15 @@ def process_manifest(self, manifest_path: str) -> None:
self._logger.error("Manifest content error", exc_info=True)
return

exp_dir = pathlib.Path(manifest_path).parent.parent.parent
if self._launcher is None:
self.set_launcher(manifest.launcher)
self.set_launcher(manifest.launcher, exp_dir)

if not self._launcher:
raise SmartSimError(f"Unable to set launcher from {manifest_path}")

runs = [run for run in manifest.runs if run.timestamp not in self._tracked_runs]

exp_dir = pathlib.Path(manifest_path).parent.parent.parent

for run in runs:
for entity in run.flatten(
filter_fn=lambda e: e.key not in self._tracked_jobs and e.is_managed
Expand Down
116 changes: 77 additions & 39 deletions smartsim/_core/launcher/dragon/dragonLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,28 +128,33 @@ def _set_timeout(self, timeout: int) -> None:
self._context.setsockopt(zmq.SNDTIMEO, value=timeout)
self._context.setsockopt(zmq.RCVTIMEO, value=timeout)

def connect_to_dragon(self, path: t.Union[str, "os.PathLike[str]"]) -> None:
self._connect_to_dragon(path)
if not self.is_connected:
raise LauncherError("Could not connect to Dragon server")

# pylint: disable-next=too-many-statements
def connect_to_dragon(self, path: str) -> None:
def _connect_to_dragon(self, path: t.Union[str, "os.PathLike[str]"]) -> None:
with DRG_LOCK:
# TODO use manager instead
if self.is_connected:
return

dragon_config_log = os.path.join(path, CONFIG.dragon_log_filename)
path = _resolve_dragon_path(path)
dragon_config_log = path / CONFIG.dragon_log_filename

if Path.is_file(Path(dragon_config_log)):
dragon_confs = (
DragonLauncher._parse_launched_dragon_server_info_from_files(
[dragon_config_log]
)
if dragon_config_log.is_file():
dragon_confs = self._parse_launched_dragon_server_info_from_files(
[dragon_config_log]
)
logger.debug(dragon_confs)
for dragon_conf in dragon_confs:
if not "address" in dragon_conf:
continue
msg = "Found dragon server config file. Checking if the server"
msg += f" is still up at address {dragon_conf['address']}."
logger.debug(msg)
logger.debug(
"Found dragon server config file. Checking if the server"
f" is still up at address {dragon_conf['address']}."
)
try:
self._set_timeout(self._reconnect_timeout)
self._handshake(dragon_conf["address"])
Expand All @@ -160,7 +165,7 @@ def connect_to_dragon(self, path: str) -> None:
if self.is_connected:
return

os.makedirs(path, exist_ok=True)
path.mkdir(parents=True, exist_ok=True)

cmd = [
"dragon",
Expand All @@ -179,8 +184,8 @@ def connect_to_dragon(self, path: str) -> None:
launcher_socket.bind(socket_addr)
cmd += ["+launching_address", socket_addr]

dragon_out_file = os.path.join(path, "dragon_head.out")
dragon_err_file = os.path.join(path, "dragon_head.err")
dragon_out_file = path / "dragon_head.out"
dragon_err_file = path / "dragon_head.err"

with open(dragon_out_file, "w", encoding="utf-8") as dragon_out, open(
dragon_err_file, "w", encoding="utf-8"
Expand Down Expand Up @@ -244,12 +249,7 @@ def connect_to_dragon(self, path: str) -> None:
@property
def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]:
# RunSettings types supported by this launcher
return {DragonRunSettings: DragonStep, RunSettings: DragonStep}

@staticmethod
def _unpack_launch_cmd(cmd: t.List[str]) -> DragonRunRequest:
req = DragonRunRequest.parse_obj(json.loads(cmd[-1]))
return req
return {DragonRunSettings: DragonStep, RunSettings: LocalStep}

def run(self, step: Step) -> t.Optional[str]:
"""Run a job step through Slurm
Expand All @@ -270,23 +270,45 @@ def run(self, step: Step) -> t.Optional[str]:
step_id = None
task_id = None

cmd = step.get_launch_cmd()
out, err = step.get_output_files()

if isinstance(step, DragonStep):
req = DragonLauncher._unpack_launch_cmd(step.get_launch_cmd())
run_args = step.run_settings.run_args
env = step.run_settings.env_vars
nodes = int(run_args.get("nodes", None) or 1)
response = (
_helpers.start_with(
DragonRunRequest(
exe=cmd[0],
exe_args=cmd[1:],
path=step.cwd,
name=step.name,
nodes=nodes,
env=env,
current_env=os.environ,
output_file=out,
error_file=err,
)
)
.then(self._send_request)
.then(_assert_schema_type(DragonRunResponse))
.get_result()
)
step_id = task_id = str(response.step_id)
elif isinstance(step, LocalStep):
cmd = step.get_launch_cmd()
req = DragonRunRequest(
exe=cmd[0], exe_args=cmd[1:], path=step.cwd, name=step.entity_name
# pylint: disable-next=consider-using-with
out_strm = open(out, "w+", encoding="utf-8")
# pylint: disable-next=consider-using-with
err_strm = open(err, "w+", encoding="utf-8")
task_id = self.task_manager.start_task(
cmd, step.cwd, step.env, out=out_strm.fileno(), err=err_strm.fileno()
)
else: # pragma: no-cover
raise TypeError(
f"{type(self).__name__} is unable to launch a step of "
f"type {type(step)}"
)

response = (
_helpers.start_with(req)
.then(self._send_request)
.then(_assert_schema_type(DragonRunResponse))
.get_result()
)

step_id = str(response.step_id)
task_id = step_id

self.step_mapping.add(step.name, step_id, task_id, step.managed)

Expand Down Expand Up @@ -349,16 +371,17 @@ def _get_managed_step_update(self, step_ids: t.List[str]) -> t.List[StepInfo]:
msg += response.error_message
raise LauncherError(msg)

stat_tuple = response.statuses[NonEmptyStr(step_id)]
ret_codes = stat_tuple[1]
status, ret_codes = response.statuses[NonEmptyStr(step_id)]
if ret_codes:
grp_ret_code = min(ret_codes)
if any(ret_codes):
err_msg = f"One or more processes failed for job {step_id}"
err_msg += f"Return codes were: {ret_codes}"
_err_msg = (
f"One or more processes failed for job {step_id}"
f"Return codes were: {ret_codes}"
)
else:
grp_ret_code = None
info = StepInfo(stat_tuple[0], stat_tuple[0], grp_ret_code)
info = StepInfo(status, status, grp_ret_code)

updates.append(info)
return updates
Expand Down Expand Up @@ -394,7 +417,9 @@ def _parse_launched_dragon_server_info_from_iterable(

@classmethod
def _parse_launched_dragon_server_info_from_files(
cls, file_paths: t.List[str], num_dragon_envs: t.Optional[int] = None
cls,
file_paths: t.List[t.Union[str, "os.PathLike[str]"]],
num_dragon_envs: t.Optional[int] = None,
) -> t.List[t.Dict[str, str]]:
with fileinput.FileInput(file_paths) as ifstream:
dragon_envs = cls._parse_launched_dragon_server_info_from_iterable(
Expand Down Expand Up @@ -439,3 +464,16 @@ def _dragon_cleanup(server_socket: zmq.Socket[t.Any], server_process_pid: int) -
)
finally:
os.kill(server_process_pid, signal.SIGINT)


def _resolve_dragon_path(fallback: t.Union[str, "os.PathLike[str]"]) -> Path:
dragon_server_path = CONFIG.dragon_server_path or os.path.join(
fallback, ".smartsim", "dragon"
)
dragon_server_paths = dragon_server_path.split(":")
if len(dragon_server_paths) > 1:
logger.warning(
"Multiple dragon servers not supported, "
"will connect to (or create) first server in list."
)
return Path(dragon_server_paths[0])
45 changes: 10 additions & 35 deletions smartsim/_core/launcher/step/dragonStep.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import shutil
import typing as t

from ....log import get_logger
from ....settings import DragonRunSettings, Singularity
from ...schemas import DragonRunRequest
from .step import Step, proxyable_launch_cmd
from .step import Step

logger = get_logger(__name__)

Expand All @@ -48,64 +46,41 @@ def __init__(self, name: str, cwd: str, run_settings: DragonRunSettings) -> None
:type run_settings: SrunSettings
"""
super().__init__(name, cwd, run_settings)
self.alloc: t.Optional[str] = None
self.managed = True
self.run_settings = run_settings

@proxyable_launch_cmd
@property
def run_settings(self) -> DragonRunSettings:
return t.cast(DragonRunSettings, self.step_settings)

def get_launch_cmd(self) -> t.List[str]:
"""Get stringified version of request
needed to launch this step

:return: launch command
:rtype: str
:rtype: list[str]
"""

output, error = self.get_output_files()

run_settings = self.run_settings

# pylint: disable=protected-access
run_args = run_settings._run_args

if "nodes" in run_args:
nodes = t.cast(int, run_args["nodes"])
else:
nodes = 1

exe_cmd = []

if self.run_settings.colocated_db_settings:
if run_settings.colocated_db_settings:
# Replace the command with the entrypoint wrapper script
bash = shutil.which("bash")
if not bash:
raise RuntimeError("Could not find bash in PATH")
launch_script_path = self.get_colocated_launch_script()
exe_cmd += [bash, launch_script_path]

if isinstance(self.run_settings.container, Singularity):
if isinstance(run_settings.container, Singularity):
# pylint: disable-next=protected-access
exe_cmd += self.run_settings.container._container_cmds(self.cwd)
exe_cmd += run_settings.container._container_cmds(self.cwd)

exe_cmd += run_settings.exe

exe_args = self._get_exe_args_list(run_settings)

exe_cmd_and_args = exe_cmd + exe_args

run_request = DragonRunRequest(
exe=exe_cmd_and_args[0],
exe_args=exe_cmd_and_args[1:],
path=self.cwd,
nodes=nodes,
output_file=output,
error_file=error,
env=run_settings.env_vars,
current_env=os.environ,
name=self.name,
)

return [run_request.json()]
return exe_cmd_and_args

@staticmethod
def _get_exe_args_list(run_setting: DragonRunSettings) -> t.List[str]:
Expand Down
Loading
Loading