Merge pull request #439 from desihub/redpipe
Redrock pipeline integration
sbailey authored Sep 21, 2017
2 parents dd2294f + 0dd7e78 commit 05fdbaa
Showing 11 changed files with 306 additions and 73 deletions.
7 changes: 4 additions & 3 deletions .travis.yml
@@ -55,7 +55,7 @@ env:
# - NUMPY_VERSION=1.10
# - SCIPY_VERSION=0.16
- ASTROPY_VERSION=1.2.1
# - SPHINX_VERSION=1.6
- SPHINX_VERSION=1.5
- DESIUTIL_VERSION=1.9.5
- SPECLITE_VERSION=0.5
- SPECTER_VERSION=0.6.0
@@ -65,20 +65,21 @@ env:
- DESIMODEL_DATA=trunk
# - HARP_VERSION=1.0.1
# - SPECEX_VERSION=0.3.9
- REDROCK_VERSION=master
- MAIN_CMD='python setup.py'
# Additional conda channels to use.
- CONDA_CHANNELS="openastronomy"
# These packages will always be installed.
- CONDA_DEPENDENCIES="qt=4"
# These packages will only be installed if we really need them.
- CONDA_ALL_DEPENDENCIES="scipy matplotlib sqlalchemy coverage==3.7.1 pyyaml healpy"
- CONDA_ALL_DEPENDENCIES="scipy matplotlib sqlalchemy coverage==3.7.1 pyyaml healpy numba numpy"
# These packages will always be installed.
- PIP_DEPENDENCIES=""
# These packages will only be installed if we really need them.
- PIP_ALL_DEPENDENCIES="speclite==${SPECLITE_VERSION} coveralls"
# These pip packages need to be installed in a certain order, so we
# do that separately from the astropy/ci-helpers scripts.
- DESIHUB_PIP_DEPENDENCIES="desiutil=${DESIUTIL_VERSION} specter=${SPECTER_VERSION} desimodel=${DESIMODEL_VERSION}"
- DESIHUB_PIP_DEPENDENCIES="desiutil=${DESIUTIL_VERSION} specter=${SPECTER_VERSION} desimodel=${DESIMODEL_VERSION} redrock=${REDROCK_VERSION}"
# Debug the Travis install process.
- DEBUG=False
matrix:
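For context on the DESIHUB_PIP_DEPENDENCIES convention: each space-separated pkg=${VERSION} entry names a desihub repository and a tag or branch (here the new redrock=master). The ci-helpers-style install scripts that consume this variable are not part of this diff, so the Python sketch below is only a hypothetical illustration of how such pairs could be expanded into ordered pip installs from GitHub, not the repository's actual install code:

import os
import subprocess

# Hypothetical illustration only: expand "pkg=version" pairs into pip
# installs from the corresponding desihub GitHub repositories, preserving
# the listed order because these packages depend on one another.
deps = os.environ.get("DESIHUB_PIP_DEPENDENCIES", "")
for spec in deps.split():
    name, _, version = spec.partition("=")
    url = "git+https://github.com/desihub/{0}.git@{1}#egg={0}".format(name, version)
    subprocess.check_call(["pip", "install", url])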
6 changes: 3 additions & 3 deletions doc/conf.py
@@ -157,9 +157,9 @@
    'matplotlib.cm',
    'matplotlib.gridspec',
    'matplotlib.collections',
    # 'numpy',
    # 'numpy.polynomial',
    # 'numpy.polynomial.legendre',
    'scipy',
    'scipy.constants',
    'scipy.interpolate',
2 changes: 1 addition & 1 deletion py/desispec/io/__init__.py
@@ -24,7 +24,7 @@
rawdata_root, specprod_root, validate_night,
get_pipe_plandir, get_pipe_rundir, get_pipe_scriptdir,
get_pipe_logdir, get_pipe_faildir, get_reduced_frames,
get_nights, find_exposure_night)
get_nights, find_exposure_night, get_pipe_redshiftdir)
from .params import read_params
from .qa import (read_qa_frame, read_qa_data, write_qa_frame, write_qa_brick,
load_qa_frame, write_qa_exposure, write_qa_prod)
12 changes: 12 additions & 0 deletions py/desispec/io/meta.py
@@ -458,3 +458,15 @@ def get_pipe_faildir():
        The name of the subdirectory.
    """
    return "failed"


def get_pipe_redshiftdir():
    """
    Return the name of the subdirectory containing pipeline redshift
    log files.
    Returns (str):
        The name of the subdirectory.
    """
    return "redshift"

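A quick usage sketch of the new helper (the run directory below is a placeholder, and the joined layout is only an assumed illustration): get_pipe_faildir() and get_pipe_redshiftdir() return the bare subdirectory names "failed" and "redshift" shown above, so full paths are assembled with os.path.join, mirroring how plan.py joins its faildir and logdir with get_pipe_redshiftdir() later in this diff:

import os
from desispec.io import get_pipe_logdir, get_pipe_faildir, get_pipe_redshiftdir

rundir = "/scratch/desi/redux/daily/run"   # placeholder pipeline run directory

# Assumed layout for illustration; plan.py builds the real paths.
failreddir = os.path.join(rundir, get_pipe_faildir(), get_pipe_redshiftdir())
logreddir = os.path.join(rundir, get_pipe_logdir(), get_pipe_redshiftdir())
# e.g. failreddir -> ".../run/failed/redshift"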
2 changes: 1 addition & 1 deletion py/desispec/pipeline/common.py
@@ -95,7 +95,7 @@
    "stdstars" : "Stdstars",
    "fluxcal" : "Fluxcal",
    "calibrate" : "Procexp",
    "redshift" : "Redrock"
}
"""The default worker type for each pipeline step."""

8 changes: 8 additions & 0 deletions py/desispec/pipeline/plan.py
@@ -134,6 +134,10 @@ class names and the values are dictionaries that are
    if not os.path.isdir(faildir):
        os.makedirs(faildir)

    failreddir = os.path.join(faildir, io.get_pipe_redshiftdir())
    if not os.path.isdir(failreddir):
        os.makedirs(failreddir)

    scriptdir = os.path.join(rundir, io.get_pipe_scriptdir())
    if not os.path.isdir(scriptdir):
        os.makedirs(scriptdir)
@@ -142,6 +146,10 @@ class names and the values are dictionaries that are
    if not os.path.isdir(logdir):
        os.makedirs(logdir)

    logreddir = os.path.join(logdir, io.get_pipe_redshiftdir())
    if not os.path.isdir(logreddir):
        os.makedirs(logreddir)

    optfile = os.path.join(rundir, "options.yaml")
    if not os.path.isfile(optfile):
        opts = default_options(extra=extra)
92 changes: 70 additions & 22 deletions py/desispec/pipeline/run.py
@@ -150,7 +150,7 @@ def run_step(step, grph, opts, comm=None):
else:
group_ntask = 0
else:
if step == "zfind":
if step == "redshift":
# We load balance the spectra across process groups based
# on the number of targets per group. All groups with
# < taskproc targets are weighted the same.
@@ -163,11 +163,11 @@ def run_step(step, grph, opts, comm=None):
worksizes = [ taskproc if (x < taskproc) else x for x in spectrasizes ]

if rank == 0:
log.debug("zfind {} groups".format(ngroup))
log.debug("redshift {} groups".format(ngroup))
workstr = ""
for w in worksizes:
workstr = "{}{} ".format(workstr, w)
log.debug("zfind work sizes = {}".format(workstr))
log.debug("redshift work sizes = {}".format(workstr))

group_firsttask, group_ntask = dist_discrete(worksizes, ngroup, group)

@@ -215,8 +215,23 @@ def run_step(step, grph, opts, comm=None):
group_failcount += 1
continue

nfaildir = os.path.join(faildir, night)
nlogdir = os.path.join(logdir, night)
nfaildir = None
nlogdir = None
if step == "redshift":
ztype, nstr, pstr = graph_name_split(gname)
subdir = io.util.healpix_subdirectory(int(nstr),
int(pstr))
nfaildir = os.path.join(faildir, io.get_pipe_redshiftdir(),
subdir)
nlogdir = os.path.join(logdir, io.get_pipe_redshiftdir(), subdir)
if group_rank == 0:
if not os.path.isdir(nfaildir):
os.makedirs(nfaildir)
if not os.path.isdir(nlogdir):
os.makedirs(nlogdir)
else:
nfaildir = os.path.join(faildir, night)
nlogdir = os.path.join(logdir, night)

tgraph = graph_slice(grph, names=[tasks[t]], deps=True)
ffile = os.path.join(nfaildir, "{}_{}.yaml".format(step, tasks[t]))
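Unlike the night-based steps, redshift tasks write their log and failure files under the new redshift/ subdirectory plus a HEALPix-based path. A minimal sketch of that construction, with a placeholder base directory and hypothetical nside/pixel strings standing in for the fields that graph_name_split parses from the task name:

import os
from desispec import io

logdir = "/scratch/desi/redux/daily/run/logs"   # placeholder base directory
nstr, pstr = "64", "5302"                       # hypothetical fields from the task name

subdir = io.util.healpix_subdirectory(int(nstr), int(pstr))
nlogdir = os.path.join(logdir, io.get_pipe_redshiftdir(), subdir)
if not os.path.isdir(nlogdir):
    os.makedirs(nlogdir)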
@@ -565,37 +580,54 @@ def nersc_job(host, path, logroot, desisetup, commands, nodes=1, \
f.write("#SBATCH --constraint=haswell\n")
elif host == "coriknl":
f.write("#SBATCH --constraint=knl,quad,cache\n")
f.write("#SBATCH --core-spec=4\n")
f.write("#SBATCH --account=desi\n")
f.write("#SBATCH --nodes={}\n".format(totalnodes))
f.write("#SBATCH --time={}\n".format(timestr))
f.write("#SBATCH --job-name={}\n".format(jobname))
f.write("#SBATCH --output={}_%j.log\n".format(logroot))

f.write("echo Starting slurm script at `date`\n\n")
f.write("source {}\n\n".format(desisetup))

f.write("# Set TMPDIR to be on the ramdisk\n")
f.write("export TMPDIR=/dev/shm\n\n")
f.write("node_cores=0\n")
f.write("if [ ${NERSC_HOST} = edison ]; then\n")
f.write(" node_cores=24\n")
f.write("else\n")
f.write(" node_cores=32\n")
f.write("fi\n")
f.write("\n")

if host == "edison":
f.write("cpu_per_core=2\n\n")
f.write("node_cores=24\n\n")
elif host == "cori":
f.write("cpu_per_core=2\n\n")
f.write("node_cores=32\n\n")
elif host == "coriknl":
f.write("cpu_per_core=4\n\n")
f.write("node_cores=64\n\n")
else:
raise RuntimeError("Unsupported NERSC host")

f.write("nodes={}\n".format(nodes))
f.write("node_proc={}\n".format(nodeproc))
f.write("node_thread=$(( node_cores / node_proc ))\n")
f.write("node_depth=$(( cpu_per_core * node_thread ))\n")

f.write("procs=$(( nodes * node_proc ))\n\n")
if openmp:
f.write("export OMP_NUM_THREADS=${node_thread}\n")
f.write("export OMP_PLACES=threads\n")
f.write("export OMP_PROC_BIND=spread\n")
else:
f.write("export OMP_NUM_THREADS=1\n")
f.write("\n")

runstr = "srun"
if multiproc:
runstr = "{} --cpu_bind=no".format(runstr)
f.write("export KMP_AFFINITY=disabled\n")
f.write("\n")
f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_thread}}\"\n\n".format(runstr))
else:
runstr = "{} --cpu_bind=cores".format(runstr)
f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_depth}}\"\n\n".format(runstr))

f.write("now=`date +%Y%m%d-%H:%M:%S`\n")
f.write("echo \"job datestamp = ${now}\"\n")
f.write("log={}_${{now}}.log\n\n".format(logroot))
@@ -655,6 +687,7 @@ def nersc_shifter_job(host, path, img, specdata, specredux, desiroot, logroot, d
f.write("#SBATCH --constraint=haswell\n")
elif host == "coriknl":
f.write("#SBATCH --constraint=knl,quad,cache\n")
f.write("#SBATCH --core-spec=4\n")
f.write("#SBATCH --account=desi\n")
f.write("#SBATCH --nodes={}\n".format(totalnodes))
f.write("#SBATCH --time={}\n".format(timestr))
@@ -665,29 +698,44 @@ def nersc_shifter_job(host, path, img, specdata, specredux, desiroot, logroot, d
f.write("echo Starting slurm script at `date`\n\n")
f.write("source {}\n\n".format(desisetup))

f.write("node_cores=0\n")
f.write("if [ ${NERSC_HOST} = edison ]; then\n")
f.write(" module load shifter\n")
f.write(" node_cores=24\n")
f.write("else\n")
f.write(" node_cores=32\n")
f.write("fi\n")
f.write("\n")
f.write("# Set TMPDIR to be on the ramdisk\n")
f.write("export TMPDIR=/dev/shm\n\n")

if host == "edison":
f.write("cpu_per_core=2\n\n")
f.write("node_cores=24\n\n")
elif host == "cori":
f.write("cpu_per_core=2\n\n")
f.write("node_cores=32\n\n")
elif host == "coriknl":
f.write("cpu_per_core=4\n\n")
f.write("node_cores=64\n\n")
else:
raise RuntimeError("Unsupported NERSC host")

f.write("nodes={}\n".format(nodes))
f.write("node_proc={}\n".format(nodeproc))
f.write("node_thread=$(( node_cores / node_proc ))\n")
f.write("node_depth=$(( cpu_per_core * node_thread ))\n")

f.write("procs=$(( nodes * node_proc ))\n\n")
if openmp:
f.write("export OMP_NUM_THREADS=${node_thread}\n")
f.write("export OMP_PLACES=threads\n")
f.write("export OMP_PROC_BIND=spread\n")
else:
f.write("export OMP_NUM_THREADS=1\n")
f.write("\n")

runstr = "srun"
if multiproc:
runstr = "{} --cpu_bind=no".format(runstr)
f.write("export KMP_AFFINITY=disabled\n")
f.write("\n")
f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_thread}} shifter\"\n\n".format(runstr))
else:
runstr = "{} --cpu_bind=cores".format(runstr)
f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_depth}} shifter\"\n\n".format(runstr))

f.write("now=`date +%Y%m%d-%H:%M:%S`\n")
f.write("echo \"job datestamp = ${now}\"\n")
f.write("log={}_${{now}}.log\n\n".format(logroot))
(4 more changed files are not rendered in this view.)
