diff --git a/gaps/cli/pipeline.py b/gaps/cli/pipeline.py
index ea1818bb..ba180e38 100644
--- a/gaps/cli/pipeline.py
+++ b/gaps/cli/pipeline.py
@@ -43,25 +43,60 @@ def template_pipeline_config(commands):
 
 
 @click.pass_context
-def pipeline(ctx, config_file, cancel, monitor, background=False):
+def pipeline(
+    ctx, config_file, cancel, monitor, background=False, recursive=False
+):
     """Execute multiple steps in an analysis pipeline."""
 
+    if recursive:
+        _submit_recursive_pipelines(ctx, cancel, monitor, background)
+        return
+
     if config_file is None:
-        config_file = [
-            fp
-            for fp in Path(".").glob("*")
-            if fp.is_file() and "pipeline" in fp.name
-        ]
-        if len(config_file) != 1:
+        config_files = _find_pipeline_config_files(Path("."))
+        if len(config_files) != 1:
             msg = (
                 f"Could not determine config file - multiple (or no) files "
-                f" detected with 'pipeline' in the name exist: {config_file}"
+                f" detected with 'pipeline' in the name exist: {config_files}"
             )
             raise gapsExecutionError(msg)
 
-        config_file = config_file[0]
+        config_file = config_files[0]
 
     init_logging_from_config_file(config_file, background=background)
 
+    _run_pipeline(ctx, config_file, cancel, monitor, background)
+
+
+def _submit_recursive_pipelines(ctx, cancel, monitor, background):
+    """Submit pipelines in all recursive subdirectories."""
+    start_dir = Path(".")
+    for ind, sub_dir in enumerate(start_dir.glob("**/")):
+        if sub_dir.name == Status.HIDDEN_SUB_DIR:
+            continue
+
+        config_files = _find_pipeline_config_files(sub_dir)
+        if len(config_files) > 1:
+            msg = (
+                f"Could not determine config file - multiple files detected "
+                f"with 'pipeline' in the name in the {str(sub_dir)!r} "
+                "directory!"
+            )
+            warn(msg, gapsWarning)
+            continue
+        if len(config_files) == 0:
+            continue
+
+        init_logging_from_config_file(config_files[0], background=ind == 0)
+        _run_pipeline(ctx, config_files[0], cancel, monitor, background)
+
+
+def _find_pipeline_config_files(directory):
+    """Find all files with 'pipeline' in their name in `directory`."""
+    return [fp for fp in Path(directory).glob("*pipeline*") if fp.is_file()]
+
+
+def _run_pipeline(ctx, config_file, cancel, monitor, background):
+    """Run a GAPs pipeline for an existing config file."""
     if cancel:
         Pipeline.cancel_all(config_file)
@@ -128,6 +163,17 @@ def pipeline_command(template_config):
             help="Flag to monitor pipeline jobs continuously. Default is not "
             "to monitor (kick off jobs and exit).",
         ),
