Skip to content

Commit

Permalink
Merge pull request #426 from mcgalcode/bugfix/ensure_success
Browse files Browse the repository at this point in the history
Make ensure_success apply to replace/detour/addition jobs
  • Loading branch information
utf authored Sep 28, 2023

Verified

This commit was signed with the committer’s verified signature.
mpaperno Max Paperno
2 parents b977105 + a04a236 commit eddcfd3
Showing 3 changed files with 146 additions and 13 deletions.
32 changes: 19 additions & 13 deletions src/jobflow/managers/local.py
Original file line number Diff line number Diff line change
@@ -81,22 +81,22 @@ def _run_job(job: jobflow.Job, parents):
nonlocal stop_jobflow

if stop_jobflow:
return False
return None, True

if len(set(parents).intersection(stopped_parents)) > 0:
# stop children has been called for one of the jobs' parents
logger.info(
f"{job.name} is a child of a job with stop_children=True, skipping..."
)
stopped_parents.add(job.uuid)
return
return None, False

if (
len(set(parents).intersection(errored)) > 0
and job.config.on_missing_references == OnMissing.ERROR
):
errored.add(job.uuid)
return
return None, False

try:
response = job.run(store=store)
@@ -105,7 +105,7 @@ def _run_job(job: jobflow.Job, parents):

logger.info(f"{job.name} failed with exception:\n{traceback.format_exc()}")
errored.add(job.uuid)
return
return None, False

responses[job.uuid][job.index] = response

@@ -117,21 +117,25 @@ def _run_job(job: jobflow.Job, parents):

if response.stop_jobflow:
stop_jobflow = True
return False
return None, True

diversion_responses = []
if response.replace is not None:
# first run any restarts
_run(response.replace)
diversion_responses.append(_run(response.replace))

if response.detour is not None:
# next any detours
_run(response.detour)
diversion_responses.append(_run(response.detour))

if response.addition is not None:
# finally any additions
_run(response.addition)
diversion_responses.append(_run(response.addition))

return response
if not all(diversion_responses):
return None, False
else:
return response, False

def _get_job_dir():
if create_folders:
@@ -143,15 +147,17 @@ def _get_job_dir():
return root_dir

def _run(root_flow):
job: jobflow.Job
encountered_bad_response = False
for job, parents in root_flow.iterflow():
job_dir = _get_job_dir()
with cd(job_dir):
response = _run_job(job, parents)
if response is False:
response, jobflow_stopped = _run_job(job, parents)

encountered_bad_response = encountered_bad_response or response is None
if jobflow_stopped is True:
return False

return response is not None
return not encountered_bad_response

logger.info("Started executing jobs locally")
finished_successfully = _run(flow)
74 changes: 74 additions & 0 deletions tests/managers/conftest.py
Original file line number Diff line number Diff line change
@@ -234,6 +234,80 @@ def _gen():
return _gen


@pytest.fixture(scope="session")
def error_detour_job(error_job):
from jobflow import Response, job

global error_detour_func

@job
def error_detour_func(message):
return Response(output=message + "_end", detour=error_job())

return error_detour_func


@pytest.fixture(scope="session")
def error_detour_flow(error_detour_job, simple_job):
from jobflow import Flow

def _gen():
error = error_detour_job("detour")
simple1 = simple_job(error.output)
return Flow([error, simple1])

return _gen


@pytest.fixture(scope="session")
def error_replace_job(error_job):
from jobflow import Response, job

global error_replace_func

@job
def error_replace_func(message):
return Response(output=message + "_end", replace=error_job())

return error_replace_func


@pytest.fixture(scope="session")
def error_replace_flow(error_replace_job, simple_job):
from jobflow import Flow

def _gen():
error = error_replace_job("replace")
return Flow([error])

return _gen


@pytest.fixture(scope="session")
def error_addition_job(error_job):
from jobflow import Response, job

global error_addition_func

@job
def error_addition_func(message):
return Response(output=message + "_end", addition=error_job())

return error_addition_func


@pytest.fixture(scope="session")
def error_addition_flow(error_addition_job, simple_job):
from jobflow import Flow

def _gen():
error = error_addition_job("addition")
simple1 = simple_job(error.output)
return Flow([error, simple1])

return _gen


@pytest.fixture(scope="session")
def stored_data_job():
from jobflow import Response, job
53 changes: 53 additions & 0 deletions tests/managers/test_local.py
Original file line number Diff line number Diff line change
@@ -323,6 +323,59 @@ def test_error_flow(memory_jobstore, clean_dir, error_flow, capsys):
run_locally(flow, store=memory_jobstore, ensure_success=True)


def test_ensure_success_with_replace(memory_jobstore, error_replace_flow, capsys):
from jobflow import run_locally

flow = error_replace_flow()

responses = run_locally(flow, store=memory_jobstore)

# check responses has been filled with the replaced
# job's output
assert len(responses) == 1
assert flow.job_uuids[0] in responses

captured = capsys.readouterr()
assert "error_func failed with exception" in captured.out

with pytest.raises(RuntimeError, match="Flow did not finish running successfully"):
run_locally(flow, store=memory_jobstore, ensure_success=True)


def test_ensure_success_with_detour(error_detour_flow, memory_jobstore, capsys):
from jobflow import run_locally

flow = error_detour_flow()

responses = run_locally(flow, store=memory_jobstore)

# check responses has been filled with the detour output
assert len(responses) == 2

captured = capsys.readouterr()
assert "error_func failed with exception" in captured.out

with pytest.raises(RuntimeError, match="Flow did not finish running successfully"):
run_locally(flow, store=memory_jobstore, ensure_success=True)


def test_ensure_success_with_addition(error_addition_flow, memory_jobstore, capsys):
from jobflow import run_locally

flow = error_addition_flow()

responses = run_locally(flow, store=memory_jobstore)

# check responses has been filled with the addition output
assert len(responses) == 2

captured = capsys.readouterr()
assert "error_func failed with exception" in captured.out

with pytest.raises(RuntimeError, match="Flow did not finish running successfully"):
run_locally(flow, store=memory_jobstore, ensure_success=True)


def test_stored_data_flow(memory_jobstore, clean_dir, stored_data_flow, capsys):
from jobflow import run_locally

0 comments on commit eddcfd3

Please sign in to comment.