From 48a8e680ffd406460607fd6000635b5712e9af46 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 12 Dec 2024 18:31:25 -0800 Subject: [PATCH 01/18] Add option to write flux log files --- executorlib/__init__.py | 5 +++++ executorlib/cache/executor.py | 3 +++ executorlib/interactive/executor.py | 5 +++++ executorlib/interactive/flux.py | 6 ++++++ executorlib/standalone/inputcheck.py | 10 ++++++++++ 5 files changed, 29 insertions(+) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 5e0a1c94..4fb64e2e 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -94,6 +94,7 @@ def __init__( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, @@ -115,6 +116,7 @@ def __new__( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, @@ -194,6 +196,7 @@ def __new__( flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, pysqa_config_directory=pysqa_config_directory, hostname_localhost=hostname_localhost, block_allocation=block_allocation, @@ -211,6 +214,7 @@ def __new__( flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, @@ -230,6 +234,7 @@ def __new__( flux_executor=flux_executor, flux_executor_pmi_mode=flux_executor_pmi_mode, flux_executor_nesting=flux_executor_nesting, + flux_log_files=flux_log_files, hostname_localhost=hostname_localhost, block_allocation=block_allocation, init_function=init_function, diff --git a/executorlib/cache/executor.py b/executorlib/cache/executor.py index ad817915..62a66f39 100644 --- a/executorlib/cache/executor.py +++ b/executorlib/cache/executor.py @@ -10,6 +10,7 @@ from executorlib.standalone.inputcheck import ( check_executor, check_flux_executor_pmi_mode, + check_flux_log_files, check_hostname_localhost, check_max_workers_and_cores, check_nested_flux_executor, @@ -88,6 +89,7 @@ def create_file_executor( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, pysqa_config_directory: Optional[str] = None, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, @@ -109,6 +111,7 @@ def create_file_executor( check_hostname_localhost(hostname_localhost=hostname_localhost) check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + check_flux_log_files(flux_log_files=flux_log_files) return FileExecutor( cache_directory=cache_directory, resource_dict=resource_dict, diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index 5a31f1e6..e85cb2b8 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -13,6 +13,7 @@ check_gpus_per_worker, check_init_function, check_nested_flux_executor, + check_flux_log_files, check_oversubscribe, check_pmi, validate_number_of_cores, @@ -154,6 +155,7 @@ def create_executor( flux_executor=None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, hostname_localhost: Optional[bool] = None, block_allocation: bool = False, init_function: Optional[callable] = None, @@ -213,6 +215,7 @@ def create_executor( resource_dict["flux_executor"] = flux_executor resource_dict["flux_executor_pmi_mode"] = flux_executor_pmi_mode resource_dict["flux_executor_nesting"] = flux_executor_nesting + resource_dict["flux_log_files"] = flux_log_files if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( @@ -235,6 +238,7 @@ def create_executor( elif backend == "slurm_allocation": check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + check_flux_log_files(flux_log_files=flux_log_files) if block_allocation: resource_dict["init_function"] = init_function return InteractiveExecutor( @@ -257,6 +261,7 @@ def create_executor( elif backend == "local": check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) + check_flux_log_files(flux_log_files=flux_log_files) check_gpus_per_worker(gpus_per_worker=resource_dict["gpus_per_core"]) check_command_line_argument_lst( command_line_argument_lst=resource_dict["slurm_cmd_args"] diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index 472a4792..f2fede48 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -19,6 +19,7 @@ class FluxPythonSpawner(BaseSpawner): flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None. flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None. flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. """ def __init__( @@ -31,6 +32,7 @@ def __init__( flux_executor: Optional[flux.job.FluxExecutor] = None, flux_executor_pmi_mode: Optional[str] = None, flux_executor_nesting: bool = False, + flux_log_files: bool = False, ): super().__init__( cwd=cwd, @@ -42,6 +44,7 @@ def __init__( self._flux_executor = flux_executor self._flux_executor_pmi_mode = flux_executor_pmi_mode self._flux_executor_nesting = flux_executor_nesting + self._flux_log_files = flux_log_files self._future = None def bootup( @@ -85,6 +88,9 @@ def bootup( jobspec.setattr_shell_option("pmi", self._flux_executor_pmi_mode) if self._cwd is not None: jobspec.cwd = self._cwd + if self._flux_log_files and self._cwd is not None: + jobspec.stderr = os.path.join(self._cwd, "flux.err") + jobspec.stdout = os.path.join(self._cwd, "flux.out") self._future = self._flux_executor.submit(jobspec) def shutdown(self, wait: bool = True): diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 84898ee5..e56bc500 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -147,6 +147,16 @@ def check_flux_executor_pmi_mode(flux_executor_pmi_mode: Optional[str]) -> None: ) +def check_flux_log_files(flux_log_files: Optional[bool]) -> None: + """ + Check if flux_log_files is True and raise a ValueError if it is. + """ + if flux_log_files: + raise ValueError( + "The flux_log_files parameter is only supported for the flux framework backend." + ) + + def check_pysqa_config_directory(pysqa_config_directory: Optional[str]) -> None: """ Check if pysqa_config_directory is None and raise a ValueError if it is not. From d14ebbd940649042b3c14b386264bca43cd6b27a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 13 Dec 2024 02:32:13 +0000 Subject: [PATCH 02/18] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/interactive/executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index e85cb2b8..fd9a903f 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -10,10 +10,10 @@ from executorlib.standalone.inputcheck import ( check_command_line_argument_lst, check_executor, + check_flux_log_files, check_gpus_per_worker, check_init_function, check_nested_flux_executor, - check_flux_log_files, check_oversubscribe, check_pmi, validate_number_of_cores, From 114ff490c55b6778608fb1477a281451c469aa6c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 12 Dec 2024 18:39:00 -0800 Subject: [PATCH 03/18] Add tests and docstrings --- executorlib/__init__.py | 2 ++ executorlib/interactive/executor.py | 1 + tests/test_shared_input_check.py | 5 +++++ 3 files changed, 8 insertions(+) diff --git a/executorlib/__init__.py b/executorlib/__init__.py index 4fb64e2e..e1092d20 100644 --- a/executorlib/__init__.py +++ b/executorlib/__init__.py @@ -45,6 +45,7 @@ class Executor: flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an @@ -152,6 +153,7 @@ def __new__( flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend). hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an diff --git a/executorlib/interactive/executor.py b/executorlib/interactive/executor.py index e85cb2b8..e7372a0c 100644 --- a/executorlib/interactive/executor.py +++ b/executorlib/interactive/executor.py @@ -186,6 +186,7 @@ def create_executor( flux_executor (flux.job.FluxExecutor): Flux Python interface to submit the workers to flux flux_executor_pmi_mode (str): PMI interface to use (OpenMPI v5 requires pmix) default is None (Flux only) flux_executor_nesting (bool): Provide hierarchically nested Flux job scheduler inside the submitted function. + flux_log_files (bool, optional): Write flux stdout and stderr files. Defaults to False. hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the context of an HPC cluster this essential to be able to communicate to an Executor running on a different compute node within the same allocation. And in principle diff --git a/tests/test_shared_input_check.py b/tests/test_shared_input_check.py index 5aa70bdc..5f682a8b 100644 --- a/tests/test_shared_input_check.py +++ b/tests/test_shared_input_check.py @@ -7,6 +7,7 @@ check_executor, check_init_function, check_nested_flux_executor, + check_flux_log_files, check_pmi, check_plot_dependency_graph, check_refresh_rate, @@ -67,6 +68,10 @@ def test_check_nested_flux_executor(self): with self.assertRaises(ValueError): check_nested_flux_executor(nested_flux_executor=True) + def test_check_flux_log_files(self): + with self.assertRaises(ValueError): + check_flux_log_files(flux_log_files=True) + def test_check_plot_dependency_graph(self): with self.assertRaises(ValueError): check_plot_dependency_graph(plot_dependency_graph=True) From f02ce7204a4eda4f04d7d18e5c1409c20d2c692c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 16 Dec 2024 21:37:04 -0700 Subject: [PATCH 04/18] Add test --- tests/test_executor_backend_flux.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 8ab11569..ebedb998 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,6 +100,28 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) + def test_output_files(self): + file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") + file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") + with Executor( + max_cores=2, + resource_dict={"cores": 2}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_executor_pmi_mode=pmi, + flux_log_files=True, + ) as p: + output = p.map(mpi_funct, [1, 2, 3]) + self.assertEqual( + list(output), + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + def test_internal_memory(self): with Executor( max_cores=1, From 5234addb9864f1b25bdce088b0c57062ded366ac Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 16 Dec 2024 21:41:19 -0700 Subject: [PATCH 05/18] fix tests --- tests/test_executor_backend_flux.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index ebedb998..6ee176ce 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -101,8 +101,8 @@ def test_single_task(self): ) def test_output_files(self): - file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") - file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") + file_stdout = os.path.abspath("flux.out") + file_stderr = os.path.abspath("flux.err") with Executor( max_cores=2, resource_dict={"cores": 2}, From 14405a269ee9cb57976b46bc938659f20e1f4a9a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Mon, 16 Dec 2024 21:52:05 -0700 Subject: [PATCH 06/18] extensions --- executorlib/interactive/flux.py | 3 +++ tests/test_executor_backend_flux.py | 24 +++++++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/executorlib/interactive/flux.py b/executorlib/interactive/flux.py index f2fede48..b900c35e 100644 --- a/executorlib/interactive/flux.py +++ b/executorlib/interactive/flux.py @@ -91,6 +91,9 @@ def bootup( if self._flux_log_files and self._cwd is not None: jobspec.stderr = os.path.join(self._cwd, "flux.err") jobspec.stdout = os.path.join(self._cwd, "flux.out") + elif self._flux_log_files: + jobspec.stderr = os.path.abspath("flux.err") + jobspec.stdout = os.path.abspath("flux.out") self._future = self._flux_executor.submit(jobspec) def shutdown(self, wait: bool = True): diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 6ee176ce..6df6a5c3 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,7 +100,29 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) - def test_output_files(self): + def test_output_files_cwd(self): + file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") + file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") + with Executor( + max_cores=2, + resource_dict={"cores": 2, "cwd": os.path.dirname(__file__)}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_executor_pmi_mode=pmi, + flux_log_files=True, + ) as p: + output = p.map(mpi_funct, [1, 2, 3]) + self.assertEqual( + list(output), + [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + + def test_output_files_abs(self): file_stdout = os.path.abspath("flux.out") file_stderr = os.path.abspath("flux.err") with Executor( From 38125a5d98135e01b22601f7e6dd44b9670034c7 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 15:03:19 -0700 Subject: [PATCH 07/18] Update test_executor_backend_flux.py --- tests/test_executor_backend_flux.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 6df6a5c3..86068981 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -104,8 +104,8 @@ def test_output_files_cwd(self): file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") with Executor( - max_cores=2, - resource_dict={"cores": 2, "cwd": os.path.dirname(__file__)}, + max_cores=1, + resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, flux_executor=self.executor, backend="flux_allocation", block_allocation=True, @@ -126,8 +126,8 @@ def test_output_files_abs(self): file_stdout = os.path.abspath("flux.out") file_stderr = os.path.abspath("flux.err") with Executor( - max_cores=2, - resource_dict={"cores": 2}, + max_cores=1, + resource_dict={"cores": 1}, flux_executor=self.executor, backend="flux_allocation", block_allocation=True, From 9ef5180c44532a2b882afc02550b9324271eb941 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 15:08:45 -0700 Subject: [PATCH 08/18] Update test_executor_backend_flux.py --- tests/test_executor_backend_flux.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 86068981..a12cd6eb 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -112,10 +112,10 @@ def test_output_files_cwd(self): flux_executor_pmi_mode=pmi, flux_log_files=True, ) as p: - output = p.map(mpi_funct, [1, 2, 3]) + output = p.map(calc, [1, 2, 3]) self.assertEqual( list(output), - [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + [1, 2, 3], ) self.assertTrue(os.path.exists(file_stdout)) self.assertTrue(os.path.exists(file_stderr)) @@ -134,10 +134,10 @@ def test_output_files_abs(self): flux_executor_pmi_mode=pmi, flux_log_files=True, ) as p: - output = p.map(mpi_funct, [1, 2, 3]) + output = p.map(calc, [1, 2, 3]) self.assertEqual( list(output), - [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], + [1, 2, 3], ) self.assertTrue(os.path.exists(file_stdout)) self.assertTrue(os.path.exists(file_stderr)) From c94f0ad141e6782f91526c9e05ac59e8afd2e3c1 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 21:28:23 -0700 Subject: [PATCH 09/18] remove pmi --- tests/test_executor_backend_flux.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index a12cd6eb..d2cd464f 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -109,7 +109,6 @@ def test_output_files_cwd(self): flux_executor=self.executor, backend="flux_allocation", block_allocation=True, - flux_executor_pmi_mode=pmi, flux_log_files=True, ) as p: output = p.map(calc, [1, 2, 3]) @@ -131,7 +130,6 @@ def test_output_files_abs(self): flux_executor=self.executor, backend="flux_allocation", block_allocation=True, - flux_executor_pmi_mode=pmi, flux_log_files=True, ) as p: output = p.map(calc, [1, 2, 3]) From e930b7919a230caadb8241cedd286c90ea41ed94 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 21:35:09 -0700 Subject: [PATCH 10/18] disable new tests --- tests/test_executor_backend_flux.py | 82 ++++++++++++++--------------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index d2cd464f..18fcb8d9 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,47 +100,47 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) - def test_output_files_cwd(self): - file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") - file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") - with Executor( - max_cores=1, - resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, - flux_executor=self.executor, - backend="flux_allocation", - block_allocation=True, - flux_log_files=True, - ) as p: - output = p.map(calc, [1, 2, 3]) - self.assertEqual( - list(output), - [1, 2, 3], - ) - self.assertTrue(os.path.exists(file_stdout)) - self.assertTrue(os.path.exists(file_stderr)) - os.remove(file_stdout) - os.remove(file_stderr) - - def test_output_files_abs(self): - file_stdout = os.path.abspath("flux.out") - file_stderr = os.path.abspath("flux.err") - with Executor( - max_cores=1, - resource_dict={"cores": 1}, - flux_executor=self.executor, - backend="flux_allocation", - block_allocation=True, - flux_log_files=True, - ) as p: - output = p.map(calc, [1, 2, 3]) - self.assertEqual( - list(output), - [1, 2, 3], - ) - self.assertTrue(os.path.exists(file_stdout)) - self.assertTrue(os.path.exists(file_stderr)) - os.remove(file_stdout) - os.remove(file_stderr) + # def test_output_files_cwd(self): + # file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") + # file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") + # with Executor( + # max_cores=1, + # resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, + # flux_executor=self.executor, + # backend="flux_allocation", + # block_allocation=True, + # flux_log_files=True, + # ) as p: + # output = p.map(calc, [1, 2, 3]) + # self.assertEqual( + # list(output), + # [1, 2, 3], + # ) + # self.assertTrue(os.path.exists(file_stdout)) + # self.assertTrue(os.path.exists(file_stderr)) + # os.remove(file_stdout) + # os.remove(file_stderr) + # + # def test_output_files_abs(self): + # file_stdout = os.path.abspath("flux.out") + # file_stderr = os.path.abspath("flux.err") + # with Executor( + # max_cores=1, + # resource_dict={"cores": 1}, + # flux_executor=self.executor, + # backend="flux_allocation", + # block_allocation=True, + # flux_log_files=True, + # ) as p: + # output = p.map(calc, [1, 2, 3]) + # self.assertEqual( + # list(output), + # [1, 2, 3], + # ) + # self.assertTrue(os.path.exists(file_stdout)) + # self.assertTrue(os.path.exists(file_stderr)) + # os.remove(file_stdout) + # os.remove(file_stderr) def test_internal_memory(self): with Executor( From aaf3b1e032bb4d9b765e01b32d275339eebf655e Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 21:40:12 -0700 Subject: [PATCH 11/18] test cwd --- tests/test_executor_backend_flux.py | 42 ++++++++++++++--------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 18fcb8d9..31b02c6a 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,27 +100,27 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) - # def test_output_files_cwd(self): - # file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") - # file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") - # with Executor( - # max_cores=1, - # resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, - # flux_executor=self.executor, - # backend="flux_allocation", - # block_allocation=True, - # flux_log_files=True, - # ) as p: - # output = p.map(calc, [1, 2, 3]) - # self.assertEqual( - # list(output), - # [1, 2, 3], - # ) - # self.assertTrue(os.path.exists(file_stdout)) - # self.assertTrue(os.path.exists(file_stderr)) - # os.remove(file_stdout) - # os.remove(file_stderr) - # + def test_output_files_cwd(self): + file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") + file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") + with Executor( + max_cores=1, + resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_log_files=True, + ) as p: + output = p.map(calc, [1, 2, 3]) + self.assertEqual( + list(output), + [1, 2, 3], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + # def test_output_files_abs(self): # file_stdout = os.path.abspath("flux.out") # file_stderr = os.path.abspath("flux.err") From eb1e6f8f0840f313205bcb272fc758cb8663beb5 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 21:44:13 -0700 Subject: [PATCH 12/18] abspath --- tests/test_executor_backend_flux.py | 50 ++++++++++++++--------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 31b02c6a..e28b9a35 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,33 +100,12 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) - def test_output_files_cwd(self): - file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") - file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") - with Executor( - max_cores=1, - resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, - flux_executor=self.executor, - backend="flux_allocation", - block_allocation=True, - flux_log_files=True, - ) as p: - output = p.map(calc, [1, 2, 3]) - self.assertEqual( - list(output), - [1, 2, 3], - ) - self.assertTrue(os.path.exists(file_stdout)) - self.assertTrue(os.path.exists(file_stderr)) - os.remove(file_stdout) - os.remove(file_stderr) - - # def test_output_files_abs(self): - # file_stdout = os.path.abspath("flux.out") - # file_stderr = os.path.abspath("flux.err") + # def test_output_files_cwd(self): + # file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") + # file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") # with Executor( # max_cores=1, - # resource_dict={"cores": 1}, + # resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, # flux_executor=self.executor, # backend="flux_allocation", # block_allocation=True, @@ -142,6 +121,27 @@ def test_output_files_cwd(self): # os.remove(file_stdout) # os.remove(file_stderr) + def test_output_files_abs(self): + file_stdout = os.path.abspath("flux.out") + file_stderr = os.path.abspath("flux.err") + with Executor( + max_cores=1, + resource_dict={"cores": 1}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_log_files=True, + ) as p: + output = p.map(calc, [1, 2, 3]) + self.assertEqual( + list(output), + [1, 2, 3], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + def test_internal_memory(self): with Executor( max_cores=1, From a2a076e2a64b331d895d3771472c9be661cff0c6 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 21:50:01 -0700 Subject: [PATCH 13/18] create new directory --- tests/test_executor_backend_flux.py | 43 +++++++++++++++-------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index e28b9a35..03f17aa7 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -100,26 +100,29 @@ def test_single_task(self): [[(1, 2, 0), (1, 2, 1)], [(2, 2, 0), (2, 2, 1)], [(3, 2, 0), (3, 2, 1)]], ) - # def test_output_files_cwd(self): - # file_stdout = os.path.join(os.path.dirname(__file__), "flux.out") - # file_stderr = os.path.join(os.path.dirname(__file__), "flux.err") - # with Executor( - # max_cores=1, - # resource_dict={"cores": 1, "cwd": os.path.dirname(__file__)}, - # flux_executor=self.executor, - # backend="flux_allocation", - # block_allocation=True, - # flux_log_files=True, - # ) as p: - # output = p.map(calc, [1, 2, 3]) - # self.assertEqual( - # list(output), - # [1, 2, 3], - # ) - # self.assertTrue(os.path.exists(file_stdout)) - # self.assertTrue(os.path.exists(file_stderr)) - # os.remove(file_stdout) - # os.remove(file_stderr) + def test_output_files_cwd(self): + dirname = os.path.abspath("logfiles") + os.mkdir(dirname) + file_stdout = os.path.join(dirname, "flux.out") + file_stderr = os.path.join(dirname, "flux.err") + with Executor( + max_cores=1, + resource_dict={"cores": 1, "cwd": dirname}, + flux_executor=self.executor, + backend="flux_allocation", + block_allocation=True, + flux_log_files=True, + ) as p: + output = p.map(calc, [1, 2, 3]) + self.assertEqual( + list(output), + [1, 2, 3], + ) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) + os.rmdir(dirname) def test_output_files_abs(self): file_stdout = os.path.abspath("flux.out") From 655bb07fe2cdbf92dc8127a6c53abc5f4de930b7 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 21:57:34 -0700 Subject: [PATCH 14/18] do not remove --- tests/test_executor_backend_flux.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 03f17aa7..7e56b9b8 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -120,9 +120,9 @@ def test_output_files_cwd(self): ) self.assertTrue(os.path.exists(file_stdout)) self.assertTrue(os.path.exists(file_stderr)) - os.remove(file_stdout) - os.remove(file_stderr) - os.rmdir(dirname) + # os.remove(file_stdout) + # os.remove(file_stderr) + # os.rmdir(dirname) def test_output_files_abs(self): file_stdout = os.path.abspath("flux.out") From 7704d57813e4f6c452f7408e64698f27ff1de12c Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 22:00:08 -0700 Subject: [PATCH 15/18] cwd --- tests/test_executor_backend_flux.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 7e56b9b8..6ef3f1fe 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -103,23 +103,23 @@ def test_single_task(self): def test_output_files_cwd(self): dirname = os.path.abspath("logfiles") os.mkdir(dirname) - file_stdout = os.path.join(dirname, "flux.out") - file_stderr = os.path.join(dirname, "flux.err") + # file_stdout = os.path.join(dirname, "flux.out") + # file_stderr = os.path.join(dirname, "flux.err") with Executor( max_cores=1, resource_dict={"cores": 1, "cwd": dirname}, flux_executor=self.executor, backend="flux_allocation", block_allocation=True, - flux_log_files=True, + # flux_log_files=True, ) as p: output = p.map(calc, [1, 2, 3]) self.assertEqual( list(output), [1, 2, 3], ) - self.assertTrue(os.path.exists(file_stdout)) - self.assertTrue(os.path.exists(file_stderr)) + # self.assertTrue(os.path.exists(file_stdout)) + # self.assertTrue(os.path.exists(file_stderr)) # os.remove(file_stdout) # os.remove(file_stderr) # os.rmdir(dirname) From e0ed2b7304d5ec4c9f88c88afb747cd1ad7f3d8d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 22:20:01 -0700 Subject: [PATCH 16/18] fix working directory --- tests/test_executor_backend_flux.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 6ef3f1fe..fd37f6d3 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -101,28 +101,27 @@ def test_single_task(self): ) def test_output_files_cwd(self): - dirname = os.path.abspath("logfiles") + dirname = os.path.abspath(os.path.dirname(__file__)) os.mkdir(dirname) - # file_stdout = os.path.join(dirname, "flux.out") - # file_stderr = os.path.join(dirname, "flux.err") + file_stdout = os.path.join(dirname, "flux.out") + file_stderr = os.path.join(dirname, "flux.err") with Executor( max_cores=1, resource_dict={"cores": 1, "cwd": dirname}, flux_executor=self.executor, backend="flux_allocation", block_allocation=True, - # flux_log_files=True, + flux_log_files=True, ) as p: output = p.map(calc, [1, 2, 3]) self.assertEqual( list(output), [1, 2, 3], ) - # self.assertTrue(os.path.exists(file_stdout)) - # self.assertTrue(os.path.exists(file_stderr)) - # os.remove(file_stdout) - # os.remove(file_stderr) - # os.rmdir(dirname) + self.assertTrue(os.path.exists(file_stdout)) + self.assertTrue(os.path.exists(file_stderr)) + os.remove(file_stdout) + os.remove(file_stderr) def test_output_files_abs(self): file_stdout = os.path.abspath("flux.out") From 1e22aa8f524cd59d5cf93bda9baf221c702cb935 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 22:31:15 -0700 Subject: [PATCH 17/18] fix makedir --- tests/test_executor_backend_flux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index fd37f6d3..076ca5b3 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -102,7 +102,7 @@ def test_single_task(self): def test_output_files_cwd(self): dirname = os.path.abspath(os.path.dirname(__file__)) - os.mkdir(dirname) + os.makedirs(dirname, exist_ok=True) file_stdout = os.path.join(dirname, "flux.out") file_stderr = os.path.join(dirname, "flux.err") with Executor( From 482cb8feff6d56d840f055946b8c70977256b17b Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 19 Dec 2024 22:52:07 -0700 Subject: [PATCH 18/18] try higher level --- tests/test_executor_backend_flux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_executor_backend_flux.py b/tests/test_executor_backend_flux.py index 076ca5b3..97f1e2d8 100644 --- a/tests/test_executor_backend_flux.py +++ b/tests/test_executor_backend_flux.py @@ -101,7 +101,7 @@ def test_single_task(self): ) def test_output_files_cwd(self): - dirname = os.path.abspath(os.path.dirname(__file__)) + dirname = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) os.makedirs(dirname, exist_ok=True) file_stdout = os.path.join(dirname, "flux.out") file_stderr = os.path.join(dirname, "flux.err")