+        click.Option(
+            param_decls=["--recursive", "-r"],
+            is_flag=True,
+            help="Flag to recursively submit pipelines, starting from the "
+            "current directory and checking every sub-directory therein. The "
+            "`-c` option will be *completely ignored* if you use this option. "
+            "Instead, the code will check every sub-directory for exactly one "
+            "file with the word `pipeline` in it. If found, that file is "
+            "assumed to be the pipeline config and is used to kick off the "
+            "pipeline. In any other case, the directory is skipped.",
+        ),
     ]
     if _can_run_background():
         params += [
diff --git a/tests/cli/test_cli_pipeline.py b/tests/cli/test_cli_pipeline.py
index b6a2d71b..c70ffba6 100644
--- a/tests/cli/test_cli_pipeline.py
+++ b/tests/cli/test_cli_pipeline.py
@@ -5,6 +5,7 @@
 """
 import os
 import json
+import shutil
 from pathlib import Path
 
 import click
@@ -34,19 +35,6 @@
 SUCCESS_CONFIG = {"test": "success"}
 
 
-@pytest.fixture
-def runnable_pipeline(tmp_path):
-    """Add run to pipeline commands for test only."""
-    try:
-        Pipeline.COMMANDS["run"] = run
-        pipe_config_fp = tmp_path / "config_pipe.json"
-        with open(pipe_config_fp, "w") as config_file:
-            json.dump(SAMPLE_CONFIG, config_file)
-        yield
-    finally:
-        Pipeline.COMMANDS.pop("run")
-
-
 @pytest.fixture
 def pipe_config_fp(tmp_path):
     """Add a sample pipeline config to a temp directory."""
@@ -57,6 +45,16 @@ def pipe_config_fp(tmp_path):
     yield pipe_config_fp
 
 
+@pytest.fixture
+def runnable_pipeline(pipe_config_fp):
+    """Add run to pipeline commands for test only."""
+    try:
+        Pipeline.COMMANDS["run"] = run
+        yield pipe_config_fp
+    finally:
+        Pipeline.COMMANDS.pop("run")
+
+
 @click.command()
 @click.option("--config", "-c", default=".", help="Path to config file")
 def run(config):
@@ -95,7 +93,6 @@ def test_pipeline_command(
     tmp_path,
     cli_runner,
     runnable_pipeline,
-    pipe_config_fp,
     assert_message_was_logged,
 ):
     """Test the pipeline_command creation."""
@@ -108,10 +105,10 @@ def test_pipeline_command(
         assert "background" in [opt.name for opt in pipe.params]
     else:
         assert "background" not in [opt.name for opt in pipe.params]
-    cli_runner.invoke(pipe, ["-c", pipe_config_fp.as_posix()] + extra_args)
+    cli_runner.invoke(pipe, ["-c", runnable_pipeline.as_posix()] + extra_args)
 
     if not extra_args:
-        cli_runner.invoke(pipe, ["-c", pipe_config_fp.as_posix()])
+        cli_runner.invoke(pipe, ["-c", runnable_pipeline.as_posix()])
     else:
         assert Status(tmp_path).get(StatusField.MONITOR_PID) == os.getpid()
 
@@ -249,5 +246,59 @@ def test_pipeline_command_with_running_pid(
     )
 
 
+def test_pipeline_command_recursive(
+    tmp_cwd, cli_runner, runnable_pipeline, assert_message_was_logged
+):
+    """Test the pipeline command with recursive directories."""
+
+    target_config_fp = tmp_cwd / "config_run.json"
+    target_config_fp.touch()
+    runnable_pipeline.rename(runnable_pipeline.parent / "config_pipeline.json")
+
+    test_dirs = [
+        tmp_cwd,
+        tmp_cwd / "test_run_2",
+        tmp_cwd / "test_run_2" / "test_run_3",
+        tmp_cwd / "test_run_4",
+        tmp_cwd / "test_run_5",
+        tmp_cwd / "test_run_6",
+    ]
+
+    for prev_dir, next_dir in zip(test_dirs[0:-1], test_dirs[1:]):
+        shutil.copytree(prev_dir, next_dir)
+
+    for test_dir in test_dirs:
+        expected_out_fp = test_dir / "config_run.json"
+        assert expected_out_fp.exists()
+        with open(expected_out_fp, "r") as config:
+            assert not config.read()
+
+    shutil.copy(
+        test_dirs[-2] / "config_pipeline.json",
+        test_dirs[-2] / "config_pipeline2.json",
+    )
+    (test_dirs[-1] / "config_pipeline.json").unlink()
+
+    pipe = pipeline_command({})
+    cli_runner.invoke(pipe, ["-r"])
+    cli_runner.invoke(pipe, ["-r"])
+
+    for test_dir in test_dirs[:-2]:
+        assert_message_was_logged(test_dir.name, "INFO")
+        with open(test_dir / "config_run.json", "r") as config:
+            assert json.load(config) == SUCCESS_CONFIG
+
+    for test_dir in test_dirs[-2:]:
+        with open(test_dir / "config_run.json", "r") as config:
+            assert not config.read()
+
+    assert_message_was_logged("Pipeline job", "INFO")
+    assert_message_was_logged("is complete.", "INFO")
+    assert_message_was_logged(
+        "Could not determine config file - multiple files detected", "WARNING"
+    )
+    assert_message_was_logged(test_dirs[-2].name, "WARNING")
+
+
 if __name__ == "__main__":
     pytest.main(["-q", "--show-capture=all", Path(__file__), "-rapP"])