Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/status_nested_workspace #480

Merged
merged 18 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added tests for the `dict_deep_merge` function
- Pytest-mock as a dependency for the test suite (necessary for using mocks and fixtures in the same test)
- New GitHub Actions test to make sure target branch has been merged into the source first, so we know histories are ok
- Check in the status commands to make sure we're not pulling statuses from nested workspaces
- Added `setuptools` as a requirement for python 3.12 to recognize the `pkg_resources` library

### Changed
- `merlin info` is cleaner and gives python package info
- merlin version now prints with every banner message
- Applying filters for `merlin detailed-status` will now log debug statements instead of warnings
- Modified the unit tests for the `merlin status` command to use pytest rather than unittest
- Added fixtures for `merlin status` tests that copy the workspace to a temporary directory so you can see exactly what's run in a test

### Fixed
- Bugfix for output of `merlin example openfoam_wf_singularity`
- A bug with the CHANGELOG detection test when the target branch isn't in the CI runner history
- Link to Merlin banner in readme
- Issue with escape sequences in ascii art (caught by python 3.12)


## [1.12.1]
Expand Down
8 changes: 4 additions & 4 deletions merlin/ascii_art.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@


__ __ _ _
| \/ | | (_)
| \ / | ___ _ __| |_ _ __
| |\/| |/ _ \ '__| | | '_ \
| \\/ | | (_)
| \\ / | ___ _ __| |_ _ __
| |\\/| |/ _ \\ '__| | | '_ \\
| | | | __/ | | | | | | |
|_| |_|\___|_| |_|_|_| |_|
|_| |_|\\___|_| |_|_|_| |_|

Machine Learning for HPC Workflows

Expand Down
31 changes: 18 additions & 13 deletions merlin/study/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict
Given a step workspace and the name of the step, read in all the statuses
for the step and return them in a dict.

:param `step_workspace`: The path to the step we're going to read statuses from
:param step_workspace: The path to the step we're going to read statuses from
:param started_step_name: The name of the step that we're gathering statuses for
:returns: A dict of statuses for the given step
"""
step_statuses = {}
Expand All @@ -354,7 +355,12 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict

# Traverse the step workspace and look for MERLIN_STATUS files
LOG.debug(f"Traversing '{step_workspace}' to find MERLIN_STATUS.json files...")
for root, _, _ in os.walk(step_workspace):
for root, dirs, _ in os.walk(step_workspace, topdown=True):
# Look for nested workspaces and skip them
timestamp_regex = r"\d{8}-\d{6}$"
curr_dir = os.path.split(root)[1]
dirs[:] = [d for d in dirs if not re.search(timestamp_regex, curr_dir)]

# Search for a status file
status_filepath = os.path.join(root, "MERLIN_STATUS.json")
matching_files = glob(status_filepath)
Expand Down Expand Up @@ -869,8 +875,7 @@ def apply_filters(self):
if matches_found == self.args.max_tasks:
break
else:
# If our filters aren't a match for this task then delete it
LOG.warning(f"No matching filter for '{sub_step_workspace}'.")
LOG.debug(f"No matching filter for '{sub_step_workspace}'.")

# If we've hit the limit set by args.max_tasks, break out of the outer loop
if matches_found == self.args.max_tasks:
Expand Down Expand Up @@ -1121,7 +1126,7 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

There are currently 4 rules:
- string-concatenate: take the two conflicting values and concatenate them in a string
- use-initial-and-log-warning: use the value from dict_a and log a warning message
- use-dict_b-and-log-debug: use the value from dict_b and log a debug message
- use-longest-time: use the longest time between the two conflicting values
- use-max: use the larger integer between the two conflicting values

Expand All @@ -1136,8 +1141,8 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613
merge_rules = {
"task_queue": "string-concatenate",
"worker_name": "string-concatenate",
"status": "use-initial-and-log-warning",
"return_code": "use-initial-and-log-warning",
"status": "use-dict_b-and-log-debug",
"return_code": "use-dict_b-and-log-debug",
"elapsed_time": "use-longest-time",
"run_time": "use-longest-time",
"restarts": "use-max",
Expand All @@ -1150,13 +1155,13 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

# params = self.spec.get_parameters()
# for token in params.parameters:
# merge_rules[token] = "use-initial-and-log-warning"
# merge_rules[token] = "use-dict_b-and-log-debug"

# Set parameter token key rules (commented for loop would be better but it's
# only possible if this conflict handler is contained within Status object; however,
# since this function needs to be imported outside of this file we can't do that)
if path is not None and "parameters" in path:
merge_rules[key] = "use-initial-and-log-warning"
merge_rules[key] = "use-dict_b-and-log-debug"

try:
merge_rule = merge_rules[key]
Expand All @@ -1168,13 +1173,13 @@ def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613

if merge_rule == "string-concatenate":
merge_val = f"{dict_a_val}, {dict_b_val}"
elif merge_rule == "use-initial-and-log-warning":
LOG.warning(
f"Conflict at key '{key}' while merging status files. Defaulting to initial value. "
elif merge_rule == "use-dict_b-and-log-debug":
LOG.debug(
f"Conflict at key '{key}' while merging status files. Using the updated value. "
"This could lead to incorrect status information, you may want to re-run in debug mode and "
"check the files in the output directory for this task."
)
merge_val = dict_a_val
merge_val = dict_b_val
elif merge_rule == "use-longest-time":
if dict_a_val == "--:--:--":
merge_val = dict_b_val
Expand Down
1 change: 1 addition & 0 deletions requirements/release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ numpy
parse
psutil>=5.1.0
pyyaml>=5.1.2
setuptools
tabulate
redis>=4.3.4
115 changes: 112 additions & 3 deletions tests/fixtures/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
"""

