From 3bfd34b09d28b469ff43a5c19b1594a094f88522 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Thu, 15 Feb 2024 16:45:32 -0600 Subject: [PATCH 1/3] Dragon Launcher Treats Dragon Steps as Managed The `DragonLauncher` now maps dragon run settings to a `DragonStep` and regular run settings to a `LocalStep` to match behavior of existing launchers. The `DragonLauncher` treats `DragonStep`s as managed steps and `LocalStep`s as unmanaged steps. `DragonStep` run commands are now tracked natively through the dragon runtime and not proxied through the `proxyable_launch_cmd` decorator. Remove unnecessary downcasts to dragon-specific entities. Unify `connect_to_dragon` code path for main SmartSim process and the telemetry monitor. --- smartsim/_core/control/controller.py | 17 +-- .../_core/entrypoints/telemetrymonitor.py | 14 ++- .../_core/launcher/dragon/dragonLauncher.py | 116 ++++++++++++------ smartsim/_core/launcher/step/dragonStep.py | 45 ++----- smartsim/_core/launcher/step/step.py | 46 ------- tests/test_telemetry_monitor.py | 14 +-- 6 files changed, 102 insertions(+), 150 deletions(-) diff --git a/smartsim/_core/control/controller.py b/smartsim/_core/control/controller.py index c7ab29904..ea05b47bf 100644 --- a/smartsim/_core/control/controller.py +++ b/smartsim/_core/control/controller.py @@ -118,23 +118,8 @@ def start( The controller will start the job-manager thread upon execution of all jobs. """ - if isinstance(self._launcher, DragonLauncher): - dragon_server_path = CONFIG.dragon_server_path - if dragon_server_path is not None: - dragon_server_paths = dragon_server_path.split(":") - if len(dragon_server_paths) > 1: - msg = ( - "Multiple dragon servers not supported, " - "will connect to (or create) first server in list." 
- ) - logger.warning(msg) - self._launcher.connect_to_dragon(dragon_server_paths[0]) - else: - dragon_path = osp.join(exp_path, CONFIG.dragon_default_subdir) - self._launcher.connect_to_dragon(dragon_path) - if not self._launcher.is_connected: - raise LauncherError("Could not connect to Dragon server") + self._launcher.connect_to_dragon(exp_path) self._jobs.kill_on_interrupt = kill_on_interrupt # register custom signal handler for ^C (SIGINT) diff --git a/smartsim/_core/entrypoints/telemetrymonitor.py b/smartsim/_core/entrypoints/telemetrymonitor.py index 434d35b11..95c3641e0 100644 --- a/smartsim/_core/entrypoints/telemetrymonitor.py +++ b/smartsim/_core/entrypoints/telemetrymonitor.py @@ -28,6 +28,7 @@ import json import logging import os +import os.path import pathlib import signal import sys @@ -349,9 +350,15 @@ def init_launcher(self, launcher: str) -> Launcher: raise ValueError("Launcher type not supported: " + launcher) - def set_launcher(self, launcher_type: str) -> None: + def set_launcher( + self, launcher_type: str, exp_dir: t.Union[str, "os.PathLike[str]"] + ) -> None: """Set the launcher for the experiment""" self._launcher = self.init_launcher(launcher_type) + + if isinstance(self._launcher, DragonLauncher): + self._launcher.connect_to_dragon(exp_dir) + self.job_manager.set_launcher(self._launcher) self.job_manager.start() @@ -372,16 +379,15 @@ def process_manifest(self, manifest_path: str) -> None: self._logger.error("Manifest content error", exc_info=True) return + exp_dir = pathlib.Path(manifest_path).parent.parent.parent if self._launcher is None: - self.set_launcher(manifest.launcher) + self.set_launcher(manifest.launcher, exp_dir) if not self._launcher: raise SmartSimError(f"Unable to set launcher from {manifest_path}") runs = [run for run in manifest.runs if run.timestamp not in self._tracked_runs] - exp_dir = pathlib.Path(manifest_path).parent.parent.parent - for run in runs: for entity in run.flatten( filter_fn=lambda e: e.key not in 
self._tracked_jobs and e.is_managed diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index 1ae7b7c66..00d28e244 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -128,28 +128,33 @@ def _set_timeout(self, timeout: int) -> None: self._context.setsockopt(zmq.SNDTIMEO, value=timeout) self._context.setsockopt(zmq.RCVTIMEO, value=timeout) + def connect_to_dragon(self, path: t.Union[str, "os.PathLike[str]"]) -> None: + self._connect_to_dragon(path) + if not self.is_connected: + raise LauncherError("Could not connect to Dragon server") + # pylint: disable-next=too-many-statements - def connect_to_dragon(self, path: str) -> None: + def _connect_to_dragon(self, path: t.Union[str, "os.PathLike[str]"]) -> None: with DRG_LOCK: # TODO use manager instead if self.is_connected: return - dragon_config_log = os.path.join(path, CONFIG.dragon_log_filename) + path = _resolve_dragon_path(path) + dragon_config_log = path / CONFIG.dragon_log_filename - if Path.is_file(Path(dragon_config_log)): - dragon_confs = ( - DragonLauncher._parse_launched_dragon_server_info_from_files( - [dragon_config_log] - ) + if dragon_config_log.is_file(): + dragon_confs = self._parse_launched_dragon_server_info_from_files( + [dragon_config_log] ) logger.debug(dragon_confs) for dragon_conf in dragon_confs: if not "address" in dragon_conf: continue - msg = "Found dragon server config file. Checking if the server" - msg += f" is still up at address {dragon_conf['address']}." - logger.debug(msg) + logger.debug( + "Found dragon server config file. Checking if the server" + f" is still up at address {dragon_conf['address']}." 
+ ) try: self._set_timeout(self._reconnect_timeout) self._handshake(dragon_conf["address"]) @@ -160,7 +165,7 @@ def connect_to_dragon(self, path: str) -> None: if self.is_connected: return - os.makedirs(path, exist_ok=True) + path.mkdir(parents=True, exist_ok=True) cmd = [ "dragon", @@ -179,8 +184,8 @@ def connect_to_dragon(self, path: str) -> None: launcher_socket.bind(socket_addr) cmd += ["+launching_address", socket_addr] - dragon_out_file = os.path.join(path, "dragon_head.out") - dragon_err_file = os.path.join(path, "dragon_head.err") + dragon_out_file = path / "dragon_head.out" + dragon_err_file = path / "dragon_head.err" with open(dragon_out_file, "w", encoding="utf-8") as dragon_out, open( dragon_err_file, "w", encoding="utf-8" @@ -244,12 +249,7 @@ def connect_to_dragon(self, path: str) -> None: @property def supported_rs(self) -> t.Dict[t.Type[SettingsBase], t.Type[Step]]: # RunSettings types supported by this launcher - return {DragonRunSettings: DragonStep, RunSettings: DragonStep} - - @staticmethod - def _unpack_launch_cmd(cmd: t.List[str]) -> DragonRunRequest: - req = DragonRunRequest.parse_obj(json.loads(cmd[-1])) - return req + return {DragonRunSettings: DragonStep, RunSettings: LocalStep} def run(self, step: Step) -> t.Optional[str]: """Run a job step through Slurm @@ -270,23 +270,45 @@ def run(self, step: Step) -> t.Optional[str]: step_id = None task_id = None + cmd = step.get_launch_cmd() + out, err = step.get_output_files() + if isinstance(step, DragonStep): - req = DragonLauncher._unpack_launch_cmd(step.get_launch_cmd()) + run_args = step.run_settings.run_args + env = step.run_settings.env_vars + nodes = int(run_args.get("nodes", None) or 1) + response = ( + _helpers.start_with( + DragonRunRequest( + exe=cmd[0], + exe_args=cmd[1:], + path=step.cwd, + name=step.name, + nodes=nodes, + env=env, + current_env=os.environ, + output_file=out, + error_file=err, + ) + ) + .then(self._send_request) + .then(_assert_schema_type(DragonRunResponse)) + 
.get_result() + ) + step_id = task_id = str(response.step_id) elif isinstance(step, LocalStep): - cmd = step.get_launch_cmd() - req = DragonRunRequest( - exe=cmd[0], exe_args=cmd[1:], path=step.cwd, name=step.entity_name + # pylint: disable-next=consider-using-with + out_strm = open(out, "w+", encoding="utf-8") + # pylint: disable-next=consider-using-with + err_strm = open(err, "w+", encoding="utf-8") + task_id = self.task_manager.start_task( + cmd, step.cwd, step.env, out=out_strm.fileno(), err=err_strm.fileno() + ) + else: # pragma: no-cover + raise TypeError( + f"{type(self).__name__} is unable to launch a step of " + f"type {type(self)}" ) - - response = ( - _helpers.start_with(req) - .then(self._send_request) - .then(_assert_schema_type(DragonRunResponse)) - .get_result() - ) - - step_id = str(response.step_id) - task_id = step_id self.step_mapping.add(step.name, step_id, task_id, step.managed) @@ -349,16 +371,17 @@ def _get_managed_step_update(self, step_ids: t.List[str]) -> t.List[StepInfo]: msg += response.error_message raise LauncherError(msg) - stat_tuple = response.statuses[NonEmptyStr(step_id)] - ret_codes = stat_tuple[1] + status, ret_codes = response.statuses[NonEmptyStr(step_id)] if ret_codes: grp_ret_code = min(ret_codes) if any(ret_codes): - err_msg = f"One or more processes failed for job {step_id}" - err_msg += f"Return codes were: {ret_codes}" + _err_msg = ( + f"One or more processes failed for job {step_id}" + f"Return codes were: {ret_codes}" + ) else: grp_ret_code = None - info = StepInfo(stat_tuple[0], stat_tuple[0], grp_ret_code) + info = StepInfo(status, status, grp_ret_code) updates.append(info) return updates @@ -394,7 +417,9 @@ def _parse_launched_dragon_server_info_from_iterable( @classmethod def _parse_launched_dragon_server_info_from_files( - cls, file_paths: t.List[str], num_dragon_envs: t.Optional[int] = None + cls, + file_paths: t.List[t.Union[str, "os.PathLike[str]"]], + num_dragon_envs: t.Optional[int] = None, ) -> 
t.List[t.Dict[str, str]]: with fileinput.FileInput(file_paths) as ifstream: dragon_envs = cls._parse_launched_dragon_server_info_from_iterable( @@ -439,3 +464,16 @@ def _dragon_cleanup(server_socket: zmq.Socket[t.Any], server_process_pid: int) - ) finally: os.kill(server_process_pid, signal.SIGINT) + + +def _resolve_dragon_path(fallback: t.Union[str, "os.PathLike[str]"]) -> Path: + dragon_server_path = CONFIG.dragon_server_path or os.path.join( + fallback, ".smartsim", "dragon" + ) + dragon_server_paths = dragon_server_path.split(":") + if len(dragon_server_paths) > 1: + logger.warning( + "Multiple dragon servers not supported, " + "will connect to (or create) first server in list." + ) + return Path(dragon_server_paths[0]) diff --git a/smartsim/_core/launcher/step/dragonStep.py b/smartsim/_core/launcher/step/dragonStep.py index 8ba4a0055..05fdba6ef 100644 --- a/smartsim/_core/launcher/step/dragonStep.py +++ b/smartsim/_core/launcher/step/dragonStep.py @@ -24,14 +24,12 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-import os import shutil import typing as t from ....log import get_logger from ....settings import DragonRunSettings, Singularity -from ...schemas import DragonRunRequest -from .step import Step, proxyable_launch_cmd +from .step import Step logger = get_logger(__name__) @@ -48,34 +46,23 @@ def __init__(self, name: str, cwd: str, run_settings: DragonRunSettings) -> None :type run_settings: SrunSettings """ super().__init__(name, cwd, run_settings) - self.alloc: t.Optional[str] = None self.managed = True - self.run_settings = run_settings - @proxyable_launch_cmd + @property + def run_settings(self) -> DragonRunSettings: + return t.cast(DragonRunSettings, self.step_settings) + def get_launch_cmd(self) -> t.List[str]: """Get stringified version of request needed to launch this step :return: launch command - :rtype: str + :rtype: list[str] """ - - output, error = self.get_output_files() - run_settings = self.run_settings - - # pylint: disable=protected-access - run_args = run_settings._run_args - - if "nodes" in run_args: - nodes = t.cast(int, run_args["nodes"]) - else: - nodes = 1 - exe_cmd = [] - if self.run_settings.colocated_db_settings: + if run_settings.colocated_db_settings: # Replace the command with the entrypoint wrapper script bash = shutil.which("bash") if not bash: @@ -83,9 +70,9 @@ def get_launch_cmd(self) -> t.List[str]: launch_script_path = self.get_colocated_launch_script() exe_cmd += [bash, launch_script_path] - if isinstance(self.run_settings.container, Singularity): + if isinstance(run_settings.container, Singularity): # pylint: disable-next=protected-access - exe_cmd += self.run_settings.container._container_cmds(self.cwd) + exe_cmd += run_settings.container._container_cmds(self.cwd) exe_cmd += run_settings.exe @@ -93,19 +80,7 @@ def get_launch_cmd(self) -> t.List[str]: exe_cmd_and_args = exe_cmd + exe_args - run_request = DragonRunRequest( - exe=exe_cmd_and_args[0], - exe_args=exe_cmd_and_args[1:], - path=self.cwd, - nodes=nodes, - 
output_file=output, - error_file=error, - env=run_settings.env_vars, - current_env=os.environ, - name=self.name, - ) - - return [run_request.json()] + return exe_cmd_and_args @staticmethod def _get_exe_args_list(run_setting: DragonRunSettings) -> t.List[str]: diff --git a/smartsim/_core/launcher/step/step.py b/smartsim/_core/launcher/step/step.py index 9e44b0bd3..afcdf0555 100644 --- a/smartsim/_core/launcher/step/step.py +++ b/smartsim/_core/launcher/step/step.py @@ -27,7 +27,6 @@ from __future__ import annotations import functools -import json import os.path as osp import sys import time @@ -35,8 +34,6 @@ from os import makedirs from smartsim._core.config import CONFIG -from smartsim._core.schemas import DragonRunRequest -from smartsim._core.schemas.types import NonEmptyStr from smartsim.error.errors import SmartSimError, UnproxyableStepError from ....log import get_logger @@ -135,49 +132,6 @@ def _get_launch_cmd(self: _StepT) -> t.List[str]: if not CONFIG.telemetry_enabled: return original_cmd_list - # pylint: disable-next=import-outside-toplevel - from .dragonStep import DragonStep - - if isinstance(self, DragonStep): - proxy_module = "smartsim._core.entrypoints.indirect" - etype = self.meta["entity_type"] - status_dir = self.meta["status_dir"] - run_req = DragonRunRequest.parse_obj(json.loads(original_cmd_list[-1])) - - exe_args = run_req.exe_args - encoded_cmd = encode_cmd([run_req.exe] + exe_args) - - # NOTE: this is NOT safe. should either 1) sign cmd and verify OR 2) - # serialize step and let the indirect entrypoint rebuild the - # cmd... for now, test away... - # NOTE: This mapping DOES NOT VALIDATE THE STRINGS and we are lying - # to pydantic and the type checker! We should try to remove this - # dragon specific branch for more type safety. 
- new_cmd = list( - map( - NonEmptyStr, - ( - sys.executable, - "-m", - proxy_module, - "+name", - self.name, - "+command", - encoded_cmd, - "+entity_type", - etype, - "+telemetry_dir", - status_dir, - "+working_dir", - self.cwd, - ), - ) - ) - run_req.exe = new_cmd[0] - run_req.exe_args = new_cmd[1:] - - return [run_req.json()] - if self.managed: raise UnproxyableStepError( f"Attempting to proxy managed step of type {type(self)} " diff --git a/tests/test_telemetry_monitor.py b/tests/test_telemetry_monitor.py index 0f3b92092..3e1f30290 100644 --- a/tests/test_telemetry_monitor.py +++ b/tests/test_telemetry_monitor.py @@ -929,11 +929,8 @@ def test_unmanaged_steps_are_proxyed_through_indirect( step = wlm_launcher.create_step("test-step", test_dir, rs) step.meta = mock_step_meta_dict assert isinstance(step, Step) - assert not step.managed or isinstance(step, DragonStep) + assert not step.managed cmd = step.get_launch_cmd() - if isinstance(wlm_launcher, DragonLauncher): - req = wlm_launcher._unpack_launch_cmd(cmd) - cmd = [req.exe] + req.exe_args assert sys.executable in cmd assert PROXY_ENTRY_POINT in cmd assert "hello" not in cmd @@ -941,7 +938,7 @@ def test_unmanaged_steps_are_proxyed_through_indirect( @for_all_wlm_launchers -def test_unmanaged_steps_are_not_proxied_if_the_telemetry_monitor_is_disabled( +def test_unmanaged_steps_are_not_proxyed_if_the_telemetry_monitor_is_disabled( wlm_launcher, mock_step_meta_dict, test_dir, monkeypatch ): monkeypatch.setattr(cfg.Config, CFG_TM_ENABLED_ATTR, False) @@ -949,11 +946,8 @@ def test_unmanaged_steps_are_not_proxied_if_the_telemetry_monitor_is_disabled( step = wlm_launcher.create_step("test-step", test_dir, rs) step.meta = mock_step_meta_dict assert isinstance(step, Step) - assert not step.managed or isinstance(step, DragonStep) + assert not step.managed cmd = step.get_launch_cmd() - if isinstance(wlm_launcher, DragonLauncher): - req = wlm_launcher._unpack_launch_cmd(cmd) - cmd = [req.exe] + req.exe_args assert 
PROXY_ENTRY_POINT not in cmd assert "hello" in cmd assert "world" in cmd @@ -1100,7 +1094,7 @@ def _faux_updates(_self: WLMLauncher, _names: t.List[str]) -> t.List[StepInfo]: ctx.setattr(SlurmLauncher, "get_step_update", get_faux_update(status_in)) mani_handler = ManifestEventHandler("xyz", logger) - mani_handler.set_launcher("slurm") + mani_handler.set_launcher("slurm", test_dir) # prep a fake job to request updates for job_entity = JobEntity() From e5e067de9c5aa068c69988b0badfcec07af5e418 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Wed, 6 Mar 2024 19:28:45 -0600 Subject: [PATCH 2/3] Temporary Patch for TM Race Condition Hack "fix" for race condition in telemetry monitor test. This ought to be removed before we officially ship a dragon launcher! --- tests/test_telemetry_monitor.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_telemetry_monitor.py b/tests/test_telemetry_monitor.py index 3e1f30290..92b25bb85 100644 --- a/tests/test_telemetry_monitor.py +++ b/tests/test_telemetry_monitor.py @@ -705,6 +705,18 @@ def test_telemetry_db_and_model(fileutils, test_dir, wlmutils, monkeypatch, conf try: exp.start(orc) + # TODO: This sleep is necessary as there is a race condition between + # SmartSim and the TM when launching and adding a managed + # task into their respective JMs for tracking. Essentially, + # the TM does not have enough time to register file listeners + # before the manifest is updated with the start of + # `smartsim_model` when launching through a Launcher that + # does devolve into a simple system call from the driver + # script process (e.g. the dragon launcher) + # FIXME: THIS NEEDS TO BE REMOVED AND THIS TEST NEEDS TO PASS + # CONSISTENTLY BEFORE WE CAN SHIP A DRAGON LAUNCHER. 
+ time.sleep(1) + # create run settings app_settings = exp.create_run_settings(sys.executable, test_script) app_settings.set_nodes(1) From a0ad0da5ebb1b711628e8c8fee038f5eab8a739e Mon Sep 17 00:00:00 2001 From: Al Rigazzi Date: Wed, 13 Mar 2024 18:36:55 +0100 Subject: [PATCH 3/3] Update smartsim/_core/launcher/dragon/dragonLauncher.py Co-authored-by: Matt Drozt --- smartsim/_core/launcher/dragon/dragonLauncher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index 00d28e244..3eb900f31 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -307,7 +307,7 @@ def run(self, step: Step) -> t.Optional[str]: else: # pragma: no-cover raise TypeError( f"{type(self).__name__} is unable to launch a step of " - f"type {type(self)}" + f"type {type(step)}" ) self.step_mapping.add(step.name, step_id, task_id, step.managed)