Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CalcJob: Fix bug causing exception after restarting daemon #5886

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ async def do_kill():
return result


@plumpy.persistence.auto_persist('msg', 'data', '_monitor_result')
@plumpy.persistence.auto_persist('msg', 'data', '_command', '_monitor_result')
class Waiting(plumpy.process_states.Waiting):
"""The waiting state for the `CalcJob` process."""

Expand All @@ -451,15 +451,26 @@ def __init__(
self._monitor_result: CalcJobMonitorResult | None = None
self._monitors: CalcJobMonitors | None = None

if 'monitors' in self.process.node.inputs:
self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)

if isinstance(self.data, dict):
self._command = self.data['command']
self._monitor_result = self.data.get('monitor_result', None)
else:
self._command = self.data

@property
def monitors(self) -> CalcJobMonitors | None:
"""Return the collection of monitors if specified in the inputs.

:return: Instance of ``CalcJobMonitors`` containing monitors if specified in the process' input.
"""
if not hasattr(self, '_monitors'):
self._monitors = None

if self._monitors is None and 'monitors' in self.process.node.inputs:
self._monitors = CalcJobMonitors(self.process.node.inputs.monitors)

return self._monitors

@property
def process(self) -> 'CalcJob':
"""
Expand Down Expand Up @@ -504,7 +515,7 @@ async def execute(self) -> plumpy.process_states.State: # type: ignore[override
process_status = f'Monitoring scheduler: job state {scheduler_state_string}'
node.set_process_status(process_status)
job_done = await self._launch_task(task_update_job, node, self.process.runner.job_manager)
monitor_result = await self._monitor_job(node, transport_queue, self._monitors)
monitor_result = await self._monitor_job(node, transport_queue, self.monitors)

if monitor_result and monitor_result.action is CalcJobMonitorAction.KILL:
await self._kill_job(node, transport_queue)
Expand Down
35 changes: 35 additions & 0 deletions tests/engine/processes/calcjobs/test_calc_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,41 @@ def test_monitor_result_action_disable_self(get_calcjob_builder, entry_points, c
assert len([record for record in caplog.records if 'Disable self.' in record.message]) == 1


def test_restart_after_daemon_reset(get_calcjob_builder, daemon_client, submit_and_await):
"""Test that a job can be restarted when it is launched and the daemon is restarted.

This is a regression test for https://github.com/aiidateam/aiida-core/issues/5882.
"""
import time

import plumpy

daemon_client.start_daemon()

# Launch a job with a one second sleep to ensure it doesn't finish before we get the chance to restart the daemon.
# A monitor is added to ensure that those are properly reinitialized in the ``Waiting`` state of the process.
builder = get_calcjob_builder()
builder.metadata.options.sleep = 1
builder.monitors = {'monitor': orm.Dict({'entry_point': 'core.always_kill', 'disabled': True})}
node = submit_and_await(builder, plumpy.ProcessState.WAITING)

daemon_client.restart_daemon(wait=True)

start_time = time.time()
timeout = 10

while node.process_state not in [plumpy.ProcessState.FINISHED, plumpy.ProcessState.EXCEPTED]:

if node.is_excepted:
raise AssertionError(f'The process excepted: {node.exception}')

if time.time() - start_time >= timeout:
raise AssertionError(f'process failed to terminate within timeout, current state: {node.process_state}')

assert node.is_finished, node.process_state
assert node.is_finished_ok, node.exit_status


class TestImport:
"""Test the functionality to import existing calculations completed outside of AiiDA."""

Expand Down