diff --git a/merlin/study/__init__.py b/merlin/study/__init__.py index 37cabcad..5e2edb8a 100644 --- a/merlin/study/__init__.py +++ b/merlin/study/__init__.py @@ -27,3 +27,14 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. ############################################################################### + +""" +Files in the `study/` directory handle the logic of constructing +and executing full Merlin studies. This logic includes:\n +- Putting together batch worker launch commands +- Hosting an interface for Celery commands +- Obtaining the DAG from Maestro and modifying it for use with Celery +- Creating an interface for different schedulers +- Displaying the status of studies +- Handling the logic for entire studies _and_ single steps in studies +""" \ No newline at end of file diff --git a/merlin/study/batch.py b/merlin/study/batch.py index cf2c5311..d6ffde5e 100644 --- a/merlin/study/batch.py +++ b/merlin/study/batch.py @@ -33,30 +33,49 @@ Currently only the batch worker launch for slurm, lsf or flux are implemented. - """ import logging import os import subprocess from typing import Dict, Optional, Union +from merlin.spec.specification import MerlinSpec from merlin.utils import convert_timestring, get_flux_alloc, get_flux_version, get_yaml_var LOG = logging.getLogger(__name__) -def batch_check_parallel(spec): +def batch_check_parallel(spec: MerlinSpec) -> bool: """ - Check for a parallel batch section in the yaml file. + Check for a parallel batch section in the provided MerlinSpec object. + + This function examines the 'batch' section of the given specification to determine + whether it is configured for parallel execution. It checks the 'type' attribute + within the batch section, defaulting to 'local' if not specified. If the type + is anything other than 'local', the function will return True, indicating that + parallel processing is enabled. + + Args: + spec (spec.specification.MerlinSpec): An instance of the + [`MerlinSpec`][spec.specification.MerlinSpec] class that contains the + configuration details, including the batch section. + + Returns: + Returns True if the batch type is set to a value other than 'local', + indicating that parallel processing is enabled; otherwise, returns False. + + Raises: + AttributeError: If the 'batch' section is not present in the specification, + an error is logged and an AttributeError is raised. """ parallel = False try: batch = spec.batch - except AttributeError: + except AttributeError as exc: LOG.error("The batch section is required in the specification file.") - raise + raise exc btype = get_yaml_var(batch, "type", "local") if btype != "local": @@ -65,14 +84,29 @@ def batch_check_parallel(spec): return parallel -def check_for_scheduler(scheduler, scheduler_legend): +def check_for_scheduler(scheduler: str, scheduler_legend: Dict[str, str]) -> bool: """ - Check which scheduler (Flux, Slurm, LSF, or PBS) is the main - scheduler for the cluster. - :param `scheduler`: A string representing the scheduler to check for - Options: flux, slurm, lsf, or pbs - :param `scheduler_legend`: A dict of information related to each scheduler - :returns: A bool representing whether `scheduler` is the main scheduler for the cluster + Check which scheduler (Flux, Slurm, LSF, or PBS) is the main scheduler for the cluster. + + This function verifies if the specified scheduler is the main scheduler by executing + a command associated with it from the provided scheduler legend. 
It returns a boolean + indicating whether the specified scheduler is active. + + Args: + scheduler: A string representing the scheduler to check for. Options include 'flux', + 'slurm', 'lsf', or 'pbs'. + scheduler_legend: A dictionary containing information related to each scheduler, + including the command to check its status and the expected output. See + [`construct_scheduler_legend`][study.batch.construct_scheduler_legend] + for more information on all the settings this dict contains. + + Returns: + Returns True if the specified scheduler is the main scheduler for the + cluster, otherwise returns False. + + Raises: + FileNotFoundError: If the command associated with the scheduler cannot be found. + PermissionError: If there are insufficient permissions to execute the command. """ # Check for invalid scheduler if scheduler not in ("flux", "slurm", "lsf", "pbs"): @@ -96,14 +130,26 @@ def check_for_scheduler(scheduler, scheduler_legend): return False -def get_batch_type(scheduler_legend, default=None): +def get_batch_type(scheduler_legend: Dict[str, str], default: str = None) -> str: """ Determine which batch scheduler to use. - :param scheduler_legend: A dict storing info related to each scheduler - :param default: (str) The default batch scheduler to use if a scheduler - can't be determined. The default is None. - :returns: (str) The batch name (available options: slurm, flux, lsf, pbs). + This function checks a predefined list of batch schedulers in a specific order + to determine which one is available for use. If none of the schedulers are found, + it checks the system type environment variable to suggest a default scheduler. + If no suitable scheduler is determined, it returns the specified default value. + + Args: + scheduler_legend: A dictionary storing information related to each + scheduler, including commands and expected outputs for checking their + availability. See [`construct_scheduler_legend`][study.batch.construct_scheduler_legend] + for more information on all the settings this dict contains. + default: The default batch scheduler to use if a scheduler cannot be determined. + + Returns: + The name of the available batch scheduler. Possible options include + 'slurm', 'flux', 'lsf', or 'pbs'. If no scheduler is found, returns + the specified default value. """ # These schedulers are listed in order of which should be checked for first # 1. Flux should be checked first due to slurm emulation scripts @@ -126,13 +172,29 @@ def get_batch_type(scheduler_legend, default=None): return default -def get_node_count(parsed_batch: Dict, default=1): +def get_node_count(parsed_batch: Dict, default: int = 1) -> int: """ Determine a default node count based on the environment. - :param default: (int) The number of nodes to return if a node count from - the environment cannot be determined. - :param returns: (int) The number of nodes to use. + This function checks the environment and the Flux version to determine the + appropriate number of nodes to use for batch processing. It first verifies + the Flux version, then attempts to retrieve the node count from the Flux + allocation or environment variables specific to Slurm or LSF. If no valid + node count can be determined, it returns a specified default value. + + Args: + parsed_batch: A dictionary containing parsed batch configurations. + See [`parse_batch_block`][study.batch.parse_batch_block] for more + information on all the settings in this dictionary. 
+ default: The number of nodes to return if a node count from the + environment cannot be determined. + + Returns: + The number of nodes to use for the batch job. This value is determined + based on the environment and scheduler specifics. + + Raises: + ValueError: If the Flux version is too old (below 0.17.0). """ # Flux version check @@ -166,9 +228,38 @@ def get_node_count(parsed_batch: Dict, default=1): def parse_batch_block(batch: Dict) -> Dict: """ - A function to parse the batch block of the yaml file. - :param `batch`: The batch block to read in - :returns: A dict with all the info (or defaults) from the batch block + Parse the batch block of a YAML configuration file. + + This function extracts relevant information from the provided batch block + dictionary, including paths, execution options, and defaults. It retrieves + the Flux executable path and allocation details, and populates a dictionary + with the parsed values. + + Args: + batch: A dictionary representing the batch block from the YAML + configuration file. + + Returns: + Dict: A dictionary containing parsed information from the batch block, + including:\n + - `btype`: The type of batch job (default is 'local'). + - `nodes`: The number of nodes to use (default is None). + - `shell`: The shell to use (default is 'bash'). + - `bank`: The bank to charge for the job (default is an empty string). + - `queue`: The queue to submit the job to (default is an empty string). + - `walltime`: The maximum wall time for the job (default is an empty string). + - `launch pre`: Any commands to run before launching (default is an empty string). + - `launch args`: Arguments for the launch command (default is an empty string). + - `launch command`: Custom command to launch workers. This will override the + default launch command (default is an empty string). + - `flux path`: Optional path to flux bin. + - `flux exe`: The full path to the Flux executable. + - `flux exec`: Optional flux exec command to launch workers on all nodes if + `flux_exec_workers` is True (default is None). + - `flux alloc`: The Flux allocation retrieved from the executable. + - `flux opts`: Optional flux start options (default is an empty string). + - `flux exec workers`: Optional flux argument to launch workers + on all nodes (default is True). """ flux_path: str = get_yaml_var(batch, "flux_path", "") if "/" in flux_path: @@ -204,9 +295,20 @@ def parse_batch_block(batch: Dict) -> Dict: def get_flux_launch(parsed_batch: Dict) -> str: """ - Build the flux launch command based on the batch section of the yaml. - :param `parsed_batch`: A dict of batch configurations - :returns: The flux launch command + Build the Flux launch command based on the batch section of the YAML configuration. + + This function constructs the command to launch a Flux job using the parameters + specified in the parsed batch configuration. It determines the appropriate + execution command for Flux workers and integrates it with the launch command + provided in the batch configuration. + + Args: + parsed_batch: A dictionary containing batch configuration parameters. + See [`parse_batch_block`][study.batch.parse_batch_block] for more information + on all the settings in this dictionary. + + Returns: + The constructed Flux launch command, ready to be executed. 
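+
+    Example:
+        A minimal sketch of how this might be called, assuming `spec` is a
+        [`MerlinSpec`][spec.specification.MerlinSpec] instance with a batch section:
+
+        ```python
+        parsed_batch = parse_batch_block(spec.batch)
+        launch_cmd = get_flux_launch(parsed_batch)
+        ```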
""" default_flux_exec = "flux exec" if parsed_batch["launch command"] else f"{parsed_batch['flux exe']} exec" flux_exec: str = "" @@ -225,20 +327,37 @@ def get_flux_launch(parsed_batch: Dict) -> str: def batch_worker_launch( - spec: Dict, + spec: MerlinSpec, com: str, - nodes: Optional[Union[str, int]] = None, - batch: Optional[Dict] = None, + nodes: Union[str, int] = None, + batch: Dict = None, ) -> str: """ - The configuration in the batch section of the merlin spec - is used to create the worker launch line, which may be - different from a simulation launch. - - : param spec : (Dict) workflow specification - : param com : (str): The command to launch with batch configuration - : param nodes : (Optional[Union[str, int]]): The number of nodes to use in the batch launch - : param batch : (Optional[Dict]): An optional batch override from the worker config + Create the worker launch command based on the batch configuration in the + workflow specification. + + This function constructs a command to launch a worker process using the + specified batch configuration. It handles different batch types and + integrates any necessary pre-launch commands, launch arguments, and + node specifications. + + Args: + spec (spec.specification.MerlinSpec): An instance of the + [`MerlinSpec`][spec.specification.MerlinSpec] class that contains the + configuration details, including the batch section. + com: The command to launch with the batch configuration. + nodes: The number of nodes to use in the batch launch. If not specified, + it will default to the value in the batch configuration. + batch: An optional batch override from the worker configuration. If not + provided, the function will attempt to retrieve the batch section from + the specification. + + Returns: + The constructed worker launch command, ready to be executed. + + Raises: + AttributeError: If the batch section is missing in the specification. + TypeError: If the `nodes` parameter is of an invalid type. """ if batch is None: try: @@ -289,17 +408,33 @@ def batch_worker_launch( def construct_scheduler_legend(parsed_batch: Dict, nodes: int) -> Dict: """ - Constructs a legend of relevant information needed for each scheduler. This includes: - - bank (str): The flag to add a bank to the launch command - - check cmd (list): The command to run to check if this is the main scheduler for the cluster - - expected check output (str): The expected output from running the check cmd - - launch (str): The initial launch command for the scheduler - - queue (str): The flag to add a queue to the launch command - - walltime (str): The flag to add a walltime to the launch command - - :param `parsed_batch`: A dict of batch configurations - :param `nodes`: An int representing the number of nodes to use in a launch command - :returns: A dict of scheduler related information + Constructs a legend of relevant information needed for each scheduler. + + This function generates a dictionary containing configuration details for various + job schedulers based on the provided batch configuration. The returned dictionary + includes flags for bank, queue, and walltime, as well as commands to check the + scheduler and the initial launch command. + + Args: + parsed_batch: A dictionary of batch configurations, which must include `bank`, + `queue`, `walltime`, and `flux alloc`. See + [`parse_batch_block`][study.batch.parse_batch_block] for more information on + all the settings in this dictionary. + nodes: The number of nodes to use in the launch command. 
+ + Returns: + A dictionary containing scheduler-related information, structured as follows:\n + - For each scheduler (e.g., 'flux', 'lsf', 'pbs', 'slurm'):\n + \t- `bank` (str): The flag to add a bank to the launch command. + - `check cmd` (List[str]): The command to run to check if this is the main + scheduler for the cluster. + - `expected check output` (bytes): The expected output from running + the check command. + - `launch` (str): The initial launch command for the scheduler. + - `queue` (str): The flag to add a queue to the launch command (if + applicable). + - `walltime` (str): The flag to add a walltime to the launch command + (if applicable). """ scheduler_legend = { "flux": { @@ -338,11 +473,26 @@ def construct_scheduler_legend(parsed_batch: Dict, nodes: int) -> Dict: def construct_worker_launch_command(parsed_batch: Dict, nodes: int) -> str: """ - If no 'worker_launch' is found in the batch yaml, this method constructs the needed launch command. - - :param `parsed_batch`: A dict of batch configurations - :param `nodes`:: The number of nodes to use in the batch launch - :returns: The launch command + Constructs the worker launch command based on the provided batch configuration. + + This function generates a launch command for a worker process when no + 'worker_launch' command is specified in the batch configuration. It + utilizes the scheduler legend to incorporate necessary flags such as + bank, queue, and walltime, depending on the workload manager. + + Args: + parsed_batch: A dictionary of batch configurations, which must include + `btype`, `bank`, `queue`, and `walltime`. See + [`parse_batch_block`][study.batch.parse_batch_block] for more information + on all the settings in this dictionary. + nodes: The number of nodes to use in the batch launch. + + Returns: + The constructed launch command for the worker process. + + Raises: + TypeError: If the PBS scheduler is enabled for a batch type other than 'flux'. + KeyError: If the workload manager is not found in the scheduler legend. """ # Initialize launch_command and get the scheduler_legend and workload_manager launch_command: str = "" diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 39176206..1c679ab8 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -38,7 +38,8 @@ import time from contextlib import suppress from datetime import datetime -from typing import Dict, List, Optional, Tuple +from types import SimpleNamespace +from typing import Dict, List, Optional, Set, Tuple from amqp.exceptions import ChannelError from celery import Celery @@ -46,7 +47,9 @@ from merlin.common.dumper import dump_handler from merlin.config import Config +from merlin.spec.specification import MerlinSpec from merlin.study.batch import batch_check_parallel, batch_worker_launch +from merlin.study.study import MerlinStudy from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -55,10 +58,20 @@ # TODO figure out a better way to handle the import of celery app and CONFIG -def run_celery(study, run_mode=None): +def run_celery(study: MerlinStudy, run_mode: str = None): """ - Run the given MerlinStudy object. If the run mode is set to "local" - configure Celery to run locally (without workers). + Run the given [`MerlinStudy`][study.study.MerlinStudy] object with optional + Celery configuration. + + This function executes the provided [`MerlinStudy`][study.study.MerlinStudy] + object. 
If the `run_mode` is set to "local", it configures Celery to run in + local mode (without utilizing workers). Otherwise, it connects to the Celery + server to queue tasks. + + Args: + study (study.study.MerlinStudy): The study object to be executed. + run_mode: The mode in which to run the study. If set to "local", + Celery runs locally. """ # Only import celery stuff if we want celery in charge # Pylint complains about circular import between merlin.common.tasks -> merlin.router -> merlin.study.celeryadapter @@ -81,14 +94,25 @@ def run_celery(study, run_mode=None): def get_running_queues(celery_app_name: str, test_mode: bool = False) -> List[str]: """ - Check for running celery workers by looking at the currently running processes. - If there are running celery workers, we'll pull the queues from the -Q tag in the - process command. The list returned here will contain only unique celery queue names. - This must be run on the allocation where the workers are running. + Check for running Celery workers and retrieve their associated queues. - :param `celery_app_name`: The name of the celery app (typically merlin here unless testing) - :param `test_mode`: If True, run this function in test mode - :returns: A unique list of celery queues with workers attached to them + This function inspects currently running processes to identify active + Celery workers. It extracts queue names from the `-Q` tag in the + command line of the worker processes. The returned list contains + only unique Celery queue names. This function must be executed + on the allocation where the workers are running. + + Note: + Unlike [`get_active_celery_queues`][study.celeryadapter.get_active_celery_queues], + this function does _not_ go through the application's server. + + Args: + celery_app_name: The name of the Celery app (typically "merlin" + unless in test mode). + test_mode: If True, the function runs in test mode. + + Returns: + A unique list of Celery queue names with workers attached to them. """ running_queues = [] @@ -111,26 +135,36 @@ def get_running_queues(celery_app_name: str, test_mode: bool = False) -> List[st return running_queues -def get_active_celery_queues(app): - """Get all active queues and workers for a celery application. - - Unlike get_running_queues, this goes through the application's server. - Also returns a dictionary with entries for each worker attached to - the given queues. - - :param `celery.Celery` app: the celery application - - :return: queues dictionary with connected workers, all workers - :rtype: (dict of lists of strings, list of strings) - - :example: - - >>> from merlin.celery import app - >>> queues, workers = get_active_celery_queues(app) - >>> queue_names = [*queues] - >>> workers_on_q0 = queues[queue_names[0]] - >>> workers_not_on_q0 = [worker for worker in workers - if worker not in workers_on_q0] +def get_active_celery_queues(app: Celery) -> Tuple[Dict[str, List[str]], List[str]]: + """ + Retrieve all active queues and their associated workers for a Celery application. + + This function queries the application's server to obtain a comprehensive + view of active queues and the workers connected to them. It returns a + dictionary where each key is a queue name and the value is a list of + workers attached to that queue. Additionally, it provides a list of all + active workers in the application. + + Note: + Unlike [`get_running_queues`][study.celeryadapter.get_running_queues], + this function goes through the application's server. 
+ + Args: + app: The Celery application instance. + + Returns: + A tuple containing:\n + - A dictionary mapping queue names to lists of workers connected to them. + - A list of all active workers in the application. + + Example: + ```python + from merlin.celery import app + queues, workers = get_active_celery_queues(app) + queue_names = list(queues) + workers_on_q0 = queues[queue_names[0]] + workers_not_on_q0 = [worker for worker in workers if worker not in workers_on_q0] + ``` """ i = app.control.inspect() active_workers = i.active_queues() @@ -146,14 +180,22 @@ def get_active_celery_queues(app): return queues, [*active_workers] -def get_active_workers(app): +def get_active_workers(app: Celery) -> Dict[str, List[str]]: """ - This is the inverse of get_active_celery_queues() defined above. This function - builds a dict where the keys are worker names and the values are lists - of queues attached to the worker. + Retrieve a mapping of active workers to their associated queues for a Celery application. + + This function serves as the inverse of + [`get_active_celery_queues()`][study.celeryadapter.get_active_celery_queues]. It constructs + a dictionary where each key is a worker's name and the corresponding value is a + list of queues that the worker is connected to. This allows for easy identification + of which queues are being handled by each worker. + + Args: + app: The Celery application instance. - :param `app`: The celery application - :returns: A dict mapping active workers to queues + Returns: + A dictionary mapping active worker names to lists of queue names they are + attached to. If no active workers are found, an empty dictionary is returned. """ # Get the information we need from celery i = app.control.inspect() @@ -173,14 +215,19 @@ def get_active_workers(app): return worker_queue_map -def celerize_queues(queues: List[str], config: Optional[Dict] = None): +def celerize_queues(queues: List[str], config: SimpleNamespace = None): """ - Celery requires a queue tag to be prepended to their - queues so this function will 'celerize' every queue in - a list you provide it by prepending the queue tag. + Prepend a queue tag to each queue in the provided list to conform to Celery's + queue naming requirements. - :param `queues`: A list of queues that need the queue tag prepended. - :param `config`: A dict of configuration settings + This function modifies the input list of queues by adding a specified queue tag + from the configuration. If no configuration is provided, it defaults to using + the global configuration settings. + + Args: + queues: A list of queue names that need the queue tag prepended. + config: A SimpleNamespace of configuration settings. If not provided, the + function will use the default configuration. """ if config is None: from merlin.config.configfile import CONFIG as config # pylint: disable=C0415 @@ -189,14 +236,18 @@ def celerize_queues(queues: List[str], config: Optional[Dict] = None): queues[i] = f"{config.celery.queue_tag}{queue}" -def _build_output_table(worker_list, output_table): +def _build_output_table(worker_list: List[str], output_table: List[Tuple[str, str]]): """ - Helper function for query-status that will build a table - that we'll use as output. + Construct an output table for displaying the status of workers and their associated queues. + + This helper function populates the provided output table with entries for each worker + in the given worker list. It retrieves the mapping of active workers to their queues + and formats the data accordingly. 
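+
+    Example:
+        A minimal sketch; the worker and queue names here are hypothetical:
+
+        ```python
+        output_table = []
+        _build_output_table(["celery@worker1"], output_table)
+        # output_table might now be [("celery@worker1", "queue_a, queue_b")]
+        ```
+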
- :param `worker_list`: A list of workers to add to the table - :param `output_table`: A list of tuples where each entry is - of the form (worker name, associated queues) + Args: + worker_list: A list of worker names to be included in the output table. + output_table: A list of tuples where each entry will be of the form + (worker name, associated queues). """ from merlin.celery import app # pylint: disable=C0415 @@ -211,15 +262,22 @@ def _build_output_table(worker_list, output_table): output_table.append((worker, ", ".join(worker_queue_map[worker]))) -def query_celery_workers(spec_worker_names, queues, workers_regex): +def query_celery_workers(spec_worker_names: List[str], queues: List[str], workers_regex: List[str]): """ - Look for existing celery workers. Filter by spec, queues, or - worker names if provided by user. At the end, print a table - of workers and their associated queues. - - :param `spec_worker_names`: The worker names defined in a spec file - :param `queues`: A list of queues to filter by - :param `workers_regex`: A list of regexs to filter by + Query and filter existing Celery workers based on specified criteria, + and print a table of the workers along with their associated queues. + + This function retrieves the list of active Celery workers and filters them + according to the provided specifications, including worker names from a + spec file, specific queues, and regular expressions for worker names. + It then constructs and displays a table of the matching workers and their + associated queues. + + Args: + spec_worker_names: A list of worker names defined in a spec file + to filter the workers. + queues: A list of queues to filter the workers by. + workers_regex: A list of regular expressions to filter the worker names. """ from merlin.celery import app # pylint: disable=C0415 @@ -280,11 +338,21 @@ def query_celery_workers(spec_worker_names, queues, workers_regex): def build_csv_queue_info(query_return: List[Tuple[str, int, int]], date: str) -> Dict[str, List]: """ - Build the lists of column labels and queue info to write to the csv file. + Construct a dictionary containing queue information and column labels + for writing to a CSV file. + + This function processes the output from the [`query_queues`][router.query_queues] + function and organizes the data into a format suitable for CSV export. It includes + a timestamp to indicate when the status was recorded. + + Args: + query_return: The output from the [`query_queues`][router.query_queues] function, + containing queue names and their associated statistics. + date: A timestamp indicating when the queue status was recorded. - :param query_return: The output of `query_queues` - :param date: A timestamp for us to mark when this status occurred - :returns: A dict of queue information to dump to a csv file + Returns: + A dictionary where keys are column labels and values are lists containing the + corresponding queue information, formatted for CSV output. """ # Build the list of labels if necessary csv_to_dump = {"time": [date]} @@ -297,11 +365,22 @@ def build_csv_queue_info(query_return: List[Tuple[str, int, int]], date: str) -> def build_json_queue_info(query_return: List[Tuple[str, int, int]], date: str) -> Dict: """ - Build the dict of queue info to dump to the json file. 
- - :param query_return: The output of `query_queues` - :param date: A timestamp for us to mark when this status occurred - :returns: A dictionary that's ready to dump to a json outfile + Construct a dictionary containing queue information for JSON export. + + This function processes the output from the [`query_queues`][router.query_queues] + function and organizes the data into a structured format suitable for JSON + serialization. It includes a timestamp to indicate when the queue status was + recorded. + + Args: + query_return: The output from the [`query_queues`][router.query_queues] + function, containing queue names and their associated statistics. + date: A timestamp indicating when the queue status was recorded. + + Returns: + A dictionary structured for JSON output, where the keys are timestamps + and the values are dictionaries containing queue names and their + corresponding statistics (tasks and consumers). """ # Get the datetime so we can track different entries and initalize a new json entry json_to_dump = {date: {}} @@ -315,11 +394,17 @@ def build_json_queue_info(query_return: List[Tuple[str, int, int]], date: str) - def dump_celery_queue_info(query_return: List[Tuple[str, int, int]], dump_file: str): """ - Format the information we're going to dump in a way that the Dumper class can - understand and add a timestamp to the info. + Format and dump Celery queue information to a specified file. + + This function processes the output from the `query_queues` function, formats + the data according to the file type (CSV or JSON), and adds a timestamp + to the information before writing it to the specified file. - :param query_return: The output of `query_queues` - :param dump_file: The filepath of the file we'll dump queue info to + Args: + query_return: The output from the [`query_queues`][router.query_queues] + function, containing queue names and their associated statistics. + dump_file: The filepath of the file where the queue information + will be written. The file extension determines the format (CSV or JSON). """ # Get a timestamp for this dump date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -336,16 +421,25 @@ def dump_celery_queue_info(query_return: List[Tuple[str, int, int]], dump_file: dump_handler(dump_file, dump_info) -def _get_specific_queues(queues: set, specific_queues: List[str], spec: "MerlinSpec", verbose=True) -> set: # noqa: F821 +def _get_specific_queues(queues: Set[str], specific_queues: List[str], spec: MerlinSpec, verbose=True) -> Set[str]: """ - Search for specific queues that the user asked for. The queues that cannot be found will not - be returned. The queues that can be found will be added to a set and returned. - - :param queues: Either an empty set or a set of queues from `spec` - :param specific_queues: The list of queues that we're going to search for - :param spec: A `MerlinSpec` object or None - :param verbose: If True, display log messages. Otherwise, don't. - :returns: A set of the specific queues that were found to exist. + Retrieve a set of specific queues requested by the user, filtering out those that do not exist. + + This function checks a provided list of specific queues against a set of existing queues + (from a [`MerlinSpec`][spec.specification.MerlinSpec] object) and returns a set of queues + that are found. If a queue is not found in the existing set, it will be excluded from the + results. The function also logs messages based on the verbosity setting. 
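+
+    Example:
+        A hypothetical call, assuming `spec` defines a queue named "queue_a":
+
+        ```python
+        found = _get_specific_queues(set(), ["queue_a", "not_a_queue"], spec)
+        # only the queue(s) that actually exist are returned
+        ```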
+ + Args: + queues: A set of existing queues, which may be empty or populated from the `spec` + object. + specific_queues: A list of specific queue names to search for. + spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec] + object that may provide context for the search. Can be None. + verbose: If True, log messages will be displayed. + + Returns: + A set containing the specific queues that were found in the existing queues. """ if verbose: LOG.info(f"Filtering queues to query by these specific queues: {specific_queues}") @@ -376,21 +470,30 @@ def _get_specific_queues(queues: set, specific_queues: List[str], spec: "MerlinS def build_set_of_queues( - spec: "MerlinSpec", # noqa: F821 + spec: MerlinSpec, steps: List[str], specific_queues: List[str], - verbose: Optional[bool] = True, - app: Optional["Celery"] = None, # noqa: F821 -) -> set: + verbose: bool = True, + app: Celery = None, +) -> Set[str]: """ - Build a set of queues to query based on the parameters given here. - - :param spec: A `MerlinSpec` object or None - :param steps: Spaced-separated list of stepnames to query. Default is all - :param specific_queues: A list of queue names to query or None - :param verbose: A bool to determine whether to output log statements or not - :param app: A celery app object, if left out we'll just import it - :returns: A set of queues to investigate + Construct a set of queues to query based on the provided parameters. + + This function builds a set of queues by querying a [`MerlinSpec`][spec.specification.MerlinSpec] + object for queues associated with specified steps and/or filtering for specific queue names. + If no spec or specific queues are provided, it defaults to querying active queues from the Celery + application. + + Args: + spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec] + object that defines the context for the query. Can be None. + steps: A list of step names to query. If empty, all steps are considered. + specific_queues: A list of specific queue names to filter. Can be None. + verbose: If True, log statements will be output. Defaults to True. + app: A Celery application instance. If None, it will be imported. + + Returns: + A set of queue names to investigate based on the provided parameters. """ if app is None: from merlin.celery import app # pylint: disable=C0415 @@ -426,13 +529,30 @@ def build_set_of_queues( def query_celery_queues(queues: List[str], app: Celery = None, config: Config = None) -> Dict[str, Dict[str, int]]: """ - Build a dict of information about the number of jobs and consumers attached - to specific queues that we want information on. - - :param queues: A list of the queues we want to know about - :param app: The celery application (this will be none unless testing) - :param config: The configuration object that has the broker name (this will be none unless testing) - :returns: A dict of info on the number of jobs and consumers for each queue in `queues` + Retrieve information about the number of jobs and consumers for specified Celery queues. + + This function constructs a dictionary containing details about the number of jobs + and consumers associated with each queue provided in the input list. It connects + to the Celery application to gather this information, handling both Redis and + RabbitMQ brokers. + + Notes: + - If the specified queue does not exist or has no jobs, it will be handled gracefully. 
+ - For Redis brokers, the function counts consumers by inspecting active queues + since Redis does not track consumers like RabbitMQ does. + + Args: + queues: A list of queue names for which to gather information. + app: The Celery application instance. Defaults to None, which triggers an import + for testing purposes. + config (config.Config): A configuration object containing broker details. + Defaults to None, which also triggers an import for testing. + + Returns: + A dictionary where each key is a queue name and the value is another dictionary + containing:\n + - "jobs": The number of jobs in the queue. + - "consumers": The number of consumers attached to the queue. """ if app is None: from merlin.celery import app # pylint: disable=C0415 @@ -474,12 +594,17 @@ def query_celery_queues(queues: List[str], app: Celery = None, config: Config = return queue_info -def get_workers_from_app(): - """Get all workers connected to a celery application. +def get_workers_from_app() -> List[str]: + """ + Retrieve a list of all workers connected to the Celery application. + + This function uses the Celery control interface to inspect the current state + of the application and returns a list of workers that are currently connected. + If no workers are found, an empty list is returned. - :param `celery.Celery` app: the celery application - :return: A list of all connected workers - :rtype: list + Returns: + A list of worker names that are currently connected to the Celery application. + If no workers are connected, an empty list is returned. """ from merlin.celery import app # pylint: disable=C0415 @@ -492,11 +617,19 @@ def get_workers_from_app(): def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> bool: """ - Query celery to see if any workers are still processing tasks. + Check if any Celery workers are currently processing tasks from specified queues. + + This function queries the Celery application to determine if there are any active + tasks being processed by workers for the given list of queues. It returns a boolean + indicating whether any tasks are currently active. - :param queues_in_spec: A list of queues to check if tasks are still active in - :param app: The celery app that we're querying - :returns: True if workers are still processing tasks, False otherwise + Args: + queues_in_spec: A list of queue names to check for active tasks. + app: The Celery application instance used for querying. + + Returns: + True if any workers are processing tasks in the specified queues; False + otherwise. """ # Query celery for active tasks active_tasks = app.control.inspect().active() @@ -511,15 +644,23 @@ def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> b return False -def _get_workers_to_start(spec, steps): +def _get_workers_to_start(spec: MerlinSpec, steps: List[str]) -> Set[str]: """ - Helper function to return a set of workers to start based on - the steps provided by the user. + Determine the set of workers to start based on the specified steps. + + This helper function retrieves a mapping of steps to their corresponding workers + from a [`MerlinSpec`][spec.specification.MerlinSpec] object and returns a unique + set of workers that should be started for the provided list of steps. If a step + is not found in the mapping, a warning is logged. 
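+
+    Example:
+        A hypothetical call, assuming the spec maps both steps to "simworkers":
+
+        ```python
+        workers = _get_workers_to_start(spec, ["run", "data"])
+        # workers would be {"simworkers"}
+        ```
+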
- :param `spec`: A MerlinSpec object - :param `steps`: A list of steps to start workers for + Args: + spec (spec.specification.MerlinSpec): An instance of the + [`MerlinSpec`][spec.specification.MerlinSpec] class that contains the + mapping of steps to workers. + steps: A list of steps for which workers need to be started. - :returns: A set of workers to start + Returns: + A set of unique workers to be started based on the specified steps. """ workers_to_start = [] step_worker_map = spec.get_step_worker_map() @@ -535,14 +676,25 @@ def _get_workers_to_start(spec, steps): return workers_to_start -def _create_kwargs(spec): +def _create_kwargs(spec: MerlinSpec) -> Tuple[Dict[str, str], Dict]: """ - Helper function to handle creating the kwargs dict that - we'll pass to subprocess.Popen when we launch the worker. - - :param `spec`: A MerlinSpec object - :returns: A tuple where the first entry is the kwargs and - the second entry is variables defined in the spec + Construct the keyword arguments for launching a worker process. + + This helper function creates a dictionary of keyword arguments that will be + passed to `subprocess.Popen` when launching a worker. It retrieves the + environment variables defined in a [`MerlinSpec`][spec.specification.MerlinSpec] + object and updates the shell environment accordingly. + + Args: + spec (spec.specification.MerlinSpec): An instance of the MerlinSpec class + that contains environment specifications. + + Returns: + A tuple containing: + - A dictionary of keyword arguments for `subprocess.Popen`, including + the updated environment. + - A dictionary of variables defined in the spec, or None if no variables + were defined. """ # Get the environment from the spec and the shell spec_env = spec.environment @@ -563,15 +715,24 @@ def _create_kwargs(spec): return kwargs, yaml_vars -def _get_steps_to_start(wsteps, steps, steps_provided): +def _get_steps_to_start(wsteps: List[str], steps: List[str], steps_provided: bool) -> List[str]: """ - Determine which steps to start workers for. - - :param `wsteps`: A list of steps associated with a worker - :param `steps`: A list of steps to start provided by the user - :param `steps`: A bool representing whether the user gave specific - steps to start or not - :returns: A list of steps to start workers for + Identify the steps for which workers should be started. + + This function determines which steps to initiate based on the steps + associated with a worker and the user-provided steps. If specific steps + are provided by the user, only those steps that match the worker's steps + will be included. If no specific steps are provided, all worker-associated + steps will be returned. + + Args: + wsteps: A list of steps that are associated with a worker. + steps: A list of steps specified by the user to start workers for. + steps_provided: A boolean indicating whether the user provided + specific steps to start. + + Returns: + A list of steps for which workers should be started. 
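+
+    Example:
+        A sketch of the filtering rules above (step names are hypothetical):
+
+        ```python
+        _get_steps_to_start(["run", "data"], ["run"], True)   # -> ["run"]
+        _get_steps_to_start(["run", "data"], ["all"], False)  # -> ["run", "data"]
+        ```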
""" steps_to_start = [] if steps_provided: @@ -584,31 +745,52 @@ def _get_steps_to_start(wsteps, steps, steps_provided): return steps_to_start -def start_celery_workers(spec, steps, celery_args, disable_logs, just_return_command): # pylint: disable=R0914,R0915 - """Start the celery workers on the allocation - - :param MerlinSpec spec: A MerlinSpec object representing our study - :param list steps: A list of steps to start workers for - :param str celery_args: A string of arguments to provide to the celery workers - :param bool disable_logs: A boolean flag to turn off the celery logs for the workers - :param bool just_return_command: When True, workers aren't started and just the launch command(s) - are returned - :side effect: Starts subprocesses for each worker we launch - :returns: A string of all the worker launch commands - ... - - example config: - - merlin: - resources: - task_server: celery - overlap: False - workers: - simworkers: - args: -O fair --prefetch-multiplier 1 -E -l info --concurrency 4 - steps: [run, data] - nodes: 1 - machine: [hostA, hostB] +def start_celery_workers( + spec: MerlinSpec, + steps: List[str], + celery_args: str, + disable_logs: bool, + just_return_command: bool +) -> str: # pylint: disable=R0914,R0915 + """ + Start Celery workers based on the provided specifications and steps. + + This function initializes and starts Celery workers for the specified steps + in the given [`MerlinSpec`][spec.specification.MerlinSpec]. It constructs + the necessary command-line arguments and handles the launching of subprocesses + for each worker. If the `just_return_command` flag is set to `True`, it will + return the command(s) to start the workers without actually launching them. + + Args: + spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec] + object representing the study configuration. + steps: A list of steps for which to start workers. + celery_args: A string of additional arguments to pass to the Celery workers. + disable_logs: A flag to disable logging for the Celery workers. + just_return_command: If `True`, returns the launch command(s) without starting the workers. + + Returns: + A string containing all the worker launch commands. + + Side Effects: + - Starts subprocesses for each worker that is launched, so long as `just_return_command` + is not True. + + Example: + Below is an example configuration for Merlin workers: + + ```yaml + merlin: + resources: + task_server: celery + overlap: False + workers: + simworkers: + args: -O fair --prefetch-multiplier 1 -E -l info --concurrency 4 + steps: [run, data] + nodes: 1 + machine: [hostA, hostB] + ``` """ if not just_return_command: LOG.info("Starting workers") @@ -701,10 +883,25 @@ def start_celery_workers(spec, steps, celery_args, disable_logs, just_return_com return str(worker_list) -def examine_and_log_machines(worker_val, yenv) -> bool: +def examine_and_log_machines(worker_val: Dict, yenv: Dict[str, str]) -> bool: """ - Examines whether a worker should be skipped in a step of start_celery_workers(), logs errors in output path for a celery - worker. + Determine if a worker should be skipped based on machine availability and log any errors. + + This function checks the specified machines for a worker and determines + whether the worker can be started. If the machines are not available, + it logs an error message regarding the output path for the Celery worker. + If the environment variables (`yenv`) are not provided or do not specify + an output path, a warning is logged. 
+ + Args: + worker_val: A dictionary containing worker configuration, including + the list of machines associated with the worker. + yenv: A dictionary of environment variables that may include the + output path for logging. + + Returns: + Returns `True` if the worker should be skipped (i.e., machines are + unavailable), otherwise returns `False`. """ worker_machines = get_yaml_var(worker_val, "machines", None) if worker_machines: @@ -725,8 +922,33 @@ def examine_and_log_machines(worker_val, yenv) -> bool: return False -def verify_args(spec, worker_args, worker_name, overlap, disable_logs=False): - """Examines the args passed to a worker for completeness.""" +def verify_args( + spec: MerlinSpec, + worker_args: str, + worker_name: str, + overlap: bool, + disable_logs: bool = False +) -> str: + """ + Validate and enhance the arguments passed to a Celery worker for completeness. + + This function checks the provided worker arguments to ensure that they include + recommended settings for running parallel tasks. It adds default values for + concurrency, prefetch multiplier, and logging level if they are not specified. + Additionally, it generates a unique worker name based on the current time if + the `-n` argument is not provided. + + Args: + spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec] + object containing the study configuration. + worker_args: A string of arguments passed to the worker that may need validation. + worker_name: The name of the worker, used for generating a unique worker identifier. + overlap: A flag indicating whether multiple workers can overlap in their queue processing. + disable_logs: A flag to disable logging configuration for the worker. + + Returns: + The validated and potentially modified worker arguments string. + """ parallel = batch_check_parallel(spec) if parallel: if "--concurrency" not in worker_args: @@ -750,15 +972,29 @@ def verify_args(spec, worker_args, worker_name, overlap, disable_logs=False): return worker_args -def launch_celery_worker(worker_cmd, worker_list, kwargs): +def launch_celery_worker(worker_cmd: str, worker_list: List[str], kwargs: Dict): """ - Using the worker launch command provided, launch a celery worker. - :param str worker_cmd: The celery command to launch a worker - :param list worker_list: A list of worker launch commands - :param dict kwargs: A dictionary containing additional keyword args to provide - to subprocess.Popen - - :side effect: Launches a celery worker via a subprocess + Launch a Celery worker using the specified command and parameters. + + This function executes the provided Celery command to start a worker as a + subprocess. It appends the command to the given list of worker commands + for tracking purposes. If the worker fails to start, an error is logged. + + Args: + worker_cmd: The command string used to launch the Celery worker. + worker_list: A list that will be updated to include the launched + worker command for tracking active workers. + kwargs: A dictionary of additional keyword arguments to pass to + `subprocess.Popen`, allowing for customization of the subprocess + behavior. + + Raises: + Exception: If the worker fails to start, an error is logged, and the + exception is re-raised. + + Side Effects: + - Launches a Celery worker process in the background. + - Modifies the `worker_list` by appending the launched worker command. 
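+
+    Example:
+        A minimal sketch; the command and kwargs here are hypothetical:
+
+        ```python
+        worker_list = []
+        launch_celery_worker("celery -A merlin worker -Q queue_a", worker_list, {"shell": True})
+        ```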
""" try: _ = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 @@ -768,12 +1004,25 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): raise -def get_celery_cmd(queue_names, worker_args="", just_return_command=False): +def get_celery_cmd(queue_names: str, worker_args: str = "", just_return_command: bool = False) -> str: """ - Get the appropriate command to launch celery workers for the specified MerlinStudy. - queue_names The name(s) of the queue(s) to associate a worker with - worker_args Optional celery arguments for the workers - just_return_command Don't execute, just return the command + Construct the command to launch Celery workers for the specified queues. + + This function generates a command string that can be used to start Celery + workers associated with the provided queue names. It allows for optional + worker arguments to be included and can return the command without executing it. + + Args: + queue_names: A comma-separated string of the queue name(s) to which the worker + will be associated. + worker_args: Additional command-line arguments for the Celery worker. + just_return_command: If True, the function will return the constructed command + without executing it. + + Returns: + The constructed command string for launching the Celery worker. If + `just_return_command` is True, returns the command; otherwise, returns an + empty string. """ worker_command = " ".join(["celery -A merlin worker", worker_args, "-Q", queue_names]) if just_return_command: @@ -783,12 +1032,24 @@ def get_celery_cmd(queue_names, worker_args="", just_return_command=False): return "" -def purge_celery_tasks(queues, force): +def purge_celery_tasks(queues: str, force: bool) -> int: """ - Purge celery tasks for the specified spec file. - - queues Which queues to purge - force Purge without asking for confirmation + Purge Celery tasks from the specified queues. + + This function constructs and executes a command to purge tasks from the + specified Celery queues. If the `force` parameter is set to True, the + purge operation will be executed without prompting for confirmation. + + Args: + queues: A comma-separated string of the queue name(s) from which + tasks should be purged. + force: If True, the purge operation will be executed without asking + for user confirmation. + + Returns: + The return code from the subprocess execution. A return code of + 0 indicates success, while any non-zero value indicates an error + occurred during the purge operation. """ # This version will purge all queues. # from merlin.celery import app @@ -801,24 +1062,35 @@ def purge_celery_tasks(queues, force): return subprocess.run(purge_command, shell=True).returncode -def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): # pylint: disable=R0912 - """Send a stop command to celery workers. - - Default behavior is to stop all connected workers. - As options can downselect to only workers on certain queues and/or that - match a regular expression. - - :param list queues: The queues to send stop signals to. If None: stop all - :param list spec_worker_names: Worker names read from a spec to stop, in addition to worker_regex matches. - :param str worker_regex: The regex string to match worker names. 
If None: - :return: Return code from stop command - - :example: - - >>> stop_celery_workers(queues=['hello'], worker_regex='celery@*my_machine*') - - >>> stop_celery_workers() - +def stop_celery_workers( + queues: List[str] = None, + spec_worker_names: List[str] = None, + worker_regex: List[str] = None +): # pylint: disable=R0912 + """ + Send a stop command to Celery workers. + + This function sends a shutdown command to Celery workers associated with + specified queues. By default, it stops all connected workers, but it can + be configured to target specific workers based on queue names or regular + expression patterns. + + Args: + queues: A list of queue names to which the stop command will be sent. + If None, all connected workers across all queues will be stopped. + spec_worker_names: A list of specific worker names to stop, in addition + to those matching the `worker_regex`. + worker_regex: A regular expression string used to match worker names. + If None, no regex filtering will be applied. + + Side Effects: + - Broadcasts a shutdown signal to Celery workers + + Example: + ```python + stop_celery_workers(queues=['hello'], worker_regex='celery@*my_machine*') + stop_celery_workers() + ``` """ from merlin.celery import app # pylint: disable=C0415 @@ -870,13 +1142,24 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): LOG.warning("No workers found to stop") -def create_celery_config(config_dir, data_file_name, data_file_path): +def create_celery_config(config_dir: str, data_file_name: str, data_file_path: str): """ - Command to setup default celery merlin config. - - :param `config_dir`: The directory to create the config file. - :param `data_file_name`: The name of the config file. - :param `data_file_path`: The full data file path. + Set up the default Celery configuration for Merlin. + + This function creates a configuration file for Celery in the specified + directory. If the configuration file already exists, it logs an + informational message and does not overwrite the existing file. If the + file does not exist, it reads from a specified data file and writes + its contents to the new configuration file. + + Args: + config_dir: The directory where the configuration file will be created. + data_file_name: The name of the configuration file to be created. + data_file_path: The full path to the data file from which the + configuration content will be read. + + Side Effects: + - Creates a configuration file if one does not already exist """ # This will need to come from the server interface MERLIN_CONFIG = os.path.join(config_dir, data_file_name) # pylint: disable=C0103 diff --git a/merlin/study/dag.py b/merlin/study/dag.py index 06f75bea..fc99a740 100644 --- a/merlin/study/dag.py +++ b/merlin/study/dag.py @@ -29,57 +29,91 @@ ############################################################################### """ -Holds DAG class. TODO make this an interface, separate from Maestro. +Holds the Merlin DAG class. """ from collections import OrderedDict +from typing import Dict, List from merlin.study.step import Step +# TODO make this an interface, separate from Maestro. class DAG: """ This class provides methods on a task graph that Merlin needs for staging - tasks in celery. It is initialized from am maestro ExecutionGraph, and the + tasks in Celery. It is initialized from a Maestro `ExecutionGraph`, and the major entry point is the group_tasks method, which provides groups of independent chains of tasks. 
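+
+    Example:
+        A hypothetical construction from an unpacked Maestro `ExecutionGraph`:
+
+        ```python
+        dag = DAG(adj_table, values, ["X0"], "my_study", parameter_info)
+        # "_source" is assumed here to be the root node of the Maestro graph
+        groups_of_chains = dag.group_tasks("_source")
+        ```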
+ + Attributes: + backwards_adjacency: A dictionary mapping each task to its parent tasks for reverse + traversal. + column_labels: A list of column labels provided in the spec file. + maestro_adjacency_table: An ordered dict showing adjacency of nodes. Comes from + a maestrowf `ExecutionGraph`. + maestro_values: An ordered dict of the values at each node. Comes from a maestrowf + `ExecutionGraph`. + parameter_info: A dict containing information about parameters in the study. + study_name: The name of the study. + + Methods: + calc_backwards_adjacency: Initializes the backwards adjacency table. + calc_depth: Calculate the depth of the given node and its children. + children: Return the children of the task. + compatible_merlin_expansion: Check if two tasks are compatible for Merlin expansion. + find_chain: Find the chain containing the task. + find_independent_chains: Finds independent chains and adjusts with the groups of chains + to maximize parallelism. + group_by_depth: Group DAG tasks by depth. + group_tasks: Group independent tasks in a directed acyclic graph (DAG). + num_children: Find the number of children for the given task in the DAG. + num_parents: Find the number of parents for the given task in the DAG. + parents: Return the parents of the task. + step: Return a [`Step`][study.step.Step] object for the given task name. """ def __init__( - self, maestro_adjacency_table, maestro_values, column_labels, study_name, parameter_info + self, + maestro_adjacency_table: OrderedDict, + maestro_values: OrderedDict, + column_labels: List[str], + study_name: str, + parameter_info: Dict, ): # pylint: disable=R0913 - """ - :param `maestro_adjacency_table`: An ordered dict showing adjacency of nodes. Comes from a maestrowf ExecutionGraph. - :param `maestro_values`: An ordered dict of the values at each node. Comes from a maestrowf ExecutionGraph. - :param `column_labels`: A list of column labels provided in the spec file. - :param `study_name`: The name of the study - :param `parameter_info`: A dict containing information about parameters in the study - """ # We used to store the entire maestro ExecutionGraph here but now it's # unpacked so we're only storing the 2 attributes from it that we use: # the adjacency table and the values. This had to happen to get pickle # to work for Celery. - self.maestro_adjacency_table = maestro_adjacency_table - self.maestro_values = maestro_values - self.column_labels = column_labels - self.study_name = study_name - self.parameter_info = parameter_info - self.backwards_adjacency = {} + self.maestro_adjacency_table: OrderedDict = maestro_adjacency_table + self.maestro_values: OrderedDict = maestro_values + self.column_labels: List[str] = column_labels + self.study_name: str = study_name + self.parameter_info: Dict = parameter_info + self.backwards_adjacency: Dict = {} self.calc_backwards_adjacency() - def step(self, task_name): - """Return a Step object for the given task name + def step(self, task_name: str) -> Step: + """ + Return a Step object for the given task name. + + Args: + task_name: The task name. - :param `task_name`: The task name. - :return: A Merlin Step object. + Returns: + A Merlin [`Step`][study.step.Step] object representing the + task's configuration and parameters. """ return Step(self.maestro_values[task_name], self.study_name, self.parameter_info) - def calc_depth(self, node, depths, current_depth=0): - """Calculate the depth of the given node and its children. 
+ def calc_depth(self, node: str, depths: Dict, current_depth: int = 0): + """ + Calculate the depth of the given node and its children. This recursive + method will update `depths` in place. - :param `node`: The node (str) to start at. - :param `depths`: the dictionary of depths to update. - :param `current_depth`: the current depth in the graph traversal. + Args: + node: The node to start at. + depths: The dictionary of depths to update. + current_depth: The current depth in the graph traversal. """ if node not in depths: depths[node] = current_depth @@ -90,22 +124,30 @@ def calc_depth(self, node, depths, current_depth=0): self.calc_depth(child, depths, current_depth=depths[node] + 1) @staticmethod - def group_by_depth(depths): - """Group DAG tasks by depth. + def group_by_depth(depths: Dict) -> List[List[List]]: + """ + Group DAG tasks by depth. + + This method only groups by depth, and has one task in every chain. + [`find_independent_chains`][study.dag.DAG.find_independent_chains] is used + to figure out how to coalesce chains across depths. - :param `depths`: the dictionary of depths to group by + Args: + depths (dict): The dictionary of depths to group by. - :return: a list of lists of lists ordered by depth + Returns: + A list of lists of lists ordered by depth. - ([[["tasks"],["with"],["Depth 0"]],[["tasks"],["with"],["Depth 1"]]]) + Example: + This method will return a list that could look something like this: - The outer index of this list is the depth, the middle index is which - chain of tasks in that depth, and the inner index is the task id in - that chain. + ```python + [[["tasks"], ["with"], ["Depth 0"]], [["tasks"], ["with"], ["Depth 1"]]] + ``` - This method only groups by depth, and has one task in every chain. - find_independent_chains is used to figure out how to coalesce chains - across depths. + Here, the outer index of this list is the depth, the middle index is + which chain of tasks in that depth, and the inner index is the task + id in that chain. """ groups = {} for node in depths: @@ -123,44 +165,66 @@ def group_by_depth(depths): return list_of_groups_of_chains - def children(self, task_name): - """Return the children of the task. - :param `task_name`: The name of the task to get the children of. + def children(self, task_name: str) -> List: + """ + Return the children of the task. + + Args: + task_name: The name of the task to get the children of. - :return: list of children of this task. + Returns: + List of children of this task. """ return self.maestro_adjacency_table[task_name] - def num_children(self, task_name): - """Find the number of children for the given task in the dag. - :param `task_name`: The name of the task to count the children of. + def num_children(self, task_name: str) -> int: + """ + Find the number of children for the given task in the DAG. + + Args: + task_name: The name of the task to count the children of. - :return : number of children this task has + Returns: + Number of children this task has. """ return len(self.children(task_name)) - def parents(self, task_name): - """Return the parents of the task. - :param `task_name` : The name of the task to get the parents of. + def parents(self, task_name: str) -> List: + """ + Return the parents of the task. + + Args: + task_name: The name of the task to get the parents of. - :return : list of parents of this task""" + Returns: + List of parents of this task. 
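+
+        Example:
+            A small sketch with hypothetical task names:
+
+            ```python
+            dag.parents("step2_GREET.hello")  # e.g. ["step1_GREET.hello"]
+            ```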
+ """ return self.backwards_adjacency[task_name] - def num_parents(self, task_name): - """find the number of parents for the given task in the dag - :param `task_name` : The name of the task to count the parents of + def num_parents(self, task_name: str) -> int: + """ + Find the number of parents for the given task in the DAG. + + Args: + task_name: The name of the task to count the parents of. - :return : number of parents this task has""" + Returns: + Number of parents this task has. + """ return len(self.parents(task_name)) @staticmethod - def find_chain(task_name, list_of_groups_of_chains): - """find the chain containing the task - :param `task_name` : The task to search for. - :param `list_of_groups_of_chains` : list of groups of chains to search - for the task + def find_chain(task_name: str, list_of_groups_of_chains: List[List[List]]) -> List: + """ + Find the chain containing the task. + + Args: + task_name: The task to search for. + list_of_groups_of_chains: List of groups of chains to search for the task. - :return : the list representing the chain containing task_name""" + Returns: + The list representing the chain containing task_name, or None if not found. + """ for group in list_of_groups_of_chains: for chain in group: if task_name in chain: @@ -168,7 +232,42 @@ def find_chain(task_name, list_of_groups_of_chains): return None def calc_backwards_adjacency(self): - """initializes our backwards adjacency table""" + """ + Initializes the backwards adjacency table. + + This method constructs a mapping of each task to its parent tasks in the directed + acyclic graph (DAG). The backwards adjacency table allows for reverse traversal + of the graph, enabling the identification of dependencies for each task. + + The method iterates through each parent task in the `maestro_adjacency_table` + and updates the `backwards_adjacency` dictionary. For each task that is a child + of a parent, it adds the parent to the list of that task's parents in the + `backwards_adjacency` table. + + This is essential for operations that require knowledge of a task's dependencies, + such as determining the order of execution or identifying independent tasks. + + Example: + If the `maestro_adjacency_table` is structured as follows: + + ```python + { + 'A': ['B', 'C'], + 'B': ['D'], + 'C': ['D'] + } + ``` + + After calling this method, the `backwards_adjacency` will be: + + ```python + { + 'B': ['A'], + 'C': ['A'], + 'D': ['B', 'C'] + } + ``` + """ for parent in self.maestro_adjacency_table: for task_name in self.maestro_adjacency_table[parent]: if task_name in self.backwards_adjacency: @@ -176,34 +275,52 @@ def calc_backwards_adjacency(self): else: self.backwards_adjacency[task_name] = [parent] - def compatible_merlin_expansion(self, task1, task2): + def compatible_merlin_expansion(self, task1: str, task2: str) -> bool: """ - TODO + Check if two tasks are compatible for Merlin expansion. + + This method compares the expansion needs of two tasks to determine + if they can be expanded together. + + Args: + task1: The first task. + task2: The second task. + + Returns: + True if compatible, False otherwise. 
""" step1 = self.step(task1) step2 = self.step(task2) return step1.check_if_expansion_needed(self.column_labels) == step2.check_if_expansion_needed(self.column_labels) - def find_independent_chains(self, list_of_groups_of_chains): + def find_independent_chains(self, list_of_groups_of_chains: List[List[List]]) -> List[List[List]]: """ - Finds independent chains and adjusts with the groups of chains to - maximalize parallelism + Finds independent chains and adjusts with the groups of chains to maximize parallelism. - :param list_of_groups_of_chains: List of list of lists, as returned by - self.group_by_depth + This method looks for opportunities to move tasks in deeper groups of chains + into chains in shallower groups, thus increasing available parallelism in execution. - e.g., + Args: + list_of_groups_of_chains: List of list of lists, as returned by + [`group_by_depth`][study.dag.DAG.group_by_depth]. - ([[["task1"],["with"],["Depth 0"]],[["task2"],["has"],["Depth 1"]]]) + Returns: + list: Adjusted list of groups of chains to maximize parallelism. - :return : This takes the groups of chains and looks for opportunities - to move tasks in deeper groups of chains into chains in shallower - groups, thus increasing available parallelism in the execution. + Example: + Given input chains, the method may return a modified structure that allows + for more tasks to be executed in parallel. For example, we might start with + this: - Depending on the precise parental relationships between the tasks - in the graph the output may be something like: + ```python + [[["task1"], ["with"], ["Depth 0"]], [["task2"], ["has"], ["Depth 1"]]] + ``` - ([[["task1", "has"],["with","task2"],["Depth 0"]],["Depth 1"]]]) + and finish with this: + + ```python + [[["task1", "has"], ["with", "task2"], ["Depth 0"]], ["Depth 1"]]] + ``` """ for group in list_of_groups_of_chains: for chain in group: @@ -220,14 +337,18 @@ def find_independent_chains(self, list_of_groups_of_chains): return new_list_2 - def group_tasks(self, source_node): - """Group independent tasks in a directed acyclic graph (DAG). + def group_tasks(self, source_node: str) -> List[List[List]]: + """ + Group independent tasks in a directed acyclic graph (DAG). + + Starts from a source node and works down, grouping tasks by depth, + then identifies independent parallel chains in those groups. - Starts from a source node and works down, grouping tasks by - depth, then identify independent parallel chains in those groups. + Args: + source_node: The source node from which to start grouping tasks. - :param dag : The DAG - :param source_node: The source node. + Returns: + A list of independent chains of tasks. """ depths = {} self.calc_depth(source_node, depths) diff --git a/merlin/study/status.py b/merlin/study/status.py index b785ee14..6ffd334f 100644 --- a/merlin/study/status.py +++ b/merlin/study/status.py @@ -27,7 +27,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
############################################################################### -"""This module handles all the functionality of getting the statuses of studies""" +"""This module handles all the functionality of getting the statuses of studies.""" import json import logging import os @@ -37,7 +37,7 @@ from datetime import datetime from glob import glob from traceback import print_exception -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Set, Tuple, Union import numpy as np from filelock import FileLock, Timeout @@ -47,6 +47,7 @@ from merlin.common.dumper import dump_handler from merlin.display import ANSI_COLORS, display_status_summary, display_status_task_by_task from merlin.spec.expansion import get_spec_with_expansion +from merlin.spec.specification import MerlinSpec from merlin.study.status_constants import ( ALL_VALID_FILTERS, CELERY_KEYS, @@ -72,15 +73,47 @@ class Status: """ - This class handles everything to do with status besides displaying it. - Display functionality is handled in display.py. + Handles the management and retrieval of status information for studies. + + This class is responsible for loading specifications, tracking the status of steps, + calculating runtime statistics, and formatting status information for output in + various formats (JSON, CSV). It interacts with the file system to read status files + and provides methods to display and dump status information. + + Attributes: + args: Command-line arguments provided by the user. + full_step_name_map: A mapping of overall step names to full step names. + num_requested_statuses: Counts the number of task statuses in the `requested_statuses` + dictionary. + requested_statuses: A dictionary storing the statuses that the user wants to view. + run_time_info: A dictionary storing runtime statistics for each step. + spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec] + object loaded from the workspace or spec file. + step_tracker: A dictionary tracking started and unstarted steps. + tasks_per_step: A mapping of tasks per step for accurate totals. + workspace: The path to the workspace containing study data. + + Methods: + display: Displays a high-level summary of the status. + dump: Dumps the status information to a specified file. + format_csv_dump: Prepares the dictionary of statuses for CSV output. + format_json_dump: Prepares the dictionary of statuses for JSON output. + format_status_for_csv: Reformats statuses into a dictionary suitable for CSV output. + get_runtime_avg_std_dev: Calculates and stores the average and standard deviation of + runtimes for a step. + get_step_statuses: Reads and returns the statuses for a given step. + get_steps_to_display: Generates a list of steps to display the status for. + load_requested_statuses: Populates the `requested_statuses` dictionary with statuses + from the study. 
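+
+    Example:
+        A minimal sketch of driving this class (assuming `args` came from Merlin's
+        status argument parser; the workspace path is hypothetical):
+
+        ```python
+        status_obj = Status(args, spec_display=False, file_or_ws="studies/hello_20231101-174102")
+        status_obj.display()
+        ```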
""" def __init__(self, args: Namespace, spec_display: bool, file_or_ws: str): # Save the args to this class instance and check if the steps filter was given - self.args = args + self.args: Namespace = args # Load in the workspace path and spec object + self.workspace: str + self.spec: MerlinSpec if spec_display: self.workspace, self.spec = self._load_from_spec(file_or_ws) else: @@ -91,26 +124,31 @@ def __init__(self, args: Namespace, spec_display: bool, file_or_ws: str): self._verify_filter_args() # Create a step tracker that will tell us which steps have started/not started - self.step_tracker = self.get_steps_to_display() + self.step_tracker: Dict[str, List[str]] = self.get_steps_to_display() # Create a tasks per step mapping in order to give accurate totals for each step - self.tasks_per_step = self.spec.get_tasks_per_step() + self.tasks_per_step: Dict[str, int] = self.spec.get_tasks_per_step() # This attribute will store a map between the overall step name and the full step names # that are created with parameters (e.g. step name is hello and uses a "GREET: hello" parameter # so the real step name is hello_GREET.hello) - self.full_step_name_map = {} + self.full_step_name_map: Dict[str, Set[str]] = {} # Variable to store run time information for each step - self.run_time_info = {} + self.run_time_info: Dict[str, Dict] = {} # Variable to store the statuses that the user wants - self.requested_statuses = {} + self.requested_statuses: Dict = {} self.load_requested_statuses() def _print_requested_statuses(self): """ - Helper method to print out the requested statuses dict. + Print the requested statuses stored in the `requested_statuses` dictionary. + + This helper method iterates through the `requested_statuses` attribute, which contains + information about the statuses of various steps. It prints the step names along with + their corresponding status information. Non-workspace keys are printed directly, while + workspace-related keys are further detailed by their status keys and values. """ print("self.requested_statuses:") for step_name, overall_step_info in self.requested_statuses.items(): @@ -125,16 +163,38 @@ def _print_requested_statuses(self): def _verify_filter_args(self): """ - This is an abstract method since we'll need to verify filter args for DetailedStatus - but not for Status. + Verify the filter arguments for the status retrieval. + + This is an abstract method intended to be implemented in subclasses, such as + [`DetailedStatus`][study.status.DetailedStatus]. The method will ensure that + the filter arguments provided for retrieving statuses are valid and meet the + necessary criteria. The implementation details will depend on the specific + requirements of the subclass. """ def _get_latest_study(self, studies: List[str]) -> str: """ - Given a list of studies, get the latest one. + Retrieve the latest study from a list of studies. + + This method examines a list of study identifiers and determines which one is the latest + based on the timestamp embedded in the study names. It assumes that the newest study is + represented by the last entry in the list but verifies this assumption by comparing the + timestamps of all studies. + + The method extracts the timestamp from the last 15 characters of each study identifier, + converts it to a datetime object, and compares it to find the most recent study. + + Args: + studies: A list of study identifiers to evaluate. + + Returns: + The identifier of the latest study. 
- :param `studies`: A list of studies to sort through - :returns: The latest study in the list provided + Example: + ```python + >>> self._get_latest_study(["study_20231101-174102", "study_20231101-182044", "study_20231101-163327"]) + 'study_20231101-182044' + ``` """ # We can assume the newest study is the last one to be added to the list of potential studies newest_study = studies[-1] @@ -155,12 +215,22 @@ def _obtain_study(self, study_output_dir: str, num_studies: int, potential_studi """ Grab the study that the user wants to view the status of based on a list of potential studies provided. - :param `study_output_dir`: A string representing the output path of a study; equivalent to $(OUTPUT_PATH) - :param `num_studies`: The number of potential studies we found - :param `potential_studies`: The list of potential studies we found; - Each entry is of the form (index, potential_study_name) - :returns: A directory path to the study that the user wants - to view the status of ("study_output_dir/selected_potential_study") + This method checks the number of potential studies found and either selects the latest study + automatically or prompts the user to choose from the available options. It constructs the + directory path to the selected study. + + Args: + study_output_dir: A string representing the output path of a study; equivalent to $(OUTPUT_PATH). + num_studies: The number of potential studies found. + potential_studies: A list of potential studies found, where each entry is of the form (index, + potential_study_name). + + Returns: + A directory path to the study that the user wants to view the status of, formatted as + "study_output_dir/selected_potential_study". + + Raises: + ValueError: If no potential studies are found or if the user input is invalid. """ study_to_check = f"{study_output_dir}/" if num_studies == 0: @@ -197,14 +267,25 @@ def _obtain_study(self, study_output_dir: str, num_studies: int, potential_studi return study_to_check - def _load_from_spec(self, filepath: str) -> Tuple[str, "MerlinSpec"]: # noqa: F821 pylint: disable=R0914 + def _load_from_spec(self, filepath: str) -> Tuple[str, MerlinSpec]: # pylint: disable=R0914 """ - Get the desired workspace from the user and load up it's yaml spec - for further processing. + Get the desired workspace from the user and load its YAML spec for further processing. + + This method verifies the output path based on user input or the spec file and builds a list + of potential study output directories. It then calls another method to obtain the study to + check the status for and loads the corresponding spec. + + Args: + filepath: The filepath to a spec provided by the user. + + Returns: + A tuple containing the workspace of the study to check the status for and a + [`MerlinSpec`][spec.specification.MerlinSpec] object loaded from the workspace's + merlin_info subdirectory. - :param `filepath`: The filepath to a spec given by the user - :returns: The workspace of the study we'll check the status for and a MerlinSpec - object loaded in from the workspace's merlin_info subdirectory. + Raises: + ValueError: If the specified output directory does not contain a merlin_info subdirectory, + or if multiple or no expanded spec options are found in the directory. 
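+
+        Example:
+            A sketch with a hypothetical spec filepath:
+
+            ```python
+            workspace, spec = self._load_from_spec("hello.yaml")
+            ```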
""" # If the user provided a new output path to look in, use that if self.args.output_path is not None: @@ -263,11 +344,17 @@ def _load_from_spec(self, filepath: str) -> Tuple[str, "MerlinSpec"]: # noqa: F return study_to_check, actual_spec - def _load_from_workspace(self) -> "MerlinSpec": # noqa: F821 + def _load_from_workspace(self) -> MerlinSpec: """ - Create a MerlinSpec object based on the spec file in the workspace. + Create a [`MerlinSpec`][spec.specification.MerlinSpec] object based on the expanded spec file + in the workspace. - :returns: A MerlinSpec object loaded from the workspace provided by the user + Returns: + spec.specification.MerlinSpec: A [`MerlinSpec`][spec.specification.MerlinSpec] object loaded + from the workspace provided by the user. + + Raises: + ValueError: If multiple or no expanded spec options are found in the workspace's merlin_info directory. """ # Grab the spec file from the directory provided expanded_spec_options = glob(f"{self.workspace}/merlin_info/*.expanded.yaml") @@ -285,11 +372,19 @@ def _load_from_workspace(self) -> "MerlinSpec": # noqa: F821 def _create_step_tracker(self, steps_to_check: List[str]) -> Dict[str, List[str]]: """ - Creates a dictionary of started and unstarted steps that we - will display the status for. + Creates a dictionary of started and unstarted steps to display their status. + + This method checks the workspace for steps that have been started and compares them + against a provided list of steps to determine which steps are started and which are + unstarted. It returns a dictionary categorizing the steps accordingly. + + Args: + steps_to_check: A list of step names to check the status of. - :param `steps_to_check`: A list of steps to view the status of - :returns: A dictionary mapping of started and unstarted steps. Values are lists of step names. + Returns: + A dictionary with two keys:\n + - "started_steps": A list of steps that have been started. + - "unstarted_steps": A list of steps that have not been started. """ step_tracker = {"started_steps": [], "unstarted_steps": []} started_steps = next(os.walk(self.workspace))[1] @@ -310,10 +405,22 @@ def _create_step_tracker(self, steps_to_check: List[str]) -> Dict[str, List[str] def get_steps_to_display(self) -> Dict[str, List[str]]: """ - Generates a list of steps to display the status for based on information - provided to the merlin status command by the user. - - :returns: A dictionary of started and unstarted steps for us to display the status of + Generates a dictionary of steps to display their status based on user input + provided to the merlin status command. + + This method retrieves the names of existing steps from the study specification + and creates a step tracker to categorize them into started and unstarted steps. + + Returns: + A dictionary with two keys:\n + - "started_steps": A list of steps that have been started. + - "unstarted_steps": A list of steps that have not been started. + + Example: + ```python + >>> self.get_steps_to_display() + {"started_steps": ["step1"], "unstarted_steps": ["step2", "step3"]} + ``` """ existing_steps = self.spec.get_study_step_names() @@ -328,10 +435,13 @@ def get_steps_to_display(self) -> Dict[str, List[str]]: return step_tracker @property - def num_requested_statuses(self): + def num_requested_statuses(self) -> int: """ - Count the number of task statuses in a the requested_statuses dict. - We need to ignore non workspace keys when we count. 
+ Counts the number of task statuses in the requested_statuses dictionary, + excluding non-workspace keys. + + Returns: + The count of requested task statuses that are not non-workspace keys. """ num_statuses = 0 for overall_step_info in self.requested_statuses.values(): @@ -341,12 +451,20 @@ def num_requested_statuses(self): def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict[str, List[str]]: """ - Given a step workspace and the name of the step, read in all the statuses - for the step and return them in a dict. + Reads the statuses for a specified step from the given step workspace. + + This method traverses the specified step workspace directory to locate + `MERLIN_STATUS.json` files, reads their contents, and aggregates the statuses + into a dictionary. It also tracks the full names of the steps and counts + the number of statuses read. - :param step_workspace: The path to the step we're going to read statuses from - :param started_step_name: The name of the step that we're gathering statuses for - :returns: A dict of statuses for the given step + Args: + step_workspace: The path to the step directory from which to read statuses. + started_step_name: The name of the step for which statuses are being gathered. + + Returns: + A dictionary containing the statuses for the specified step, where each key is a full + step name and the value is a list of status information. """ step_statuses = {} num_statuses_read = 0 @@ -390,7 +508,13 @@ def get_step_statuses(self, step_workspace: str, started_step_name: str) -> Dict def load_requested_statuses(self): """ - Populate the requested_statuses dict with the statuses from the study. + Populates the `requested_statuses` dictionary with statuses from the study. + + This method iterates through the started steps in the step tracker, + retrieves their statuses using the + [`get_step_statuses`][study.status.Status.get_step_statuses] method, and merges + these statuses into the `requested_statuses` dictionary. It also calculates + the average and standard deviation of the run times for each step. """ LOG.info(f"Reading task statuses from {self.workspace}") @@ -406,14 +530,20 @@ def load_requested_statuses(self): # Count how many statuses in total that we just read in LOG.info(f"Read in {self.num_requested_statuses} statuses total.") - def get_runtime_avg_std_dev(self, step_statuses: Dict, step_name: str) -> Dict: + def get_runtime_avg_std_dev(self, step_statuses: Dict, step_name: str): """ - Calculate the mean and standard deviation for the runtime of each step. - Add this to the state information once calculated. - - :param `step_statuses`: A dict of step status information that we'll parse for run times - :param `step_name`: The name of the step - :returns: An updated dict of step status info with run time avg and std dev + Calculates the average and standard deviation of the runtime for a specified step. + + This method parses the provided step status information to extract runtime values, + computes the mean and standard deviation of these runtimes, and updates the state + information with the calculated values. The runtimes are expected to be in a specific + format (e.g., "1h30m15s") and are converted to seconds for the calculations. + + Args: + step_statuses: A dictionary containing step status information, where each + entry includes runtime data to be parsed. + step_name: The name of the step for which the average and standard deviation + of the runtime are being calculated. 
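+
+        Example:
+            A sketch with a hypothetical step name:
+
+            ```python
+            self.get_runtime_avg_std_dev(step_statuses, "run_step")
+            self.run_time_info["run_step"]["run_time_std_dev"]  # e.g. "±00h:01m:15s"
+            ```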
""" # Initialize a list to track all existing runtimes run_times_in_seconds = [] @@ -454,32 +584,53 @@ def get_runtime_avg_std_dev(self, step_statuses: Dict, step_name: str) -> Dict: self.run_time_info[step_name]["run_time_std_dev"] = f"±{pretty_format_hms(convert_timestring(run_time_std_dev))}" LOG.debug(f"Run time avg and std dev for step '{step_name}' calculated.") - def display(self, test_mode: Optional[bool] = False) -> Dict: + def display(self, test_mode: bool = False) -> Dict: """ - Displays the high level summary of the status. + Displays a high-level summary of the status. - :param `test_mode`: If true, run this in testing mode and don't print any output - :returns: A dict that will be empty if test_mode is False. Otherwise, the dict will - contain the status info that would be displayed. + This method provides an overview of the current status of the workflow. If + `test_mode` is enabled, it will not print any output but will return the + status information in a dictionary. + + Args: + test_mode: If true, run this in testing mode and don't print any output. + + Returns: + An empty dictionary if `test_mode` is False; otherwise, a dictionary containing + the status information that would be displayed. """ return display_status_summary(self, NON_WORKSPACE_KEYS, test_mode=test_mode) def format_json_dump(self, date: datetime) -> Dict: """ - Build the dict of statuses to dump to the json file. + Builds a dictionary of statuses to dump to a JSON file. + + This method prepares the status information for serialization by adding a timestamp + to the existing status data. - :param `date`: A timestamp for us to mark when this status occurred - :returns: A dictionary that's ready to dump to a json outfile + Args: + date: A timestamp marking when this status occurred. + + Returns: + A dictionary ready to be dumped to a JSON file, containing the timestamp + and the requested statuses. """ # Statuses are already in json format so we'll just add a timestamp for the dump here return {date: self.requested_statuses} def format_csv_dump(self, date: datetime) -> Dict: """ - Add the timestamp to the statuses to write. + Adds a timestamp to the statuses for CSV output. + + This method reformats the status information into a structure suitable for CSV + output, including a timestamp entry as the first column. + + Args: + date: A timestamp marking when this status occurred. - :param `date`: A timestamp for us to mark when this status occurred - :returns: A dict equivalent of formatted statuses with a timestamp entry at the start of the dict. + Returns: + A dictionary equivalent of formatted statuses with a timestamp entry + at the start of the dictionary. """ # Reformat the statuses to a new dict where the keys are the column labels and rows are the values LOG.debug("Formatting statuses for csv dump...") @@ -494,7 +645,11 @@ def format_csv_dump(self, date: datetime) -> Dict: def dump(self): """ - Dump the status information to a file. + Dumps the status information to a file. + + This method handles the creation of a timestamp and determines the appropriate + file format (CSV or JSON) for dumping the status information. It then calls + the appropriate formatting method and writes the data to the specified file. """ # Get a timestamp for this dump date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -512,10 +667,16 @@ def dump(self): def format_status_for_csv(self) -> Dict: """ - Reformat our statuses to csv format so they can use Maestro's status renderer layouts. 
+ Reformats statuses for CSV output to comply with + [Maestro's status renderer layouts](https://maestrowf.readthedocs.io/en/latest/Maestro/reference_guide/api_reference/index.html). + + This method transforms the status information into a dictionary format where each + key represents a column label and the corresponding values are the rows of information + to display for that column. - :returns: A formatted dictionary where each key is a column and the values are the rows - of information to display for that column. + Returns: + A formatted dictionary where each key is a column and the values are the + rows of information to display for that column. """ reformatted_statuses = { "step_name": [], @@ -591,7 +752,27 @@ def format_status_for_csv(self) -> Dict: class DetailedStatus(Status): """ This class handles obtaining and filtering requested statuses from the user. - This class shares similar methodology to the Status class it inherits from. + It inherits from the [`Status`][study.status.Status] class and provides + additional functionality for filtering and displaying task statuses based on + user-defined criteria. + + Attributes: + args (Namespace): A namespace containing user-defined arguments for filtering. + num_requested_statuses (int): The number of task statuses in the `requested_statuses` dictionary. + requested_statuses (Dict): A dictionary holding the statuses requested by the user. + spec (spec.specification.MerlinSpec): A [`MerlinSpec`][spec.specification.MerlinSpec] + object loaded from the workspace or spec file. + steps_filter_provided: Indicates if a specific steps filter was provided. + + Methods: + apply_filters: Applies user-defined filters to the requested statuses. + apply_max_tasks_limit: Limits the number of tasks displayed based on the user-defined maximum. + display: Displays a task-by-task view of the status based on user filters. + filter_via_prompts: Interacts with the user to manage task display filters. + get_steps_to_display: Generates a list of steps to display the status for. + get_user_filters: Prompts the user for filters to apply to the statuses. + get_user_max_tasks: Prompts the user for a maximum task limit to display. + load_requested_statuses: Populates the requested statuses dictionary based on user-defined filters. """ def __init__(self, args: Namespace, spec_display: bool, file_or_ws: str): @@ -603,23 +784,30 @@ def __init__(self, args: Namespace, spec_display: bool, file_or_ws: str): os.environ["MANPAGER"] = "less -r" # Check if the steps filter was given - self.steps_filter_provided = "all" not in args_copy.steps + self.steps_filter_provided: bool = "all" not in args_copy.steps def _verify_filters( self, filters_to_check: List[str], valid_options: Union[List, Tuple], suppress_warnings: bool, - warning_msg: Optional[str] = "", + warning_msg: str = "", ): """ - Check each filter in a list of filters provided by the user against a list of valid options. - If the filter is invalid, remove it from the list of filters. - - :param `filters_to_check`: A list of filters provided by the user - :param `valid_options`: A list of valid options for this particular filter - :param `suppress_warnings`: If True, don't log warnings. Otherwise, log them - :param `warning_msg`: An optional warning message to attach to output + Verify and validate a list of user-provided filters against a set of valid options. + + This method checks each filter in the `filters_to_check` list to determine if it is present + in the `valid_options`. 
If a filter is found to be invalid (i.e., not in `valid_options`),
+        it is removed from the `filters_to_check` list. Depending on the value of `suppress_warnings`,
+        a warning message may be logged for each invalid filter.
+
+        Args:
+            filters_to_check: A list of filters provided by the user that need to be validated.
+            valid_options: A list or tuple of valid options against which the filters will be checked.
+            suppress_warnings: A boolean flag indicating whether to suppress warning messages.
+                If True, no warnings will be logged for invalid filters.
+            warning_msg: An optional string that provides additional context for the warning message
+                logged when an invalid filter is detected. Default is an empty string.
        """
        for filter_arg in filters_to_check[:]:
            if filter_arg not in valid_options:
@@ -627,11 +815,17 @@ def _verify_filters(
                LOG.warning(f"The filter '{filter_arg}' is invalid. {warning_msg}")
                filters_to_check.remove(filter_arg)

-    def _verify_filter_args(self, suppress_warnings: Optional[bool] = False):
+    def _verify_filter_args(self, suppress_warnings: bool = False):
        """
-        Verify that our filters are all valid and able to be used.
+        Verify the validity of filter arguments used in the current context.

-        :param `suppress_warnings`: If True, don't log warnings. Otherwise, log them.
+        This method checks various filter arguments, including steps, max_tasks, task_status,
+        return_code, task_queues, and workers, to ensure they are valid and can be used.
+        Invalid filters are removed from their respective lists, and warnings may be logged
+        based on the `suppress_warnings` flag.
+
+        Args:
+            suppress_warnings: If True, suppress logging of warnings for invalid filters.
        """
        # Ensure the steps are valid
        if "all" not in self.args.steps:
@@ -717,8 +911,11 @@ def _verify_filter_args(self, suppress_warnings: Optional[bool] = False):

    def _process_task_queue(self):
        """
-        Modifies the list of steps to display status for based on
-        the list of task queues provided by the user.
+        Modify the list of steps to display status for based on the provided task queues.
+
+        This method processes the task queues specified by the user, removing any duplicates
+        and checking for their validity. It updates the list of steps to include those associated
+        with the valid task queues. If a provided task queue does not exist, a warning is logged.
        """
        from merlin.config.configfile import CONFIG  # pylint: disable=C0415
@@ -744,11 +941,16 @@ def _process_task_queue(self):

    def get_steps_to_display(self) -> Dict[str, List[str]]:
        """
-        Generates a list of steps to display the status for based on information
-        provided to the merlin detailed-status command by the user. This function
-        will handle the --steps and --task-queues filter options.
+        Generate a dictionary of steps to display the status for based on user-provided filters.
+
+        This method processes the `--steps` and `--task-queues` options from the `merlin
+        detailed-status` command. It determines which steps should be included in the status
+        display based on the existing steps in the study and the specified filters.

-        :returns: A dictionary of started and unstarted steps for us to display the status of
+        Returns:
+            A dictionary containing two lists:\n
+            - "started_steps": A list of steps that have been started.
+            - "unstarted_steps": A list of steps that have not yet been started.
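+
+        Example:
+            A sketch with hypothetical step names:
+
+            ```python
+            >>> self.get_steps_to_display()
+            {"started_steps": ["step1"], "unstarted_steps": ["step2", "step3"]}
+            ```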
""" existing_steps = self.spec.get_study_step_names() @@ -781,9 +983,16 @@ def get_steps_to_display(self) -> Dict[str, List[str]]: def _remove_steps_without_statuses(self): """ - After applying filters, there's a chance that certain steps will still exist - in self.requested_statuses but won't have any tasks to view the status of so - we'll remove those here. + Remove steps from the requested statuses that do not have any associated tasks. + + This method iterates through the `requested_statuses` dictionary and checks each step + for associated sub-steps. If a step does not have any valid sub-step workspaces (i.e., + it has no tasks to view the status of), it is removed from the `requested_statuses`. + + Note: + After applying filters, there's a chance that certain steps will still exist + in self.requested_statuses but won't have any tasks to view the status of. That's + why this method is necessary. """ result = deepcopy(self.requested_statuses) @@ -797,11 +1006,16 @@ def _remove_steps_without_statuses(self): def _search_for_filter(self, filter_to_apply: List[str], entry_to_search: Union[List[str], str]) -> bool: """ - Search an entry to see if our filter(s) apply to this entry. If they do, return True. Otherwise, False. + Search an entry to see if the specified filters apply to it. + + This method checks if any of the provided filters match the given entry or entries. - :param filter_to_apply: A list of filters to search for - :param entry_to_search: A list or string of entries to search for our filters in - :returns: True if a filter was found in the entry. False otherwise. + Args: + filter_to_apply: A list of filters to search for. + entry_to_search: A list or string of entries to search for the filters in. + + Returns: + True if a filter was found in the entry; False otherwise. """ if not isinstance(entry_to_search, list): entry_to_search = [entry_to_search] @@ -814,10 +1028,12 @@ def _search_for_filter(self, filter_to_apply: List[str], entry_to_search: Union[ def apply_filters(self): """ - Apply any filters given by the --workers, --return-code, and/or --task-status arguments. - This function will also apply the --max-tasks limit if it was set by a user. We apply this - limit here so it can be done in-place; if we called apply_max_tasks_limit instead, this - would become a two-pass algorithm and can be really slow with lots of statuses. + Apply filters based on the provided command-line arguments for workers, return code, + and task status, as well as enforce a maximum task limit if specified. + + This method processes the `requested_statuses` to filter out entries that do not match + the specified criteria. It ensures that the filtering is done in-place to optimize performance + and avoid a two-pass algorithm, which can be inefficient with a large number of statuses. """ if self.args.max_tasks is not None: # Make sure the max_tasks variable is set to a reasonable number and store that value @@ -897,8 +1113,13 @@ def apply_filters(self): def apply_max_tasks_limit(self): """ - Given a number representing the maximum amount of tasks to display, filter the dict of statuses - so that there are at most a max_tasks amount of tasks. + Filter the dictionary of statuses to ensure that the number of displayed tasks does not exceed + the specified maximum limit. + + This method checks the current value of `max_tasks` and adjusts it if it exceeds the number + of available statuses. It then iterates through the `requested_statuses`, removing excess + entries to comply with the `max_tasks` limit. 
The method also merges the allowed task statuses + into a new dictionary and updates the `requested_statuses` accordingly. """ # Make sure the max_tasks variable is set to a reasonable number and store that value if self.args.max_tasks > self.num_requested_statuses: @@ -959,15 +1180,24 @@ def load_requested_statuses(self): def get_user_filters(self) -> bool: """ - Get a filter on the statuses to display from the user. Possible options - for filtering: - - A str MAX_TASKS -> will ask the user for another input that's equivalent to the --max-tasks flag - - A list of statuses -> equivalent to the --task-status flag - - A list of return codes -> equivalent to the --return-code flag - - A list of workers -> equivalent to the --workers flag - - An exit keyword to leave the filter prompt without filtering - - :returns: True if we need to exit without filtering. False otherwise. + Prompt the user to specify filters for the statuses to display. The user can choose from + several filtering options, including setting a maximum number of tasks, filtering by status, + return code, or worker, or exiting the filter prompt without applying any filters. + + The method displays available filter options and their descriptions, then collects and + validates the user's input. If the user provides valid filters, they are stored in the + corresponding attributes. If the user opts to exit, the method returns True; otherwise, + it returns False. + + Possible filtering options include:\n + - A string "MAX_TASKS" to request a limit on the number of tasks. + - A list of statuses to filter by, corresponding to the `--task-status` flag. + - A list of return codes to filter by, corresponding to the `--return-code` flag. + - A list of workers to filter by, corresponding to the `--workers` flag. + - An exit keyword to leave the filter prompt without applying any filters. + + Returns: + True if the user chooses to exit without filtering; False otherwise. """ valid_workers = tuple(self.spec.get_worker_names()) @@ -1050,7 +1280,17 @@ def get_user_filters(self) -> bool: def get_user_max_tasks(self): """ - Get a limit for the amount of tasks to display from the user. + Prompt the user to specify a maximum limit for the number of tasks to display. + + The method repeatedly requests input from the user until a valid integer greater than 0 + is provided. Once a valid input is received, it sets the `max_tasks` attribute in the + `args` object to the specified limit. + + This method ensures that the user input is validated and handles any exceptions + related to invalid input types or values. + + Raises: + ValueError: If the input is not a valid integer greater than 0. """ invalid_input = True @@ -1069,8 +1309,16 @@ def get_user_max_tasks(self): def filter_via_prompts(self): """ - Interact with the user to manage how many/which tasks are displayed. This helps to - prevent us from overloading the terminal by displaying a bazillion tasks at once. + Interact with the user to determine how many and which tasks should be displayed, + preventing terminal overload by limiting the output to a manageable number of tasks. + + This method prompts the user for filtering options, including task statuses, return codes, + and worker specifications. It also handles the case where the user opts to exit without + applying any filters. If filters are provided, it applies them accordingly. 
+ + Warning: + The method includes specific handling for the "RESTART" and "RETRY" return codes, + which are currently not implemented, and issues warnings if these filters are selected. """ # Get the filters from the user exit_without_filtering = self.get_user_filters() @@ -1095,11 +1343,18 @@ def filter_via_prompts(self): elif self.args.max_tasks is not None: self.apply_max_task_limit() - def display(self, test_mode: Optional[bool] = False): + def display(self, test_mode: bool = False): """ - Displays a task-by-task view of the status based on user filter(s). + Displays a task-by-task view of the statuses based on the user-defined filters. + + This method checks for any requested statuses and, if found, invokes the + `display_status_task_by_task` function to present the tasks accordingly. + If no statuses are available to display, it logs a warning message. - :param `test_mode`: If true, run this in testing mode and don't print any output + Args: + test_mode: If set to True, the method runs in testing mode, suppressing + any output to the terminal. This is useful for unit testing or debugging + without cluttering the output. """ # Check that there's statuses found and display them if self.requested_statuses: @@ -1111,26 +1366,31 @@ def display(self, test_mode: Optional[bool] = False): # Pylint complains that args is unused but we can ignore that def status_conflict_handler(*args, **kwargs) -> Any: # pylint: disable=W0613 """ - The conflict handler function to apply to any status entries that have conflicting - values while merging two status files together. - - kwargs should include: - - dict_a_val: The conflicting value from the dictionary that we're merging into - - dict_b_val: The conflicting value from the dictionary that we're pulling from - - key: The key into each dictionary that has a conflict - - path: The path down the dictionary tree that `dict_deep_merge` is currently at - - When we're reading in status files, we're merging all of the statuses into one dictionary. - This function defines the merge rules in case there is a merge conflict. We ignore the list - and dictionary entries since `dict_deep_merge` from `utils.py` handles these scenarios already. - - There are currently 4 rules: - - string-concatenate: take the two conflicting values and concatenate them in a string - - use-dict_b-and-log-debug: use the value from dict_b and log a debug message - - use-longest-time: use the longest time between the two conflicting values - - use-max: use the larger integer between the two conflicting values - - :returns: The value to merge into dict_a at `key` + Handles conflicts that arise when merging two status files by applying specific merge rules + to conflicting values. + + This function is designed to be used during the merging process of status entries, where + conflicting values may exist. It defines how to resolve these conflicts based on predefined + rules, ensuring that the merged dictionary maintains integrity and clarity. + + The merge rules currently implemented are:\n + - **string-concatenate**: Concatenates the two conflicting string values. + - **use-dict_b-and-log-debug**: Uses the value from dict_b and logs a debug message indicating + the conflict. + - **use-longest-time**: Chooses the longest time value between the two conflicting entries, + converting them to a timedelta for comparison. + - **use-max**: Selects the maximum integer value from the two conflicting entries. 
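+
+    For instance (hypothetical values), assuming the "restarts" key follows the
+    use-max rule, a conflict would resolve like this:
+
+    ```python
+    status_conflict_handler(dict_a_val=1, dict_b_val=3, key="restarts", path=["step_name"])  # returns 3
+    ```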
+ + If a key does not have a defined merge rule, a warning is logged, and the function returns None. + + The function expects the following keyword arguments:\n + - `dict_a_val`: The conflicting value from the dictionary that we are merging into (dict_a). + - `dict_b_val`: The conflicting value from the dictionary that we are merging from (dict_b). + - `key`: The key in each dictionary that has a conflict. + - `path`: The current path in the dictionary tree during the merge process. + + Returns: + The resolved value to merge into dict_a at the specified key. """ # Grab the arguments passed into this function dict_a_val = kwargs.get("dict_a_val", None) @@ -1203,12 +1463,25 @@ def read_status( """ Locks the status file for reading and returns its contents. - :param status_filepath: The path to the status file that we'll read from. - :param lock_file: The path to the lock file that we'll use to create a FileLock. - :param display_fnf_message: If True, display the file not found warning. Otherwise don't. - :param raise_errors: A boolean indicating whether to ignore errors or raise them. - :param timeout: An integer representing how long to hold a lock for before timing out. - :returns: A dict of the contents in the status file + This function attempts to read the contents of a status file while ensuring that the file is + locked to prevent race conditions. It handles various exceptions that may occur during the + reading process, including file not found errors and JSON decoding errors. + + Args: + status_filepath: The path to the status file that will be read. + lock_file: The path to the lock file used to create a FileLock. + display_fnf_message: If True, displays a warning message if the file is not found. + raise_errors: If True, raises exceptions when errors occur. + timeout: The maximum time (in seconds) to hold the lock before timing out. + + Returns: + A dictionary containing the contents of the status file. + + Raises: + Timeout: If the lock acquisition times out. + FileNotFoundError: If the status file does not exist and `raise_errors` is True. + json.decoder.JSONDecodeError: If the status file is empty or contains invalid JSON and `raise_errors` is True. + Exception: Any other exceptions that occur during the reading process if `raise_errors` is True. """ statuses_read = {} @@ -1249,13 +1522,20 @@ def read_status( def write_status(status_to_write: Dict, status_filepath: str, lock_file: str, timeout: int = 10): """ - Locks the status file for writing. We're not catching any errors here since we likely want to - know if something went wrong in this process. + Locks the status file for writing and writes the provided status to the file. + + This function ensures that the status file is locked during the write operation to prevent + race conditions. It does not catch errors during the writing process, as it is important to + be aware of any issues that may arise. + + Args: + status_to_write: The status data to write to the status file. + status_filepath: The path to the status file where the status will be written. + lock_file: The path to the lock file used to create a FileLock for the write operation. + timeout: The maximum time (in seconds) to hold the lock before timing out. 
-    :param status_to_write: The status to write to the status file
-    :param status_filepath: The path to the status file that we'll write the status to
-    :param lock_file: The path to the lock file we'll use for this status write
-    :param timeout: A timeout value for the lock so it's always released eventually
+
+    Raises:
+        Exception: Any exception that occurs during the writing process is allowed
+            to propagate to the caller rather than being caught and suppressed here.
    """
    # Pylint complains that we're instantiating an abstract class but this is correct usage
    try: