diff --git a/.github/workflows/push-pr_workflow.yml b/.github/workflows/push-pr_workflow.yml index eecbf3eeb..4b5de2373 100644 --- a/.github/workflows/push-pr_workflow.yml +++ b/.github/workflows/push-pr_workflow.yml @@ -95,6 +95,7 @@ jobs: python3 -m pip install --upgrade pip if [ -f requirements.txt ]; then pip install -r requirements.txt; fi pip3 install -r requirements/dev.txt + pip freeze - name: Install singularity run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index c7862710a..b69525389 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,24 @@ All notable changes to Merlin will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.12.1] +### Added +- New Priority.RETRY value for the Celery task priorities. This will be the new highest priority. +- Support for the status command to handle multiple workers on the same step +- Documentation on how to run cross-node workflows with a containerized server (`merlin server`) + +### Changed +- Modified some tests in `test_status.py` and `test_detailed_status.py` to accommodate bugfixes for the status commands + +### Fixed +- Bugfixes for the status commands: + - Fixed "DRY RUN" naming convention so that it outputs in the progress bar properly + - Fixed issue where a step that was run with one sample would delete the status file upon condensing + - Fixed issue where multiple workers processing the same step would break the status file and cause the workflow to crash + - Added a catch for the JSONDecodeError that would potentially crash a run + - Added a FileLock to the status write in `_update_status_file()` of `MerlinStepRecord` to avoid potential race conditions (potentially related to JSONDecodeError above) + - Added in `export MANPAGER="less -r"` call behind the scenes for `detailed-status` to fix ASCII error + ## [1.12.0] ### Added - A new command `merlin 
queue-info` that will print the status of your celery queues diff --git a/Makefile b/Makefile index 0470b3332..cfd4cea29 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/docs/user_guide/configuration/containerized_server.md b/docs/user_guide/configuration/containerized_server.md new file mode 100644 index 000000000..218e97f23 --- /dev/null +++ b/docs/user_guide/configuration/containerized_server.md @@ -0,0 +1,621 @@ +# Containerized Server Configuration + +!!! warning + + It's recommended that you read through the [Configuration Overview](./index.md) page before proceeding with this module. + +!!! warning + + It is not possible to run [cross-machine workflows](../../tutorial/5_advanced_topics.md#multi-machine-workflows) with the containerized servers created with the `merlin server` command. + +The `merlin server` command gives users easy access to containerized broker and results servers for Merlin workflows. This allows users to run Merlin without a dedicated external server. + +By default, the main configuration is stored in the `server/` subdirectory of the main Merlin configuration directory `~/.merlin`. However, different server images can be created for different use cases or studies by simply creating a new directory to store local configuration files for Merlin server instances. + +This module will walk through how to initialize the server, start it, and ensure it's linked to Merlin. There will also be an explanation of how to set the containerized server up to run across multiple nodes. 
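The per-study layout described above can be sketched with a small helper. This is purely illustrative; the `prepare_study_dir` function below is hypothetical and not part of Merlin:

```python
from pathlib import Path


def prepare_study_dir(path):
    """Create (if needed) a per-study directory and report whether it
    already holds a local Merlin server configuration, which
    `merlin server init` places in a `merlin_server/` subfolder."""
    study = Path(path)
    study.mkdir(parents=True, exist_ok=True)
    has_local_config = (study / "merlin_server").is_dir()
    return study, has_local_config


if __name__ == "__main__":
    study, configured = prepare_study_dir("study1")
    print(f"{study}: local server config present: {configured}")
```

Each study directory gets its own `merlin_server/` folder after initialization, so servers for different studies never share state.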
+ +## Initializing the Server + +First create and navigate into a directory to store your local Merlin configuration for a specific use case or study: + +```bash +mkdir study1/ ; cd study1/ +``` + +Afterwards, you can instantiate a Merlin server in this directory by running: + +```bash +merlin server init +``` + +A main server configuration will be created in the `~/.merlin/server/` directory. This will have the following files: + +- docker.yaml +- merlin_server.yaml +- podman.yaml +- singularity.yaml + +The main configuration in `~/.merlin/server/` deals with defaults and technical commands that might be used for setting up the Merlin server local configuration and its containers. Each container has its own configuration file so that users can switch freely between different containerized services. + +In addition to the main server configuration, a local server configuration will be created in your current working directory in a folder called `merlin_server/`. This directory will contain: + +- `redis.conf`: The Redis configuration file that contains all of the settings to be used for our Redis server +- `redis.pass`: A password for the Redis server that we'll start up next +- `redis.users`: A file defining the users that are allowed to access the Redis server and their permissions +- `redis_latest.sif`: A Singularity file that contains the latest Redis Docker image that was pulled behind the scenes by Merlin + +The local configuration `merlin_server/` folder contains configuration files specific to a certain use case or run. In the case above, you can see that we have a Redis Singularity container called `redis_latest.sif` with the Redis configuration file called `redis.conf`. This Redis configuration allows the user to tailor Redis to their needs without having to manage or edit the Redis container. When the server is run, this configuration is read dynamically, so settings can be changed between runs if needed. 
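Because the Redis configuration is re-read each time the server starts, individual settings can be adjusted between runs by editing `merlin_server/redis.conf`. As a rough sketch of what such an edit looks like programmatically (the `set_redis_directive` helper below is hypothetical, and assumes the typical `directive value` line format of Redis config files):

```python
from pathlib import Path


def set_redis_directive(conf_path, directive, value):
    """Replace (or append) a single `directive value` line in a
    redis.conf-style file, leaving all other lines untouched."""
    path = Path(conf_path)
    lines = path.read_text().splitlines() if path.exists() else []
    replaced = False
    for i, line in enumerate(lines):
        # Redis config lines start with the directive name
        if line.split() and line.split()[0] == directive:
            lines[i] = f"{directive} {value}"
            replaced = True
    if not replaced:
        lines.append(f"{directive} {value}")
    path.write_text("\n".join(lines) + "\n")
```

For example, `set_redis_directive("merlin_server/redis.conf", "port", "6380")` would move the server to a different port on the next start.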
+ +Once the Merlin server has been initialized in the local directory, the user can run other Merlin server commands such as `start`, `status`, and `stop` to interact with the Merlin server. A detailed list of commands can be found in the [Merlin Server](../command_line.md#server-merlin-server) section of the [Command Line](../command_line.md) page. + +!!! note + + Running `merlin server init` again will *not* override any existing configuration that the user might have set or edited; it will only create any missing files with the existing defaults. *However,* it is highly advised that users back up their configuration in case an error occurs where configuration files are overridden. + +## Starting the Server and Linking it to Merlin + +!!! warning + + Newer versions of Redis require the environment variable `LC_ALL` to be set in order for this to work. To set this properly, run: + + ```bash + export LC_ALL="C" + ``` + + If this is not set, the `merlin server start` command may seem to run forever until you manually terminate it. + +After initializing the server, starting the server is as simple as running: + +```bash +merlin server start +``` + +You can check that the server was started properly with: + +```bash +merlin server status +``` + +The `merlin server start` command will add new files to the local configuration `merlin_server/` folder: + +- `merlin_server.pf`: A process file containing information regarding the Redis process +- `app.yaml`: A new app.yaml file configured specifically for the containerized Redis server that we just started + +To have Merlin read this server configuration: + +=== "Copy Configuration to CWD" + + ```bash + cp merlin_server/app.yaml . 
+ ``` + +=== "Make This Server Configuration Your Main Configuration" + + If you're going to use the server configuration as your main configuration, it's a good idea to make a backup of your current server configuration (if you have one): + + ```bash + mv ~/.merlin/app.yaml ~/.merlin/app.yaml.bak + ``` + + From here, simply copy the server configuration to your `~/.merlin/` folder: + + ```bash + cp merlin_server/app.yaml ~/.merlin/app.yaml + ``` + +You can check that Merlin recognizes the containerized server connection with: + +```bash +merlin info +``` + +If your servers are running and set up properly, this should output something similar to this: + +???+ success + + ```bash + * + *~~~~~ + *~~*~~~* __ __ _ _ + / ~~~~~ | \/ | | (_) + ~~~~~ | \ / | ___ _ __| |_ _ __ + ~~~~~* | |\/| |/ _ \ '__| | | '_ \ + *~~~~~~~ | | | | __/ | | | | | | | + ~~~~~~~~~~ |_| |_|\___|_| |_|_|_| |_| + *~~~~~~~~~~~ + ~~~*~~~* Machine Learning for HPC Workflows + + + + Merlin Configuration + ------------------------- + + config_file | /path/to/app.yaml + is_debug | False + merlin_home | /path/to/.merlin + merlin_home_exists | True + broker server | redis://default:******@127.0.0.1:6379/0 + broker ssl | False + results server | redis://default:******@127.0.0.1:6379/0 + results ssl | False + + Checking server connections: + ---------------------------- + broker server connection: OK + results server connection: OK + + Python Configuration + ------------------------- + + $ which python3 + /path/to/python3 + + $ python3 --version + Python x.y.z + + $ which pip3 + /path/to/pip3 + + $ pip3 --version + pip x.y.x from /path/to/pip (python x.y) + + "echo $PYTHONPATH" + ``` + +## Stopping the Server + +Once you're done using your containerized server, it can be stopped with: + +```bash +merlin server stop +``` + +You can check that it's no longer running with: + +```bash +merlin server status +``` + +## Running Cross-Node Workflows with a Containerized Server + +By default, the container will be 
started using `localhost` as the location to route network traffic through. For a cross-node workflow, we'll have to modify this since `localhost` will only be reachable on the same node that the container is running on. + +Instead of using `localhost`, we'll do two things: + +1. Configure the server to point to the IP address of the node that's hosting the server with `merlin server config` +2. Modify the `app.yaml` file generated by the `merlin server start` command so that the `server` setting points to the name of the node hosting the server + +### Creating the Cross-Node Server Launch Script + +We'll automate the process described at the beginning of this section by creating a batch script. We can start with a typical header: + +```bash title="server.sbatch" linenums="1" +#!/bin/bash +#SBATCH -N 1 +#SBATCH -J Merlin +#SBATCH -t 00:20:00 +#SBATCH -p pdebug +#SBATCH -A wbronze +#SBATCH -o merlin_server_%j.out +``` + +These settings can easily be modified as you see fit for your workflow. In particular, you'll want to set the walltime to however long your workflow should take, and adjust the queue and the bank to fit your needs. + +Next, similar to what's common for worker launch scripts (like the one shown in [Distributed Runs](../running_studies.md#distributed-runs)), we'll add a variable to define a path to the virtual environment where Merlin is installed and a statement to activate this environment: + +```bash title="server.sbatch" linenums="9" +# Turn off core files to work around flux exec issue. +ulimit -c 0 + +# Path to virtual environment containing Merlin +VENV=/path/to/your/merlin_venv # UPDATE THIS PATH + +# Activate the virtual environment +source ${VENV}/bin/activate +``` + +You'll need to modify the `VENV` variable to point to your Merlin virtual environment. 
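If `VENV` points to the wrong place, every later `merlin` call in the script will fail with "command not found", which can be confusing to debug from a batch job's output log. A small optional sanity check (a sketch only, not part of the original script) could confirm the executable is reachable after activation:

```python
import shutil
import sys


def check_tool(name):
    """Return True if the named executable is available on PATH."""
    return shutil.which(name) is not None


if __name__ == "__main__":
    if not check_tool("merlin"):
        sys.exit("merlin not found on PATH -- was the virtual environment activated?")
    print("merlin is available")
```

An equivalent one-liner in the batch script itself would be `command -v merlin || exit 1` placed right after the `source` line.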
+ +Now that we've got Merlin activated, let's get started with initializing the server (for more information on what's happening in this process, [see above](#initializing-the-server)): + +```bash title="server.sbatch" linenums="18" +######################################### +# Starting the Server # +######################################### + +# Necessary for the Redis server to spin up +export LC_ALL="C" + +# Initialize the server files +merlin server init + +# Check to make sure the server initialized properly +MERLIN_SERVER_DIR=`pwd`/merlin_server +echo "merlin_server_dir: $MERLIN_SERVER_DIR" +if ! [ -d $MERLIN_SERVER_DIR ]; then + echo "The server directory '$MERLIN_SERVER_DIR' doesn't exist. Likely a problem with 'merlin server init'" + exit 1 +fi +``` + +When `merlin server init` is executed, the server files should all be initialized in a folder named `merlin_server` located in your current working directory. If there was a problem with this process, the check after should output an error message and exit gracefully. + +The server files now exist so it's time to handle our first major task listed at the start of this section: configuring the server to point to the IP of the current node. We can accomplish this by adding the following lines to `server.sbatch`: + +```bash title="server.sbatch" linenums="36" +# Obtain the ip of the current node and set the server to point to it +ip=`getent hosts $(hostname) | awk '{ print $1 }'` +echo "ip: $ip" +merlin server config -ip $ip +``` + +The server is now configured to use the IP address of the current node. 
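One caveat: if the hostname fails to resolve, the `getent` pipeline above can leave `ip` empty, and `merlin server config` would then receive a blank address. A defensive check could validate the captured value first; here is a sketch of such a guard (this validation step is an assumption, not part of the original script, and it only handles IPv4):

```python
import ipaddress


def is_valid_ipv4(addr):
    """Return True if `addr` parses as a dotted-quad IPv4 address."""
    try:
        ipaddress.IPv4Address(addr)
        return True
    except ipaddress.AddressValueError:
        return False
```

In the batch script, a comparable bash guard would be `if [ -z "$ip" ]; then exit 1; fi` right after the lookup.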
+ +We'll now start up the server so that we can obtain the `app.yaml` file that we'll need to modify for the second major task listed at the start of this section: + +```bash title="server.sbatch" linenums="41" +# Start the server (this creates the app.yaml file that we need) +merlin server start + +# Check to make sure the app.yaml file was created properly +APP_YAML_PATH=$MERLIN_SERVER_DIR/app.yaml +echo "app_yaml_path: $APP_YAML_PATH" +if ! [ -f $APP_YAML_PATH ]; then + echo "The app.yaml file '$APP_YAML_PATH' doesn't exist. Likely a problem with 'merlin server start'" + exit 1 +fi +``` + +From here, we'll pause on creating the server launch script so that we can create a python script to assist us in updating the `app.yaml` programmatically. + +### Creating a Script to Update the Server in `app.yaml` + +At this point in the execution of our server launch script, our server should be started and an `app.yaml` file should exist in the `merlin_server` directory that was created by our call to `merlin server init`. In this subsection, we'll create a python script `update_app_hostname.py` that updates the `server` settings in the `app.yaml` file so that they point to the name of the node that's hosting the server. + +This `update_app_hostname.py` script will need to take in a hostname and a path to the `app.yaml` file that we need to update. 
Therefore, we'll start this script by establishing arguments with Python's built-in [argparse library](https://docs.python.org/3/library/argparse.html): + +```python title="update_app_hostname.py" +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("hostname", help="the hostname to set in the app.yaml file") + parser.add_argument("app_yaml", help="the path to the app.yaml file to update") + args = parser.parse_args() +``` + +Before updating the `app.yaml` file using the path that was passed in, let's ensure the path exists just to be safe: + +```python title="update_app_hostname.py" hl_lines="3 11" +import argparse + +from merlin.utils import verify_filepath + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("hostname", help="the hostname to set in the app.yaml file") + parser.add_argument("app_yaml", help="the path to the app.yaml file to update") + args = parser.parse_args() + + app_yaml_path = verify_filepath(args.app_yaml) +``` + +Finally, we'll add a function `update_app_yaml` to do the actual updating of the `app.yaml` file. This function will load in the current contents of the `app.yaml` file, update the necessary `server` settings, and dump the updated settings back to the `app.yaml` file. We'll also set up a module-level logger so that the PyYAML fallback path can emit a warning. + +```python title="update_app_hostname.py" hl_lines="2 3 7 9-32 42" +import argparse +import logging +import yaml + +from merlin.utils import verify_filepath + +LOG = logging.getLogger(__name__) + +def update_app_yaml(hostname, app_yaml): + """ + Read in the app.yaml contents, update them, then write the updated + contents back to the file. + + :param hostname: The hostname to set our broker and results server to + :param app_yaml: The path to the app.yaml file to update + """ + with open(app_yaml, "r") as yaml_file: + try: + contents = yaml.load(yaml_file, yaml.FullLoader) + except AttributeError: + LOG.warning( + "PyYAML is using an unsafe version with a known " + "load vulnerability. Please upgrade your installation " + "to a more recent version!" + ) + contents = yaml.load(yaml_file, yaml.Loader) + + contents["broker"]["server"] = hostname + contents["results_backend"]["server"] = hostname + + with open(app_yaml, "w") as yaml_file: + yaml.dump(contents, yaml_file) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("hostname", help="the hostname to set in the app.yaml file") + parser.add_argument("app_yaml", help="the path to the app.yaml file to update") + args = parser.parse_args() + + app_yaml_path = verify_filepath(args.app_yaml) + update_app_yaml(args.hostname, app_yaml_path) +``` + +This script is now complete. If you'd like to test it out to make sure it works: + +1. Create a copy of your current `app.yaml` file +2. Call this script with: + + ``` + python update_app_hostname.py test /path/to/app_copy.yaml + ``` + +3. Open your `app_copy.yaml` file and verify that the `server` settings are now `server: test` + +From here, we'll pick up where we left off with creating the server launch script. + +### Finishing the Cross-Node Server Launch Script + +We now have a fully complete python script to update our `app.yaml` file and a partially complete batch script to launch our server. All that's left now is to finish up the server launch script. + +To accomplish this, we'll use the `hostname` command to obtain the name of the host that the server is currently on and pass that in to a call to our `update_app_hostname.py` script that was created in the [previous subsection](#creating-a-script-to-update-the-server-in-appyaml): + +```bash title="server.sbatch" linenums="52" +# Update the app.yaml file generated by merlin server start to point to the hostname of this node +python update_app_hostname.py `hostname` ${APP_YAML_PATH} +``` + +When Merlin reads in the `app.yaml` file, it will search for this file in two locations: your current working directory and `~/.merlin`. 
In order for Merlin to read in this `app.yaml` file that we just updated, we need to copy it to the directory where you'll launch your study from (AKA the current working directory): + +```bash title="server.sbatch" linenums="55" +# Move the app.yaml to the project directory +PROJECT_DIR=`pwd` +cp ${MERLIN_SERVER_DIR}/app.yaml ${PROJECT_DIR} +``` + +In these lines, we're assuming that this file is located in the same place as your spec file. If you place this file elsewhere, you'll need to modify the `PROJECT_DIR` variable to point to where your spec file will be launched from. + +Finally, let's add a statement to see if our server is connected properly (this will help with debugging) and a call to sleep forever so that this server stays up and running until our allocation terminates: + +```bash title="server.sbatch" linenums="59" +# Check the server connection +merlin info + +# Keeping the allocation alive so that the server remains up for as long as possible +sleep inf +``` + +This file is now complete. The full versions of the `server.sbatch` and the `update_app_hostname.py` files can be found in the section below. + +### Full Scripts + +The cross-node containerized server configuration requires two scripts: + +1. `server.sbatch` - the script needed to launch the server +2. `update_app_hostname.py` - the script needed to update the `app.yaml` file + +Below are the full scripts: + +=== "server.sbatch" + + ```bash title="server.sbatch" + #!/bin/bash + #SBATCH -N 1 + #SBATCH -J Merlin + #SBATCH -t 00:20:00 + #SBATCH -p pdebug + #SBATCH -A wbronze + #SBATCH -o merlin_server_%j.out + + # Turn off core files to work around flux exec issue. 
+ ulimit -c 0 + + # Path to virtual environment containing Merlin + VENV=/path/to/your/merlin_venv # UPDATE THIS PATH + + # Activate the virtual environment + source ${VENV}/bin/activate + + ######################################### + # Starting the Server # + ######################################### + + # Necessary for the Redis server to spin up + export LC_ALL="C" + + # Initialize the server files + merlin server init + + # Check to make sure the server initialized properly + MERLIN_SERVER_DIR=`pwd`/merlin_server + echo "merlin_server_dir: $MERLIN_SERVER_DIR" + if ! [ -d $MERLIN_SERVER_DIR ]; then + echo "The server directory '$MERLIN_SERVER_DIR' doesn't exist. Likely a problem with 'merlin server init'" + exit 1 + fi + + # Obtain the ip of the current node and set the server to point to it + ip=`getent hosts $(hostname) | awk '{ print $1 }'` + echo "ip: $ip" + merlin server config -ip $ip + + # Start the server (this creates the app.yaml file that we need) + merlin server start + + # Check to make sure the app.yaml file was created properly + APP_YAML_PATH=$MERLIN_SERVER_DIR/app.yaml + echo "app_yaml_path: $APP_YAML_PATH" + if ! [ -f $APP_YAML_PATH ]; then + echo "The app.yaml file '$APP_YAML_PATH' doesn't exist. 
Likely a problem with 'merlin server start'" + exit 1 + fi + + # Update the app.yaml file generated by merlin server start to point to the hostname of this node + python update_app_hostname.py `hostname` ${APP_YAML_PATH} + + # Move the app.yaml to the project directory + PROJECT_DIR=`pwd` + cp ${MERLIN_SERVER_DIR}/app.yaml ${PROJECT_DIR} + + # Check the server connection + merlin info + + # Keeping the allocation alive so that the server remains up for as long as possible + sleep inf + ``` + +=== "update_app_hostname.py" + + ```python title="update_app_hostname.py" + import argparse + import logging + import yaml + + from merlin.utils import verify_filepath + + LOG = logging.getLogger(__name__) + + def update_app_yaml(hostname, app_yaml): + """ + Read in the app.yaml contents, update them, then write the updated + contents back to the file. + + :param hostname: The hostname to set our broker and results server to + :param app_yaml: The path to the app.yaml file to update + """ + with open(app_yaml, "r") as yaml_file: + try: + contents = yaml.load(yaml_file, yaml.FullLoader) + except AttributeError: + LOG.warning( + "PyYAML is using an unsafe version with a known " + "load vulnerability. Please upgrade your installation " + "to a more recent version!" + ) + contents = yaml.load(yaml_file, yaml.Loader) + + contents["broker"]["server"] = hostname + contents["results_backend"]["server"] = hostname + + with open(app_yaml, "w") as yaml_file: + yaml.dump(contents, yaml_file) + + + if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("hostname", help="the hostname to set in the app.yaml file") + parser.add_argument("app_yaml", help="the path to the app.yaml file to update") + args = parser.parse_args() + + app_yaml_path = verify_filepath(args.app_yaml) + update_app_yaml(args.hostname, app_yaml_path) + ``` + +### How to Use the Scripts + +Using the scripts is as easy as: + +1. Copying the `update_app_hostname.py` and `server.sbatch` files to the same location as your spec file +2. 
Updating the `VENV` variable in `server.sbatch` to point to your venv with Merlin installed +3. Starting the server by submitting the script with `sbatch server.sbatch` + +Once your allocation is granted, the server should spin up. You can check that it's been started by executing `merlin info` from the directory where these scripts exist. This should output a message like the one shown at the end of [Starting the Server and Linking it to Merlin](#starting-the-server-and-linking-it-to-merlin). + +From here, you should be able to start your workers by submitting a `workers.sbatch` script like the one shown in [Distributed Runs](../running_studies.md#distributed-runs). To ensure that this script doesn't start prior to your server spinning up, you should submit this script with: + +```bash +sbatch -d after:+1 workers.sbatch +``` + +This will make it so that `workers.sbatch` cannot start until the server job has been running for 1 minute. + +You can also submit tasks to the server with: + +```bash +merlin run <spec_file> +``` + +### Example Usage + +For this example, we'll use Merlin's built-in [Hello Samples Example](../../examples/hello.md#the-hello-samples-example). The files for this example can be downloaded with: + +```bash +merlin example hello_samples +``` + +We'll then move into the directory that was downloaded with: + +```bash +cd hello/ +``` + +From here, let's copy over the `update_app_hostname.py` and `server.sbatch` files from the [Full Scripts section above](#full-scripts). We'll also add in the following `workers.sbatch` file: + +```bash title="workers.sbatch" +#!/bin/bash +#SBATCH -N 2 +#SBATCH -J Merlin +#SBATCH -t 00:20:00 +#SBATCH -p pdebug +#SBATCH -A wbronze +#SBATCH -o merlin_workers_%j.out + +# Turn off core files to work around flux exec issue. 
+ulimit -c 0 + +YAML=hello_samples.yaml + +VENV=/path/to/your/merlin_venv + +# Activate the virtual environment +source ${VENV}/bin/activate + +# Check the server connection +merlin info + +######################################### +# Running the Workers # +######################################### + +# Show the workers command +merlin run-workers ${YAML} --echo + +# Start workers to run the tasks in the broker +merlin run-workers ${YAML} + +# Keep the allocation alive until all workers stop +merlin monitor ${YAML} +sleep inf # If you're using merlin v1.12.0+ you can comment out this line +``` + +Now update the `VENV` variable in both `server.sbatch` and `workers.sbatch` to point to the virtual environment where Merlin is installed. + +From here, all we need to do is: + +1. Start the containerized server with: + + ```bash + sbatch server.sbatch + ``` + +2. Start the workers with: + + ```bash + sbatch -d after:+1 workers.sbatch + ``` + +3. Wait for the server to start (step 1), then queue the tasks with: + + ```bash + merlin run hello_samples.yaml + ``` + +You can check that everything ran properly with: + +```bash +merlin status hello_samples.yaml +``` + +Or, if you're using a version of Merlin prior to v1.12.0, you can ensure that the `hello_samples_/` output workspace was created. More info on the expected output can be found in [the Hello World Examples page](../../examples/hello.md#expected-output-1). + +Congratulations, you just ran a cross-node workflow with a containerized server! 
diff --git a/docs/user_guide/configuration/index.md b/docs/user_guide/configuration/index.md index e8a551da9..c6dbcbb27 100644 --- a/docs/user_guide/configuration/index.md +++ b/docs/user_guide/configuration/index.md @@ -258,9 +258,9 @@ When it comes to configuring the `broker` and `results_backend` sections of your For Livermore Computing (LC) users we recommend configuring with either: - [Dedicated LaunchIT Servers](https://lc.llnl.gov/confluence/display/MERLIN/LaunchIT+Configuration) -- [Containerized Servers](./merlin_server.md) +- [Containerized Servers](./containerized_server.md) For all other users, we recommend configuring with either: - [Dedicated External Servers](./external_server.md) -- [Containerized Servers](./merlin_server.md) +- [Containerized Servers](./containerized_server.md) diff --git a/merlin/__init__.py b/merlin/__init__.py index 184e86d9d..b765c71f2 100644 --- a/merlin/__init__.py +++ b/merlin/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # @@ -38,7 +38,7 @@ import sys -__version__ = "1.12.0" +__version__ = "1.12.1" VERSION = __version__ PATH_TO_PROJ = os.path.join(os.path.dirname(__file__), "") diff --git a/merlin/ascii_art.py b/merlin/ascii_art.py index 1e89e61a2..1f8b04229 100644 --- a/merlin/ascii_art.py +++ b/merlin/ascii_art.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/celery.py b/merlin/celery.py index 1cb7d3806..d35b0dccd 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. 
# # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/__init__.py b/merlin/common/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/common/__init__.py +++ b/merlin/common/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/abstracts/__init__.py b/merlin/common/abstracts/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/common/abstracts/__init__.py +++ b/merlin/common/abstracts/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/abstracts/enums/__init__.py b/merlin/common/abstracts/enums/__init__.py index 9f23caa2f..61fecf7a8 100644 --- a/merlin/common/abstracts/enums/__init__.py +++ b/merlin/common/abstracts/enums/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/dumper.py b/merlin/common/dumper.py index f42aead3a..457c62b49 100644 --- a/merlin/common/dumper.py +++ b/merlin/common/dumper.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0 +# This file is part of Merlin, Version: 1.12.1 # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/openfilelist.py b/merlin/common/openfilelist.py index 90408dad7..4030dfe75 100644 --- a/merlin/common/openfilelist.py +++ b/merlin/common/openfilelist.py @@ -8,7 +8,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. 
# # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/opennpylib.py b/merlin/common/opennpylib.py index a239e3ab5..6d719ea5e 100644 --- a/merlin/common/opennpylib.py +++ b/merlin/common/opennpylib.py @@ -8,7 +8,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/sample_index.py b/merlin/common/sample_index.py index 3b016c683..8e42aeac3 100644 --- a/merlin/common/sample_index.py +++ b/merlin/common/sample_index.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/sample_index_factory.py b/merlin/common/sample_index_factory.py index 6d0fffaa9..a36a15ef0 100644 --- a/merlin/common/sample_index_factory.py +++ b/merlin/common/sample_index_factory.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/security/__init__.py b/merlin/common/security/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/common/security/__init__.py +++ b/merlin/common/security/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/security/encrypt.py b/merlin/common/security/encrypt.py index fea3039a2..b0ad464b6 100644 --- a/merlin/common/security/encrypt.py +++ b/merlin/common/security/encrypt.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. 
+# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/security/encrypt_backend_traffic.py b/merlin/common/security/encrypt_backend_traffic.py index 86befd070..c59c19ebd 100644 --- a/merlin/common/security/encrypt_backend_traffic.py +++ b/merlin/common/security/encrypt_backend_traffic.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/common/tasks.py b/merlin/common/tasks.py index 15be1182f..33afb3316 100644 --- a/merlin/common/tasks.py +++ b/merlin/common/tasks.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # @@ -49,6 +49,7 @@ from merlin.exceptions import HardFailException, InvalidChainException, RestartException, RetryException from merlin.router import stop_workers from merlin.spec.expansion import parameter_substitutions_for_cmd, parameter_substitutions_for_sample +from merlin.study.status import read_status from merlin.utils import dict_deep_merge @@ -139,7 +140,7 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq f"Step '{step_name}' in '{step_dir}' is being restarted ({self.request.retries + 1}/{self.max_retries})..." ) step.mstep.mark_restart() - self.retry(countdown=step.retry_delay) + self.retry(countdown=step.retry_delay, priority=get_priority(Priority.RETRY)) except MaxRetriesExceededError: LOG.warning( f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RESTART command, @@ -155,7 +156,7 @@ def merlin_step(self, *args: Any, **kwargs: Any) -> Optional[ReturnCode]: # noq f"Step '{step_name}' in '{step_dir}' is being retried ({self.request.retries + 1}/{self.max_retries})..." 
) step.mstep.mark_restart() - self.retry(countdown=step.retry_delay) + self.retry(countdown=step.retry_delay, priority=get_priority(Priority.RETRY)) except MaxRetriesExceededError: LOG.warning( f"""*** Step '{step_name}' in '{step_dir}' exited with a MERLIN_RETRY command, @@ -312,13 +313,17 @@ def add_merlin_expanded_chain_to_chord( # pylint: disable=R0913,R0914 all_chains.append(new_chain) - condense_sig = condense_status_files.s( - sample_index=sample_index, - workspace=top_lvl_workspace, - condensed_workspace=chain_[0].mstep.condensed_workspace, - ).set( - queue=chain_[0].get_task_queue(), - ) + # Only need to condense status files if there's more than 1 sample + if num_samples > 1: + condense_sig = condense_status_files.s( + sample_index=sample_index, + workspace=top_lvl_workspace, + condensed_workspace=chain_[0].mstep.condensed_workspace, + ).set( + queue=chain_[0].get_task_queue(), + ) + else: + condense_sig = None LOG.debug("adding chain to chord") chain_1d = get_1d_chain(all_chains) @@ -467,29 +472,33 @@ def gather_statuses( # Read in the status data sample_workspace = f"{workspace}/{path}" status_filepath = f"{sample_workspace}/MERLIN_STATUS.json" - lock = FileLock(f"{sample_workspace}/status.lock") # pylint: disable=E0110 - try: - # The status files will need locks when reading to avoid race conditions - with lock.acquire(timeout=10): - with open(status_filepath, "r") as status_file: - status = json.load(status_file) - - # This for loop is just to get the step name that we don't have; it's really not even looping - for step_name in status: - try: - # Make sure the status for this sample workspace is in a finished state (not initialized or running) - if status[step_name][f"{condensed_workspace}/{path}"]["status"] not in ("INITIALIZED", "RUNNING"): - # Add the status data to the statuses we'll write to the condensed file and remove this status file - dict_deep_merge(condensed_statuses, status) - files_to_remove.append(status_filepath) - except KeyError: - 
LOG.warning(f"Key error when reading from {sample_workspace}") - except Timeout: - # Raising this celery timeout instead will trigger a restart for this task - raise TimeoutError # pylint: disable=W0707 - except FileNotFoundError: - LOG.warning(f"Could not find {status_filepath} while trying to condense. Restarting this task...") - raise FileNotFoundError # pylint: disable=W0707 + lock_filepath = f"{sample_workspace}/status.lock" + if os.path.exists(status_filepath): + try: + # NOTE: instead of leaving statuses as dicts read in by JSON, maybe they should each be their own object + status = read_status(status_filepath, lock_filepath, raise_errors=True) + + # This for loop is just to get the step name that we don't have; it's really not even looping + for step_name in status: + try: + # Make sure the status for this sample workspace is in a finished state (not initialized or running) + if status[step_name][f"{condensed_workspace}/{path}"]["status"] not in ("INITIALIZED", "RUNNING"): + # Add the status data to the statuses we'll write to the condensed file and remove this status file + dict_deep_merge(condensed_statuses, status) + files_to_remove.append(status_filepath) + files_to_remove.append(lock_filepath) # Remove the lock files as well as the status files + except KeyError: + LOG.warning(f"Key error when reading from {sample_workspace}") + except Timeout: + # Raising this celery timeout instead will trigger a restart for this task + raise TimeoutError # pylint: disable=W0707 + except FileNotFoundError: + LOG.warning(f"Could not find {status_filepath} while trying to condense. 
Restarting this task...") + raise FileNotFoundError # pylint: disable=W0707 + else: + # A status file might be missing from the output if we hit this, but we don't want that + # to fully crash the workflow + LOG.debug(f"Could not find {status_filepath}, skipping this status file.") return condensed_statuses diff --git a/merlin/common/util_sampling.py b/merlin/common/util_sampling.py index 3e2c3a9dd..3ddc47d65 100644 --- a/merlin/common/util_sampling.py +++ b/merlin/common/util_sampling.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/config/__init__.py b/merlin/config/__init__.py index 63bc37fa0..6768ee58d 100644 --- a/merlin/config/__init__.py +++ b/merlin/config/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/config/broker.py b/merlin/config/broker.py index be1da0ab9..e1c9b8cfc 100644 --- a/merlin/config/broker.py +++ b/merlin/config/broker.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/config/celeryconfig.py b/merlin/config/celeryconfig.py index 75b31b051..652e76db8 100644 --- a/merlin/config/celeryconfig.py +++ b/merlin/config/celeryconfig.py @@ -10,7 +10,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin.
# diff --git a/merlin/config/configfile.py b/merlin/config/configfile.py index e2d9a6a82..c01c1554a 100644 --- a/merlin/config/configfile.py +++ b/merlin/config/configfile.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/config/results_backend.py b/merlin/config/results_backend.py index a37667ffa..83de7e457 100644 --- a/merlin/config/results_backend.py +++ b/merlin/config/results_backend.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/config/utils.py b/merlin/config/utils.py index f0380b63c..b7063577e 100644 --- a/merlin/config/utils.py +++ b/merlin/config/utils.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # @@ -30,7 +30,7 @@ """This module contains priority handling""" import enum -from typing import List +from typing import Dict from merlin.config.configfile import CONFIG @@ -41,6 +41,7 @@ class Priority(enum.Enum): HIGH = 1 MID = 2 LOW = 3 + RETRY = 4 def is_rabbit_broker(broker: str) -> bool: @@ -53,26 +54,31 @@ def is_redis_broker(broker: str) -> bool: return broker in ["redis", "rediss", "redis+socket"] +def determine_priority_map(broker_name: str) -> Dict[Priority, int]: + """ + Returns the priority mapping for the given broker name. 
+ + :param broker_name: The name of the broker that we need the priority map for + :returns: The priority map associated with `broker_name` + """ + if is_rabbit_broker(broker_name): + return {Priority.LOW: 1, Priority.MID: 5, Priority.HIGH: 9, Priority.RETRY: 10} + if is_redis_broker(broker_name): + return {Priority.LOW: 10, Priority.MID: 5, Priority.HIGH: 2, Priority.RETRY: 1} + + raise ValueError(f"Unsupported broker name: {broker_name}") + + def get_priority(priority: Priority) -> int: """ - Get the priority based on the broker. For a rabbit broker - a low priority is 1 and high is 10. For redis it's the opposite. - :returns: An int representing the priority level + Gets the priority level as an integer based on the broker. + For a rabbit broker, priorities range from 1 (low) up to 10 (retry), with high mapped to 9; for redis the scale is reversed. + + :param priority: The priority value that we want + :returns: The priority value as an integer """ - broker: str = CONFIG.broker.name.lower() - priorities: List[Priority] = [Priority.HIGH, Priority.MID, Priority.LOW] - if not isinstance(priority, Priority): - raise TypeError(f"Unrecognized priority '{priority}'! Priority enum options: {[x.name for x in priorities]}") - if priority == Priority.MID: - return 5 - if is_rabbit_broker(broker): - if priority == Priority.LOW: - return 1 - if priority == Priority.HIGH: - return 10 - if is_redis_broker(broker): - if priority == Priority.LOW: - return 10 - if priority == Priority.HIGH: - return 1 - raise ValueError(f"Function get_priority has reached unknown state!
Maybe unsupported broker {broker}?") + if priority not in Priority: + raise ValueError(f"Invalid priority: {priority}") + + priority_map = determine_priority_map(CONFIG.broker.name.lower()) + return priority_map.get(priority, priority_map[Priority.MID]) # Fall back to MID priority (the membership check above should make this unreachable) diff --git a/merlin/data/celery/__init__.py b/merlin/data/celery/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/data/celery/__init__.py +++ b/merlin/data/celery/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/display.py b/merlin/display.py index f464d4cb0..3a7cd66c3 100644 --- a/merlin/display.py +++ b/merlin/display.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # @@ -349,7 +349,7 @@ def display_status_summary( # pylint: disable=R0912 :param `status_obj`: A Status object :param `non_workspace_keys`: A set of keys in requested_statuses that are not workspace keys. - This will be set("parameters", "task_queue", "worker_name) + This will be set("parameters", "task_queue", "workers") :param `test_mode`: If True, don't print anything and just return a dict of all the state info for each step :returns: A dict that's empty usually. If ran in test_mode it will be a dict of state_info for every step.
""" @@ -369,7 +369,7 @@ def display_status_summary( # pylint: disable=R0912 "UNKNOWN": {"count": 0, "color": ANSI_COLORS["GREY"], "fill": "?"}, "INITIALIZED": {"count": 0, "color": ANSI_COLORS["LIGHT_BLUE"]}, "RUNNING": {"count": 0, "color": ANSI_COLORS["BLUE"]}, - "DRY RUN": {"count": 0, "color": ANSI_COLORS["ORANGE"], "fill": "\\"}, + "DRY_RUN": {"count": 0, "color": ANSI_COLORS["ORANGE"], "fill": "\\"}, "TOTAL TASKS": {"total": status_obj.tasks_per_step[sstep]}, "AVG RUN TIME": status_obj.run_time_info[sstep]["avg_run_time"], "RUN TIME STD DEV": status_obj.run_time_info[sstep]["run_time_std_dev"], @@ -385,8 +385,9 @@ def display_status_summary( # pylint: disable=R0912 # If this was a non-local run we should have a task queue and worker name to add to state_info if "task_queue" in overall_step_info: state_info["TASK QUEUE"] = {"name": overall_step_info["task_queue"]} - if "worker_name" in overall_step_info: - state_info["WORKER NAME"] = {"name": overall_step_info["worker_name"]} + if "workers" in overall_step_info: + worker_str = ", ".join(overall_step_info["workers"]) + state_info["WORKER(S)"] = {"name": worker_str} # Loop through all workspaces for this step (if there's no samples for this step it'll just be one path) for sub_step_workspace, task_status_info in overall_step_info.items(): @@ -474,7 +475,7 @@ def display_progress_bar( # pylint: disable=R0913,R0914 "INITIALIZED", "RUNNING", "TASK QUEUE", - "WORKER NAME", + "WORKER(S)", "TOTAL TASKS", "AVG RUN TIME", "RUN TIME STD DEV", diff --git a/merlin/examples/__init__.py b/merlin/examples/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/examples/__init__.py +++ b/merlin/examples/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# diff --git a/merlin/examples/examples.py b/merlin/examples/examples.py index 002ade16c..c50b11b99 100644 --- a/merlin/examples/examples.py +++ b/merlin/examples/examples.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/examples/generator.py b/merlin/examples/generator.py index 0af04066c..942ef9011 100644 --- a/merlin/examples/generator.py +++ b/merlin/examples/generator.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/exceptions/__init__.py b/merlin/exceptions/__init__.py index 0c610bcf3..89fe89a13 100644 --- a/merlin/exceptions/__init__.py +++ b/merlin/exceptions/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/log_formatter.py b/merlin/log_formatter.py index 980c75e2a..b90660f9c 100644 --- a/merlin/log_formatter.py +++ b/merlin/log_formatter.py @@ -8,7 +8,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/main.py b/merlin/main.py index c5c3743e7..26274a729 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -8,7 +8,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# diff --git a/merlin/merlin_templates.py b/merlin/merlin_templates.py index 8f2b36fbb..8afb7b9c8 100644 --- a/merlin/merlin_templates.py +++ b/merlin/merlin_templates.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/router.py b/merlin/router.py index a942cf65d..ec3c83acf 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/server/__init__.py b/merlin/server/__init__.py index 1c36c97ff..1a7410485 100644 --- a/merlin/server/__init__.py +++ b/merlin/server/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. diff --git a/merlin/server/server_commands.py b/merlin/server/server_commands.py index c2764a77c..65d17c42b 100644 --- a/merlin/server/server_commands.py +++ b/merlin/server/server_commands.py @@ -8,7 +8,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/server/server_config.py b/merlin/server/server_config.py index 92f019b46..f4d5d5174 100644 --- a/merlin/server/server_config.py +++ b/merlin/server/server_config.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# diff --git a/merlin/server/server_util.py b/merlin/server/server_util.py index c10e0e1d9..db19866e5 100644 --- a/merlin/server/server_util.py +++ b/merlin/server/server_util.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/spec/__init__.py b/merlin/spec/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/spec/__init__.py +++ b/merlin/spec/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/spec/all_keys.py b/merlin/spec/all_keys.py index cae3cfb53..10fc8646c 100644 --- a/merlin/spec/all_keys.py +++ b/merlin/spec/all_keys.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/spec/defaults.py b/merlin/spec/defaults.py index 3baf84668..01ba8c743 100644 --- a/merlin/spec/defaults.py +++ b/merlin/spec/defaults.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/spec/expansion.py b/merlin/spec/expansion.py index b9aa0fc74..a8bf3ac43 100644 --- a/merlin/spec/expansion.py +++ b/merlin/spec/expansion.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# diff --git a/merlin/spec/override.py b/merlin/spec/override.py index e2fb10b0d..83a831c59 100644 --- a/merlin/spec/override.py +++ b/merlin/spec/override.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/spec/specification.py b/merlin/spec/specification.py index 13170c2fb..e5ebc858c 100644 --- a/merlin/spec/specification.py +++ b/merlin/spec/specification.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/study/__init__.py b/merlin/study/__init__.py index c7b9c10f5..c76918410 100644 --- a/merlin/study/__init__.py +++ b/merlin/study/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/study/batch.py b/merlin/study/batch.py index 5ccc4b4b9..201e41ff3 100644 --- a/merlin/study/batch.py +++ b/merlin/study/batch.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 97adf9fd1..556953259 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# diff --git a/merlin/study/dag.py b/merlin/study/dag.py index ab456ed36..098cdb92b 100644 --- a/merlin/study/dag.py +++ b/merlin/study/dag.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/study/script_adapter.py b/merlin/study/script_adapter.py index 1656a90a7..69dea2b63 100644 --- a/merlin/study/script_adapter.py +++ b/merlin/study/script_adapter.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/study/status.py b/merlin/study/status.py index e617df12f..d11a403e2 100644 --- a/merlin/study/status.py +++ b/merlin/study/status.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0 +# This file is part of Merlin, Version: 1.12.1 # # For details, see https://github.com/LLNL/merlin. # @@ -36,6 +36,7 @@ from copy import deepcopy from datetime import datetime from glob import glob +from traceback import print_exception from typing import Dict, List, Optional, Tuple, Union import numpy as np @@ -55,6 +56,7 @@ ) from merlin.study.status_renderers import status_renderer_factory from merlin.utils import ( + apply_list_of_regex, convert_timestring, convert_to_timedelta, dict_deep_merge, @@ -105,6 +107,21 @@ def __init__(self, args: Namespace, spec_display: bool, file_or_ws: str): self.requested_statuses = {} self.load_requested_statuses() + def _print_requested_statuses(self): + """ + Helper method to print out the requested statuses dict. 
+ """ + print("self.requested_statuses:") + for step_name, overall_step_info in self.requested_statuses.items(): + print(f"\t{step_name}:") + for key, val in overall_step_info.items(): + if key in NON_WORKSPACE_KEYS: + print(f"\t\t{key}: {val}") + else: + print(f"\t\t{key}:") + for status_key, status_val in val.items(): + print(f"\t\t\t{status_key}: {status_val}") + def _verify_filter_args(self): """ This is an abstract method since we'll need to verify filter args for DetailedStatus @@ -343,16 +360,19 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict if matching_files: LOG.debug(f"Found status file at '{status_filepath}'") # Read in the statuses - lock = FileLock(f"{root}/status.lock") # pylint: disable=E0110 - statuses_read = read_status(status_filepath, lock) + statuses_read = read_status(status_filepath, f"{root}/status.lock") + + # Merge the statuses we read with the dict tracking all statuses for this step + dict_deep_merge(step_statuses, statuses_read) # Add full step name to the tracker and count number of statuses we just read in for full_step_name, status_info in statuses_read.items(): self.full_step_name_map[started_step_name].add(full_step_name) num_statuses_read += len(status_info.keys() - NON_WORKSPACE_KEYS) - # Merge the statuses we read with the dict tracking all statuses for this step - dict_deep_merge(step_statuses, statuses_read) + # Make sure there aren't any duplicate workers + if "workers" in step_statuses[full_step_name]: + step_statuses[full_step_name]["workers"] = list(set(step_statuses[full_step_name]["workers"])) LOG.debug( f"Done traversing '{step_workspace}'. 
Read in {num_statuses_read} " @@ -501,7 +521,7 @@ def format_status_for_csv(self) -> Dict: "cmd_parameters": [], "restart_parameters": [], "task_queue": [], - "worker_name": [], + "workers": [], } # We only care about started steps since unstarted steps won't have any status to report @@ -511,8 +531,11 @@ def format_status_for_csv(self) -> Dict: # Loop through information for each step for step_info_key, step_info_value in overall_step_info.items(): - # Format celery specific keys - if step_info_key in CELERY_KEYS: + # Skip the workers entry at the top level; this will be added in the else statement below on a task-by-task basis + if step_info_key == "workers": + continue + # Format task queue entry + if step_info_key == "task_queue": # Set the val_to_add value based on if a value exists for the key val_to_add = step_info_value if step_info_value else "-------" # Add the val_to_add entry for each row @@ -541,12 +564,18 @@ def format_status_for_csv(self) -> Dict: # Add the rest of the information for each task (status, return code, elapsed & run time, num restarts) for key, val in step_info_value.items(): - reformatted_statuses[key].append(val) + if key == "workers": + reformatted_statuses[key].append(", ".join(val)) + else: + reformatted_statuses[key].append(val) # For local runs, there will be no task queue or worker name so delete these entries for celery_specific_key in CELERY_KEYS: - if not reformatted_statuses[celery_specific_key]: - del reformatted_statuses[celery_specific_key] + try: + if not reformatted_statuses[celery_specific_key]: + del reformatted_statuses[celery_specific_key] + except KeyError: + pass return reformatted_statuses @@ -561,6 +590,10 @@ def __init__(self, args: Namespace, spec_display: bool, file_or_ws: str): args_copy = Namespace(**vars(args)) super().__init__(args, spec_display, file_or_ws) + # Need to set this environment value for the pager functionality to work + if not args.disable_pager: + os.environ["MANPAGER"] = "less -r" + # 
Check if the steps filter was given self.steps_filter_provided = "all" not in args_copy.steps @@ -674,30 +707,6 @@ def _verify_filter_args(self, suppress_warnings: Optional[bool] = False): ) LOG.debug(f"args.workers after verification: {self.args.workers}") - def _process_workers(self): - """ - Modifies the list of steps to display status for based on - the list of workers provided by the user. - """ - LOG.debug("Processing workers filter...") - # Remove duplicates - workers_provided = list(set(self.args.workers)) - - # Get a map between workers and steps - worker_step_map = self.spec.get_worker_step_map() - - # Append steps associated with each worker provided - for worker_provided in workers_provided: - # Check for invalid workers - if worker_provided not in worker_step_map: - LOG.warning(f"Worker with name {worker_provided} does not exist for this study.") - else: - for step in worker_step_map[worker_provided]: - if step not in self.args.steps: - self.args.steps.append(step) - - LOG.debug(f"Steps after workers filter: {self.args.steps}") - def _process_task_queue(self): """ Modifies the list of steps to display status for based on @@ -729,7 +738,7 @@ def get_steps_to_display(self) -> Dict[str, List[str]]: """ Generates a list of steps to display the status for based on information provided to the merlin detailed-status command by the user. This function - will handle the --steps, --task-queues, and --workers filter options. + will handle the --steps and --task-queues filter options. :returns: A dictionary of started and unstarted steps for us to display the status of """ @@ -737,19 +746,17 @@ def get_steps_to_display(self) -> Dict[str, List[str]]: LOG.debug(f"existing steps: {existing_steps}") - if ("all" in self.args.steps) and (not self.args.task_queues) and (not self.args.workers): - LOG.debug("The steps, task_queues, and workers filters weren't provided. 
Setting steps to be all existing steps.") + if ("all" in self.args.steps) and (not self.args.task_queues): + LOG.debug("The steps and task_queues filters weren't provided. Setting steps to be all existing steps.") self.args.steps = existing_steps else: - # This won't matter anymore since task_queues or workers is not None here + # This won't matter anymore since task_queues is not None here if "all" in self.args.steps: self.args.steps = [] - # Add steps to start based on task queues and/or workers provided + # Add steps to start based on task queues provided if self.args.task_queues: self._process_task_queue() - if self.args.workers: - self._process_workers() # Sort the steps to start by the order they show up in the study for i, estep in enumerate(existing_steps): @@ -780,45 +787,105 @@ def _remove_steps_without_statuses(self): self.requested_statuses = result - def apply_filters(self, filter_types: List[str], filters: List[str]): + def _search_for_filter(self, filter_to_apply: List[str], entry_to_search: Union[List[str], str]) -> bool: """ - Given a list of filters, filter the dict of requested statuses by them. + Search an entry to see if our filter(s) apply to this entry. If they do, return True. Otherwise, False. - :param `filter_types`: A list of str denoting the types of filters we're applying - :param `filters`: A list of filters to apply to the dict of statuses we read in + :param filter_to_apply: A list of filters to search for + :param entry_to_search: A list or string of entries to search for our filters in + :returns: True if a filter was found in the entry. False otherwise. 
""" - LOG.info(f"Filtering tasks using these filters: {filters}") + if not isinstance(entry_to_search, list): + entry_to_search = [entry_to_search] - # Create a deep copy of the dict so we can make changes to it while we iterate - result = deepcopy(self.requested_statuses) + filter_matches = [] + apply_list_of_regex(filter_to_apply, entry_to_search, filter_matches, display_warning=False) + if len(filter_matches) != 0: + return True + return False + + def apply_filters(self): + """ + Apply any filters given by the --workers, --return-code, and/or --task-status arguments. + This function will also apply the --max-tasks limit if it was set by a user. We apply this + limit here so it can be done in-place; if we called apply_max_tasks_limit instead, this + would become a two-pass algorithm, which can be very slow with lots of statuses. + """ + if self.args.max_tasks is not None: + # Make sure the max_tasks variable is set to a reasonable number and store that value + if self.args.max_tasks > self.num_requested_statuses: + LOG.warning( + f"'max_tasks' was set to {self.args.max_tasks} but only {self.num_requested_statuses} statuses exist. " + f"Setting 'max_tasks' to {self.num_requested_statuses}."
+ ) + self.args.max_tasks = self.num_requested_statuses + + # Establish a map between keys and filters; Only create a key/val pair here if the filter is not None + filter_key_map = { + key: value + for key, value in zip( + ["status", "return_code", "workers"], [self.args.task_status, self.args.return_code, self.args.workers] + ) + if value is not None + } + matches_found = 0 + filtered_statuses = {} for step_name, overall_step_info in self.requested_statuses.items(): + filtered_statuses[step_name] = {} + # Add the non-workspace keys to the filtered_status dict so we don't accidentally miss any of this information while filtering + for non_ws_key in NON_WORKSPACE_KEYS: + try: + filtered_statuses[step_name][non_ws_key] = overall_step_info[non_ws_key] + except KeyError: + LOG.debug( + f"Tried to add {non_ws_key} to filtered_statuses dict but it was not found in requested_statuses[{step_name}]" + ) + + # Go through the actual statuses and filter them as necessary for sub_step_workspace, task_status_info in overall_step_info.items(): # Ignore non workspace keys if sub_step_workspace in NON_WORKSPACE_KEYS: continue - # Search for our filters found_a_match = False - for filter_type in filter_types: - if task_status_info[filter_type] in filters: - found_a_match = True + + # Check all of our filters to see if this specific entry matches them all + filter_match = [False for _ in range(len(filter_key_map))] + for i, (filter_key, filter_to_apply) in enumerate(filter_key_map.items()): + filter_match[i] = self._search_for_filter(filter_to_apply, task_status_info[filter_key]) + + found_a_match = any(filter_match) + + # If a match is found, increment the number of matches found and compare against args.max_tasks limit + if found_a_match: + matches_found += 1 + filtered_statuses[step_name][sub_step_workspace] = task_status_info + # If we've hit the limit set by args.max_tasks, break out of the inner loop + if matches_found == self.args.max_tasks: break + else: + # If our filters 
aren't a match for this task then skip it + LOG.debug(f"No matching filter for '{sub_step_workspace}'.") + + # If we've hit the limit set by args.max_tasks, break out of the outer loop + if matches_found == self.args.max_tasks: + break - # If our filters aren't a match for this task then delete it - if not found_a_match: - LOG.debug(f"No matching filter for '{sub_step_workspace}'; removing it from requested_statuses.") - del result[step_name][sub_step_workspace] + LOG.debug(f"result after applying filters: {filtered_statuses}") + LOG.info(f"Found {matches_found} tasks matching your filters.") - # Get the number of tasks found with our filters - self.requested_statuses = result + # Set our requested statuses to the new filtered statuses + self.requested_statuses = filtered_statuses self._remove_steps_without_statuses() - LOG.info(f"Found {self.num_requested_statuses} tasks matching your filters.") # If no tasks were found set the status dict to empty if self.num_requested_statuses == 0: self.requested_statuses = {} + if self.args.max_tasks is not None: + LOG.info(f"Limited the number of tasks to display to {self.args.max_tasks} tasks.") + def apply_max_tasks_limit(self): """ Given a number representing the maximum amount of tasks to display, filter the dict of statuses @@ -869,50 +936,49 @@ def load_requested_statuses(self): # Grab all the statuses based on our step tracker super().load_requested_statuses() - # Apply filters to the statuses - filter_types = set() - filters = [] - if self.args.task_status: - filter_types.add("status") - filters += self.args.task_status - if self.args.return_code: - filter_types.add("return_code") - filters += [f"MERLIN_{return_code}" for return_code in self.args.return_code] - - # Apply the filters if necessary - if filters: - self.apply_filters(list(filter_types), filters) + # Determine if there are filters to apply + filters_to_apply = ( + (self.args.return_code is not None) or (self.args.task_status is not None) or
(self.args.workers is not None) + ) - # Limit the number of tasks to display if necessary - if self.args.max_tasks is not None and self.args.max_tasks > 0: + # Case where there are filters to apply + if filters_to_apply: + self.apply_filters() # This will also apply max_tasks if it's provided too + # Case where there are no filters but there is a max tasks limit set + elif self.args.max_tasks is not None: self.apply_max_tasks_limit() - def get_user_filters(self) -> List[str]: + def get_user_filters(self) -> bool: """ Get a filter on the statuses to display from the user. Possible options for filtering: - A str MAX_TASKS -> will ask the user for another input that's equivalent to the --max-tasks flag - A list of statuses -> equivalent to the --task-status flag - A list of return codes -> equivalent to the --return-code flag + - A list of workers -> equivalent to the --workers flag - An exit keyword to leave the filter prompt without filtering - :returns: A list of strings to filter by + :returns: True if we need to exit without filtering. False otherwise. 
""" + valid_workers = tuple(self.spec.get_worker_names()) + # Build the filter options filter_info = { "Filter Type": [ "Put a limit on the number of tasks to display", "Filter by status", "Filter by return code", + "Filter by workers", "Exit without filtering", ], "Description": [ "Enter 'MAX_TASKS'", f"Enter a comma separated list of the following statuses you'd like to see: {VALID_STATUS_FILTERS}", f"Enter a comma separated list of the following return codes you'd like to see: {VALID_RETURN_CODES}", + f"Enter a comma separated list of the following workers from your spec: {valid_workers}", f"Enter one of the following: {VALID_EXIT_FILTERS}", ], - "Example": ["MAX_TASKS", "FAILED, CANCELLED", "SOFT_FAIL, RETRY", "EXIT"], + "Example": ["MAX_TASKS", "FAILED, CANCELLED", "SOFT_FAIL, RETRY", "default_worker, other_worker", "EXIT"], } # Display the filter options @@ -922,29 +988,60 @@ def get_user_filters(self) -> List[str]: # Obtain and validate the filter provided by the user invalid_filter = True + exit_requested = False while invalid_filter: user_filters = input("How would you like to filter the tasks? ") + # Remove spaces and split user filters by commas user_filters = user_filters.replace(" ", "") user_filters = user_filters.split(",") + # Variables to help track our filters + status_filters = [] + return_code_filters = [] + worker_filters = [] + max_task_requested = False + # Ensure every filter is valid - for i, entry in enumerate(user_filters): + for entry in user_filters: + invalid_filter = False + orig_entry = entry entry = entry.upper() - if entry not in ALL_VALID_FILTERS: + + if entry in VALID_STATUS_FILTERS: + status_filters.append(entry) + elif entry in VALID_RETURN_CODES: + return_code_filters.append(entry) + elif orig_entry in valid_workers: + worker_filters.append(orig_entry) + elif entry == "MAX_TASKS": + max_task_requested = True + elif entry in VALID_EXIT_FILTERS: + LOG.info(f"The exit filter '{entry}' was provided. 
Exiting without filtering.") + exit_requested = True + break + else: invalid_filter = True - print(f"Invalid input: {entry}. Input must be one of the following {ALL_VALID_FILTERS}") + print(f"Invalid input: {entry}. Input must be one of the following {ALL_VALID_FILTERS + valid_workers}") break - invalid_filter = False - user_filters[i] = entry - return user_filters + if exit_requested: + return True + + # Set the filters provided by the user + self.args.task_status = status_filters if len(status_filters) > 0 else None + self.args.return_code = return_code_filters if len(return_code_filters) > 0 else None + self.args.workers = worker_filters if len(worker_filters) > 0 else None + + # Set the max_tasks value if it was requested + if max_task_requested: + self.get_user_max_tasks() - def get_user_max_tasks(self) -> int: + return False + + def get_user_max_tasks(self): """ Get a limit for the amount of tasks to display from the user. - - :returns: An int representing the max amount of tasks to display """ invalid_input = True @@ -959,7 +1056,7 @@ def get_user_max_tasks(self) -> int: print("Invalid input. The limit must be an integer greater than 0.") continue - return user_max_tasks + self.args.max_tasks = user_max_tasks def filter_via_prompts(self): """ @@ -967,54 +1064,27 @@ def filter_via_prompts(self): prevent us from overloading the terminal by displaying a bazillion tasks at once. """ # Get the filters from the user - user_filters = self.get_user_filters() - - # TODO remove this once restart/retry functionality is implemented - if "RESTART" in user_filters: - LOG.warning("The RESTART filter is coming soon. Ignoring this filter for now...") - user_filters.remove("RESTART") - if "RETRY" in user_filters: - LOG.warning("The RETRY filter is coming soon. 
Ignoring this filter for now...") - user_filters.remove("RETRY") - - # Variable to track whether the user wants to stop filtering - exit_without_filtering = False - - # Process the filters - max_tasks_found = False - filter_types = [] - for i, user_filter in enumerate(user_filters): - # Case 1: Exit command found, stop filtering - if user_filter in ("E", "EXIT"): - exit_without_filtering = True - break - # Case 2: MAX_TASKS command found, get the limit from the user - if user_filter == "MAX_TASKS": - max_tasks_found = True - # Case 3: Status filter provided, add it to the list of filter types - elif user_filter in VALID_STATUS_FILTERS and "status" not in filter_types: - filter_types.append("status") - # Case 4: Return Code filter provided, add it to the list of filter types and add the MERLIN prefix - elif user_filter in VALID_RETURN_CODES: - user_filters[i] = f"MERLIN_{user_filter}" - if "return_code" not in filter_types: - filter_types.append("return_code") - - # Remove the MAX_TASKS entry so we don't try to filter using it - try: - user_filters.remove("MAX_TASKS") - except ValueError: - pass - - # Apply the filters and tell the user how many tasks match the filters (if necessary) - if not exit_without_filtering and user_filters: - self.apply_filters(filter_types, user_filters) - - # Apply max tasks limit (if necessary) - if max_tasks_found: - user_max_tasks = self.get_user_max_tasks() - self.args.max_tasks = user_max_tasks - self.apply_max_tasks_limit() + exit_without_filtering = self.get_user_filters() + + if not exit_without_filtering: + # TODO remove this once restart/retry functionality is implemented + if self.args.return_code is not None: + if "RESTART" in self.args.return_code: + LOG.warning("The RESTART filter is coming soon. Ignoring this filter for now...") + self.args.return_code.remove("RESTART") + if "RETRY" in self.args.return_code: + LOG.warning("The RETRY filter is coming soon. 
Ignoring this filter for now...") + self.args.return_code.remove("RETRY") + + # If any status, return code, or workers filters were given, apply them + if any( + list_var is not None and len(list_var) != 0 + for list_var in [self.args.return_code, self.args.task_status, self.args.workers] + ): + self.apply_filters() # This will also apply max_tasks if it's provided too + # If just max_tasks was given, apply the limit and nothing else + elif self.args.max_tasks is not None: + self.apply_max_tasks_limit() def display(self, test_mode: Optional[bool] = False): """ @@ -1029,28 +1099,73 @@ LOG.warning("No statuses to display.") -def read_status(status_filepath: str, lock: FileLock, display_fnf_message: Optional[bool] = True) -> Dict: +def read_status( + status_filepath: str, lock_file: str, display_fnf_message: bool = True, raise_errors: bool = False, timeout: int = 10 +) -> Dict: """ Locks the status file for reading and returns its contents. - :param `status_filepath`: The path to the status file that we'll read from - :param `lock`: A FileLock object that we'll use to lock the file - :param `display_fnf_message`: If True, display the file not found warning. Otherwise don't. + :param status_filepath: The path to the status file that we'll read from. + :param lock_file: The path to the lock file that we'll use to create a FileLock. + :param display_fnf_message: If True, display the file not found warning. Otherwise don't. + :param raise_errors: A boolean indicating whether to ignore errors or raise them. + :param timeout: An integer representing how long to hold a lock for before timing out.
:returns: A dict of the contents in the status file """ + # Pylint complains that we're instantiating an abstract class but this is correct usage + lock = FileLock(lock_file) # pylint: disable=abstract-class-instantiated try: # The status files will need locks when reading to avoid race conditions - with lock.acquire(timeout=10): + with lock.acquire(timeout=timeout): with open(status_filepath, "r") as status_file: statuses_read = json.load(status_file) # Handle timeouts - except Timeout: - LOG.warning(f"Timed out when trying to read status from {status_filepath}") + except Timeout as to_exc: + LOG.warning(f"Timed out when trying to read status from '{status_filepath}'") statuses_read = {} + if raise_errors: + raise to_exc # Handle FNF errors - except FileNotFoundError: + except FileNotFoundError as fnf_exc: if display_fnf_message: - LOG.warning(f"Could not find {status_filepath}") + LOG.warning(f"Could not find '{status_filepath}'") statuses_read = {} + if raise_errors: + raise fnf_exc + # Handle JSONDecode errors (this is likely due to an empty status file) + except json.decoder.JSONDecodeError as json_exc: + LOG.warning(f"JSONDecodeError raised when trying to read status from '{status_filepath}'") + statuses_read = {} + if raise_errors: + raise json_exc + # Catch all exceptions so that we don't crash the workers + except Exception as exc: # pylint: disable=broad-except + LOG.warning( + f"An exception was raised while trying to read status from '{status_filepath}'!\n{print_exception(type(exc), exc, exc.__traceback__)}" + ) + statuses_read = {} + if raise_errors: + raise exc return statuses_read + + +def write_status(status_to_write: Dict, status_filepath: str, lock_file: str, timeout: int = 10): + """ + Locks the status file for writing. Any exception raised during the write is caught and logged + so that a failed status write doesn't crash the workers.
+ + :param status_to_write: The status to write to the status file + :param status_filepath: The path to the status file that we'll write the status to + :param lock_file: The path to the lock file we'll use for this status write + :param timeout: A timeout value for the lock so it's always released eventually + """ + # Pylint complains that we're instantiating an abstract class but this is correct usage + try: + lock = FileLock(lock_file) # pylint: disable=abstract-class-instantiated + with lock.acquire(timeout=timeout): + with open(status_filepath, "w") as status_file: + json.dump(status_to_write, status_file) + # Catch all exceptions so that we don't crash the workers + except Exception as exc: # pylint: disable=broad-except + LOG.warning( + f"An exception was raised while trying to write status to '{status_filepath}'!\n{print_exception(type(exc), exc, exc.__traceback__)}" + ) diff --git a/merlin/study/status_constants.py b/merlin/study/status_constants.py index aeb7bfbe6..84af51e53 100644 --- a/merlin/study/status_constants.py +++ b/merlin/study/status_constants.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0 +# This file is part of Merlin, Version: 1.12.1 # # For details, see https://github.com/LLNL/merlin. 
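The new `read_status` treats a missing, empty, or partially written status file as recoverable instead of letting a `JSONDecodeError` or `FileNotFoundError` crash a worker. Below is a minimal stdlib-only sketch of that defensive-read pattern; it omits the `filelock.FileLock` guard and logging the real function adds, and `safe_read_status` is a hypothetical name, not part of this diff:

```python
import json
import os
import tempfile


def safe_read_status(status_filepath: str) -> dict:
    """Read a JSON status file, treating missing/empty/corrupt files as empty statuses."""
    try:
        with open(status_filepath, "r") as status_file:
            return json.load(status_file)
    except FileNotFoundError:
        return {}
    except json.decoder.JSONDecodeError:
        # An empty or partially written file raises JSONDecodeError
        return {}


# An empty file (as left by an interrupted write) yields {} instead of crashing
with tempfile.TemporaryDirectory() as tmp:
    empty = os.path.join(tmp, "MERLIN_STATUS.json")
    open(empty, "w").close()
    assert safe_read_status(empty) == {}
    assert safe_read_status(os.path.join(tmp, "missing.json")) == {}
```

The real `read_status` additionally takes `raise_errors` so tests can opt back into strict behavior while workers stay fault-tolerant.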
# @@ -38,7 +38,9 @@ VALID_EXIT_FILTERS = ("E", "EXIT") ALL_VALID_FILTERS = VALID_STATUS_FILTERS + VALID_RETURN_CODES + VALID_EXIT_FILTERS + ("MAX_TASKS",) -CELERY_KEYS = set(["task_queue", "worker_name"]) +# worker_name is kept below because it was used in v1.12.0; running "merlin status" on a study +# that was run with v1.12.0 still requires this key for everything to function +CELERY_KEYS = set(["task_queue", "workers", "worker_name"]) RUN_TIME_STAT_KEYS = set(["avg_run_time", "run_time_std_dev"]) NON_WORKSPACE_KEYS = CELERY_KEYS.union(RUN_TIME_STAT_KEYS) NON_WORKSPACE_KEYS.add("parameters") diff --git a/merlin/study/status_renderers.py b/merlin/study/status_renderers.py index 7096334f4..b062a5d9b 100644 --- a/merlin/study/status_renderers.py +++ b/merlin/study/status_renderers.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0 +# This file is part of Merlin, Version: 1.12.1 # # For details, see https://github.com/LLNL/merlin.
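The `_search_for_filter` helper introduced in `apply_filters` treats each user-supplied filter as a regex and delegates to `apply_list_of_regex` (with warnings suppressed). A simplified stdlib sketch of the same matching logic, assuming `re.search` semantics as in `regex_list_filter`; `search_for_filter` here is a stand-alone stand-in, not the method itself:

```python
import re


def search_for_filter(filter_to_apply, entry_to_search) -> bool:
    """Return True if any filter regex matches any entry (entries may be a scalar or a list)."""
    if not isinstance(entry_to_search, list):
        entry_to_search = [entry_to_search]
    for pattern in filter_to_apply:
        regex = re.compile(pattern)
        # re.search matches anywhere in the string, mirroring regex_list_filter's default
        if any(regex.search(str(entry)) for entry in entry_to_search):
            return True
    return False


assert search_for_filter(["FAILED"], "FAILED") is True
assert search_for_filter(["sample_.*"], ["sample_worker", "other_worker"]) is True
assert search_for_filter(["FAILED"], "FINISHED") is False
```

This is why a `workers` filter can match a list-valued status entry while `status` and `return_code` filters match scalars with the same code path.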
# @@ -122,6 +122,7 @@ def create_param_table(self, parameters: Dict[str, Dict[str, str]]) -> Columns: # Loop through each parameter token/val for this param type and create a row entry for each token/val for token, param_val in param_set.items(): + param_val = str(param_val) param_subtable.add_row(token, param_val, style="row_style") # Add the sub table for this parameter type to the list that will store both sub tables @@ -135,7 +136,7 @@ def create_step_table( step_name: str, parameters: Dict[str, Dict[str, str]], task_queue: Optional[str] = None, - worker_name: Optional[str] = None, + workers: Optional[str] = None, ) -> Table: """ Create each step entry in the display @@ -143,7 +144,7 @@ def create_step_table( :param `step_name`: The name of the step that we're setting the layout for :param `parameters`: The parameters dict for this step :param `task_queue`: The name of the task queue associated with this step if one was provided - :param `worker_name`: The name of the worker that ran this step if one was provided + :param `workers`: The name of the worker(s) that ran this step if one was provided :returns: A rich Table object with info for one sub step (here a 'sub step' is referencing a step with multiple parameters; each parameter set will have it's own entry in the output) """ @@ -156,8 +157,8 @@ def create_step_table( # Top level contains step name and may contain task queue and worker name step_table.add_row("STEP:", step_name, style="Step Name") - if worker_name is not None: - step_table.add_row("WORKER NAME:", worker_name, style="Workspace") + if workers is not None: + step_table.add_row("WORKER(S):", ", ".join(workers), style="Workspace") if task_queue is not None: step_table.add_row("TASK QUEUE:", task_queue, style="Workspace") @@ -180,7 +181,7 @@ def create_task_details_table(self, task_statuses: Dict) -> Table: task_details = Table(title="Task Details") # Setup the columns - cols = ["Step Workspace", "Status", "Return Code", "Elapsed Time", "Run 
Time", "Restarts"] + cols = ["Step Workspace", "Status", "Return Code", "Elapsed Time", "Run Time", "Restarts", "Worker(s)"] for nominal_col_num, col in enumerate(cols): if col in list(self._theme_dict): col_style = col @@ -208,6 +209,8 @@ def create_task_details_table(self, task_statuses: Dict) -> Table: # If we have a failed task then let's make that stand out by bolding and styling the whole row red if status_info_val in ("FAILED", "UNKNOWN"): row_style = "row_style_failed" + elif status_info_key == "workers": + status_entry.append(", ".join(status_info_val)) else: status_entry.append(str(status_info_val)) @@ -259,12 +262,12 @@ def layout( # Build out the status table by sectioning it off at each step for step_name, overall_step_info in self._status_data.items(): task_queue = overall_step_info["task_queue"] if "task_queue" in overall_step_info else None - worker_name = overall_step_info["worker_name"] if "worker_name" in overall_step_info else None + workers = overall_step_info["workers"] if "workers" in overall_step_info else None # Set up the top section of each step entry # (this section will have step name, task queue, worker name, and parameters) step_table = self.create_step_table( - step_name, overall_step_info["parameters"], task_queue=task_queue, worker_name=worker_name + step_name, overall_step_info["parameters"], task_queue=task_queue, workers=workers ) # Set up the bottom section of each step entry diff --git a/merlin/study/step.py b/merlin/study/step.py index 510d14a9b..95f8e7622 100644 --- a/merlin/study/step.py +++ b/merlin/study/step.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# @@ -29,7 +29,6 @@ ############################################################################### """This module represents all of the logic that goes into a step""" -import json import logging import os import re @@ -38,14 +37,13 @@ from typing import Dict, Optional, Tuple from celery import current_task -from filelock import FileLock from maestrowf.abstracts.enums import State from maestrowf.datastructures.core.executiongraph import _StepRecord from maestrowf.datastructures.core.study import StudyStep from merlin.common.abstracts.enums import ReturnCode from merlin.study.script_adapter import MerlinScriptAdapter -from merlin.study.status import read_status +from merlin.study.status import read_status, write_status from merlin.utils import needs_merlin_expansion @@ -238,8 +236,7 @@ def _update_status_file( # If the status file already exists then we can just add to it if os.path.exists(status_filepath): - lock = FileLock(f"{self.workspace.value}/status.lock") # pylint: disable=E0110 - status_info = read_status(status_filepath, lock) + status_info = read_status(status_filepath, f"{self.workspace.value}/status.lock") else: # Create the parameter entries cmd_params = restart_params = None @@ -259,15 +256,6 @@ def _update_status_file( } } - # Add celery specific info - if task_server == "celery": - from merlin.celery import app # pylint: disable=C0415 - - # If the tasks are always eager, this is a local run and we won't have workers running - if not app.conf.task_always_eager: - status_info[self.name]["task_queue"] = get_current_queue() - status_info[self.name]["worker_name"] = get_current_worker() - # Put together a dict of status info status_info[self.name][self.condensed_workspace] = { "status": state_translator[self.status], @@ -277,9 +265,29 @@ def _update_status_file( "restarts": self.restarts, } + # Add celery specific info + if task_server == "celery": + from merlin.celery import app # pylint: disable=C0415 + + # If the tasks are always eager, this is a 
local run and we won't have workers running + if not app.conf.task_always_eager: + status_info[self.name]["task_queue"] = get_current_queue() + + # Add the current worker to the workspace-specific status info + current_worker = get_current_worker() + if "workers" not in status_info[self.name][self.condensed_workspace]: + status_info[self.name][self.condensed_workspace]["workers"] = [current_worker] + elif current_worker not in status_info[self.name][self.condensed_workspace]["workers"]: + status_info[self.name][self.condensed_workspace]["workers"].append(current_worker) + + # Add the current worker to the overall-step status info + if "workers" not in status_info[self.name]: + status_info[self.name]["workers"] = [current_worker] + elif current_worker not in status_info[self.name]["workers"]: + status_info[self.name]["workers"].append(current_worker) + LOG.info(f"Writing status for {self.name} to '{status_filepath}'...") - with open(status_filepath, "w") as status_file: - json.dump(status_info, status_file) + write_status(status_info, status_filepath, f"{self.workspace.value}/status.lock") LOG.info(f"Status for {self.name} successfully written.") diff --git a/merlin/study/study.py b/merlin/study/study.py index d72c32cb2..2cd2e0fc9 100644 --- a/merlin/study/study.py +++ b/merlin/study/study.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/merlin/utils.py b/merlin/utils.py index eba648669..070638c38 100644 --- a/merlin/utils.py +++ b/merlin/utils.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin.
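The worker bookkeeping in `_update_status_file` appends the current worker to both the step-level and workspace-level `workers` lists only when it isn't already present, which is what lets multiple workers process one step without corrupting the status file. The if/elif pairs can be condensed with `dict.setdefault`; `add_worker` is a hypothetical helper sketching that pattern, not part of this diff:

```python
def add_worker(status_entry: dict, current_worker: str) -> None:
    """Append current_worker to the entry's 'workers' list, creating it if needed, without duplicates."""
    workers = status_entry.setdefault("workers", [])
    if current_worker not in workers:
        workers.append(current_worker)


entry = {"status": "FINISHED", "restarts": 0}
add_worker(entry, "default_worker")
add_worker(entry, "default_worker")  # repeated task pickups don't duplicate the worker
add_worker(entry, "other_worker")
assert entry["workers"] == ["default_worker", "other_worker"]
```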
# @@ -187,7 +187,7 @@ def regex_list_filter(regex, list_to_filter, match=True): return list(filter(r.search, list_to_filter)) -def apply_list_of_regex(regex_list, list_to_filter, result_list, match=False): +def apply_list_of_regex(regex_list, list_to_filter, result_list, match=False, display_warning: bool = True): """ Take a list of regex's, apply each regex to a list we're searching through, and append each result to a result list. @@ -202,7 +202,8 @@ filter_results = set(regex_list_filter(regex, list_to_filter, match)) if not filter_results: - LOG.warning(f"No regex match for {regex}.") + if display_warning: + LOG.warning(f"No regex match for {regex}.") else: result_list += filter_results @@ -576,6 +577,11 @@ def dict_deep_merge(dict_a, dict_b, path=None): if key in dict_a: if isinstance(dict_a[key], dict) and isinstance(dict_b[key], dict): dict_deep_merge(dict_a[key], dict_b[key], path + [str(key)]) + elif key == "workers":  # specifically for status merging + all_workers = [dict_a[key], dict_b[key]] + dict_a[key] = list(set().union(*all_workers)) + elif isinstance(dict_a[key], list) and isinstance(dict_b[key], list): + dict_a[key] += dict_b[key] elif dict_a[key] == dict_b[key]: pass # same leaf value else: diff --git a/mkdocs.yml b/mkdocs.yml index dab8ab114..76c123bd3 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -15,10 +15,10 @@ nav: - User Guide: - User Guide Overview: "user_guide/index.md" - Installation: "user_guide/installation.md" - - Configuration: + - Server Configuration: - Configuration Overview: "user_guide/configuration/index.md" - External Server: "user_guide/configuration/external_server.md" - - Merlin Server: "user_guide/configuration/merlin_server.md" + - Containerized Server: "user_guide/configuration/containerized_server.md" - Command Line Interface: "user_guide/command_line.md" - Specification: "user_guide/specification.md" - Variables: "user_guide/variables.md" diff
--git a/setup.py b/setup.py index c00ebc5ea..3fe594fc8 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/celery_test_workers.py b/tests/celery_test_workers.py index 6df1e45a4..d97229664 100644 --- a/tests/celery_test_workers.py +++ b/tests/celery_test_workers.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/conftest.py b/tests/conftest.py index 7deff6419..e180ad910 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/integration/conditions.py b/tests/integration/conditions.py index caccaa94e..535e350de 100644 --- a/tests/integration/conditions.py +++ b/tests/integration/conditions.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/integration/definitions.py b/tests/integration/definitions.py index a2e645c51..28eac2d0e 100644 --- a/tests/integration/definitions.py +++ b/tests/integration/definitions.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
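The `dict_deep_merge` change in `merlin/utils.py` special-cases the `workers` key so that two condensed status dicts merge into a deduplicated worker list. A self-contained sketch of that merge behavior (the real code builds the union with `set()`, so its ordering is arbitrary; this sketch sorts for determinism, and `merge_status_dicts` is a hypothetical name):

```python
def merge_status_dicts(dict_a: dict, dict_b: dict) -> None:
    """Deep merge dict_b into dict_a in place, unioning 'workers' lists for status merging."""
    for key, b_val in dict_b.items():
        if key in dict_a:
            if isinstance(dict_a[key], dict) and isinstance(b_val, dict):
                merge_status_dicts(dict_a[key], b_val)
            elif key == "workers":  # specifically for status merging
                dict_a[key] = sorted(set(dict_a[key]) | set(b_val))
            elif isinstance(dict_a[key], list) and isinstance(b_val, list):
                dict_a[key] += b_val
            # equal leaf values are left as-is
        else:
            dict_a[key] = b_val


a = {"step": {"workers": ["other_worker"], "restarts": 0}}
b = {"step": {"workers": ["sample_worker", "other_worker"], "restarts": 0}}
merge_status_dicts(a, b)
assert a["step"]["workers"] == ["other_worker", "sample_worker"]
```

The union (rather than concatenation) is what keeps a worker from appearing twice when it writes status for the same step from multiple tasks.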
# diff --git a/tests/integration/run_tests.py b/tests/integration/run_tests.py index 4a595397d..e54e5d603 100644 --- a/tests/integration/run_tests.py +++ b/tests/integration/run_tests.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/unit/study/__init__.py b/tests/unit/study/__init__.py index c7b9c10f5..c76918410 100644 --- a/tests/unit/study/__init__.py +++ b/tests/unit/study/__init__.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/unit/study/status_test_files/combine_status_files.py b/tests/unit/study/status_test_files/combine_status_files.py index 473cb08f6..7d5413e6a 100644 --- a/tests/unit/study/status_test_files/combine_status_files.py +++ b/tests/unit/study/status_test_files/combine_status_files.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/unit/study/status_test_files/shared_tests.py b/tests/unit/study/status_test_files/shared_tests.py index 30f7d1a9e..b003b993e 100644 --- a/tests/unit/study/status_test_files/shared_tests.py +++ b/tests/unit/study/status_test_files/shared_tests.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/cancel_step/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/cancel_step/MERLIN_STATUS.json index 8b5ddd35d..4c4db3866 100644 --- a/tests/unit/study/status_test_files/status_test_study_20230717-162921/cancel_step/MERLIN_STATUS.json +++ b/tests/unit/study/status_test_files/status_test_study_20230717-162921/cancel_step/MERLIN_STATUS.json @@ -5,13 +5,14 @@ "restart": null }, "task_queue": "cancel_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "cancel_step": { "status": "CANCELLED", "return_code": "MERLIN_STOP_WORKERS", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", - "restarts": 0 + "restarts": 0, + "workers": ["other_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/fail_step/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/fail_step/MERLIN_STATUS.json index 6e076a26e..9f49821cb 100644 --- a/tests/unit/study/status_test_files/status_test_study_20230717-162921/fail_step/MERLIN_STATUS.json +++ b/tests/unit/study/status_test_files/status_test_study_20230717-162921/fail_step/MERLIN_STATUS.json @@ -5,13 +5,14 @@ "restart": null }, "task_queue": "fail_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "fail_step": { "status": "FAILED", "return_code": "MERLIN_SOFT_FAIL", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", - "restarts": 0 + "restarts": 0, + "workers": ["other_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hello.LEAVE.goodbye/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hello.LEAVE.goodbye/MERLIN_STATUS.json index 406c090b7..2801ba31f 100644 --- 
a/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hello.LEAVE.goodbye/MERLIN_STATUS.json +++ b/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hello.LEAVE.goodbye/MERLIN_STATUS.json @@ -9,13 +9,14 @@ } }, "task_queue": "just_parameters_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "just_parameters/GREET.hello.LEAVE.goodbye": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:30s", - "restarts": 0 + "restarts": 0, + "workers": ["other_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hola.LEAVE.adios/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hola.LEAVE.adios/MERLIN_STATUS.json index bf783d98f..e677de353 100644 --- a/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hola.LEAVE.adios/MERLIN_STATUS.json +++ b/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_parameters/GREET.hola.LEAVE.adios/MERLIN_STATUS.json @@ -9,13 +9,14 @@ } }, "task_queue": "just_parameters_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "just_parameters/GREET.hola.LEAVE.adios": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:01m:00s", "run_time": "0d:00h:01m:00s", - "restarts": 0 + "restarts": 0, + "workers": ["other_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_samples/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_samples/MERLIN_STATUS.json index d7df3d153..1e6daeffb 100644 --- a/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_samples/MERLIN_STATUS.json +++ 
b/tests/unit/study/status_test_files/status_test_study_20230717-162921/just_samples/MERLIN_STATUS.json @@ -5,41 +5,46 @@ "restart": null }, "task_queue": "just_samples_queue", - "worker_name": "sample_worker", + "workers": ["sample_worker"], "just_samples/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:00s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "just_samples/01": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:15s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "just_samples/02": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:30s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "just_samples/03": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:45s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "just_samples/04": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:02m:00s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hello/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hello/MERLIN_STATUS.json index 364248c43..140c019bd 100644 --- a/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hello/MERLIN_STATUS.json +++ b/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hello/MERLIN_STATUS.json @@ -7,41 +7,46 @@ "restart": null }, "task_queue": "both_queue", - "worker_name": "sample_worker", + "workers": ["sample_worker"], 
"params_and_samples/GREET.hello/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:10s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hello/01": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:11s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hello/02": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:12s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hello/03": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:13s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hello/04": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:14s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hola/MERLIN_STATUS.json b/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hola/MERLIN_STATUS.json index b256e65f5..ccf17967f 100644 --- a/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hola/MERLIN_STATUS.json +++ b/tests/unit/study/status_test_files/status_test_study_20230717-162921/params_and_samples/GREET.hola/MERLIN_STATUS.json @@ -7,41 +7,46 @@ "restart": null }, "task_queue": "both_queue", - "worker_name": "sample_worker", + "workers": ["sample_worker"], "params_and_samples/GREET.hola/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:10s", - "restarts": 0 + 
"restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hola/01": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:18s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hola/02": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:23s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hola/03": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:29s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] }, "params_and_samples/GREET.hola/04": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:16s", - "restarts": 0 + "restarts": 0, + "workers": ["sample_worker"] } } } \ No newline at end of file diff --git a/tests/unit/study/status_test_files/status_test_variables.py b/tests/unit/study/status_test_files/status_test_variables.py index 8a84d1808..c941ad0c5 100644 --- a/tests/unit/study/status_test_files/status_test_variables.py +++ b/tests/unit/study/status_test_files/status_test_variables.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. 
# @@ -57,18 +57,75 @@ } NUM_ALL_REQUESTED_STATUSES = sum(TASKS_PER_STEP.values()) - TASKS_PER_STEP["unstarted_step"] +# This is the requested statuses with just the steps that were processed by 'other_worker' +REQUESTED_STATUSES_JUST_OTHER_WORKER = { + "just_parameters_GREET.hello.LEAVE.goodbye": { + "parameters": {"cmd": {"GREET": "hello"}, "restart": {"LEAVE": "goodbye"}}, + "task_queue": "just_parameters_queue", + "workers": ["other_worker"], + "just_parameters/GREET.hello.LEAVE.goodbye": { + "status": "FINISHED", + "return_code": "MERLIN_SUCCESS", + "elapsed_time": "0d:00h:02m:00s", + "run_time": "0d:00h:01m:30s", + "restarts": 0, + "workers": ["other_worker"], + }, + }, + "just_parameters_GREET.hola.LEAVE.adios": { + "parameters": {"cmd": {"GREET": "hola"}, "restart": {"LEAVE": "adios"}}, + "task_queue": "just_parameters_queue", + "workers": ["other_worker"], + "just_parameters/GREET.hola.LEAVE.adios": { + "status": "FINISHED", + "return_code": "MERLIN_SUCCESS", + "elapsed_time": "0d:00h:01m:00s", + "run_time": "0d:00h:01m:00s", + "restarts": 0, + "workers": ["other_worker"], + }, + }, + "fail_step": { + "parameters": {"cmd": None, "restart": None}, + "task_queue": "fail_queue", + "workers": ["other_worker"], + "fail_step": { + "status": "FAILED", + "return_code": "MERLIN_SOFT_FAIL", + "elapsed_time": "0d:00h:00m:00s", + "run_time": "0d:00h:00m:00s", + "restarts": 0, + "workers": ["other_worker"], + }, + }, + "cancel_step": { + "parameters": {"cmd": None, "restart": None}, + "task_queue": "cancel_queue", + "workers": ["other_worker"], + "cancel_step": { + "status": "CANCELLED", + "return_code": "MERLIN_STOP_WORKERS", + "elapsed_time": "0d:00h:00m:00s", + "run_time": "0d:00h:00m:00s", + "restarts": 0, + "workers": ["other_worker"], + }, + }, +} + # This is the requested statuses with just the failed step REQUESTED_STATUSES_JUST_FAILED_STEP = { "fail_step": { "parameters": {"cmd": None, "restart": None}, "task_queue": "fail_queue", - "worker_name": "other_worker", +
"workers": ["other_worker"], "fail_step": { "status": "FAILED", "return_code": "MERLIN_SOFT_FAIL", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", "restarts": 0, + "workers": ["other_worker"], }, } } @@ -78,13 +135,14 @@ "cancel_step": { "parameters": {"cmd": None, "restart": None}, "task_queue": "cancel_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "cancel_step": { "status": "CANCELLED", "return_code": "MERLIN_STOP_WORKERS", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", "restarts": 0, + "workers": ["other_worker"], }, } } @@ -94,25 +152,27 @@ "fail_step": { "parameters": {"cmd": None, "restart": None}, "task_queue": "fail_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "fail_step": { "status": "FAILED", "return_code": "MERLIN_SOFT_FAIL", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", "restarts": 0, + "workers": ["other_worker"], }, }, "cancel_step": { "parameters": {"cmd": None, "restart": None}, "task_queue": "cancel_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "cancel_step": { "status": "CANCELLED", "return_code": "MERLIN_STOP_WORKERS", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", "restarts": 0, + "workers": ["other_worker"], }, }, } @@ -128,7 +188,7 @@ "cmd_parameters": ["-------", "-------"], "restart_parameters": ["-------", "-------"], "task_queue": ["fail_queue", "cancel_queue"], - "worker_name": ["other_worker", "other_worker"], + "workers": ["other_worker", "other_worker"], } # This variable holds the state_info dict of every step from VALID_WORKSPACE @@ -141,9 +201,9 @@ "UNKNOWN": {"count": 0, "color": "\033[38;2;102;102;102m", "fill": "?"}, "INITIALIZED": {"count": 0, "color": "\033[38;2;86;180;233m"}, "RUNNING": {"count": 0, "color": "\033[38;2;0;114;178m"}, - "DRY RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, + "DRY_RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, 
"TOTAL TASKS": {"total": 5}, - "WORKER NAME": {"name": "sample_worker"}, + "WORKER(S)": {"name": "sample_worker"}, "TASK QUEUE": {"name": "just_samples_queue"}, "AVG RUN TIME": "01m:30s", "RUN TIME STD DEV": "±21s", @@ -155,9 +215,9 @@ "UNKNOWN": {"count": 0, "color": "\033[38;2;102;102;102m", "fill": "?"}, "INITIALIZED": {"count": 0, "color": "\033[38;2;86;180;233m"}, "RUNNING": {"count": 0, "color": "\033[38;2;0;114;178m"}, - "DRY RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, + "DRY_RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, "TOTAL TASKS": {"total": 2}, - "WORKER NAME": {"name": "other_worker"}, + "WORKER(S)": {"name": "other_worker"}, "TASK QUEUE": {"name": "just_parameters_queue"}, "AVG RUN TIME": "01m:15s", "RUN TIME STD DEV": "±15s", @@ -169,9 +229,9 @@ "UNKNOWN": {"count": 0, "color": "\033[38;2;102;102;102m", "fill": "?"}, "INITIALIZED": {"count": 0, "color": "\033[38;2;86;180;233m"}, "RUNNING": {"count": 0, "color": "\033[38;2;0;114;178m"}, - "DRY RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, + "DRY_RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, "TOTAL TASKS": {"total": 10}, - "WORKER NAME": {"name": "sample_worker"}, + "WORKER(S)": {"name": "sample_worker"}, "TASK QUEUE": {"name": "both_queue"}, "AVG RUN TIME": "16s", "RUN TIME STD DEV": "±06s", @@ -183,9 +243,9 @@ "UNKNOWN": {"count": 0, "color": "\033[38;2;102;102;102m", "fill": "?"}, "INITIALIZED": {"count": 0, "color": "\033[38;2;86;180;233m"}, "RUNNING": {"count": 0, "color": "\033[38;2;0;114;178m"}, - "DRY RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, + "DRY_RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, "TOTAL TASKS": {"total": 1}, - "WORKER NAME": {"name": "other_worker"}, + "WORKER(S)": {"name": "other_worker"}, "TASK QUEUE": {"name": "fail_queue"}, "AVG RUN TIME": "00s", "RUN TIME STD DEV": "±00s", @@ -197,9 +257,9 @@ "UNKNOWN": {"count": 0, "color": 
"\033[38;2;102;102;102m", "fill": "?"}, "INITIALIZED": {"count": 0, "color": "\033[38;2;86;180;233m"}, "RUNNING": {"count": 0, "color": "\033[38;2;0;114;178m"}, - "DRY RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, + "DRY_RUN": {"count": 0, "color": "\033[38;2;230;159;0m", "fill": "\\"}, "TOTAL TASKS": {"total": 1}, - "WORKER NAME": {"name": "other_worker"}, + "WORKER(S)": {"name": "other_worker"}, "TASK QUEUE": {"name": "cancel_queue"}, "AVG RUN TIME": "00s", "RUN TIME STD DEV": "±00s", @@ -236,37 +296,40 @@ "just_parameters_GREET.hello.LEAVE.goodbye": { "parameters": {"cmd": {"GREET": "hello"}, "restart": {"LEAVE": "goodbye"}}, "task_queue": "just_parameters_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "just_parameters/GREET.hello.LEAVE.goodbye": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:30s", "restarts": 0, + "workers": ["other_worker"], }, }, "just_parameters_GREET.hola.LEAVE.adios": { "parameters": {"cmd": {"GREET": "hola"}, "restart": {"LEAVE": "adios"}}, "task_queue": "just_parameters_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "just_parameters/GREET.hola.LEAVE.adios": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:01m:00s", "run_time": "0d:00h:01m:00s", "restarts": 0, + "workers": ["other_worker"], }, }, "just_samples": { "parameters": {"cmd": None, "restart": None}, "task_queue": "just_samples_queue", - "worker_name": "sample_worker", + "workers": ["sample_worker"], "just_samples/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:00s", "restarts": 0, + "workers": ["sample_worker"], }, "just_samples/01": { "status": "FINISHED", @@ -274,6 +337,7 @@ "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:15s", "restarts": 0, + "workers": ["sample_worker"], }, "just_samples/02": { "status": "FINISHED", @@ -281,6 
+345,7 @@ "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:30s", "restarts": 0, + "workers": ["sample_worker"], }, "just_samples/03": { "status": "FINISHED", @@ -288,6 +353,7 @@ "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:01m:45s", "restarts": 0, + "workers": ["sample_worker"], }, "just_samples/04": { "status": "FINISHED", @@ -295,18 +361,20 @@ "elapsed_time": "0d:00h:02m:00s", "run_time": "0d:00h:02m:00s", "restarts": 0, + "workers": ["sample_worker"], }, }, "params_and_samples_GREET.hello": { "parameters": {"cmd": {"GREET": "hello"}, "restart": None}, "task_queue": "both_queue", - "worker_name": "sample_worker", + "workers": ["sample_worker"], "params_and_samples/GREET.hello/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:10s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hello/01": { "status": "FINISHED", @@ -314,6 +382,7 @@ "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:11s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hello/02": { "status": "FINISHED", @@ -321,6 +390,7 @@ "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:12s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hello/03": { "status": "FINISHED", @@ -328,6 +398,7 @@ "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:13s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hello/04": { "status": "FINISHED", @@ -335,18 +406,20 @@ "elapsed_time": "0d:00h:00m:15s", "run_time": "0d:00h:00m:14s", "restarts": 0, + "workers": ["sample_worker"], }, }, "params_and_samples_GREET.hola": { "parameters": {"cmd": {"GREET": "hola"}, "restart": None}, "task_queue": "both_queue", - "worker_name": "sample_worker", + "workers": ["sample_worker"], "params_and_samples/GREET.hola/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", "elapsed_time": "0d:00h:00m:30s", "run_time": 
"0d:00h:00m:10s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hola/01": { "status": "FINISHED", @@ -354,6 +427,7 @@ "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:18s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hola/02": { "status": "FINISHED", @@ -361,6 +435,7 @@ "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:23s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hola/03": { "status": "FINISHED", @@ -368,6 +443,7 @@ "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:29s", "restarts": 0, + "workers": ["sample_worker"], }, "params_and_samples/GREET.hola/04": { "status": "FINISHED", @@ -375,30 +451,33 @@ "elapsed_time": "0d:00h:00m:30s", "run_time": "0d:00h:00m:16s", "restarts": 0, + "workers": ["sample_worker"], }, }, "fail_step": { "parameters": {"cmd": None, "restart": None}, "task_queue": "fail_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "fail_step": { "status": "FAILED", "return_code": "MERLIN_SOFT_FAIL", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", "restarts": 0, + "workers": ["other_worker"], }, }, "cancel_step": { "parameters": {"cmd": None, "restart": None}, "task_queue": "cancel_queue", - "worker_name": "other_worker", + "workers": ["other_worker"], "cancel_step": { "status": "CANCELLED", "return_code": "MERLIN_STOP_WORKERS", "elapsed_time": "0d:00h:00m:00s", "run_time": "0d:00h:00m:00s", "restarts": 0, + "workers": ["other_worker"], }, }, } @@ -596,7 +675,7 @@ "fail_queue", "cancel_queue", ], - "worker_name": [ + "workers": [ "other_worker", "other_worker", "sample_worker", diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py index 559252767..d94f16047 100644 --- a/tests/unit/study/test_celeryadapter.py +++ b/tests/unit/study/test_celeryadapter.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. 
-# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # diff --git a/tests/unit/study/test_detailed_status.py b/tests/unit/study/test_detailed_status.py index e03d937c3..8c7f0f600 100644 --- a/tests/unit/study/test_detailed_status.py +++ b/tests/unit/study/test_detailed_status.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # @@ -294,35 +294,43 @@ def test_prompt_for_study_with_invalid_input(self): # Testing get_user_filters() ############################################### - def run_get_user_filters_test(self, inputs_to_test: List[str], expected_outputs: List[List[str]]): + def run_mock_input_with_filters(self, input_to_test: str, expected_return: bool, max_tasks: str = None): """ - This will pass every input in `inputs_to_test` to the get_user_filters - method. All inputs in `inputs_to_test` should be valid inputs to the - prompt displayed in the get_user_filters method. After passing inputs in, - we will capture the result of running that method and compare it against - the expected outputs from `expected_outputs`. + This will pass in `input_to_test` (and `max_tasks` if set) as input to the prompt + that's displayed by the `get_user_filters` method. This function will then compare + the expected return vs the actual return value. + + Not explicitly shown in this function is the side effect that `get_user_filters` + will modify one or more of `self.args.task_status`, `self.args.return_code`, + `self.args.workers`, and/or `self.args.max_tasks`. The values set for these will + be compared in the test method that calls this method. 
- :param `inputs_to_test`: A list of valid inputs to give to the prompt displayed in get_user_filters - :param `expected_outputs`: A list of expected outputs corresponding to the inputs provided in - `inputs_to_test`. Each expected output should be a list + :param input_to_test: A string to pass into the input prompt raised by `get_user_filters` + :param expected_return: The expected return value of the `get_user_filters` call + :param max_tasks: A string (really an int) to pass into the second input prompt raised + by `get_user_filters` when `MAX_TASKS` is requested """ - # Ensure the number of inputs matches the number of outputs - if len(inputs_to_test) != len(expected_outputs): - raise ValueError("The run_get_user_filters_test method requires that both arguments are the same length.") + # Add max_tasks entry to our side effect if necessary (this is what's passed as input) + side_effect = [input_to_test] + if max_tasks is not None: + side_effect.append(max_tasks) # Redirect the input prompt to be stored in mock_input and not displayed in stdout - with patch("builtins.input", side_effect=inputs_to_test) as mock_input: - for expected_output in expected_outputs: - # We use patch here to keep stdout from get_user_filters from being displayed - with patch("sys.stdout"): - # Run the method we're testing and capture the result - result = self.detailed_status_obj.get_user_filters() + with patch("builtins.input", side_effect=side_effect) as mock_input: + # We use patch here to keep stdout from get_user_filters from being displayed + with patch("sys.stdout"): + # Run the method we're testing and capture the result + result = self.detailed_status_obj.get_user_filters() + + calls = [call("How would you like to filter the tasks? ")] + if max_tasks is not None: + calls.append(call("What limit would you like to set? 
(must be an integer greater than 0) ")) - # Make sure the prompt is called with the initial prompt message - mock_input.assert_called_with("How would you like to filter the tasks? ") + # Make sure the prompts were called with the correct messages (the initial + # filter prompt and, when max_tasks is requested, the limit prompt) + mock_input.assert_has_calls(calls) - # Ensure the result matches the expected output - self.assertEqual(result, expected_output) + self.assertEqual(result, expected_return) def run_invalid_get_user_filters_test(self, inputs_to_test: List[str]): """ @@ -355,13 +363,28 @@ def run_invalid_get_user_filters_test(self, inputs_to_test: List[str]): # to account for that when we check how many invalid msgs we got in our output self.assertEqual(len(all_invalid_msgs), len(inputs_to_test) - 1) + def reset_filters(self): + """ + Reset the filters so they can be set again from a clean starting state. + """ + self.detailed_status_obj.args.task_status = None + self.detailed_status_obj.args.return_code = None + self.detailed_status_obj.args.workers = None + self.detailed_status_obj.args.max_tasks = None + def test_get_user_filters_exit(self): """ This will test the exit input to the get_user_filters method.
""" inputs_to_test = ["E", "EXIT", "E, EXIT"] - expected_outputs = [["E"], ["EXIT"], ["E", "EXIT"]] - self.run_get_user_filters_test(inputs_to_test, expected_outputs) + for input_to_test in inputs_to_test: + self.run_mock_input_with_filters(input_to_test, True) # The return should be true so we know to exit + self.assertEqual(self.detailed_status_obj.args.task_status, None) + self.assertEqual(self.detailed_status_obj.args.return_code, None) + self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, None) + + self.reset_filters() def test_get_user_filters_task_status(self): """ @@ -369,7 +392,14 @@ def test_get_user_filters_task_status(self): """ inputs_to_test = ["FAILED", "CANCELLED", "FAILED, CANCELLED"] expected_outputs = [["FAILED"], ["CANCELLED"], ["FAILED", "CANCELLED"]] - self.run_get_user_filters_test(inputs_to_test, expected_outputs) + for input_to_test, expected_output in zip(inputs_to_test, expected_outputs): + self.run_mock_input_with_filters(input_to_test, False) + self.assertEqual(self.detailed_status_obj.args.task_status, expected_output) + self.assertEqual(self.detailed_status_obj.args.return_code, None) + self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, None) + + self.reset_filters() def test_get_user_filters_return_codes(self): """ @@ -377,37 +407,116 @@ def test_get_user_filters_return_codes(self): """ inputs_to_test = ["SOFT_FAIL", "STOP_WORKERS", "SOFT_FAIL, STOP_WORKERS"] expected_outputs = [["SOFT_FAIL"], ["STOP_WORKERS"], ["SOFT_FAIL", "STOP_WORKERS"]] - self.run_get_user_filters_test(inputs_to_test, expected_outputs) + for input_to_test, expected_output in zip(inputs_to_test, expected_outputs): + self.run_mock_input_with_filters(input_to_test, False) + self.assertEqual(self.detailed_status_obj.args.task_status, None) + self.assertEqual(self.detailed_status_obj.args.return_code, expected_output) + 
self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, None) + + self.reset_filters() def test_get_user_filters_max_tasks(self): """ This will test the max tasks input to the get_user_filters method. """ inputs_to_test = ["MAX_TASKS"] - expected_outputs = [["MAX_TASKS"]] - self.run_get_user_filters_test(inputs_to_test, expected_outputs) + max_tasks = 23 + for input_to_test in inputs_to_test: + self.run_mock_input_with_filters(input_to_test, False, max_tasks=str(max_tasks)) + self.assertEqual(self.detailed_status_obj.args.task_status, None) + self.assertEqual(self.detailed_status_obj.args.return_code, None) + self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, max_tasks) + + self.reset_filters() + + def test_get_user_filters_status_and_return_code(self): + """ + This will test a combination of the task status and return code filters as inputs + to the get_user_filters method. The only args that should be set here are the task_status + and return_code args. + """ + filter1 = "CANCELLED" + filter2 = "SOFT_FAIL" + self.run_mock_input_with_filters(", ".join([filter1, filter2]), False) + self.assertEqual(self.detailed_status_obj.args.task_status, [filter1]) + self.assertEqual(self.detailed_status_obj.args.return_code, [filter2]) + self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, None) - def test_get_user_filters_combination(self): + def test_get_user_filters_status_and_workers(self): """ - This will test a combination of filters as inputs to the get_user_filters method. + This will test a combination of the task status and workers filters as inputs + to the get_user_filters method. The only args that should be set here are the task_status + and workers args. 
""" - inputs_to_test = [ - "CANCELLED, SOFT_FAIL", # testing return code and task status being used together - "STOP_WORKERS, MAX_TASKS", # testing return code and max tasks being used together - "STOP_WORKERS, EXIT", # testing return code and exit being used together - "FAILED, MAX_TASKS", # testing task status and max tasks being used together - "CANCELLED, EXIT", # testing task status and exit being used together - "MAX_TASKS, EXIT", # testing max tasks and exit being used together - ] - expected_outputs = [ - ["CANCELLED", "SOFT_FAIL"], - ["STOP_WORKERS", "MAX_TASKS"], - ["STOP_WORKERS", "EXIT"], - ["FAILED", "MAX_TASKS"], - ["CANCELLED", "EXIT"], - ["MAX_TASKS", "EXIT"], - ] - self.run_get_user_filters_test(inputs_to_test, expected_outputs) + filter1 = "CANCELLED" + filter2 = "sample_worker" + self.run_mock_input_with_filters(", ".join([filter1, filter2]), False) + self.assertEqual(self.detailed_status_obj.args.task_status, [filter1]) + self.assertEqual(self.detailed_status_obj.args.return_code, None) + self.assertEqual(self.detailed_status_obj.args.workers, [filter2]) + self.assertEqual(self.detailed_status_obj.args.max_tasks, None) + + def test_get_user_filters_status_and_max_tasks(self): + """ + This will test a combination of the task status and max tasks filters as inputs + to the get_user_filters method. The only args that should be set here are the task_status + and max_tasks args. 
+ """ + filter1 = "FINISHED" + filter2 = "MAX_TASKS" + max_tasks = 4 + self.run_mock_input_with_filters(", ".join([filter1, filter2]), False, max_tasks=str(max_tasks)) + self.assertEqual(self.detailed_status_obj.args.task_status, [filter1]) + self.assertEqual(self.detailed_status_obj.args.return_code, None) + self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, max_tasks) + + def test_get_user_filters_return_code_and_workers(self): + """ + This will test a combination of the return code and workers filters as inputs + to the get_user_filters method. The only args that should be set here are the return_code + and workers args. + """ + filter1 = "STOP_WORKERS" + filter2 = "sample_worker" + self.run_mock_input_with_filters(", ".join([filter1, filter2]), False) + self.assertEqual(self.detailed_status_obj.args.task_status, None) + self.assertEqual(self.detailed_status_obj.args.return_code, [filter1]) + self.assertEqual(self.detailed_status_obj.args.workers, [filter2]) + self.assertEqual(self.detailed_status_obj.args.max_tasks, None) + + def test_get_user_filters_return_code_and_max_tasks(self): + """ + This will test a combination of the return code and max tasks filters as inputs + to the get_user_filters method. The only args that should be set here are the return_code + and max_tasks args. + """ + filter1 = "RETRY" + filter2 = "MAX_TASKS" + max_tasks = 4 + self.run_mock_input_with_filters(", ".join([filter1, filter2]), False, max_tasks=str(max_tasks)) + self.assertEqual(self.detailed_status_obj.args.task_status, None) + self.assertEqual(self.detailed_status_obj.args.return_code, [filter1]) + self.assertEqual(self.detailed_status_obj.args.workers, None) + self.assertEqual(self.detailed_status_obj.args.max_tasks, max_tasks) + + def test_get_user_filters_workers_and_max_tasks(self): + """ + This will test a combination of the workers and max tasks filters as inputs + to the get_user_filters method. 
The only args that should be set here are the workers + and max_tasks args. + """ + filter1 = "sample_worker" + filter2 = "MAX_TASKS" + max_tasks = 4 + self.run_mock_input_with_filters(", ".join([filter1, filter2]), False, max_tasks=str(max_tasks)) + self.assertEqual(self.detailed_status_obj.args.task_status, None) + self.assertEqual(self.detailed_status_obj.args.return_code, None) + self.assertEqual(self.detailed_status_obj.args.workers, [filter1]) + self.assertEqual(self.detailed_status_obj.args.max_tasks, max_tasks) def test_get_user_filters_only_invalid_inputs(self): """ @@ -457,12 +566,12 @@ def test_get_user_max_tasks_valid_inputs(self, mock_input: MagicMock): # We use patch here to keep stdout from get_user_tasks from being displayed with patch("sys.stdout"): # Run the method we're testing and save the result - result = self.detailed_status_obj.get_user_max_tasks() + self.detailed_status_obj.get_user_max_tasks() # Make sure the prompt is called with the correct prompt message mock_input.assert_called_with("What limit would you like to set? (must be an integer greater than 0) ") # Ensure we get correct output - self.assertEqual(result, expected_output) + self.assertEqual(self.detailed_status_obj.args.max_tasks, expected_output) # '1' is a valid input and we'll use that to exit safely from this test @patch("builtins.input", side_effect=["0", "-1", "1.5", "a", "1"]) @@ -706,9 +815,9 @@ class TestFilterApplication(TestBaseDetailedStatus): inputs (that's what the TestFilterVerification class is for). This class will test 3 methods: get_steps_to_display (this applies the - steps, task_queues, and workers filters), apply_filters (this applies the - return_code and task_status filters), and apply_max_tasks_limit (this - applies the max_tasks filter). + steps and task_queues filters), apply_filters (this applies the return_code, + task_status, workers, and max_tasks filters), and apply_max_tasks_limit (this + applies just the max_tasks filter). 
""" def test_apply_default_steps(self): @@ -790,33 +899,6 @@ def test_apply_multiple_task_queues(self): # Run the test self.run_get_steps_to_display_test(expected_step_tracker) - def test_apply_single_worker(self): - """ - This tests the application of the workers filter with only one worker. - """ - # Modify the workers argument and create the expected output - self.detailed_status_obj.args.workers = ["sample_worker"] - expected_step_tracker = {"started_steps": ["just_samples", "params_and_samples"], "unstarted_steps": []} - - # We need to reset steps to "all" otherwise this test won't work - self.detailed_status_obj.args.steps = ["all"] - - # Run the test - self.run_get_steps_to_display_test(expected_step_tracker) - - def test_apply_multiple_workers(self): - """ - This tests the application of the workers filter with multiple worker. - """ - # Modify the workers argument and create the expected output - self.detailed_status_obj.args.workers = ["sample_worker", "other_worker"] - - # We need to reset steps to "all" otherwise this test won't work - self.detailed_status_obj.args.steps = ["all"] - - # Run the test - self.run_get_steps_to_display_test(status_test_variables.FULL_STEP_TRACKER) - def test_apply_max_tasks(self): """ The max_tasks filter has no default to test against as the default value is None @@ -846,6 +928,22 @@ def run_apply_filters_test(self, expected_requested_statuses: Dict): ) self.assertEqual(requested_statuses_diff, {}) + def test_apply_single_worker(self): + """ + This tests the application of the workers filter with only one worker. + """ + # Set the workers filter and run the test + self.detailed_status_obj.args.workers = ["other_worker"] + self.run_apply_filters_test(status_test_variables.REQUESTED_STATUSES_JUST_OTHER_WORKER) + + def test_apply_multiple_workers(self): + """ + This tests the application of the workers filter with multiple worker. 
+ """ + # Set the workers filter and run the test + self.detailed_status_obj.args.workers = ["other_worker", "sample_worker"] + self.run_apply_filters_test(status_test_variables.ALL_REQUESTED_STATUSES) + def test_apply_single_return_code(self): """ This tests the application of the return_code filter with only one return codes. diff --git a/tests/unit/study/test_status.py b/tests/unit/study/test_status.py index 331884ac5..695af17f3 100644 --- a/tests/unit/study/test_status.py +++ b/tests/unit/study/test_status.py @@ -6,7 +6,7 @@ # # LLNL-CODE-797170 # All rights reserved. -# This file is part of Merlin, Version: 1.12.0. +# This file is part of Merlin, Version: 1.12.1. # # For details, see https://github.com/LLNL/merlin. # @@ -277,7 +277,7 @@ def test_get_runtime_avg_std_dev(self): dummy_step_status = { "dummy_step_PARAM.1": { "task_queue": "dummy_queue", - "worker_name": "dummy_worker", + "workers": "dummy_worker", "dummy_step/PARAM.1/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS", @@ -295,7 +295,7 @@ def test_get_runtime_avg_std_dev(self): }, "dummy_step_PARAM.2": { "task_queue": "dummy_queue", - "worker_name": "dummy_worker", + "workers": "dummy_worker", "dummy_step/PARAM.2/00": { "status": "FINISHED", "return_code": "MERLIN_SUCCESS",