Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to write flux log files #519

Merged
merged 22 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions executorlib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
Expand All @@ -115,6 +116,7 @@ def __new__(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
Expand Down Expand Up @@ -194,6 +196,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
pysqa_config_directory=pysqa_config_directory,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
Expand All @@ -211,6 +214,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
Expand All @@ -230,6 +234,7 @@ def __new__(
flux_executor=flux_executor,
flux_executor_pmi_mode=flux_executor_pmi_mode,
flux_executor_nesting=flux_executor_nesting,
flux_log_files=flux_log_files,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
Expand Down
3 changes: 3 additions & 0 deletions executorlib/cache/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from executorlib.standalone.inputcheck import (
check_executor,
check_flux_executor_pmi_mode,
check_flux_log_files,
check_hostname_localhost,
check_max_workers_and_cores,
check_nested_flux_executor,
Expand Down Expand Up @@ -88,6 +89,7 @@ def create_file_executor(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
pysqa_config_directory: Optional[str] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
Expand All @@ -109,6 +111,7 @@ def create_file_executor(
check_hostname_localhost(hostname_localhost=hostname_localhost)
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
return FileExecutor(
cache_directory=cache_directory,
resource_dict=resource_dict,
Expand Down
5 changes: 5 additions & 0 deletions executorlib/interactive/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from executorlib.standalone.inputcheck import (
check_command_line_argument_lst,
check_executor,
check_flux_log_files,
check_gpus_per_worker,
check_init_function,
check_nested_flux_executor,
Expand Down Expand Up @@ -154,6 +155,7 @@ def create_executor(
flux_executor=None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[callable] = None,
Expand Down Expand Up @@ -213,6 +215,7 @@ def create_executor(
resource_dict["flux_executor"] = flux_executor
resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode
resource_dict["flux_executor_nesting"] = flux_executor_nesting
resource_dict["flux_log_files"] = flux_log_files
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
Expand All @@ -235,6 +238,7 @@ def create_executor(
elif backend == "slurm_allocation":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
if block_allocation:
resource_dict["init_function"] = init_function
return InteractiveExecutor(
Expand All @@ -257,6 +261,7 @@ def create_executor(
elif backend == "local":
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"])
check_command_line_argument_lst(
command_line_argument_lst=resource_dict["slurm_cmd_args"]
Expand Down
6 changes: 6 additions & 0 deletions executorlib/interactive/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class FluxPythonSpawner(BaseSpawner):
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False.
"""

def __init__(
Expand All @@ -31,6 +32,7 @@ def __init__(
flux_executor: Optional[flux.job.FluxExecutor] = None,
flux_executor_pmi_mode: Optional[str] = None,
flux_executor_nesting: bool = False,
flux_log_files: bool = False,
):
super().__init__(
cwd=cwd,
Expand All @@ -42,6 +44,7 @@ def __init__(
self._flux_executor = flux_executor
self._flux_executor_pmi_mode = flux_executor_pmi_mode
self._flux_executor_nesting = flux_executor_nesting
self._flux_log_files = flux_log_files
self._future = None

def bootup(
Expand Down Expand Up @@ -85,6 +88,9 @@ def bootup(
jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode)
if self._cwd is not None:
jobspec.cwd = self._cwd
if self._flux_log_files and self._cwd is not None:
jobspec.stderr = os.path.join(self._cwd, "flux.err")
jobspec.stdout = os.path.join(self._cwd, "flux.out")
self._future = self._flux_executor.submit(jobspec)

def shutdown(self, wait: bool = True):
Expand Down
10 changes: 10 additions & 0 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ def check_flux_executor_pmi_mode(flux_executor_pmi_mode: Optional[str]) -> None:
)


def check_flux_log_files(flux_log_files: Optional[bool]) -> None:
    """
    Reject the flux_log_files option by raising a ValueError when it is enabled.

    Args:
        flux_log_files (bool, optional): Flag requesting flux stdout and stderr log files.
            Any falsy value (False or None) passes the check.

    Raises:
        ValueError: If flux_log_files is truthy.
    """
    # Truthiness test on purpose: None and False both mean "not requested".
    if flux_log_files:
        raise ValueError(
            "The flux_log_files parameter is only supported for the flux framework backend."
        )


def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None:
"""
Check if pysqa_config_directory is None and raise a ValueError if it is not.
Expand Down
Loading