Skip to content

Commit

Permalink
Validate maximum number of workers for flux
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-janssen committed Dec 19, 2024
1 parent b37acd6 commit 1641409
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 0 deletions.
6 changes: 6 additions & 0 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

try: # The PyFluxExecutor requires flux-base to be installed.
from executorlib.interactive.flux import FluxPythonSpawner
from executorlib.interactive.flux import validate_max_workers
except ImportError:
pass

Expand Down Expand Up @@ -226,6 +227,11 @@ def create_executor(
resource_dict["flux_executor_nesting"] = flux_executor_nesting
if block_allocation:
resource_dict["init_function"] = init_function
validate_max_workers(
max_workers=max_workers,
cores=cores_per_worker,
threads_per_core=resource_dict["threads_per_core"],
)
return InteractiveExecutor(
max_workers=validate_number_of_cores(
max_cores=max_cores,
Expand Down
9 changes: 9 additions & 0 deletions executorlib/interactive/flux.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import os
from typing import Optional

import flux
import flux.job

from executorlib.standalone.interactive.spawner import BaseSpawner


def validate_max_workers(max_workers, cores, threads_per_core):
handle = flux.Flux()
cores_total = flux.resource.list.resource_list(handle).get().up.ncores
cores_requested = max_workers * cores * threads_per_core
if cores_total < cores_requested:
raise ValueError("The number of requested cores is larger than the available cores " + str(cores_total) + " < " + str(cores_requested))


class FluxPythonSpawner(BaseSpawner):
"""
A class representing the FluxPythonInterface.
Expand Down
10 changes: 10 additions & 0 deletions tests/test_executor_backend_flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,13 @@ def test_internal_memory(self):
self.assertFalse(f.done())
self.assertEqual(f.result(), np.array([5]))
self.assertTrue(f.done())

def test_validate_max_workers(self):
with self.assertRaises(ValueError):
Executor(
max_workers=10,
resource_dict={"cores": 10, "threads_per_core": 10},
flux_executor=self.executor,
backend="flux_allocation",
block_allocation=True,
)

0 comments on commit 1641409

Please sign in to comment.