Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the execmanager.submit_calculation idempotent'ish #3188

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,27 +236,36 @@ def upload_calculation(node, transport, calc_info, script_filename, dry_run=Fals


def submit_calculation(calculation, transport, calc_info, script_filename):
"""
Submit a calculation
"""Submit a previously uploaded `CalcJob` to the scheduler.

:param calculation: the instance of CalcJobNode to submit.
:param transport: an already opened transport to use to submit the calculation.
:param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
:param script_filename: the job launch script returned by `CalcJobNode._presubmit`
:return: the job id as returned by the scheduler `submit_from_script` call
"""
job_id = calculation.get_job_id()

# If the `job_id` attribute is already set, that means this function was already executed once and the scheduler
# submit command was successful as the job id it returned was set on the node. This scenario can happen when the
# daemon runner gets shutdown right after accomplishing the submission task, but before it gets the chance to
# finalize the state transition of the `CalcJob` to the `UPDATE` transport task. Since the job is already submitted
# we do not want to submit it a second time, so we simply return the existing job id here.
if job_id is not None:
return job_id

scheduler = calculation.computer.get_scheduler()
scheduler.set_transport(transport)

workdir = calculation.get_remote_workdir()
job_id = scheduler.submit_from_script(workdir, script_filename)
calculation.set_job_id(job_id)

return job_id


def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
"""
Retrieve all the files of a completed job calculation using the given transport.
"""Retrieve all the files of a completed job calculation using the given transport.

If the job defined anything in the `retrieve_temporary_list`, those entries will be stored in the
`retrieved_temporary_folder`. The caller is responsible for creating and destroying this folder.
Expand Down