From b172457b6d20c01cbfa29dfc691ff20ddaadfe09 Mon Sep 17 00:00:00 2001 From: Francesco Di Natale Date: Wed, 18 Aug 2021 18:28:01 -0700 Subject: [PATCH] Addition of Flux connection to try/catch (#375) --- .../datastructures/core/executiongraph.py | 3 + .../interfaces/script/_flux/flux0_26_0.py | 84 ++++++++++--------- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/maestrowf/datastructures/core/executiongraph.py b/maestrowf/datastructures/core/executiongraph.py index 40b0a7ab..0712fe8d 100644 --- a/maestrowf/datastructures/core/executiongraph.py +++ b/maestrowf/datastructures/core/executiongraph.py @@ -4,8 +4,10 @@ import getpass import logging import os +import random import shutil import tempfile +from time import sleep from filelock import FileLock, Timeout from maestrowf.abstracts import PickleInterface @@ -599,6 +601,7 @@ def _execute_record(self, record, adapter, restart=False): # Increment the number of restarts we've attempted. LOGGER.debug("Completed submission attempt %d", num_restarts) num_restarts += 1 + sleep((random.random() + 1) * num_restarts) if retcode == SubmissionCode.OK: self.in_progress.add(record.name) diff --git a/maestrowf/interfaces/script/_flux/flux0_26_0.py b/maestrowf/interfaces/script/_flux/flux0_26_0.py index 85f451bf..0b14ca1f 100644 --- a/maestrowf/interfaces/script/_flux/flux0_26_0.py +++ b/maestrowf/interfaces/script/_flux/flux0_26_0.py @@ -25,46 +25,46 @@ def submit( cls, nodes, procs, cores_per_task, path, cwd, walltime, ngpus=0, job_name=None, force_broker=True ): - cls.connect_to_flux() - - # NOTE: This previously placed everything under a broker. However, - # if there's a job that schedules items to Flux, it will schedule all - # new jobs to the sub-broker. Sometimes this is desired, but it's - # incorrect to make that the general case. If we are asking for a - # single node, don't use a broker -- but introduce a flag that can - # force a single node to run in a broker. - - if force_broker or nodes > 1: - LOGGER.debug( - "Launch under Flux sub-broker. [force_broker=%s, nodes=%d]", - force_broker, nodes - ) - ngpus_per_slot = int(ceil(ngpus / nodes)) - jobspec = flux.job.JobspecV1.from_nest_command( - [path], num_nodes=nodes, cores_per_slot=cores_per_task, - num_slots=procs, gpus_per_slot=ngpus_per_slot) - else: - LOGGER.debug( - "Launch under root Flux broker. [force_broker=%s, nodes=%d]", - force_broker, nodes - ) - jobspec = flux.job.JobspecV1.from_command( - [path], num_tasks=procs, num_nodes=nodes, - cores_per_task=cores_per_task, gpus_per_task=ngpus) - - LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle))) - if job_name: - jobspec.setattr("system.job.name", job_name) - jobspec.cwd = cwd - jobspec.environment = dict(os.environ) - - if walltime > 0: - jobspec.duration = walltime - - jobspec.stdout = f"{job_name}.{{{{id}}}}.out" - jobspec.stderr = f"{job_name}.{{{{id}}}}.err" - try: + cls.connect_to_flux() + + # NOTE: This previously placed everything under a broker. However, + # if there's a job that schedules items to Flux, it will schedule + # all new jobs to the sub-broker. Sometimes this is desired, but + # it's incorrect to make that the general case. If we are asking + # for a single node, don't use a broker -- but introduce a flag + # that can force a single node to run in a broker. + + if force_broker or nodes > 1: + LOGGER.debug( + "Launch under Flux sub-broker. [force_broker=%s, " + "nodes=%d]", force_broker, nodes + ) + ngpus_per_slot = int(ceil(ngpus / nodes)) + jobspec = flux.job.JobspecV1.from_nest_command( + [path], num_nodes=nodes, cores_per_slot=cores_per_task, + num_slots=procs, gpus_per_slot=ngpus_per_slot) + else: + LOGGER.debug( + "Launch under root Flux broker. [force_broker=%s, " + "nodes=%d]", force_broker, nodes + ) + jobspec = flux.job.JobspecV1.from_command( + [path], num_tasks=procs, num_nodes=nodes, + cores_per_task=cores_per_task, gpus_per_task=ngpus) + + LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle))) + if job_name: + jobspec.setattr("system.job.name", job_name) + jobspec.cwd = cwd + jobspec.environment = dict(os.environ) + + if walltime > 0: + jobspec.duration = walltime + + jobspec.stdout = f"{job_name}.{{{{id}}}}.out" + jobspec.stderr = f"{job_name}.{{{{id}}}}.err" + # Submit our job spec. jobid = \ flux.job.submit(cls.flux_handle, jobspec, waitable=True) @@ -73,6 +73,12 @@ def submit( LOGGER.info("Submission returned status OK. -- " "Assigned identifier (%s)", jobid) + except ConnectionResetError as exception: + LOGGER.error( + "Submission failed -- Message (%s).", exception) + jobid = -1 + retcode = -2 + submit_status = SubmissionCode.ERROR except Exception as exception: LOGGER.error( "Submission failed -- Message (%s).", exception)