bugfix/celery-chord-error #481

Merged
merged 23 commits into from
Jun 5, 2024
Changes from 22 commits
7888493
resolve CHANGELOG conflict
bgunnar5 Aug 4, 2023
ec13acf
Merge branch 'develop' of https://github.com/LLNL/merlin into develop
bgunnar5 Aug 4, 2023
7c59fc2
merge latest changes from develop
bgunnar5 Nov 20, 2023
29573d4
remove a merge conflict statement that was missed
bgunnar5 Nov 20, 2023
d4a33a6
Merge branch 'develop' of https://github.com/LLNL/merlin into develop
bgunnar5 Jan 25, 2024
dc224b5
Merge branch 'develop' of https://github.com/LLNL/merlin into develop
bgunnar5 Jan 25, 2024
54af2b6
Merge branch 'develop' of https://github.com/LLNL/merlin into develop
bgunnar5 Mar 11, 2024
0849acc
Merge branch 'develop' of https://github.com/bgunnar5/merlin into dev…
bgunnar5 May 16, 2024
b44e285
Merge branch 'develop' of https://github.com/bgunnar5/merlin into dev…
bgunnar5 May 16, 2024
358568e
Merge branch 'LLNL:develop' into develop
bgunnar5 May 23, 2024
adc4ebb
add celery results backend patch to stop ChordErrors
bgunnar5 May 24, 2024
3426db3
add MERLIN_RAISE_ERROR return code
bgunnar5 May 24, 2024
07f558c
add tests to ensure chord error isn't raised
bgunnar5 May 24, 2024
fe1d4d9
add RAISE_ERROR to docs
bgunnar5 May 24, 2024
33a1551
update CHANGELOG
bgunnar5 May 24, 2024
1b9d907
fix lint issues
bgunnar5 May 24, 2024
db2924e
up the sleep time on the chord error test
bgunnar5 May 24, 2024
547d6d3
add new steps to the chord err test spec
bgunnar5 May 24, 2024
4745ab7
add tree statement to the new test for debugging
bgunnar5 May 28, 2024
9933f06
upping sleep time to see if that fixes github action for python 3.7
bgunnar5 May 28, 2024
2e12d40
change sleep time for new test based on python version
bgunnar5 May 28, 2024
e003d98
run fix style
bgunnar5 May 28, 2024
c4043f0
remove specific sleep time for diff python versions
bgunnar5 May 29, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -15,6 +15,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- New github action test to make sure target branch has been merged into the source first, so we know histories are ok
- Check in the status commands to make sure we're not pulling statuses from nested workspaces
- Added `setuptools` as a requirement for python 3.12 to recognize the `pkg_resources` library
- Patch to celery results backend to stop ChordErrors being raised and breaking workflows when a single task fails
- New step return code `$(MERLIN_RAISE_ERROR)` to force an error to be raised by a task (mainly for testing)
  - Added description of this to docs
- New test to ensure a single failed task won't break a workflow

### Changed
- `merlin info` is cleaner and gives python package info
1 change: 1 addition & 0 deletions docs/user_guide/variables.md
@@ -252,3 +252,4 @@ If necessary, users can raise their own return codes within steps. The table bel
| <pre>`$(MERLIN_SOFT_FAIL)`</pre> | Mark this step as a failure, note in the warning log but keep executing the workflow. Unknown return codes get translated to soft fails, so that they can be logged. | <pre><code><span>echo "Uh-oh, this sample didn't work"</span></br><span>exit $(MERLIN_SOFT_FAIL)</span></code></pre> |
| <pre>`$(MERLIN_HARD_FAIL)`</pre> | Something went terribly wrong and we need to stop the whole workflow. Raises a `HardFailException` and stops all workers connected to that step. Workers will stop after a 60-second delay to allow the step to be acknowledged by the server. <div class="admonition note"><p class="admonition-title">Note</p><p>Workers in isolated parts of the workflow not consuming from the bad step will continue. You can stop all workers with `$(MERLIN_STOP_WORKERS)`.</p></div> | <pre><code><span>echo "Oh no, we've created skynet! Abort!"</span></br><span>exit $(MERLIN_HARD_FAIL)</span></code></pre> |
| <pre>`$(MERLIN_STOP_WORKERS)`</pre> | Launch a task to stop all active workers. The shutdown happens after a 60-second delay so that the current task can finish and acknowledge its results to the server. | <pre><code><span># send a signal to all workers to stop</span></br><span>exit $(MERLIN_STOP_WORKERS)</span></code></pre> |
| <pre>`$(MERLIN_RAISE_ERROR)`</pre> | Purposefully raise a general exception. *This is intended for testing; you'll likely want to use `$(MERLIN_SOFT_FAIL)` instead.* | <pre><code><span># send a signal to raise an exception</span></br><span>exit $(MERLIN_RAISE_ERROR)</span></code></pre> |
37 changes: 35 additions & 2 deletions merlin/celery.py
@@ -36,8 +36,10 @@
from typing import Dict, Optional, Union

import billiard
import celery
import psutil
-from celery import Celery
+from celery import Celery, states
from celery.backends.redis import RedisBackend # noqa: F401 ; Needed for celery patch
from celery.signals import worker_process_init

import merlin.common.security.encrypt_backend_traffic
@@ -50,6 +52,37 @@
LOG: logging.Logger = logging.getLogger(__name__)


def patch_celery():
    """
    Patch redis backend so that errors in chords don't break workflows.
    Celery has error callbacks but they do not work properly on chords that
    are nested within chains.

    Credit to this function goes to: https://danidee10.github.io/2019/07/09/celery-chords.html
    """

    def _unpack_chord_result(
        self,
        tup,
        decode,
        EXCEPTION_STATES=states.EXCEPTION_STATES,
        PROPAGATE_STATES=states.PROPAGATE_STATES,
    ):
        _, tid, state, retval = decode(tup)

        if state in EXCEPTION_STATES:
            retval = self.exception_to_python(retval)
        if state in PROPAGATE_STATES:
            # retval is an Exception
            retval = f"{retval.__class__.__name__}: {str(retval)}"

        return retval

    celery.backends.redis.RedisBackend._unpack_chord_result = _unpack_chord_result

    return celery


# This function has to have specific args/return values for celery so ignore pylint
def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=W0613,R1710
"""
@@ -82,7 +115,7 @@ def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: dis
RESULTS_BACKEND_URI = None

# initialize app with essential properties
-app: Celery = Celery(
+app: Celery = patch_celery().Celery(
    "merlin",
    broker=BROKER_URI,
    backend=RESULTS_BACKEND_URI,
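
The effect of the patch can be sketched with the standard library alone. `FakeBackend` and its methods are hypothetical stand-ins for Celery's `RedisBackend`, and the two state sets are written out by hand on the assumption that they match `celery.states`. The only point illustrated is that the patched method returns a string for a failed task where the stand-in original would raise.

```python
import json

# Assumed to mirror celery.states; redefined here so the sketch runs without Celery.
EXCEPTION_STATES = frozenset({"RETRY", "FAILURE", "REVOKED"})
PROPAGATE_STATES = frozenset({"FAILURE", "REVOKED"})


class FakeBackend:
    """Hypothetical stand-in for RedisBackend; not Celery's real class."""

    def exception_to_python(self, exc_info):
        # Rebuild an exception object from its serialized form.
        return RuntimeError(exc_info["message"])

    def _unpack_chord_result(self, tup, decode):
        # Stand-in for the unpatched behaviour: a failed header task aborts the chord.
        _, tid, state, retval = decode(tup)
        if state in PROPAGATE_STATES:
            raise RuntimeError(f"Dependency {tid} raised {retval!r}")
        return retval


def _patched_unpack(self, tup, decode):
    # Same shape as the replacement in the diff: stringify the failure instead of raising.
    _, tid, state, retval = decode(tup)
    if state in EXCEPTION_STATES:
        retval = self.exception_to_python(retval)
    if state in PROPAGATE_STATES:
        retval = f"{retval.__class__.__name__}: {retval}"
    return retval


# Monkeypatch the class so every instance picks up the new method.
FakeBackend._unpack_chord_result = _patched_unpack

backend = FakeBackend()
ok = json.dumps([1, "task-a", "SUCCESS", 42])
bad = json.dumps([1, "task-b", "FAILURE", {"message": "boom"}])
results = [backend._unpack_chord_result(t, json.loads) for t in (ok, bad)]
print(results)  # [42, 'RuntimeError: boom']
```

Because the failure arrives as an ordinary string in the result list, the chord body still runs; downstream code that cares about failures has to inspect the results itself.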
1 change: 1 addition & 0 deletions merlin/common/abstracts/enums/__init__.py
@@ -48,3 +48,4 @@ class ReturnCode(IntEnum):
    DRY_OK = 103
    RETRY = 104
    STOP_WORKERS = 105
    RAISE_ERROR = 106
5 changes: 5 additions & 0 deletions merlin/common/tasks.py
@@ -41,6 +41,7 @@
from celery import chain, chord, group, shared_task, signature
from celery.exceptions import MaxRetriesExceededError, OperationalError, TimeoutError # pylint: disable=W0622
from filelock import FileLock, Timeout
from redis.exceptions import TimeoutError as RedisTimeoutError

from merlin.common.abstracts.enums import ReturnCode
from merlin.common.sample_index import uniform_directories
@@ -62,6 +63,7 @@
    RetryException,
    RestartException,
    FileNotFoundError,
    RedisTimeoutError,
)

LOG = logging.getLogger(__name__)
@@ -181,6 +183,9 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq
            shutdown = shutdown_workers.s(None)
            shutdown.set(queue=step.get_task_queue())
            shutdown.apply_async(countdown=STOP_COUNTDOWN)
        elif result == ReturnCode.RAISE_ERROR:
            LOG.warning("*** Raising an error ***")
            raise Exception("Exception raised by request from the user")
        else:
            LOG.warning(f"**** Step '{step_name}' in '{step_dir}' had unhandled exit code {result}. Continuing with workflow.")
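
The new branch can be exercised in isolation with a small sketch. `handle_result` is a hypothetical helper that mirrors only the tail of the branch chain in `merlin_step`; the enum lists just the two members needed here, with values taken from the diff.

```python
import logging
from enum import IntEnum

LOG = logging.getLogger(__name__)


class ReturnCode(IntEnum):
    # Only the two members this sketch needs; values are taken from the diff.
    STOP_WORKERS = 105
    RAISE_ERROR = 106


def handle_result(result: int, step_name: str) -> str:
    """Hypothetical helper mirroring the tail of the branch chain in merlin_step."""
    if result == ReturnCode.STOP_WORKERS:
        LOG.warning("*** Shutting down all workers ***")
        return "stop_workers"
    if result == ReturnCode.RAISE_ERROR:
        LOG.warning("*** Raising an error ***")
        raise Exception("Exception raised by request from the user")
    LOG.warning("Step '%s' had unhandled exit code %s. Continuing with workflow.", step_name, result)
    return "continue"


# An exit code of 106 surfaces as a Python exception inside the task.
try:
    handle_result(106, "process_samples")
    raised = False
except Exception as exc:
    raised = True
    message = str(exc)

# Any unrecognised code is only logged and the workflow carries on.
outcome = handle_result(42, "process_samples")
```

Raising a plain `Exception` is what puts the task into a failed state, which is the situation the results backend patch is meant to tolerate.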

2 changes: 2 additions & 0 deletions merlin/spec/expansion.py
@@ -57,6 +57,7 @@
    "MERLIN_HARD_FAIL",
    "MERLIN_RETRY",
    "MERLIN_STOP_WORKERS",
    "MERLIN_RAISE_ERROR",
}
MERLIN_RESERVED = STEP_AWARE | PROVENANCE_REPLACE
RESERVED = MAESTRO_RESERVED | MERLIN_RESERVED
@@ -215,6 +216,7 @@ def parameter_substitutions_for_cmd(glob_path, sample_paths):
    substitutions.append(("$(MERLIN_HARD_FAIL)", str(int(ReturnCode.HARD_FAIL))))
    substitutions.append(("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY))))
    substitutions.append(("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS))))
    substitutions.append(("$(MERLIN_RAISE_ERROR)", str(int(ReturnCode.RAISE_ERROR))))
    return substitutions
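
A rough sketch of what these substitution pairs do to a step command. `expand_cmd` is a hypothetical helper, not Merlin's expansion code, and the enum carries only the members whose values appear in this diff.

```python
from enum import IntEnum


class ReturnCode(IntEnum):
    # Only the members whose values are visible in this diff.
    RETRY = 104
    STOP_WORKERS = 105
    RAISE_ERROR = 106


def expand_cmd(cmd: str) -> str:
    """Hypothetical helper: replace each reserved token with its integer code."""
    substitutions = [
        ("$(MERLIN_RETRY)", str(int(ReturnCode.RETRY))),
        ("$(MERLIN_STOP_WORKERS)", str(int(ReturnCode.STOP_WORKERS))),
        ("$(MERLIN_RAISE_ERROR)", str(int(ReturnCode.RAISE_ERROR))),
    ]
    for token, value in substitutions:
        cmd = cmd.replace(token, value)
    return cmd


expanded = expand_cmd("exit $(MERLIN_RAISE_ERROR)")
print(expanded)  # exit 106
```

The shell script therefore exits with a plain integer, and the code that reads the exit status maps it back to `ReturnCode.RAISE_ERROR`.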


2 changes: 2 additions & 0 deletions merlin/study/script_adapter.py
@@ -470,6 +470,8 @@ def submit(self, step, path, cwd, job_map=None, env=None): # pylint: disable=R0
            step.restart = False
        elif retcode == ReturnCode.STOP_WORKERS:
            LOG.debug("Execution returned status STOP_WORKERS")
        elif retcode == ReturnCode.RAISE_ERROR:
            LOG.debug("Execution returned status RAISE_ERROR")
        else:
            LOG.warning(f"Unrecognized Merlin Return code: {retcode}, returning SOFT_FAIL")
        submission_record.add_info("retcode", retcode)
5 changes: 3 additions & 2 deletions tests/integration/conditions.py
@@ -307,8 +307,9 @@ class PathExists(Condition):
    A condition for checking if a path to a file or directory exists
    """

-    def __init__(self, pathname) -> None:
+    def __init__(self, pathname, negate=False) -> None:
        self.pathname = pathname
        self.negate = negate

    def path_exists(self) -> bool:
        """Check if a path exists"""
@@ -319,7 +320,7 @@ def __str__(self) -> str:

    @property
    def passes(self):
-        return self.path_exists()
+        return not self.path_exists() if self.negate else self.path_exists()
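
A self-contained sketch of the negatable condition. It drops the `Condition` base class and assumes `path_exists` wraps `os.path.exists`, which the diff does not show.

```python
import os
import tempfile


class PathExists:
    """Simplified stand-in for the test condition; the real class subclasses Condition."""

    def __init__(self, pathname, negate=False) -> None:
        self.pathname = pathname
        self.negate = negate

    def path_exists(self) -> bool:
        # Assumption: the real method performs an equivalent filesystem check.
        return os.path.exists(self.pathname)

    @property
    def passes(self) -> bool:
        return not self.path_exists() if self.negate else self.path_exists()


with tempfile.TemporaryDirectory() as tmp:
    missing = os.path.join(tmp, "MERLIN_FINISHED")
    # With negate=True the condition passes because the file is absent.
    absent_passes = PathExists(missing, negate=True).passes
    # Without negate the same path fails the condition.
    absent_fails = PathExists(missing).passes
    # The temporary directory itself exists, so the plain check passes.
    present_passes = PathExists(tmp).passes
```

This is how the new integration test asserts that the failing sample never produced a `MERLIN_FINISHED` file.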


class FileHasRegex(Condition):
46 changes: 46 additions & 0 deletions tests/integration/definitions.py
@@ -41,6 +41,7 @@
"""

import shutil
import sys

# Pylint complains that we didn't install this module but it's defined locally so ignore
from conditions import ( # pylint: disable=E0401
@@ -63,6 +64,24 @@
KILL_WORKERS = "pkill -9 -f '.*merlin_test_worker'"


def is_python_version_above_37() -> bool:
    """
    Check whether the python version is above 3.7.

    :returns: True if python version is 3.8+. False otherwise.
    """
    if sys.version_info.major > 3:
        return True
    if sys.version_info.major < 3:
        return False

    # If we're here then the major version is 3
    if sys.version_info.minor >= 8:
        return True

    return False
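
The same check can be written more compactly with tuple comparison, since `sys.version_info` compares element by element. This is an alternative sketch, not the code in the PR; the `version_info` parameter is added here only so the function can be checked against other versions.

```python
import sys


def is_python_version_above_37(version_info=sys.version_info) -> bool:
    """Return True for Python 3.8 or newer, False otherwise."""
    # Compare (major, minor) lexicographically against (3, 8).
    return tuple(version_info[:2]) >= (3, 8)


checks = {
    "2.7": is_python_version_above_37((2, 7, 18)),
    "3.7": is_python_version_above_37((3, 7, 9)),
    "3.8": is_python_version_above_37((3, 8, 0)),
    "4.0": is_python_version_above_37((4, 0, 0)),
}
```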


def get_worker_by_cmd(cmd: str, default: str) -> str:
"""
Given a command used by a scheduler (e.g. flux for flux, jsrun for lsf, etc.)
@@ -128,6 +147,7 @@ def define_tests(): # pylint: disable=R0914,R0915
    lsf = f"{examples}/lsf/lsf_par.yaml"
    mul_workers_demo = f"{dev_examples}/multiple_workers.yaml"
    cli_substitution_wf = f"{test_specs}/cli_substitution_test.yaml"
    chord_err_wf = f"{test_specs}/chord_err.yaml"

    # Other shortcuts
    black = "black --check --target-version py36"
@@ -827,6 +847,31 @@ def define_tests(): # pylint: disable=R0914,R0915
            "run type": "distributed",
        },
    }
    distributed_error_checks = {
        "check chord error continues wf": {
            "cmds": [
                f"{workers} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR}",
                f"""{run} {chord_err_wf} --vars OUTPUT_PATH=./{OUTPUT_DIR};
                sleep {10 if is_python_version_above_37() else 30}; tree {OUTPUT_DIR}""",
            ],
            "conditions": [
                HasReturnCode(),
                PathExists(  # Check that the sample that's supposed to raise an error actually raises an error
                    f"{OUTPUT_DIR}/process_samples/01/MERLIN_FINISHED",
                    negate=True,
                ),
                StepFileExists(  # Check that step 3 is actually started and completes
                    "step_3",
                    "MERLIN_FINISHED",
                    "chord_err",
                    OUTPUT_DIR,
                ),
            ],
            "run type": "distributed",
            "cleanup": KILL_WORKERS,
            "num procs": 2,
        }
    }

# combine and return test dictionaries
all_tests = {}
@@ -849,6 +894,7 @@ def define_tests(): # pylint: disable=R0914,R0915
        stop_workers_tests,
        query_workers_tests,
        distributed_tests,
        distributed_error_checks,
    ]:
        all_tests.update(test_dict)

3 changes: 3 additions & 0 deletions tests/integration/samples_files/samples.csv
@@ -0,0 +1,3 @@
SUCCESS_1
RAISE
SUCCESS_2
54 changes: 54 additions & 0 deletions tests/integration/test_specs/chord_err.yaml
@@ -0,0 +1,54 @@
description:
    name: chord_err
    description: test the chord err problem

env:
    variables:
        OUTPUT_PATH: ./studies

global.parameters:
    TEST_PARAM:
        values: [2, 4]
        label: TEST_PARAM.%%

study:
    - name: process_samples
      description: Process samples. Purposefully try to trigger the chord err
      run:
        cmd: |
          if [ $(SAMPLE) == "RAISE" ];
          then
            exit $(MERLIN_RAISE_ERROR)
          else
            echo "Success for sample $(SAMPLE)"
          fi
    - name: samples_and_params
      description: step with samples and parameters
      run:
        cmd: |
          echo "sample: $(SAMPLE); param: $(TEST_PARAM)"
          if [ -f $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)/MERLIN_FINISHED ];
          then
            echo "MERLIN finished file found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
          else
            echo "MERLIN finished file NOT found at $(process_samples.workspace)/$(MERLIN_SAMPLE_PATH)"
          fi
        depends: [process_samples_*]
    - name: step_3
      description: funnel step
      run:
        cmd: |
          echo "Running step_3"
        depends: [samples_and_params_*]

merlin:
    samples:
        column_labels: [SAMPLE]
        file: $(MERLIN_INFO)/samples.csv
        generate:
            cmd: cp $(SPECROOT)/../samples_files/samples.csv $(MERLIN_INFO)/samples.csv
    resources:
        workers:
            merlin_test_worker:
                args: -l INFO --concurrency 1 --prefetch-multiplier 1 -Ofair
                steps: [process_samples, samples_and_params, step_3]
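
The expected outcome of the `process_samples` step for the three rows of `samples.csv` can be simulated in a few lines. `process_sample` is a hypothetical function that mimics the shell branch above, and the two-digit directory names are an assumption inferred from the `process_samples/01/MERLIN_FINISHED` path checked by the integration test.

```python
RAISE_ERROR = 106  # value of ReturnCode.RAISE_ERROR in this PR


def process_sample(sample: str) -> int:
    """Mimic the step's shell branch: exit 106 for RAISE, 0 otherwise."""
    return RAISE_ERROR if sample == "RAISE" else 0


# Rows of tests/integration/samples_files/samples.csv, in file order.
samples = ["SUCCESS_1", "RAISE", "SUCCESS_2"]

# Assumed directory naming: zero-padded sample index.
finished = {f"{i:02d}": process_sample(s) == 0 for i, s in enumerate(samples)}
print(finished)  # {'00': True, '01': False, '02': True}
```

Only sample `01` fails, which matches the test's negated `PathExists` check, while `step_3` is still expected to finish.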