diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index ed32d31f..568982bc 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -30,6 +30,7 @@ try: # The PyFluxExecutor requires flux-base to be installed. from executorlib.interactive.flux import FluxPythonSpawner + from executorlib.interactive.flux import validate_max_workers except ImportError: pass @@ -226,6 +227,11 @@ def create_executor( resource_dict["flux_executor_nesting"] = flux_executor_nesting if block_allocation: resource_dict["init_function"] = init_function + validate_max_workers( + max_workers=max_workers, + cores=cores_per_worker, + threads_per_core=resource_dict["threads_per_core"], + ) return InteractiveExecutor( max_workers=validate_number_of_cores( max_cores=max_cores, diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index 472a4792..04f25a5c 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -1,11 +1,20 @@ import os from typing import Optional +import flux import flux.job from executorlib.standalone.interactive.spawner import BaseSpawner +def validate_max_workers(max_workers, cores, threads_per_core): + handle = flux.Flux() + cores_total = flux.resource.list.resource_list(handle).get().up.ncores + cores_requested = max_workers * cores * threads_per_core + if cores_total < cores_requested: + raise ValueError("The number of requested cores is larger than the available cores " + str(cores_total) + " < " + str(cores_requested)) + + class FluxPythonSpawner(BaseSpawner): """ A class representing the FluxPythonInterface. diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 8ab11569..b9d45ee7 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -113,3 +113,13 @@ def test_internal_memory(self): self.assertFalse(f.done()) self.assertEqual(f.result(), np.array([5])) self.assertTrue(f.done()) + + def test_validate_max_workers(self): + with self.assertRaises(ValueError): + Executor( + max_workers=10, + resource_dict={"cores": 10, "threads_per_core": 10}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + )