From 80c1741ca3df607c75254a61fd664018dc043219 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Mon, 13 Jan 2025 16:46:21 +0100 Subject: [PATCH] Adapt message arguments passing to process controller (#6668) Changes required after aiidateam/plumpy#301 and aiidateam/plumpy#291 For the rpc call of process.kill and process.pause, the passed in argument name is changed from msg to msg_text. Same for kill_processes and pause_processes of process controller. Meanwhile, when calling _perform_actions the action is passed as a callable that takes no argument, it then requires the callable should be constructed with its arguments using functools.partial before hands. This increase the readability by makes the arguments stay together with the function itself where originally the arguments passed to callable is separated from other arguments for _perform_action itself. - bump plumpy version to `0.24.0`. --- environment.yml | 2 +- pyproject.toml | 2 +- src/aiida/cmdline/commands/cmd_process.py | 18 ++++++++++++++---- src/aiida/engine/processes/control.py | 11 +++++++---- src/aiida/engine/processes/functions.py | 4 ++-- src/aiida/engine/processes/process.py | 6 +++--- src/aiida/engine/runners.py | 2 +- tests/engine/test_rmq.py | 10 ++++------ uv.lock | 10 +++++----- 9 files changed, 38 insertions(+), 27 deletions(-) diff --git a/environment.yml b/environment.yml index ad80dd3416..fbce5ffe8c 100644 --- a/environment.yml +++ b/environment.yml @@ -22,7 +22,7 @@ dependencies: - importlib-metadata~=6.0 - numpy~=1.21 - paramiko~=3.0 -- plumpy~=0.22.3 +- plumpy~=0.24.0 - pgsu~=0.3.0 - psutil~=5.6 - psycopg[binary]~=3.0 diff --git a/pyproject.toml b/pyproject.toml index 32894eb4ac..6cca60defd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,7 @@ dependencies = [ 'importlib-metadata~=6.0', 'numpy~=1.21', 'paramiko~=3.0', - 'plumpy~=0.22.3', + 'plumpy~=0.24.0', 'pgsu~=0.3.0', 'psutil~=5.6', 'psycopg[binary]~=3.0', diff --git a/src/aiida/cmdline/commands/cmd_process.py b/src/aiida/cmdline/commands/cmd_process.py index e203bdddfc..5ad7c5d53c 100644 --- a/src/aiida/cmdline/commands/cmd_process.py +++ b/src/aiida/cmdline/commands/cmd_process.py @@ -340,8 +340,13 @@ def process_kill(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Killed through `verdi process kill`' - control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + control.kill_processes( + processes, + msg_text='Killed through `verdi process kill`', + all_entries=all_entries, + timeout=timeout, + wait=wait, + ) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') @@ -371,8 +376,13 @@ def process_pause(processes, all_entries, timeout, wait): with capture_logging() as stream: try: - message = 'Paused through `verdi process pause`' - control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message) + control.pause_processes( + processes, + msg_text='Paused through `verdi process pause`', + all_entries=all_entries, + timeout=timeout, + wait=wait, + ) except control.ProcessTimeoutException as exception: echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}') diff --git a/src/aiida/engine/processes/control.py b/src/aiida/engine/processes/control.py index 7cc214c76c..2ecc8477df 100644 --- a/src/aiida/engine/processes/control.py +++ b/src/aiida/engine/processes/control.py @@ -4,6 +4,7 @@ import collections import concurrent +import functools import typing as t import kiwipy @@ -135,7 +136,7 @@ def play_processes( def pause_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Paused through `aiida.engine.processes.control.pause_processes`', + msg_text: str = 'Paused through `aiida.engine.processes.control.pause_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -164,13 +165,14 @@ def pause_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.pause_process, 'pause', 'pausing', timeout, wait, msg=message) + action = functools.partial(controller.pause_process, msg_text=msg_text) + _perform_actions(processes, action, 'pause', 'pausing', timeout, wait) def kill_processes( processes: list[ProcessNode] | None = None, *, - message: str = 'Killed through `aiida.engine.processes.control.kill_processes`', + msg_text: str = 'Killed through `aiida.engine.processes.control.kill_processes`', all_entries: bool = False, timeout: float = 5.0, wait: bool = False, @@ -199,7 +201,8 @@ def kill_processes( return controller = get_manager().get_process_controller() - _perform_actions(processes, controller.kill_process, 'kill', 'killing', timeout, wait, msg=message) + action = functools.partial(controller.kill_process, msg_text=msg_text) + _perform_actions(processes, action, 'kill', 'killing', timeout, wait) def _perform_actions( diff --git a/src/aiida/engine/processes/functions.py b/src/aiida/engine/processes/functions.py index 8bca68f55c..7936979531 100644 --- a/src/aiida/engine/processes/functions.py +++ b/src/aiida/engine/processes/functions.py @@ -222,7 +222,7 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode if kwargs and not process_class.spec().inputs.dynamic: raise ValueError(f'{function.__name__} does not support these kwargs: {kwargs.keys()}') - process = process_class(inputs=inputs, runner=runner) + process: Process = process_class(inputs=inputs, runner=runner) # Only add handlers for interrupt signal to kill the process if we are in a local and not a daemon runner. # Without this check, running process functions in a daemon worker would be killed if the daemon is shutdown @@ -235,7 +235,7 @@ def run_get_node(*args, **kwargs) -> tuple[dict[str, t.Any] | None, 'ProcessNode def kill_process(_num, _frame): """Send the kill signal to the process in the current scope.""" LOGGER.critical('runner received interrupt, killing process %s', process.pid) - result = process.kill(msg='Process was killed because the runner received an interrupt') + result = process.kill(msg_text='Process was killed because the runner received an interrupt') return result # Store the current handler on the signal such that it can be restored after process has terminated diff --git a/src/aiida/engine/processes/process.py b/src/aiida/engine/processes/process.py index e25d1b7c23..f29d426770 100644 --- a/src/aiida/engine/processes/process.py +++ b/src/aiida/engine/processes/process.py @@ -329,7 +329,7 @@ def load_instance_state( self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state') - def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Future]: + def kill(self, msg_text: str | None = None) -> Union[bool, plumpy.futures.Future]: """Kill the process and all the children calculations it called :param msg: message @@ -338,7 +338,7 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur had_been_terminated = self.has_terminated() - result = super().kill(msg) + result = super().kill(msg_text) # Only kill children if we could be killed ourselves if result is not False and not had_been_terminated: @@ -348,7 +348,7 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, plumpy.futures.Futur self.logger.info('no controller available to kill child<%s>', child.pk) continue try: - result = self.runner.controller.kill_process(child.pk, f'Killed by parent<{self.node.pk}>') + result = self.runner.controller.kill_process(child.pk, msg_text=f'Killed by parent<{self.node.pk}>') result = asyncio.wrap_future(result) # type: ignore[arg-type] if asyncio.isfuture(result): killing.append(result) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index 42cb76244c..b19821b2e7 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -250,7 +250,7 @@ def kill_process(_num, _frame): LOGGER.warning('runner received interrupt, process %s already being killed', process_inited.pid) return LOGGER.critical('runner received interrupt, killing process %s', process_inited.pid) - process_inited.kill(msg='Process was killed because the runner received an interrupt') + process_inited.kill(msg_text='Process was killed because the runner received an interrupt') original_handler_int = signal.getsignal(signal.SIGINT) original_handler_term = signal.getsignal(signal.SIGTERM) diff --git a/tests/engine/test_rmq.py b/tests/engine/test_rmq.py index a2edc2fa41..d0a3e461fb 100644 --- a/tests/engine/test_rmq.py +++ b/tests/engine/test_rmq.py @@ -93,8 +93,7 @@ async def do_pause(): assert result assert calc_node.paused - kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text='Sorry, you have to go mate') future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -112,7 +111,7 @@ async def do_pause_play(): await asyncio.sleep(0.1) pause_message = 'Take a seat' - pause_future = controller.pause_process(calc_node.pk, msg=pause_message) + pause_future = controller.pause_process(calc_node.pk, msg_text=pause_message) future = await with_timeout(asyncio.wrap_future(pause_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert calc_node.paused @@ -126,8 +125,7 @@ async def do_pause_play(): assert not calc_node.paused assert calc_node.process_status is None - kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text='Sorry, you have to go mate') future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result @@ -145,7 +143,7 @@ async def do_kill(): await asyncio.sleep(0.1) kill_message = 'Sorry, you have to go mate' - kill_future = controller.kill_process(calc_node.pk, msg=kill_message) + kill_future = controller.kill_process(calc_node.pk, msg_text=kill_message) future = await with_timeout(asyncio.wrap_future(kill_future)) result = await self.wait_future(asyncio.wrap_future(future)) assert result diff --git a/uv.lock b/uv.lock index 9aa6014660..c503a4264b 100644 --- a/uv.lock +++ b/uv.lock @@ -200,7 +200,7 @@ requires-dist = [ { name = "pg8000", marker = "extra == 'tests'", specifier = "~=1.13" }, { name = "pgsu", specifier = "~=0.3.0" }, { name = "pgtest", marker = "extra == 'tests'", specifier = "~=1.3,>=1.3.1" }, - { name = "plumpy", specifier = "~=0.22.3" }, + { name = "plumpy", specifier = "~=0.24.0" }, { name = "pre-commit", marker = "extra == 'pre-commit'", specifier = "~=3.5" }, { name = "psutil", specifier = "~=5.6" }, { name = "psycopg", extras = ["binary"], specifier = "~=3.0" }, @@ -3085,7 +3085,7 @@ name = "pexpect" version = "4.9.0" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "ptyprocess" }, + { name = "ptyprocess", marker = "python_full_version < '3.10' or sys_platform != 'win32'" }, ] sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450 } wheels = [ @@ -3241,16 +3241,16 @@ wheels = [ [[package]] name = "plumpy" -version = "0.22.3" +version = "0.24.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "kiwipy", extra = ["rmq"] }, { name = "nest-asyncio" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/ab/99/6c931d3f4697acd34cf18eb3fbfe96ed55cd0408d9be7c0f316349117a8e/plumpy-0.22.3.tar.gz", hash = "sha256:e58f45e6360f173babf04e2a4abacae9867622768ce2a126c8260db3b46372c4", size = 73582 } +sdist = { url = "https://files.pythonhosted.org/packages/d9/0c/0bb568982e461f5e428606ccbdfe6d43c11dab0e3f5a8090298feb321172/plumpy-0.24.0.tar.gz", hash = "sha256:c17c8efbd124d7f5ec2f27cb1f2c3de7901143e61551ce81f3ee22bf7e2ed42d", size = 75634 } wheels = [ - { url = "https://files.pythonhosted.org/packages/95/d9/12fd8281f494ca79d6a7a9d40099616d16415be5807959e5b024dffe8aed/plumpy-0.22.3-py3-none-any.whl", hash = "sha256:63ae6c90713f52483836a3b2b3e1941eab7ada920c303092facc27e78229bdc3", size = 74244 }, + { url = "https://files.pythonhosted.org/packages/99/d3/68c83d4774f7a4f8e8dd4e30ce34e46071706a4b4dc40d3a1ad77de793fc/plumpy-0.24.0-py3-none-any.whl", hash = "sha256:09efafe97c88c8928e73f1dc08cf02a2c4737fa767920bff23dfa26226252cc6", size = 74955 }, ] [[package]]