Skip to content

Commit

Permalink
Feature/flux new api (#407)
Browse files Browse the repository at this point in the history
* Update flux run command.

* Update get_flux_cmd and add get_flux_alloc. Use these functions in all cases.

* Update get_flux_alloc and test_definitions.py to work for testing.

* Update actual flux version.

* Update CHANGELOG.md with flux version.

* Change DEFAULT_FLUX_VERSION .

* Fix-style

* Return to default flux run in scriptadapter.

* fix bugs for stop-workers and schema validation

fix bugs for stop-workers and schema validation

fix all flags for stop-workers

fix small bug with schema validation

modify CHANGELOG

run fix-style

add myself to contributors list

decouple celery logic from main

remove unused import

make changes Luc requested in PR

* Fix batch flux exe and have test flux script return version as well.

* Make sure test flux decalres itself.
Fix flux alloc full path call.

* Make sure qsub emits fake info.
Check for flux or qsub in env for testing before adding the fake command.

* Move fake commands to test directory

* Remove unneeded f-string.

* Change the scheduler check output strings to be more descriptive.

---------

Co-authored-by: Brian Gunnarson <brianfunnarson14@gmail.com>
  • Loading branch information
koning and bgunnar5 authored Mar 28, 2023
1 parent b483b23 commit 535a4bf
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Merlin will now assign `default_worker` to any step not associated with a worker
- Added `get_step_worker_map()` as a method in `specification.py`
- Added `tabulate_info()` function in `display.py` to help with table formatting
- Added get_flux_alloc function for new flux version >= 0.48.x interface change

### Changed
- Changed celery_regex to celery_slurm_regex in test_definitions.py
Expand All @@ -71,6 +72,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Modified `batch_worker_launch` to use the new `parse_batch_block` function
- Added a function `construct_scheduler_legend` to build a dict that keeps as much information as we need about each scheduler stored in one place
- Cleaned up the `construct_worker_launch_command` function to utilize the newly added functions and decrease the amount of repeated code
- Changed get_flux_cmd for new flux version >=0.48.x interface


## [1.9.1]
Expand Down
2 changes: 0 additions & 2 deletions merlin/examples/workflows/flux/scripts/flux_test/flux

This file was deleted.

2 changes: 0 additions & 2 deletions merlin/examples/workflows/flux/scripts/pbs_test/qsub

This file was deleted.

14 changes: 8 additions & 6 deletions merlin/study/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import subprocess
from typing import Dict, Optional, Union

from merlin.utils import get_yaml_var
from merlin.utils import get_flux_alloc, get_yaml_var


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -154,21 +154,21 @@ def get_batch_type(default=None):
:returns: (str) The batch name (available options: slurm, flux, lsf, pbs).
"""
# Flux should be checked first due to slurm emulation scripts
LOG.debug(f"check for flux = {check_for_flux()}")
LOG.debug(f"check for flux scheduler = {check_for_flux()}")
if check_for_flux():
return "flux"

# PBS should be checked before slurm for testing
LOG.debug(f"check for pbs = {check_for_pbs()}")
LOG.debug(f"check for pbs scheduler = {check_for_pbs()}")
if check_for_pbs():
return "pbs"

# LSF should be checked before slurm for testing
LOG.debug(f"check for lsf = {check_for_lsf()}")
LOG.debug(f"check for lsf scheduler = {check_for_lsf()}")
if check_for_lsf():
return "lsf"

LOG.debug(f"check for slurm = {check_for_slurm()}")
LOG.debug(f"check for slurm scheduler = {check_for_slurm()}")
if check_for_slurm():
return "slurm"

Expand Down Expand Up @@ -329,7 +329,9 @@ def construct_worker_launch_command(batch: Optional[Dict], btype: str, nodes: in
flux_path += "/"

flux_exe: str = os.path.join(flux_path, "flux")
launch_command = f"{flux_exe} mini alloc -o pty -N {nodes} --exclusive --job-name=merlin"
flux_alloc: str = get_flux_alloc(flux_exe)
launch_command = f"{flux_alloc} -o pty -N {nodes} --exclusive --job-name=merlin"

if bank:
launch_command += f" --setattr=system.bank={bank}"
if queue:
Expand Down
4 changes: 2 additions & 2 deletions merlin/study/script_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ def __init__(self, **kwargs):
:param **kwargs: A dictionary with default settings for the adapter.
"""
flux_command = kwargs.pop("flux_command", "flux mini run")
# The flux_command should always be overriden by the study object's flux_command property
flux_command = kwargs.pop("flux_command", "flux run")
super().__init__(**kwargs)

# "cmd": "flux mini run",
self._cmd_flags = {
"cmd": flux_command,
"ntasks": "-n",
Expand Down
2 changes: 1 addition & 1 deletion merlin/study/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ def expanded_spec(self):
@cached_property
def flux_command(self):
"""
Returns the flux version.
Returns the flux command, this will include the full path, if flux_path given in the workflow.
"""
flux_bin = "flux"
if "flux_path" in self.expanded_spec.batch.keys():
Expand Down
32 changes: 28 additions & 4 deletions merlin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

LOG = logging.getLogger(__name__)
ARRAY_FILE_FORMATS = ".npy, .csv, .tab"
DEFAULT_FLUX_VERSION = "0.13"
DEFAULT_FLUX_VERSION = "0.48.0"


def get_user_process_info(user=None, attrs=None):
Expand Down Expand Up @@ -397,24 +397,48 @@ def get_flux_version(flux_path, no_errors=False):

def get_flux_cmd(flux_path, no_errors=False):
"""
Return the flux command as string
Return the flux run command as string
:param `flux_path`: the full path to the flux bin
:param `no_errors`: a flag to determine if this a test run to ignore errors
"""
# The default is for flux version >= 0.13,
# The default is for flux version >= 0.48.x
# this may change in the future.
flux_cmd = "flux mini run"
flux_cmd = "flux run"

flux_ver = get_flux_version(flux_path, no_errors=no_errors)

vers = [int(n) for n in flux_ver.split(".")]
if vers[0] == 0 and vers[1] < 48:
flux_cmd = "flux mini run"

if vers[0] == 0 and vers[1] < 13:
flux_cmd = "flux wreckrun"

return flux_cmd


def get_flux_alloc(flux_path, no_errors=False):
"""
Return the flux alloc command as string
:param `flux_path`: the full path to the flux bin
:param `no_errors`: a flag to determine if this a test run to ignore errors
"""
# The default is for flux version >= 0.48.x
# this may change in the future.
flux_alloc = f"{flux_path} alloc"

flux_ver = get_flux_version(flux_path, no_errors=no_errors)

vers = [int(n) for n in flux_ver.split(".")]

if vers[0] == 0 and vers[1] < 48:
flux_alloc = f"{flux_path} mini alloc"

return flux_alloc


def check_machines(machines):
"""
Return a True if the current machine is in the list of machines.
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/fake_commands/flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python3
import sys


if len(sys.argv) > 1 and sys.argv[1] == 'version':
print("commands: 0.48.0\n<This is a fake flux for testing>")
else:
print("Nodes\n<This is a fake flux for testing>")
2 changes: 2 additions & 0 deletions tests/integration/fake_commands/qsub
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh
echo "pbs_version = 19.0.0\n<This is a fake qsub for testing>"
20 changes: 14 additions & 6 deletions tests/integration/test_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
}
"""

import shutil

# Pylint complains that we didn't install this module but it's defined locally so ignore
from conditions import ( # pylint: disable=E0401
FileHasNoRegex,
Expand All @@ -52,7 +54,7 @@
StepFileHasRegex,
)

from merlin.utils import get_flux_cmd
from merlin.utils import get_flux_alloc, get_flux_cmd


OUTPUT_DIR = "cli_test_studies"
Expand All @@ -66,8 +68,9 @@ def define_tests(): # pylint: disable=R0914
is the test's name, and the value is a tuple
of (shell command, condition(s) to satisfy).
"""
flux_alloc = (get_flux_alloc("flux", no_errors=True),)
celery_slurm_regex = r"(srun\s+.*)?celery\s+(-A|--app)\s+merlin\s+worker\s+.*"
celery_flux_regex = r"(flux mini alloc\s+.*)?celery\s+(-A|--app)\s+merlin\s+worker\s+.*"
celery_flux_regex = rf"({flux_alloc}\s+.*)?celery\s+(-A|--app)\s+merlin\s+worker\s+.*"
celery_pbs_regex = r"(qsub\s+.*)?celery\s+(-A|--app)\s+merlin\s+worker\s+.*"

# shortcut string variables
Expand All @@ -87,10 +90,15 @@ def define_tests(): # pylint: disable=R0914
flux = f"{examples}/flux/flux_test.yaml"
flux_restart = f"{examples}/flux/flux_par_restart.yaml"
flux_native = f"{examples}/flux/flux_par_native_test.yaml"
flux_native_path = f"{examples}/flux/scripts/flux_test"
workers_flux = f"""PATH="{flux_native_path}:$PATH";merlin {err_lvl} run-workers"""
pbs_path = f"{examples}/flux/scripts/pbs_test"
workers_pbs = f"""PATH="{pbs_path}:$PATH";merlin {err_lvl} run-workers"""
workers_flux = f"merlin {err_lvl} run-workers"
fake_cmds_path = "tests/integration/fake_commands"
if not shutil.which("flux"):
# Use bogus flux to test if no flux is present
workers_flux = f"""PATH="{fake_cmds_path}:$PATH";merlin {err_lvl} run-workers"""
workers_pbs = f"merlin {err_lvl} run-workers" ""
if not shutil.which("qsub"):
# Use bogus qsub to test if no pbs scheduler is present
workers_pbs = f"""PATH="{fake_cmds_path}:$PATH";merlin {err_lvl} run-workers"""
lsf = f"{examples}/lsf/lsf_par.yaml"
mul_workers_demo = f"{dev_examples}/multiple_workers.yaml"
black = "black --check --target-version py36"
Expand Down

0 comments on commit 535a4bf

Please sign in to comment.