Skip to content

Commit

Permalink
Merge pull request #28 from NREL/pp/recurvise_pipeline_submission
Browse files Browse the repository at this point in the history
Recursive pipeline submission
  • Loading branch information
ppinchuk authored Sep 22, 2023
2 parents f6c6b80 + 842e3e1 commit 3a4f046
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 24 deletions.
62 changes: 54 additions & 8 deletions gaps/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,60 @@ def template_pipeline_config(commands):


@click.pass_context
def pipeline(
    ctx, config_file, cancel, monitor, background=False, recursive=False
):
    """Execute multiple steps in an analysis pipeline.

    Parameters
    ----------
    ctx : click.Context
        Click context; forwarded unchanged to the pipeline runner.
    config_file : path-like or None
        Pipeline config file. If ``None``, the current directory is
        searched and must contain exactly one file with "pipeline" in
        its name. Ignored entirely when `recursive` is ``True``.
    cancel : bool
        Forwarded to the pipeline runner (requests cancellation of the
        pipeline jobs instead of running them).
    monitor : bool
        Forwarded to the pipeline runner (monitor jobs continuously).
    background : bool, optional
        Forwarded to logging initialization and to the pipeline runner.
        By default, ``False``.
    recursive : bool, optional
        If ``True``, submit a pipeline for the current directory and
        every sub-directory instead of a single config file.
        By default, ``False``.

    Raises
    ------
    gapsExecutionError
        If `config_file` is ``None`` and the current directory does not
        contain exactly one candidate pipeline config file.
    """

    if recursive:
        _submit_recursive_pipelines(ctx, cancel, monitor, background)
        return

    if config_file is None:
        config_files = _find_pipeline_config_files(Path("."))
        if len(config_files) != 1:
            # Report what was actually found; `config_file` is always
            # None on this path and would make the message useless.
            found = sorted(str(fp) for fp in config_files)
            msg = (
                "Could not determine config file - multiple (or no) files "
                f"detected with 'pipeline' in the name exist: {found}"
            )
            raise gapsExecutionError(msg)

        config_file = config_files[0]

    init_logging_from_config_file(config_file, background=background)
    _run_pipeline(ctx, config_file, cancel, monitor, background)


def _submit_recursive_pipelines(ctx, cancel, monitor, background):
    """Run the pipeline of the current directory and of each sub-directory.

    Every directory reachable from the current working directory is
    inspected. Directories named like the hidden status directory are
    skipped, as are directories without a pipeline config candidate.
    Directories holding more than one candidate only trigger a
    ``gapsWarning`` and are skipped as well.
    """
    for ind, sub_dir in enumerate(Path(".").glob("**/")):
        if sub_dir.name == Status.HIDDEN_SUB_DIR:
            continue

        candidates = _find_pipeline_config_files(sub_dir)
        if not candidates:
            continue

        if len(candidates) > 1:
            msg = (
                f"Could not determine config file - multiple files detected "
                f"with 'pipeline' in the name in the {str(sub_dir)!r} "
                "directory!"
            )
            warn(msg, gapsWarning)
            continue

        pipeline_config = candidates[0]
        # NOTE(review): logging is initialized with ``background=True``
        # only for the first directory visited, regardless of the
        # `background` argument - confirm this is intended.
        init_logging_from_config_file(pipeline_config, background=ind == 0)
        _run_pipeline(ctx, pipeline_config, cancel, monitor, background)


def _find_pipeline_config_files(directory):
    """Collect regular files in `directory` with "pipeline" in their name.

    Only direct children of `directory` are considered; sub-directories
    are neither returned nor searched.
    """
    candidates = Path(directory).glob("*pipeline*")
    return list(filter(Path.is_file, candidates))


def _run_pipeline(ctx, config_file, cancel, monitor, background):
"""Run a GAPs pipeline for an existing config file."""

