diff --git a/docs/gen_ref_pages.py b/docs/gen_ref_pages.py index e46c0eb9..0b087b7d 100644 --- a/docs/gen_ref_pages.py +++ b/docs/gen_ref_pages.py @@ -6,10 +6,24 @@ nav = mkdocs_gen_files.Nav() -# print(sorted(Path("merlin").rglob("*.py"))) +IGNORE_PATTERNS = [ + Path("merlin/examples/workflows"), + Path("merlin/examples/dev_workflows"), + "*/ascii_art.py", +] + +def should_ignore(path): + """Check if the given path matches any ignore patterns.""" + for pattern in IGNORE_PATTERNS: + # if Path(pattern).is_relative_to(path): + if path.is_relative_to(Path(pattern)): + return True + if path.match(pattern): + return True + return False for path in sorted(Path("merlin").rglob("*.py")): - if "merlin/examples" in str(path): + if should_ignore(path): continue module_path = path.relative_to("merlin").with_suffix("") doc_path = path.relative_to("merlin").with_suffix(".md") diff --git a/merlin/celery.py b/merlin/celery.py index eb10f1a1..af7c3a22 100644 --- a/merlin/celery.py +++ b/merlin/celery.py @@ -33,7 +33,7 @@ import logging import os -from typing import Dict, Optional, Union +from typing import Any, Dict, Optional, Union import billiard import celery @@ -58,7 +58,8 @@ def patch_celery(): Celery has error callbacks but they do not work properly on chords that are nested within chains. - Credit to this function goes to: https://danidee10.github.io/2019/07/09/celery-chords.html + Credit to this function goes to + [the following post](https://danidee10.github.io/2019/07/09/celery-chords.html). """ def _unpack_chord_result( @@ -84,9 +85,33 @@ def _unpack_chord_result( # This function has to have specific args/return values for celery so ignore pylint -def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: disable=W0613,R1710 +def route_for_task( + name: str, + args: List[Any], + kwargs: Dict[Any, Any], + options: Dict[Any, Any], + task: celery.Task = None, + **kw: Dict[Any, Any], +) -> Dict[Any, Any]: # pylint: disable=W0613,R1710 """ - Custom task router for queues + Custom task router for Celery queues. + + This function routes tasks to specific queues based on the task name. + If the task name contains a colon, it splits the name to determine the queue. + + Args: + name: The name of the task being routed. + args: The positional arguments passed to the task. + kwargs: The keyword arguments passed to the task. + options: Additional options for the task. + task: The task instance (default is None). + **kw: Additional keyword arguments. + + Returns: + A dictionary specifying the queue to route the task to. + If the task name contains a colon, it returns a dictionary with + the key "queue" set to the queue name. Otherwise, it returns + an empty dictionary. """ if ":" in name: queue, _ = name.split(":") @@ -169,11 +194,12 @@ def route_for_task(name, args, kwargs, options, task=None, **kw): # pylint: dis # Pylint believes the args are unused, I believe they're used after decoration @worker_process_init.connect() -def setup(**kwargs): # pylint: disable=W0613 +def setup(**kwargs: Dict[Any, Any]): # pylint: disable=W0613 """ - Set affinity for the worker on startup (works on toss3 nodes) + Set affinity for the worker on startup (works on toss3 nodes). - :param `**kwargs`: keyword arguments + Args: + **kwargs: Keyword arguments. """ if "CELERY_AFFINITY" in os.environ and int(os.environ["CELERY_AFFINITY"]) > 1: # Number of cpus between workers. diff --git a/merlin/display.py b/merlin/display.py index a1af0ac2..6e87eb95 100644 --- a/merlin/display.py +++ b/merlin/display.py @@ -37,11 +37,13 @@ import shutil import time import traceback +from argparse import Namespace from datetime import datetime from multiprocessing import Pipe, Process -from typing import Dict +from multiprocessing.connection import Connection +from typing import Any, Dict, List, Union -from kombu import Connection +from kombu import Connection as KombuConnection from tabulate import tabulate from merlin.ascii_art import banner_small @@ -72,16 +74,36 @@ class ConnProcess(Process): """ - An extension of Multiprocessing's Process class in order - to overwrite the run and exception defintions. + An extension of the multiprocessing's Process class that allows for + custom handling of exceptions and inter-process communication. + + This class overrides the `run` method to capture exceptions that occur + during the execution of the process and sends them back to the parent + process via a pipe. It also provides a property to retrieve any + exceptions that were raised during execution. + + Attributes: + _pconn: The parent connection for inter-process communication. + _cconn: The child connection for inter-process communication. + exception: Stores the exception raised during the process run. """ def __init__(self, *args, **kwargs): Process.__init__(self, *args, **kwargs) + self._pconn: Connection + self._cconn: Connection self._pconn, self._cconn = Pipe() self._exception = None def run(self): + """ + Executes the process's main logic. + + This method overrides the default run method of the Process class. + It attempts to run the process and captures any exceptions that occur. + If an exception is raised, it sends the exception and its traceback + back to the parent process via the child connection. + """ try: Process.run(self) self._cconn.send(None) @@ -91,17 +113,35 @@ def run(self): # raise e # You can still rise this exception if you need to @property - def exception(self): - """Create custom exception""" + def exception(self) -> Union[Exception, None]: + """ + Retrieves the exception raised during the process execution. + + This property checks if there is an exception available from the + parent connection. If an exception was raised, it is received and + stored for later access. + + Returns: + The exception raised during the process run, or None if no exception occurred. + """ if self._pconn.poll(): self._exception = self._pconn.recv() return self._exception -def check_server_access(sconf): +def check_server_access(sconf: Dict[str, Any]): """ Check if there are any issues connecting to the servers. If there are, output the errors. + + This function iterates through a predefined list of servers and checks + their connectivity based on the provided server configuration. If any + connection issues are detected, the exceptions are collected and printed. + + Args: + sconf: A dictionary containing server configurations, where keys + represent server names and values contain connection details. + The function expects keys corresponding to the servers being checked. """ servers = ["broker server", "results server"] @@ -120,7 +160,24 @@ def check_server_access(sconf): print(f"{key}: {val}") -def _examine_connection(server, sconf, excpts): +def _examine_connection(server: str, sconf: Dict[str, Any], excpts: Dict[str, Exception]): + """ + Examine the connection to a specified server and handle any exceptions. + + This function attempts to establish a connection to the given server using + the configuration provided in `sconf`. It utilizes a separate process to + manage the connection attempt and checks for timeouts. If the connection + fails or times out, the error is recorded in the `excpts` dictionary. + + Args: + server: A string representing the name of the server to connect to. + This should correspond to a key in the `sconf` dictionary. + sconf: A dictionary containing server configurations, where keys + represent server names and values contain connection details. + excpts: A dictionary to store exceptions encountered during the + connection attempt, with server names as keys and exceptions + as values. + """ from merlin.config import broker, results_backend # pylint: disable=C0415 connect_timeout = 60 @@ -130,7 +187,7 @@ def _examine_connection(server, sconf, excpts): ssl_conf = broker.get_ssl_config() if "results" in server: ssl_conf = results_backend.get_ssl_config() - conn = Connection(sconf[server], ssl=ssl_conf) + conn = KombuConnection(sconf[server], ssl=ssl_conf) conn_check = ConnProcess(target=conn.connect) conn_check.start() counter = 0 @@ -153,7 +210,11 @@ def _examine_connection(server, sconf, excpts): def display_config_info(): """ - Prints useful configuration information to the console. + Prints useful configuration information for the Merlin application to the console. + + This function retrieves and displays the connection strings and SSL configurations + for the broker and results servers. It handles any exceptions that may occur during + the retrieval process, providing error messages for any issues encountered. """ from merlin.config import broker, results_backend # pylint: disable=C0415 from merlin.config.configfile import default_config_info # pylint: disable=C0415 @@ -191,12 +252,13 @@ def display_config_info(): check_server_access(sconf) -def display_multiple_configs(files, configs): +def display_multiple_configs(files: List[str], configs: List[Dict]): """ Logic for displaying multiple Merlin config files. - :param `files`: List of merlin config files - :param `configs`: List of merlin configurations + Args: + files: List of merlin config files + configs: List of merlin configurations """ print("=" * 50) print(" MERLIN CONFIG ") @@ -211,13 +273,19 @@ def display_multiple_configs(files, configs): # Might use args here in the future so we'll disable the pylint warning for now -def print_info(args): # pylint: disable=W0613 +def print_info(args: Namespace): # pylint: disable=W0613 """ Provide version and location information about python and packages to facilitate user troubleshooting. Also provides info about server connections and configurations. - :param `args`: parsed CLI arguments + Note: + The `args` parameter is currently unused but is included for + compatibility with the command-line interface (CLI) in case we decide to use + args here in the future. + + Args: + args: parsed CLI arguments (currently unused). """ print(banner_small) display_config_info() @@ -235,16 +303,29 @@ def print_info(args): # pylint: disable=W0613 def display_status_task_by_task(status_obj: "DetailedStatus", test_mode: bool = False): # noqa: F821 """ - Displays a low level overview of the status of a study. This is a task-by-task - status display where each task will show: - step name, worker name, task queue, cmd & restart parameters, - step workspace, step status, return code, elapsed time, run time, and num restarts. - If too many tasks are found and the pager is disabled, prompts will appear for the user to decide - what to do that way we don't overload the terminal (unless the no-prompts flag is provided). - - :param `status_obj`: A DetailedStatus object - :param `test_mode`: If true, run this in testing mode and don't print any output. This will also - decrease the limit on the number of tasks allowed before a prompt is displayed. + Displays a low-level overview of the status of a study in a task-by-task format. + + Each task will display the following details: + - Step name + - Worker name + - Task queue + - Command and restart parameters + - Step workspace + - Step status + - Return code + - Elapsed time + - Run time + - Number of restarts + + If the number of tasks exceeds a certain limit and the pager is disabled, the user + will be prompted to apply additional filters to avoid overwhelming the terminal output, + unless the prompts are disabled through the no-prompts flag. + + Args: + status_obj: An instance of DetailedStatus containing information + about the current state of tasks. + test_mode: If True, runs the function in testing mode, suppressing output + and reducing the task limit for prompts. Defaults to False. """ args = status_obj.args try: @@ -299,10 +380,19 @@ def display_status_task_by_task(status_obj: "DetailedStatus", test_mode: bool = def _display_summary(state_info: Dict[str, str], cb_help: bool): """ - Given a dict of state info for a step, print a summary of the task states. - - :param `state_info`: A dictionary of information related to task states for a step - :param `cb_help`: True if colorblind assistance (using symbols) is needed. False otherwise. + Prints a summary of task states based on the provided state information. + + This function takes a dictionary of state information for a step and + prints a formatted summary, including optional colorblind assistance using + symbols if specified. + + Args: + state_info: A dictionary containing information related + to task states for a step. Each entry should + correspond to a specific task state with its + associated properties (e.g., count, total, name). + cb_help: If True, provides colorblind assistance by using symbols + in the display. Defaults to False for standard output. """ # Build a summary list of task info print("\nSUMMARY:") @@ -338,18 +428,27 @@ def _display_summary(state_info: Dict[str, str], cb_help: bool): def display_status_summary( # pylint: disable=R0912 - status_obj: "Status", non_workspace_keys: set, test_mode=False # noqa: F821 + status_obj: "Status", non_workspace_keys: set, test_mode: bool = False # noqa: F821 ) -> Dict: """ - Displays a high level overview of the status of a study. This includes - progress bars for each step and a summary of the number of initialized, - running, finished, cancelled, dry ran, failed, and unknown tasks. - - :param `status_obj`: A Status object - :param `non_workspace_keys`: A set of keys in requested_statuses that are not workspace keys. - This will be set("parameters", "task_queue", "workers") - :param `test_mode`: If True, don't print anything and just return a dict of all the state info for each step - :returns: A dict that's empty usually. If ran in test_mode it will be a dict of state_info for every step. + Displays a high-level overview of the status of a study, including progress bars for each step + and a summary of the number of initialized, running, finished, cancelled, dry ran, failed, and + unknown tasks. + + The function prints a summary for each step and collects state information. In test mode, + it suppresses output and returns a dictionary of state information instead. + + Args: + status_obj: An instance of Status containing information about task states + and associated data for the study. + non_workspace_keys: A set of keys in requested_statuses that are not workspace keys. + Typically includes keys like "parameters", "task_queue", and "workers". + test_mode: If True, runs in test mode; suppresses printing and returns a dictionary + of state information for each step. Defaults to False. + + Returns: + An empty dictionary in regular mode. In test mode, returns a dictionary containing + the state information for each step. """ all_state_info = {} if not test_mode: @@ -429,32 +528,41 @@ def display_status_summary( # pylint: disable=R0912 # Credit to this stack overflow post: https://stackoverflow.com/a/34325723 def display_progress_bar( # pylint: disable=R0913,R0914 - current, - total, - state_info=None, - prefix="", - suffix="", - decimals=1, - length=80, - fill="█", - print_end="\n", - color=None, - cb_help=False, + current: int, + total: int, + state_info: Dict[str, Any] = None, + prefix: str = "", + suffix: str = "", + decimals: int = 1, + length: int = 80, + fill: str = "█", + print_end: str = "\n", + color: str = None, + cb_help: bool = False, ): """ - Prints a progress bar based on current and total. - - :param `current`: current number (Int) - :param `total`: total number (Int) - :param `state_info`: information about the state of tasks (Dict) (overrides color) - :param `prefix`: prefix string (Str) - :param `suffix`: suffix string (Str) - :param `decimals`: positive number of decimals in percent complete (Int) - :param `length`: character length of bar (Int) - :param `fill`: bar fill character (Str) - :param `print_end`: end character (e.g. "\r", "\r\n") (Str) - :param `color`: color of the progress bar (ANSI Str) (overridden by state_info) - :param `cb_help`: true if color blind help is needed; false otherwise (Bool) + Prints a customizable progress bar that visually represents the completion percentage + relative to a given total. + + The function can display additional state information for detailed tracking, including + support for color customization and adaptation for color-blind users. It updates the + display based on current progress and optionally accepts state information to adjust the + appearance of the progress bar. + + Args: + current: Current progress value. + total: Total value representing 100% completion. + state_info: Dictionary containing state information about tasks. + This can override color settings and modifies + how the progress bar is displayed. + prefix: Optional prefix string to display before the progress bar. + suffix: Optional suffix string to display after the progress bar. + decimals: Number of decimal places to display in the percentage (default is 1). + length: Character length of the progress bar (default is 80). + fill: Character used to fill the progress bar (default is "█"). + print_end: Character(s) to print at the end of the line (e.g., '\\r', '\\n'). + color: ANSI color string for the progress bar. Overrides state_info colors. + cb_help: If True, provides color-blind assistance by adapting the fill characters. """ # Set the color of the bar if color and color in ANSI_COLORS: diff --git a/merlin/log_formatter.py b/merlin/log_formatter.py index 6cd6a745..5914bcdf 100644 --- a/merlin/log_formatter.py +++ b/merlin/log_formatter.py @@ -43,12 +43,14 @@ } -def setup_logging(logger, log_level="INFO", colors=True): +def setup_logging(logger: logging.Logger, log_level: str = "INFO", colors: bool = True): """ Setup and configure Python logging. - :param `logger`: a logging.Logger object - :param `log_level`: logger level + Args: + logger: A logging.Logger object. + log_level: Logger level. + colors: If True use colored logs. """ formatter = logging.Formatter() handler = logging.StreamHandler(sys.stdout) diff --git a/merlin/main.py b/merlin/main.py index 4bb00598..3f755682 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -46,7 +46,7 @@ RawTextHelpFormatter, ) from contextlib import suppress -from typing import Dict, List, Optional, Union +from typing import Dict, List, Optional, Tuple, Union from tabulate import tabulate @@ -72,7 +72,13 @@ class HelpParser(ArgumentParser): """This class overrides the error message of the argument parser to print the help message when an error happens.""" - def error(self, message): + def error(self, message: str): + """ + Override the error message of the ArgumentParser class. + + Args: + message: The error message to log. + """ sys.stderr.write(f"error: {message}\n") self.print_help() sys.exit(2) @@ -82,13 +88,27 @@ def parse_override_vars( variables_list: Optional[List[str]], ) -> Optional[Dict[str, Union[str, int]]]: """ - Parse a list of variables from command line syntax - into a valid dictionary of variable keys and values. - - :param [List[str]] `variables_list`: an optional list of strings, e.g. ["KEY=val",...] - - :return: returns either None or a Dict keyed with strs, linked to strs and ints. - :rtype: Dict + Parse a list of command-line variables into a dictionary of key-value pairs. + + This function takes an optional list of strings following the syntax + "KEY=val" and converts them into a dictionary. It validates the format + of the variables and ensures that keys are valid according to specified rules. + + Args: + variables_list: An optional list of strings, where each string should be in the + format "KEY=val", e.g., ["KEY1=value1", "KEY2=42"]. + + Returns: + A dictionary where the keys are variable names (str) and the + values are either strings or integers. If `variables_list` is + None or empty, returns None. + + Raises: + ValueError: If the input format is incorrect, including:\n + - Missing '=' operator. + - Excess '=' operators in a variable assignment. + - Invalid variable names (must be alphanumeric and underscores). + - Attempting to override reserved variable names. """ if variables_list is None: return None @@ -121,11 +141,23 @@ def parse_override_vars( return result -def get_merlin_spec_with_override(args): +def get_merlin_spec_with_override(args: Namespace) -> Tuple[MerlinSpec, str]: """ - Shared command to return the spec object. + Shared command to retrieve a `MerlinSpec` object and an expanded filepath. + + This function processes parsed command-line interface (CLI) arguments to validate + and expand the specified filepath and any associated variables. It then constructs + and returns a `MerlinSpec` object based on the provided specification. - :param 'args': parsed CLI arguments + Args: + args: Parsed CLI arguments containing:\n + - `specification`: the path to the specification file + - `variables`: optional variable overrides to customize the spec. + + Returns: + spec: An instance of the `MerlinSpec` class with the + expanded configuration based on the provided filepath and variables. + filepath: The expanded filepath derived from the specification. """ filepath = verify_filepath(args.specification) variables_dict = parse_override_vars(args.variables) @@ -133,11 +165,27 @@ def get_merlin_spec_with_override(args): return spec, filepath -def process_run(args: Namespace) -> None: +def process_run(args: Namespace): """ CLI command for running a study. - :param [Namespace] `args`: parsed CLI arguments + This function initializes and runs a study using the specified parameters. + It handles file verification, variable parsing, and checks for required + arguments related to the study configuration and execution. + + Args: + args: Parsed CLI arguments containing:\n + - `specification`: Path to the specification file for the study. + - `variables`: Optional variable overrides for the study. + - `samples_file`: Optional path to a samples file. + - `dry`: If True, runs the study in dry-run mode (without actual execution). + - `no_errors`: If True, suppresses error reporting. + - `pgen_file`: Optional path to the pgen file, required if `pargs` is specified. + - `pargs`: Additional arguments for parallel processing. + + Raises: + ValueError: + If the `pargs` parameter is used without specifying a `pgen_file`. """ print(banner_small) filepath: str = verify_filepath(args.specification) @@ -164,11 +212,22 @@ def process_run(args: Namespace) -> None: router.run_task_server(study, args.run_mode) -def process_restart(args: Namespace) -> None: +def process_restart(args: Namespace): """ CLI command for restarting a study. - :param [Namespace] `args`: parsed CLI arguments + This function handles the restart process by verifying the specified restart + directory, locating a valid provenance specification file, and initiating + the study from that point. + + Args: + args: Parsed CLI arguments containing:\n + - `restart_dir`: Path to the directory where the restart specifications are located. + - `run_mode`: The mode for running the study (e.g., normal, dry-run). + + Raises: + ValueError: If the `restart_dir` does not contain a valid provenance spec file or + if multiple files match the specified pattern. """ print(banner_small) restart_dir: str = verify_dirpath(args.restart_dir) @@ -184,11 +243,20 @@ def process_restart(args: Namespace) -> None: router.run_task_server(study, args.run_mode) -def launch_workers(args): +def launch_workers(args: Namespace): """ CLI command for launching workers. - :param `args`: parsed CLI arguments + This function initializes worker processes for executing tasks as defined + in the Merlin specification. + + Args: + args: Parsed CLI arguments containing:\n + - `worker_echo_only`: If True, don't start the workers and just echo the launch command + - Additional worker-related parameters such as: + - `worker_steps`: Only start workers for these steps. + - `worker_args`: Arguments to pass to the worker processes. + - `disable_logs`: If True, disables logging for the worker processes. """ if not args.worker_echo_only: print(banner_small) @@ -204,11 +272,17 @@ def launch_workers(args): LOG.debug(f"celery command: {launch_worker_status}") -def purge_tasks(args): +def purge_tasks(args: Namespace): """ - CLI command for purging tasks. + CLI command for purging tasks from the task server. + + This function removes specified tasks from the task server based on the provided + Merlin specification. It allows for targeted purging or forced removal of tasks. - :param `args`: parsed CLI arguments + Args: + args: Parsed CLI arguments containing:\n + - `purge_force`: If True, forces the purge operation without confirmation. + - `purge_steps`: Steps or criteria based on which tasks will be purged. """ print(banner_small) spec, _ = get_merlin_spec_with_override(args) @@ -222,15 +296,27 @@ def purge_tasks(args): LOG.info(f"Purge return = {ret} .") -def query_status(args): +def query_status(args: Namespace): """ - CLI command for querying status of studies. - Based on the parsed CLI args, construct either a Status object or a DetailedStatus object - and display the appropriate output. - Object mapping is as follows: - merlin status -> Status object ; merlin detailed-status -> DetailedStatus object + CLI command for querying the status of studies. + + This function processes the given command-line arguments to determine the + status of a study. It constructs either a `Status` object or a `DetailedStatus` + object based on the specified command and the arguments provided. The function + handles validations for the task server input and the output format specified + for status dumping. - :param `args`: parsed CLI arguments + Object mapping: + - `merlin status` -> `Status` object + - `merlin detailed-status` -> `DetailedStatus` object + + Args: + args: Parsed CLI arguments containing user inputs for the status query. + + Raises: + ValueError: + If the task server specified is not supported (only "celery" is valid). + If the --dump filename provided does not end with ".csv" or ".json". """ print(banner_small) @@ -274,11 +360,24 @@ def query_status(args): return None -def query_queues(args): +def query_queues(args: Namespace): """ - CLI command for finding all workers. - - :param args: parsed CLI arguments + CLI command for finding all workers and their associated queues. + + This function processes the command-line arguments to retrieve and display + information about the available workers and their queues within the task server. + It validates the necessary parameters, handles potential file dumping, and + formats the output for easy readability. + + Args: + args: Parsed CLI arguments containing user inputs related to the query. + + Raises: + ValueError: + If a specification is not provided when steps are specified and the + steps do not include "all". + If variables are included without a corresponding specification. + If the specified dump filename does not end with '.json' or '.csv'. """ print(banner_small) @@ -318,11 +417,19 @@ def query_queues(args): router.dump_queue_info(args.task_server, queue_information, args.dump) -def query_workers(args): +def query_workers(args: Namespace): """ CLI command for finding all workers. - :param `args`: parsed CLI arguments + This function retrieves and queries the names of any active workers. + If the `--spec` argument is included, only query the workers defined in the spec file. + + Args: + args: Parsed command-line arguments, which may include:\n + - `spec`: Path to the specification file. + - `task_server`: Address of the task server to query. + - `queues`: List of queue names to filter workers. + - `workers`: List of specific worker names to query. """ print(banner_small) @@ -340,11 +447,20 @@ def query_workers(args): router.query_workers(args.task_server, worker_names, args.queues, args.workers) -def stop_workers(args): +def stop_workers(args: Namespace): """ CLI command for stopping all workers. - :param `args`: parsed CLI arguments + This function stops any active workers connected to a user's task server. + If the `--spec` argument is provided, this function retrieves the names of + workers from a the spec file and then issues a command to stop them. + + Args: + args: Parsed command-line arguments, which may include:\n + - `spec`: Path to the specification file to load worker names. + - `task_server`: Address of the task server to send the stop command to. + - `queues`: List of queue names to filter the workers. + - `workers`: List of specific worker names to stop. """ print(banner_small) worker_names = [] @@ -362,11 +478,12 @@ def stop_workers(args): router.stop_workers(args.task_server, worker_names, args.queues, args.workers) -def print_info(args): +def print_info(args: Namespace): """ - CLI command to print merlin config info. + CLI command to print merlin configuration info. - :param `args`: parsed CLI arguments + Args: + args: Parsed CLI arguments. """ # if this is moved to the toplevel per standard style, merlin is unable to generate the (needed) default config file from merlin import display # pylint: disable=import-outside-toplevel @@ -376,9 +493,22 @@ def print_info(args): def config_merlin(args: Namespace) -> None: """ - CLI command to setup default merlin config. - - :param [Namespace] `args`: parsed CLI arguments + CLI command to set up the default Merlin configuration. + + This function initializes the configuration app.yaml file that's + necessary to connect Merlin to a central server. If the output + directory is not specified via the command-line arguments, it + defaults to the user's home directory under `.merlin`. + + Args: + args: Parsed command-line arguments, which may include:\n + - `output_dir`: Path to the output directory for + configuration files. If not provided, defaults to + `~/.merlin`. + - `task_server`: Address of the task server for the + configuration. + - `broker`: Address of the broker service to use. + - `test`: Flag indicating whether to run in test mode. """ output_dir: Optional[str] = args.output_dir if output_dir is None: @@ -389,9 +519,20 @@ def config_merlin(args: Namespace) -> None: def process_example(args: Namespace) -> None: - """Either lists all example workflows, or sets up an example as a workflow to be run at root dir. - - :param [Namespace] `args`: parsed CLI arguments + """ + CLI command to set up or list Merlin example workflows. + + This function either lists all available example workflows or sets + up a specified example workflow to be run in the root directory. The + behavior is determined by the `workflow` argument. + + Args: + args: Parsed command-line arguments, which may include:\n + - `workflow`: The action to perform; should be "list" + to display all examples or the name of a specific example + workflow to set up. + - `path`: The directory where the example workflow + should be set up. Only applicable when `workflow` is not "list". """ if args.workflow == "list": print(list_examples()) @@ -400,12 +541,20 @@ def process_example(args: Namespace) -> None: setup_example(args.workflow, args.path) -def process_monitor(args): +def process_monitor(args: Namespace): """ - CLI command to monitor merlin workers and queues to keep - the allocation alive - - :param `args`: parsed CLI arguments + CLI command to monitor Merlin workers and queues to maintain + allocation status. + + This function periodically checks the status of Merlin workers and + the associated queues to ensure that the allocation remains active. + It includes a sleep interval to wait before each check, including + the initial one. + + Args: + args: Parsed command-line arguments, which may include:\n + - `sleep`: The duration (in seconds) to wait before + checking the queue status again. """ LOG.info("Monitor: checking queues ...") spec, _ = get_merlin_spec_with_override(args) @@ -422,8 +571,24 @@ def process_monitor(args): def process_server(args: Namespace): """ - Route to the correct function based on the command - given via the CLI + Route to the appropriate server function based on the command + specified via the CLI. + + This function processes commands related to server management, + directing the flow to the corresponding function for actions such + as initializing, starting, stopping, checking status, restarting, + or configuring the server. + + Args: + args: Parsed command-line arguments, which includes:\n + - `commands`: The server management command to execute. + Possible values are: + - "init": Initialize the server. + - "start": Start the server. + - "stop": Stop the server. + - "status": Check the server status. + - "restart": Restart the server. + - "config": Configure the server. """ if args.commands == "init": init_server() @@ -443,7 +608,12 @@ def process_server(args: Namespace): # to split the function up but that wouldn't make much sense so we ignore it def setup_argparse() -> None: # pylint: disable=R0915 """ - Setup argparse and any CLI options we want available via the package. + Set up the command-line argument parser for the Merlin package. + + This function configures the ArgumentParser for the Merlin CLI, allowing users + to interact with various commands related to workflow management and task handling. + It includes options for running a workflow, restarting tasks, purging task queues, + generating configuration files, and managing/configuring the server. """ parser: HelpParser = HelpParser( prog="merlin", @@ -791,10 +961,17 @@ def setup_argparse() -> None: # pylint: disable=R0915 def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: - """All CLI arg parsers directly controlling or invoking workers are generated here. + """ + Generate command-line argument parsers for managing worker operations. + + This function sets up subparsers for CLI commands that directly control or invoke + workers in the context of the Merlin framework. It provides options for running, + querying, stopping, and monitoring workers associated with a Merlin YAML study + specification. - :param [ArgumentParser] `subparsers`: the subparsers needed for every CLI command that directly controls or invokes - workers. + Args: + subparsers: An instance of ArgumentParser for adding command-line subcommands related + to worker management. """ # merlin run-workers run_workers: ArgumentParser = subparsers.add_parser( @@ -939,11 +1116,18 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: monitor.set_defaults(func=process_monitor) -def generate_diagnostic_parsers(subparsers: ArgumentParser) -> None: - """All CLI arg parsers generally used diagnostically are generated here. +def generate_diagnostic_parsers(subparsers: ArgumentParser): + """ + Generate command-line argument parsers for diagnostic operations in the Merlin framework. + + This function sets up subparsers for CLI commands that handle diagnostics related + to Merlin jobs. It provides options to check the status of studies, gather queue + statistics, and retrieve configuration information, making it easier for users to + diagnose issues with their workflows. - :param [ArgumentParser] `subparsers`: the subparsers needed for every CLI command that handles diagnostics for a - Merlin job. + Args: + subparsers: An instance of ArgumentParser that will be used to add command-line + subcommands for various diagnostic activities. """ # merlin status status_cmd: ArgumentParser = subparsers.add_parser( @@ -1133,7 +1317,13 @@ def generate_diagnostic_parsers(subparsers: ArgumentParser) -> None: def main(): """ - High-level CLI operations. + Entry point for the Merlin command-line interface (CLI) operations. + + This function sets up the argument parser, handles command-line arguments, + initializes logging, and executes the appropriate function based on the + provided command. It ensures that the user receives help information if + no arguments are provided and performs error handling for any exceptions + that may occur during command execution. """ parser = setup_argparse() if len(sys.argv) == 1: diff --git a/merlin/router.py b/merlin/router.py index d9114bbc..b135de3f 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -38,8 +38,10 @@ import logging import os import time +from argparse import Namespace from typing import Dict, List, Tuple + from merlin.exceptions import NoWorkersException from merlin.study.celeryadapter import ( build_set_of_queues, @@ -55,6 +57,8 @@ start_celery_workers, stop_celery_workers, ) +from merlin.spec.specification import MerlinSpec +from merlin.study.study import MerlinStudy try: @@ -70,12 +74,21 @@ # and try to resolve them -def run_task_server(study, run_mode=None): +def run_task_server(study: MerlinStudy, run_mode: str = None): """ - Creates the task server interface for communicating the tasks. - - :param `study`: The MerlinStudy object - :param `run_mode`: The type of run mode, e.g. local, batch + Creates the task server interface for managing task communications. + + This function determines which server to send tasks to. It checks if + Celery is set as the task server; if not, it logs an error message. + The run mode can be specified to determine how tasks should be executed. + + Args: + study: The study object representing the current + experiment setup, containing configuration + details for the task server. + run_mode: The type of run mode to use for + task execution. This can include options + such as 'local' or 'batch'. """ if study.expanded_spec.merlin["resources"]["task_server"] == "celery": run_celery(study, run_mode) @@ -83,14 +96,36 @@ def run_task_server(study, run_mode=None): LOG.error("Celery is not specified as the task server!") -def launch_workers(spec, steps, worker_args="", disable_logs=False, just_return_command=False): +def launch_workers( + spec: MerlinSpec, + steps: List[str], + worker_args: str = "", + disable_logs: bool = False, + just_return_command: bool = False, +) -> str: """ - Launches workers for the specified study. - - :param `specs`: Tuple of (YAMLSpecification, MerlinSpec) - :param `steps`: The steps in the spec to tie the workers to - :param `worker_args`: Optional arguments for the workers - :param `just_return_command`: Don't execute, just return the command + Launches workers for the specified study based on the provided + specification and steps. + + This function checks if Celery is configured as the task server + and initiates the specified workers accordingly. It provides options + for additional worker arguments, logging control, and command-only + execution without launching the workers. + + Args: + spec: Specification details necessary for launching the workers. + steps: The specific steps in the specification that the workers + will be associated with. + worker_args: Additional arguments to be passed to the workers. + Defaults to an empty string. + disable_logs: Flag to disable logging during worker execution. + Defaults to False. + just_return_command: If True, the function will not execute the + command but will return it instead. Defaults + to False. + + Returns: + A string of the worker launch command(s). """ if spec.merlin["resources"]["task_server"] == "celery": # pylint: disable=R1705 # Start workers @@ -101,15 +136,29 @@ def launch_workers(spec, steps, worker_args="", disable_logs=False, just_return_ return "No workers started" -def purge_tasks(task_server, spec, force, steps): +def purge_tasks(task_server: str, spec: MerlinSpec, force: bool, steps: List[str]) -> int: """ - Purges all tasks. - - :param `task_server`: The task server from which to purge tasks. - :param `spec`: A MerlinSpec object - :param `force`: Purge without asking for confirmation - :param `steps`: Space-separated list of stepnames defining queues to purge, - default is all steps + Purges all tasks from the specified task server. + + This function removes tasks from the designated queues associated + with the specified steps. It operates without confirmation if + the `force` parameter is set to True. The function logs the + steps being purged and checks if Celery is the configured task + server before proceeding. + + Args: + task_server: The task server from which to purge tasks. + Expected value is 'celery'. + spec: A MerlinSpec object containing the configuration + needed to generate queue specifications. + force: If True, purge the tasks without any confirmation prompt. + steps: A space-separated list of step names that define + which queues to purge. If not specified, + defaults to purging all steps. + + Returns: + The result of the purge operation; -1 if the task server is not + supported (i.e., not Celery). """ LOG.info(f"Purging queues for steps = {steps}") @@ -124,12 +173,20 @@ def purge_tasks(task_server, spec, force, steps): def dump_queue_info(task_server: str, query_return: List[Tuple[str, int, int]], dump_file: str): """ - Format the information we're going to dump in a way that the Dumper class can - understand and add a timestamp to the info. - - :param task_server: The task server from which to query queues - :param query_return: The output of `query_queues` - :param dump_file: The filepath of the file we'll dump queue info to + Formats and dumps queue information for the specified task server. + + This function prepares the queue data returned from the queue + query and formats it in a way that the `Dumper` class can process. + It also adds a timestamp to the information before dumping it + to the specified file. + + Args: + task_server: The task server from which to query queues, + expected to be 'celery'. + query_return: The output from the `query_queues` function, + containing tuples of queue information. + dump_file: The filepath where the queue information will + be dumped. """ if task_server == "celery": dump_celery_queue_info(query_return, dump_file) @@ -139,19 +196,36 @@ def dump_queue_info(task_server: str, query_return: List[Tuple[str, int, int]], def query_queues( task_server: str, - spec: "MerlinSpec", # noqa: F821 + spec: MerlinSpec, steps: List[str], specific_queues: List[str], verbose: bool = True, -): +) -> Dict[str, Dict[str, int]]: """ - Queries status of queues. - - :param task_server: The task server from which to query queues - :param spec: A MerlinSpec object or None - :param steps: Spaced-separated list of stepnames to query. Default is all - :param specific_queues: A list of queue names to query or None - :param verbose: A bool to determine whether to output log statements or not + Queries the status of queues from the specified task server. + + This function checks the status of queues tied to a given task + server, building a list of queues based on the provided steps + and specific queue names. It supports querying Celery task + servers and returns the results in a structured format. + Logging behavior can be controlled with the verbose parameter. + + Args: + task_server: The task server from which to query queues, + expected to be 'celery'. + spec: A MerlinSpec object used to define the configuration of + queues. Can also be None. + steps: A space-separated list of step names to query. Default + is to query all available steps if this is empty. + specific_queues: A list of specific queue names to query. Can + be empty or None to query all relevant queues. + verbose: If True, enables logging of query operations. Defaults + to True. + + Returns: + A dictionary where the keys are queue names and the values are + dictionaries containing the number of workers (consumers) + and tasks (jobs) attached to each queue. """ if task_server == "celery": # pylint: disable=R1705 # Build a set of queues to query and query them @@ -159,14 +233,19 @@ def query_queues( return query_celery_queues(queues) else: LOG.error("Celery is not specified as the task server!") - return [] + return {} -def query_workers(task_server, spec_worker_names, queues, workers_regex): +def query_workers(task_server: str, spec_worker_names: List[str], queues: List[str], workers_regex: str): """ - Gets info from workers. - - :param `task_server`: The task server to query. + Retrieves information from workers associated with the specified task server. + + Args: + task_server: The task server to query; must be 'celery'. + spec_worker_names: A list of specific worker names to query. + queues: A list of queues to search for associated workers. + workers_regex: A regex pattern used to filter worker names + during the query. """ LOG.info("Searching for workers...") @@ -176,12 +255,17 @@ def query_workers(task_server, spec_worker_names, queues, workers_regex): LOG.error("Celery is not specified as the task server!") -def get_workers(task_server): - """Get all workers. +def get_workers(task_server: str) -> List[str]: + """ + This function queries the designated task server to obtain a list of all + workers that are currently connected. + + Args: + task_server: The task server to query. - :param `task_server`: The task server to query. - :return: A list of all connected workers - :rtype: list + Returns: + A list of all connected workers. If the task server is not supported, + an empty list is returned. """ if task_server == "celery": # pylint: disable=R1705 return get_workers_from_app() @@ -190,14 +274,18 @@ def get_workers(task_server): return [] -def stop_workers(task_server, spec_worker_names, queues, workers_regex): +def stop_workers(task_server: str, spec_worker_names: List[str], queues: List[str], workers_regex: str): """ - Stops workers. - - :param `task_server`: The task server from which to stop workers. - :param `spec_worker_names`: Worker names to stop, drawn from a spec. - :param `queues` : The queues to stop - :param `workers_regex` : Regex for workers to stop + This function sends a command to stop workers that match the specified + criteria from the designated task server. + + Args: + task_server: The task server from which to stop workers; + currently only supports 'celery'. + spec_worker_names: A list of worker names to stop, as defined + in a specification. + queues: A list of queues from which to stop associated workers. + workers_regex: A regex pattern used to filter the workers to stop. """ LOG.info("Stopping workers...") @@ -208,14 +296,24 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex): LOG.error("Celery is not specified as the task server!") -def create_config(task_server: str, config_dir: str, broker: str, test: str) -> None: +def create_config(task_server: str, config_dir: str, broker: str, test: str): """ - Create a config for the given task server. - - :param [str] `task_server`: The task server from which to stop workers. - :param [str] `config_dir`: Optional directory to install the config. - :param [str] `broker`: string indicated the broker, used to check for redis. - :param [str] `test`: string indicating if the app.yaml is used for testing. + Create a configuration app.yaml that Merlin will use to connect to the + specified task server. + + This function generates a configuration file for the given task server, + primarily supporting the 'celery' task server. It creates the necessary + directories if they do not exist and determines the appropriate configuration + file based on the provided broker and testing parameters. + + Args: + task_server: The task server for which to create the configuration. + Currently supports only 'celery'. + config_dir: The directory where the configuration files will be installed. + If the directory does not exist, it will be created. + broker: A string indicating the broker type; currently used to check for 'redis'. + test: A string that indicates whether the application should use a test + configuration file. If set, a test configuration is created. """ if test: LOG.info("Creating test config ...") @@ -240,10 +338,21 @@ def create_config(task_server: str, config_dir: str, broker: str, test: str) -> def get_active_queues(task_server: str) -> Dict[str, List[str]]: """ - Get a dictionary of active queues and the workers attached to these queues. - - :param `task_server`: The task server to query for active queues - :returns: A dict where keys are queue names and values are a list of workers watching them + Retrieve a dictionary of active queues and their associated workers for the specified task server. + + This function queries the given task server for its active queues and gathers + information about which workers are currently monitoring these queues. It supports + the 'celery' task server and returns a structured dictionary containing the queue + names as keys and lists of worker names as values. + + Args: + task_server: The task server to query for active queues. + Currently supports only 'celery'. + + Returns: + A dictionary where:\n + - The keys are the names of the active queues. + - The values are lists of worker names that are currently attached to those queues. """ active_queues = {} @@ -257,15 +366,24 @@ def get_active_queues(task_server: str) -> Dict[str, List[str]]: return active_queues -def wait_for_workers(sleep: int, task_server: str, spec: "MerlinSpec"): # noqa +def wait_for_workers(sleep: int, task_server: str, spec: MerlinSpec): # noqa """ - Wait on workers to start up. Check on worker start 10 times with `sleep` seconds between - each check. If no workers are started in time, raise an error to kill the monitor (there - was likely an issue with the task server that caused worker launch to fail). - - :param `sleep`: An integer representing the amount of seconds to sleep between each check - :param `task_server`: The task server from which to look for workers - :param `spec`: A MerlinSpec object representing the spec we're monitoring + Wait for workers to start up by checking their status at regular intervals. + + This function monitors the specified task server for the startup of worker processes. + It checks for the existence of the expected workers up to 10 times, sleeping for a + specified number of seconds between each check. If no workers are detected after + the maximum number of attempts, it raises an error to terminate the monitoring + process, indicating a potential issue with the task server. + + Args: + sleep: The number of seconds to pause between each check for worker status. + task_server: The task server from which to query for worker status. + spec: An instance of the MerlinSpec class that contains the specification + for the workers being monitored. + + Raises: + NoWorkersException: If no workers are detected after the maximum number of checks. """ # Get the names of the workers that we're looking for worker_names = spec.get_worker_names() @@ -297,9 +415,12 @@ def check_workers_processing(queues_in_spec: List[str], task_server: str) -> boo """ Check if any workers are still processing tasks by querying the task server. - :param `queues_in_spec`: A list of queues to check if tasks are still active in - :param `task_server`: The task server from which to query - :returns: True if workers are still processing tasks, False otherwise + Args: + queues_in_spec: A list of queue names to check for active tasks. + task_server: The task server from which to query the processing status. + + Returns: + True if workers are still processing tasks, False otherwise. """ result = False @@ -313,13 +434,22 @@ def check_workers_processing(queues_in_spec: List[str], task_server: str) -> boo return result -def check_merlin_status(args: "Namespace", spec: "MerlinSpec") -> bool: # noqa +def check_merlin_status(args: Namespace, spec: MerlinSpec) -> bool: # noqa """ - Function to check merlin workers and queues to keep the allocation alive + Function to check Merlin workers and queues to keep the allocation alive. + + This function monitors the status of workers and jobs within the specified task server + and the provided Merlin specification. It checks for active tasks and workers, ensuring + that the allocation remains valid. + + Args: + args: Parsed command-line interface arguments, including task server + specifications and sleep duration. + spec: The parsed spec.yaml as a MerlinSpec object, containing queue + and worker definitions. - :param `args`: parsed CLI arguments - :param `spec`: the parsed spec.yaml as a MerlinSpec object - :returns: True if there are still tasks being processed, False otherwise + Returns: + True if there are still tasks being processed, False otherwise. """ # Initialize the variable to track if there are still active tasks active_tasks = False diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 5b5bdd41..67f1c7a8 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -424,7 +424,7 @@ def build_set_of_queues( return queues -def query_celery_queues(queues: List[str], app: Celery = None, config: Config = None) -> Dict[str, List[str]]: +def query_celery_queues(queues: List[str], app: Celery = None, config: Config = None) -> Dict[str, Dict[str, int]]: """ Build a dict of information about the number of jobs and consumers attached to specific queues that we want information on. diff --git a/merlin/utils.py b/merlin/utils.py index 2e69f577..12ccd670 100644 --- a/merlin/utils.py +++ b/merlin/utils.py @@ -42,7 +42,7 @@ from copy import deepcopy from datetime import datetime, timedelta from types import SimpleNamespace -from typing import Callable, List, Optional, Union +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union import numpy as np import pkg_resources @@ -62,13 +62,25 @@ DEFAULT_FLUX_VERSION = "0.48.0" -def get_user_process_info(user=None, attrs=None): +def get_user_process_info(user: str = None, attrs: List[str] = None) -> List[Dict]: """ - Return a list of process info for all of the user's running processes. + Return a list of process information for all of the user's running processes. - :param `user`: user name (default from getpass). Option: 'all_users': get - all processes - :param `atts`: the attributes to include + This function retrieves and returns details about the currently running processes + for a specified user. If no user is specified, it defaults to the current user. + It can also return information for all users if specified. + + Args: + user: The username for which to retrieve process information. + If set to 'all_users', retrieves processes for all users. + Defaults to the current user's username if not provided. + attrs: A list of attributes to include in the process information. + Defaults to ["pid", "name", "username", "cmdline"] if None. + If "username" is not included in the list, it will be added. + + Returns: + A list of dictionaries containing the specified attributes for each process + belonging to the specified user or all users if 'all_users' is specified. """ if attrs is None: attrs = ["pid", "name", "username", "cmdline"] @@ -84,13 +96,23 @@ def get_user_process_info(user=None, attrs=None): return [p.info for p in psutil.process_iter(attrs=attrs) if user in p.info["username"]] -def check_pid(pid, user=None): +def check_pid(pid: int, user: str = None) -> bool: """ - Check if pid is in process list. + Check if a given process ID (PID) is in the process list for a specified user. + + This function determines whether a specific PID is currently running + for the specified user. If no user is specified, it defaults to the + current user. It can also check for all users if specified. + + Args: + pid: The process ID to check for in the process list. + user: The username for which to check the process. + If set to 'all_users', checks processes for all users. + Defaults to the current user's username if not provided. - :param `pid`: process id - :param `user`: user name (default from getpass). Option: 'all_users': get - all processes + Returns: + True if the specified PID is found in the process list for the + given user, False otherwise. """ user_processes = get_user_process_info(user=user) for process in user_processes: @@ -99,13 +121,23 @@ def check_pid(pid, user=None): return False -def get_pid(name, user=None): +def get_pid(name: str, user: str = None) -> List[int]: """ - Return pid of process with name. + Return the process ID(s) (PID) of processes with the specified name. - :param `name`: process name - :param `user`: user name (default from getpass). Option: 'all_users': get - all processes + This function retrieves the PID(s) of all running processes that match + the given name for a specified user. If no user is specified, it defaults + to the current user. It can also retrieve PIDs for all users if specified. + + Args: + name: The name of the process to search for. + user: The username for which to retrieve the process IDs. + If set to 'all_users', retrieves processes for all users. + Defaults to the current user's username if not provided. + + Returns: + A list of PIDs for processes matching the specified name. + Returns None if no matching processes are found. """ user_processes = get_user_process_info(user=user) name_list = [p["pid"] for p in user_processes if name in p["name"]] @@ -114,37 +146,67 @@ def get_pid(name, user=None): return None -def get_procs(name, user=None): +def get_procs(name: str, user: str = None) -> List[Tuple[int, str]]: """ - Return a list of (pid, cmdline) tuples of process with name. + Return a list of tuples containing the process ID (PID) and command line + of processes with the specified name. + + This function retrieves all running processes that match the given name + for a specified user. If no user is specified, it defaults to the current + user. It can also retrieve processes for all users if specified. - :param `name`: process name - :param `user`: user name (default from getpass). Option: 'all_users': get - all processes + Args: + name: The name of the process to search for. + user: The username for which to retrieve the process information. + If set to 'all_users', retrieves processes for all users. + Defaults to the current user's username if not provided. + + Returns: + A list of tuples, each containing the PID and command line of processes + matching the specified name. Returns an empty list if no matching + processes are found. """ user_processes = get_user_process_info(user=user) procs = [(p["pid"], p["cmdline"]) for p in user_processes if name in p["name"]] return procs -def is_running_psutil(cmd, user=None): +def is_running_psutil(cmd: str, user: str = None) -> bool: """ - Determine if process with given command is running. - Uses psutil command instead of call to 'ps' + Determine if a process with the given command is currently running. + + This function checks for the existence of any running processes that + match the specified command. It uses the `psutil` library to gather + process information instead of making a call to the 'ps' command. + + Args: + cmd: The command or command line snippet to search for in running processes. + user: The username for which to check running processes. + If set to 'all_users', checks processes for all users. + Defaults to the current user's username if not provided. - :param `cmd`: process cmd - :param `user`: user name (default from getpass). Option: 'all_users': get - all processes + Returns: + True if at least one matching process is found; otherwise, False. """ user_processes = get_user_process_info(user=user) return any(cmd in " ".join(p["cmdline"]) for p in user_processes) -def is_running(name, all_users=False): +def is_running(name: str, all_users: bool = False) -> bool: """ - Determine if process with name is running. + Determine if a process with the specified name is currently running. - :param `name`: process name + This function checks for the existence of a running process with the + provided name by executing the 'ps' command. It can be configured to + check processes for all users or just the current user. + + Args: + name: The name of the process to search for. + all_users: If True, checks for processes across all users. + Defaults to False, which checks only the current user's processes. + + Returns: + True if a process with the specified name is found; otherwise, False. """ cmd = ["ps", "ux"] @@ -164,23 +226,42 @@ def is_running(name, all_users=False): return False -def expandvars2(path): +def expandvars2(path: str) -> str: """ - Replace shell strings from the current environment variables + Replace shell variables in the given path with their corresponding + environment variable values. + + This function expands shell-style variable references (e.g., $VAR) + in the input path using the current environment variables. It also + ensures that any escaped dollar signs (e.g., \$) are not expanded. - :param `path`: a path + Args: + path: The input path containing shell variable references to be expanded. + + Returns: + The path with shell variables replaced by their corresponding values + from the environment, with unescaped variables expanded. """ return re.sub(r"(? List[str]: """ - Apply a regex filter to a list + Apply a regex filter to a list. + + This function filters a given list based on a specified regular expression. + Depending on the `match` parameter, it can either match the entire string + or search for the regex pattern within the strings of the list. - :param `regex` : the regular expression - :param `list_to_filter` : the list to filter + Args: + regex: The regular expression to use for filtering the list. + list_to_filter: The list of strings to be filtered based on the regex. + match: If True, uses re.match to filter items that match + the regex from the start. If False, uses re.search + to filter items that contain the regex pattern. - :return `new_list` + Returns: + A new list containing the filtered items that match the regex. """ r = re.compile(regex) # pylint: disable=C0103 if match: @@ -188,16 +269,32 @@ def regex_list_filter(regex, list_to_filter, match=True): return list(filter(r.search, list_to_filter)) -def apply_list_of_regex(regex_list, list_to_filter, result_list, match=False, display_warning: bool = True): +def apply_list_of_regex( + regex_list: List[str], + list_to_filter: List[str], + result_list: List[str], + match: bool = False, + display_warning: bool = True +): """ - Take a list of regex's, apply each regex to a list we're searching through, - and append each result to a result list. + Apply a list of regex patterns to a list and accumulate the results. + + This function takes each regex from the provided list of regex patterns + and applies it to the specified list. The results of each successful + match or search are appended to a result list. Optionally, it can display + a warning if a regex does not match any item in the list. + + Args: + regex_list: A list of regular expressions to apply to the list_to_filter. + list_to_filter: The list of strings that the regex patterns will be applied to. + result_list: The list where results of the regex filters will be appended. + match: If True, uses re.match for applying the regex. + If False, uses re.search. + display_warning: If True, displays a warning message when no matches are + found for a regex. - :param `regex_list`: A list of regular expressions to apply to the list_to_filter - :param `list_to_filter`: A list that we'll apply regexs to - :param `result_list`: A list that we'll append results of the regex filters to - :param `match`: A bool where when true we use re.match for applying the regex, - when false we use re.search for applying the regex. + Side Effect: + This function modifies the `result_list` in place. """ for regex in regex_list: filter_results = set(regex_list_filter(regex, list_to_filter, match)) @@ -209,28 +306,37 @@ def apply_list_of_regex(regex_list, list_to_filter, result_list, match=False, di result_list += filter_results -def load_yaml(filepath): +def load_yaml(filepath: str) -> Dict: """ - Safely read a yaml file. + Safely read a YAML file and return its contents. - :param `filepath`: a filepath to a yaml file - :type filepath: str + Args: + filepath: The file path to the YAML file to be read. - :returns: Python objects holding the contents of the yaml file + Returns: + A dict representing the contents of the YAML file. """ with open(filepath, "r") as _file: return yaml.safe_load(_file) -def get_yaml_var(entry, var, default): +def get_yaml_var(entry: Dict[str, Any], var: str, default: Any) -> Any: """ - Return entry[var], else return default + Retrieve the value associated with a specified key from a YAML dictionary. - :param `entry`: a yaml dict - :param `var`: a yaml key - :param `default`: default value in the absence of data - """ + This function attempts to return the value of `var` from the provided `entry` + dictionary. If the key does not exist, it will try to access it as an attribute + of the entry object. If neither is found, the function returns the specified + `default` value. + + Args: + entry: A dictionary representing the contents of a YAML file. + var: The key or attribute name to retrieve from the entry. + default: The default value to return if the key or attribute is not found. + Returns: + The value associated with `var` in the entry, or `default` if not found. + """ try: return entry[var] except (TypeError, KeyError): @@ -240,19 +346,31 @@ def get_yaml_var(entry, var, default): return default -def load_array_file(filename, ndmin=2): +def load_array_file(filename: str, ndmin: int = 2) -> np.ndarray: """ - Loads up an array stored in filename, based on extension. + Load an array from a file based on its extension. - Valid filename extensions: - '.npy' : numpy binary file - '.csv' : comma separated text file - '.tab' : whitespace (or tab) separated text file + This function reads an array stored in the specified `filename`. + It supports three file types based on their extensions: - :param `filename` : The file to load - :param `ndmin` : The minimum number of dimensions to load - """ + - `.npy` for NumPy binary files + - `.csv` for comma-separated values + - `.tab` for whitespace (or tab) separated values + + The function ensures that the loaded array has at least `ndmin` dimensions. + If the array is in binary format, it checks the dimensions without altering the data. + + Args: + filename: The path to the file to load. + ndmin: The minimum number of dimensions the array should have. + Returns: + The loaded array. + + Raises: + TypeError: If the file extension is not one of the supported types + (`.npy`, `.csv`, `.tab`). + """ protocol = determine_protocol(filename) # Don't change binary-stored numpy arrays; just check dimensions @@ -277,9 +395,18 @@ def load_array_file(filename, ndmin=2): return array -def determine_protocol(fname): +def determine_protocol(fname: str) -> str: """ - Determines a file protocol based on file name extension. + Determine the file protocol based on the file name extension. + + Args: + fname: The name of the file whose protocol is to be determined. + + Returns: + The protocol corresponding to the file extension (e.g., 'hdf5'). + + Raises: + ValueError: If the provided file name does not have a valid extension. """ _, ext = os.path.splitext(fname) if ext.startswith("."): @@ -294,13 +421,21 @@ def determine_protocol(fname): def verify_filepath(filepath: str) -> str: """ - Verify that the filepath argument is a valid - file. + Verify that the given file path is valid and return its absolute form. + + This function checks if the specified `filepath` points to an existing file. + It expands any user directory shortcuts (e.g., `~`) and environment variables + in the provided path before verifying its existence. If the file does not exist, + a ValueError is raised. + + Args: + filepath: The path of the file to verify. - :param [str] `filepath`: the path of a file + Returns: + The verified absolute file path with expanded environment variables. - :return: the verified absolute filepath with expanded environment variables. - :rtype: str + Raises: + ValueError: If the provided file path does not point to a valid file. """ filepath = os.path.abspath(os.path.expandvars(os.path.expanduser(filepath))) if not os.path.isfile(filepath): @@ -310,13 +445,21 @@ def verify_filepath(filepath: str) -> str: def verify_dirpath(dirpath: str) -> str: """ - Verify that the dirpath argument is a valid - directory. + Verify that the given directory path is valid and return its absolute form. - :param [str] `dirpath`: the path of a directory + This function checks if the specified `dirpath` points to an existing directory. + It expands any user directory shortcuts (e.g., `~`) and environment variables + in the provided path before verifying its existence. If the directory does not exist, + a ValueError is raised. - :return: returns the absolute path with expanded environment vars for a given dirpath. - :rtype: str + Args: + dirpath: The path of the directory to verify. + + Returns: + The verified absolute directory path with expanded environment variables. + + Raises: + ValueError: If the provided directory path does not point to a valid directory. """ dirpath: str = os.path.abspath(os.path.expandvars(os.path.expanduser(dirpath))) if not os.path.isdir(dirpath): @@ -325,9 +468,19 @@ def verify_dirpath(dirpath: str) -> str: @contextmanager -def cd(path): # pylint: disable=C0103 +def cd(path: str) -> Generator[None]: # pylint: disable=C0103 """ - TODO + Context manager for changing the current working directory. + + This context manager changes the current working directory to the specified `path` + while executing the block of code within the context. Once the block is exited, + it restores the original working directory. + + Args: + path: The path to the directory to change to. + + Yields: + Control is yielded back to the block of code within the context. """ old_dir = os.getcwd() os.chdir(path) @@ -337,15 +490,37 @@ def cd(path): # pylint: disable=C0103 os.chdir(old_dir) -def pickle_data(filepath, content): - """Dump content to a pickle file""" +def pickle_data(filepath: str, content: Any): + """ + Dump content to a pickle file. + + This function serializes the given `content` and writes it to a specified file + in pickle format. The file is opened in write mode, which will overwrite any + existing content in the file. + + Args: + filepath: The path to the file where the content will be saved. + content: The data to be serialized and saved to the pickle file. + """ with open(filepath, "w") as f: # pylint: disable=C0103 pickle.dump(content, f) -def get_source_root(filepath): - """Used to find the absolute project path given a sample file path from - within the project. +def get_source_root(filepath: str) -> str: + """ + Find the absolute project path given a file path from within the project. + + This function determines the root directory of a project by analyzing the given + file path. It works by traversing the directory structure upwards until it + encounters a directory name that is not an integer, which is assumed to be the + project root. + + Args: + filepath: The file path from within the project for which to find the root. + + Returns: + The absolute path to the root directory of the project. Returns None if + the path corresponds to the root directory itself. """ filepath = os.path.abspath(filepath) sep = os.path.sep @@ -367,9 +542,19 @@ def get_source_root(filepath): return root -def ensure_directory_exists(**kwargs): +def ensure_directory_exists(**kwargs: Dict[Any, Any]) -> bool: """ - TODO + Ensure that the directory for the specified aggregate file exists. + + This function checks if the directory for the given `aggregate_file` exists. + If it does not exist, the function creates the necessary directories. + + Args: + **kwargs: Keyword arguments that must include:\n + - `aggregate_file` (str): The file path for which the directory needs to be ensured. + + Returns: + True if the directory already existed. False otherwise. """ aggregate_bundle = kwargs["aggregate_file"] dirname = os.path.dirname(aggregate_bundle) @@ -381,11 +566,25 @@ def ensure_directory_exists(**kwargs): return True -def nested_dict_to_namespaces(dic): - """Code for recursively converting dictionaries of dictionaries - into SimpleNamespaces instead. +def nested_dict_to_namespaces(dic: Dict) -> SimpleNamespace: """ + Convert a nested dictionary into a nested SimpleNamespace structure. + This function recursively transforms a dictionary (which may contain other + dictionaries) into a structure of SimpleNamespace objects. Each key in the + dictionary becomes an attribute of a SimpleNamespace, allowing for attribute-style + access to the data. + + Args: + dic: The nested dictionary to be converted. + + Returns: + A SimpleNamespace object representing the nested structure + of the input dictionary. + + Raises: + TypeError: If the input is not a dictionary. + """ def recurse(dic): if not isinstance(dic, dict): return dic @@ -400,9 +599,23 @@ def recurse(dic): return recurse(new_dic) -def nested_namespace_to_dicts(namespaces): - """Code for recursively converting namespaces of namespaces - into dictionaries instead. +def nested_namespace_to_dicts(namespaces: SimpleNamespace) -> Dict: + """ + Convert a nested SimpleNamespace structure into a nested dictionary. + + This function recursively transforms a SimpleNamespace (which may contain + other SimpleNamespaces) into a dictionary structure. Each attribute of the + SimpleNamespace becomes a key in the resulting dictionary. + + Args: + namespaces: The nested SimpleNamespace to be converted. + + Returns: + A dictionary representing the nested structure of the input + SimpleNamespace. + + Raises: + TypeError: If the input is not a SimpleNamespace. """ def recurse(namespaces): @@ -419,12 +632,28 @@ def recurse(namespaces): return recurse(new_ns) -def get_flux_version(flux_path, no_errors=False): +def get_flux_version(flux_path: str, no_errors: bool = False) -> str: """ - Return the flux version as a string + Retrieve the version of Flux as a string. + + This function executes the Flux binary located at `flux_path` with the + "version" command and parses the output to return the version number. + If the command fails or the Flux binary cannot be found, it can either + raise an error or return a default version based on the `no_errors` flag. - :param `flux_path`: the full path to the flux bin - :param `no_errors`: a flag to determine if this a test run to ignore errors + Args: + flux_path: The full path to the Flux binary. + no_errors: A flag to suppress error messages and + exceptions. If set to True, errors will be logged but not raised. + + Returns: + The version of Flux as a string. + + Raises: + FileNotFoundError: If the Flux binary cannot be found and `no_errors` + is set to False. + ValueError: If the version cannot be determined from the output and + `no_errors` is set to False. """ cmd = [flux_path, "version"] @@ -451,12 +680,23 @@ def get_flux_version(flux_path, no_errors=False): return flux_ver -def get_flux_cmd(flux_path, no_errors=False): +def get_flux_cmd(flux_path: str, no_errors: bool = False) -> str: """ - Return the flux run command as string + Generate the Flux run command based on the installed version. + + This function determines the appropriate Flux command to use for + running jobs, depending on the version of Flux installed at the + specified `flux_path`. It defaults to "flux run" for versions + greater than or equal to 0.48.x. For older versions, it adjusts + the command accordingly. - :param `flux_path`: the full path to the flux bin - :param `no_errors`: a flag to determine if this a test run to ignore errors + Args: + flux_path: The full path to the Flux binary. + no_errors: A flag to suppress error messages and exceptions + if set to True. + + Returns: + The appropriate Flux run command as a string. """ # The default is for flux version >= 0.48.x # this may change in the future. @@ -474,12 +714,23 @@ def get_flux_cmd(flux_path, no_errors=False): return flux_cmd -def get_flux_alloc(flux_path, no_errors=False): +def get_flux_alloc(flux_path: str, no_errors: bool = False) -> str: """ - Return the flux alloc command as string + Generate the `flux alloc` command based on the installed version. + + This function constructs the appropriate command for allocating + resources with Flux, depending on the version of Flux installed + at the specified `flux_path`. It defaults to "{flux_path} alloc" + for versions greater than or equal to 0.48.x. For older versions, + it adjusts the command accordingly. + + Args: + flux_path: The full path to the Flux binary. + no_errors: A flag to suppress error messages and exceptions + if set to True. - :param `flux_path`: the full path to the flux bin - :param `no_errors`: a flag to determine if this a test run to ignore errors + Returns: + The appropriate Flux allocation command as a string. """ # The default is for flux version >= 0.48.x # this may change in the future. @@ -495,12 +746,21 @@ def get_flux_alloc(flux_path, no_errors=False): return flux_alloc -def check_machines(machines): +def check_machines(machines: Union[str, List[str], Tuple[str]]) -> bool: """ - Return a True if the current machine is in the list of machines. + Check if the current machine is in the list of specified machines. - :param `machines`: A single machine or list of machines to compare - with the current machine. + This function determines whether the hostname of the current + machine matches any entry in a provided list of machine names. + It returns True if a match is found, otherwise it returns False. + + Args: + machines: A single machine name or a list/tuple of machine + names to compare with the current machine's hostname. + + Returns: + True if the current machine's hostname matches any of the + specified machines; False otherwise. """ local_hostname = socket.gethostname() @@ -514,19 +774,43 @@ def check_machines(machines): return False -def contains_token(string): +def contains_token(string: str) -> bool: """ - Return True if given string contains a token of the form $(STR). + Check if the given string contains a token of the form $(STR). + + This function uses a regular expression to search for tokens + that match the pattern $(), where consists of + alphanumeric characters and underscores. It returns True if + such a token is found; otherwise, it returns False. + + Args: + string: The input string to be checked for tokens. + + Returns: + True if the input string contains a token of the form + $(STR); False otherwise. """ if re.search(r"\$\(\w+\)", string): return True return False -def contains_shell_ref(string): +def contains_shell_ref(string: str) -> bool: """ - Return True if given string contains a shell variable reference - of the form $STR or ${STR}. + Check if the given string contains a shell variable reference. + + This function searches for shell variable references in the + format of $ or ${}, where + consists of alphanumeric characters and underscores. It returns + True if a match is found; otherwise, it returns False. + + Args: + string: The input string to be checked for shell + variable references. + + Returns: + True if the input string contains a shell variable + reference of the form $STR or ${STR}; False otherwise. """ if re.search(r"\$\w+", string) or re.search(r"\$\{\w+\}", string): return True @@ -534,16 +818,25 @@ def contains_shell_ref(string): def needs_merlin_expansion( - cmd: str, restart_cmd: str, labels: List[str], include_sample_keywords: Optional[bool] = True + cmd: str, restart_cmd: str, labels: List[str], include_sample_keywords: bool = True ) -> bool: """ - Check if the cmd or restart cmd provided have variables that need expansion. + Check if the provided command or restart command contains variables that require expansion. + + This function checks both the command (`cmd`) and the restart command (`restart_cmd`) + for the presence of specified labels or sample keywords that indicate a need for variable + expansion. + + Args: + cmd: The command inside a study step to check for variable expansion. + restart_cmd: The restart command inside a study step to check for variable expansion. + labels: A list of labels to check for inside `cmd` and `restart_cmd`. + include_sample_keywords: Flag to indicate whether to include default sample keywords + in the label check. - :param `cmd`: The command inside a study step to check for expansion - :param `restart_cmd`: The restart command inside a study step to check for expansion - :param `labels`: A list of labels to check for inside `cmd` and `restart_cmd` - :return : True if the cmd has any of the default keywords or spec - specified sample column labels. False otherwise. + Returns: + True if either `cmd` or `restart_cmd` contains any of the specified labels + or default sample keywords, indicating a need for expansion. False otherwise. """ sample_keywords = ["MERLIN_SAMPLE_ID", "MERLIN_SAMPLE_PATH", "merlin_sample_id", "merlin_sample_path"] if include_sample_keywords: @@ -560,20 +853,28 @@ def needs_merlin_expansion( return False -def dict_deep_merge(dict_a: dict, dict_b: dict, path: str = None, conflict_handler: Callable = None): +def dict_deep_merge(dict_a: Dict, dict_b: Dict, path: str = None, conflict_handler: Callable = None): """ - This function recursively merges dict_b into dict_a. The built-in - merge of dictionaries in python (dict(dict_a) | dict(dict_b)) does not do a - deep merge so this function is necessary. This will only merge in new keys, - it will NOT update existing ones, unless you specify a conflict handler function. - Credit to this stack overflow post: https://stackoverflow.com/a/7205107. + Recursively merges `dict_b` into `dict_a`, performing a deep merge. - :param `dict_a`: A dict that we'll merge dict_b into - :param `dict_b`: A dict that we want to merge into dict_a - :param `path`: The path down the dictionary tree that we're currently at - :param `conflict_handler`: An optional function to handle conflicts between values at the same key. - The function should return the value to be used in the merged dictionary. - The default behavior without this argument is to log a warning. + This function combines two dictionaries by recursively merging + the contents of `dict_b` into `dict_a`. Unlike Python's built-in + dictionary merge, this function performs a deep merge, meaning + it will merge nested dictionaries instead of just updating top-level keys. + Existing keys in `dict_a` will not be updated unless a conflict handler + is provided to resolve key conflicts. + + Credit to [this stack overflow post](https://stackoverflow.com/a/7205107). + + Args: + dict_a: The dictionary that will be merged into. + dict_b: The dictionary to merge into `dict_a`. + path: The current path in the dictionary tree. This is used for logging + purposes during recursion. + conflict_handler: A function to handle conflicts when both dictionaries + have the same key with different values. The function should return + the value to be used in the merged dictionary. If not provided, a + warning will be logged for conflicts. """ # Check to make sure we have valid dict_a and dict_b input @@ -609,15 +910,28 @@ def dict_deep_merge(dict_a: dict, dict_b: dict, path: str = None, conflict_handl dict_a[key] = dict_b[key] -def find_vlaunch_var(vlaunch_var: str, step_cmd: str, accept_no_matches=False) -> str: +def find_vlaunch_var(vlaunch_var: str, step_cmd: str, accept_no_matches: bool = False) -> str: """ - Given a variable used for VLAUNCHER and the step cmd value, find - the variable. + Find and return the specified VLAUNCHER variable from the step command. + + This function searches for a variable defined in the VLAUNCHER context + within the provided step command string. It looks for the variable in + the format `MERLIN_=`. If the variable is found, + it returns the variable in a format suitable for use in a command string. + If the variable is not found, the behavior depends on the `accept_no_matches` flag. - :param `vlaunch_var`: The name of the VLAUNCHER variable (without MERLIN_) - :param `step_cmd`: The string for the cmd of a step - :param `accept_no_matches`: If True, return None if we couldn't find the variable. Otherwise, raise an error. - :returns: the `vlaunch_var` variable or None + Args: + vlaunch_var: The name of the VLAUNCHER variable (without the prefix 'MERLIN_'). + step_cmd: The command string of a step where the variable may be defined. + accept_no_matches: If True, returns None if the variable is not found. + If False, raises a ValueError. Defaults to False. + + Returns: + The variable in the format '${MERLIN_}' if found, otherwise None + (if `accept_no_matches` is True) or raises a ValueError (if False). + + Raises: + ValueError: If the variable is not found and `accept_no_matches` is False. """ matches = list(re.findall(rf"^(?!#).*MERLIN_{vlaunch_var}=\d+", step_cmd, re.MULTILINE)) @@ -631,10 +945,25 @@ def find_vlaunch_var(vlaunch_var: str, step_cmd: str, accept_no_matches=False) - # Time utilities def convert_to_timedelta(timestr: Union[str, int]) -> timedelta: - """Convert a timestring to a timedelta object. - Timestring is given in in the format '[days]:[hours]:[minutes]:seconds' - with days, hours, minutes all optional add ons. - If passed as an int, will convert to a string first and interpreted as seconds. + """ + Convert a time string or integer to a timedelta object. + + The function takes a time string formatted as + '[days]:[hours]:[minutes]:seconds', where days, hours, and minutes + are optional. If an integer is provided, it is interpreted as the + total number of seconds. + + Args: + timestr: The time string in the specified format or an integer + representing seconds. + + Returns: + A timedelta object representing the duration specified by the input + string or integer. + + Raises: + ValueError: If the input string does not conform to the expected + format or contains more than four time fields. """ # make sure it's a string in case we get an int timestr = str(timestr) @@ -652,7 +981,20 @@ def convert_to_timedelta(timestr: Union[str, int]) -> timedelta: def _repr_timedelta_HMS(time_delta: timedelta) -> str: # pylint: disable=C0103 - """Represent a timedelta object as a string in hours:minutes:seconds""" + """ + Represent a timedelta object as a string in 'HH:MM:SS' format. + + This function converts a given timedelta object into a string that + represents the duration in hours, minutes, and seconds. The output + is formatted as 'HH:MM:SS', with leading zeros for single-digit + hours, minutes, or seconds. + + Args: + time_delta: The timedelta object to be converted. + + Returns: + A string representation of the timedelta in the format 'HH:MM:SS'. + """ hours, remainder = divmod(time_delta.total_seconds(), 3600) minutes, seconds = divmod(remainder, 60) hours, minutes, seconds = int(hours), int(minutes), int(seconds) @@ -660,20 +1002,46 @@ def _repr_timedelta_HMS(time_delta: timedelta) -> str: # pylint: disable=C0103 def _repr_timedelta_FSD(time_delta: timedelta) -> str: # pylint: disable=C0103 - """Represent a timedelta as a flux standard duration string, using seconds. + """ + Represent a timedelta as a Flux Standard Duration (FSD) string in seconds. - flux standard duration (FSD) is a floating point number with a single character suffix: s,m,h or d. - This uses seconds for simplicity. + The FSD format represents a duration as a floating-point number followed + by a suffix indicating the time unit. This function simplifies the + representation by using seconds and appending an 's' suffix. + + Args: + time_delta: The timedelta object to be converted. + + Returns: + A string representation of the timedelta in FSD format, expressed + in seconds (e.g., '123.45s'). """ fsd = f"{time_delta.total_seconds()}s" return fsd def repr_timedelta(time_delta: timedelta, method: str = "HMS") -> str: - """Represent a timedelta object as a string using a particular method. + """ + Represent a timedelta object as a string using a specified format method. - method - HMS: 'hours:minutes:seconds' - method - FSD: flux standard duration: 'seconds.s'""" + This function formats a given timedelta object according to the chosen + method. The available methods are: + + - HMS: Represents the duration in 'hours:minutes:seconds' format. + - FSD: Represents the duration in Flux Standard Duration (FSD), + expressed as a floating-point number of seconds with an 's' suffix. + + Args: + time_delta: The timedelta object to be formatted. + method: The method to use for formatting. Must be either 'HMS' or 'FSD'. + + Returns: + A string representation of the timedelta formatted according + to the specified method. + + Raises: + ValueError: If an invalid method is provided. + """ if method == "HMS": return _repr_timedelta_HMS(time_delta) if method == "FSD": @@ -682,16 +1050,28 @@ def repr_timedelta(time_delta: timedelta, method: str = "HMS") -> str: def convert_timestring(timestring: Union[str, int], format_method: str = "HMS") -> str: - """Converts a timestring to a different format. + """ + Converts a timestring to a specified format. + + This function accepts a timestring in a specific format or an integer + representing seconds, and converts it to a formatted string based on + the chosen format method. The available format methods are: + + - HMS: Represents the duration in 'hours:minutes:seconds' format. + - FSD: Represents the duration in Flux Standard Duration (FSD), + expressed as a floating-point number of seconds with an 's' suffix. - timestring: -either- - a timestring in in the format '[days]:[hours]:[minutes]:seconds' - days, hours, minutes are all optional add ons - -or- - an integer representing seconds - format_method: HMS - 'hours:minutes:seconds' - FSD - 'seconds.s' (flux standard duration) + Args: + timestring: A string representing time in the format + '[days]:[hours]:[minutes]:seconds' (where days, hours, + and minutes are optional), or an integer representing + time in seconds. + format_method: The method to use for formatting. Must be either + 'HMS' or 'FSD'. + Returns: + A string representation of the converted timestring formatted + according to the specified method. """ LOG.debug(f"Timestring is: {timestring}") tdelta = convert_to_timedelta(timestring) @@ -701,16 +1081,35 @@ def convert_timestring(timestring: Union[str, int], format_method: str = "HMS") def pretty_format_hms(timestring: str) -> str: """ - Given an HMS timestring, format it so it removes blank entries and adds - labels. + Format an HMS timestring to remove blank entries and add appropriate labels. + + This function takes a timestring in the 'HH:MM:SS' format and formats + it by removing any components that are zero and appending the relevant + labels (days, hours, minutes, seconds). The output is a cleaner string + representation of the time. - :param `timestring`: the HMS timestring we'll format - :returns: a formatted timestring + Args: + timestring: A timestring formatted as 'DD:HH:MM:SS'. Each component + represents days, hours, minutes, and seconds, respectively. + Only the last four components are relevant and may include + leading zeros. + + Returns: + A formatted timestring with non-zero components labeled appropriately. + + Raises: + ValueError: If the input timestring contains more than four components + or is not in the expected format. Examples: - - "00:00:34:00" -> "34m" - - "01:00:00:25" -> "01d:25s" - - "00:19:44:28" -> "19h:44m:28s" + >>> pretty_format_hms("00:00:34:00") + '34m' + >>> pretty_format_hms("01:00:00:25") + '01d:25s' + >>> pretty_format_hms("00:19:44:28") + '19h:44m:28s' + >>> pretty_format_hms("00:00:00:00") + '00s' """ # Create labels and split the timestring labels = ["d", "h", "m", "s"] @@ -735,10 +1134,23 @@ def pretty_format_hms(timestring: str) -> str: def ws_time_to_dt(ws_time: str) -> datetime: """ - Converts a workspace timestring to a datetime object. + Convert a workspace timestring to a datetime object. + + This function takes a workspace timestring formatted as 'YYYYMMDD-HHMMSS' + and converts it into a corresponding datetime object. The input string + must adhere to the specified format to ensure accurate conversion. - :param `ws_time`: A workspace timestring in the format YYYYMMDD-HHMMSS - :returns: A datetime object created from the workspace timestring + Args: + ws_time: A workspace timestring in the format 'YYYYMMDD-HHMMSS', where:\n + - YYYY is the four-digit year, + - MM is the two-digit month (01 to 12), + - DD is the two-digit day (01 to 31), + - HH is the two-digit hour (00 to 23), + - MM is the two-digit minute (00 to 59), + - SS is the two-digit second (00 to 59). + + Returns: + A datetime object constructed from the provided workspace timestring. """ year = int(ws_time[:4]) month = int(ws_time[4:6]) @@ -751,11 +1163,19 @@ def ws_time_to_dt(ws_time: str) -> datetime: def get_package_versions(package_list: List[str]) -> str: """ - Return a table of the versions and locations of installed packages, including python. - If the package is not installed says "Not installed" + Generate a formatted table of installed package versions and their locations. + + This function takes a list of package names and checks for their installed + versions and locations. If a package is not installed, it indicates that + the package is "Not installed". The output includes the Python version + and its executable location at the top of the table. + + Args: + package_list: A list of package names to check for installed versions. - :param `package_list`: A list of packages. - :returns: A string that's a formatted table. + Returns: + A formatted string representing a table of package names, their versions, + and installation locations. """ table = [] for package in package_list: diff --git a/mkdocs.yml b/mkdocs.yml index 76c123bd..4a56483e 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -104,17 +104,17 @@ plugins: - search - codeinclude: title_mode: pymdownx.tabbed - # - gen-files: - # scripts: - # - docs/gen_ref_pages.py - # - mkdocstrings: - # handlers: - # python: - # paths: [merlin] - # options: - # docstring_style: sphinx - # - literate-nav: - # nav_file: SUMMARY.md + - gen-files: + scripts: + - docs/gen_ref_pages.py + - mkdocstrings: + handlers: + python: + paths: [merlin] + options: + docstring_style: google + - literate-nav: + nav_file: SUMMARY.md extra: social: