Skip to content

Commit

Permalink
bugfix/status_nested_workspace (#480)
Browse files Browse the repository at this point in the history
* remove a merge conflict statement that was missed

* have status ignore nested workspaces and modify merge rules

* update CHANGELOG

* fixed issue with escape sequences in ascii art

* apply Luc's suggestion

* add setuptools as a requirement since python 3.12 doesn't have it natively

* modify unit tests for status to use pytest rather than unittest

* update CHANGELOG

* add fixtures for status testing and add nested workflow test

* update CHANGELOG
  • Loading branch information
bgunnar5 authored May 22, 2024
1 parent 897312f commit 297d9d5
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 125 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added tests for the `dict_deep_merge` function
- Pytest-mock as a dependency for the test suite (necessary for using mocks and fixtures in the same test)
- New github action test to make sure target branch has been merged into the source first, so we know histories are ok
- Check in the status commands to make sure we're not pulling statuses from nested workspaces
- Added `setuptools` as a requirement for python 3.12 to recognize the `pkg_resources` library

### Changed
- `merlin info` is cleaner and gives python package info
- merlin version now prints with every banner message
- Applying filters for `merlin detailed-status` will now log debug statements instead of warnings
- Modified the unit tests for the `merlin status` command to use pytest rather than unittest
- Added fixtures for `merlin status` tests that copy the workspace to a temporary directory so you can see exactly what's run in a test

### Fixed
- Bugfix for output of `merlin example openfoam_wf_singularity`
- A bug with the CHANGELOG detection test when the target branch isn't in the ci runner history
- Link to Merlin banner in readme
- Issue with escape sequences in ascii art (caught by python 3.12)


## [1.12.1]
Expand Down
8 changes: 4 additions & 4 deletions merlin/ascii_art.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@
__ __ _ _
| \/ | | (_)
| \ / | ___ _ __| |_ _ __
| |\/| |/ _ \ '__| | | '_ \
| \\/ | | (_)
| \\ / | ___ _ __| |_ _ __
| |\\/| |/ _ \\ '__| | | '_ \\
| | | | __/ | | | | | | |
|_| |_|\___|_| |_|_|_| |_|
|_| |_|\\___|_| |_|_|_| |_|
Machine Learning for HPC Workflows
Expand Down
31 changes: 18 additions & 13 deletions merlin/study/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict
Given a step workspace and the name of the step, read in all the statuses
for the step and return them in a dict.
:param `step_workspace`: The path to the step we're going to read statuses from
:param step_workspace: The path to the step we're going to read statuses from
:param started_step_name: The name of the step that we're gathering statuses for
:returns: A dict of statuses for the given step
"""
step_statuses = {}
Expand All @@ -354,7 +355,12 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict

# Traverse the step workspace and look for MERLIN_STATUS files
LOG.debug(f"Traversing '{step_workspace}' to find MERLIN_STATUS.json files...")
for root, _, _ in os.walk(step_workspace):
for root, dirs, _ in os.walk(step_workspace, topdown=True):
# Look for nested workspaces and skip them
timestamp_regex = r"\d{8}-\d{6}$"
curr_dir = os.path.split(root)[1]
dirs[:] = [d for d in dirs if not re.search(timestamp_regex, curr_dir)]

# Search for a status file
status_filepath = os.path.join(root, "MERLIN_STATUS.json")
matching_files = glob(status_filepath)
Expand Down Expand Up @@ -869,8 +875,7 @@ def apply_filters(self):
if matches_found == self.args.max_tasks:
break
else:
# If our filters aren't a match for this task then delete it
LOG.warning(f"No matching filter for '{sub_step_workspace}'.")
LOG.debug(f"No matching filter for '{sub_step_workspace}'.")

# If we've hit the limit set by args.max_tasks, break out of the outer loop
if matches_found == self.args.max_tasks:
Expand Down Expand Up @@ -1121,7 +1126,7 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613
There are currently 4 rules:
- string-concatenate: take the two conflicting values and concatenate them in a string
- use-initial-and-log-warning: use the value from dict_a and log a warning message
- use-dict_b-and-log-debug: use the value from dict_b and log a debug message
- use-longest-time: use the longest time between the two conflicting values
- use-max: use the larger integer between the two conflicting values
Expand All @@ -1136,8 +1141,8 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613
merge_rules = {
"task_queue": "string-concatenate",
"worker_name": "string-concatenate",
"status": "use-initial-and-log-warning",
"return_code": "use-initial-and-log-warning",
"status": "use-dict_b-and-log-debug",
"return_code": "use-dict_b-and-log-debug",
"elapsed_time": "use-longest-time",
"run_time": "use-longest-time",
"restarts": "use-max",
Expand All @@ -1150,13 +1155,13 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

# params = self.spec.get_parameters()
# for token in params.parameters:
# merge_rules[token] = "use-initial-and-log-warning"
# merge_rules[token] = "use-dict_b-and-log-debug"

# Set parameter token key rules (commented for loop would be better but it's
# only possible if this conflict handler is contained within Status object; however,
# since this function needs to be imported outside of this file we can't do that)
if path is not None and "parameters" in path:
merge_rules[key] = "use-initial-and-log-warning"
merge_rules[key] = "use-dict_b-and-log-debug"

try:
merge_rule = merge_rules[key]
Expand All @@ -1168,13 +1173,13 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

if merge_rule == "string-concatenate":
merge_val = f"{dict_a_val}, {dict_b_val}"
elif merge_rule == "use-initial-and-log-warning":
LOG.warning(
f"Conflict at key '{key}' while merging status files. Defaulting to initial value. "
elif merge_rule == "use-dict_b-and-log-debug":
LOG.debug(
f"Conflict at key '{key}' while merging status files. Using the updated value. "
"This could lead to incorrect status information, you may want to re-run in debug mode and "
"check the files in the output directory for this task."
)
merge_val = dict_a_val
merge_val = dict_b_val
elif merge_rule == "use-longest-time":
if dict_a_val == "--:--:--":
merge_val = dict_b_val
Expand Down
1 change: 1 addition & 0 deletions requirements/release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ numpy
parse
psutil>=5.1.0
pyyaml>=5.1.2
setuptools
tabulate
redis>=4.3.4
115 changes: 112 additions & 3 deletions tests/fixtures/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
"""

import os
import shutil
from argparse import Namespace
from pathlib import Path

import pytest
import yaml

from tests.unit.study.status_test_files import status_test_variables

@pytest.fixture(scope="class")

@pytest.fixture(scope="session")
def status_testing_dir(temp_output_dir: str) -> str:
"""
A pytest fixture to set up a temporary directory to write files to for testing status.
:param temp_output_dir: The path to the temporary output directory we'll be using for this test run
:returns: The path to the temporary testing directory for status testing
"""
testing_dir = f"{temp_output_dir}/status_testing/"
if not os.path.exists(testing_dir):
Expand All @@ -23,15 +29,118 @@ def status_testing_dir(temp_output_dir: str) -> str:
return testing_dir


@pytest.fixture(scope="class")
@pytest.fixture(scope="session")
def status_empty_file(status_testing_dir: str) -> str: # pylint: disable=W0621
"""
A pytest fixture to create an empty status file.
:param status_testing_dir: A pytest fixture that defines a path to the the output directory we'll write to
:param status_testing_dir: A pytest fixture that defines a path to the the output
directory we'll write to
:returns: The path to the empty status file
"""
empty_file = Path(f"{status_testing_dir}/empty_status.json")
if not empty_file.exists():
empty_file.touch()

return empty_file


@pytest.fixture(scope="session")
def status_spec_path(status_testing_dir: str) -> str: # pylint: disable=W0621
"""
Copy the test spec to the temp directory and modify the OUTPUT_PATH in the spec
to point to the temp location.
:param status_testing_dir: A pytest fixture that defines a path to the the output
directory we'll write to
:returns: The path to the spec file
"""
test_spec = f"{os.path.dirname(__file__)}/../unit/study/status_test_files/status_test_spec.yaml"
spec_in_temp_dir = f"{status_testing_dir}/status_test_spec.yaml"
shutil.copy(test_spec, spec_in_temp_dir) # copy test spec to temp directory

# Modify the OUTPUT_PATH variable to point to the temp directory
with open(spec_in_temp_dir, "r") as spec_file:
spec_contents = yaml.load(spec_file, yaml.Loader)
spec_contents["env"]["variables"]["OUTPUT_PATH"] = status_testing_dir
with open(spec_in_temp_dir, "w") as spec_file:
yaml.dump(spec_contents, spec_file)

return spec_in_temp_dir


def set_sample_path(output_workspace: str):
"""
A pytest fixture to set the path to the samples file in the test spec.
:param output_workspace: The workspace that we'll pull the spec file to update from
"""
temp_merlin_info_path = f"{output_workspace}/merlin_info"
expanded_spec_path = f"{temp_merlin_info_path}/status_test_spec.expanded.yaml"

# Read in the contents of the expanded spec
with open(expanded_spec_path, "r") as expanded_file:
expanded_contents = yaml.load(expanded_file, yaml.Loader)

# Modify the samples file path
expanded_contents["merlin"]["samples"]["file"] = f"{temp_merlin_info_path}/samples.csv"

# Write the new contents to the expanded spec
with open(expanded_spec_path, "w") as expanded_file:
yaml.dump(expanded_contents, expanded_file)


@pytest.fixture(scope="session")
def status_output_workspace(status_testing_dir: str) -> str: # pylint: disable=W0621
"""
A pytest fixture to copy the test output workspace for status to the temporary
status testing directory.
:param status_testing_dir: A pytest fixture that defines a path to the the output
directory we'll write to
:returns: The path to the output workspace in the temp status testing directory
"""
output_workspace = f"{status_testing_dir}/{status_test_variables.VALID_WORKSPACE}"
shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, output_workspace) # copy over the files
set_sample_path(output_workspace) # set the path to the samples file in the expanded yaml
return output_workspace