if cancel:
Pipeline.cancel_all(config_file)
Expand Down Expand Up @@ -128,6 +163,17 @@ def pipeline_command(template_config):
help="Flag to monitor pipeline jobs continuously. Default is not "
"to monitor (kick off jobs and exit).",
),
click.Option(
param_decls=["--recursive", "-r"],
is_flag=True,
help="Flag to recursively submit pipelines, starting from the "
"current directory and checking every sub-directory therein. The "
"`-c` option will be *completely ignored* if you use this option. "
"Instead, the code will check every sub-directory for exactly one "
"file with the word `pipeline` in it. If found, that file is "
"assumed to be the pipeline config and is used to kick off the "
"pipeline. In any other case, the directory is skipped.",
),
]
if _can_run_background():
params += [
Expand Down
83 changes: 67 additions & 16 deletions tests/cli/test_cli_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""
import os
import json
import shutil
from pathlib import Path

import click
Expand Down Expand Up @@ -34,19 +35,6 @@
SUCCESS_CONFIG = {"test": "success"}


@pytest.fixture
def runnable_pipeline(tmp_path):
"""Add run to pipeline commands for test only."""
try:
Pipeline.COMMANDS["run"] = run
pipe_config_fp = tmp_path / "config_pipe.json"
with open(pipe_config_fp, "w") as config_file:
json.dump(SAMPLE_CONFIG, config_file)
yield
finally:
Pipeline.COMMANDS.pop("run")


@pytest.fixture
def pipe_config_fp(tmp_path):
"""Add a sample pipeline config to a temp directory."""
Expand All @@ -57,6 +45,16 @@ def pipe_config_fp(tmp_path):
yield pipe_config_fp


@pytest.fixture
def runnable_pipeline(pipe_config_fp):
    """Register ``run`` as a pipeline command for the duration of a test.

    Yields
    ------
    pipe_config_fp
        The value of the `pipe_config_fp` fixture, so tests that need
        both the command and the config can request this fixture alone.
    """
    commands = Pipeline.COMMANDS
    missing = object()
    # Remember any earlier "run" registration so teardown can restore it
    # instead of unconditionally deleting the key.
    previous = commands.get("run", missing)
    commands["run"] = run
    try:
        yield pipe_config_fp
    finally:
        if previous is missing:
            # Tolerate the key already being gone so that teardown never
            # masks the real test failure with a KeyError.
            commands.pop("run", None)
        else:
            commands["run"] = previous


@click.command()
@click.option("--config", "-c", default=".", help="Path to config file")
def run(config):
Expand Down Expand Up @@ -95,7 +93,6 @@ def test_pipeline_command(
tmp_path,
cli_runner,
runnable_pipeline,
pipe_config_fp,
assert_message_was_logged,
):
"""Test the pipeline_command creation."""
Expand All @@ -108,10 +105,10 @@ def test_pipeline_command(
assert "background" in [opt.name for opt in pipe.params]
else:
assert "background" not in [opt.name for opt in pipe.params]
cli_runner.invoke(pipe, ["-c", pipe_config_fp.as_posix()] + extra_args)
cli_runner.invoke(pipe, ["-c", runnable_pipeline.as_posix()] + extra_args)

if not extra_args:
cli_runner.invoke(pipe, ["-c", pipe_config_fp.as_posix()])
cli_runner.invoke(pipe, ["-c", runnable_pipeline.as_posix()])
else:
assert Status(tmp_path).get(StatusField.MONITOR_PID) == os.getpid()

Expand Down Expand Up @@ -249,5 +246,59 @@ def test_pipeline_command_with_running_pid(
)


def test_pipeline_command_recursive(
    tmp_cwd, cli_runner, runnable_pipeline, assert_message_was_logged
):
    """Check that ``-r`` runs the pipeline in every eligible directory.

    Directories with exactly one pipeline config must produce output;
    a directory with two pipeline configs must only trigger a warning,
    and a directory without a pipeline config must be left untouched.
    """
    run_config_name = "config_run.json"
    pipe_config_name = "config_pipeline.json"

    (tmp_cwd / run_config_name).touch()
    runnable_pipeline.rename(runnable_pipeline.parent / pipe_config_name)

    test_dirs = [
        tmp_cwd,
        tmp_cwd / "test_run_2",
        tmp_cwd / "test_run_2" / "test_run_3",
        tmp_cwd / "test_run_4",
        tmp_cwd / "test_run_5",
        tmp_cwd / "test_run_6",
    ]

    # Each directory is created as a copy of the one listed before it.
    for src_dir, dst_dir in zip(test_dirs, test_dirs[1:]):
        shutil.copytree(src_dir, dst_dir)

    # Before running, every directory holds an empty run config.
    for test_dir in test_dirs:
        run_config_fp = test_dir / run_config_name
        assert run_config_fp.exists()
        assert not run_config_fp.read_text()

    # Second-to-last directory: two pipeline configs (ambiguous).
    # Last directory: no pipeline config at all.
    ambiguous_dir, empty_dir = test_dirs[-2:]
    shutil.copy(
        ambiguous_dir / pipe_config_name,
        ambiguous_dir / "config_pipeline2.json",
    )
    (empty_dir / pipe_config_name).unlink()

    pipe = pipeline_command({})
    # The command is invoked twice back to back, as in the other
    # pipeline tests (presumably submit, then pick up completion).
    for __ in range(2):
        cli_runner.invoke(pipe, ["-r"])

    for test_dir in test_dirs[:-2]:
        assert_message_was_logged(test_dir.name, "INFO")
        run_output = json.loads((test_dir / run_config_name).read_text())
        assert run_output == SUCCESS_CONFIG

    for test_dir in (ambiguous_dir, empty_dir):
        assert not (test_dir / run_config_name).read_text()

    assert_message_was_logged("Pipeline job", "INFO")
    assert_message_was_logged("is complete.", "INFO")
    assert_message_was_logged(
        "Could not determine config file - multiple files detected", "WARNING"
    )
    assert_message_was_logged(ambiguous_dir.name, "WARNING")


if __name__ == "__main__":
    # Allow running this test module directly as a script.
    pytest_args = ["-q", "--show-capture=all", Path(__file__), "-rapP"]
    pytest.main(pytest_args)

0 comments on commit 3a4f046

Please sign in to comment.