Redrock pipeline integration #439

Merged: 3 commits, Sep 21, 2017

.travis.yml (7 changes: 4 additions & 3 deletions)
@@ -55,7 +55,7 @@ env:
     # - NUMPY_VERSION=1.10
     # - SCIPY_VERSION=0.16
     - ASTROPY_VERSION=1.2.1
-    # - SPHINX_VERSION=1.6
+    - SPHINX_VERSION=1.5
     - DESIUTIL_VERSION=1.9.5
     - SPECLITE_VERSION=0.5
     - SPECTER_VERSION=0.6.0
@@ -65,20 +65,21 @@ env:
     - DESIMODEL_DATA=trunk
     # - HARP_VERSION=1.0.1
     # - SPECEX_VERSION=0.3.9
+    - REDROCK_VERSION=master
     - MAIN_CMD='python setup.py'
     # Additional conda channels to use.
     - CONDA_CHANNELS="openastronomy"
     # These packages will always be installed.
     - CONDA_DEPENDENCIES="qt=4"
     # These packages will only be installed if we really need them.
-    - CONDA_ALL_DEPENDENCIES="scipy matplotlib sqlalchemy coverage==3.7.1 pyyaml healpy"
+    - CONDA_ALL_DEPENDENCIES="scipy matplotlib sqlalchemy coverage==3.7.1 pyyaml healpy numba numpy"
     # These packages will always be installed.
     - PIP_DEPENDENCIES=""
     # These packages will only be installed if we really need them.
     - PIP_ALL_DEPENDENCIES="speclite==${SPECLITE_VERSION} coveralls"
     # These pip packages need to be installed in a certain order, so we
     # do that separately from the astropy/ci-helpers scripts.
-    - DESIHUB_PIP_DEPENDENCIES="desiutil=${DESIUTIL_VERSION} specter=${SPECTER_VERSION} desimodel=${DESIMODEL_VERSION}"
+    - DESIHUB_PIP_DEPENDENCIES="desiutil=${DESIUTIL_VERSION} specter=${SPECTER_VERSION} desimodel=${DESIMODEL_VERSION} redrock=${REDROCK_VERSION}"
     # Debug the Travis install process.
     - DEBUG=False
 matrix:
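
Note on the ordered install: the desihub packages above must be installed one at a time, in list order, so that later packages (specter, desimodel, and now redrock) see their dependencies already present. A minimal sketch of how such a variable could be consumed; the git URL template is an assumption about the desihub convention, not taken from this diff:

    import os
    import subprocess

    # Each entry looks like "name=version"; install strictly in list order.
    deps = os.environ.get("DESIHUB_PIP_DEPENDENCIES", "").split()
    for spec in deps:
        name, _, version = spec.partition("=")
        # Assumed URL pattern for desihub repositories:
        url = "git+https://github.com/desihub/{}.git@{}#egg={}".format(
            name, version, name)
        subprocess.check_call(["pip", "install", url])
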
doc/conf.py (6 changes: 3 additions & 3 deletions)

@@ -157,9 +157,9 @@
     'matplotlib.cm',
     'matplotlib.gridspec',
     'matplotlib.collections',
-    'numpy',
-    'numpy.polynomial',
-    'numpy.polynomial.legendre',
+    # 'numpy',
+    # 'numpy.polynomial',
+    # 'numpy.polynomial.legendre',
     'scipy',
     'scipy.constants',
     'scipy.interpolate',
py/desispec/io/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -24,7 +24,7 @@
     rawdata_root, specprod_root, validate_night,
     get_pipe_plandir, get_pipe_rundir, get_pipe_scriptdir,
     get_pipe_logdir, get_pipe_faildir, get_reduced_frames,
-    get_nights, find_exposure_night)
+    get_nights, find_exposure_night, get_pipe_redshiftdir)
 from .params import read_params
 from .qa import (read_qa_frame, read_qa_data, write_qa_frame, write_qa_brick,
     load_qa_frame, write_qa_exposure, write_qa_prod)
py/desispec/io/meta.py (12 changes: 12 additions & 0 deletions)

@@ -458,3 +458,15 @@ def get_pipe_faildir():
         The name of the subdirectory.
     """
     return "failed"
+
+
+def get_pipe_redshiftdir():
+    """
+    Return the name of the subdirectory containing pipeline redshift
+    log files.
+
+    Returns (str):
+        The name of the subdirectory.
+    """
+    return "redshift"
+
py/desispec/pipeline/common.py (2 changes: 1 addition & 1 deletion)

@@ -95,7 +95,7 @@
     "stdstars" : "Stdstars",
     "fluxcal" : "Fluxcal",
     "calibrate" : "Procexp",
-    "redshift" : "Redmonster"
+    "redshift" : "Redrock"
 }
 """The default worker type for each pipeline step."""
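
Note: this table swaps the default redshift worker from Redmonster to Redrock. The value is a worker class name that the pipeline resolves at run time; a hedged sketch of that kind of lookup, where the module path and constructor are illustrative only:

    # Hypothetical resolution of a pipeline step to its worker class; the
    # module path and constructor signature are assumptions, not from this diff.
    import desispec.pipeline.workers as workers   # assumed module

    worker_name = "Redrock"                       # value from the table above
    WorkerClass = getattr(workers, worker_name)   # assumed lookup-by-name
    worker = WorkerClass({})                      # illustrative construction
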
py/desispec/pipeline/plan.py (8 changes: 8 additions & 0 deletions)

@@ -134,6 +134,10 @@ class names and the values are dictionaries that are
     if not os.path.isdir(faildir):
         os.makedirs(faildir)

+    failreddir = os.path.join(faildir, io.get_pipe_redshiftdir())
+    if not os.path.isdir(failreddir):
+        os.makedirs(failreddir)
+
     scriptdir = os.path.join(rundir, io.get_pipe_scriptdir())
     if not os.path.isdir(scriptdir):
         os.makedirs(scriptdir)

@@ -142,6 +146,10 @@ class names and the values are dictionaries that are
     if not os.path.isdir(logdir):
         os.makedirs(logdir)

+    logreddir = os.path.join(logdir, io.get_pipe_redshiftdir())
+    if not os.path.isdir(logreddir):
+        os.makedirs(logreddir)
+
     optfile = os.path.join(rundir, "options.yaml")
     if not os.path.isfile(optfile):
         opts = default_options(extra=extra)
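
Note: together with the get_pipe_redshiftdir() helper above, this pre-creates redshift subdirectories under both the failure and log areas, because redshift tasks are grouped by healpix pixel rather than by night (see the run.py changes below). A minimal equivalent sketch; the run-directory root and the "logs" name are assumptions, while "failed" and "redshift" come from the io helpers:

    import os

    rundir = "/prod/run"   # hypothetical pipeline run directory
    for area in ("failed", "logs"):
        os.makedirs(os.path.join(rundir, area, "redshift"), exist_ok=True)
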
py/desispec/pipeline/run.py (92 changes: 70 additions & 22 deletions)

@@ -150,7 +150,7 @@ def run_step(step, grph, opts, comm=None):
         else:
             group_ntask = 0
     else:
-        if step == "zfind":
+        if step == "redshift":
             # We load balance the spectra across process groups based
             # on the number of targets per group. All groups with
             # < taskproc targets are weighted the same.
@@ -163,11 +163,11 @@
             worksizes = [ taskproc if (x < taskproc) else x for x in spectrasizes ]

             if rank == 0:
-                log.debug("zfind {} groups".format(ngroup))
+                log.debug("redshift {} groups".format(ngroup))
                 workstr = ""
                 for w in worksizes:
                     workstr = "{}{} ".format(workstr, w)
-                log.debug("zfind work sizes = {}".format(workstr))
+                log.debug("redshift work sizes = {}".format(workstr))

             group_firsttask, group_ntask = dist_discrete(worksizes, ngroup, group)
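
The comment above describes weighted load balancing: each process group receives a contiguous block of spectra groups whose total target count is roughly equal, with small groups floored at taskproc. A self-contained sketch of a splitter matching the (weights, ngroups, group) -> (first, ntask) signature used here; this illustrates the idea and is not the desispec implementation of dist_discrete:

    def dist_discrete_sketch(weights, ngroups, group):
        """Greedy splitter: contiguous slices with roughly equal total weight."""
        bounds = []
        remaining = sum(weights)
        first = 0
        for g in range(ngroups):
            # Aim for an equal share of the weight that is still unassigned.
            target = remaining / float(ngroups - g)
            acc = 0
            last = first
            while last < len(weights) and (last == first or
                                           acc + weights[last] <= target):
                acc += weights[last]
                last += 1
            bounds.append((first, last - first))
            remaining -= acc
            first = last
        return bounds[group]

    # Example: uneven spectra counts per healpix pixel, split across 4 groups.
    sizes = [16, 16, 48, 16, 16, 16, 64, 16]
    print([dist_discrete_sketch(sizes, 4, g) for g in range(4)])
    # -> [(0, 2), (2, 1), (3, 3), (6, 2)]
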
@@ -215,8 +215,23 @@
                 group_failcount += 1
                 continue

-            nfaildir = os.path.join(faildir, night)
-            nlogdir = os.path.join(logdir, night)
+            nfaildir = None
+            nlogdir = None
+            if step == "redshift":
+                ztype, nstr, pstr = graph_name_split(gname)
+                subdir = io.util.healpix_subdirectory(int(nstr),
+                    int(pstr))
+                nfaildir = os.path.join(faildir, io.get_pipe_redshiftdir(),
+                    subdir)
+                nlogdir = os.path.join(logdir, io.get_pipe_redshiftdir(), subdir)
+                if group_rank == 0:
+                    if not os.path.isdir(nfaildir):
+                        os.makedirs(nfaildir)
+                    if not os.path.isdir(nlogdir):
+                        os.makedirs(nlogdir)
+            else:
+                nfaildir = os.path.join(faildir, night)
+                nlogdir = os.path.join(logdir, night)

             tgraph = graph_slice(grph, names=[tasks[t]], deps=True)
             ffile = os.path.join(nfaildir, "{}_{}.yaml".format(step, tasks[t]))
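
For redshift tasks the graph name encodes the healpix nside and pixel (parsed by graph_name_split), and logs land in a nested per-pixel directory instead of a per-night one. A sketch of the kind of nesting healpix_subdirectory produces; the grouping by pixel // 100 is an assumption about desispec.io.util, used only to illustrate why the extra directory level exists:

    import os

    def healpix_subdirectory(nside, pixel):
        # Assumed grouping: cap directory fan-out by bucketing pixels in
        # blocks of 100 (illustrative, not necessarily desispec's exact rule).
        return os.path.join(str(nside), str(pixel // 100), str(pixel))

    # e.g. a redshift task for nside=64, pixel=12345 would log under
    # <rundir>/logs/redshift/64/123/12345/ (hypothetical root):
    print(healpix_subdirectory(64, 12345))   # 64/123/12345
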
@@ -565,37 +580,54 @@ def nersc_job(host, path, logroot, desisetup, commands, nodes=1, \
         f.write("#SBATCH --constraint=haswell\n")
     elif host == "coriknl":
         f.write("#SBATCH --constraint=knl,quad,cache\n")
+        f.write("#SBATCH --core-spec=4\n")
     f.write("#SBATCH --account=desi\n")
     f.write("#SBATCH --nodes={}\n".format(totalnodes))
     f.write("#SBATCH --time={}\n".format(timestr))
     f.write("#SBATCH --job-name={}\n".format(jobname))
     f.write("#SBATCH --output={}_%j.log\n".format(logroot))

     f.write("echo Starting slurm script at `date`\n\n")
     f.write("source {}\n\n".format(desisetup))

+    f.write("# Set TMPDIR to be on the ramdisk\n")
+    f.write("export TMPDIR=/dev/shm\n\n")
-    f.write("node_cores=0\n")
-    f.write("if [ ${NERSC_HOST} = edison ]; then\n")
-    f.write("  node_cores=24\n")
-    f.write("else\n")
-    f.write("  node_cores=32\n")
-    f.write("fi\n")
-    f.write("\n")

+    if host == "edison":
+        f.write("cpu_per_core=2\n\n")
+        f.write("node_cores=24\n\n")
+    elif host == "cori":
+        f.write("cpu_per_core=2\n\n")
+        f.write("node_cores=32\n\n")
+    elif host == "coriknl":
+        f.write("cpu_per_core=4\n\n")
+        f.write("node_cores=64\n\n")
+    else:
+        raise RuntimeError("Unsupported NERSC host")

     f.write("nodes={}\n".format(nodes))
     f.write("node_proc={}\n".format(nodeproc))
     f.write("node_thread=$(( node_cores / node_proc ))\n")
+    f.write("node_depth=$(( cpu_per_core * node_thread ))\n")

     f.write("procs=$(( nodes * node_proc ))\n\n")
     if openmp:
         f.write("export OMP_NUM_THREADS=${node_thread}\n")
+        f.write("export OMP_PLACES=threads\n")
+        f.write("export OMP_PROC_BIND=spread\n")
     else:
         f.write("export OMP_NUM_THREADS=1\n")
     f.write("\n")

     runstr = "srun"
     if multiproc:
         runstr = "{} --cpu_bind=no".format(runstr)
         f.write("export KMP_AFFINITY=disabled\n")
         f.write("\n")
+        f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_thread}}\"\n\n".format(runstr))
+    else:
+        runstr = "{} --cpu_bind=cores".format(runstr)
+        f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_depth}}\"\n\n".format(runstr))

     f.write("now=`date +%Y%m%d-%H:%M:%S`\n")
     f.write("echo \"job datestamp = ${now}\"\n")
     f.write("log={}_${{now}}.log\n\n".format(logroot))
@@ -655,6 +687,7 @@ def nersc_shifter_job(host, path, img, specdata, specredux, desiroot, logroot, d
         f.write("#SBATCH --constraint=haswell\n")
     elif host == "coriknl":
         f.write("#SBATCH --constraint=knl,quad,cache\n")
+        f.write("#SBATCH --core-spec=4\n")
     f.write("#SBATCH --account=desi\n")
     f.write("#SBATCH --nodes={}\n".format(totalnodes))
     f.write("#SBATCH --time={}\n".format(timestr))
@@ -665,29 +698,44 @@
     f.write("echo Starting slurm script at `date`\n\n")
     f.write("source {}\n\n".format(desisetup))

-    f.write("node_cores=0\n")
-    f.write("if [ ${NERSC_HOST} = edison ]; then\n")
-    f.write("  module load shifter\n")
-    f.write("  node_cores=24\n")
-    f.write("else\n")
-    f.write("  node_cores=32\n")
-    f.write("fi\n")
-    f.write("\n")
+    f.write("# Set TMPDIR to be on the ramdisk\n")
+    f.write("export TMPDIR=/dev/shm\n\n")

+    if host == "edison":
+        f.write("cpu_per_core=2\n\n")
+        f.write("node_cores=24\n\n")
+    elif host == "cori":
+        f.write("cpu_per_core=2\n\n")
+        f.write("node_cores=32\n\n")
+    elif host == "coriknl":
+        f.write("cpu_per_core=4\n\n")
+        f.write("node_cores=64\n\n")
+    else:
+        raise RuntimeError("Unsupported NERSC host")

     f.write("nodes={}\n".format(nodes))
     f.write("node_proc={}\n".format(nodeproc))
     f.write("node_thread=$(( node_cores / node_proc ))\n")
+    f.write("node_depth=$(( cpu_per_core * node_thread ))\n")

     f.write("procs=$(( nodes * node_proc ))\n\n")
     if openmp:
         f.write("export OMP_NUM_THREADS=${node_thread}\n")
+        f.write("export OMP_PLACES=threads\n")
+        f.write("export OMP_PROC_BIND=spread\n")
     else:
         f.write("export OMP_NUM_THREADS=1\n")
     f.write("\n")

     runstr = "srun"
     if multiproc:
         runstr = "{} --cpu_bind=no".format(runstr)
         f.write("export KMP_AFFINITY=disabled\n")
         f.write("\n")
+        f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_thread}} shifter\"\n\n".format(runstr))
+    else:
+        runstr = "{} --cpu_bind=cores".format(runstr)
+        f.write("run=\"{} -n ${{procs}} -N ${{nodes}} -c ${{node_depth}} shifter\"\n\n".format(runstr))

     f.write("now=`date +%Y%m%d-%H:%M:%S`\n")
     f.write("echo \"job datestamp = ${now}\"\n")
     f.write("log={}_${{now}}.log\n\n".format(logroot))