diff --git a/sequence_processing_pipeline/Job.py b/sequence_processing_pipeline/Job.py
index af04ef9c..1c1a7593 100644
--- a/sequence_processing_pipeline/Job.py
+++ b/sequence_processing_pipeline/Job.py
@@ -12,6 +12,27 @@ class Job:
+    slurm_status_terminated = ['BOOT_FAIL', 'CANCELLED', 'DEADLINE', 'FAILED',
+                               'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED',
+                               'REVOKED', 'TIMEOUT']
+
+    slurm_status_successful = ['COMPLETED']
+
+    slurm_status_running = ['COMPLETING', 'CONFIGURING', 'PENDING', 'REQUEUED',
+                            'REQUEUE_FED', 'REQUEUE_HOLD', 'RESIZING',
+                            'RESV_DEL_HOLD', 'RUNNING', 'SIGNALING',
+                            'SPECIAL_EXIT', 'STAGE_OUT', 'STOPPED',
+                            'SUSPENDED']
+
+    slurm_status_not_running = (slurm_status_terminated +
+                                slurm_status_successful)
+
+    slurm_status_all_states = (slurm_status_terminated +
+                               slurm_status_successful +
+                               slurm_status_running)
+
+    polling_interval_in_seconds = 60
+
     def __init__(self, root_dir, output_path, job_name, executable_paths,
                  max_array_length, modules_to_load=None):
         """
@@ -191,20 +212,96 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None):
         return {'stdout': stdout, 'stderr': stderr,
                 'return_code': return_code}

+    def wait_on_job_ids(self, job_ids, callback=None):
+        """
+        Wait for the given job-ids to finish running before returning.
+        :param job_ids: A list of Slurm job-ids.
+        :param callback: Optional callback function; receives status updates
+                         as callback(jid=..., status=...).
+        :return: A dictionary mapping each job-id (including any child
+                 job-ids) to its most recent state.
+        """
+
+        # wait_on_job_ids() was broken out of submit_job() and updated to
+        # monitor multiple job-ids. This allows multiple jobs to be submitted
+        # to Slurm in parallel; a single wait_on_job_ids() call can wait on
+        # all of them before returning, optionally invoking the callback with
+        # status updates for each job-id.
+
+        def query_slurm(job_ids):
+            # internal helper that encapsulates calling squeue and parsing
+            # its output. '%F,%A,%T' yields the parent (array) job-id, the
+            # individual job-id, and the job's current state.
+            count = 0
+            while True:
+                result = self._system_call("squeue -t all -j "
+                                           f"{','.join(job_ids)} "
+                                           "-o '%F,%A,%T'")
+
+                if result['return_code'] == 0:
+                    # squeue ran w/out error; stop retrying and process its
+                    # output.
+                    break
+                else:
+                    # there was likely an intermittent issue w/squeue. Pause
+                    # and retry a few more times. If the problem persists,
+                    # report the error and exit.
+                    count += 1
+
+                    if count > 3:
+                        raise ExecFailedError(result['stderr'])
+
+                    sleep(60)
+
+            lines = result['stdout'].split('\n')
+            lines.pop(0)  # remove header
+            lines = [x.split(',') for x in lines if x != '']
+
+            jobs = {}
+            child_jobs = {}
+            for job_id, unique_id, state in lines:
+                jobs[unique_id] = state
+
+                if unique_id != job_id:
+                    child_jobs[unique_id] = job_id  # job is a child job
+
+            return jobs, child_jobs
+
+        while True:
+            jobs, child_jobs = query_slurm(job_ids)
+
+            for jid in job_ids:
+                logging.debug("JOB %s: %s" % (jid, jobs[jid]))
+                if callback is not None:
+                    callback(jid=jid, status=jobs[jid])
+
+                children = [x for x in child_jobs if child_jobs[x] == jid]
+                if len(children) == 0:
+                    logging.debug("\tNO CHILDREN")
+                for cid in children:
+                    logging.debug("\tCHILD JOB %s: %s" % (cid, jobs[cid]))
+
+            if all(jobs[x] in Job.slurm_status_not_running
+                   for x in job_ids):
+                # all jobs either completed successfully or terminated.
+                break
+
+            sleep(Job.polling_interval_in_seconds)
+
+        return jobs
+
     def submit_job(self, script_path, job_parameters=None,
                    script_parameters=None, wait=True, exec_from=None,
                    callback=None):
         """
-        Submit a Torque job script and optionally wait for it to finish.
-        :param script_path: The path to a Torque job (bash) script.
+        Submit a Slurm job script and optionally wait for it to finish.
+        :param script_path: The path to a Slurm job (bash) script.
         :param job_parameters: Optional parameters for scheduler submission.
         :param script_parameters: Optional parameters for your job script.
         :param wait: Set to False to submit job and not wait.
         :param exec_from: Set working directory to execute command from.
         :param callback: Set callback function that receives status updates.
-        :return: Dictionary containing the job's id, name, status, and
-                 elapsed time. Raises PipelineError if job could not be
-                 submitted or if job was unsuccessful.
+        :return: If wait is True, a dictionary containing the job's id
+                 ('job_id') and final state ('job_state'). If wait is False,
+                 the Slurm job-id of the submitted job as a string. Raises
+                 PipelineError if the job could not be submitted or was
+                 unsuccessful.
         """
         if job_parameters:
             cmd = 'sbatch %s %s' % (job_parameters, script_path)
@@ -230,95 +327,35 @@ def submit_job(self, script_path, job_parameters=None,

         job_id = stdout.strip().split()[-1]

-        job_info = {'job_id': None, 'job_name': None, 'job_state': None,
-                    'elapsed_time': None}

         # Just to give some time for everything to be set up properly
         sleep(10)

-        exit_count = 0
-
-        while wait:
-            result = self._system_call(f"sacct -P -n --job {job_id} --format "
-                                       "JobID,JobName,State,Elapsed,ExitCode")
-
-            if result['return_code'] != 0:
-                # sacct did not successfully submit the job.
-                raise ExecFailedError(result['stderr'])
-
-            # [-1] remove the extra \n
-            jobs_data = result['stdout'].split('\n')[:-1]
-            states = dict()
-            estatuses = dict()
-            for i, jd in enumerate(jobs_data):
-                jid, jname, jstate, etime, estatus = jd.split('|')
-                if jid.endswith('.extern') or jid.endswith('.batch'):
-                    continue
-
-                if i == 0:
-                    job_info['job_id'] = jid
-                    job_info['job_name'] = jname
-                    job_info['elapsed_time'] = etime
-                    job_info['exit_status'] = estatus
-
-                if jstate not in states:
-                    states[jstate] = 0
-                states[jstate] += 1
-
-                if estatus not in estatuses:
-                    estatuses[estatus] = 0
-                estatuses[estatus] += 1
+        if wait is False:
+            # return the job-id, since that is the only information available
+            # for this new job. Callers should expect a bare job-id string,
+            # not a dict, when they explicitly set wait=False.
+            return job_id

-            job_info['job_state'] = f'{states}'
-            job_info['exit_status'] = f'{estatuses}'
+        # the caller expects a dict with 'job_id' and 'job_state' keys, but
+        # wait_on_job_ids() returns a dict mapping job-ids to their states;
+        # its result must be munged before returning to the caller.
+        results = self.wait_on_job_ids([job_id], callback=callback)

-            if callback is not None:
-                callback(jid=job_id, status=f'{states}')
-
-            logging.debug("Job info: %s" % job_info)
-
-            # if job is completed after having run or exited after having
-            # run, then stop waiting.
-            if not set(states) - {'COMPLETED', 'FAILED', 'CANCELLED'}:
-                # break
-                exit_count += 1
+        job_result = {'job_id': job_id, 'job_state': results[job_id]}

-                if exit_count > 4:
-                    break
-
-            sleep(10)
+        if callback is not None:
+            callback(jid=job_id, status=job_result['job_state'])

-        if job_info['job_id'] is not None:
-            # job was once in the queue
-            if callback is not None:
-                callback(jid=job_id, status=job_info['job_state'])
-
-            if set(states) == {'COMPLETED'}:
-                if 'exit_status' in job_info:
-                    if set(estatuses) == {'0:0'}:
-                        # job completed successfully
-                        return job_info
-                    else:
-                        exit_status = job_info['exit_status']
-                        raise JobFailedError(f"job {job_id} exited with exit_"
-                                             f"status {exit_status}")
-                else:
-                    # with no other info, assume job completed successfully
-                    return job_info
-            else:
-                # job exited unsuccessfully
-                raise JobFailedError(f"job {job_id} exited with status "
-                                     f"{job_info['job_state']}")
+        if job_result['job_state'] in Job.slurm_status_successful:
+            return job_result
         else:
-            # job was never in the queue - return an error.
-            if callback is not None:
-                callback(jid=job_id, status='ERROR')
-
-            raise JobFailedError(f"job {job_id} never appeared in the "
-                                 "queue.")
+            raise JobFailedError(f"job {job_id} exited with status "
+                                 f"{job_result['job_state']}")

     def _group_commands(self, cmds):
         # break list of commands into chunks of max_array_length (Typically
-        # 1000 for Torque job arrays). To ensure job arrays are never more
+        # 1000 for Slurm job arrays). To ensure job arrays are never more
         # than 1000 jobs long, we'll chain additional commands together, and
         # evenly distribute them amongst the first 1000.
         cmds.sort()
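
A minimal usage sketch of the reworked API follows (illustration only, not
part of the patch). The `job` instance, the `script_paths` list, and the
`report_status` callback are hypothetical stand-ins; the callback's keyword
arguments (jid, status) match the invocation inside wait_on_job_ids().

    from sequence_processing_pipeline.Job import Job

    # hypothetical: `job` is an instance of a concrete Job subclass and
    # `script_paths` is a list of paths to Slurm batch scripts.

    def report_status(jid=None, status=None):
        print(f"job {jid}: {status}")

    # with wait=False, submit_job() returns the bare Slurm job-id string
    # instead of a dict, so several jobs can be submitted back-to-back.
    job_ids = [job.submit_job(path, wait=False) for path in script_paths]

    # block until every job-id reaches a state in
    # Job.slurm_status_not_running, receiving per-job updates via callback.
    states = job.wait_on_job_ids(job_ids, callback=report_status)

    # states maps job-ids (including any array child jobs) to final states.
    failed = [jid for jid, state in states.items()
              if state not in Job.slurm_status_successful]
    if failed:
        raise RuntimeError(f"jobs failed: {failed}")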