@pytest.fixture(scope="function")
def status_args():
"""
A pytest fixture to set up a namespace with all the arguments necessary for
the Status object.
:returns: The namespace with necessary arguments for the Status object
"""
return Namespace(
subparsers="status",
level="INFO",
detailed=False,
output_path=None,
task_server="celery",
cb_help=False,
dump=None,
no_prompts=True, # We'll set this to True here since it's easier to test this way
)


@pytest.fixture(scope="session")
def status_nested_workspace(status_testing_dir: str) -> str: # pylint: disable=W0621
"""
Create an output workspace that contains another output workspace within one of its
steps. In this case it will copy the status test workspace then within the 'just_samples'
step we'll copy the status test workspace again but with a different name.
:param status_testing_dir: A pytest fixture that defines a path to the the output
directory we'll write to
:returns: The path to the top level workspace
"""
top_level_workspace = f"{status_testing_dir}/status_test_study_nested_20240520-163524"
nested_workspace = f"{top_level_workspace}/just_samples/nested_workspace_20240520-163524"
shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, top_level_workspace) # copy over the top level workspace
shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, nested_workspace) # copy over the nested workspace
set_sample_path(top_level_workspace) # set the path to the samples file in the expanded yaml of the top level workspace
set_sample_path(nested_workspace) # set the path to the samples file in the expanded yaml of the nested workspace
return top_level_workspace
Loading

0 comments on commit 297d9d5

Please sign in to comment.