Consolidate and derep bins #10

Merged: 10 commits, merged Jan 19, 2024
Changes from all commits
12 changes: 6 additions & 6 deletions .github/workflows/main.yaml
@@ -14,21 +14,21 @@ jobs:
   runs-on: ubuntu-latest
   steps:
   - uses: actions/checkout@v2
-  - uses: docker://snakemake/snakemake:v5.24.2
+  - uses: docker://snakemake/snakemake:latest
   - name: Dry Run with test data
     run: |
-      docker run -v $PWD:/opt2 snakemake/snakemake:v5.24.2 \
+      docker run -v $PWD:/opt2 snakemake/snakemake:latest \
         /opt2/metamorph run --input \
-        /opt2/.tests/WT_S1.R1.fastq.gz /opt2/.tests/WT_S1.R2.fastq.gz \
+        /opt2/.tests/WT_S1_R1.fastq.gz /opt2/.tests/WT_S1_R2.fastq.gz \
         /opt2/.tests/WT_S2_R1.fastq.gz /opt2/.tests/WT_S2_R2.fastq.gz \
-        /opt2/.tests/WT_S3_1.fastq.gz /opt2/.tests/WT_S3_2.fastq.gz \
-        /opt2/.tests/WT_S4_R1.001.fastq.gz /opt2/.tests/WT_S4_R2.001.fastq.gz \
+        /opt2/.tests/WT_S3_R1.fastq.gz /opt2/.tests/WT_S3_R2.fastq.gz \
+        /opt2/.tests/WT_S4_R1.fastq.gz /opt2/.tests/WT_S4_R2.fastq.gz \
         --output /opt2/output --mode local --dry-run
   - name: View the pipeline config file
     run: |
       echo "Generated config file for pipeline...." && cat $PWD/output/config.json
   - name: Lint Workflow
     continue-on-error: true
     run: |
-      docker run -v $PWD:/opt2 snakemake/snakemake:v5.24.2 snakemake --lint -s /opt2/output/workflow/Snakefile -d /opt2/output || \
+      docker run -v $PWD:/opt2 snakemake/snakemake:latest snakemake --lint -s /opt2/output/workflow/Snakefile -d /opt2/output || \
       echo 'There may have been a few warnings or errors. Please read through the log to determine if its harmless.'
6 files renamed without changes.
3 changes: 2 additions & 1 deletion config/cluster.json
@@ -9,7 +9,8 @@
         "threads": 8,
         "mem": "16g",
         "partition": "norm",
-        "time": "0-04:00:00"
+        "time": "0-04:00:00",
+        "gres": "None"
     },
     "metawrap_genome_assembly": {
         "threads": 24,
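
Note on the new "gres" field: the co-assembly runner added to src/run.py below builds its sbatch submission string from these cluster.json entries via a CLUSTER_OPTS template. A minimal sketch of how that template renders, using the values shown above; the dict-style placeholders stand in for snakemake's {cluster.gres}-style lookups and are illustrative only, not part of this PR:

    # Illustrative render of the CLUSTER_OPTS template (editor's sketch).
    defaults = {"gres": "None", "threads": 8, "partition": "norm",
                "time": "0-04:00:00", "mem": "16g"}
    opts = ("sbatch --gres {c[gres]} --cpus-per-task {c[threads]}"
            " -p {c[partition]} -t {c[time]} --mem {c[mem]}")
    print(opts.format(c=defaults))
    # -> sbatch --gres None --cpus-per-task 8 -p norm -t 0-04:00:00 --mem 16g

Without a "gres" entry, any rule falling back to these defaults would leave the {cluster.gres} placeholder unresolved, which is presumably why the field is added here.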
6 changes: 5 additions & 1 deletion config/images.json
@@ -1,6 +1,10 @@
 {
     "images": {
-        "metawrap": "docker://rroutsong/metamorph_metawrap:1.3.2",
+        "metawrap": "docker://rroutsong/metamorph_metawrap:0.0.2",
         "metagenome": "docker://rroutsong/metamorph_metagenome:0.0.1"
+    },
+    "containers": {
+        "metawrap": "/data/OpenOmics/SIFs/metamorph_metawrap_1.3.2.sif",
+        "metagenome": "/data/OpenOmics/SIFs/metamorph_metagenome_0.0.1.sif"
     }
 }
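
The new "containers" section mirrors "images", pointing at pre-pulled Singularity image files (SIFs) on the cluster filesystem. A minimal sketch of how a pipeline could choose between the two sources, assuming a hypothetical resolve_image helper that is not part of this PR:

    import json

    def resolve_image(config_path, tool, on_cluster=False):
        # Hypothetical helper: prefer the pre-pulled SIF under
        # "containers" when on the cluster, else the Docker Hub URI.
        with open(config_path) as fh:
            cfg = json.load(fh)
        return cfg['containers' if on_cluster else 'images'][tool]

    # resolve_image('config/images.json', 'metawrap', on_cluster=True)
    # -> '/data/OpenOmics/SIFs/metamorph_metawrap_1.3.2.sif'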
5 changes: 2 additions & 3 deletions docker/metawrap/docker_1.3.2
@@ -14,6 +14,5 @@ RUN echo ". ${CONDA_DIR}/etc/profile.d/conda.sh && conda activate metawrap-env"
 ENV PATH="/home/metawrap/bin:$PATH"
 COPY docker/metawrap/config-metawrap_1.3.2 /home/metaWRAP/bin/config-metawrap
 RUN chmod u+x /home/metaWRAP/bin/config-metawrap
-COPY docker/metawrap/ep.sh /ep.sh
-RUN chmod u+x /ep.sh
-ENTRYPOINT ["/ep.sh"]
+RUN mamba run -n metawrap-env pip3 install drep
+ENTRYPOINT ["tini", "--", "/bin/bash", "--rcfile", "/etc/skel/.bashrc"]
4 changes: 0 additions & 4 deletions docker/metawrap/ep.sh

This file was deleted.

44 changes: 37 additions & 7 deletions metamorph
@@ -24,10 +24,16 @@ express or implied, including warranties of performance,
 merchantability or fitness for any particular purpose.
 Please cite the author and NIH resources like the "Biowulf Cluster"
 in any work or product based on this material.

 USAGE:
   $ metamorph <command> [OPTIONS]
 EXAMPLE:
-  $ metamorph run --input *.R?.fastq.gz --output output/
+  co-assembly:
+    $ metamorph run --coa --input *.R?.fastq.gz --output output/
+    $ metamorph run -C --input *.R?.fastq.gz --output output/
+
+  per-sample assembly:
+    $ metamorph run --input *.R?.fastq.gz --output output/
 """

 # Python standard library
@@ -39,17 +39,16 @@ import argparse # potential python3 3rd party package, added in python/3.5

 # Local imports
 from src import version
-from src.run import init, setup, bind, dryrun, runner
-from src.shells import bash
+from src.run import init, setup, bind, dryrun, runner, run_coa_pipeline
 from src.utils import (
     Colors,
     err,
     exists,
     fatal,
     hashed,
     permissions,
     check_cache,
-    require)
+    require
+)


# Pipeline Metadata
@@ -127,6 +132,7 @@ def run(sub_args):
     )

     config['bindpaths'] = bindpaths
+    config['coassembly'] = sub_args.coa

     # Step 4. Save config to output directory
     with open(os.path.join(sub_args.output, 'config.json'), 'w') as fh:
@@ -152,8 +158,17 @@
     else:
         log = os.path.join(sub_args.output, 'logfiles', 'master.log')
     logfh = open(log, 'w')
+
+    if sub_args.coa:
+        cjob = run_coa_pipeline(sub_args.mode,
+                                sub_args.output,
+                                sub_args.singularity_cache,
+                                logfh,
+                                sub_args.tmp_dir,
+                                ",".join(bindpaths))
+
     mjob = runner(mode = sub_args.mode,
-        outdir = sub_args.output,
+                  outdir = sub_args.output,
         # additional_bind_paths = all_bind_paths,
         alt_cache = sub_args.singularity_cache,
         threads = int(sub_args.threads),
@@ -361,13 +376,20 @@ def parsed_arguments(name, description):
                  --mode slurm \\
                  --dry-run

-    # Step 2B.) Run the {0} pipeline
+    # Step 3A.) Run the {0} pipeline in per-sample fashion
     # The slurm mode will submit jobs to
     # the cluster. It is recommended running
     # the pipeline in this mode.
     ./{0} run --input .tests/*.R?.fastq.gz \\
              --output /data/$USER/output \\
              --mode slurm

+    # Step 3B.) Run the {0} pipeline in co-assembly fashion
+    # with slurm
+    ./{0} run --coa --input .tests/*.R?.fastq.gz \\
+             --output /data/$USER/output \\
+             --mode slurm
+

     {2}{3}Version:{4}
     {1}
@@ -427,6 +449,14 @@
         help = argparse.SUPPRESS
     )

+    # a supported job scheduler, etc.
+    subparser_run.add_argument(
+        '-C', '--coa',
+        action = "store_true",
+        required = False,
+        help = argparse.SUPPRESS
+    )
+
     # Name of master job
     subparser_run.add_argument(
         '--job-name',
153 changes: 146 additions & 7 deletions src/run.py
@@ -4,6 +4,8 @@
 # Python standard library
 from __future__ import print_function
 from shutil import copytree
+from uuid import uuid4
+from datetime import datetime
 import os, re, json, sys, subprocess

# Local imports
@@ -329,11 +331,9 @@ def bind(sub_args, config):
             if value not in bindpaths:
                 bindpaths.append(value)

-    rawdata_bind_paths = [os.path.realpath(p) for p in config['project']['datapath'].split(',')]
+    rawdata_bind_paths = [os.path.abspath(p) for p in config['project']['datapath'].split(',')]
     working_directory = os.path.realpath(config['project']['workpath'])
-
-
     return bindpaths


@@ -393,6 +393,8 @@ def add_user_information(config):
     # username
     config['project']['userhome'] = home
     config['project']['username'] = username
+    dt = datetime.now().strftime("%m/%d/%Y")
+    config['project']['id'] = f"{uuid4()}_{dt}_metagenome"

     return config

@@ -595,7 +597,7 @@ def dryrun(outdir, config='config.json', snakefile=os.path.join('workflow', 'Snakefile')):
         # displays the true number of cores a rule
         # will use, it uses the min(--cores CORES, N)
         dryrun_output = subprocess.check_output([
-            'snakemake', '-npr',
+            'snakemake', '-np',
             '-s', str(snakefile),
             '--use-singularity',
             '--rerun-incomplete',

@@ -620,9 +622,146 @@
     return dryrun_output


-def runner(mode, outdir, alt_cache, logger, additional_bind_paths = None,
-           threads=2, jobname='pl:master', submission_script='run.sh',
-           tmp_dir = '/lscratch/$SLURM_JOBID/'):
+def run_coa_pipeline(mode, outdir, alt_cache, logger, tmp_dir, additional_bind_paths):
+    # gzip compression speed: ~10.5 MB/s
+    # see: https://tukaani.org/lzma/benchmarks.html
+    # large fastq ~20GB
+    # 96 samples x 2 (R1 + R2) = 192 fastqs
+    # total size (high estimate): 192 fastqs * 20GB/fastq = 3,840 GB = 3,840,000 MB
+    # total compression time: 3,840,000 MB / (10.5 MB/s) = 365714 s = ~ 4 days
+    # pigz ~6.5 times faster than gzip
+    # https://github.com/neurolabusc/pigz-bench
+    # 3,840,000 MB / (10.5 MB/s * 6.5) = ~ 15.6 hours
+    # ~~~~~~~~~~~~~~
+
+    # Add additional singularity bind PATHs
+    # to mount the local filesystem to the
+    # containers filesystem, NOTE: these
+    # PATHs must be an absolute PATHs
+    outdir = os.path.abspath(outdir).strip()
+    # Add any default PATHs to bind to
+    # the container's filesystem, like
+    # tmp directories, /lscratch
+    addpaths = []
+    temp = os.path.dirname(tmp_dir.rstrip('/'))
+    if temp == os.sep:
+        temp = tmp_dir.rstrip('/')
+    if outdir not in additional_bind_paths.split(','):
+        addpaths.append(outdir)
+    if temp not in additional_bind_paths.split(','):
+        addpaths.append(temp)
+    bindpaths = ','.join(addpaths)
+
+    # Set ENV variable 'SINGULARITY_CACHEDIR'
+    # to output directory
+    my_env = {}; my_env.update(os.environ)
+    cache = os.path.join(outdir, ".singularity")
+    my_env['SINGULARITY_CACHEDIR'] = cache
+    if alt_cache:
+        # Override the pipeline's default
+        # cache location
+        my_env['SINGULARITY_CACHEDIR'] = alt_cache
+        cache = alt_cache
+
+    if additional_bind_paths:
+        # Add Bind PATHs for outdir and tmp dir
+        if bindpaths:
+            bindpaths = ",{}".format(bindpaths)
+        bindpaths = "{}{}".format(additional_bind_paths, bindpaths)
+
+    if not exists(os.path.join(outdir, 'logfiles')):
+        # Create directory for logfiles
+        os.makedirs(os.path.join(outdir, 'logfiles'))
+
+    # Create .singularity directory for
+    # installations of snakemake without
+    # setuid which creates a sandbox in
+    # the SINGULARITY_CACHEDIR
+    if not exists(cache):
+        # Create directory for sandbox
+        # and image layers
+        os.makedirs(cache, mode=0o755)
+
+    snakefile = os.path.abspath(os.path.join(__file__, '..', 'workflow', 'coa', 'Snakefile'))
+    slurm_dir = os.path.abspath(os.path.join(outdir, 'slurm'))
+    if not os.path.exists(slurm_dir):
+        os.mkdir(slurm_dir, mode=0o755)
+
+    CLUSTER_OPTS = "sbatch --gres {cluster.gres}" + \
+                   " --cpus-per-task {cluster.threads}" + \
+                   " -p {cluster.partition}" + \
+                   " -t {cluster.time}" + \
+                   " --mem {cluster.mem}" + \
+                   " --job-name={params.rname}" + \
+                   " -e $SLURM_DIR/slurm-%j_{params.rname}.out" + \
+                   " -o $SLURM_DIR/slurm-%j_{params.rname}.out"
+
+    sbatch_params = [
+        "#SBATCH --cpus-per-task=28",
+        "#SBATCH --mem=64g",
+        "#SBATCH --time=10-00:00:00",
+        "#SBATCH -p norm",
+        "#SBATCH --parsable",
+        "#SBATCH -J \"metagenome_coa\"",
+        "#SBATCH --mail-type=BEGIN,END,FAIL",
+        "#SBATCH --output \"" + outdir + "/logfiles/snakemake.log\"",
+        "#SBATCH --error \"" + outdir + "/logfiles/snakemake.log\"",
+    ]
+
+    jobscript = [
+        "#!/usr/bin/env bash",
+        "module load snakemake singularity",
+        "snakemake \\",
+        "--latency-wait 120 \\",
+        "-s " + snakefile + " \\",
+        "-d \"" + outdir + "\" \\",
+        "--use-singularity \\",
+        "--singularity-args \"'-B " + bindpaths + "'\" \\",
+        "--configfile=\"" + outdir + "/config.json\" \\",
+        "--printshellcmds \\",
+        "--cluster-config \"" + outdir + "/resources/cluster.json\" \\",
+        "--cluster \"" + CLUSTER_OPTS + "\" \\",
+        "--keep-going \\",
+        "--restart-times 3 \\",
+        "-j 500 \\",
+        "--rerun-incomplete --stats \"" + outdir + "/logfiles/runtime_statistics.json\" \\",
+        "--keep-remote \\",
+        "--local-cores 28 2>&1 | tee -a \"" + outdir + "/logfiles/master.log\"",
+    ]
+
+    exec_sh = 'bash'
+    if mode == 'slurm':
+        exec_sh = 'sbatch'
+        jobscript = [jobscript[0], *sbatch_params, *jobscript[1:]]
+
+    coa_jobscript = os.path.join(slurm_dir, 'jobscript.sh')
+    with open(coa_jobscript, 'w') as fo:
+        fo.write("\n".join(jobscript))
+
+    coajob = subprocess.Popen([
+        exec_sh, str(coa_jobscript)
+    ], cwd = outdir, stderr=subprocess.STDOUT, stdout=logger, env=my_env)
+
+    coajob.wait()
+    return coajob.returncode
+
+
+try:
+    __job_name__ = 'metamorph_' + os.getlogin() + ':master'
+except OSError:
+    __job_name__ = 'metamorph:master'
+
+
+def runner(
+    mode,
+    outdir,
+    alt_cache,
+    logger,
+    additional_bind_paths = None,
+    threads = 2,
+    jobname = __job_name__,
+    submission_script = 'run.sh',
+    tmp_dir = '/lscratch/$SLURM_JOBID/'
+):
     """Runs the pipeline via selected executor: local or slurm.
     If 'local' is selected, the pipeline is executed locally on a compute node/instance.
     If 'slurm' is selected, jobs will be submited to the cluster using SLURM job scheduler.
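
As a quick check of the compression-time estimate in run_coa_pipeline's header comment, here is a worked example (editor's sketch, not pipeline code):

    # Reproduce the arithmetic from run_coa_pipeline's comment block.
    total_mb = 192 * 20 * 1000      # 192 fastqs * 20 GB each = 3,840,000 MB
    gzip_rate = 10.5                # MB/s, per the linked benchmark
    pigz_speedup = 6.5              # pigz is ~6.5x faster than gzip

    gzip_s = total_mb / gzip_rate                   # ~365,714 s
    pigz_s = total_mb / (gzip_rate * pigz_speedup)  # ~56,264 s
    print(f"gzip: ~{gzip_s / 86400:.1f} days, pigz: ~{pigz_s / 3600:.1f} hours")
    # -> gzip: ~4.2 days, pigz: ~15.6 hours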