Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/status_nested_workspace #480

Merged
merged 18 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added tests for the `dict_deep_merge` function
- Pytest-mock as a dependency for the test suite (necessary for using mocks and fixtures in the same test)
- New GitHub Actions test to make sure the target branch has been merged into the source branch first, so we know the histories are ok
- Check in the status commands to make sure we're not pulling statuses from nested workspaces

### Changed
- `merlin info` is cleaner and gives python package info
- The Merlin version is now printed with every banner message
- Applying filters for `merlin detailed-status` will now log debug statements instead of warnings

### Fixed
- Bugfix for output of `merlin example openfoam_wf_singularity`
Expand Down
53 changes: 41 additions & 12 deletions merlin/study/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,37 @@ def num_requested_statuses(self):

return num_statuses

def move_to_next_dir(self, timestamp_dirs: List[str], root: str) -> bool:
    """
    Decide whether a directory visited while walking a step workspace
    should be skipped because it is, or lives inside, a nested workspace.

    A directory counts as a nested workspace when its name ends with eight
    digits, a dash, and six digits (e.g. `20240522-101530`).

    :param timestamp_dirs: Names of the nested workspaces found so far. This list is
                           modified in place: the name of every newly discovered
                           nested workspace is appended to it.
    :param root: The path to the directory we need to check
    :returns: True if we should skip this directory. False otherwise.
    """
    timestamp_regex = r"\d{8}-\d{6}$"

    # Normalize first so a trailing separator or a platform-specific separator
    # doesn't hide the real directory name
    path_parts = os.path.normpath(root).split(os.sep)
    curr_dir = path_parts[-1]

    # If a workspace with a timestamp is found add it to the list and move to next dir
    if re.search(timestamp_regex, curr_dir):
        timestamp_dirs.append(curr_dir)
        return True

    # Otherwise skip this directory only if one of its path components is a nested
    # workspace we've already recorded. Whole components are compared (rather than
    # substrings of the path) so that a directory whose name merely contains a
    # recorded workspace name isn't skipped by mistake.
    return any(part in timestamp_dirs for part in path_parts)


def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict[str, List[str]]:
"""
Given a step workspace and the name of the step, read in all the statuses
for the step and return them in a dict.

:param `step_workspace`: The path to the step we're going to read statuses from
:param step_workspace: The path to the step we're going to read statuses from
:returns: A dict of statuses for the given step
"""
step_statuses = {}
Expand All @@ -354,7 +379,12 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict

# Traverse the step workspace and look for MERLIN_STATUS files
LOG.debug(f"Traversing '{step_workspace}' to find MERLIN_STATUS.json files...")
timestamp_dirs = []
for root, _, _ in os.walk(step_workspace):
# Skip any nested workspaces (iterative workflows may have this)
if self.move_to_next_dir(timestamp_dirs, root):
continue

# Search for a status file
status_filepath = os.path.join(root, "MERLIN_STATUS.json")
matching_files = glob(status_filepath)
Expand Down Expand Up @@ -869,8 +899,7 @@ def apply_filters(self):
if matches_found == self.args.max_tasks:
break
else:
# If our filters aren't a match for this task then delete it
LOG.warning(f"No matching filter for '{sub_step_workspace}'.")
LOG.debug(f"No matching filter for '{sub_step_workspace}'.")

# If we've hit the limit set by args.max_tasks, break out of the outer loop
if matches_found == self.args.max_tasks:
Expand Down Expand Up @@ -1121,7 +1150,7 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

There are currently 4 rules:
- string-concatenate: take the two conflicting values and concatenate them in a string
- use-initial-and-log-warning: use the value from dict_a and log a warning message
- use-dict_b-and-log-debug: use the value from dict_b and log a debug message
- use-longest-time: use the longest time between the two conflicting values
- use-max: use the larger integer between the two conflicting values

Expand All @@ -1136,8 +1165,8 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613
merge_rules = {
"task_queue": "string-concatenate",
"worker_name": "string-concatenate",
"status": "use-initial-and-log-warning",
"return_code": "use-initial-and-log-warning",
"status": "use-dict_b-and-log-debug",
"return_code": "use-dict_b-and-log-debug",
"elapsed_time": "use-longest-time",
"run_time": "use-longest-time",
"restarts": "use-max",
Expand All @@ -1150,13 +1179,13 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

# params = self.spec.get_parameters()
# for token in params.parameters:
# merge_rules[token] = "use-initial-and-log-warning"
# merge_rules[token] = "use-dict_b-and-log-debug"

# Set parameter token key rules (commented for loop would be better but it's
# only possible if this conflict handler is contained within Status object; however,
# since this function needs to be imported outside of this file we can't do that)
if path is not None and "parameters" in path:
merge_rules[key] = "use-initial-and-log-warning"
merge_rules[key] = "use-dict_b-and-log-debug"

try:
merge_rule = merge_rules[key]
Expand All @@ -1168,13 +1197,13 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

if merge_rule == "string-concatenate":
merge_val = f"{dict_a_val}, {dict_b_val}"
elif merge_rule == "use-initial-and-log-warning":
LOG.warning(
f"Conflict at key '{key}' while merging status files. Defaulting to initial value. "
elif merge_rule == "use-dict_b-and-log-debug":
LOG.debug(
f"Conflict at key '{key}' while merging status files. Using the updated value. "
"This could lead to incorrect status information, you may want to re-run in debug mode and "
"check the files in the output directory for this task."
)
merge_val = dict_a_val
merge_val = dict_b_val
elif merge_rule == "use-longest-time":
if dict_a_val == "--:--:--":
merge_val = dict_b_val
Expand Down
Loading