Skip to content

Commit

Permalink
Merge branch 'develop' into 478
Browse files Browse the repository at this point in the history
  • Loading branch information
ankona authored Apr 10, 2024
2 parents 99f9b65 + 3edd895 commit e7eb2ce
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 4 deletions.
5 changes: 5 additions & 0 deletions doc/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ To be released at some future point in time
Description

- Update watchdog dependency
- Historical output files stored under .smartsim directory
- Add option to build Torch backend without the Intel Math Kernel Library
- Fix ReadTheDocs build issue
- Promote device options to an Enum
Expand All @@ -39,6 +40,9 @@ Description
Detailed Notes

- Update watchdog dependency from 3.x to 4.x, fix new type issues (SmartSim-PR540_)
- The dashboard needs to display historical logs, so log files are written
out under the .smartsim directory and files under the experiment
directory are symlinked to them. (SmartSim-PR532_)
- Add an option to smart build "--torch_with_mkl"/"--no_torch_with_mkl" to
prevent Torch from trying to link in the Intel Math Kernel Library. This
is needed because on machines that have the Intel compilers installed, the
Expand Down Expand Up @@ -92,6 +96,7 @@ Detailed Notes
the previously registered signal handler. (SmartSim-PR535_)

.. _SmartSim-PR540: https://github.com/CrayLabs/SmartSim/pull/540
.. _SmartSim-PR532: https://github.com/CrayLabs/SmartSim/pull/532
.. _SmartSim-PR538: https://github.com/CrayLabs/SmartSim/pull/538
.. _SmartSim-PR537: https://github.com/CrayLabs/SmartSim/pull/537
.. _SmartSim-PR498: https://github.com/CrayLabs/SmartSim/pull/498
Expand Down
39 changes: 38 additions & 1 deletion smartsim/_core/control/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,36 @@ def init_launcher(self, launcher: str) -> None:
else:
raise TypeError("Must provide a 'launcher' argument")

@staticmethod
def symlink_output_files(
    job_step: Step, entity: t.Union[SmartSimEntity, EntitySequence[SmartSimEntity]]
) -> None:
    """Create symlinks for entity output files that point to the output files
    under the .smartsim directory

    Two links, ``<entity.name>.out`` and ``<entity.name>.err``, are created
    inside ``entity.path``. Their targets are the two paths returned by
    ``job_step.get_output_files()``. A link left over from a previous run is
    removed before the new one is created.

    :param job_step: Job step instance
    :type job_step: Step
    :param entity: Entity instance
    :type entity: SmartSimEntity | EntitySequence[SmartSimEntity]
    :raises FileNotFoundError: if creating either symlink fails with a
        ``FileNotFoundError``
    """
    historical_out, historical_err = map(pathlib.Path, job_step.get_output_files())
    entity_out = pathlib.Path(entity.path) / f"{entity.name}.out"
    entity_err = pathlib.Path(entity.path) / f"{entity.name}.err"

    # Remove links to a previous run. Each link is checked on its own: if only
    # one of the two survived (e.g. the other was deleted by hand), unlinking
    # both unconditionally would fail on the missing one, or delete a regular
    # file that merely shares the name.
    for stale_link in (entity_out, entity_err):
        if stale_link.is_symlink():
            stale_link.unlink()

    try:
        entity_out.symlink_to(historical_out)
        entity_err.symlink_to(historical_err)
    except FileNotFoundError as fnf:
        raise FileNotFoundError(
            f"Output files for {entity.name} could not be found. "
            "Symlinking files failed."
        ) from fnf

