From 7299fef9a0d5fc82ec8ef113fa0885c78b4f3e31 Mon Sep 17 00:00:00 2001 From: Michael Kavulich Date: Tue, 7 Feb 2023 09:33:08 -0700 Subject: [PATCH] [develop] First implementation of run_WE2E_tests.py (#558) This PR introduces two new scripts to the repository: run_WE2E_tests.py and monitor_jobs.py. The purpose of these scripts is to eventually provide a pythonic replacement for the current workflow end-to-end test submission script. Additionally, the monitor_jobs function gives the capability to monitor and submit jobs automatically via the command line or a batch job, rather than relying on crontab entries. --- tests/WE2E/monitor_jobs.py | 274 ++++++++++ tests/WE2E/monitor_jobs.yaml | 54 ++ tests/WE2E/run_WE2E_tests.py | 474 ++++++++++++++++++ .../config.deactivate_tasks.yaml | 7 - ush/generate_FV3LAM_wflow.py | 75 +-- ush/setup.py | 16 +- 6 files changed, 859 insertions(+), 41 deletions(-) create mode 100755 tests/WE2E/monitor_jobs.py create mode 100644 tests/WE2E/monitor_jobs.yaml create mode 100755 tests/WE2E/run_WE2E_tests.py diff --git a/tests/WE2E/monitor_jobs.py b/tests/WE2E/monitor_jobs.py new file mode 100755 index 0000000000..9e34a87264 --- /dev/null +++ b/tests/WE2E/monitor_jobs.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3 + +import sys +import argparse +import logging +import subprocess +import sqlite3 +import time +from textwrap import dedent +from datetime import datetime +from contextlib import closing + +sys.path.append("../../ush") + +from python_utils import ( + load_config_file, + cfg_to_yaml_str +) + +from check_python_version import check_python_version + + +def monitor_jobs(expt_dict: dict, monitor_file: str = '', debug: bool = False) -> str: + """Function to monitor and run jobs for the specified experiment using Rocoto + + Args: + expt_dict (dict): A dictionary containing the information needed to run + one or more experiments. 
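+                            Each entry is keyed by its experiment name and holds at least an
+                            'expt_dir' and a 'status' key, plus one key per task once jobs
+                            have been submitted.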
See example file monitor_jobs.yaml + monitor_file (str): [optional] + debug (bool): [optional] Enable extra output for debugging + Returns: + str: The name of the file used for job monitoring (when script is finished, this + contains results/summary) + + """ + + starttime = datetime.now() + # Write monitor_file, which will contain information on each monitored experiment + if not monitor_file: + monitor_file = f'monitor_jobs_{starttime.strftime("%Y%m%d%H%M%S")}.yaml' + logging.info(f"Writing information for all experiments to {monitor_file}") + + write_monitor_file(monitor_file,expt_dict) + + # Perform initial setup for each experiment + logging.info("Checking tests available for monitoring...") + for expt in expt_dict: + logging.info(f"Starting experiment {expt} running") + expt_dict[expt] = update_expt_status(expt_dict[expt], expt) + + write_monitor_file(monitor_file,expt_dict) + + logging.info(f'Setup complete; monitoring {len(expt_dict)} experiments') + + #Make a copy of experiment dictionary; will use this copy to monitor active experiments + running_expts = expt_dict.copy() + + i = 0 + while running_expts: + i += 1 + for expt in running_expts.copy(): + expt_dict[expt] = update_expt_status(expt_dict[expt], expt) + running_expts[expt] = expt_dict[expt] + if running_expts[expt]["status"] in ['DEAD','ERROR','COMPLETE']: + logging.info(f'Experiment {expt} is {running_expts[expt]["status"]}; will no longer monitor.') + running_expts.pop(expt) + continue + logging.debug(f'Experiment {expt} status is {expt_dict[expt]["status"]}') + + + write_monitor_file(monitor_file,expt_dict) + endtime = datetime.now() + total_walltime = endtime - starttime + + logging.debug(f"Finished loop {i}\nWalltime so far is {str(total_walltime)}") + + #Slow things down just a tad between loops so experiments behave better + time.sleep(5) + + + endtime = datetime.now() + total_walltime = endtime - starttime + + logging.info(f'All {num_expts} experiments finished in {str(total_walltime)}') + + return monitor_file + +def update_expt_status(expt: dict, name: str) -> dict: + """ + This function reads the dictionary showing the location of a given experiment, runs a + `rocotorun` command to update the experiment (running new jobs and updating the status of + previously submitted ones), and reads the rocoto database file to update the status of + each job for that experiment in the experiment dictionary. + + The function then and uses a simple set of rules to combine the statuses of every task + into a useful "status" for the whole experiment, and returns the updated experiment dictionary. + + Experiment "status" levels explained: + CREATED: The experiments have been created, but the monitor script has not yet processed them. + This is immediately overwritten at the beginning of the "monitor_jobs" function, so we + should never see this status in this function. Including just for completeness sake. + SUBMITTING: All jobs are in status SUBMITTING or SUCCEEDED. This is a normal state; we will + continue to monitor this experiment. + DYING: One or more tasks have died (status "DEAD"), so this experiment has had an error. + We will continue to monitor this experiment until all tasks are either status DEAD or + status SUCCEEDED (see next entry). + DEAD: One or more tasks are at status DEAD, and the rest are either DEAD or SUCCEEDED. We + will no longer monitor this experiment. + ERROR: One or more tasks are at status UNKNOWN, meaning that rocoto has failed to track the + job associated with that task. 
This will require manual intervention to solve, so we + will no longer monitor this experiment. + This status may also appear if we fail to read the rocoto database file. + RUNNING: One or more jobs are at status RUNNING, and the rest are either status QUEUED, SUBMITTED, + or SUCCEEDED. This is a normal state; we will continue to monitor this experiment. + QUEUED: One or more jobs are at status QUEUED, and some others may be at status SUBMITTED or + SUCCEEDED. + This is a normal state; we will continue to monitor this experiment. + SUCCEEDED: All jobs are status SUCCEEDED; we will monitor for one more cycle in case there are + unsubmitted jobs remaining. + COMPLETE:All jobs are status SUCCEEDED, and we have monitored this job for an additional cycle + to ensure there are no un-submitted jobs. We will no longer monitor this experiment. + + Args: + expt (dict): A dictionary containing the information for an individual experiment, as + described in the main monitor_jobs() function. + name (str): [optional] + Returns: + dict: The updated experiment dictionary. + """ + + #If we are no longer tracking this experiment, return unchanged + if expt["status"] in ['DEAD','ERROR','COMPLETE']: + return expt + + # Update experiment, read rocoto database + rocoto_db = f"{expt['expt_dir']}/FV3LAM_wflow.db" + rocotorun_cmd = ["rocotorun", f"-w {expt['expt_dir']}/FV3LAM_wflow.xml", f"-d {rocoto_db}"] + subprocess.run(rocotorun_cmd) + + logging.debug(f"Reading database for experiment {name}, updating experiment dictionary") + try: + # This section of code queries the "job" table of the rocoto database, returning a list + # of tuples containing the taskname, cycle, and state of each job respectively + with closing(sqlite3.connect(rocoto_db)) as connection: + with closing(connection.cursor()) as cur: + db = cur.execute('SELECT taskname,cycle,state from jobs').fetchall() + except: + logging.warning(f"Unable to read database {rocoto_db}\nCan not track experiment {name}") + expt["status"] = "ERROR" + return expt + + for task in db: + # For each entry from rocoto database, store that under a dictionary key named TASKNAME_CYCLE + # Cycle comes from the database in Unix Time (seconds), so convert to human-readable + cycle = datetime.utcfromtimestamp(task[1]).strftime('%Y%m%d%H%M') + expt[f"{task[0]}_{cycle}"] = task[2] + + #Run rocotorun again to get around rocotobqserver proliferation issue + subprocess.run(rocotorun_cmd) + + statuses = list() + for task in expt: + # Skip non-task entries + if task in ["expt_dir","status"]: + continue + statuses.append(expt[task]) + + if "DEAD" in statuses: + still_live = ["RUNNING", "SUBMITTING", "QUEUED"] + if any(status in still_live for status in statuses): + logging.debug(f'DEAD job in experiment {name}; continuing to track until all jobs are complete') + expt["status"] = "DYING" + else: + expt["status"] = "DEAD" + return expt + + if "UNKNOWN" in statuses: + expt["status"] = "ERROR" + + if "RUNNING" in statuses: + expt["status"] = "RUNNING" + elif "QUEUED" in statuses: + expt["status"] = "QUEUED" + elif "SUBMITTING" in statuses: + expt["status"] = "SUBMITTING" + elif "SUCCEEDED" in statuses: + if expt["status"] == "SUCCEEDED": + expt["status"] = "COMPLETE" + else: + expt["status"] = "SUCCEEDED" + else: + logging.fatal("Some kind of horrible thing has happened") + raise ValueError(dedent(f"""Some kind of horrible thing has happened to the experiment status + for experiment {name} + status is {expt["status"]} + all task statuses are {statuses}""")) + + return expt + + +def 
write_monitor_file(monitor_file: str, expt_dict: dict): + try: + with open(monitor_file,"w") as f: + f.write("### WARNING ###\n") + f.write("### THIS FILE IS AUTO_GENERATED AND REGULARLY OVER-WRITTEN BY monitor_jobs.py\n") + f.write("### EDITS MAY RESULT IN MISBEHAVIOR OF EXPERIMENTS RUNNING\n") + f.writelines(cfg_to_yaml_str(expt_dict)) + except: + logging.fatal("\n********************************\n") + logging.fatal(f"WARNING WARNING WARNING\nFailure occurred while writing monitor file {monitor_file}") + logging.fatal("File may be corrupt or invalid for re-run!!") + logging.fatal("\n********************************\n") + raise + + +def setup_logging(logfile: str = "log.run_WE2E_tests", debug: bool = False) -> None: + """ + Sets up logging, printing high-priority (INFO and higher) messages to screen, and printing all + messages with detailed timing and routine info in the specified text file. + """ + logging.getLogger().setLevel(logging.DEBUG) + + formatter = logging.Formatter("%(name)-16s %(levelname)-8s %(message)s") + + fh = logging.FileHandler(logfile, mode='w') + fh.setLevel(logging.DEBUG) + fh.setFormatter(formatter) + logging.getLogger().addHandler(fh) + + logging.debug(f"Finished setting up debug file logging in {logfile}") + console = logging.StreamHandler() + if debug: + console.setLevel(logging.DEBUG) + else: + console.setLevel(logging.INFO) + logging.getLogger().addHandler(console) + logging.debug("Logging set up successfully") + + +if __name__ == "__main__": + + check_python_version() + + logfile='log.monitor_jobs' + + #Parse arguments + parser = argparse.ArgumentParser(description="Script for monitoring and running jobs in a specified experiment, as specified in a yaml configuration file\n") + + parser.add_argument('-y', '--yaml_file', type=str, help='YAML-format file specifying the information of jobs to be run; for an example file, see monitor_jobs.yaml', required=True) + parser.add_argument('-d', '--debug', action='store_true', help='Script will be run in debug mode with more verbose output') + + args = parser.parse_args() + + setup_logging(logfile,args.debug) + + expt_dict = load_config_file(args.yaml_file) + + #Call main function + + try: + monitor_jobs(expt_dict,args.yaml_file, args.debug) + except: + logging.exception( + dedent( + f""" + ********************************************************************* + FATAL ERROR: + An error occurred. See the error message(s) printed below. + For more detailed information, check the log file from the workflow + generation script: {logfile} + *********************************************************************\n + """ + ) + ) diff --git a/tests/WE2E/monitor_jobs.yaml b/tests/WE2E/monitor_jobs.yaml new file mode 100644 index 0000000000..03d15c5d45 --- /dev/null +++ b/tests/WE2E/monitor_jobs.yaml @@ -0,0 +1,54 @@ +# This is an example yaml file showing the various entries that can be created for tracking jobs by monitor_jobs.py +# Any valid file created by monitor_jobs.py (unless corrupted) can be re-submitted for continued tracking if any +# experiments are yet to be completed. +# If an experiment with status: COMPLETE, DEAD, or ERROR is read by monitor_jobs,py, it will be ignored. 
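+#
+# Each experiment entry is keyed by its test name and holds at least an "expt_dir"
+# (full path to the experiment directory) and a "status" (the overall experiment state:
+# CREATED, SUBMITTING, QUEUED, RUNNING, SUCCEEDED, COMPLETE, DYING, DEAD, or ERROR; see
+# update_expt_status() in monitor_jobs.py for how these are derived). As jobs are
+# submitted, one entry per task is added, keyed "<taskname>_<cycle>" (cycle in
+# YYYYMMDDHHMM form) and holding that task's most recent Rocoto job state.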
+#First example: an experiment that has been created by generate_FV3LAM_workflow.py but has not yet started running +custom_ESGgrid: + expt_dir: /some_directory/expt_dirs/custom_ESGgrid + status: CREATED +#Second example: an experiment that has just been submitted +custom_ESGgrid: + expt_dir: /some_directory/expt_dirs/custom_ESGgrid + status: SUBMITTING + make_grid_201907010000: SUBMITTING + get_extrn_ics_201907010000: SUBMITTING + get_extrn_lbcs_201907010000: SUBMITTING +#Third example: an experiment with a mix of successful and running tasks +custom_ESGgrid: + expt_dir: /some_directory/expt_dirs/custom_ESGgrid + status: RUNNING + make_grid_201907010000: SUCCEEDED + get_extrn_ics_201907010000: SUCCEEDED + get_extrn_lbcs_201907010000: SUCCEEDED + make_orog_201907010000: SUCCEEDED + make_sfc_climo_201907010000: SUCCEEDED + make_ics_201907010000: RUNNING + make_lbcs_201907010000: RUNNING +#Fourth example: an experiment that has completed successfully +custom_ESGgrid: + expt_dir: /some_directory/expt_dirs/custom_ESGgrid + status: COMPLETE + make_grid_201907010000: SUCCEEDED + get_extrn_ics_201907010000: SUCCEEDED + get_extrn_lbcs_201907010000: SUCCEEDED + make_orog_201907010000: SUCCEEDED + make_sfc_climo_201907010000: SUCCEEDED + make_ics_201907010000: SUCCEEDED + make_lbcs_201907010000: SUCCEEDED + run_fcst_201907010000: SUCCEEDED + run_post_f000_201907010000: SUCCEEDED + run_post_f001_201907010000: SUCCEEDED + run_post_f002_201907010000: SUCCEEDED + run_post_f003_201907010000: SUCCEEDED + run_post_f004_201907010000: SUCCEEDED + run_post_f005_201907010000: SUCCEEDED + run_post_f006_201907010000: SUCCEEDED +#Fifth example: an experiment that has died due to a failed task. +custom_ESGgrid: + expt_dir: /some_directory/expt_dirs/custom_ESGgrid + status: DEAD + make_grid_201907010000: SUCCEEDED + get_extrn_ics_201907010000: SUCCEEDED + get_extrn_lbcs_201907010000: SUCCEEDED + make_orog_201907010000: DEAD + diff --git a/tests/WE2E/run_WE2E_tests.py b/tests/WE2E/run_WE2E_tests.py new file mode 100755 index 0000000000..dc472f8333 --- /dev/null +++ b/tests/WE2E/run_WE2E_tests.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 + +import os +import sys +import glob +import argparse +import logging +from textwrap import dedent + +sys.path.append("../../ush") + +from generate_FV3LAM_wflow import generate_FV3LAM_wflow +from python_utils import ( + cfg_to_yaml_str, + load_config_file, +) + +from check_python_version import check_python_version + +from monitor_jobs import monitor_jobs + + +def run_we2e_tests(homedir, args) -> None: + """Function to run the WE2E tests selected by the user + + Args: + homedir (str): The full path of the top-level app directory + args : The argparse.Namespace object containing command-line arguments + Returns: + None + """ + + # Set up logging to write to screen and logfile + setup_logging(debug=args.debug) + + # Set some important directories + ushdir=os.path.join(homedir,'ush') + + # Set some variables based on input arguments + run_envir = args.run_envir + machine = args.machine.lower() + + # If args.tests is a list of length more than one, we assume it is a list of test names + if len(args.tests) > 1: + tests_to_check=args.tests + logging.debug(f"User specified a list of tests:\n{tests_to_check}") + else: + #First see if args.tests is a valid test name + user_spec_tests = args.tests + logging.debug(f'Checking if {user_spec_tests} is a valid test name') + match = check_test(user_spec_tests[0]) + if match: + tests_to_check = user_spec_tests + else: + # If not a valid test name, 
check if it is a test suite + logging.debug(f'Checking if {user_spec_tests} is a valid test suite') + if user_spec_tests[0] == 'all': + alltests = glob.glob('test_configs/**/config*.yaml', recursive=True) + tests_to_check = [] + for f in alltests: + filename = os.path.basename(f) + # We just want the test namein this list, so cut out the "config." prefix and ".yaml" extension + tests_to_check.append(filename[7:-5]) + logging.debug(f"Will check all tests:\n{tests_to_check}") + elif user_spec_tests[0] in ['fundamental', 'comprehensive']: + # I am writing this section of code under protest; we should use args.run_envir to check for run_envir-specific files! + prefix = f"machine_suites/{user_spec_tests[0]}" + testfilename = f"{prefix}.{machine}.{args.compiler}.nco" + if not os.path.isfile(testfilename): + testfilename = f"{prefix}.{machine}.{args.compiler}.com" + if not os.path.isfile(testfilename): + testfilename = f"{prefix}.{machine}.{args.compiler}" + if not os.path.isfile(testfilename): + testfilename = f"{prefix}.{machine}" + if not os.path.isfile(testfilename): + testfilename = f"machine_suites/{user_spec_tests[0]}" + else: + if not run_envir: + run_envir = 'community' + logging.debug(f'{testfilename} exists for this platform and run_envir has not been specified'\ + 'Setting run_envir = {run_envir} for all tests') + else: + if not run_envir: + run_envir = 'nco' + logging.debug(f'{testfilename} exists for this platform and run_envir has not been specified'\ + 'Setting run_envir = {run_envir} for all tests') + logging.debug(f"Reading test file: {testfilename}") + with open(testfilename) as f: + tests_to_check = [x.rstrip() for x in f] + logging.debug(f"Will check {user_spec_tests[0]} tests:\n{tests_to_check}") + else: + # If we have gotten this far then the only option left for user_spec_tests is a file containing test names + logging.debug(f'Checking if {user_spec_tests} is a file containing test names') + if os.path.isfile(user_spec_tests[0]): + with open(user_spec_tests[0]) as f: + tests_to_check = [x.rstrip() for x in f] + else: + raise FileNotFoundError(dedent(f""" + The specified 'tests' argument '{user_spec_tests}' + does not appear to be a valid test name, a valid test suite, or a file containing valid test names. + + Check your inputs and try again. 
+ """)) + + + logging.info("Checking that all tests are valid") + + tests_to_run=check_tests(tests_to_check) + + pretty_list = "\n".join(str(x) for x in tests_to_run) + logging.info(f'Will run {len(tests_to_run)} tests:\n{pretty_list}') + + + config_default_file = os.path.join(ushdir,'config_defaults.yaml') + logging.debug(f"Loading config defaults file {config_default_file}") + config_defaults = load_config_file(config_default_file) + + machine_file = os.path.join(ushdir, 'machine', f'{machine}.yaml') + logging.debug(f"Loading machine defaults file {machine_file}") + machine_defaults = load_config_file(machine_file) + + # Set up dictionary for job monitoring yaml + if not args.use_cron_to_relaunch: + monitor_yaml = dict() + + for test in tests_to_run: + #Starting with test yaml template, fill in user-specified and machine- and + # test-specific options, then write resulting complete config.yaml + test_name = os.path.basename(test).split('.')[1] + logging.debug(f"For test {test_name}, constructing config.yaml") + test_cfg = load_config_file(test) + + test_cfg['user'].update({"MACHINE": machine}) + test_cfg['user'].update({"ACCOUNT": args.account}) + if run_envir: + test_cfg['user'].update({"RUN_ENVIR": run_envir}) + # if platform section was not in input config, initialize as empty dict + if 'platform' not in test_cfg: + test_cfg['platform'] = dict() + test_cfg['platform'].update({"BUILD_MOD_FN": args.modulefile}) + test_cfg['workflow'].update({"COMPILER": args.compiler}) + if args.expt_basedir: + test_cfg['workflow'].update({"EXPT_BASEDIR": args.expt_basedir}) + test_cfg['workflow'].update({"EXPT_SUBDIR": test_name}) + if args.exec_subdir: + test_cfg['workflow'].update({"EXEC_SUBDIR": args.exec_subdir}) + if args.use_cron_to_relaunch: + test_cfg['workflow'].update({"USE_CRON_TO_RELAUNCH": args.use_cron_to_relaunch}) + if args.cron_relaunch_intvl_mnts: + test_cfg['workflow'].update({"CRON_RELAUNCH_INTVL_MNTS": args.cron_relaunch_intvl_mnts}) + if args.debug_tests: + test_cfg['workflow'].update({"DEBUG": args.debug_tests}) + if args.verbose_tests: + test_cfg['workflow'].update({"VERBOSE": args.verbose_tests}) + + logging.debug(f"Overwriting WE2E-test-specific settings for test \n{test_name}\n") + + if 'task_get_extrn_ics' in test_cfg: + logging.debug(test_cfg['task_get_extrn_ics']) + test_cfg['task_get_extrn_ics'] = check_task_get_extrn_ics(test_cfg,machine_defaults,config_defaults) + logging.debug(test_cfg['task_get_extrn_ics']) + if 'task_get_extrn_lbcs' in test_cfg: + logging.debug(test_cfg['task_get_extrn_lbcs']) + test_cfg['task_get_extrn_lbcs'] = check_task_get_extrn_lbcs(test_cfg,machine_defaults,config_defaults) + logging.debug(test_cfg['task_get_extrn_lbcs']) + + + logging.debug(f"Writing updated config.yaml for test {test_name}\nbased on specified command-line arguments:\n") + logging.debug(cfg_to_yaml_str(test_cfg)) + with open(ushdir + "/config.yaml","w") as f: + f.writelines(cfg_to_yaml_str(test_cfg)) + + logging.debug(f"Calling workflow generation function for test {test_name}\n") + if args.quiet: + console_handler = logging.getLogger().handlers[1] + console_handler.setLevel(logging.WARNING) + expt_dir = generate_FV3LAM_wflow(ushdir,logfile=f"{ushdir}/log.generate_FV3LAM_wflow",debug=args.debug) + if args.quiet: + if args.debug: + console_handler.setLevel(logging.DEBUG) + else: + console_handler.setLevel(logging.INFO) + logging.info(f"Workflow for test {test_name} successfully generated in\n{expt_dir}\n") + # If this job is not using crontab, we need to add an entry to 
monitor.yaml + if 'USE_CRON_TO_RELAUNCH' not in test_cfg['workflow']: + test_cfg['workflow'].update({"USE_CRON_TO_RELAUNCH": False}) + if not test_cfg['workflow']['USE_CRON_TO_RELAUNCH']: + logging.debug(f'Creating entry for job {test_name} in job monitoring dict') + monitor_yaml[test_name] = dict() + monitor_yaml[test_name].update({"expt_dir": expt_dir}) + monitor_yaml[test_name].update({"status": "CREATED"}) + + if not args.use_cron_to_relaunch: + logging.info("calling function that monitors jobs, prints summary") + monitor_file = monitor_jobs(monitor_yaml, debug=args.debug) + + logging.info("All experiments are complete") + logging.info(f"Summary of results available in {monitor_file}") + + + + + +def check_tests(tests: list) -> list: + """ + Function for checking that all tests in a provided list of tests are valid + + Args: + tests : List of potentially valid test names + Returns: + tests_to_run : List of config files corresponding to test names + """ + + testfiles = glob.glob('test_configs/**/config*.yaml', recursive=True) + # Check that there are no duplicate test filenames + testfilenames=[] + for testfile in testfiles: + if os.path.basename(testfile) in testfilenames: + duplicates = glob.glob('test_configs/**/' + os.path.basename(testfile), recursive=True) + raise Exception(dedent(f""" + Found duplicate test file names: + {duplicates} + Ensure that each test file name under the test_configs/ directory + is unique. + """)) + testfilenames.append(os.path.basename(testfile)) + tests_to_run=[] + for test in tests: + # Skip blank/empty testnames; this avoids failure if newlines or spaces are included + if not test or test.isspace(): + continue + match = check_test(test) + if not match: + raise Exception(f"Could not find test {test}") + tests_to_run.append(match) + # Because some test files are symlinks to other tests, check that we don't + # include the same test twice + for testfile in tests_to_run.copy(): + if os.path.islink(testfile): + if os.path.realpath(testfile) in tests_to_run: + logging.warning(dedent(f"""WARNING: test file {testfile} is a symbolic link to a + test file ({os.path.realpath(testfile)}) that is also included in the + test list. Only the latter test will be run.""")) + tests_to_run.remove(testfile) + if len(tests_to_run) != len(set(tests_to_run)): + logging.warning("\nWARNING: Duplicate test names were found in list. 
Removing duplicates and continuing.\n") + tests_to_run = list(set(tests_to_run)) + return tests_to_run + + + +def check_test(test: str) -> str: + """ + Function for checking that a string corresponds to a valid test name + + Args: + test (str) : String of potential test name + Returns: + str : File name of test config file (empty string if no test file found) + """ + # potential test files + testfiles = glob.glob('test_configs/**/config*.yaml', recursive=True) + # potential test file for input test name + test_config=f'config.{test.strip()}.yaml' + config = '' + for testfile in testfiles: + if test_config in testfile: + logging.debug(f"found test {test}, testfile {testfile}") + config = os.path.abspath(testfile) + return config + + +def check_task_get_extrn_ics(cfg: dict, mach: dict, dflt: dict) -> dict: + """ + Function for checking and updating various settings in task_get_extrn_ics section of test config yaml + + Args: + cfg : Dictionary loaded from test config file + mach : Dictionary loaded from machine settings file + dflt : Dictionary loaded from default config file + Returns: + cfg_ics : Updated dictionary for task_get_extrn_ics section of test config + """ + + #Make our lives easier by shortening some dictionary calls + cfg_ics = cfg['task_get_extrn_ics'] + + # If RUN_TASK_GET_EXTRN_ICS is explicitly set to false, do nothing and return + if 'workflow_switches' in cfg: + if 'RUN_TASK_GET_EXTRN_ICS' in cfg['workflow_switches']: + if cfg['workflow_switches']['RUN_TASK_GET_EXTRN_ICS'] is False: + return cfg_ics + + # If USE_USER_STAGED_EXTRN_FILES not specified or false, do nothing and return + if not cfg_ics.get('USE_USER_STAGED_EXTRN_FILES'): + logging.debug(f'USE_USER_STAGED_EXTRN_FILES not specified or False in task_get_extrn_ics section of config') + return cfg_ics + + # If EXTRN_MDL_SYSBASEDIR_ICS is "set_to_non_default_location_in_testing_script", replace with test value from machine file + if cfg_ics.get('EXTRN_MDL_SYSBASEDIR_ICS') == "set_to_non_default_location_in_testing_script": + if 'TEST_ALT_EXTRN_MDL_SYSBASEDIR_ICS' in mach['platform']: + if os.path.isdir(mach['platform']['TEST_ALT_EXTRN_MDL_SYSBASEDIR_ICS']): + raise FileNotFoundError(f"Non-default input file location TEST_ALT_EXTRN_MDL_SYSBASEDIR_ICS from machine file does not exist or is not a directory") + cfg_ics['EXTRN_MDL_SYSBASEDIR_ICS'] = mach['platform']['TEST_ALT_EXTRN_MDL_SYSBASEDIR_ICS'] + else: + raise KeyError(f"Non-default input file location TEST_ALT_EXTRN_MDL_SYSBASEDIR_ICS not set in machine file") + return cfg_ics + + # Because USE_USER_STAGED_EXTRN_FILES is true, only look on disk, and ensure the staged data directory exists + cfg['platform']['EXTRN_MDL_DATA_STORES'] = "disk" + if 'TEST_EXTRN_MDL_SOURCE_BASEDIR' not in mach['platform']: + raise KeyError("TEST_EXTRN_MDL_SOURCE_BASEDIR, the directory for staged test data,"\ + "has not been specified in the machine file for this platform") + if not os.path.isdir(mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']): + raise FileNotFoundError(dedent(f"""The directory for staged test data specified in this platform's machine file + TEST_EXTRN_MDL_SOURCE_BASEDIR = {mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']} + does not exist.""")) + + # Different input data types have different directory structures, so set the data directory accordingly + if cfg_ics['EXTRN_MDL_NAME_ICS'] == 'FV3GFS': + if 'FV3GFS_FILE_FMT_ICS' not in cfg_ics: + cfg_ics['FV3GFS_FILE_FMT_ICS'] = dflt['task_get_extrn_ics']['FV3GFS_FILE_FMT_ICS'] + cfg_ics['EXTRN_MDL_SOURCE_BASEDIR_ICS'] 
= f"{mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']}/"\ + f"{cfg_ics['EXTRN_MDL_NAME_ICS']}/{cfg_ics['FV3GFS_FILE_FMT_ICS']}/${{yyyymmddhh}}" + else: + cfg_ics['EXTRN_MDL_SOURCE_BASEDIR_ICS'] = f"{mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']}/"\ + f"{cfg_ics['EXTRN_MDL_NAME_ICS']}/${{yyyymmddhh}}" + + return cfg_ics + +def check_task_get_extrn_lbcs(cfg: dict, mach: dict, dflt: dict) -> dict: + """ + Function for checking and updating various settings in task_get_extrn_lbcs section of test config yaml + + Args: + cfg : Dictionary loaded from test config file + mach : Dictionary loaded from machine settings file + dflt : Dictionary loaded from default config file + Returns: + cfg_lbcs : Updated dictionary for task_get_extrn_lbcs section of test config + """ + + #Make our lives easier by shortening some dictionary calls + cfg_lbcs = cfg['task_get_extrn_lbcs'] + + # If RUN_TASK_GET_EXTRN_LBCS is explicitly set to false, do nothing and return + if 'workflow_switches' in cfg: + if 'RUN_TASK_GET_EXTRN_LBCS' in cfg['workflow_switches']: + if cfg['workflow_switches']['RUN_TASK_GET_EXTRN_LBCS'] is False: + return cfg_lbcs + + # If USE_USER_STAGED_EXTRN_FILES not specified or false, do nothing and return + if not cfg_lbcs.get('USE_USER_STAGED_EXTRN_FILES'): + logging.debug(f'USE_USER_STAGED_EXTRN_FILES not specified or False in task_get_extrn_lbcs section of config') + return cfg_lbcs + + # If EXTRN_MDL_SYSBASEDIR_LBCS is "set_to_non_default_location_in_testing_script", replace with test value from machine file + if cfg_lbcs.get('EXTRN_MDL_SYSBASEDIR_LBCS') == "set_to_non_default_location_in_testing_script": + if 'TEST_ALT_EXTRN_MDL_SYSBASEDIR_LBCS' in mach['platform']: + if os.path.isdir(mach['platform']['TEST_ALT_EXTRN_MDL_SYSBASEDIR_LBCS']): + raise FileNotFoundError(f"Non-default input file location TEST_ALT_EXTRN_MDL_SYSBASEDIR_LBCS from machine file does not exist or is not a directory") + cfg_lbcs['EXTRN_MDL_SYSBASEDIR_LBCS'] = mach['platform']['TEST_ALT_EXTRN_MDL_SYSBASEDIR_LBCS'] + else: + raise KeyError(f"Non-default input file location TEST_ALT_EXTRN_MDL_SYSBASEDIR_LBCS not set in machine file") + return cfg_lbcs + + # Because USE_USER_STAGED_EXTRN_FILES is true, only look on disk, and ensure the staged data directory exists + cfg['platform']['EXTRN_MDL_DATA_STORES'] = "disk" + if 'TEST_EXTRN_MDL_SOURCE_BASEDIR' not in mach['platform']: + raise KeyError("TEST_EXTRN_MDL_SOURCE_BASEDIR, the directory for staged test data,"\ + "has not been specified in the machine file for this platform") + if not os.path.isdir(mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']): + raise FileNotFoundError(dedent(f"""The directory for staged test data specified in this platform's machine file + TEST_EXTRN_MDL_SOURCE_BASEDIR = {mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']} + does not exist.""")) + + # Different input data types have different directory structures, so set the data directory accordingly + if cfg_lbcs['EXTRN_MDL_NAME_LBCS'] == 'FV3GFS': + if 'FV3GFS_FILE_FMT_LBCS' not in cfg_lbcs: + cfg_lbcs['FV3GFS_FILE_FMT_LBCS'] = dflt['task_get_extrn_lbcs']['FV3GFS_FILE_FMT_LBCS'] + cfg_lbcs['EXTRN_MDL_SOURCE_BASEDIR_LBCS'] = f"{mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']}/"\ + f"{cfg_lbcs['EXTRN_MDL_NAME_LBCS']}/{cfg_lbcs['FV3GFS_FILE_FMT_LBCS']}/${{yyyymmddhh}}" + else: + cfg_lbcs['EXTRN_MDL_SOURCE_BASEDIR_LBCS'] = f"{mach['platform']['TEST_EXTRN_MDL_SOURCE_BASEDIR']}/"\ + f"{cfg_lbcs['EXTRN_MDL_NAME_LBCS']}/${{yyyymmddhh}}" + + return cfg_lbcs + +def setup_logging(logfile: str = 
"log.run_WE2E_tests", debug: bool = False) -> None: + """ + Sets up logging, printing high-priority (INFO and higher) messages to screen, and printing all + messages with detailed timing and routine info in the specified text file. + """ + logging.getLogger().setLevel(logging.DEBUG) + + formatter = logging.Formatter("%(name)-16s %(levelname)-8s %(message)s") + + fh = logging.FileHandler(logfile, mode='w') + fh.setLevel(logging.DEBUG) + fh.setFormatter(formatter) + logging.getLogger().addHandler(fh) + + logging.debug(f"Finished setting up debug file logging in {logfile}") + console = logging.StreamHandler() + if debug: + console.setLevel(logging.DEBUG) + else: + console.setLevel(logging.INFO) + logging.getLogger().addHandler(console) + logging.debug("Logging set up successfully") + + + +if __name__ == "__main__": + + # Check python version and presence of some non-standard packages + check_python_version() + + #Get the "Home" directory, two levels above this one + homedir=os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + logfile='log.run_WE2E_tests' + + #Parse arguments + parser = argparse.ArgumentParser(epilog="For more information about config arguments (denoted in CAPS), see ush/config_defaults.yaml\n") + optional = parser._action_groups.pop() # Create a group for optional arguments so they can be listed after required args + required = parser.add_argument_group('required arguments') + + required.add_argument('-m', '--machine', type=str, help='Machine name; see ush/machine/ for valid values', required=True) + required.add_argument('-a', '--account', type=str, help='Account name for running submitted jobs', required=True) + required.add_argument('-t', '--tests', type=str, nargs="*", help="""Can be one of three options (in order of priority): + 1. A test name or list of test names. + 2. A test suite name ("fundamental", "comprehensive", or "all") + 3. The name of a file (full or relative path) containing a list of test names. 
+ """, required=True) + + parser.add_argument('-c', '--compiler', type=str, help='Compiler used for building the app', default='intel') + parser.add_argument('-d', '--debug', action='store_true', help='Script will be run in debug mode with more verbose output') + parser.add_argument('-q', '--quiet', action='store_true', help='Suppress console output from workflow generation; this will help keep the screen uncluttered') + + + parser.add_argument('--modulefile', type=str, help='Modulefile used for building the app') + parser.add_argument('--run_envir', type=str, help='Overrides RUN_ENVIR variable to a new value ( "nco" or "community" ) for all experiments', default='') + parser.add_argument('--expt_basedir', type=str, help='Explicitly set EXPT_BASEDIR for all experiments') + parser.add_argument('--exec_subdir', type=str, help='Explicitly set EXEC_SUBDIR for all experiments') + parser.add_argument('--use_cron_to_relaunch', action='store_true', help='Explicitly set USE_CRON_TO_RELAUNCH for all experiments; this option disables the "monitor" script functionality') + parser.add_argument('--cron_relaunch_intvl_mnts', type=str, help='Overrides CRON_RELAUNCH_INTVL_MNTS for all experiments') + parser.add_argument('--debug_tests', action='store_true', help='Explicitly set DEBUG=TRUE for all experiments') + parser.add_argument('--verbose_tests', action='store_true', help='Explicitly set VERBOSE=TRUE for all experiments') + + parser._action_groups.append(optional) + + args = parser.parse_args() + + #Set defaults that need other argument values + if args.modulefile is None: + args.modulefile = f'build_{args.machine.lower()}_{args.compiler}' + + #Call main function + + try: + run_we2e_tests(homedir,args) + except: + logging.exception( + dedent( + f""" + ********************************************************************* + FATAL ERROR: + Experiment generation failed. See the error message(s) printed below. 
+ For more detailed information, check the log file from the workflow + generation script: {logfile} + *********************************************************************\n + """ + ) + ) diff --git a/tests/WE2E/test_configs/wflow_features/config.deactivate_tasks.yaml b/tests/WE2E/test_configs/wflow_features/config.deactivate_tasks.yaml index 5defa0bf98..f17039df85 100644 --- a/tests/WE2E/test_configs/wflow_features/config.deactivate_tasks.yaml +++ b/tests/WE2E/test_configs/wflow_features/config.deactivate_tasks.yaml @@ -32,10 +32,3 @@ workflow_switches: RUN_TASK_MAKE_LBCS: false RUN_TASK_RUN_FCST: false RUN_TASK_RUN_POST: false -task_get_extrn_ics: - EXTRN_MDL_NAME_ICS: FV3GFS - USE_USER_STAGED_EXTRN_FILES: true -task_get_extrn_lbcs: - EXTRN_MDL_NAME_LBCS: FV3GFS - LBC_SPEC_INTVL_HRS: 3 - USE_USER_STAGED_EXTRN_FILES: true diff --git a/ush/generate_FV3LAM_wflow.py b/ush/generate_FV3LAM_wflow.py index f8637e71e5..449c70dede 100755 --- a/ush/generate_FV3LAM_wflow.py +++ b/ush/generate_FV3LAM_wflow.py @@ -40,19 +40,20 @@ from check_python_version import check_python_version -def generate_FV3LAM_wflow(ushdir, logfile: str = "log.generate_FV3LAM_wflow") -> None: +def generate_FV3LAM_wflow(ushdir, logfile: str = "log.generate_FV3LAM_wflow", debug: bool = False) -> str: """Function to setup a forecast experiment and create a workflow (according to the parameters specified in the config file) Args: - ushdir (str): The full path of the ush/ directory where this script is located - logfile (str): The name of the file where logging is written + ushdir (str) : The full path of the ush/ directory where this script is located + logfile (str) : The name of the file where logging is written + debug (bool): Enable extra output for debugging Returns: - None + EXPTDIR (str) : The full path of the directory where this experiment has been generated """ # Set up logging to write to screen and logfile - setup_logging(logfile) + setup_logging(logfile, debug) # Check python version and presence of some non-standard packages check_python_version() @@ -67,7 +68,7 @@ def generate_FV3LAM_wflow(ushdir, logfile: str = "log.generate_FV3LAM_wflow") -> # The setup function reads the user configuration file and fills in # non-user-specified values from config_defaults.yaml - expt_config = setup(ushdir) + expt_config = setup(ushdir,debug=debug) verbose = expt_config["workflow"]["VERBOSE"] # @@ -681,20 +682,6 @@ def generate_FV3LAM_wflow(ushdir, logfile: str = "log.generate_FV3LAM_wflow") -> # cp_vrfy(os.path.join(ushdir, EXPT_CONFIG_FN), EXPTDIR) - # Note workflow generation completion - log_info( - f""" - ======================================================================== - ======================================================================== - - Experiment generation completed. The experiment directory is: - - EXPTDIR='{EXPTDIR}' - - ======================================================================== - ======================================================================== - """ - ) # # ----------------------------------------------------------------------- # @@ -744,21 +731,36 @@ def generate_FV3LAM_wflow(ushdir, logfile: str = "log.generate_FV3LAM_wflow") -> # If we got to this point everything was successful: move the log file to the experiment directory. 
mv_vrfy(logfile, EXPTDIR) + return EXPTDIR -def setup_logging(logfile: str = "log.generate_FV3LAM_wflow") -> None: + +def setup_logging(logfile: str = "log.generate_FV3LAM_wflow", debug: bool = False) -> None: """ Sets up logging, printing high-priority (INFO and higher) messages to screen, and printing all messages with detailed timing and routine info in the specified text file. + + If debug = True, print all messages to both screen and log file. """ - logging.basicConfig( - level=logging.DEBUG, - format="%(name)-22s %(levelname)-8s %(message)s", - filename=logfile, - filemode="w", - ) + logging.getLogger().setLevel(logging.DEBUG) + + formatter = logging.Formatter("%(name)-22s %(levelname)-8s %(message)s") + + fh = logging.FileHandler(logfile, mode='w') + fh.setLevel(logging.DEBUG) + fh.setFormatter(formatter) + logging.getLogger().addHandler(fh) logging.debug(f"Finished setting up debug file logging in {logfile}") + + # If there are already multiple handlers, that means generate_FV3LAM_workflow was called from another function. + # In that case, do not change the console (print-to-screen) logging. + if len(logging.getLogger().handlers) > 1: + return + console = logging.StreamHandler() - console.setLevel(logging.INFO) + if debug: + console.setLevel(logging.DEBUG) + else: + console.setLevel(logging.INFO) logging.getLogger().addHandler(console) logging.debug("Logging set up successfully") @@ -771,7 +773,7 @@ def setup_logging(logfile: str = "log.generate_FV3LAM_wflow") -> None: # Call the generate_FV3LAM_wflow function defined above to generate the # experiment/workflow. try: - generate_FV3LAM_wflow(USHdir, wflow_logfile) + expt_dir = generate_FV3LAM_wflow(USHdir, wflow_logfile) except: logging.exception( dedent( @@ -785,6 +787,21 @@ def setup_logging(logfile: str = "log.generate_FV3LAM_wflow") -> None: """ ) ) + + # Note workflow generation completion + log_info( + f""" + ======================================================================== + ======================================================================== + + Experiment generation completed. The experiment directory is: + + EXPTDIR='{EXPTDIR}' + + ======================================================================== + ======================================================================== + """ + ) class Testing(unittest.TestCase): diff --git a/ush/setup.py b/ush/setup.py index 2ccfdd25e0..d41f6e2d1e 100644 --- a/ush/setup.py +++ b/ush/setup.py @@ -4,8 +4,8 @@ import sys import datetime import traceback +import logging from textwrap import dedent -from logging import getLogger from python_utils import ( log_info, @@ -55,7 +55,10 @@ def load_config_for_setup(ushdir, default_config, user_config): """ # Load the default config. + logging.debug(f"Loading config defaults file {default_config}") cfg_d = load_config_file(default_config) + logging.debug(f"Read in the following values from config defaults file:\n") + logging.debug(cfg_d) # Load the user config file, then ensure all user-specified # variables correspond to a default value. 
@@ -69,6 +72,8 @@ def load_config_for_setup(ushdir, default_config, user_config): try: cfg_u = load_config_file(user_config) + logging.debug(f"Read in the following values from YAML config file {user_config}:\n") + logging.debug(cfg_u) except: errmsg = dedent( f"""\n @@ -114,6 +119,7 @@ def load_config_for_setup(ushdir, default_config, user_config): ({machine}) in your config file {user_config}""" ) ) + logging.debug(f"Loading machine defaults file {machine_file}") machine_cfg = load_config_file(machine_file) # Load the fixed files configuration @@ -269,7 +275,7 @@ def set_srw_paths(ushdir, expt_config): ) -def setup(USHdir, user_config_fn="config.yaml"): +def setup(USHdir, user_config_fn="config.yaml", debug: bool = False): """Function that validates user-provided configuration, and derives a secondary set of parameters needed to configure a Rocoto-based SRW workflow. The derived parameters use a set of required user-defined @@ -284,13 +290,13 @@ def setup(USHdir, user_config_fn="config.yaml"): USHdir (str): The full path of the ush/ directory where this script is located user_config_fn (str): The name of a user-provided config YAML + debug (bool): Enable extra output for debugging Returns: None """ - logger = getLogger(__name__) - cd_vrfy(USHdir) + logger = logging.getLogger(__name__) # print message log_info( @@ -1265,7 +1271,7 @@ def get_location(xcs, fmt, expt_cfg): # # loop through the flattened expt_config and check validity of params - cfg_v = load_config_file("valid_param_vals.yaml") + cfg_v = load_config_file(os.path.join(USHdir, "valid_param_vals.yaml")) for k, v in flatten_dict(expt_config).items(): if v is None or v == "": continue
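
Usage note: from tests/WE2E/, the new entry point is invoked as, for example,
"./run_WE2E_tests.py -m <machine> -a <account> -t fundamental"; without
--use_cron_to_relaunch it generates the experiments and then calls monitor_jobs()
itself. A monitor file written by an earlier run can be re-submitted for continued
tracking with "./monitor_jobs.py -y <monitor_file>", or programmatically as in the
minimal sketch below, which assumes it is run from tests/WE2E/ and uses an
illustrative monitor file name:

    import sys
    sys.path.append("../../ush")  # same path setup the scripts themselves use

    from python_utils import load_config_file
    from monitor_jobs import monitor_jobs, setup_logging

    # Log everything to file, as the scripts do; INFO and above also go to the screen.
    setup_logging(logfile="log.monitor_jobs")

    # Illustrative file name; any uncorrupted file written by monitor_jobs.py will do.
    yaml_file = "monitor_jobs_20230207093308.yaml"
    expt_dict = load_config_file(yaml_file)

    # Re-use the same file so progress keeps being written back to it.
    summary_file = monitor_jobs(expt_dict, monitor_file=yaml_file, debug=False)
    print(f"Results/summary available in {summary_file}")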