diff --git a/src/jobflow/managers/local.py b/src/jobflow/managers/local.py
index 72f12d38..37dad837 100644
--- a/src/jobflow/managers/local.py
+++ b/src/jobflow/managers/local.py
@@ -81,7 +81,7 @@ def _run_job(job: jobflow.Job, parents):
         nonlocal stop_jobflow

         if stop_jobflow:
-            return False
+            return None, True

         if len(set(parents).intersection(stopped_parents)) > 0:
             # stop children has been called for one of the jobs' parents
@@ -89,14 +89,14 @@ def _run_job(job: jobflow.Job, parents):
                 f"{job.name} is a child of a job with stop_children=True, skipping..."
             )
             stopped_parents.add(job.uuid)
-            return
+            return None, False

         if (
             len(set(parents).intersection(errored)) > 0
             and job.config.on_missing_references == OnMissing.ERROR
         ):
             errored.add(job.uuid)
-            return
+            return None, False

         try:
             response = job.run(store=store)
@@ -105,7 +105,7 @@ def _run_job(job: jobflow.Job, parents):
             logger.info(f"{job.name} failed with exception:\n{traceback.format_exc()}")
             errored.add(job.uuid)
-            return
+            return None, False

         responses[job.uuid][job.index] = response

@@ -117,21 +117,25 @@ def _run_job(job: jobflow.Job, parents):
         if response.stop_jobflow:
             stop_jobflow = True
-            return False
+            return None, True

+        diversion_responses = []
         if response.replace is not None:
             # first run any restarts
-            _run(response.replace)
+            diversion_responses.append(_run(response.replace))

         if response.detour is not None:
             # next any detours
-            _run(response.detour)
+            diversion_responses.append(_run(response.detour))

         if response.addition is not None:
             # finally any additions
-            _run(response.addition)
+            diversion_responses.append(_run(response.addition))

-        return response
+        if not all(diversion_responses):
+            return None, False
+        else:
+            return response, False

     def _get_job_dir():
         if create_folders:
@@ -143,15 +147,17 @@ def _get_job_dir():
         return root_dir

     def _run(root_flow):
-        job: jobflow.Job
+        encountered_bad_response = False
         for job, parents in root_flow.iterflow():
             job_dir = _get_job_dir()
             with cd(job_dir):
-                response = _run_job(job, parents)
-            if response is False:
+                response, jobflow_stopped = _run_job(job, parents)
+
+            encountered_bad_response = encountered_bad_response or response is None
+            if jobflow_stopped is True:
                 return False

-        return response is not None
+        return not encountered_bad_response

     logger.info("Started executing jobs locally")
     finished_successfully = _run(flow)
diff --git a/tests/managers/conftest.py b/tests/managers/conftest.py
index dd170df7..56a2ef27 100644
--- a/tests/managers/conftest.py
+++ b/tests/managers/conftest.py
@@ -234,6 +234,80 @@ def _gen():
     return _gen


+@pytest.fixture(scope="session")
+def error_detour_job(error_job):
+    from jobflow import Response, job
+
+    global error_detour_func
+
+    @job
+    def error_detour_func(message):
+        return Response(output=message + "_end", detour=error_job())
+
+    return error_detour_func
+
+
+@pytest.fixture(scope="session")
+def error_detour_flow(error_detour_job, simple_job):
+    from jobflow import Flow
+
+    def _gen():
+        error = error_detour_job("detour")
+        simple1 = simple_job(error.output)
+        return Flow([error, simple1])
+
+    return _gen
+
+
+@pytest.fixture(scope="session")
+def error_replace_job(error_job):
+    from jobflow import Response, job
+
+    global error_replace_func
+
+    @job
+    def error_replace_func(message):
+        return Response(output=message + "_end", replace=error_job())
+
+    return error_replace_func
+
+
+@pytest.fixture(scope="session")
+def error_replace_flow(error_replace_job, simple_job):
+    from jobflow import Flow
+
+    def _gen():
+        error = error_replace_job("replace")
+        return Flow([error])
+
+    return _gen
+
+
+@pytest.fixture(scope="session")
+def error_addition_job(error_job):
+    from jobflow import Response, job
+
+    global error_addition_func
+
+    @job
+    def error_addition_func(message):
+        return Response(output=message + "_end", addition=error_job())
+
+    return error_addition_func
+
+
+@pytest.fixture(scope="session")
+def error_addition_flow(error_addition_job, simple_job):
+    from jobflow import Flow
+
+    def _gen():
+        error = error_addition_job("addition")
+        simple1 = simple_job(error.output)
+        return Flow([error, simple1])
+
+    return _gen
+
+
 @pytest.fixture(scope="session")
 def stored_data_job():
     from jobflow import Response, job
diff --git a/tests/managers/test_local.py b/tests/managers/test_local.py
index 2212f041..e68a967d 100644
--- a/tests/managers/test_local.py
+++ b/tests/managers/test_local.py
@@ -323,6 +323,59 @@ def test_error_flow(memory_jobstore, clean_dir, error_flow, capsys):
         run_locally(flow, store=memory_jobstore, ensure_success=True)


+def test_ensure_success_with_replace(memory_jobstore, error_replace_flow, capsys):
+    from jobflow import run_locally
+
+    flow = error_replace_flow()
+
+    responses = run_locally(flow, store=memory_jobstore)
+
+    # check responses has been filled with the replaced
+    # job's output
+    assert len(responses) == 1
+    assert flow.job_uuids[0] in responses
+
+    captured = capsys.readouterr()
+    assert "error_func failed with exception" in captured.out
+
+    with pytest.raises(RuntimeError, match="Flow did not finish running successfully"):
+        run_locally(flow, store=memory_jobstore, ensure_success=True)
+
+
+def test_ensure_success_with_detour(error_detour_flow, memory_jobstore, capsys):
+    from jobflow import run_locally
+
+    flow = error_detour_flow()
+
+    responses = run_locally(flow, store=memory_jobstore)
+
+    # check responses has been filled with the detour output
+    assert len(responses) == 2
+
+    captured = capsys.readouterr()
+    assert "error_func failed with exception" in captured.out
+
+    with pytest.raises(RuntimeError, match="Flow did not finish running successfully"):
+        run_locally(flow, store=memory_jobstore, ensure_success=True)
+
+
+def test_ensure_success_with_addition(error_addition_flow, memory_jobstore, capsys):
+    from jobflow import run_locally
+
+    flow = error_addition_flow()
+
+    responses = run_locally(flow, store=memory_jobstore)
+
+    # check responses has been filled with the addition output
+    assert len(responses) == 2
+
+    captured = capsys.readouterr()
+    assert "error_func failed with exception" in captured.out
+
+    with pytest.raises(RuntimeError, match="Flow did not finish running successfully"):
+        run_locally(flow, store=memory_jobstore, ensure_success=True)
+
+
 def test_stored_data_flow(memory_jobstore, clean_dir, stored_data_flow, capsys):
     from jobflow import run_locally
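Note (illustrative sketch, not part of the patch): the snippet below shows the behaviour this change targets, using only the jobflow API already exercised in the diff (job, Flow, Response, run_locally). The job names here are hypothetical, and it assumes run_locally falls back to the default job store when no store is passed.

# Illustrative only: with this patch, ensure_success=True also reports failures
# in dynamically added jobs (replace/detour/addition), not just in the jobs of
# the original flow.
from jobflow import Flow, Response, job, run_locally


@job
def always_fails():
    # hypothetical job that simulates a failure inside a dynamic job
    raise ValueError("boom")


@job
def spawns_detour(message):
    # the parent job itself succeeds, but it detours to a job that will fail
    return Response(output=message + "_end", detour=always_fails())


flow = Flow([spawns_detour("hello")])

# Previously the return value of running the detoured sub-flow was discarded, so
# the run was reported as successful; with the patch the failure propagates and
# ensure_success=True raises RuntimeError("Flow did not finish running successfully").
run_locally(flow, ensure_success=True)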