Skip to content

Commit

Permalink
Addition of Flux connection to try/catch (#375)
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Di Natale authored Aug 19, 2021
1 parent dc7bf7a commit b172457
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 39 deletions.
3 changes: 3 additions & 0 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import getpass
import logging
import os
import random
import shutil
import tempfile
from time import sleep
from filelock import FileLock, Timeout

from maestrowf.abstracts import PickleInterface
Expand Down Expand Up @@ -599,6 +601,7 @@ def _execute_record(self, record, adapter, restart=False):
# Increment the number of restarts we've attempted.
LOGGER.debug("Completed submission attempt %d", num_restarts)
num_restarts += 1
sleep((random.random() + 1) * num_restarts)

if retcode == SubmissionCode.OK:
self.in_progress.add(record.name)
Expand Down
84 changes: 45 additions & 39 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,46 +25,46 @@ def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=True
):
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule all
# new jobs to the sub-broker. Sometimes this is desired, but it's
# incorrect to make that the general case. If we are asking for a
# single node, don't use a broker -- but introduce a flag that can
# force a single node to run in a broker.

if force_broker or nodes > 1:
LOGGER.debug(
"Launch under Flux sub-broker. [force_broker=%s, nodes=%d]",
force_broker, nodes
)
ngpus_per_slot = int(ceil(ngpus / nodes))
jobspec = flux.job.JobspecV1.from_nest_command(
[path], num_nodes=nodes, cores_per_slot=cores_per_task,
num_slots=procs, gpus_per_slot=ngpus_per_slot)
else:
LOGGER.debug(
"Launch under root Flux broker. [force_broker=%s, nodes=%d]",
force_broker, nodes
)
jobspec = flux.job.JobspecV1.from_command(
[path], num_tasks=procs, num_nodes=nodes,
cores_per_task=cores_per_task, gpus_per_task=ngpus)

LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
if job_name:
jobspec.setattr("system.job.name", job_name)
jobspec.cwd = cwd
jobspec.environment = dict(os.environ)

if walltime > 0:
jobspec.duration = walltime

jobspec.stdout = f"{job_name}.{{{{id}}}}.out"
jobspec.stderr = f"{job_name}.{{{{id}}}}.err"

try:
cls.connect_to_flux()

# NOTE: This previously placed everything under a broker. However,
# if there's a job that schedules items to Flux, it will schedule
# all new jobs to the sub-broker. Sometimes this is desired, but
# it's incorrect to make that the general case. If we are asking
# for a single node, don't use a broker -- but introduce a flag
# that can force a single node to run in a broker.

if force_broker or nodes > 1:
LOGGER.debug(
"Launch under Flux sub-broker. [force_broker=%s, "
"nodes=%d]", force_broker, nodes
)
ngpus_per_slot = int(ceil(ngpus / nodes))
jobspec = flux.job.JobspecV1.from_nest_command(
[path], num_nodes=nodes, cores_per_slot=cores_per_task,
num_slots=procs, gpus_per_slot=ngpus_per_slot)
else:
LOGGER.debug(
"Launch under root Flux broker. [force_broker=%s, "
"nodes=%d]", force_broker, nodes
)
jobspec = flux.job.JobspecV1.from_command(
[path], num_tasks=procs, num_nodes=nodes,
cores_per_task=cores_per_task, gpus_per_task=ngpus)

LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
if job_name:
jobspec.setattr("system.job.name", job_name)
jobspec.cwd = cwd
jobspec.environment = dict(os.environ)

if walltime > 0:
jobspec.duration = walltime

jobspec.stdout = f"{job_name}.{{{{id}}}}.out"
jobspec.stderr = f"{job_name}.{{{{id}}}}.err"

# Submit our job spec.
jobid = \
flux.job.submit(cls.flux_handle, jobspec, waitable=True)
Expand All @@ -73,6 +73,12 @@ def submit(

LOGGER.info("Submission returned status OK. -- "
"Assigned identifier (%s)", jobid)
except ConnectionResetError as exception:
LOGGER.error(
"Submission failed -- Message (%s).", exception)
jobid = -1
retcode = -2
submit_status = SubmissionCode.ERROR
except Exception as exception:
LOGGER.error(
"Submission failed -- Message (%s).", exception)
Expand Down

0 comments on commit b172457

Please sign in to comment.