Merge pull request #50 from NREL/pp/debug_exec_option
New exec option
ppinchuk authored Aug 19, 2024
2 parents 62b5a3f + c78829e commit f6bbf30
Showing 5 changed files with 130 additions and 41 deletions.
6 changes: 6 additions & 0 deletions gaps/batch.py
@@ -91,11 +91,16 @@ def _make_job_dirs(self):
logger.debug(
"Batch jobs list: %s", sorted(table.index.values.tolist())
)
logger.debug("Using the following batch sets: %s", self._sets)
logger.info("Preparing batch job directories...")

# walk through current directory getting everything to copy
for source_dir, _, filenames in os.walk(self._base_dir):
logger.debug("Processing files in : %s", source_dir)
logger.debug(
" - Is dupe dir: %s",
any(job_tag in source_dir for job_tag in self._sets),
)

# don't make additional copies of job sub directories.
if any(job_tag in source_dir for job_tag in self._sets):
@@ -112,6 +117,7 @@ def _make_job_dirs(self):
/ tag
/ source_dir.relative_to(self._base_dir)
)
logger.debug("Creating dir: %s", destination_dir)
destination_dir.mkdir(parents=True, exist_ok=True)

for name in filenames:
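The new debug line above logs whether a directory is a "dupe dir", i.e. one that already belongs to a batch job copy and therefore should not be copied again. A minimal sketch of that membership test; the tags and paths here are hypothetical examples, not values from this diff:

```python
# Sketch of the "dupe dir" check used in _make_job_dirs.
# "_j0"/"_j1" stand in for the keys of self._sets; paths are made up.
batch_sets = {"_j0", "_j1"}

def is_dupe_dir(source_dir, sets=batch_sets):
    """True if source_dir already belongs to a batch job copy."""
    return any(job_tag in source_dir for job_tag in sets)

assert is_dupe_dir("/project/run_j0/logs")    # contains "_j0": skip it
assert not is_dupe_dir("/project/templates")  # fresh dir: copy it
```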
93 changes: 59 additions & 34 deletions gaps/cli/config.py
@@ -244,29 +244,12 @@ def kickoff_jobs(self):
keys_to_run, lists_to_run = self._keys_and_lists_to_run()

jobs = sorted(product(*lists_to_run))
num_jobs_submit = len(jobs)
self._warn_about_excessive_au_usage(num_jobs_submit)
self._warn_about_excessive_au_usage(len(jobs))
extra_exec_args = self._extract_extra_exec_args_for_command()
for node_index, values in enumerate(jobs):
tag = _tag(node_index, num_jobs_submit)
self.ctx.obj["NAME"] = job_name = f"{self.job_name}{tag}"
node_specific_config = deepcopy(self.config)
node_specific_config.pop("execution_control", None)
node_specific_config.update(
{
"tag": tag,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file.as_posix(),
"project_dir": self.project_dir.as_posix(),
"job_name": job_name,
"out_dir": self.project_dir.as_posix(),
"out_fpath": self._suggested_stem(job_name).as_posix(),
"run_method": getattr(
self.command_config, "run_method", None
),
}
)

for tag, values, exec_kwargs in self._with_tagged_context(jobs):

node_specific_config = self._compile_node_config(tag)
node_specific_config.update(extra_exec_args)

for key, val in zip(keys_to_run, values):
@@ -275,21 +258,63 @@
else:
node_specific_config.update(dict(zip(key, val)))

cmd = "; ".join(_CMD_LIST).format(
run_func_module=self.command_config.runner.__module__,
run_func_name=self.command_config.runner.__name__,
node_specific_config=as_script_str(node_specific_config),
project_dir=self.project_dir.as_posix(),
logging_options=as_script_str(self.logging_options),
exclude_from_status=as_script_str(self.exclude_from_status),
pipeline_step=self.pipeline_step,
job_name=job_name,
)
cmd = f"python -c {cmd!r}"
kickoff_job(self.ctx, cmd, deepcopy(self.exec_kwargs))
cmd = self._compile_run_command(node_specific_config)
kickoff_job(self.ctx, cmd, exec_kwargs)

return self

def _with_tagged_context(self, jobs):
"""Iterate over jobs and populate context with job name."""
num_jobs_submit = len(jobs)

exec_kwargs = deepcopy(self.exec_kwargs)
num_test_nodes = exec_kwargs.pop("num_test_nodes", None)
if num_test_nodes is None:
num_test_nodes = float("inf")

for node_index, values in enumerate(jobs):
if node_index >= num_test_nodes:
return

tag = _tag(node_index, num_jobs_submit)
self.ctx.obj["NAME"] = f"{self.job_name}{tag}"
yield tag, values, exec_kwargs

def _compile_node_config(self, tag):
"""Compile initial node-specific config."""
job_name = self.ctx.obj["NAME"]
node_specific_config = deepcopy(self.config)
node_specific_config.pop("execution_control", None)
node_specific_config.update(
{
"tag": tag,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file.as_posix(),
"project_dir": self.project_dir.as_posix(),
"job_name": job_name,
"out_dir": self.project_dir.as_posix(),
"out_fpath": self._suggested_stem(job_name).as_posix(),
"run_method": getattr(self.command_config, "run_method", None),
}
)
return node_specific_config

def _compile_run_command(self, node_specific_config):
"""Create run command from config and job name."""
job_name = self.ctx.obj["NAME"]
cmd = "; ".join(_CMD_LIST).format(
run_func_module=self.command_config.runner.__module__,
run_func_name=self.command_config.runner.__name__,
node_specific_config=as_script_str(node_specific_config),
project_dir=self.project_dir.as_posix(),
logging_options=as_script_str(self.logging_options),
exclude_from_status=as_script_str(self.exclude_from_status),
pipeline_step=self.pipeline_step,
job_name=job_name,
)
return f"python -c {cmd!r}"

def _suggested_stem(self, job_name_with_tag):
"""Determine suggested filepath with filename stem."""
if self._include_tag_in_out_fpath:
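The heart of the new exec option is `_with_tagged_context`, which stops yielding jobs once `num_test_nodes` submissions have gone out, and which pops that key from a copy of `exec_kwargs` so the caller's dict is untouched. A self-contained sketch of the same pattern under simplified names (the tagging and click-context bookkeeping are omitted):

```python
from copy import deepcopy

def with_test_node_cap(jobs, exec_kwargs):
    """Yield (index, job, exec_kwargs), stopping after num_test_nodes jobs.

    Sketch of the pattern in _with_tagged_context: "num_test_nodes" is
    popped from a *copy* of exec_kwargs, and a missing value (None)
    means "submit everything".
    """
    exec_kwargs = deepcopy(exec_kwargs)
    num_test_nodes = exec_kwargs.pop("num_test_nodes", None)
    if num_test_nodes is None:
        num_test_nodes = float("inf")
    for node_index, job in enumerate(jobs):
        if node_index >= num_test_nodes:
            return
        yield node_index, job, exec_kwargs

# Only the first two of five jobs are yielded:
jobs = ["a", "b", "c", "d", "e"]
kwargs = {"option": "eagle", "num_test_nodes": 2}
assert [j for _, j, _ in with_test_node_cap(jobs, kwargs)] == ["a", "b"]
assert kwargs["num_test_nodes"] == 2  # caller's dict is untouched
```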
9 changes: 9 additions & 0 deletions gaps/cli/documentation.py
@@ -26,6 +26,7 @@
"conda_env": None,
"module": None,
"sh_script": None,
"num_test_nodes": None,
}

EXTRA_EXEC_PARAMS = {
@@ -338,6 +339,14 @@
Extra shell script to run before command call.
By default, ``None``, which does not run any
scripts.
:num_test_nodes: (int, optional)
Number of nodes to submit before terminating the
submission process. This can be used to test a
new submission configuration without submitting
all nodes (i.e. only running a handful to ensure
the inputs are specified correctly and the
outputs look reasonable). By default, ``None``,
which submits all node jobs.
Only the `option` key is required for local execution. For
execution on the HPC, the `allocation` and `walltime` keys are also
required.
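For illustration, an `execution_control` block using the new key might look like the following sketch. The `option`, `allocation`, and `walltime` values are placeholders modeled on the tests below, not prescribed values:

```python
# Hypothetical execution_control block exercising num_test_nodes.
# Only the first 2 node jobs are submitted; the rest are skipped.
config = {
    "execution_control": {
        "option": "eagle",      # HPC hardware option (placeholder)
        "allocation": "test",   # placeholder allocation name
        "walltime": 1,
        "nodes": 5,
        "num_test_nodes": 2,    # submit 2 nodes, then stop
    },
    "project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9],
}
```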
2 changes: 2 additions & 0 deletions gaps/cli/execution.py
@@ -5,6 +5,7 @@
import logging
import datetime as dt
from pathlib import Path
from copy import deepcopy
from warnings import warn
from inspect import signature

@@ -55,6 +56,7 @@ def kickoff_job(ctx, cmd, exec_kwargs):
If `exec_kwargs` is missing some arguments required by the
respective `submit` function.
"""
exec_kwargs = deepcopy(exec_kwargs)
hardware_option = HardwareOption(exec_kwargs.pop("option", "local"))
if hardware_option.manager is None:
_kickoff_local_job(ctx, cmd)
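The added `deepcopy` keeps `kickoff_job` from mutating the caller's dictionary: the function pops keys such as `"option"` out of `exec_kwargs`, so without a copy a second call would see an already-stripped dict. A minimal sketch of the hazard and the fix:

```python
from copy import deepcopy

def kickoff(exec_kwargs):
    # The fix in this commit: mutate a copy, not the caller's dict.
    exec_kwargs = deepcopy(exec_kwargs)
    option = exec_kwargs.pop("option", "local")  # safe: pops from the copy
    return option, exec_kwargs

shared = {"option": "eagle", "walltime": 1}
kickoff(shared)
assert shared == {"option": "eagle", "walltime": 1}  # caller unaffected
```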
61 changes: 54 additions & 7 deletions tests/cli/test_cli_config.py
@@ -485,9 +485,8 @@ def test_run_multiple_nodes(
"max_workers": 1,
},
"input1": 1,
"input2": 7,
"input3": 8,
"_z_0": ["unsorted", "strings"],
"_z_0": ["unsorted", "strings"],
"project_points": [0, 1, 2, 4],
}

@@ -524,13 +523,13 @@ def test_run_multiple_nodes_correct_zfill(
TestCommand,
"run",
name="run",
split_keys={"project_points", "_z_0"},
split_keys={"project_points", "input3"},
)
else:
command_config = CLICommandFromFunction(
_testing_function,
name="run",
split_keys={"project_points", "_z_0"},
split_keys={"project_points", "input3"},
)

config = {
@@ -542,9 +541,7 @@
"max_workers": 1,
},
"input1": 1,
"input2": 7,
"input3": 8,
"_z_0": ["unsorted", "strings"],
"input3": ["unsorted", "strings"],
"project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9],
}

Expand All @@ -561,6 +558,56 @@ def test_run_multiple_nodes_correct_zfill(
assert any("j0" in job_name for job_name in job_names_cache)


@pytest.mark.parametrize("test_class", [False, True])
@pytest.mark.parametrize(
"test_nodes", [(-1, 0), (0, 0), (1, 1), (10, 10), (20, 10)]
)
def test_run_multiple_nodes_num_test_nodes(
test_ctx, runnable_script, test_class, test_nodes, job_names_cache
):
"""`run` function calls `_kickoff_hpc_job` for `num_test_nodes`."""

tmp_path = test_ctx.obj["TMP_PATH"]
num_test_nodes, expected_job_count = test_nodes

if test_class:
command_config = CLICommandFromClass(
TestCommand,
"run",
name="run",
split_keys={"project_points", "input3"},
)
else:
command_config = CLICommandFromFunction(
_testing_function,
name="run",
split_keys={"project_points", "input3"},
)

config = {
"execution_control": {
"option": "eagle",
"allocation": "test",
"walltime": 1,
"nodes": 5,
"max_workers": 1,
"num_test_nodes": num_test_nodes,
},
"input1": 1,
"input3": ["unsorted", "strings"],
"project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9],
}

config_fp = tmp_path / "config.json"
with open(config_fp, "w") as config_file:
json.dump(config, config_file)

assert len(job_names_cache) == 0
from_config(config_fp, command_config)
assert len(job_names_cache) == expected_job_count
assert len(set(job_names_cache)) == expected_job_count
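The parametrized pairs follow from the ten total jobs this config produces (5 node chunks of `project_points` times 2 `input3` values): the expected submission count is simply the job count capped by `num_test_nodes` and floored at zero, as this sketch checks:

```python
# Why the (num_test_nodes, expected_job_count) pairs above hold.
total_jobs = 5 * 2  # nodes x number of input3 values
for num_test_nodes, expected in [(-1, 0), (0, 0), (1, 1), (10, 10), (20, 10)]:
    assert max(0, min(num_test_nodes, total_jobs)) == expected
```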


@pytest.mark.parametrize("test_class", [False, True])
def test_run_no_split_keys(
test_ctx, runnable_script, test_class, job_names_cache
