Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Symlink batch ensembles and batch models #547

Merged
merged 23 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ To be released at some future point in time

Description

- Fix bug in symlinking of batch ensembles and batch models
- Remove defensive regexp in .gitignore
- Upgrade ubuntu to 22.04
- Remove helper function ``init_default``
Expand Down Expand Up @@ -45,6 +46,7 @@ Description

Detailed Notes

- Properly symlinks batch ensembles and batch models. (SmartSim-PR547_)
- Remove defensive regexp in .gitignore and ensure tests write to test_output.
(SmartSim-PR560_)
- After dropping support for Python 3.8, ubuntu needs to be upgraded.
Expand Down Expand Up @@ -116,6 +118,7 @@ Detailed Notes
handler. SmartSim will now attempt to kill any launched jobs before calling
the previously registered signal handler. (SmartSim-PR535_)

.. _SmartSim-PR547: https://github.com/CrayLabs/SmartSim/pull/547
.. _SmartSim-PR560: https://github.com/CrayLabs/SmartSim/pull/560
.. _SmartSim-PR558: https://github.com/CrayLabs/SmartSim/pull/558
.. _SmartSim-PR545: https://github.com/CrayLabs/SmartSim/pull/545
Expand Down
82 changes: 33 additions & 49 deletions smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
shutdown_db_node,
)
from ...database import Orchestrator
from ...entity import Ensemble, EntityList, EntitySequence, Model, SmartSimEntity
from ...entity import Ensemble, EntitySequence, Model, SmartSimEntity
from ...error import (
LauncherError,
SmartSimError,
Expand All @@ -70,6 +70,7 @@
from ..launcher import LocalLauncher, LSFLauncher, PBSLauncher, SlurmLauncher
from ..launcher.launcher import Launcher
from ..utils import check_cluster_status, create_cluster, serialize
from .controller_utils import _AnonymousBatchJob, _look_up_launched_data
from .job import Job
from .jobmanager import JobManager
from .manifest import LaunchedManifest, LaunchedManifestBuilder, Manifest
Expand Down Expand Up @@ -376,14 +377,17 @@ def symlink_output_files(
entity_out.unlink()
entity_err.unlink()

try:
historical_err.touch()
historical_out.touch()

if historical_err.exists() and historical_out.exists():
entity_out.symlink_to(historical_out)
entity_err.symlink_to(historical_err)
except FileNotFoundError as fnf:
else:
raise FileNotFoundError(
f"Output files for {entity.name} could not be found. "
"Symlinking files failed."
) from fnf
)

def _launch(
self, exp_name: str, exp_path: str, manifest: Manifest
Expand Down Expand Up @@ -432,13 +436,23 @@ def _launch(
steps: t.List[
t.Tuple[Step, t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]]
] = []

symlink_substeps: t.List[
t.Tuple[Step, t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]]
] = []

for elist in manifest.ensembles:
ens_telem_dir = manifest_builder.run_telemetry_subdirectory / "ensemble"
if elist.batch:
batch_step, substeps = self._create_batch_job_step(elist, ens_telem_dir)
manifest_builder.add_ensemble(
elist, [(batch_step.name, step) for step in substeps]
)

# symlink substeps to maintain directory structure
for substep, substep_entity in zip(substeps, elist.models):
symlink_substeps.append((substep, substep_entity))

steps.append((batch_step, elist))
else:
# if ensemble is to be run as separate job steps, aka not in a batch
Expand All @@ -456,19 +470,26 @@ def _launch(
model_telem_dir = manifest_builder.run_telemetry_subdirectory / "model"
if model.batch_settings:
anon_entity_list = _AnonymousBatchJob(model)
batch_step, _ = self._create_batch_job_step(
batch_step, substeps = self._create_batch_job_step(
anon_entity_list, model_telem_dir
)
manifest_builder.add_model(model, (batch_step.name, batch_step))

symlink_substeps.append((substeps[0], model))
steps.append((batch_step, model))
else:
job_step = self._create_job_step(model, model_telem_dir)
manifest_builder.add_model(model, (job_step.name, job_step))
steps.append((job_step, model))

# launch steps
# launch and symlink steps
for step, entity in steps:
self._launch_step(step, entity)
self.symlink_output_files(step, entity)

# symlink substeps to maintain directory structure
for substep, entity in symlink_substeps:
self.symlink_output_files(substep, entity)

return manifest_builder.finalize()

Expand Down Expand Up @@ -501,12 +522,13 @@ def _launch_orchestrator(
orchestrator, [(orc_batch_step.name, step) for step in substeps]
)

self._launch_step(orc_batch_step, orchestrator)
self.symlink_output_files(orc_batch_step, orchestrator)

# symlink substeps to maintain directory structure
for substep, substep_entity in zip(substeps, orchestrator.entities):
self.symlink_output_files(substep, substep_entity)

self._launch_step(orc_batch_step, orchestrator)

# if orchestrator was run on existing allocation, locally, or in allocation
else:
db_steps = [
Expand All @@ -518,6 +540,7 @@ def _launch_orchestrator(
)
for db_step in db_steps:
self._launch_step(*db_step)
self.symlink_output_files(*db_step)

# wait for orchestrator to spin up
self._orchestrator_launch_wait(orchestrator)
Expand Down Expand Up @@ -572,7 +595,6 @@ def _launch_step(
if completed_job is None and (
entity.name not in self._jobs.jobs and entity.name not in self._jobs.db_jobs
):
self.symlink_output_files(job_step, entity)
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand All @@ -581,10 +603,10 @@ def _launch_step(
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e

# if the completed job does exist and the entity passed in is the same one
# that has run and completed, relaunch the entity.
elif completed_job is not None and completed_job.entity is entity:
self.symlink_output_files(job_step, entity)
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand All @@ -593,6 +615,7 @@ def _launch_step(
msg += f"{entity}"
logger.error(msg)
raise SmartSimError(f"Job step {entity.name} failed to launch") from e

# the entity is using a duplicate name of an existing entity in
# the experiment, throw an error
else:
Expand Down Expand Up @@ -938,42 +961,3 @@ def _start_telemetry_monitor(self, exp_dir: str) -> None:
cwd=str(pathlib.Path(__file__).parent.parent.parent),
shell=False,
)


class _AnonymousBatchJob(EntityList[Model]):
    """Entity list holding exactly one batch ``Model``.

    The wrapper copies the model's ``batch_settings`` onto itself so that a
    lone model can be passed where an entity list with batch settings is
    expected.
    """

    def __init__(self, model: Model) -> None:
        """Wrap ``model`` after checking that it carries batch settings.

        :param model: model to wrap; its ``batch_settings`` must not be None
        :raises SmartSimError: if ``model.batch_settings`` is None
        """
        self._validate(model)
        super().__init__(model.name, model.path)
        # The wrapped model is the one and only member of this list.
        self.entities = [model]
        self.batch_settings = model.batch_settings

    @staticmethod
    def _validate(model: Model) -> None:
        """Reject models that have no batch settings.

        :param model: model about to be wrapped
        :raises SmartSimError: if ``model.batch_settings`` is None
        """
        if model.batch_settings is None:
            raise SmartSimError(
                "Unable to create _AnonymousBatchJob without batch_settings"
            )

    def _initialize_entities(self, **kwargs: t.Any) -> None:
        """Do nothing; ``__init__`` assigns ``entities`` directly."""


def _look_up_launched_data(
    launcher: Launcher,
) -> t.Callable[[t.Tuple[str, Step]], "TStepLaunchMetaData"]:
    """Build a function that collects launch metadata for a step.

    :param launcher: launcher whose ``step_mapping`` is indexed by the name
                     of the step that was actually launched
    :return: callable taking a ``(launched_step_name, step)`` pair and
             returning the tuple ``(step_id, task_id, managed, out_file,
             err_file, status_dir)``
    """

    def _unpack_launched_data(data: t.Tuple[str, Step]) -> "TStepLaunchMetaData":
        # NOTE: the launched step's name is looked up separately from
        #       ``step.name`` because the two are not guaranteed to match,
        #       e.g. when an entity list is launched as a batch job.
        launched_name, entity_step = data
        record = launcher.step_mapping[launched_name]
        stdout_file, stderr_file = entity_step.get_output_files()
        launch_ids = (record.step_id, record.task_id, record.managed)
        # Fall back to the step's working directory when no "status_dir"
        # entry is present in the step metadata.
        status_dir = pathlib.Path(
            entity_step.meta.get("status_dir", entity_step.cwd)
        )
        return (*launch_ids, stdout_file, stderr_file, status_dir)

    return _unpack_launched_data
77 changes: 77 additions & 0 deletions smartsim/_core/control/controller_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import pathlib
import typing as t

from ..._core.launcher.step import Step
from ...entity import EntityList, Model
from ...error import SmartSimError
from ..launcher.launcher import Launcher

if t.TYPE_CHECKING:
from ..utils.serialize import TStepLaunchMetaData


class _AnonymousBatchJob(EntityList[Model]):
    """Entity list holding exactly one batch ``Model``.

    The wrapper copies the model's ``batch_settings`` onto itself so that a
    lone model can be passed where an entity list with batch settings is
    expected.
    """

    def __init__(self, model: Model) -> None:
        """Wrap ``model`` after checking that it carries batch settings.

        :param model: model to wrap; its ``batch_settings`` must not be None
        :raises SmartSimError: if ``model.batch_settings`` is None
        """
        self._validate(model)
        super().__init__(model.name, model.path)
        # The wrapped model is the one and only member of this list.
        self.entities = [model]
        self.batch_settings = model.batch_settings

    @staticmethod
    def _validate(model: Model) -> None:
        """Reject models that have no batch settings.

        :param model: model about to be wrapped
        :raises SmartSimError: if ``model.batch_settings`` is None
        """
        if model.batch_settings is None:
            raise SmartSimError(
                "Unable to create _AnonymousBatchJob without batch_settings"
            )

    def _initialize_entities(self, **kwargs: t.Any) -> None:
        """Do nothing; ``__init__`` assigns ``entities`` directly."""


def _look_up_launched_data(
    launcher: Launcher,
) -> t.Callable[[t.Tuple[str, Step]], "TStepLaunchMetaData"]:
    """Build a function that collects launch metadata for a step.

    :param launcher: launcher whose ``step_mapping`` is indexed by the name
                     of the step that was actually launched
    :return: callable taking a ``(launched_step_name, step)`` pair and
             returning the tuple ``(step_id, task_id, managed, out_file,
             err_file, status_dir)``
    """

    def _unpack_launched_data(data: t.Tuple[str, Step]) -> "TStepLaunchMetaData":
        # NOTE: the launched step's name is looked up separately from
        #       ``step.name`` because the two are not guaranteed to match,
        #       e.g. when an entity list is launched as a batch job.
        launched_name, entity_step = data
        record = launcher.step_mapping[launched_name]
        stdout_file, stderr_file = entity_step.get_output_files()
        launch_ids = (record.step_id, record.task_id, record.managed)
        # Fall back to the step's working directory when no "status_dir"
        # entry is present in the step metadata.
        status_dir = pathlib.Path(
            entity_step.meta.get("status_dir", entity_step.cwd)
        )
        return (*launch_ids, stdout_file, stderr_file, status_dir)

    return _unpack_launched_data
Loading
Loading