import os
import shutil
from argparse import Namespace
from pathlib import Path

import pytest
import yaml

from tests.unit.study.status_test_files import status_test_variables

@pytest.fixture(scope="class")

@pytest.fixture(scope="session")
def status_testing_dir(temp_output_dir: str) -> str:
"""
A pytest fixture to set up a temporary directory to write files to for testing status.

:param temp_output_dir: The path to the temporary output directory we'll be using for this test run
:returns: The path to the temporary testing directory for status testing
"""
testing_dir = f"{temp_output_dir}/status_testing/"
if not os.path.exists(testing_dir):
Expand All @@ -23,15 +29,118 @@ def status_testing_dir(temp_output_dir: str) -> str:
return testing_dir


@pytest.fixture(scope="session")
def status_empty_file(status_testing_dir: str) -> Path:  # pylint: disable=W0621
    """
    A pytest fixture to create an empty status file.

    The fixture is session-scoped, matching the `status_testing_dir` fixture it
    depends on, so the file is created at most once per test run.

    :param status_testing_dir: A pytest fixture that defines a path to the output
        directory we'll write to
    :returns: The path (a `pathlib.Path` object) to the empty status file
    """
    empty_file = Path(f"{status_testing_dir}/empty_status.json")
    # Only create the file when it's missing so an existing file is left untouched
    if not empty_file.exists():
        empty_file.touch()

    return empty_file


@pytest.fixture(scope="session")
def status_spec_path(status_testing_dir: str) -> str:  # pylint: disable=W0621
    """
    Stage a copy of the status test spec inside the temporary testing directory and
    rewrite the copy's OUTPUT_PATH variable so that it points at that directory.

    :param status_testing_dir: A pytest fixture that defines a path to the output
        directory we'll write to
    :returns: The path to the copied (and modified) spec file
    """
    # The original spec lives alongside the unit tests, relative to this fixtures file
    source_spec = f"{os.path.dirname(__file__)}/../unit/study/status_test_files/status_test_spec.yaml"
    staged_spec = f"{status_testing_dir}/status_test_spec.yaml"
    shutil.copy(source_spec, staged_spec)

    # Load the copy, redirect OUTPUT_PATH to the temp directory, and write it back
    with open(staged_spec, "r") as reader:
        contents = yaml.load(reader, yaml.Loader)

    contents["env"]["variables"]["OUTPUT_PATH"] = status_testing_dir

    with open(staged_spec, "w") as writer:
        yaml.dump(contents, writer)

    return staged_spec


def set_sample_path(output_workspace: str):
    """
    Point the samples file entry of a workspace's expanded spec at the `samples.csv`
    file inside that workspace's `merlin_info` directory.

    This is a plain helper (not a fixture); the expanded spec file is rewritten in place.

    :param output_workspace: The workspace whose expanded spec file will be updated
    """
    merlin_info_dir = f"{output_workspace}/merlin_info"
    spec_file_path = f"{merlin_info_dir}/status_test_spec.expanded.yaml"

    # Load the current contents of the expanded spec
    with open(spec_file_path, "r") as reader:
        spec = yaml.load(reader, yaml.Loader)

    # Redirect the samples file to the copy that lives in this workspace
    spec["merlin"]["samples"]["file"] = f"{merlin_info_dir}/samples.csv"

    # Persist the change back to the same file
    with open(spec_file_path, "w") as writer:
        yaml.dump(spec, writer)


@pytest.fixture(scope="session")
def status_output_workspace(status_testing_dir: str) -> str:  # pylint: disable=W0621
    """
    Copy the status test output workspace into the temporary status testing
    directory and fix up the samples path in the copy's expanded spec.

    :param status_testing_dir: A pytest fixture that defines a path to the output
        directory we'll write to
    :returns: The path to the output workspace in the temp status testing directory
    """
    workspace_copy = f"{status_testing_dir}/{status_test_variables.VALID_WORKSPACE}"

    # Duplicate the reference workspace so tests operate on a disposable copy
    shutil.copytree(src=status_test_variables.VALID_WORKSPACE_PATH, dst=workspace_copy)

    # The expanded yaml must reference the samples file inside the copied workspace
    set_sample_path(workspace_copy)

    return workspace_copy


@pytest.fixture(scope="function")
def status_args():
    """
    Build a fresh namespace holding every argument the Status object needs.

    :returns: The namespace with necessary arguments for the Status object
    """
    arguments = {
        "subparsers": "status",
        "level": "INFO",
        "detailed": False,
        "output_path": None,
        "task_server": "celery",
        "cb_help": False,
        "dump": None,
        # Prompts are disabled here since it's easier to test this way
        "no_prompts": True,
    }
    return Namespace(**arguments)


@pytest.fixture(scope="session")
def status_nested_workspace(status_testing_dir: str) -> str:  # pylint: disable=W0621
    """
    Build an output workspace that has a second output workspace nested inside one
    of its steps. The status test workspace is copied once as the top level workspace
    and then copied again, under a different name, into the 'just_samples' step.

    :param status_testing_dir: A pytest fixture that defines a path to the output
        directory we'll write to
    :returns: The path to the top level workspace
    """
    outer_workspace = f"{status_testing_dir}/status_test_study_nested_20240520-163524"
    inner_workspace = f"{outer_workspace}/just_samples/nested_workspace_20240520-163524"
    workspaces = (outer_workspace, inner_workspace)

    # Copy the outer workspace first: the inner destination lives inside it
    for destination in workspaces:
        shutil.copytree(status_test_variables.VALID_WORKSPACE_PATH, destination)

    # Each copy's expanded yaml needs its samples file path pointed at that copy
    for workspace in workspaces:
        set_sample_path(workspace)

    return outer_workspace
Loading
Loading