def _launch(
self, exp_name: str, exp_path: str, manifest: Manifest
) -> LaunchedManifest[t.Tuple[str, Step]]:
Expand Down Expand Up @@ -470,6 +500,11 @@ def _launch_orchestrator(
manifest_builder.add_database(
orchestrator, [(orc_batch_step.name, step) for step in substeps]
)

# symlink substeps to maintain directory structure
for substep, substep_entity in zip(substeps, orchestrator.entities):
self.symlink_output_files(substep, substep_entity)

self._launch_step(orc_batch_step, orchestrator)

# if orchestrator was run on existing allocation, locally, or in allocation
Expand Down Expand Up @@ -537,6 +572,7 @@ def _launch_step(
if completed_job is None and (
entity.name not in self._jobs.jobs and entity.name not in self._jobs.db_jobs
):
self.symlink_output_files(job_step, entity)
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand All @@ -548,6 +584,7 @@ def _launch_step(
# if the completed job does exist and the entity passed in is the same
# that has ran and completed, relaunch the entity.
elif completed_job is not None and completed_job.entity is entity:
self.symlink_output_files(job_step, entity)
try:
job_id = self._launcher.run(job_step)
except LauncherError as e:
Expand Down Expand Up @@ -599,7 +636,7 @@ def _create_batch_job_step(
entity_list.name, entity_list.path, entity_list.batch_settings
)
batch_step.meta["entity_type"] = str(type(entity_list).__name__).lower()
batch_step.meta["status_dir"] = str(telemetry_dir / entity_list.name)
batch_step.meta["status_dir"] = str(telemetry_dir)

substeps = []
for entity in entity_list.entities:
Expand Down
18 changes: 15 additions & 3 deletions smartsim/_core/launcher/step/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import functools
import os.path as osp
import pathlib
import sys
import time
import typing as t
Expand Down Expand Up @@ -66,10 +67,21 @@ def _create_unique_name(entity_name: str) -> str:
step_name = entity_name + "-" + get_base_36_repr(time.time_ns())
return step_name

@staticmethod
def _ensure_output_directory_exists(output_dir: str) -> None:
    """Make sure the directory that holds the step output is present

    Nothing is done when ``output_dir`` already exists on disk; otherwise the
    directory is created together with any missing parents.

    :param output_dir: path of the directory for the step output
    :type output_dir: str
    """
    if osp.exists(output_dir):
        return
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

def get_output_files(self) -> t.Tuple[str, str]:
    """Return the paths to the output and error files of this step

    Both files are located in the directory stored under
    ``self.meta["status_dir"]`` and are named ``<entity_name>.out`` and
    ``<entity_name>.err``. The directory is created if it does not exist yet.

    :return: ``(output, error)`` file paths, in that order
    :rtype: tuple[str, str]
    :raises KeyError: if no status directory has been set for this step
    """
    try:
        output_dir = self.meta["status_dir"]
    except KeyError as exc:
        raise KeyError("Status directory for this step has not been set.") from exc
    # The directory must exist before anything can be written to these paths
    self._ensure_output_directory_exists(output_dir)
    output = osp.join(output_dir, f"{self.entity_name}.out")
    error = osp.join(output_dir, f"{self.entity_name}.err")
    return output, error

def get_step_file(
Expand Down
4 changes: 4 additions & 0 deletions tests/test_controller_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ def test_restarting_entity(test_dir, wlmutils, entity):
"""Validate restarting a completed Model/Ensemble job"""
step_settings = RunSettings("echo")
step = MockStep("mock-step", test_dir, step_settings)
step.meta["status_dir"] = test_dir
entity.path = test_dir
test_launcher = wlmutils.get_test_launcher()
controller = Controller(test_launcher)
controller._jobs.add_job(entity.name, job_id="1234", entity=entity)
Expand All @@ -172,6 +174,8 @@ def test_restarting_orch(test_dir, wlmutils):
"""Validate restarting a completed Orchestrator job"""
step_settings = RunSettings("echo")
step = MockStep("mock-step", test_dir, step_settings)
step.meta["status_dir"] = test_dir
orc.path = test_dir
test_launcher = wlmutils.get_test_launcher()
controller = Controller(test_launcher)
controller._jobs.add_job(orc.name, job_id="1234", entity=orc)
Expand Down
227 changes: 227 additions & 0 deletions tests/test_output_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# BSD 2-Clause License
#
# Copyright (c) 2021-2024, Hewlett Packard Enterprise
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import pathlib

import pytest

from smartsim import Experiment
from smartsim._core.config import CONFIG
from smartsim._core.control.controller import Controller
from smartsim._core.launcher.step import Step
from smartsim.database.orchestrator import Orchestrator
from smartsim.entity.ensemble import Ensemble
from smartsim.entity.model import Model
from smartsim.settings.base import RunSettings
from smartsim.settings.slurmSettings import SbatchSettings, SrunSettings

# Controllers shared by the tests below: one constructed with default
# arguments and one constructed for the "slurm" launcher (used by the
# batch-step tests).
controller = Controller()
slurm_controller = Controller(launcher="slurm")

# Settings shared by the module-level entities; the run settings all describe
# the command `echo spam eggs`.
rs = RunSettings("echo", ["spam", "eggs"])
bs = SbatchSettings()
batch_rs = SrunSettings("echo", ["spam", "eggs"])

# Module-level entities reused across tests. NOTE: several tests assign to
# their `path` attribute, so these objects carry state from test to test.
ens = Ensemble("ens", params={}, run_settings=rs, batch_settings=bs, replicas=3)
orc = Orchestrator(db_nodes=3, batch=True, launcher="slurm", run_command="srun")
model = Model("test_model", {}, "", rs)
batch_model = Model("batch_test_model", {}, "", batch_rs, batch_settings=bs)


def test_mutated_model_output(test_dir):
    """Run a model twice with different arguments and check its output links

    After each run the ``.out``/``.err`` files in the model directory must be
    symlinks and the ``.out`` link must show that run's output. At the end,
    the link targets recorded after each run must still hold the output of
    their respective run.
    """
    exp_name = "test-mutated-model-output"
    exp = Experiment(exp_name, launcher="local", exp_path=test_dir)

    test_model = exp.create_model("test_model", path=test_dir, run_settings=rs)

    def entity_file(suffix):
        # path of the model's output file with the given suffix
        return pathlib.Path(test_model.path, f"{test_model.name}{suffix}")

    def run_and_verify(expected_text, **generate_kwargs):
        # generate + run the model, verify the links, return the .out target
        exp.generate(test_model, **generate_kwargs)
        exp.start(test_model, block=True)

        assert pathlib.Path(test_model.path).exists()
        assert entity_file(".out").is_symlink()
        assert entity_file(".err").is_symlink()

        with open(entity_file(".out"), "r") as file:
            log_contents = file.read()
        assert expected_text in log_contents

        return os.readlink(entity_file(".out"))

    first_link = run_and_verify("spam eggs")

    test_model.run_settings.exe_args = ["hello", "world"]
    second_link = run_and_verify("hello world", overwrite=True)

    # both historical logs must still be readable with their own content
    for link_target, expected_text in (
        (first_link, "spam eggs"),
        (second_link, "hello world"),
    ):
        with open(link_target, "r") as file:
            historical_log = file.read()
        assert expected_text in historical_log


def test_get_output_files_with_create_job_step(test_dir):
    """A step built by ``_create_job_step`` reports its output files in a
    subdirectory of the status directory named after the model"""
    status_dir = pathlib.Path(test_dir) / CONFIG.telemetry_subdir / model.type
    step = controller._create_job_step(model, status_dir)

    model_dir = status_dir / model.name
    expected_paths = (
        str(model_dir / (model.name + ".out")),
        str(model_dir / (model.name + ".err")),
    )
    assert step.get_output_files() == expected_paths


@pytest.mark.parametrize(
    "entity",
    [pytest.param(ens, id="ensemble"), pytest.param(orc, id="orchestrator")],
)
def test_get_output_files_with_create_batch_job_step(entity, test_dir):
    """Every substep built by ``_create_batch_job_step`` reports its output
    files in its own subdirectory below the entity's status directory"""
    status_dir = pathlib.Path(test_dir) / CONFIG.telemetry_subdir / entity.type
    _, substeps = slurm_controller._create_batch_job_step(entity, status_dir)

    for substep in substeps:
        # example output path for a member of an Ensemble is
        # .smartsim/telemetry/Ensemble/ens/ens_0/ens_0.out
        substep_dir = status_dir / entity.name / substep.entity_name
        expected_paths = (
            str(substep_dir / (substep.entity_name + ".out")),
            str(substep_dir / (substep.entity_name + ".err")),
        )
        assert substep.get_output_files() == expected_paths


def test_model_get_output_files(test_dir):
    """A manually created step reports output files directly inside the
    status directory assigned to it"""
    output_dir = pathlib.Path(test_dir) / "output_dir"
    step = Step(model.name, model.path, model.run_settings)
    step.meta["status_dir"] = output_dir

    expected_paths = (
        str(output_dir / (model.name + ".out")),
        str(output_dir / (model.name + ".err")),
    )
    assert step.get_output_files() == expected_paths


def test_ensemble_get_output_files(test_dir):
    """Manually created steps for each ensemble member report output files
    directly inside the status directory assigned to them"""
    output_dir = pathlib.Path(test_dir) / "output_dir"

    for member in ens.models:
        step = Step(member.name, member.path, member.run_settings)
        step.meta["status_dir"] = output_dir

        expected_paths = (
            str(output_dir / (member.name + ".out")),
            str(output_dir / (member.name + ".err")),
        )
        assert step.get_output_files() == expected_paths


def test_get_output_files_no_status_dir(test_dir):
    """Test that a step not having a status directory throws a KeyError"""
    step_settings = RunSettings("echo")
    step = Step("mock-step", test_dir, step_settings)
    # no "status_dir" entry is added to step.meta here, so the lookup must
    # fail; the return value is irrelevant and therefore not bound
    with pytest.raises(KeyError):
        step.get_output_files()


@pytest.mark.parametrize(
    "entity",
    [pytest.param(ens, id="ensemble"), pytest.param(model, id="model")],
)
def test_symlink(test_dir, entity):
    """Test symlinking historical output files

    For an ensemble every member is linked and checked individually; any
    other entity is linked and checked directly.
    """
    entity.path = test_dir
    # ``entity.type`` is used as a path component elsewhere in this file, so
    # comparing it to the ``Ensemble`` class object can never match; check the
    # instance instead so the member branch is actually exercised.
    if isinstance(entity, Ensemble):
        for member in entity.models:
            symlink_with_create_job_step(test_dir, member)
    else:
        symlink_with_create_job_step(test_dir, entity)


def symlink_with_create_job_step(test_dir, entity):
    """Shared helper: create a job step for ``entity``, symlink its output
    files into ``test_dir`` and verify both links and their targets"""
    entity.path = test_dir
    status_dir = pathlib.Path(test_dir) / CONFIG.telemetry_subdir / entity.type
    step = controller._create_job_step(entity, status_dir)
    controller.symlink_output_files(step, entity)

    links = {
        suffix: pathlib.Path(entity.path, f"{entity.name}{suffix}")
        for suffix in (".out", ".err")
    }

    # both links must exist ...
    for link in links.values():
        assert link.is_symlink()

    # ... and point into the entity's subdirectory of the status directory
    for suffix, link in links.items():
        assert os.readlink(link) == str(
            status_dir / entity.name / (entity.name + suffix)
        )


@pytest.mark.parametrize(
    "entity",
    [pytest.param(ens, id="ensemble"), pytest.param(orc, id="orchestrator")],
)
def test_batch_symlink(entity, test_dir):
    """Symlink the entity's output files to each batch substep in turn and
    verify that the links point at that substep's historical files"""
    entity.path = test_dir
    status_dir = pathlib.Path(test_dir) / CONFIG.telemetry_subdir / entity.type
    _, substeps = slurm_controller._create_batch_job_step(entity, status_dir)

    for substep in substeps:
        slurm_controller.symlink_output_files(substep, entity)

        links = {
            suffix: pathlib.Path(entity.path, f"{entity.name}{suffix}")
            for suffix in (".out", ".err")
        }
        for link in links.values():
            assert link.is_symlink()

        substep_dir = status_dir / entity.name / substep.entity_name
        for suffix, link in links.items():
            assert os.readlink(link) == str(
                substep_dir / (substep.entity_name + suffix)
            )


def test_symlink_error(test_dir):
    """Symlinking must raise FileNotFoundError for a model whose path
    (``badpath``, which this test never creates) is used as link location"""
    missing_dir = pathlib.Path(test_dir, "badpath")
    bad_model = Model("bad_model", {}, missing_dir, RunSettings("echo"))

    telem_dir = pathlib.Path(test_dir, "bad_model_telemetry")
    bad_step = controller._create_job_step(bad_model, telem_dir)

    with pytest.raises(FileNotFoundError):
        controller.symlink_output_files(bad_step, bad_model)
Loading

0 comments on commit e7eb2ce

Please sign in to comment.