From 6f161dbaf93baa0e1d4d4a500ce9bdb6bbad8d0d Mon Sep 17 00:00:00 2001
From: Sebastiaan Huber
Date: Wed, 8 Feb 2023 17:49:45 +0100
Subject: [PATCH] `CalcJob`: Fix bug causing exception after restarting daemon

In `v2.2.0`, a feature was added to attach monitors to a `CalcJob`, for which
the `Waiting` state of the process (see `aiida.engine.processes.calcjobs.tasks`)
was modified. The `_command` attribute was moved inside the `data` argument
passed to the constructor and was initialized only in the constructor. However,
when the process is loaded from a serialized checkpoint, such as after a daemon
restart, the constructor is not called; instead `load_instance_state` is. In
that case, the `_command` attribute would not be set and referencing it would
raise an exception.

The solution is to add the `_command` attribute to the `auto_persist` decorator,
which ensures it is written to the checkpoint and restored from it when
`load_instance_state` is called.

A similar problem was lurking with the `_monitors` attribute, which was also
initialized only in the constructor. Since it has the custom data type
`CalcJobMonitors`, which may not be serializable by `auto_persist`, a `monitors`
property is added instead that lazily recreates the attribute, populating it
with a `CalcJobMonitors` instance built from the monitors specified in the
inputs.
---
 aiida/engine/processes/calcjobs/tasks.py      | 21 ++++++++---
 .../processes/calcjobs/test_calc_job.py       | 35 +++++++++++++++++++
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/aiida/engine/processes/calcjobs/tasks.py b/aiida/engine/processes/calcjobs/tasks.py
index dd062c0ee2..34138b00d8 100644
--- a/aiida/engine/processes/calcjobs/tasks.py
+++ b/aiida/engine/processes/calcjobs/tasks.py
@@ -430,7 +430,7 @@ async def do_kill():
     return result
 
 
-@plumpy.persistence.auto_persist('msg', 'data', '_monitor_result')
+@plumpy.persistence.auto_persist('msg', 'data', '_command', '_monitor_result')
 class Waiting(plumpy.process_states.Waiting):
     """The waiting state for the `CalcJob` process."""
 
@@ -451,15 +451,26 @@ def __init__(
         self._monitor_result: CalcJobMonitorResult | None = None
         self._monitors: CalcJobMonitors | None = None
 
-        if 'monitors' in self.process.node.inputs:
-            self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)
-
         if isinstance(self.data, dict):
             self._command = self.data['command']
             self._monitor_result = self.data.get('monitor_result', None)
         else:
             self._command = self.data
 
+    @property
+    def monitors(self) -> CalcJobMonitors | None:
+        """Return the collection of monitors if specified in the inputs.
+
+        :return: Instance of ``CalcJobMonitors`` containing monitors if specified in the process' input.
+        """
+        if not hasattr(self, '_monitors'):
+            self._monitors = None
+
+        if self._monitors is None and 'monitors' in self.process.node.inputs:
+            self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)
+
+        return self._monitors
+
     @property
     def process(self) -> 'CalcJob':
         """
@@ -504,7 +515,7 @@ async def execute(self) -> plumpy.process_states.State:  # type: ignore[overrid
                     process_status = f'Monitoring scheduler: job state {scheduler_state_string}'
                     node.set_process_status(process_status)
                     job_done = await self._launch_task(task_update_job, node, self.process.runner.job_manager)
-                    monitor_result = await self._monitor_job(node, transport_queue, self._monitors)
+                    monitor_result = await self._monitor_job(node, transport_queue, self.monitors)
 
                     if monitor_result and monitor_result.action is CalcJobMonitorAction.KILL:
                         await self._kill_job(node, transport_queue)
diff --git a/tests/engine/processes/calcjobs/test_calc_job.py b/tests/engine/processes/calcjobs/test_calc_job.py
index 7377900850..36b7fca7d3 100644
--- a/tests/engine/processes/calcjobs/test_calc_job.py
+++ b/tests/engine/processes/calcjobs/test_calc_job.py
@@ -1172,6 +1172,41 @@ def test_monitor_result_action_disable_self(get_calcjob_builder, entry_points, c
     assert len([record for record in caplog.records if 'Disable self.' in record.message]) == 1
 
 
+def test_restart_after_daemon_reset(get_calcjob_builder, daemon_client, submit_and_await):
+    """Test that a job can be restarted when it is launched and the daemon is restarted.
+
+    This is a regression test for https://github.com/aiidateam/aiida-core/issues/5882.
+    """
+    import time
+
+    import plumpy
+
+    daemon_client.start_daemon()
+
+    # Launch a job with a one second sleep to ensure it doesn't finish before we get the chance to restart the daemon.
+    # A monitor is added to ensure that those are properly reinitialized in the ``Waiting`` state of the process.
+    builder = get_calcjob_builder()
+    builder.metadata.options.sleep = 1
+    builder.monitors = {'monitor': orm.Dict({'entry_point': 'core.always_kill', 'disabled': True})}
+    node = submit_and_await(builder, plumpy.ProcessState.WAITING)
+
+    daemon_client.restart_daemon(wait=True)
+
+    start_time = time.time()
+    timeout = 10
+
+    while node.process_state not in [plumpy.ProcessState.FINISHED, plumpy.ProcessState.EXCEPTED]:
+
+        if node.is_excepted:
+            raise AssertionError(f'The process excepted: {node.exception}')
+
+        if time.time() - start_time >= timeout:
+            raise AssertionError(f'process failed to terminate within timeout, current state: {node.process_state}')
+
+    assert node.is_finished, node.process_state
+    assert node.is_finished_ok, node.exit_status
+
+
 class TestImport:
     """Test the functionality to import existing calculations completed outside of AiiDA."""
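The checkpoint semantics that the commit message describes can be illustrated with a short, self-contained sketch. The code below is a toy model only: `auto_persist`, `save_instance_state`, and `load_instance_state` here are hypothetical stand-ins that mirror the concepts referenced above, not plumpy's actual API or signatures. It shows why an attribute assigned only in the constructor disappears when a state object is rebuilt from a checkpoint, while an auto-persisted attribute survives.

# Toy model of checkpoint persistence; names are illustrative, not plumpy's actual API.


def auto_persist(*attributes):
    """Class decorator recording which attributes should be written to the checkpoint."""

    def decorator(cls):
        cls._auto_persist = set(attributes)
        return cls

    return decorator


@auto_persist('_command')
class WaitingSketch:
    """Stand-in for the ``Waiting`` state; only ``_command`` is auto-persisted."""

    def __init__(self, data):
        # Both attributes are set when the state is constructed normally.
        self._command = data['command']
        self._monitors = object()  # stand-in for a ``CalcJobMonitors`` instance

    def save_instance_state(self):
        # Only auto-persisted attributes end up in the serialized checkpoint.
        return {name: getattr(self, name) for name in self._auto_persist}

    @classmethod
    def load_instance_state(cls, saved_state):
        # The constructor is *not* called here, mirroring what happens after a daemon restart.
        instance = cls.__new__(cls)
        for name, value in saved_state.items():
            setattr(instance, name, value)
        return instance


state = WaitingSketch({'command': 'update'})
restored = WaitingSketch.load_instance_state(state.save_instance_state())
assert restored._command == 'update'       # persisted, so still available after the restart
assert not hasattr(restored, '_monitors')  # constructor-only attribute is gone, hence the lazy property

In the actual fix, plumpy's persistence machinery plays the role of the save/load methods above: `_command` is simply added to `auto_persist`, while `_monitors` is instead exposed through the lazy `monitors` property because `CalcJobMonitors` may not be serializable.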