New exec option #50
Merged (11 commits, Aug 19, 2024)
6 changes: 6 additions & 0 deletions gaps/batch.py
@@ -91,11 +91,16 @@ def _make_job_dirs(self):
logger.debug(
"Batch jobs list: %s", sorted(table.index.values.tolist())
)
logger.debug("Using the following batch sets: %s", self._sets)
logger.info("Preparing batch job directories...")

# walk through current directory getting everything to copy
for source_dir, _, filenames in os.walk(self._base_dir):
logger.debug("Processing files in : %s", source_dir)
logger.debug(
" - Is dupe dir: %s",
any(job_tag in source_dir for job_tag in self._sets),
)

# don't make additional copies of job sub directories.
if any(job_tag in source_dir for job_tag in self._sets):
@@ -112,6 +117,7 @@ def _make_job_dirs(self):
/ tag
/ source_dir.relative_to(self._base_dir)
)
logger.debug("Creating dir: %s", destination_dir)
destination_dir.mkdir(parents=True, exist_ok=True)

for name in filenames:
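For reference, the new debug message above reports whether the walked directory is already a job sub-directory. A minimal, standalone sketch of that membership check, using hypothetical tags and paths rather than the PR's real `self._sets` and `os.walk()` values:

# Minimal sketch of the dupe-dir check logged above (hypothetical values).
# A directory is skipped when its path already contains one of the batch
# job tags, i.e. it was produced by an earlier pass of _make_job_dirs.
job_tags = {"set1_tag0", "set1_tag1"}          # stand-ins for the self._sets keys
source_dir = "/projects/run/set1_tag0/inputs"  # stand-in for an os.walk() directory

is_dupe_dir = any(tag in source_dir for tag in job_tags)
print(is_dupe_dir)  # True -> the directory is not copied again
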
93 changes: 59 additions & 34 deletions gaps/cli/config.py
@@ -244,29 +244,12 @@ def kickoff_jobs(self):
keys_to_run, lists_to_run = self._keys_and_lists_to_run()

jobs = sorted(product(*lists_to_run))
num_jobs_submit = len(jobs)
self._warn_about_excessive_au_usage(num_jobs_submit)
self._warn_about_excessive_au_usage(len(jobs))
extra_exec_args = self._extract_extra_exec_args_for_command()
for node_index, values in enumerate(jobs):
tag = _tag(node_index, num_jobs_submit)
self.ctx.obj["NAME"] = job_name = f"{self.job_name}{tag}"
node_specific_config = deepcopy(self.config)
node_specific_config.pop("execution_control", None)
node_specific_config.update(
{
"tag": tag,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file.as_posix(),
"project_dir": self.project_dir.as_posix(),
"job_name": job_name,
"out_dir": self.project_dir.as_posix(),
"out_fpath": self._suggested_stem(job_name).as_posix(),
"run_method": getattr(
self.command_config, "run_method", None
),
}
)

for tag, values, exec_kwargs in self._with_tagged_context(jobs):

node_specific_config = self._compile_node_config(tag)
node_specific_config.update(extra_exec_args)

for key, val in zip(keys_to_run, values):
@@ -275,21 +258,63 @@
else:
node_specific_config.update(dict(zip(key, val)))

cmd = "; ".join(_CMD_LIST).format(
run_func_module=self.command_config.runner.__module__,
run_func_name=self.command_config.runner.__name__,
node_specific_config=as_script_str(node_specific_config),
project_dir=self.project_dir.as_posix(),
logging_options=as_script_str(self.logging_options),
exclude_from_status=as_script_str(self.exclude_from_status),
pipeline_step=self.pipeline_step,
job_name=job_name,
)
cmd = f"python -c {cmd!r}"
kickoff_job(self.ctx, cmd, deepcopy(self.exec_kwargs))
cmd = self._compile_run_command(node_specific_config)
kickoff_job(self.ctx, cmd, exec_kwargs)

return self

def _with_tagged_context(self, jobs):
"""Iterate over jobs and populate context with job name."""
num_jobs_submit = len(jobs)

exec_kwargs = deepcopy(self.exec_kwargs)
num_test_nodes = exec_kwargs.pop("num_test_nodes", None)
if num_test_nodes is None:
num_test_nodes = float("inf")

for node_index, values in enumerate(jobs):
if node_index >= num_test_nodes:
return

tag = _tag(node_index, num_jobs_submit)
self.ctx.obj["NAME"] = f"{self.job_name}{tag}"
yield tag, values, exec_kwargs

def _compile_node_config(self, tag):
"""Compile initial node-specific config."""
job_name = self.ctx.obj["NAME"]
node_specific_config = deepcopy(self.config)
node_specific_config.pop("execution_control", None)
node_specific_config.update(
{
"tag": tag,
"command_name": self.command_name,
"pipeline_step": self.pipeline_step,
"config_file": self.config_file.as_posix(),
"project_dir": self.project_dir.as_posix(),
"job_name": job_name,
"out_dir": self.project_dir.as_posix(),
"out_fpath": self._suggested_stem(job_name).as_posix(),
"run_method": getattr(self.command_config, "run_method", None),
}
)
return node_specific_config

def _compile_run_command(self, node_specific_config):
"""Create run command from config and job name."""
job_name = self.ctx.obj["NAME"]
cmd = "; ".join(_CMD_LIST).format(
run_func_module=self.command_config.runner.__module__,
run_func_name=self.command_config.runner.__name__,
node_specific_config=as_script_str(node_specific_config),
project_dir=self.project_dir.as_posix(),
logging_options=as_script_str(self.logging_options),
exclude_from_status=as_script_str(self.exclude_from_status),
pipeline_step=self.pipeline_step,
job_name=job_name,
)
return f"python -c {cmd!r}"

def _suggested_stem(self, job_name_with_tag):
"""Determine suggested filepath with filename stem."""
if self._include_tag_in_out_fpath:
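The refactor above funnels submissions through a generator that stops after `num_test_nodes` jobs. A stripped-down sketch of that pattern, standalone and using a plain jobs list instead of the real `ctx`/config objects (the `_j` tag format is a stand-in for gaps' `_tag()` helper):

from copy import deepcopy


def with_tagged_context(jobs, exec_kwargs):
    """Yield (tag, values, exec_kwargs), stopping after ``num_test_nodes`` jobs."""
    exec_kwargs = deepcopy(exec_kwargs)
    # Pop the option so downstream submit() calls never see it.
    num_test_nodes = exec_kwargs.pop("num_test_nodes", None)
    if num_test_nodes is None:
        num_test_nodes = float("inf")  # no limit: submit every job

    for node_index, values in enumerate(jobs):
        if node_index >= num_test_nodes:
            return  # stop once the test budget is exhausted
        tag = f"_j{node_index}"  # stand-in for the real _tag() helper
        yield tag, values, exec_kwargs


# Example: only the first two of five jobs are yielded.
jobs = [(i,) for i in range(5)]
submitted = list(with_tagged_context(jobs, {"option": "local", "num_test_nodes": 2}))
print(len(submitted))  # 2
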
9 changes: 9 additions & 0 deletions gaps/cli/documentation.py
@@ -26,6 +26,7 @@
"conda_env": None,
"module": None,
"sh_script": None,
"num_test_nodes": None,
}

EXTRA_EXEC_PARAMS = {
@@ -338,6 +339,14 @@
Extra shell script to run before command call.
By default, ``None``, which does not run any
scripts.
:num_test_nodes: (int, optional)
Number of nodes to submit before terminating the
submission process. This can be used to test a
new submission configuration without submitting
all nodes (i.e. only running a handful to ensure
the inputs are specified correctly and the
outputs look reasonable). By default, ``None``,
which submits all node jobs.

Only the `option` key is required for local execution. For
execution on the HPC, the `allocation` and `walltime` keys are also
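As a usage illustration of the documented option, an `execution_control` block exercising `num_test_nodes` might look like the following; all other values are placeholders, not prescriptions from the PR:

import json

# Hypothetical config demonstrating the new ``num_test_nodes`` key.
config = {
    "execution_control": {
        "option": "eagle",
        "allocation": "my-allocation",  # placeholder allocation name
        "walltime": 1,
        "nodes": 100,
        "num_test_nodes": 2,  # only the first 2 node jobs are submitted
    },
    "project_points": list(range(1000)),
}

print(json.dumps(config, indent=4))  # or json.dump(...) into config_run.json
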
2 changes: 2 additions & 0 deletions gaps/cli/execution.py
@@ -5,6 +5,7 @@
import logging
import datetime as dt
from pathlib import Path
from copy import deepcopy
from warnings import warn
from inspect import signature

@@ -55,6 +56,7 @@ def kickoff_job(ctx, cmd, exec_kwargs):
If `exec_kwargs` is missing some arguments required by the
respective `submit` function.
"""
exec_kwargs = deepcopy(exec_kwargs)
hardware_option = HardwareOption(exec_kwargs.pop("option", "local"))
if hardware_option.manager is None:
_kickoff_local_job(ctx, cmd)
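The added `deepcopy` keeps `kickoff_job` from mutating the caller's `exec_kwargs`: the subsequent `pop("option", ...)` would otherwise strip keys that later job submissions still need. A small illustration of the failure mode being avoided, using a toy dict rather than the real kwargs:

from copy import deepcopy

exec_kwargs = {"option": "local", "max_workers": 1}

def kickoff_without_copy(kwargs):
    return kwargs.pop("option", "local")  # mutates the caller's dict

def kickoff_with_copy(kwargs):
    kwargs = deepcopy(kwargs)             # work on a private copy instead
    return kwargs.pop("option", "local")

kickoff_with_copy(exec_kwargs)
print("option" in exec_kwargs)   # True  -> caller's dict is intact
kickoff_without_copy(exec_kwargs)
print("option" in exec_kwargs)   # False -> the key is gone for later calls
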
61 changes: 54 additions & 7 deletions tests/cli/test_cli_config.py
@@ -485,9 +485,8 @@ def test_run_multiple_nodes(
"max_workers": 1,
},
"input1": 1,
"input2": 7,
"input3": 8,
"_z_0": ["unsorted", "strings"],
"_z_0": ["unsorted", "strings"],
"project_points": [0, 1, 2, 4],
}

Expand Down Expand Up @@ -524,13 +523,13 @@ def test_run_multiple_nodes_correct_zfill(
TestCommand,
"run",
name="run",
split_keys={"project_points", "_z_0"},
split_keys={"project_points", "input3"},
)
else:
command_config = CLICommandFromFunction(
_testing_function,
name="run",
split_keys={"project_points", "_z_0"},
split_keys={"project_points", "input3"},
)

config = {
Expand All @@ -542,9 +541,7 @@ def test_run_multiple_nodes_correct_zfill(
"max_workers": 1,
},
"input1": 1,
"input2": 7,
"input3": 8,
"_z_0": ["unsorted", "strings"],
"input3": ["unsorted", "strings"],
"project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9],
}

Expand All @@ -561,6 +558,56 @@ def test_run_multiple_nodes_correct_zfill(
assert any("j0" in job_name for job_name in job_names_cache)


@pytest.mark.parametrize("test_class", [False, True])
@pytest.mark.parametrize(
"test_nodes", [(-1, 0), (0, 0), (1, 1), (10, 10), (20, 10)]
)
def test_run_multiple_nodes_num_test_nodes(
test_ctx, runnable_script, test_class, test_nodes, job_names_cache
):
"""`run` function calls `_kickoff_hpc_job` for `num_test_nodes`."""

tmp_path = test_ctx.obj["TMP_PATH"]
num_test_nodes, expected_job_count = test_nodes

if test_class:
command_config = CLICommandFromClass(
TestCommand,
"run",
name="run",
split_keys={"project_points", "input3"},
)
else:
command_config = CLICommandFromFunction(
_testing_function,
name="run",
split_keys={"project_points", "input3"},
)

config = {
"execution_control": {
"option": "eagle",
"allocation": "test",
"walltime": 1,
"nodes": 5,
"max_workers": 1,
"num_test_nodes": num_test_nodes,
},
"input1": 1,
"input3": ["unsorted", "strings"],
"project_points": [0, 1, 2, 4, 5, 6, 7, 8, 9],
}

config_fp = tmp_path / "config.json"
with open(config_fp, "w") as config_file:
json.dump(config, config_file)

assert len(job_names_cache) == 0
from_config(config_fp, command_config)
assert len(job_names_cache) == expected_job_count
assert len(set(job_names_cache)) == expected_job_count

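For context on the parametrization above: assuming the test config yields 10 total jobs (5 project-point chunks across the requested nodes times 2 `input3` values, which matches the (10, 10) and (20, 10) cases), the expected count is simply the test-node budget clamped to that total. A tiny sketch of that relationship:

def expected_job_count(num_test_nodes, total_jobs=10):
    """Jobs actually submitted when only ``num_test_nodes`` are kicked off."""
    # A non-positive budget stops submission immediately; a large budget is
    # capped by the number of jobs that exist.
    return max(0, min(num_test_nodes, total_jobs))

assert [expected_job_count(n) for n in (-1, 0, 1, 10, 20)] == [0, 0, 1, 10, 10]
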

@pytest.mark.parametrize("test_class", [False, True])
def test_run_no_split_keys(
test_ctx, runnable_script, test_class, job_names_cache