
Commit

add api docs for half of the study directory
bgunnar5 committed Dec 4, 2024
1 parent 3969414 commit 763b135
Showing 5 changed files with 1,328 additions and 483 deletions.
11 changes: 11 additions & 0 deletions merlin/study/__init__.py
@@ -27,3 +27,14 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
###############################################################################

"""
Files in the `study/` directory handle the logic of constructing
and executing full Merlin studies. This logic includes:\n
- Putting together batch worker launch commands
- Hosting an interface for Celery commands
- Obtaining the DAG from Maestro and modifying it for use with Celery
- Creating an interface for different schedulers
- Displaying the status of studies
- Handling the logic for entire studies _and_ single steps in studies
"""
258 changes: 204 additions & 54 deletions merlin/study/batch.py
Expand Up @@ -33,30 +33,49 @@
Currently, only the batch worker launch for Slurm, LSF, or Flux
is implemented.
"""
import logging
import os
import subprocess
from typing import Dict, Optional, Union

from merlin.spec.specification import MerlinSpec
from merlin.utils import convert_timestring, get_flux_alloc, get_flux_version, get_yaml_var


LOG = logging.getLogger(__name__)


def batch_check_parallel(spec):
def batch_check_parallel(spec: MerlinSpec) -> bool:
"""
Check for a parallel batch section in the yaml file.
Check for a parallel batch section in the provided MerlinSpec object.
This function examines the 'batch' section of the given specification to determine
whether it is configured for parallel execution. It checks the 'type' attribute
within the batch section, defaulting to 'local' if not specified. If the type
is anything other than 'local', the function will return True, indicating that
parallel processing is enabled.
Args:
spec (spec.specification.MerlinSpec): An instance of the
[`MerlinSpec`][spec.specification.MerlinSpec] class that contains the
configuration details, including the batch section.
Returns:
Returns True if the batch type is set to a value other than 'local',
indicating that parallel processing is enabled; otherwise, returns False.
Raises:
AttributeError: If the 'batch' section is not present in the specification,
an error is logged and an AttributeError is raised.
"""
parallel = False

try:
batch = spec.batch
except AttributeError:
except AttributeError as exc:
LOG.error("The batch section is required in the specification file.")
raise
raise exc

btype = get_yaml_var(batch, "type", "local")
if btype != "local":
@@ -65,14 +84,29 @@ def batch_check_parallel(spec):
return parallel

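A minimal usage sketch for the newly typed signature; the spec file path is hypothetical, and this assumes `MerlinSpec` exposes a `load_specification` loader:

```python
# Sketch only: "my_study.yaml" is a hypothetical spec file.
from merlin.spec.specification import MerlinSpec
from merlin.study.batch import batch_check_parallel

spec = MerlinSpec.load_specification("my_study.yaml")  # assumed loader
if batch_check_parallel(spec):
    print("Batch type is not 'local'; workers will be launched via a scheduler.")
else:
    print("Batch type is 'local'; no parallel batch launch needed.")
```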

def check_for_scheduler(scheduler, scheduler_legend):
def check_for_scheduler(scheduler: str, scheduler_legend: Dict[str, str]) -> bool:
"""
Check which scheduler (Flux, Slurm, LSF, or PBS) is the main
scheduler for the cluster.
:param `scheduler`: A string representing the scheduler to check for
Options: flux, slurm, lsf, or pbs
:param `scheduler_legend`: A dict of information related to each scheduler
:returns: A bool representing whether `scheduler` is the main scheduler for the cluster
Check which scheduler (Flux, Slurm, LSF, or PBS) is the main scheduler for the cluster.
This function verifies if the specified scheduler is the main scheduler by executing
a command associated with it from the provided scheduler legend. It returns a boolean
indicating whether the specified scheduler is active.
Args:
scheduler: A string representing the scheduler to check for. Options include 'flux',
'slurm', 'lsf', or 'pbs'.
scheduler_legend: A dictionary containing information related to each scheduler,
including the command to check its status and the expected output. See
[`construct_scheduler_legend`][study.batch.construct_scheduler_legend]
for more information on all the settings this dict contains.
Returns:
Returns True if the specified scheduler is the main scheduler for the
cluster, otherwise returns False.
Raises:
FileNotFoundError: If the command associated with the scheduler cannot be found.
PermissionError: If there are insufficient permissions to execute the command.
"""
# Check for invalid scheduler
if scheduler not in ("flux", "slurm", "lsf", "pbs"):
@@ -96,14 +130,26 @@ def check_for_scheduler(scheduler, scheduler_legend):
return False

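A short sketch of how the legend and the check fit together; the batch block values are hypothetical, and building the legend via `parse_batch_block` assumes Flux is installed since that helper resolves the Flux executable:

```python
# Sketch: build the scheduler legend, then probe whether Slurm is the main scheduler.
from merlin.study.batch import check_for_scheduler, construct_scheduler_legend, parse_batch_block

parsed_batch = parse_batch_block({"type": "slurm", "nodes": 2})  # hypothetical batch block
legend = construct_scheduler_legend(parsed_batch, nodes=2)
if check_for_scheduler("slurm", legend):
    print("Slurm is the main scheduler on this cluster.")
```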

def get_batch_type(scheduler_legend, default=None):
def get_batch_type(scheduler_legend: Dict[str, str], default: str = None) -> str:
"""
Determine which batch scheduler to use.
:param scheduler_legend: A dict storing info related to each scheduler
:param default: (str) The default batch scheduler to use if a scheduler
can't be determined. The default is None.
:returns: (str) The batch name (available options: slurm, flux, lsf, pbs).
This function checks a predefined list of batch schedulers in a specific order
to determine which one is available for use. If none of the schedulers are found,
it checks the system type environment variable to suggest a default scheduler.
If no suitable scheduler is determined, it returns the specified default value.
Args:
scheduler_legend: A dictionary storing information related to each
scheduler, including commands and expected outputs for checking their
availability. See [`construct_scheduler_legend`][study.batch.construct_scheduler_legend]
for more information on all the settings this dict contains.
default: The default batch scheduler to use if a scheduler cannot be determined.
Returns:
The name of the available batch scheduler. Possible options include
'slurm', 'flux', 'lsf', or 'pbs'. If no scheduler is found, returns
the specified default value.
"""
# These schedulers are listed in order of which should be checked for first
# 1. Flux should be checked first due to slurm emulation scripts
@@ -126,13 +172,29 @@ def get_batch_type(scheduler_legend, default=None):
return default

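A sketch of scheduler auto-detection with a fallback; the values are hypothetical and carry the same Flux-probing caveat as above:

```python
# Sketch: detect an available scheduler, falling back to 'slurm' if none is found.
from merlin.study.batch import construct_scheduler_legend, get_batch_type, parse_batch_block

parsed_batch = parse_batch_block({"type": "flux"})  # hypothetical batch block
legend = construct_scheduler_legend(parsed_batch, nodes=1)
print(get_batch_type(legend, default="slurm"))
```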

def get_node_count(parsed_batch: Dict, default=1):
def get_node_count(parsed_batch: Dict, default: int = 1) -> int:
"""
Determine a default node count based on the environment.
:param default: (int) The number of nodes to return if a node count from
the environment cannot be determined.
:param returns: (int) The number of nodes to use.
This function checks the environment and the Flux version to determine the
appropriate number of nodes to use for batch processing. It first verifies
the Flux version, then attempts to retrieve the node count from the Flux
allocation or environment variables specific to Slurm or LSF. If no valid
node count can be determined, it returns a specified default value.
Args:
parsed_batch: A dictionary containing parsed batch configurations.
See [`parse_batch_block`][study.batch.parse_batch_block] for more
information on all the settings in this dictionary.
default: The number of nodes to return if a node count from the
environment cannot be determined.
Returns:
The number of nodes to use for the batch job. This value is determined
based on the environment and scheduler specifics.
Raises:
ValueError: If the Flux version is too old (below 0.17.0).
"""

# Flux version check
@@ -166,9 +228,38 @@ def get_node_count(parsed_batch: Dict, default=1):
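A sketch for the node-count helper documented above; since the function checks the Flux version first, this assumes a Flux 0.17.0+ installation, and the batch block is hypothetical:

```python
# Sketch: fall back to 1 node when the scheduler environment provides no node count.
from merlin.study.batch import get_node_count, parse_batch_block

parsed_batch = parse_batch_block({"type": "flux"})  # hypothetical batch block
nodes = get_node_count(parsed_batch, default=1)
print(f"Using {nodes} node(s) for the batch launch.")
```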

def parse_batch_block(batch: Dict) -> Dict:
"""
A function to parse the batch block of the yaml file.
:param `batch`: The batch block to read in
:returns: A dict with all the info (or defaults) from the batch block
Parse the batch block of a YAML configuration file.
This function extracts relevant information from the provided batch block
dictionary, including paths, execution options, and defaults. It retrieves
the Flux executable path and allocation details, and populates a dictionary
with the parsed values.
Args:
batch: A dictionary representing the batch block from the YAML
configuration file.
Returns:
Dict: A dictionary containing parsed information from the batch block,
including:\n
- `btype`: The type of batch job (default is 'local').
- `nodes`: The number of nodes to use (default is None).
- `shell`: The shell to use (default is 'bash').
- `bank`: The bank to charge for the job (default is an empty string).
- `queue`: The queue to submit the job to (default is an empty string).
- `walltime`: The maximum wall time for the job (default is an empty string).
- `launch pre`: Any commands to run before launching (default is an empty string).
- `launch args`: Arguments for the launch command (default is an empty string).
- `launch command`: Custom command to launch workers. This will override the
default launch command (default is an empty string).
- `flux path`: Optional path to flux bin.
- `flux exe`: The full path to the Flux executable.
- `flux exec`: Optional flux exec command to launch workers on all nodes if
`flux_exec_workers` is True (default is None).
- `flux alloc`: The Flux allocation retrieved from the executable.
- `flux opts`: Optional flux start options (default is an empty string).
- `flux exec workers`: Optional flux argument to launch workers
on all nodes (default is True).
"""
flux_path: str = get_yaml_var(batch, "flux_path", "")
if "/" in flux_path:
@@ -204,9 +295,20 @@ def parse_batch_block(batch: Dict) -> Dict:
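A sketch showing the shape of the returned dictionary; the input keys mirror a typical `batch` block from a spec file and are hypothetical:

```python
# Sketch: parse a hypothetical batch block and inspect a few defaulted keys.
from merlin.study.batch import parse_batch_block

batch_block = {"type": "slurm", "nodes": 4, "queue": "pdebug", "walltime": "01:00:00"}
parsed = parse_batch_block(batch_block)
print(parsed["btype"])     # "slurm"
print(parsed["nodes"])     # 4
print(parsed["shell"])     # defaults to "bash"
```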

def get_flux_launch(parsed_batch: Dict) -> str:
"""
Build the flux launch command based on the batch section of the yaml.
:param `parsed_batch`: A dict of batch configurations
:returns: The flux launch command
Build the Flux launch command based on the batch section of the YAML configuration.
This function constructs the command to launch a Flux job using the parameters
specified in the parsed batch configuration. It determines the appropriate
execution command for Flux workers and integrates it with the launch command
provided in the batch configuration.
Args:
parsed_batch: A dictionary containing batch configuration parameters.
See [`parse_batch_block`][study.batch.parse_batch_block] for more information
on all the settings in this dictionary.
Returns:
The constructed Flux launch command, ready to be executed.
"""
default_flux_exec = "flux exec" if parsed_batch["launch command"] else f"{parsed_batch['flux exe']} exec"
flux_exec: str = ""
@@ -225,20 +327,37 @@ def get_flux_launch(parsed_batch: Dict) -> str:
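A sketch for building the Flux worker-launch line; it assumes Flux is installed because `parse_batch_block` resolves the Flux executable and allocation:

```python
# Sketch: construct the flux launch command from a parsed batch block.
from merlin.study.batch import get_flux_launch, parse_batch_block

parsed_batch = parse_batch_block({"type": "flux"})  # hypothetical batch block
print(get_flux_launch(parsed_batch))
```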


def batch_worker_launch(
spec: Dict,
spec: MerlinSpec,
com: str,
nodes: Optional[Union[str, int]] = None,
batch: Optional[Dict] = None,
nodes: Union[str, int] = None,
batch: Dict = None,
) -> str:
"""
The configuration in the batch section of the merlin spec
is used to create the worker launch line, which may be
different from a simulation launch.
: param spec : (Dict) workflow specification
: param com : (str): The command to launch with batch configuration
: param nodes : (Optional[Union[str, int]]): The number of nodes to use in the batch launch
: param batch : (Optional[Dict]): An optional batch override from the worker config
Create the worker launch command based on the batch configuration in the
workflow specification.
This function constructs a command to launch a worker process using the
specified batch configuration. It handles different batch types and
integrates any necessary pre-launch commands, launch arguments, and
node specifications.
Args:
spec (spec.specification.MerlinSpec): An instance of the
[`MerlinSpec`][spec.specification.MerlinSpec] class that contains the
configuration details, including the batch section.
com: The command to launch with the batch configuration.
nodes: The number of nodes to use in the batch launch. If not specified,
it will default to the value in the batch configuration.
batch: An optional batch override from the worker configuration. If not
provided, the function will attempt to retrieve the batch section from
the specification.
Returns:
The constructed worker launch command, ready to be executed.
Raises:
AttributeError: If the batch section is missing in the specification.
TypeError: If the `nodes` parameter is of an invalid type.
"""
if batch is None:
try:
@@ -289,17 +408,33 @@ def batch_worker_launch(
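A usage sketch for the worker launch builder; the spec path and worker command are hypothetical:

```python
# Sketch: wrap a worker command with the batch launch line from the spec.
from merlin.spec.specification import MerlinSpec
from merlin.study.batch import batch_worker_launch

spec = MerlinSpec.load_specification("my_study.yaml")  # hypothetical spec file
worker_cmd = "celery -A merlin worker -l INFO"         # hypothetical worker command
print(batch_worker_launch(spec, worker_cmd, nodes=2))
```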

def construct_scheduler_legend(parsed_batch: Dict, nodes: int) -> Dict:
"""
Constructs a legend of relevant information needed for each scheduler. This includes:
- bank (str): The flag to add a bank to the launch command
- check cmd (list): The command to run to check if this is the main scheduler for the cluster
- expected check output (str): The expected output from running the check cmd
- launch (str): The initial launch command for the scheduler
- queue (str): The flag to add a queue to the launch command
- walltime (str): The flag to add a walltime to the launch command
:param `parsed_batch`: A dict of batch configurations
:param `nodes`: An int representing the number of nodes to use in a launch command
:returns: A dict of scheduler related information
Constructs a legend of relevant information needed for each scheduler.
This function generates a dictionary containing configuration details for various
job schedulers based on the provided batch configuration. The returned dictionary
includes flags for bank, queue, and walltime, as well as commands to check the
scheduler and the initial launch command.
Args:
parsed_batch: A dictionary of batch configurations, which must include `bank`,
`queue`, `walltime`, and `flux alloc`. See
[`parse_batch_block`][study.batch.parse_batch_block] for more information on
all the settings in this dictionary.
nodes: The number of nodes to use in the launch command.
Returns:
A dictionary containing scheduler-related information, structured as follows:\n
- For each scheduler (e.g., 'flux', 'lsf', 'pbs', 'slurm'):\n
\t- `bank` (str): The flag to add a bank to the launch command.
- `check cmd` (List[str]): The command to run to check if this is the main
scheduler for the cluster.
- `expected check output` (bytes): The expected output from running
the check command.
- `launch` (str): The initial launch command for the scheduler.
- `queue` (str): The flag to add a queue to the launch command (if
applicable).
- `walltime` (str): The flag to add a walltime to the launch command
(if applicable).
"""
scheduler_legend = {
"flux": {
@@ -338,11 +473,26 @@ def construct_scheduler_legend(parsed_batch: Dict, nodes: int) -> Dict:
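A sketch that inspects a couple of the documented legend entries; the batch block values are hypothetical:

```python
# Sketch: build the legend and look at the Slurm entries described above.
from merlin.study.batch import construct_scheduler_legend, parse_batch_block

parsed_batch = parse_batch_block({"type": "slurm", "bank": "guests", "queue": "pbatch"})  # hypothetical
legend = construct_scheduler_legend(parsed_batch, nodes=4)
print(legend["slurm"]["launch"])     # initial launch command for Slurm
print(legend["slurm"]["check cmd"])  # command used to detect the scheduler
```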

def construct_worker_launch_command(parsed_batch: Dict, nodes: int) -> str:
"""
If no 'worker_launch' is found in the batch yaml, this method constructs the needed launch command.
:param `parsed_batch`: A dict of batch configurations
:param `nodes`:: The number of nodes to use in the batch launch
:returns: The launch command
Constructs the worker launch command based on the provided batch configuration.
This function generates a launch command for a worker process when no
'worker_launch' command is specified in the batch configuration. It
utilizes the scheduler legend to incorporate necessary flags such as
bank, queue, and walltime, depending on the workload manager.
Args:
parsed_batch: A dictionary of batch configurations, which must include
`btype`, `bank`, `queue`, and `walltime`. See
[`parse_batch_block`][study.batch.parse_batch_block] for more information
on all the settings in this dictionary.
nodes: The number of nodes to use in the batch launch.
Returns:
The constructed launch command for the worker process.
Raises:
TypeError: If the PBS scheduler is enabled for a batch type other than 'flux'.
KeyError: If the workload manager is not found in the scheduler legend.
"""
# Initialize launch_command and get the scheduler_legend and workload_manager
launch_command: str = ""
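A sketch for the fallback path when no custom launch command is supplied; the batch block values are hypothetical:

```python
# Sketch: construct a Slurm-style worker launch command with queue and walltime flags.
from merlin.study.batch import construct_worker_launch_command, parse_batch_block

parsed_batch = parse_batch_block({"type": "slurm", "queue": "pdebug", "walltime": "30:00"})  # hypothetical
print(construct_worker_launch_command(parsed_batch, nodes=2))
```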