Skip to content

Commit

Permalink
Merge branch 'main' into flux_log_files
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen authored Dec 20, 2024
2 parents 482cb8f + e9a81f8 commit 5d0046d
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 114 deletions.
26 changes: 15 additions & 11 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
InteractiveStepExecutor,
execute_tasks_with_dependencies,
)
from executorlib.interactive.slurm import SrunSpawner
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_executor,
Expand All @@ -18,10 +19,7 @@
check_pmi,
validate_number_of_cores,
)
from executorlib.standalone.interactive.spawner import (
MpiExecSpawner,
SrunSpawner,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.plot import (
draw,
generate_nodes_and_edges,
Expand All @@ -30,7 +28,7 @@
from executorlib.standalone.thread import RaisingThread

try: # The PyFluxExecutor requires flux-base to be installed.
from executorlib.interactive.flux import FluxPythonSpawner
from executorlib.interactive.flux import FluxPythonSpawner, validate_max_workers
except ImportError:
pass

Expand Down Expand Up @@ -230,13 +228,19 @@ def create_executor(
resource_dict["flux_log_files"] = flux_log_files
if block_allocation:
resource_dict["init_function"] = init_function
max_workers = validate_number_of_cores(
max_cores=max_cores,
max_workers=max_workers,
cores_per_worker=cores_per_worker,
set_local_cores=False,
)
validate_max_workers(
max_workers=max_workers,
cores=cores_per_worker,
threads_per_core=resource_dict["threads_per_core"],
)
return InteractiveExecutor(
max_workers=validate_number_of_cores(
max_cores=max_cores,
max_workers=max_workers,
cores_per_worker=cores_per_worker,
set_local_cores=False,
),
max_workers=max_workers,
executor_kwargs=resource_dict,
spawner=FluxPythonSpawner,
)
Expand Down
14 changes: 14 additions & 0 deletions executorlib/interactive/flux.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
import os
from typing import Optional

import flux
import flux.job

from executorlib.standalone.interactive.spawner import BaseSpawner


def validate_max_workers(max_workers: int, cores: int, threads_per_core: int) -> None:
    """
    Validate that the requested resources fit into the current flux allocation.

    Args:
        max_workers (int): number of workers requested
        cores (int): number of cores per worker
        threads_per_core (int): number of threads per core

    Raises:
        ValueError: if the total number of requested cores exceeds the number of
            cores available (up) in the flux allocation
    """
    # The module-level imports only pull in "flux" and "flux.job"; load the
    # resource submodule explicitly so the lookup below cannot fail with an
    # AttributeError on "flux.resource".
    import flux.resource.list

    handle = flux.Flux()
    cores_total = flux.resource.list.resource_list(handle).get().up.ncores
    cores_requested = max_workers * cores * threads_per_core
    if cores_total < cores_requested:
        raise ValueError(
            "The number of requested cores is larger than the available cores "
            + str(cores_total)
            + " < "
            + str(cores_requested)
        )


class FluxPythonSpawner(BaseSpawner):
"""
A class representing the FluxPythonInterface.
Expand Down
94 changes: 94 additions & 0 deletions executorlib/interactive/slurm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from typing import Optional

from executorlib.standalone.interactive.spawner import SubprocessSpawner

SLURM_COMMAND = "srun"


class SrunSpawner(SubprocessSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
    ):
        """
        Srun interface implementation.

        Args:
            cwd (str, optional): The current working directory. Defaults to None.
            cores (int, optional): The number of cores to use. Defaults to 1.
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
            gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
            slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to None.
        """
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
            threads_per_core=threads_per_core,
        )
        self._gpus_per_core = gpus_per_core
        # Copy the caller's list (or start empty) rather than using a mutable
        # default argument, which would be shared across all instances and
        # aliased to the caller's list.
        self._slurm_cmd_args = list(slurm_cmd_args) if slurm_cmd_args is not None else []

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Generate the command list for the Srun interface.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list, prefixed with the srun launcher options.
        """
        command_prepend_lst = generate_slurm_command(
            cores=self._cores,
            cwd=self._cwd,
            threads_per_core=self._threads_per_core,
            gpus_per_core=self._gpus_per_core,
            openmpi_oversubscribe=self._openmpi_oversubscribe,
            slurm_cmd_args=self._slurm_cmd_args,
        )
        return super().generate_command(
            command_lst=command_prepend_lst + command_lst,
        )


def generate_slurm_command(
    cores: int,
    cwd: Optional[str],
    threads_per_core: int = 1,
    gpus_per_core: int = 0,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: Optional[list[str]] = None,
) -> list[str]:
    """
    Generate the command list for the SLURM srun interface.

    Args:
        cores (int): The number of cores.
        cwd (str, optional): The current working directory; omitted from the command when None.
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
        slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to None.

    Returns:
        list[str]: The generated command list.
    """
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
    if cwd is not None:
        command_prepend_lst += ["-D", cwd]
    if threads_per_core > 1:
        # Bug fix: the "=" separator was missing, which produced an invalid
        # flag such as "--cpus-per-task2" that srun rejects.
        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
    if openmpi_oversubscribe:
        command_prepend_lst += ["--oversubscribe"]
    # None (default) and an empty list both mean "no extra arguments"; a
    # mutable default argument is avoided on purpose.
    if slurm_cmd_args:
        command_prepend_lst += slurm_cmd_args
    return command_prepend_lst
3 changes: 1 addition & 2 deletions executorlib/standalone/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
interface_send,
interface_shutdown,
)
from executorlib.standalone.interactive.spawner import MpiExecSpawner, SrunSpawner
from executorlib.standalone.interactive.spawner import MpiExecSpawner
from executorlib.standalone.thread import RaisingThread

__all__ = [
Expand All @@ -18,5 +18,4 @@
interface_receive,
RaisingThread,
MpiExecSpawner,
SrunSpawner,
]
34 changes: 25 additions & 9 deletions executorlib/standalone/hdf.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
from typing import Optional, Tuple
import os
from typing import List, Optional, Tuple

import cloudpickle
import h5py
import numpy as np

group_dict = {
"fn": "function",
"args": "input_args",
"kwargs": "input_kwargs",
"output": "output",
"runtime": "runtime",
"queue_id": "queue_id",
}


def dump(file_name: str, data_dict: dict) -> None:
"""
Expand All @@ -13,14 +23,6 @@ def dump(file_name: str, data_dict: dict) -> None:
file_name (str): file name of the HDF5 file as absolute path
data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}}
"""
group_dict = {
"fn": "function",
"args": "input_args",
"kwargs": "input_kwargs",
"output": "output",
"runtime": "runtime",
"queue_id": "queue_id",
}
with h5py.File(file_name, "a") as fname:
for data_key, data_value in data_dict.items():
if data_key in group_dict.keys():
Expand Down Expand Up @@ -97,3 +99,17 @@ def get_queue_id(file_name: str) -> Optional[int]:
return cloudpickle.loads(np.void(hdf["/queue_id"]))
else:
return None


def get_cache_data(cache_directory: str) -> List[dict]:
    """
    Collect the stored content of every HDF5 cache file in a directory.

    Args:
        cache_directory (str): path to the directory containing the HDF5 cache files

    Returns:
        List[dict]: one dictionary per file, containing every known group
            ("function", "input_args", "input_kwargs", "output", "runtime",
            "queue_id") present in that file, plus the file name under the
            "filename" key.
    """
    entries = []
    for entry_name in os.listdir(cache_directory):
        entry_path = os.path.join(cache_directory, entry_name)
        with h5py.File(entry_path, "r") as hdf:
            content = {}
            # Only unpickle the groups that were actually written to this file.
            for key in group_dict.values():
                if key in hdf:
                    content[key] = cloudpickle.loads(np.void(hdf["/" + key]))
            content["filename"] = entry_name
            entries.append(content)
    return entries
90 changes: 0 additions & 90 deletions executorlib/standalone/interactive/spawner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import Optional

MPI_COMMAND = "mpiexec"
SLURM_COMMAND = "srun"


class BaseSpawner(ABC):
Expand Down Expand Up @@ -147,59 +146,6 @@ def generate_command(self, command_lst: list[str]) -> list[str]:
)


class SrunSpawner(SubprocessSpawner):
    def __init__(
        self,
        cwd: Optional[str] = None,
        cores: int = 1,
        threads_per_core: int = 1,
        gpus_per_core: int = 0,
        openmpi_oversubscribe: bool = False,
        slurm_cmd_args: Optional[list[str]] = None,
    ):
        """
        Srun interface implementation.

        Args:
            cwd (str, optional): The current working directory. Defaults to None.
            cores (int, optional): The number of cores to use. Defaults to 1.
            threads_per_core (int, optional): The number of threads per core. Defaults to 1.
            gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
            openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
            slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to None.
        """
        super().__init__(
            cwd=cwd,
            cores=cores,
            openmpi_oversubscribe=openmpi_oversubscribe,
            threads_per_core=threads_per_core,
        )
        self._gpus_per_core = gpus_per_core
        # Copy the caller's list (or start empty) rather than using a mutable
        # default argument, which would be shared across all instances and
        # aliased to the caller's list.
        self._slurm_cmd_args = list(slurm_cmd_args) if slurm_cmd_args is not None else []

    def generate_command(self, command_lst: list[str]) -> list[str]:
        """
        Generate the command list for the Srun interface.

        Args:
            command_lst (list[str]): The command list.

        Returns:
            list[str]: The generated command list, prefixed with the srun launcher options.
        """
        command_prepend_lst = generate_slurm_command(
            cores=self._cores,
            cwd=self._cwd,
            threads_per_core=self._threads_per_core,
            gpus_per_core=self._gpus_per_core,
            openmpi_oversubscribe=self._openmpi_oversubscribe,
            slurm_cmd_args=self._slurm_cmd_args,
        )
        return super().generate_command(
            command_lst=command_prepend_lst + command_lst,
        )


def generate_mpiexec_command(
cores: int, openmpi_oversubscribe: bool = False
) -> list[str]:
Expand All @@ -220,39 +166,3 @@ def generate_mpiexec_command(
if openmpi_oversubscribe:
command_prepend_lst += ["--oversubscribe"]
return command_prepend_lst


def generate_slurm_command(
    cores: int,
    cwd: Optional[str],
    threads_per_core: int = 1,
    gpus_per_core: int = 0,
    openmpi_oversubscribe: bool = False,
    slurm_cmd_args: Optional[list[str]] = None,
) -> list[str]:
    """
    Generate the command list for the SLURM srun interface.

    Args:
        cores (int): The number of cores.
        cwd (str, optional): The current working directory; omitted from the command when None.
        threads_per_core (int, optional): The number of threads per core. Defaults to 1.
        gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
        openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
        slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to None.

    Returns:
        list[str]: The generated command list.
    """
    command_prepend_lst = [SLURM_COMMAND, "-n", str(cores)]
    if cwd is not None:
        command_prepend_lst += ["-D", cwd]
    if threads_per_core > 1:
        # Bug fix: the "=" separator was missing, which produced an invalid
        # flag such as "--cpus-per-task2" that srun rejects.
        command_prepend_lst += ["--cpus-per-task=" + str(threads_per_core)]
    if gpus_per_core > 0:
        command_prepend_lst += ["--gpus-per-task=" + str(gpus_per_core)]
    if openmpi_oversubscribe:
        command_prepend_lst += ["--oversubscribe"]
    # None (default) and an empty list both mean "no extra arguments"; a
    # mutable default argument is avoided on purpose.
    if slurm_cmd_args:
        command_prepend_lst += slurm_cmd_args
    return command_prepend_lst
4 changes: 3 additions & 1 deletion notebooks/3-hpc-allocation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,9 @@
}
],
"source": [
"with Executor(backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\") as exe:\n",
"with Executor(\n",
" backend=\"flux_allocation\", flux_executor_pmi_mode=\"pmix\", flux_executor_nesting=True\n",
") as exe:\n",
" fs = exe.submit(calc_nested)\n",
" print(fs.result())"
]
Expand Down
33 changes: 33 additions & 0 deletions tests/test_cache_executor_interactive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
import shutil
import unittest

from executorlib import Executor

try:
from executorlib.standalone.hdf import get_cache_data

skip_h5py_test = False
except ImportError:
skip_h5py_test = True


@unittest.skipIf(
    skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
)
class TestCacheFunctions(unittest.TestCase):
    """Verify that results written by the Executor cache can be read back with get_cache_data()."""

    def test_cache_data(self):
        cache_directory = "./cache"
        with Executor(backend="local", cache_directory=cache_directory) as exe:
            futures = [exe.submit(sum, [i, i]) for i in range(1, 4)]
            results = [f.result() for f in futures]

        cache_lst = get_cache_data(cache_directory=cache_directory)
        # The cached outputs and the cached input arguments must both add up
        # to the same total as the results returned by the futures.
        self.assertEqual(sum(entry["output"] for entry in cache_lst), sum(results))
        self.assertEqual(
            sum(sum(entry["input_args"][0]) for entry in cache_lst), sum(results)
        )

    def tearDown(self):
        # Remove the cache directory created by test_cache_data.
        if os.path.exists("cache"):
            shutil.rmtree("cache")
Loading

0 comments on commit 5d0046d

Please sign in to comment.