diff --git a/.travis.yml b/.travis.yml index 2cea3189e..5bce062ab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -55,7 +55,7 @@ env: # - NUMPY_VERSION=1.10 # - SCIPY_VERSION=0.16 - ASTROPY_VERSION=1.2.1 - # - SPHINX_VERSION=1.6 + - SPHINX_VERSION=1.5 - DESIUTIL_VERSION=1.9.5 - SPECLITE_VERSION=0.5 - SPECTER_VERSION=0.6.0 @@ -65,20 +65,21 @@ env: - DESIMODEL_DATA=trunk # - HARP_VERSION=1.0.1 # - SPECEX_VERSION=0.3.9 + - REDROCK_VERSION=master - MAIN_CMD='python setup.py' # Additional conda channels to use. - CONDA_CHANNELS="openastronomy" # These packages will always be installed. - CONDA_DEPENDENCIES="qt=4" # These packages will only be installed if we really need them. - - CONDA_ALL_DEPENDENCIES="scipy matplotlib sqlalchemy coverage==3.7.1 pyyaml healpy" + - CONDA_ALL_DEPENDENCIES="scipy matplotlib sqlalchemy coverage==3.7.1 pyyaml healpy numba numpy" # These packages will always be installed. - PIP_DEPENDENCIES="" # These packages will only be installed if we really need them. - PIP_ALL_DEPENDENCIES="speclite==${SPECLITE_VERSION} coveralls" # These pip packages need to be installed in a certain order, so we # do that separately from the astropy/ci-helpers scripts. - - DESIHUB_PIP_DEPENDENCIES="desiutil=${DESIUTIL_VERSION} specter=${SPECTER_VERSION} desimodel=${DESIMODEL_VERSION}" + - DESIHUB_PIP_DEPENDENCIES="desiutil=${DESIUTIL_VERSION} specter=${SPECTER_VERSION} desimodel=${DESIMODEL_VERSION} redrock=${REDROCK_VERSION}" # Debug the Travis install process. - DEBUG=False matrix: diff --git a/doc/conf.py b/doc/conf.py index af62d9940..07927d5b0 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -157,9 +157,9 @@ 'matplotlib.cm', 'matplotlib.gridspec', 'matplotlib.collections', - 'numpy', - 'numpy.polynomial', - 'numpy.polynomial.legendre', + # 'numpy', + # 'numpy.polynomial', + # 'numpy.polynomial.legendre', 'scipy', 'scipy.constants', 'scipy.interpolate', diff --git a/py/desispec/io/__init__.py b/py/desispec/io/__init__.py index 40f9e5704..027ed0fd6 100644 --- a/py/desispec/io/__init__.py +++ b/py/desispec/io/__init__.py @@ -24,7 +24,7 @@ rawdata_root, specprod_root, validate_night, get_pipe_plandir, get_pipe_rundir, get_pipe_scriptdir, get_pipe_logdir, get_pipe_faildir, get_reduced_frames, - get_nights, find_exposure_night) + get_nights, find_exposure_night, get_pipe_redshiftdir) from .params import read_params from .qa import (read_qa_frame, read_qa_data, write_qa_frame, write_qa_brick, load_qa_frame, write_qa_exposure, write_qa_prod) diff --git a/py/desispec/io/meta.py b/py/desispec/io/meta.py index 1e46f17e4..3aac16fa2 100644 --- a/py/desispec/io/meta.py +++ b/py/desispec/io/meta.py @@ -458,3 +458,15 @@ def get_pipe_faildir(): The name of the subdirectory. """ return "failed" + + +def get_pipe_redshiftdir(): + """ + Return the name of the subdirectory containing pipeline redshift + log files. + + Returns (str): + The name of the subdirectory. + """ + return "redshift" + diff --git a/py/desispec/pipeline/common.py b/py/desispec/pipeline/common.py index 0da3c0390..a6d90c16a 100644 --- a/py/desispec/pipeline/common.py +++ b/py/desispec/pipeline/common.py @@ -95,7 +95,7 @@ "stdstars" : "Stdstars", "fluxcal" : "Fluxcal", "calibrate" : "Procexp", - "redshift" : "Redmonster" + "redshift" : "Redrock" } """The default worker type for each pipeline step.""" diff --git a/py/desispec/pipeline/plan.py b/py/desispec/pipeline/plan.py index 3e24dfe22..1515e568f 100644 --- a/py/desispec/pipeline/plan.py +++ b/py/desispec/pipeline/plan.py @@ -134,6 +134,10 @@ class names and the values are dictionaries that are if not os.path.isdir(faildir): os.makedirs(faildir) + failreddir = os.path.join(faildir, io.get_pipe_redshiftdir()) + if not os.path.isdir(failreddir): + os.makedirs(failreddir) + scriptdir = os.path.join(rundir, io.get_pipe_scriptdir()) if not os.path.isdir(scriptdir): os.makedirs(scriptdir) @@ -142,6 +146,10 @@ class names and the values are dictionaries that are if not os.path.isdir(logdir): os.makedirs(logdir) + logreddir = os.path.join(logdir, io.get_pipe_redshiftdir()) + if not os.path.isdir(logreddir): + os.makedirs(logreddir) + optfile = os.path.join(rundir, "options.yaml") if not os.path.isfile(optfile): opts = default_options(extra=extra) diff --git a/py/desispec/pipeline/run.py b/py/desispec/pipeline/run.py index 02e992a80..0cbd2ae7d 100644 --- a/py/desispec/pipeline/run.py +++ b/py/desispec/pipeline/run.py @@ -150,7 +150,7 @@ def run_step(step, grph, opts, comm=None): else: group_ntask = 0 else: - if step == "zfind": + if step == "redshift": # We load balance the spectra across process groups based # on the number of targets per group. All groups with # < taskproc targets are weighted the same. @@ -163,11 +163,11 @@ def run_step(step, grph, opts, comm=None): worksizes = [ taskproc if (x < taskproc) else x for x in spectrasizes ] if rank == 0: - log.debug("zfind {} groups".format(ngroup)) + log.debug("redshift {} groups".format(ngroup)) workstr = "" for w in worksizes: workstr = "{}{} ".format(workstr, w) - log.debug("zfind work sizes = {}".format(workstr)) + log.debug("redshift work sizes = {}".format(workstr)) group_firsttask, group_ntask = dist_discrete(worksizes, ngroup, group) @@ -215,8 +215,23 @@ def run_step(step, grph, opts, comm=None): group_failcount += 1 continue - nfaildir = os.path.join(faildir, night) - nlogdir = os.path.join(logdir, night) + nfaildir = None + nlogdir = None + if step == "redshift": + ztype, nstr, pstr = graph_name_split(gname) + subdir = io.util.healpix_subdirectory(int(nstr), + int(pstr)) + nfaildir = os.path.join(faildir, io.get_pipe_redshiftdir(), + subdir) + nlogdir = os.path.join(logdir, io.get_pipe_redshiftdir(), subdir) + if group_rank == 0: + if not os.path.isdir(nfaildir): + os.makedirs(nfaildir) + if not os.path.isdir(nlogdir): + os.makedirs(nlogdir) + else: + nfaildir = os.path.join(faildir, night) + nlogdir = os.path.join(logdir, night) tgraph = graph_slice(grph, names=[tasks[t]], deps=True) ffile = os.path.join(nfaildir, "{}_{}.yaml".format(step, tasks[t])) @@ -565,37 +580,54 @@ def nersc_job(host, path, logroot, desisetup, commands, nodes=1, \ f.write("#SBATCH --constraint=haswell\n") elif host == "coriknl": f.write("#SBATCH --constraint=knl,quad,cache\n") + f.write("#SBATCH --core-spec=4\n") f.write("#SBATCH --account=desi\n") f.write("#SBATCH --nodes={}\n".format(totalnodes)) f.write("#SBATCH --time={}\n".format(timestr)) f.write("#SBATCH --job-name={}\n".format(jobname)) f.write("#SBATCH --output={}_%j.log\n".format(logroot)) + f.write("echo Starting slurm script at `date`\n\n") f.write("source {}\n\n".format(desisetup)) + f.write("# Set TMPDIR to be on the ramdisk\n") f.write("export TMPDIR=/dev/shm\n\n") - f.write("node_cores=0\n") - f.write("if [ ${NERSC_HOST} = edison ]; then\n") - f.write(" node_cores=24\n") - f.write("else\n") - f.write(" node_cores=32\n") - f.write("fi\n") - f.write("\n") + + if host == "edison": + f.write("cpu_per_core=2\n\n") + f.write("node_cores=24\n\n") + elif host == "cori": + f.write("cpu_per_core=2\n\n") + f.write("node_cores=32\n\n") + elif host == "coriknl": + f.write("cpu_per_core=4\n\n") + f.write("node_cores=64\n\n") + else: + raise RuntimeError("Unsupported NERSC host") + f.write("nodes={}\n".format(nodes)) f.write("node_proc={}\n".format(nodeproc)) f.write("node_thread=$(( node_cores / node_proc ))\n") + f.write("node_depth=$(( cpu_per_core * node_thread ))\n") + f.write("procs=$(( nodes * node_proc ))\n\n") if openmp: f.write("export OMP_NUM_THREADS=${node_thread}\n") + f.write("export OMP_PLACES=threads\n") + f.write("export OMP_PROC_BIND=spread\n") else: f.write("export OMP_NUM_THREADS=1\n") f.write("\n") + runstr = "srun" if multiproc: runstr = "{} --cpu_bind=no".format(runstr) f.write("export KMP_AFFINITY=disabled\n") f.write("\n") - f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_thread}}\"\n\n".format(runstr)) + else: + runstr = "{} --cpu_bind=cores".format(runstr) + f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_depth}}\"\n\n".format(runstr)) + f.write("now=`date +%Y%m%d-%H:%M:%S`\n") f.write("echo \"job datestamp = ${now}\"\n") f.write("log={}_${{now}}.log\n\n".format(logroot)) @@ -655,6 +687,7 @@ def nersc_shifter_job(host, path, img, specdata, specredux, desiroot, logroot, d f.write("#SBATCH --constraint=haswell\n") elif host == "coriknl": f.write("#SBATCH --constraint=knl,quad,cache\n") + f.write("#SBATCH --core-spec=4\n") f.write("#SBATCH --account=desi\n") f.write("#SBATCH --nodes={}\n".format(totalnodes)) f.write("#SBATCH --time={}\n".format(timestr)) @@ -665,29 +698,44 @@ def nersc_shifter_job(host, path, img, specdata, specredux, desiroot, logroot, d f.write("echo Starting slurm script at `date`\n\n") f.write("source {}\n\n".format(desisetup)) - f.write("node_cores=0\n") - f.write("if [ ${NERSC_HOST} = edison ]; then\n") - f.write(" module load shifter\n") - f.write(" node_cores=24\n") - f.write("else\n") - f.write(" node_cores=32\n") - f.write("fi\n") - f.write("\n") + f.write("# Set TMPDIR to be on the ramdisk\n") + f.write("export TMPDIR=/dev/shm\n\n") + + if host == "edison": + f.write("cpu_per_core=2\n\n") + f.write("node_cores=24\n\n") + elif host == "cori": + f.write("cpu_per_core=2\n\n") + f.write("node_cores=32\n\n") + elif host == "coriknl": + f.write("cpu_per_core=4\n\n") + f.write("node_cores=64\n\n") + else: + raise RuntimeError("Unsupported NERSC host") + f.write("nodes={}\n".format(nodes)) f.write("node_proc={}\n".format(nodeproc)) f.write("node_thread=$(( node_cores / node_proc ))\n") + f.write("node_depth=$(( cpu_per_core * node_thread ))\n") + f.write("procs=$(( nodes * node_proc ))\n\n") if openmp: f.write("export OMP_NUM_THREADS=${node_thread}\n") + f.write("export OMP_PLACES=threads\n") + f.write("export OMP_PROC_BIND=spread\n") else: f.write("export OMP_NUM_THREADS=1\n") f.write("\n") + runstr = "srun" if multiproc: runstr = "{} --cpu_bind=no".format(runstr) f.write("export KMP_AFFINITY=disabled\n") f.write("\n") - f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_thread}} shifter\"\n\n".format(runstr)) + else: + runstr = "{} --cpu_bind=cores".format(runstr) + f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_depth}} shifter\"\n\n".format(runstr)) + f.write("now=`date +%Y%m%d-%H:%M:%S`\n") f.write("echo \"job datestamp = ${now}\"\n") f.write("log={}_${{now}}.log\n\n".format(logroot)) diff --git a/py/desispec/pipeline/task.py b/py/desispec/pipeline/task.py index bca7d4609..f45bff95a 100644 --- a/py/desispec/pipeline/task.py +++ b/py/desispec/pipeline/task.py @@ -15,6 +15,8 @@ import sys import re +from redrock.external.desi import rrdesi + from desiutil.log import get_logger, DEBUG from ..util import option_list from ..parallel import default_nproc @@ -28,7 +30,7 @@ from ..scripts import stdstars from ..scripts import fluxcalibration as fluxcal from ..scripts import procexp -from ..scripts import zfind +#from ..scripts import zfind from .common import * from .graph import * @@ -96,7 +98,7 @@ def max_nproc(self): return 1 def task_time(self): - return 15 + return 30 def default_options(self): opts = {} @@ -299,7 +301,7 @@ def max_nproc(self): return 1 def task_time(self): - return 10 + return 20 def default_options(self): @@ -349,7 +351,7 @@ def max_nproc(self): return 20 def task_time(self): - return 20 + return 100 def default_options(self): @@ -457,7 +459,7 @@ def max_nproc(self): return 1 def task_time(self): - return 5 + return 10 def default_options(self): @@ -603,7 +605,7 @@ def max_nproc(self): return 1 def task_time(self): - return 5 + return 30 def default_options(self): @@ -667,7 +669,11 @@ def run(self, grph, task, opts, comm=None): options["skymodels"] = skyfiles options["fiberflats"] = flatfiles options["outfile"] = outfile - options["ncpu"] = str(default_nproc) + # Do not mix MPI and multiprocessing. We are already + # running this across many processes per node, so do + # not need multiprocessing. + #options["ncpu"] = str(default_nproc) + options["ncpu"] = "1" #- TODO: no QA for fitting standard stars yet options.update(opts) @@ -696,7 +702,7 @@ def max_nproc(self): return 1 def task_time(self): - return 5 + return 10 def default_options(self): opts = {} @@ -786,7 +792,7 @@ def max_nproc(self): return 1 def task_time(self): - return 5 + return 10 def default_options(self): opts = {} @@ -925,6 +931,99 @@ def run(self, grph, task, opts, comm=None): return +class WorkerRedrock(Worker): + """ + Use Redrock to classify spectra and compute redshifts. + """ + def __init__(self, opts): + self.nodes = 1 + self.nodeprocs = 1 + self.specpermin = 20 + if "nodes" in opts: + self.nodes = int(opts["nodes"]) + if "nodeprocs" in opts: + self.nodeprocs = int(opts["nodeprocs"]) + if "spec_per_minute" in opts: + self.specpermin = opts["spec_per_minute"] + super(Worker, self).__init__() + + def max_nproc(self): + return (self.nodes * self.nodeprocs) + + def max_nodes(self): + return self.nodes + + def node_procs(self): + return self.nodeprocs + + def spec_per_min(self): + return self.specpermin + + def task_time(self): + return 60 + + def default_options(self): + opts = {} + return opts + + + def run(self, grph, task, opts, comm=None): + """ + Run Redrock on a spectral group. + + Args: + grph (dict): pruned graph with this task and dependencies. + task (str): the name of this task. + opts (dict): options to use for this task. + comm (mpi4py.MPI.Comm): optional MPI communicator. + """ + nproc = 1 + rank = 0 + if comm is not None: + nproc = comm.size + rank = comm.rank + + log = get_logger() + + node = grph[task] + + spectra = "spectra-{}-{}".format(node["nside"], node["pixel"]) + specfile = graph_path(spectra) + + outfile = graph_path(task) + outdir = os.path.dirname(outfile) + details = os.path.join(outdir, "rrdetails_{}.h5".format(task)) + + options = {} + options["output"] = details + options["zbest"] = outfile + options.update(opts) + optarray = option_list(options) + optarray.append(specfile) + + # write out the equivalent commandline + if rank == 0: + com = ["RUN", "rrdesi_mpi"] + com.extend(optarray) + log.info(" ".join(com)) + + failcount = 0 + try: + rrdesi(options=optarray, comm=comm) + except: + failcount += 1 + sys.stdout.flush() + + if comm is not None: + failcount = comm.allreduce(failcount) + + if failcount > 0: + # all processes throw + raise RuntimeError("some redshifts failed for task " + "{}".format(task)) + + return + class WorkerNoop(Worker): """ diff --git a/py/desispec/scripts/extract.py b/py/desispec/scripts/extract.py index a9b84d70d..521a39086 100644 --- a/py/desispec/scripts/extract.py +++ b/py/desispec/scripts/extract.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, division, print_function import sys +import traceback import os import re import os.path diff --git a/py/desispec/scripts/pipe_prod.py b/py/desispec/scripts/pipe_prod.py index dcff3498a..9d2de6d08 100644 --- a/py/desispec/scripts/pipe_prod.py +++ b/py/desispec/scripts/pipe_prod.py @@ -212,6 +212,14 @@ def parse(options=None): parser.add_argument("--nside", required=False, type=int, default=64, help="HEALPix nside value to use for spectral grouping.") + parser.add_argument("--redshift_nodes", required=False, type=int, + default=1, help="Number of nodes to use for redshift fitting " + "in a single worker.") + + parser.add_argument("--redshift_spec_per_minute", required=False, + type=int, default=80, help="Number of spectra that can be " + "processed in a minute on a single node using all templates.") + args = None if options is None: args = parser.parse_args() @@ -272,13 +280,6 @@ def main(args): raise RuntimeError("You must set DESI_ROOT in your environment") desiroot = os.environ["DESI_ROOT"] - # Add any extra options to the initial options.yaml file - - extra = {} - if args.starmodels is not None: - extra["Stdstars"] = {} - extra["Stdstars"]["starmodels"] = args.starmodels - # Check the machine limits we are using for this production nodecores = 0 @@ -313,6 +314,34 @@ def main(args): if shell_maxcores > 1: shell_mpi_run = "{}".format(args.shell_mpi_run) + # Add any extra options to the initial options.yaml file + + extra = {} + if args.starmodels is not None: + extra["Stdstars"] = {} + extra["Stdstars"]["starmodels"] = args.starmodels + + # + # Redrock runtime notes from 2017-09. All numbers rounded + # down to be as conservative as possible: + # Small test (2% DC, one day, one spectrograph) + # edison: 1 node per worker, 24 procs per node, + # ~3500 spectra in 30 min =~ 110 spec per node min + # knl: 1 node per worker, 32 procs per node (not using 64 + # due to memory constraint- will gain 2x after fix), + # ~800 spectra in 35 min =~ 20 spec per node min + # Medium test (2% DC, one day) + # edison: 2 nodes per worker, 24 procs per node, + # ~6000 spectra in 40 min =~ 75 spec per node min + # edison: 1 nodes per worker, 24 procs per node, + # ~3000 spectra in 30 min =~ 100 spec per node min + # + + extra["Redrock"] = {} + extra["Redrock"]["nodes"] = args.redshift_nodes + extra["Redrock"]["nodeprocs"] = nodecores // 2 + extra["Redrock"]["spec_per_minute"] = args.redshift_spec_per_minute + # Select our spectrographs specs = [ x for x in range(10) ] @@ -379,6 +408,7 @@ def main(args): optfile = os.path.join(rundir, "options.yaml") opts = pipe.yaml_read(optfile) + workerhandles = {} workermax = {} workertime = {} workernames = {} @@ -388,6 +418,7 @@ def main(args): opts["{}_worker_opts".format(step)]) workermax[step] = worker.max_nproc() workertime[step] = worker.task_time() + workerhandles[step] = worker print(" {} : {} processes per task".format(step, workermax[step])) # create scripts for processing @@ -519,7 +550,7 @@ def main(args): taskproc = workermax["extract"] taskmin = workertime["extract"] - step_threads = 1 + step_threads = 2 step_mp = 1 nt = None first = "extract" @@ -549,7 +580,7 @@ def main(args): tasktime += workertime["fluxcal"] tasktime += workertime["calibrate"] - step_threads = 1 + step_threads = 2 step_mp = 1 nt = None first = "fiberflat" @@ -570,10 +601,15 @@ def main(args): # Make spectral groups. The groups are distributed, and we use # approximately 5 spectra per process. We also use one process - # per two cores. + # per two cores for more memory. + + # On KNL, the timing for ~5 spectra per process is about 1.5 hours. + # we use that empirical metric here to estimate the run time with + # some margin. ngroup = len(allpix.keys()) + spectime = 120 specprocs = ngroup // 5 specnodeprocs = nodecores // 2 specnodes = specprocs // specnodeprocs @@ -581,6 +617,10 @@ def main(args): specnodes = 1 specprocs = specnodes * specnodeprocs + specqueue = args.nersc_queue + if spectime > 30 and specqueue == "debug": + specqueue = "regular" + rundir = io.get_pipe_rundir() scrdir = os.path.join(rundir, io.get_pipe_scriptdir()) logdir = os.path.join(rundir, io.get_pipe_logdir()) @@ -594,9 +634,9 @@ def main(args): nersc_path = os.path.join(scrdir, "spectra.slurm") nersc_log = os.path.join(logdir, "spectra_slurm") pipe.nersc_job(args.nersc_host, nersc_path, nersc_log, setupfile, - speccom, nodes=specnodes, nodeproc=specnodeprocs, minutes=30, + speccom, nodes=specnodes, nodeproc=specnodeprocs, minutes=spectime, multisrun=False, openmp=True, multiproc=False, - queue=args.nersc_queue, jobname="groupspectra") + queue=specqueue, jobname="groupspectra") if args.shifter is not None: nersc_path = os.path.join(scrdir, "spectra_shifter.slurm") @@ -605,33 +645,55 @@ def main(args): rawdir, specdir, desiroot, nersc_log, setupfile, speccom, nodes=specnodes, nodeproc=specnodeprocs, minutes=30, multisrun=False, openmp=False, multiproc=False, - queue=args.nersc_queue, jobname="groupspectra") + queue=specqueue, jobname="groupspectra") - # redshift fitting. + # Redshift fitting. Use estimated spectra per node minute. - ntask = len(allpix.keys()) + red_worker_nodes = workerhandles["redshift"].max_nodes() + red_nodeprocs = workerhandles["redshift"].node_procs() + red_spec_per_node_min = workerhandles["redshift"].spec_per_min() + red_totalspec = np.sum([ y for x, y in allpix.items() ]) - taskproc = workermax["redshift"] - tasktime = workertime["redshift"] - step_threads = 1 - nt = None - first = "redshift" - last = "redshift" + red_runtime = workertime["redshift"] - # FIXME: we should really have the worker tell us more information - # about whether it uses multiprocessing or can use multiple nodes - # per task. For now, we hard-code a switch here for redmonster vs. - # redrock. + ntask = len(allpix.keys()) + + nworker = 1 + red_totalspec // (red_runtime * red_worker_nodes * + red_spec_per_node_min) + + red_nodes = nworker * red_worker_nodes - step_mp = 1 - if workernames["redshift"] == "redrock": - step_mp = nodecores + red_queue = args.nersc_queue + if red_runtime > 30 and red_queue == "debug": + red_queue = "regular" - scr_shell, scr_slurm, scr_shifter = compute_step(args.shifter, - rawdir, specdir, desiroot, setupfile, first, last, specs, nt, - ntask, taskproc, tasktime, shell_mpi_run, shell_maxcores, 1, - args.nersc_host, maxnodes, nodecores, step_threads, step_mp, - queuethresh, queue=args.nersc_queue) + rundir = io.get_pipe_rundir() + scrdir = os.path.join(rundir, io.get_pipe_scriptdir()) + logdir = os.path.join(rundir, io.get_pipe_logdir()) + + redcom = ["desi_pipe_run_mpi --first redshift --last redshift"] + + shell_path = os.path.join(scrdir, "redshift.sh") + shell_log = os.path.join(logdir, "redshift_sh") + pipe.shell_job(shell_path, shell_log, setupfile, redcom, + comrun=shell_mpi_run, mpiprocs=shell_maxcores, threads=1) + + nersc_path = os.path.join(scrdir, "redshift.slurm") + nersc_log = os.path.join(logdir, "redshift_slurm") + pipe.nersc_job(args.nersc_host, nersc_path, nersc_log, setupfile, + redcom, nodes=red_nodes, nodeproc=red_nodeprocs, + minutes=red_runtime, multisrun=False, openmp=True, multiproc=False, + queue=red_queue, jobname="redshift") + + if args.shifter is not None: + nersc_path = os.path.join(scrdir, "redshift_shifter.slurm") + nersc_log = os.path.join(logdir, "redshift_shifter") + pipe.nersc_shifter_job(args.nersc_host, nersc_path, args.shifter, + rawdir, specdir, desiroot, nersc_log, setupfile, redcom, + nodes=red_nodes, nodeproc=red_nodeprocs, minutes=red_runtime, + multisrun=False, openmp=True, multiproc=False, + queue=red_queue, jobname="redshift") + # Make high-level shell scripts which run or submit the steps diff --git a/py/desispec/scripts/specex.py b/py/desispec/scripts/specex.py index 1305433c4..ad83a12c3 100644 --- a/py/desispec/scripts/specex.py +++ b/py/desispec/scripts/specex.py @@ -360,7 +360,8 @@ def mean_psf(inputs, output): if entry == 0: log.info("for fiber bundle {}, {} valid PSFs".format(bundle, ok.size)) - fibers = np.arange(bundle*nfibers_per_bundle,(bundle+1)*nfibers_per_bundle) + fibers = np.arange(bundle*nfibers_per_bundle, (bundle+1)*nfibers_per_bundle, + dtype=np.int32) if ok.size >= 2: # use median for f in fibers : output_coeff[f] = np.median(coeff[ok,f],axis=0) @@ -390,3 +391,4 @@ def mean_psf(inputs, output): log.info("wrote {}".format(output)) return +