From b1875139ce34cca256958cc6dc4cd74eb6e6a33f Mon Sep 17 00:00:00 2001 From: Charles Cowart <42684307+charles-cowart@users.noreply.github.com> Date: Thu, 7 Dec 2023 14:09:14 -0800 Subject: [PATCH] Updates based on testing in Qiita-RC (#112) * Updates based on testing in Qiita-RC * Threads per fastp/minimap2 processes parameterized * Updates based on feedback * flake8 --- sequence_processing_pipeline/NuQCJob.py | 82 +++++++------ .../templates/nuqc_job.sh | 30 ++--- .../tests/test_NuQCJob.py | 110 ++++++++---------- 3 files changed, 101 insertions(+), 121 deletions(-) diff --git a/sequence_processing_pipeline/NuQCJob.py b/sequence_processing_pipeline/NuQCJob.py index c33e8a43..dd8f0b96 100644 --- a/sequence_processing_pipeline/NuQCJob.py +++ b/sequence_processing_pipeline/NuQCJob.py @@ -1,5 +1,5 @@ from metapool import KLSampleSheet, validate_and_scrub_sample_sheet -from os import stat, makedirs +from os import stat, makedirs, rename from os.path import join, basename, dirname, exists from sequence_processing_pipeline.Job import Job from sequence_processing_pipeline.PipelineError import PipelineError @@ -11,7 +11,6 @@ from jinja2 import Environment, PackageLoader import glob import re -from json import dumps from sys import executable @@ -23,7 +22,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, minimap_database_paths, queue_name, node_count, wall_time_limit, jmem, fastp_path, minimap2_path, samtools_path, modules_to_load, qiita_job_id, pool_size, - max_array_length, known_adapters_path, bucket_size=8): + max_array_length, known_adapters_path, bucket_size=8, + length_limit=100, cores_per_task=4): """ Submit a slurm job where the contents of fastq_root_dir are processed using fastp, minimap2, and samtools. Human-genome sequences will be @@ -44,7 +44,8 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, :param pool_size: The number of jobs to process concurrently. :param known_adapters_path: The path to an .fna file of known adapters. :param bucket_size: the size in GB of each bucket to process - + :param length_limit: reads shorter than this will be discarded. + :param cores_per_task: Number of CPU cores per node to request. """ super().__init__(fastq_root_dir, output_path, @@ -80,14 +81,17 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path, self.counts = {} self.known_adapters_path = known_adapters_path self.max_file_list_size_in_gb = bucket_size + self.length_limit = length_limit + self.cores_per_task = cores_per_task self.temp_dir = join(self.output_path, 'tmp') makedirs(self.temp_dir, exist_ok=True) - self.batch_prefix = "hd-split-pangenome" + + self.batch_prefix = f"hds-{self.qiita_job_id}" self.minimum_bytes = 3100 self.fastq_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}' r'\.fastq\.gz$') self.html_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.html$') - self.json_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.json') + self.json_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.json$') self._validate_project_data() @@ -157,7 +161,17 @@ def _move_helper(self, completed_files, regex, samples_in_project, dst): # check if found substring is a member of this # project. Note sample-name != sample-id if substr[1] in samples_in_project: - files_to_move.append(fp) + if fp.endswith('.fastq.gz'): + # legacy QC'ed files were always denoted with + # 'trimmed' to distinguish them from raw files. 
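+                            # e.g. a file matching the fastq regex, such as
+                            # the hypothetical 'X_S3_L001_R1_001.fastq.gz',
+                            # is renamed in place to
+                            # 'X_S3_L001_R1_001.trimmed.fastq.gz' before the
+                            # move below.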
+ renamed_fp = fp.replace('.fastq.gz', + '.trimmed.fastq.gz') + rename(fp, renamed_fp) + # move file into destination w/new filename + files_to_move.append(renamed_fp) + else: + # move file into destination folder w/no namechange. + files_to_move.append(fp) for fp in files_to_move: move(fp, dst) @@ -175,7 +189,7 @@ def run(self, callback=None): self.counts[self.batch_prefix] = batch_count export_params = [f"MMI={self.minimap_database_paths}", - f"PREFIX={self.batch_prefix}", + f"PREFIX={batch_location}", f"OUTPUT={self.output_path}", f"TMPDIR={self.temp_dir}"] @@ -261,40 +275,15 @@ def run(self, callback=None): empty_files_directory, self.minimum_bytes) - def _get_failed_indexes(self, project_name, job_id): - pattern = f"{self.temp_dir}/{self.batch_prefix}.*.completed" + def _confirm_job_completed(self): + # since NuQCJob processes across all projects in a run, there isn't + # a need to iterate by project_name and job_id. + pattern = f"{self.output_path}/hds-{self.qiita_job_id}.*.completed" completed_files = list(glob.glob(pattern)) - completed_indexes = [] - regex = r'^%s/%s.%s_([0-9]+).completed$' % (self.temp_dir, - self.batch_prefix, - str(job_id)) - array_ids = re.compile(regex) - - for completed_file in completed_files: - match = array_ids.search(completed_file) - if match is None: - raise PipelineError("Malformed completed file") - else: - id_ = int(match.groups(0)[0]) - completed_indexes.append(id_) - - # a successfully completed job array should have a list of array - # numbers from 0 - len(self.commands). - all_indexes = list(range(1, self.counts[self.batch_prefix])) - failed_indexes = sorted(set(all_indexes) - set(completed_indexes)) - - # generate log-file here instead of in run() where it can be - # unittested more easily. - log_fp = join(self.output_path, - 'logs', - f'failed_indexes_{job_id}.json') - - if failed_indexes: - with open(log_fp, 'w') as f: - f.write(dumps({'job_id': job_id, - 'failed_indexes': failed_indexes}, indent=2)) + if completed_files: + return True - return failed_indexes + return False def _process_sample_sheet(self): sheet = KLSampleSheet(self.sample_sheet_path) @@ -362,12 +351,19 @@ def _generate_job_script(self): # should be 4 * 24 * 60 = 4 days wall_time_limit=self.wall_time_limit, mem_in_gb=self.jmem, - node_count=1, - cores_per_task=4, + # Note NuQCJob now maps node_count to + # SLURM -N parameter to act like other + # Job classes. + # self.node_count should be 1 + node_count=self.node_count, + # cores-per-task (-c) should be 4 + cores_per_task=self.cores_per_task, knwn_adpt_path=self.known_adapters_path, output_path=self.output_path, html_path=html_path, json_path=json_path, - demux_path=demux_path)) + demux_path=demux_path, + temp_dir=self.temp_dir, + length_limit=self.length_limit)) return job_script_path diff --git a/sequence_processing_pipeline/templates/nuqc_job.sh b/sequence_processing_pipeline/templates/nuqc_job.sh index 163b1964..df8b39bb 100644 --- a/sequence_processing_pipeline/templates/nuqc_job.sh +++ b/sequence_processing_pipeline/templates/nuqc_job.sh @@ -5,6 +5,9 @@ #SBATCH --time {{wall_time_limit}} #SBATCH --mem {{mem_in_gb}}G #SBATCH -N {{node_count}} +### Note cores_per_task maps to fastp & minimap2 thread counts +### as well as sbatch -c. demux threads remains fixed at 1. +### Note -c set to 4 and thread counts set to 7 during testing. 
#SBATCH -c {{cores_per_task}} if [[ -z "${SLURM_ARRAY_TASK_ID}" ]]; then @@ -32,11 +35,6 @@ if [[ -z ${OUTPUT} ]]; then exit 1 fi -if [[ -z ${TMPDIR} ]]; then - echo "TMPDIR is not set" - exit 1 -fi - echo "MMI is ${MMI}" conda activate human-depletion @@ -49,16 +47,16 @@ hostname echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID} ### output_path = output_path passed to Job objects + 'NuQCJob' ### e.g.: working-directory/ConvertJob, working-directory/QCJob... -cd ${TMPDIR} +cd {{output_path}} ### set a temp directory, make a new unique one under it and ### make sure we clean up as we're dumping to shm ### DO NOT do this casually. Only do a clean up like this if ### you know for sure TMPDIR is what you want. -mkdir -p ${TMPDIR} -export TMPDIR=${TMPDIR} -export TMPDIR=$(mktemp -d) +export TMPDIR={{temp_dir}}/ +# don't use mktemp -d to create a random temp dir. +# the one passed is unique already. echo $TMPDIR mkdir -p {{html_path}} @@ -71,7 +69,7 @@ function cleanup { } trap cleanup EXIT -export FILES=$(pwd)/$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID}) +export FILES=$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID}) if [[ ! -f ${FILES} ]]; then logger ${FILES} not found exit 1 @@ -92,16 +90,18 @@ do # for now just make sure each file is saved and we can read the data inside # to sort them out later. - html_name=$(echo "$r1_name.html") - json_name=$(echo "$r1_name.json") + + s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//') + html_name=$(echo "$s_name.html") + json_name=$(echo "$s_name.json") echo "${i} ${r1_name} ${r2_name} ${base}" >> ${TMPDIR}/id_map fastp \ - -l 45 \ + -l {{length_limit}} \ -i ${r1} \ -I ${r2} \ - -w 7 \ + -w {{cores_per_task}} \ --adapter_fasta {{knwn_adpt_path}} \ --html {{html_path}}/${html_name} \ --json {{json_path}}/${json_name} \ @@ -113,7 +113,7 @@ function minimap2_runner () { mmi=$1 echo "$(date) :: $(basename ${mmi})" - minimap2 -2 -ax sr -t 7 ${mmi} ${TMPDIR}/seqs.fastq | \ + minimap2 -2 -ax sr -t {{cores_per_task}} ${mmi} ${TMPDIR}/seqs.fastq | \ samtools fastq -@ 1 -f 12 -F 256 > ${TMPDIR}/seqs_new.fastq mv ${TMPDIR}/seqs_new.fastq ${TMPDIR}/seqs.fastq } diff --git a/sequence_processing_pipeline/tests/test_NuQCJob.py b/sequence_processing_pipeline/tests/test_NuQCJob.py index e3c0c6f7..1d9468bc 100644 --- a/sequence_processing_pipeline/tests/test_NuQCJob.py +++ b/sequence_processing_pipeline/tests/test_NuQCJob.py @@ -6,7 +6,6 @@ from sequence_processing_pipeline.PipelineError import PipelineError from os import makedirs, remove from metapool import KLSampleSheet, validate_and_scrub_sample_sheet -from json import load import re @@ -1048,20 +1047,18 @@ def test_completed_file_generation(self): 30, 1000, '') my_path = ('sequence_processing_pipeline/tests/data/output_dir/' - 'NuQCJob/tmp') + 'NuQCJob') - # since .completed files are generated when jobs are submitted via - # the run() method and completed successfully, we must manually - # create the files _was_successful() expects to see. - for i in range(1, 10): - with open(join(my_path, - f'hd-split-pangenome.3456_{i}.completed'), - 'w') as f: - f.write("This is a .completed file.") + # since a .completed file is generated when the job is complete, we + # must manually create the file we expect to see once a run is + # complete. 
e.g.: /NuQCJob/ + # hds-c11c054e-aeb1-46ce-8cea-e233d4cd3070.1740338.completed + comp_fp = join(my_path, f'hds-{self.qiita_job_id}.3456.completed') - job.counts[job.batch_prefix] = 9 - results = job._get_failed_indexes('hd-split-pangenome', "3456") - self.assertTrue(len(results) == 0) + with open(comp_fp, 'w') as f: + f.write("This is a .completed file.") + + self.assertTrue(job._confirm_job_completed()) def test_completed_file_generation_some_failures(self): double_db_paths = ["db_path/mmi_1.db", "db_path/mmi_2.db"] @@ -1072,35 +1069,9 @@ def test_completed_file_generation_some_failures(self): 'fastp', 'minimap2', 'samtools', [], self.qiita_job_id, 30, 1000, '') - my_path = ('sequence_processing_pipeline/tests/data/output_dir/' - 'NuQCJob/tmp') - - # simulate one job failing and not generating a .completed file by - # setting range to (2, 9) from (0, 9). get_failed_indexes() should - # return [0,1] as a result. - for i in range(3, 10): - with open(join(my_path, - f'hd-split-pangenome.4567_{i}.completed'), - 'w') as f: - f.write("This is a .completed file.") - - job.counts[job.batch_prefix] = 9 - results = job._get_failed_indexes('hd-split-pangenome', "4567") - self.assertTrue(results, [1, 2]) - - # verify that when one or more array jobs have failed to complete, - # a file is created that gives a job id and an array index. - my_path = ('sequence_processing_pipeline/tests/data/output_dir/' - 'NuQCJob/logs') - log_fp = join(my_path, 'failed_indexes_4567.json') - - self.assertTrue(exists(log_fp)) - - with open(log_fp, 'r') as f: - obs = load(f) - exp = {"job_id": "4567", - "failed_indexes": [1, 2]} - self.assertDictEqual(obs, exp) + # test _confirm_job_completed() fails when a .completed file isn't + # manually created. + self.assertFalse(job._confirm_job_completed()) def test_generate_job_script(self): double_db_paths = ["db_path/mmi_1.db", "db_path/mmi_2.db"] @@ -1117,24 +1088,33 @@ def test_generate_job_script(self): with open(job_script_path, 'r') as f: obs = [line.replace('\n', '') for line in f] - base_path_rgx = re.compile(r' (/.*/mg-scripts)/sequence_processing_' - 'pipeline/tests/data/output_dir/NuQCJob/') - - demux_bin_rgx = re.compile(r'\s+(.*/bin)/demux \\') + # remove path to demux binary, which is testing environment + # specific, and replace w/a static string. + rgx = re.compile(r'\s+(.*/bin)/demux \\') for i in range(0, len(obs)): - m = base_path_rgx.search(obs[i]) + m = rgx.search(obs[i]) if m: obs[i] = obs[i].replace(m[1], 'REMOVED') - else: - m = demux_bin_rgx.search(obs[i]) - if m: - obs[i] = obs[i].replace(m[1], 'REMOVED') + + # remove partial paths that are testing environment specific and + # replace w/a static string. + rgx = re.compile(r"(/.*/mg-scripts/)") + + for i in range(0, len(obs)): + m = rgx.search(obs[i]) + if m: + obs[i] = obs[i].replace(m[1], 'REMOVED/') exp = ["#!/bin/bash -l", "#SBATCH -J abcdabcdabcdabcdabcdabcdabcdabcd_NuQCJob", "#SBATCH -p queue_name", "### wall-time-limit in minutes", "#SBATCH --time 1440", "#SBATCH --mem 8gbG", "#SBATCH -N 1", + ("### Note cores_per_task maps to fastp & minimap2 thread " + "counts"), + "### as well as sbatch -c. 
demux threads remains fixed at 1.", + ("### Note -c set to 4 and thread counts set to 7 during " + "testing."), "#SBATCH -c 4", "", "if [[ -z \"${SLURM_ARRAY_TASK_ID}\" ]]; then", " echo \"Not operating within an array\"", @@ -1145,8 +1125,7 @@ def test_generate_job_script(self): " exit 1", "fi", "", "if [[ -z ${PREFIX} ]]; then", " echo \"PREFIX is not set\"", " exit 1", "fi", "", "if [[ -z ${OUTPUT} ]]; then", " echo \"OUTPUT is not set\"", - " exit 1", "fi", "", "if [[ -z ${TMPDIR} ]]; then", - " echo \"TMPDIR is not set\"", " exit 1", "fi", "", + " exit 1", "fi", "", "echo \"MMI is ${MMI}\"", "", "conda activate human-depletion", "", "set -x", "set -e", "", "date", "hostname", "echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}", @@ -1154,13 +1133,17 @@ def test_generate_job_script(self): "'NuQCJob'"), ("### e.g.: working-directory/ConvertJob, working-directory/" "QCJob..."), - "cd ${TMPDIR}", "", + ("cd REMOVED/sequence_processing_pipeline/tests/data/" + "output_dir/NuQCJob"), "", "### set a temp directory, make a new unique one under it and", "### make sure we clean up as we're dumping to shm", "### DO NOT do this casually. Only do a clean up like this if", "### you know for sure TMPDIR is what you want.", "", - "mkdir -p ${TMPDIR}", "export TMPDIR=${TMPDIR}", - "export TMPDIR=$(mktemp -d)", "echo $TMPDIR", "", + ("export TMPDIR=REMOVED/sequence_processing_pipeline/tests/" + "data/output_dir/NuQCJob/tmp/"), + "# don't use mktemp -d to create a random temp dir.", + "# the one passed is unique already.", + "echo $TMPDIR", "", ("mkdir -p REMOVED/sequence_processing_pipeline/tests/data/" "output_dir/NuQCJob/fastp_reports_dir/html"), ("mkdir -p REMOVED/sequence_processing_pipeline/tests/data/" @@ -1168,7 +1151,7 @@ def test_generate_job_script(self): "", "function cleanup {", " echo \"Removing $TMPDIR\"", " rm -fr $TMPDIR", " unset TMPDIR", "}", "trap cleanup EXIT", "", - ("export FILES=$(pwd)/$(printf \"%s-%d\" ${PREFIX} ${SLURM_" + ("export FILES=$(printf \"%s-%d\" ${PREFIX} ${SLURM_" "ARRAY_TASK_ID})"), "if [[ ! -f ${FILES} ]]; then", " logger ${FILES} not found", " exit 1", "fi", "", "delimiter=::MUX::", @@ -1183,13 +1166,14 @@ def test_generate_job_script(self): " r2_name=$(basename ${r2} .fastq.gz)", "", (" # for now just make sure each file is saved and we can " "read the data inside"), - " # to sort them out later.", - " html_name=$(echo \"$r1_name.html\")", - " json_name=$(echo \"$r1_name.json\")", "", + " # to sort them out later.", "", + " s_name=$(basename \"${r1}\" | sed -r 's/\\.fastq\\.gz//')", + " html_name=$(echo \"$s_name.html\")", + " json_name=$(echo \"$s_name.json\")", "", (" echo \"${i} ${r1_name} ${r2_name} ${base}\" >> " "${TMPDIR}/id_map"), - "", " fastp \\", " -l 45 \\", " -i ${r1} \\", - " -I ${r2} \\", " -w 7 \\", + "", " fastp \\", " -l 100 \\", " -i ${r1} \\", + " -I ${r2} \\", " -w 4 \\", " --adapter_fasta \\", (" --html REMOVED/sequence_processing_pipeline/tests/" "data/output_dir/NuQCJob/fastp_reports_dir/html/" @@ -1203,7 +1187,7 @@ def test_generate_job_script(self): "function minimap2_runner () {", " mmi=$1", " ", " echo \"$(date) :: $(basename ${mmi})\"", - " minimap2 -2 -ax sr -t 7 ${mmi} ${TMPDIR}/seqs.fastq | \\", + " minimap2 -2 -ax sr -t 4 ${mmi} ${TMPDIR}/seqs.fastq | \\", (" samtools fastq -@ 1 -f 12 -F 256 > ${TMPDIR}/" "seqs_new.fastq"), " mv ${TMPDIR}/seqs_new.fastq ${TMPDIR}/seqs.fastq",
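
For reference, a minimal standalone sketch (not the module code) of the two behaviors this patch introduces: the glob-based completion check that replaces _get_failed_indexes(), and the '.trimmed' rename applied to QC'ed fastq.gz files before they are moved. The function names mirror NuQCJob's _confirm_job_completed() and _move_helper(), but the 'working-dir/NuQCJob' path and the example job id are illustrative only, and the project-regex/sample-membership filtering done by the real _move_helper() is omitted here.

    # Standalone sketch under the assumptions stated above; not the patched
    # NuQCJob implementation itself.
    from glob import glob
    from os import rename
    from os.path import join
    from shutil import move


    def confirm_job_completed(output_path, qiita_job_id):
        # NuQCJob submits a single job array per run; a sentinel file named
        # 'hds-<qiita_job_id>.<slurm_job_id>.completed' in output_path is
        # treated as evidence that the array finished.
        pattern = join(output_path, f"hds-{qiita_job_id}.*.completed")
        return bool(glob(pattern))


    def move_qced_files(completed_files, dst):
        # QC'ed fastqs keep the legacy 'trimmed' naming convention so they
        # can be distinguished from raw files once moved into dst.
        for fp in completed_files:
            if fp.endswith('.fastq.gz'):
                renamed_fp = fp.replace('.fastq.gz', '.trimmed.fastq.gz')
                rename(fp, renamed_fp)
                move(renamed_fp, dst)
            else:
                move(fp, dst)


    if __name__ == '__main__':
        # hypothetical example values for illustration only
        print(confirm_job_completed('working-dir/NuQCJob',
                                    'c11c054e-aeb1-46ce-8cea-e233d4cd3070'))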