Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the execmanager.retrieve_calculation idempotent'ish #3142

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import

import os

import warnings
from six.moves import zip

from aiida.common import AIIDA_LOGGER, exceptions
Expand Down Expand Up @@ -253,17 +253,22 @@ def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
listed, if any, in the `retrieved_temporary_folder` of the jobs CalcInfo
"""
logger_extra = get_dblogger_extra(calculation)

execlogger.debug("Retrieving calc {}".format(calculation.pk), extra=logger_extra)
workdir = calculation.get_remote_workdir()

execlogger.debug(
"[retrieval of calc {}] chdir {}".format(calculation.pk, workdir),
extra=logger_extra)

# Create the FolderData node to attach everything to
execlogger.debug("Retrieving calc {}".format(calculation.pk), extra=logger_extra)
execlogger.debug("[retrieval of calc {}] chdir {}".format(calculation.pk, workdir), extra=logger_extra)

# If the calculation already has a `retrieved` folder, simply return. The retrieval was apparently already completed
# before, which can happen if the daemon is restarted and it shuts down after retrieving but before getting the
# chance to perform the state transition. Upon reloading this calculation, it will re-attempt the retrieval.
link_label = calculation.link_label_retrieved
if calculation.get_outgoing(FolderData, link_label_filter=link_label).first():
execlogger.warning('CalcJobNode<{}> already has a `{}` output folder: skipping retrieval'.format(
calculation.pk, link_label))
return

# Create the FolderData node into which to store the files that are to be retrieved
retrieved_files = FolderData()
retrieved_files.add_incoming(calculation, link_type=LinkType.CREATE, link_label=calculation.link_label_retrieved)

with transport:
transport.chdir(workdir)
Expand Down Expand Up @@ -300,6 +305,11 @@ def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
extra=logger_extra)
retrieved_files.store()

# Make sure that attaching the `retrieved` folder with a link is the last thing we do. This gives the biggest chance
# of making this method idempotent. That is to say, if a runner get's interrupted during this action, it will simply
# retry the retrieval, unless we got here and managed to link it up, in which case we move to the next task.
retrieved_files.add_incoming(calculation, link_type=LinkType.CREATE, link_label=calculation.link_label_retrieved)


def kill_calculation(calculation, transport):
"""
Expand Down
6 changes: 3 additions & 3 deletions aiida/orm/utils/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ def one(self):
def first(self):
"""Return the first entry from the iterator.

:return: LinkTriple instance
:raises ValueError: if the iterator contains anything but one entry
:return: LinkTriple instance or None if no entries were matched
"""
if self.link_triples:
return self.link_triples[0]
raise ValueError('no entries found')

return None

def all(self):
"""Return all entries from the list.
Expand Down