Make the execmanager.submit_calculation idempotent'ish (#3188)
The `submit_calculation` function is responsible for submitting a job
that was already uploaded to the remote machine to the scheduler. There
was a risk of the daemon submitting the same job twice. If the first
call successfully submitted the job and set the job id as an attribute
on the node, but the daemon was then interrupted (for example by a
shutdown) before it could transition the `CalcJob` to the `UPDATE`
transport task, the command would be executed again once the process
was reloaded. The second call would not fail and would simply submit
the job to the scheduler again, which wastes resources and can cause
various complications.

It is impossible to make the function fully idempotent, but checking at
the start whether the job id attribute is already set on the node, and
returning it if so, should minimize the risk of double submission.
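
The guard described above can be illustrated with a minimal, self-contained
sketch. `FakeNode`, `FakeScheduler` and `submit_once` are hypothetical
stand-ins invented for this illustration and are not part of AiiDA; only the
method names `get_job_id`, `set_job_id` and `submit_from_script` mirror the
calls that appear in the diff.

```python
class FakeNode:
    """Hypothetical stand-in for a `CalcJobNode`; it only stores a job id."""

    def __init__(self):
        self._job_id = None

    def get_job_id(self):
        return self._job_id

    def set_job_id(self, job_id):
        self._job_id = job_id


class FakeScheduler:
    """Hypothetical scheduler that counts how many jobs were submitted."""

    def __init__(self):
        self.submissions = 0

    def submit_from_script(self, workdir, script_filename):
        self.submissions += 1
        return 'job-{}'.format(self.submissions)


def submit_once(node, scheduler, workdir, script_filename):
    """Submit the job unless a job id was already recorded on the node."""
    job_id = node.get_job_id()

    if job_id is not None:
        # An earlier call already submitted the job, so do not resubmit.
        return job_id

    job_id = scheduler.submit_from_script(workdir, script_filename)
    # An interruption between the call above and the call below would still
    # lose the job id, which is why the guard only reduces the risk.
    node.set_job_id(job_id)

    return job_id


node, scheduler = FakeNode(), FakeScheduler()
first = submit_once(node, scheduler, '/scratch/run', 'submit.sh')
# Simulates the task being executed again after the daemon was restarted.
second = submit_once(node, scheduler, '/scratch/run', 'submit.sh')
```

Both calls return the same job id and the scheduler receives a single
submission. The remaining window between submitting and storing the job id is
the reason the function cannot be made fully idempotent.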
sphuber authored Jul 19, 2019
1 parent 9718480 commit b7a161d
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions aiida/engine/daemon/execmanager.py
@@ -236,27 +236,36 @@ def upload_calculation(node, transport, calc_info, script_filename, dry_run=Fals


def submit_calculation(calculation, transport, calc_info, script_filename):
    """
    Submit a calculation
    """Submit a previously uploaded `CalcJob` to the scheduler.
    :param calculation: the instance of CalcJobNode to submit.
    :param transport: an already opened transport to use to submit the calculation.
    :param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
    :param script_filename: the job launch script returned by `CalcJobNode._presubmit`
    :return: the job id as returned by the scheduler `submit_from_script` call
    """
    job_id = calculation.get_job_id()

    # If the `job_id` attribute is already set, that means this function was already executed once and the scheduler
    # submit command was successful as the job id it returned was set on the node. This scenario can happen when the
    # daemon runner gets shutdown right after accomplishing the submission task, but before it gets the chance to
    # finalize the state transition of the `CalcJob` to the `UPDATE` transport task. Since the job is already submitted
    # we do not want to submit it a second time, so we simply return the existing job id here.
    if job_id is not None:
        return job_id

    scheduler = calculation.computer.get_scheduler()
    scheduler.set_transport(transport)

    workdir = calculation.get_remote_workdir()
    job_id = scheduler.submit_from_script(workdir, script_filename)
    calculation.set_job_id(job_id)

    return job_id


def retrieve_calculation(calculation, transport, retrieved_temporary_folder):
    """
    Retrieve all the files of a completed job calculation using the given transport.
    """Retrieve all the files of a completed job calculation using the given transport.
    If the job defined anything in the `retrieve_temporary_list`, those entries will be stored in the
    `retrieved_temporary_folder`. The caller is responsible for creating and destroying this folder.