Updates based on testing in Qiita-RC (#112)
* Updates based on testing in Qiita-RC

* Threads per fastp/minimap2 processes parameterized

* Updates based on feedback

* flake8
charles-cowart authored Dec 7, 2023
1 parent ae0fe27 commit b187513
Showing 3 changed files with 101 additions and 121 deletions.
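The headline change is that NuQCJob's read-length cutoff and per-task thread count are now constructor parameters rather than hard-coded values. A minimal sketch of how a caller might pass the two new arguments, length_limit and cores_per_task; every value shown is illustrative rather than taken from the pipeline's real configuration.

from sequence_processing_pipeline.NuQCJob import NuQCJob

# All argument values below are hypothetical placeholders.
job = NuQCJob(fastq_root_dir='/work/ConvertJob',
              output_path='/work/NuQCJob',
              sample_sheet_path='/work/sample-sheet.csv',
              minimap_database_paths='/databases/human-pangenome.mmi',
              queue_name='qiita',
              node_count=1,
              wall_time_limit=5760,          # minutes (4 days)
              jmem=8,
              fastp_path='fastp',
              minimap2_path='minimap2',
              samtools_path='samtools',
              modules_to_load=[],
              qiita_job_id='abc123',
              pool_size=30,
              max_array_length=1000,
              known_adapters_path='/databases/adapters.fna',
              bucket_size=8,
              length_limit=100,              # new: discard reads shorter than this
              cores_per_task=4)              # new: threads for fastp/minimap2, sbatch -c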
82 changes: 39 additions & 43 deletions sequence_processing_pipeline/NuQCJob.py
@@ -1,5 +1,5 @@
from metapool import KLSampleSheet, validate_and_scrub_sample_sheet
from os import stat, makedirs
from os import stat, makedirs, rename
from os.path import join, basename, dirname, exists
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
@@ -11,7 +11,6 @@
from jinja2 import Environment, PackageLoader
import glob
import re
from json import dumps
from sys import executable


@@ -23,7 +22,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
minimap_database_paths, queue_name, node_count,
wall_time_limit, jmem, fastp_path, minimap2_path,
samtools_path, modules_to_load, qiita_job_id, pool_size,
max_array_length, known_adapters_path, bucket_size=8):
max_array_length, known_adapters_path, bucket_size=8,
length_limit=100, cores_per_task=4):
"""
Submit a slurm job where the contents of fastq_root_dir are processed
using fastp, minimap2, and samtools. Human-genome sequences will be
@@ -44,7 +44,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
:param pool_size: The number of jobs to process concurrently.
:param known_adapters_path: The path to an .fna file of known adapters.
:param bucket_size: the size in GB of each bucket to process
:param length_limit: reads shorter than this will be discarded.
:param cores_per_task: Number of CPU cores per node to request.
"""
super().__init__(fastq_root_dir,
output_path,
@@ -80,14 +81,17 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
self.counts = {}
self.known_adapters_path = known_adapters_path
self.max_file_list_size_in_gb = bucket_size
self.length_limit = length_limit
self.cores_per_task = cores_per_task
self.temp_dir = join(self.output_path, 'tmp')
makedirs(self.temp_dir, exist_ok=True)
self.batch_prefix = "hd-split-pangenome"

self.batch_prefix = f"hds-{self.qiita_job_id}"
self.minimum_bytes = 3100
self.fastq_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}'
r'\.fastq\.gz$')
self.html_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.html$')
self.json_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.json')
self.json_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.json$')

self._validate_project_data()
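Note the json filename pattern now ends with '$', like the fastq and html patterns, so names with trailing characters after '.json' no longer match. A quick sketch using the same patterns and made-up filenames:

import re

fastq_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.fastq\.gz$')
json_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.json$')

print(fastq_regex.match('sampleA_S001_L001_R1_001.fastq.gz').group(1))  # 'sampleA'
print(json_regex.match('sampleA_S001_L001_R1_001.json').group(1))       # 'sampleA'
print(json_regex.match('sampleA_S001_L001_R1_001.json.bak'))            # None with the anchored pattern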

@@ -157,7 +161,17 @@ def _move_helper(self, completed_files, regex, samples_in_project, dst):
# check if found substring is a member of this
# project. Note sample-name != sample-id
if substr[1] in samples_in_project:
files_to_move.append(fp)
if fp.endswith('.fastq.gz'):
# legacy QC'ed files were always denoted with
# 'trimmed' to distinguish them from raw files.
renamed_fp = fp.replace('.fastq.gz',
'.trimmed.fastq.gz')
rename(fp, renamed_fp)
# move file into destination w/new filename
files_to_move.append(renamed_fp)
else:
# move file into destination folder w/no namechange.
files_to_move.append(fp)

for fp in files_to_move:
move(fp, dst)
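In short, QC'ed fastq outputs pick up a '.trimmed' infix before being moved into the project's destination folder, matching the legacy naming; everything else moves unchanged. A stripped-down sketch of that branch with hypothetical paths (move is shutil.move, as in the surrounding code):

from os import rename
from shutil import move

fp = '/work/NuQCJob/filtered/sampleA_S001_L001_R1_001.fastq.gz'   # hypothetical
dst = '/work/NuQCJob/MyProject_12345/filtered_sequences'          # hypothetical

if fp.endswith('.fastq.gz'):
    # rename in place first, then move under the new name
    renamed_fp = fp.replace('.fastq.gz', '.trimmed.fastq.gz')
    rename(fp, renamed_fp)
    move(renamed_fp, dst)
else:
    # html/json and other outputs keep their names
    move(fp, dst)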
@@ -175,7 +189,7 @@ def run(self, callback=None):
self.counts[self.batch_prefix] = batch_count

export_params = [f"MMI={self.minimap_database_paths}",
f"PREFIX={self.batch_prefix}",
f"PREFIX={batch_location}",
f"OUTPUT={self.output_path}",
f"TMPDIR={self.temp_dir}"]

@@ -261,40 +275,15 @@ def run(self, callback=None):
empty_files_directory,
self.minimum_bytes)

def _get_failed_indexes(self, project_name, job_id):
pattern = f"{self.temp_dir}/{self.batch_prefix}.*.completed"
def _confirm_job_completed(self):
# since NuQCJob processes across all projects in a run, there isn't
# a need to iterate by project_name and job_id.
pattern = f"{self.output_path}/hds-{self.qiita_job_id}.*.completed"
completed_files = list(glob.glob(pattern))
completed_indexes = []
regex = r'^%s/%s.%s_([0-9]+).completed$' % (self.temp_dir,
self.batch_prefix,
str(job_id))
array_ids = re.compile(regex)

for completed_file in completed_files:
match = array_ids.search(completed_file)
if match is None:
raise PipelineError("Malformed completed file")
else:
id_ = int(match.groups(0)[0])
completed_indexes.append(id_)

# a successfully completed job array should have a list of array
# numbers from 0 - len(self.commands).
all_indexes = list(range(1, self.counts[self.batch_prefix]))
failed_indexes = sorted(set(all_indexes) - set(completed_indexes))

# generate log-file here instead of in run() where it can be
# unittested more easily.
log_fp = join(self.output_path,
'logs',
f'failed_indexes_{job_id}.json')

if failed_indexes:
with open(log_fp, 'w') as f:
f.write(dumps({'job_id': job_id,
'failed_indexes': failed_indexes}, indent=2))
if completed_files:
return True

return failed_indexes
return False

def _process_sample_sheet(self):
sheet = KLSampleSheet(self.sample_sheet_path)
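The per-array-index bookkeeping (and the failed_indexes JSON log it produced) is gone; completion is now signalled simply by the presence of hds-<qiita_job_id>.*.completed sentinel files in the output directory, presumably written by the job's array tasks. A minimal sketch of that convention, using a throwaway directory and a made-up job id:

from glob import glob
from os.path import join
from pathlib import Path
from tempfile import mkdtemp

output_path = mkdtemp()      # stands in for self.output_path
qiita_job_id = 'abc123'      # hypothetical

# one sentinel per finished array task, written outside this class
Path(join(output_path, f'hds-{qiita_job_id}.1.completed')).touch()

# _confirm_job_completed() only asks whether at least one sentinel exists
print(bool(glob(join(output_path, f'hds-{qiita_job_id}.*.completed'))))  # True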
@@ -362,12 +351,19 @@ def _generate_job_script(self):
# should be 4 * 24 * 60 = 4 days
wall_time_limit=self.wall_time_limit,
mem_in_gb=self.jmem,
node_count=1,
cores_per_task=4,
# Note NuQCJob now maps node_count to
# SLURM -N parameter to act like other
# Job classes.
# self.node_count should be 1
node_count=self.node_count,
# cores-per-task (-c) should be 4
cores_per_task=self.cores_per_task,
knwn_adpt_path=self.known_adapters_path,
output_path=self.output_path,
html_path=html_path,
json_path=json_path,
demux_path=demux_path))
demux_path=demux_path,
temp_dir=self.temp_dir,
length_limit=self.length_limit))

return job_script_path
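For context, _generate_job_script() renders templates/nuqc_job.sh (shown next) with these keys; temp_dir and length_limit are the new ones, and node_count/cores_per_task now come from the constructor. A trimmed sketch of the render call, with illustrative values and an assumed package loader:

from jinja2 import Environment, PackageLoader

env = Environment(loader=PackageLoader('sequence_processing_pipeline'))
template = env.get_template('nuqc_job.sh')

job_script = template.render(wall_time_limit=5760,
                             mem_in_gb=8,
                             node_count=1,          # SLURM -N
                             cores_per_task=4,      # SLURM -c, fastp -w, minimap2 -t
                             knwn_adpt_path='/databases/adapters.fna',
                             output_path='/work/NuQCJob',
                             html_path='/work/NuQCJob/fastp_reports_dir/html',
                             json_path='/work/NuQCJob/fastp_reports_dir/json',
                             demux_path='/work/NuQCJob/demux',
                             temp_dir='/work/NuQCJob/tmp',
                             length_limit=100)

with open('/work/NuQCJob/process_all_fastq_files.sh', 'w') as f:   # hypothetical filename
    f.write(job_script)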
30 changes: 15 additions & 15 deletions sequence_processing_pipeline/templates/nuqc_job.sh
@@ -5,6 +5,9 @@
#SBATCH --time {{wall_time_limit}}
#SBATCH --mem {{mem_in_gb}}G
#SBATCH -N {{node_count}}
### Note cores_per_task maps to fastp & minimap2 thread counts
### as well as sbatch -c. demux threads remains fixed at 1.
### Note -c set to 4 and thread counts set to 7 during testing.
#SBATCH -c {{cores_per_task}}

if [[ -z "${SLURM_ARRAY_TASK_ID}" ]]; then
@@ -32,11 +35,6 @@ if [[ -z ${OUTPUT} ]]; then
exit 1
fi

if [[ -z ${TMPDIR} ]]; then
echo "TMPDIR is not set"
exit 1
fi

echo "MMI is ${MMI}"

conda activate human-depletion
@@ -49,16 +47,16 @@ hostname
echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
### output_path = output_path passed to Job objects + 'NuQCJob'
### e.g.: working-directory/ConvertJob, working-directory/QCJob...
cd ${TMPDIR}
cd {{output_path}}

### set a temp directory, make a new unique one under it and
### make sure we clean up as we're dumping to shm
### DO NOT do this casually. Only do a clean up like this if
### you know for sure TMPDIR is what you want.

mkdir -p ${TMPDIR}
export TMPDIR=${TMPDIR}
export TMPDIR=$(mktemp -d)
export TMPDIR={{temp_dir}}/
# don't use mktemp -d to create a random temp dir.
# the one passed is unique already.
echo $TMPDIR

mkdir -p {{html_path}}
@@ -71,7 +69,7 @@ function cleanup {
}
trap cleanup EXIT

export FILES=$(pwd)/$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID})
export FILES=$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID})
if [[ ! -f ${FILES} ]]; then
logger ${FILES} not found
exit 1
@@ -92,16 +90,18 @@ do

# for now just make sure each file is saved and we can read the data inside
# to sort them out later.
html_name=$(echo "$r1_name.html")
json_name=$(echo "$r1_name.json")

s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
html_name=$(echo "$s_name.html")
json_name=$(echo "$s_name.json")

echo "${i} ${r1_name} ${r2_name} ${base}" >> ${TMPDIR}/id_map

fastp \
-l 45 \
-l {{length_limit}} \
-i ${r1} \
-I ${r2} \
-w 7 \
-w {{cores_per_task}} \
--adapter_fasta {{knwn_adpt_path}} \
--html {{html_path}}/${html_name} \
--json {{json_path}}/${json_name} \
@@ -113,7 +113,7 @@ function minimap2_runner () {
mmi=$1

echo "$(date) :: $(basename ${mmi})"
minimap2 -2 -ax sr -t 7 ${mmi} ${TMPDIR}/seqs.fastq | \
minimap2 -2 -ax sr -t {{cores_per_task}} ${mmi} ${TMPDIR}/seqs.fastq | \
samtools fastq -@ 1 -f 12 -F 256 > ${TMPDIR}/seqs_new.fastq
mv ${TMPDIR}/seqs_new.fastq ${TMPDIR}/seqs.fastq
}
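In this pipe, samtools fastq -f 12 keeps only read pairs where both the read and its mate are unmapped against the host index, and -F 256 drops secondary alignments, so seqs.fastq becomes progressively host-depleted as each reference in ${MMI} is applied. A rough Python equivalent of one pass, assuming minimap2 and samtools are on PATH and using hypothetical paths:

import os
import subprocess

mmi = '/databases/human-pangenome.mmi'   # hypothetical
tmpdir = '/work/NuQCJob/tmp'             # hypothetical
threads = 4                              # corresponds to {{cores_per_task}}

minimap2 = subprocess.Popen(['minimap2', '-2', '-ax', 'sr', '-t', str(threads),
                             mmi, f'{tmpdir}/seqs.fastq'],
                            stdout=subprocess.PIPE)

# -f 12: read unmapped AND mate unmapped; -F 256: exclude secondary alignments
with open(f'{tmpdir}/seqs_new.fastq', 'w') as out:
    subprocess.run(['samtools', 'fastq', '-@', '1', '-f', '12', '-F', '256', '-'],
                   stdin=minimap2.stdout, stdout=out, check=True)

minimap2.stdout.close()
minimap2.wait()

# mirror the script's mv: the filtered reads become the input for the next pass
os.replace(f'{tmpdir}/seqs_new.fastq', f'{tmpdir}/seqs.fastq')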