diff --git a/sequence_processing_pipeline/Commands.py b/sequence_processing_pipeline/Commands.py index ae971fc9..2879cb03 100644 --- a/sequence_processing_pipeline/Commands.py +++ b/sequence_processing_pipeline/Commands.py @@ -1,7 +1,8 @@ import glob import gzip import os -from sequence_processing_pipeline.util import iter_paired_files +from sequence_processing_pipeline.util import (iter_paired_files, + determine_orientation) def split_similar_size_bins(data_location_path, max_file_list_size_in_gb, @@ -14,21 +15,17 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb, :return: The number of output-files created, size of largest bin. ''' - # SPP root paths are of the form: - # ../72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob, and contain only - # directories named after projects e.g: - # 72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob/Tell_Seq_15196. - # Each of these directories contain R1 and R2 fastq files. Hence, path - # is now the following: - # add one more level to account for project_names nested under ConvertJob - # dir. - # this will ignore the _I1_ reads that appear in the integrated result. - fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz') - - # case-specific filter for TellSeq output directories that also contain - # _I1_ files. Ensure paths are still sorted afterwards. - fastq_paths = [x for x in fastq_paths if '_I1_001.fastq.gz' not in x] - fastq_paths = sorted(fastq_paths) + # to prevent issues w/filenames like the ones below from being mistaken + # for R1 or R2 files, use determine_orientation(). + # LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz + # LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz + + # since the names of all fastq files are being scanned for orientation, + # collect all of them instead of mistakenly pre-filtering some files. + # fastq_paths = glob.glob(data_location_path + '/*/*_R?_*.fastq.gz') + fastq_paths = glob.glob(data_location_path + '/*/*.fastq.gz') + fastq_paths = [x for x in fastq_paths + if determine_orientation(x) in ['R1', 'R2']] # convert from GB and halve as we sum R1 max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2) @@ -114,21 +111,43 @@ def demux(id_map, fp, out_d, task, maxtask): openfps[idx] = current_fp # setup a parser - id_ = iter(fp) + seq_id = iter(fp) seq = iter(fp) dumb = iter(fp) qual = iter(fp) - for i, s, d, q in zip(id_, seq, dumb, qual): - fname_encoded, id_ = i.split(delimiter, 1) + for i, s, d, q in zip(seq_id, seq, dumb, qual): + # '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1' + # '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT' # noqa + # '@baz/1 + + fname_encoded, sid = i.split(delimiter, 1) if fname_encoded not in openfps: continue - orientation = id_[-2] # -1 is \n current_fp = openfps[fname_encoded] - id_ = rec + id_ - current_fp[orientation].write(id_) + + # remove '\n' from sid and split on all whitespace. + tmp = sid.strip().split() + + if len(tmp) == 1: + # sequence id line contains no optional metadata. + # don't change sid. 
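+                # e.g. sid stays 'LH00444:84:227CNHLT4:7:1101:41955:2443/1\n'
+                # here, trailing newline included.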
+ # -1 is \n + orientation = sid[-2] + sid = rec + sid + elif len(tmp) == 2: + sid = tmp[0] + metadata = tmp[1] + # no '\n' + orientation = sid[-1] + # hexdump confirms separator is ' ', not '\t' + sid = rec + sid + ' ' + metadata + '\n' + else: + raise ValueError(f"'{sid}' is not a recognized form") + + current_fp[orientation].write(sid) current_fp[orientation].write(s) current_fp[orientation].write(d) current_fp[orientation].write(q) diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py index 8db0440b..03151126 100644 --- a/sequence_processing_pipeline/FastQCJob.py +++ b/sequence_processing_pipeline/FastQCJob.py @@ -278,11 +278,13 @@ def _generate_job_script(self): lines.append(f"#SBATCH -n {self.nprocs}") lines.append("#SBATCH --time %d" % self.wall_time_limit) lines.append(f"#SBATCH --mem {self.jmem}") + lines.append('#SBATCH --constraint="intel"') lines.append("#SBATCH --array 1-%d%%%d" % ( len(self.commands), self.pool_size)) lines.append("set -x") + lines.append("set +e") lines.append('date') lines.append('hostname') lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}') diff --git a/sequence_processing_pipeline/GenPrepFileJob.py b/sequence_processing_pipeline/GenPrepFileJob.py index 49e8f651..129d5050 100644 --- a/sequence_processing_pipeline/GenPrepFileJob.py +++ b/sequence_processing_pipeline/GenPrepFileJob.py @@ -13,7 +13,7 @@ class GenPrepFileJob(Job): def __init__(self, run_dir, convert_job_path, qc_job_path, output_path, input_file_path, seqpro_path, modules_to_load, - qiita_job_id, is_amplicon=False): + qiita_job_id, reports_path, is_amplicon=False): super().__init__(run_dir, output_path, @@ -31,13 +31,22 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path, self.commands = [] self.has_replicates = False self.replicate_count = 0 + self.reports_path = reports_path # make the 'root' of your run_directory makedirs(join(self.output_path, self.run_id), exist_ok=True) # copy bcl-convert's Stats-equivalent directory to the # run_directory - copytree(join(convert_job_path, 'Reports'), - join(self.output_path, self.run_id, 'Reports')) + + # This directory will already exist on restarts, hence avoid + # copying. + reports_dir = join(self.output_path, self.run_id, 'Reports') + + if exists(reports_dir): + self.is_restart = True + else: + self.is_restart = False + copytree(self.reports_path, reports_dir) # extracting from either convert_job_path or qc_job_path should # produce equal results. @@ -52,22 +61,26 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path, dst = join(self.output_path, self.run_id, project) - if self.is_amplicon: - if exists(amplicon_seq_dir): - makedirs(dst, exist_ok=True) - symlink(amplicon_seq_dir, join(dst, 'amplicon')) - else: - if exists(filtered_seq_dir): - makedirs(dst, exist_ok=True) - symlink(filtered_seq_dir, join(dst, 'filtered_sequences')) - - if exists(trimmed_seq_dir): - makedirs(dst, exist_ok=True) - symlink(trimmed_seq_dir, join(dst, 'trimmed_sequences')) - - if exists(fastp_rept_dir): - makedirs(dst, exist_ok=True) - symlink(fastp_rept_dir, join(dst, 'json')) + if not self.is_restart: + # these will already be created if restarted. 
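+            # (symlink() raises FileExistsError when its destination already
+            # exists, so re-creating these links would fail a restarted job.)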
+ if self.is_amplicon: + if exists(amplicon_seq_dir): + makedirs(dst, exist_ok=True) + symlink(amplicon_seq_dir, join(dst, 'amplicon')) + else: + if exists(filtered_seq_dir): + makedirs(dst, exist_ok=True) + symlink(filtered_seq_dir, join(dst, + 'filtered_sequences')) + + if exists(trimmed_seq_dir): + makedirs(dst, exist_ok=True) + symlink(trimmed_seq_dir, join(dst, + 'trimmed_sequences')) + + if exists(fastp_rept_dir): + makedirs(dst, exist_ok=True) + symlink(fastp_rept_dir, join(dst, 'json')) # seqpro usage: # seqpro path/to/run_dir path/to/sample/sheet /path/to/fresh/output_dir diff --git a/sequence_processing_pipeline/SeqCountsJob.py b/sequence_processing_pipeline/SeqCountsJob.py index f080bd00..b88fdb3a 100644 --- a/sequence_processing_pipeline/SeqCountsJob.py +++ b/sequence_processing_pipeline/SeqCountsJob.py @@ -1,11 +1,14 @@ -from os.path import join, split +from collections import defaultdict from .Job import Job, KISSLoader from .PipelineError import JobFailedError -import logging +from glob import glob from jinja2 import Environment +from metapool import load_sample_sheet from os import walk -from json import dumps -from glob import glob +from os.path import join, split +import logging +import pandas as pd +from sequence_processing_pipeline.util import determine_orientation logging.basicConfig(level=logging.DEBUG) @@ -15,7 +18,7 @@ class SeqCountsJob(Job): def __init__(self, run_dir, output_path, queue_name, node_count, wall_time_limit, jmem, modules_to_load, qiita_job_id, max_array_length, files_to_count_path, - cores_per_task=4): + sample_sheet_path, cores_per_task=4): """ ConvertJob provides a convenient way to run bcl-convert or bcl2fastq on a directory BCL files to generate Fastq files. @@ -29,6 +32,7 @@ def __init__(self, run_dir, output_path, queue_name, :param qiita_job_id: identify Torque jobs using qiita_job_id :param max_array_length: A hard-limit for array-sizes :param files_to_count_path: A path to a list of file-paths to count. + :param sample_sheet_path: A path to the sample-sheet. :param cores_per_task: (Optional) # of CPU cores per node to request. """ super().__init__(run_dir, @@ -50,6 +54,7 @@ def __init__(self, run_dir, output_path, queue_name, self.job_name = (f"seq_counts_{self.qiita_job_id}") self.files_to_count_path = files_to_count_path + self.sample_sheet_path = sample_sheet_path with open(self.files_to_count_path, 'r') as f: lines = f.readlines() @@ -61,7 +66,7 @@ def run(self, callback=None): job_script_path = self._generate_job_script() params = ['--parsable', f'-J {self.job_name}', - f'--array 1-{self.sample_count}'] + f'--array 1-{self.file_count}'] try: self.job_info = self.submit_job(job_script_path, job_parameters=' '.join(params), @@ -77,7 +82,7 @@ def run(self, callback=None): info.insert(0, str(e)) raise JobFailedError('\n'.join(info)) - self._aggregate_counts() + self._aggregate_counts(self.sample_sheet_path) logging.debug(f'SeqCountJob {self.job_info["job_id"]} completed') @@ -96,6 +101,7 @@ def _generate_job_script(self): "cores_per_task": self.cores_per_task, "queue_name": self.queue_name, "file_count": self.file_count, + "files_to_count_path": self.files_to_count_path, "output_path": self.output_path })) @@ -113,18 +119,24 @@ def parse_logs(self): return [msg.strip() for msg in msgs] - def _aggregate_counts(self): - def extract_metadata(fp): - with open(fp, 'r') as f: + def _aggregate_counts_by_file(self): + # aggregates sequence & bp counts from a directory of log files. 
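+        # each log file is expected to contain exactly two lines: the path
+        # of the fastq file that was counted, followed by
+        # '<seq_counts>\t<base_pairs>' (see extract_metadata() below).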
+
+        def extract_metadata(log_output_file_path):
+            """
+            extracts sequence & bp counts from individual log files.
+            """
+            with open(log_output_file_path, 'r') as f:
                 lines = f.readlines()
                 lines = [x.strip() for x in lines]
                 if len(lines) != 2:
-                    raise ValueError("error processing %s" % fp)
+                    raise ValueError(
+                        "error processing %s" % log_output_file_path)
                 _dir, _file = split(lines[0])
                 seq_counts, base_pairs = lines[1].split('\t')
                 return _dir, _file, int(seq_counts), int(base_pairs)
 
-        results = {}
+        results = defaultdict(dict)
 
         for root, dirs, files in walk(self.log_path):
             for _file in files:
@@ -133,15 +145,123 @@ def extract_metadata(fp):
                 _dir, _file, seq_counts, base_pairs = \
                     extract_metadata(log_output_file)
 
-                if _dir not in results:
-                    results[_dir] = {}
+                results[_file] = {
+                    'seq_counts': seq_counts,
+                    'base_pairs': base_pairs
+                }
+
+        return results
+
+    def _aggregate_counts(self, sample_sheet_path):
+        """
+        Aggregate results by sample_ids and write to file.
+        Args:
+            sample_sheet_path:
+
+        Returns: None
+        """
+        def get_metadata(sample_sheet_path):
+            sheet = load_sample_sheet(sample_sheet_path)
+
+            lanes = []
+
+            if sheet.validate_and_scrub_sample_sheet():
+                results = {}
+
+                for sample in sheet.samples:
+                    sample_name = sample['Sample_Name']
+                    sample_id = sample['Sample_ID']
+                    lanes.append(sample['Lane'])
+                    results[sample_id] = sample_name
+
+                lanes = list(set(lanes))
+
+                if len(lanes) != 1:
+                    raise ValueError(
+                        "More than one lane is declared in sample-sheet")
+
+                return results, lanes[0]
+
+        # get lane number and sample-sheet names and ids
+        samples, lane = get_metadata(sample_sheet_path)
+
+        # aggregate results by filename
+        by_files = self._aggregate_counts_by_file()
+
+        # the per-sample-fastqs will be named according to sample-id. Generate
+        # a list of the sample-ids defined in the sample-sheet and sort them
+        # from longest to shortest. This allows us to match sample-ids to
+        # files correctly even when some sample-ids are subsets of longer
+        # sample-ids e.g.: 'T_LS_7_15_15B_SRE' and 'T_LS_7_15_15B'. This is
+        # important as SeqCounts is intended to count directories of fastq
+        # files that don't necessarily obey the standard Illumina naming
+        # convention e.g.: samplename_S1_L001_R1_001.fastq.gz.
+        sample_ids = list(samples.keys())
+        sample_ids.sort(reverse=True, key=len)
+
+        results = defaultdict(list)
+
+        # generate a list of file names to associate with sample-ids.
+        # only count forward and reverse reads. don't count I? or any other
+        # type of file present.
+        file_names = [x for x in list(by_files.keys()) if
+                      determine_orientation(x) in ['R1', 'R2']]
+
+        for sample_id in sample_ids:
+            found = [x for x in file_names if x.startswith(sample_id)]
+
+            if len(found) == 0:
+                # zero file matches for a sample_id means that a per-sample
+                # fastq file was not generated at the stage referenced by the
+                # paths in self.files_to_count_path. For example, a per-sample
+                # fastq file may not have been generated by bcl-convert for a
+                # particular sample, or the filtered sample may be of zero-
+                # length. This is normal operation and any error is
+                # going to be logged by those Job() objects. It's okay that
+                # a match wasn't found for a given sample_id defined in the
+                # sample-sheet.
+                continue
+
+            if len(found) != 2:
+                # Raise an error if more or less than two matches are found
+                # for a given sample-id because our output must be the
+                # total sequence count for forward and reverse reads.
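+                # i.e. exactly one R1 file and one R2 file is expected to
+                # match each sample-id.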
+ raise ValueError("Multiple file matches for sample-id " + f"'{sample_id}' found: {found}") + + # remove the found elements from the list of files so they're + # not associated with additional sample-ids in a subsequent + # loop iteration. + file_names = list(set(file_names) - set(found)) + + results[sample_id] = found + + # output the results in CSV format. + sample_ids = [] + raw_reads_r1r2 = [] + lanes = [] + + for sample_id in results: + sample_ids.append(sample_id) + found = results[sample_id] + seq_counts = 0 + + for _file in found: + seq_counts += by_files[_file]['seq_counts'] + + raw_reads_r1r2.append(seq_counts) + lanes.append(lane) + + df = pd.DataFrame(data={'Sample_ID': sample_ids, + 'raw_reads_r1r2': raw_reads_r1r2, + 'Lane': lanes}) - results[_dir][_file] = {'seq_counts': seq_counts, - 'base_pairs': base_pairs} + df.set_index(['Sample_ID', 'Lane'], verify_integrity=True) - results_path = join(self.output_path, 'aggregate_counts.json') + # sort results into a predictable order for testing purposes + df = df.sort_values(by='Sample_ID') - with open(results_path, 'w') as f: - print(dumps(results, indent=2), file=f) + result_path = join(self.output_path, 'SeqCounts.csv') + df.to_csv(result_path, index=False, sep=",") - return results_path + return result_path diff --git a/sequence_processing_pipeline/TRIntegrateJob.py b/sequence_processing_pipeline/TRIntegrateJob.py index 7b8740b4..90494f97 100644 --- a/sequence_processing_pipeline/TRIntegrateJob.py +++ b/sequence_processing_pipeline/TRIntegrateJob.py @@ -6,8 +6,10 @@ from .Pipeline import Pipeline from .PipelineError import PipelineError from metapool import load_sample_sheet -from os import makedirs +from os import makedirs, walk from shutil import copyfile +from collections import defaultdict +import re logging.basicConfig(level=logging.DEBUG) @@ -85,11 +87,6 @@ def run(self, callback=None): copyfile(self.sil_path, join(self.output_path, 'sample_index_list.txt')) - # generate the tailored subset of adapter to barcode_id based on - # the proprietary lists owned by the manufacturer and supplied by - # the caller, and the barcode ids found in the sample-sheet. - self._generate_sample_index_list() - makedirs(self.tmp_dir) params = ['--parsable', @@ -161,3 +158,27 @@ def _generate_job_script(self): "output_dir": self.output_path})) return job_script_path + + def audit(self): + def map_barcode_id_to_sample_id(barcode_id): + for s_id, _, b_id in self.sample_ids: + if barcode_id == b_id: + return s_id + + integrated = defaultdict(list) + for root, dirs, files in walk(join(self.output_path, 'integrated')): + for _file in files: + m = re.match(r"(C5\d\d)\.([R,I]\d)\.fastq.gz", _file) + if m: + barcode_id, read = m.groups(1) + integrated[barcode_id].append(read) + + # a sample was processed successfully if all three expected reads are + # present. + failed = [] + for barcode_id in integrated: + # we expect only the following read/orientations. 
+            if not set(integrated[barcode_id]) == {'I1', 'R1', 'R2'}:
+                failed.append(map_barcode_id_to_sample_id(barcode_id))
+
+        return sorted(failed)
diff --git a/sequence_processing_pipeline/TellReadJob.py b/sequence_processing_pipeline/TellReadJob.py
index 3d68d4c8..53802736 100644
--- a/sequence_processing_pipeline/TellReadJob.py
+++ b/sequence_processing_pipeline/TellReadJob.py
@@ -6,6 +6,9 @@
 from .Pipeline import Pipeline
 from .PipelineError import PipelineError
 from metapool import load_sample_sheet
+from os import walk
+import re
+from collections import defaultdict
 
 logging.basicConfig(level=logging.DEBUG)
 
@@ -172,3 +175,31 @@ def _generate_job_script(self):
             }))
 
         return job_script_path
+
+    def audit(self):
+        # this overridden audit method does not need sample-ids passed as a
+        # parameter because this job is already aware of what samples should
+        # be present and, more importantly, how they map to barcode_ids.
+        def map_barcode_id_to_sample_id(barcode_id):
+            for s_id, _, b_id in self.sample_ids:
+                if barcode_id == b_id:
+                    return s_id
+
+        corrected = defaultdict(list)
+        for root, dirs, files in walk(join(self.output_path, 'Full')):
+            for _file in files:
+                m = re.match(r"TellReadJob_(.\d)_(C\d\d\d).fastq.gz.corrected."
+                             r"err_barcode_removed.fastq", _file)
+                if m:
+                    read, barcode_id = m.groups(1)
+                    corrected[barcode_id].append(read)
+
+        # a sample was processed successfully if all three expected reads are
+        # present.
+        failed = []
+        for barcode_id in corrected:
+            # we expect only the following read/orientations.
+            if not set(corrected[barcode_id]) == {'I1', 'R1', 'R2'}:
+                failed.append(map_barcode_id_to_sample_id(barcode_id))
+
+        return sorted(failed)
diff --git a/sequence_processing_pipeline/templates/nuqc_job.sh b/sequence_processing_pipeline/templates/nuqc_job.sh
index 363b45f1..7ada81f4 100644
--- a/sequence_processing_pipeline/templates/nuqc_job.sh
+++ b/sequence_processing_pipeline/templates/nuqc_job.sh
@@ -12,6 +12,7 @@
 ### Commented out for now, but there is a possibility it will be needed
 ### in the future.
###SBATCH --gres=node_jobs:{{gres_value}} +#SBATCH --constraint="amd" echo "---------------" diff --git a/sequence_processing_pipeline/templates/seq_counts.sbatch b/sequence_processing_pipeline/templates/seq_counts.sbatch index f44bd5b9..5ea12650 100644 --- a/sequence_processing_pipeline/templates/seq_counts.sbatch +++ b/sequence_processing_pipeline/templates/seq_counts.sbatch @@ -15,7 +15,7 @@ set -e mkdir -p {{output_path}}/logs -files=($(cat {{output_path}}/files_to_count.txt)) +files=($(cat {{files_to_count_path}})) my_file=${files[$((${SLURM_ARRAY_TASK_ID} - 1))]} echo "${my_file}" diff --git a/sequence_processing_pipeline/tests/data/SeqCounts.csv b/sequence_processing_pipeline/tests/data/SeqCounts.csv new file mode 100644 index 00000000..18e905ab --- /dev/null +++ b/sequence_processing_pipeline/tests/data/SeqCounts.csv @@ -0,0 +1,3 @@ +Sample_ID,raw_reads_r1r2,Lane +LS_8_22_2014_R1_SRE,140798056,1 +LS_8_22_2014_R2_SRE,128928324,1 diff --git a/sequence_processing_pipeline/tests/data/aggregate_counts_results.json b/sequence_processing_pipeline/tests/data/aggregate_counts_results.json deleted file mode 100644 index 1cae0f05..00000000 --- a/sequence_processing_pipeline/tests/data/aggregate_counts_results.json +++ /dev/null @@ -1,36 +0,0 @@ -{ - "REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full": { - "TellReadJob_I1_C520.fastq.gz.erroneous.fastq": { - "seq_counts": 2139633, - "base_pairs": 38513394 - }, - "TellReadJob_R1_C519.fastq.gz.corrected.err_barcode_removed.fastq": { - "seq_counts": 64464162, - "base_pairs": 8345327641 - }, - "TellReadJob_R1_C520.fastq.gz.corrected.err_barcode_removed.fastq": { - "seq_counts": 70399028, - "base_pairs": 9293296513 - }, - "TellReadJob_I1_C519.fastq.gz.erroneous.fastq": { - "seq_counts": 1932116, - "base_pairs": 34778088 - }, - "TellReadJob_I1_C519.fastq.gz.corrected.err_barcode_removed.fastq": { - "seq_counts": 64464162, - "base_pairs": 1160354916 - }, - "TellReadJob_R2_C519.fastq.gz.corrected.err_barcode_removed.fastq": { - "seq_counts": 64464162, - "base_pairs": 8370238082 - }, - "TellReadJob_R2_C520.fastq.gz.corrected.err_barcode_removed.fastq": { - "seq_counts": 70399028, - "base_pairs": 9317943166 - }, - "TellReadJob_I1_C520.fastq.gz.corrected.err_barcode_removed.fastq": { - "seq_counts": 70399028, - "base_pairs": 1267182504 - } - } -} diff --git a/sequence_processing_pipeline/tests/data/seq_counts.sbatch b/sequence_processing_pipeline/tests/data/seq_counts.sbatch index cc73187c..70398dfe 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts.sbatch +++ b/sequence_processing_pipeline/tests/data/seq_counts.sbatch @@ -3,7 +3,7 @@ #SBATCH --time 1440 #SBATCH --mem 8G #SBATCH -N 1 -#SBATCH -c 1 +#SBATCH -c 4 #SBATCH -p qiita #SBATCH --array=1-8 @@ -15,7 +15,7 @@ set -e mkdir -p sequence_processing_pipeline/tests/2caa8226-cf69-45a3-bd40-1e90ec3d18d0/SeqCountsJob/logs -files=($(cat sequence_processing_pipeline/tests/2caa8226-cf69-45a3-bd40-1e90ec3d18d0/SeqCountsJob/files_to_count.txt)) +files=($(cat sequence_processing_pipeline/tests/data/files_to_count.txt)) my_file=${files[$((${SLURM_ARRAY_TASK_ID} - 1))]} echo "${my_file}" diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.err index 47c59651..8ef9a94c 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.err +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.err @@ -1,3 +1,3 @@ This is an 
example .err file produced by seq_counts.sbatch. Additional details removed. -+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C519.fastq.gz.corrected.err_barcode_removed.fastq ++ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R2_SRE_S2_L007_R1_001.fastq.gz diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.out index 50a46674..06753505 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.out +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_1.out @@ -1,2 +1,2 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C519.fastq.gz.corrected.err_barcode_removed.fastq +REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R2_SRE_S2_L007_R1_001.fastq.gz 64464162 8345327641 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.err index 47c59651..ab1c2b3e 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.err +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.err @@ -1,3 +1,3 @@ This is an example .err file produced by seq_counts.sbatch. Additional details removed. -+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C519.fastq.gz.corrected.err_barcode_removed.fastq ++ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R1_SRE_S3_L007_R1_001.fastq.gz \ No newline at end of file diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.out index 87ad9f55..439eac51 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.out +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_2.out @@ -1,2 +1,2 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C520.fastq.gz.corrected.err_barcode_removed.fastq +REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R1_SRE_S3_L007_R1_001.fastq.gz 70399028 9293296513 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_3.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_3.err deleted file mode 100644 index e9c0cf9d..00000000 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_3.err +++ /dev/null @@ -1,3 +0,0 @@ -This is an example .err file produced by seq_counts.sbatch. -Additional details removed. 
-+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C520.fastq.gz.corrected.err_barcode_removed.fastq diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_3.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_3.out deleted file mode 100644 index a22d9f8d..00000000 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_3.out +++ /dev/null @@ -1,2 +0,0 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_I1_C519.fastq.gz.erroneous.fastq -1932116 34778088 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.err index e9c0cf9d..4c635b60 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.err +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.err @@ -1,3 +1,3 @@ This is an example .err file produced by seq_counts.sbatch. Additional details removed. -+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C520.fastq.gz.corrected.err_barcode_removed.fastq ++ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R1_SRE_S3_L007_R2_001.fastq.gz diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.out index 0b35614a..da6697a7 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.out +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_4.out @@ -1,2 +1,2 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R2_C520.fastq.gz.corrected.err_barcode_removed.fastq +REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R1_SRE_S3_L007_R2_001.fastq.gz 70399028 9317943166 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.err index 47c59651..360f379f 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.err +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.err @@ -1,3 +1,3 @@ This is an example .err file produced by seq_counts.sbatch. Additional details removed. 
-+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C519.fastq.gz.corrected.err_barcode_removed.fastq ++ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.out index 887522ae..ba88f823 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.out +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_5.out @@ -1,2 +1,2 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_I1_C520.fastq.gz.corrected.err_barcode_removed.fastq +REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz 70399028 1267182504 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.err index e9c0cf9d..b28a1701 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.err +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.err @@ -1,3 +1,3 @@ This is an example .err file produced by seq_counts.sbatch. Additional details removed. -+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C520.fastq.gz.corrected.err_barcode_removed.fastq ++ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R2_SRE_S2_L007_R2_001.fastq.gz diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.out index a4fbd555..67ff467e 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.out +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_6.out @@ -1,2 +1,2 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R2_C519.fastq.gz.corrected.err_barcode_removed.fastq +REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R2_SRE_S2_L007_R2_001.fastq.gz 64464162 8370238082 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.err index 47c59651..069c0ac9 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.err +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.err @@ -1,3 +1,3 @@ This is an example .err file produced by seq_counts.sbatch. Additional details removed. 
-+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C519.fastq.gz.corrected.err_barcode_removed.fastq ++ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz \ No newline at end of file diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.out index 6c6a9c06..75b103ce 100644 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.out +++ b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_7.out @@ -1,2 +1,2 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_I1_C519.fastq.gz.corrected.err_barcode_removed.fastq +REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TRIntegrateJob/integrated/LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz 64464162 1160354916 diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_8.err b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_8.err deleted file mode 100644 index e9c0cf9d..00000000 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_8.err +++ /dev/null @@ -1,3 +0,0 @@ -This is an example .err file produced by seq_counts.sbatch. -Additional details removed. -+ seqtk size REMOVED/working_dir/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_R1_C520.fastq.gz.corrected.err_barcode_removed.fastq diff --git a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_8.out b/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_8.out deleted file mode 100644 index 9be52329..00000000 --- a/sequence_processing_pipeline/tests/data/seq_counts_logs/seq_count_2679966_8.out +++ /dev/null @@ -1,2 +0,0 @@ -REMOVED/8edbdee2-da52-4278-af40-267185bbcd7e/TellReadJob/Full/TellReadJob_I1_C520.fastq.gz.erroneous.fastq -2139633 38513394 diff --git a/sequence_processing_pipeline/tests/test_GenPrepFileJob.py b/sequence_processing_pipeline/tests/test_GenPrepFileJob.py index c5b9a13b..9424b8ea 100644 --- a/sequence_processing_pipeline/tests/test_GenPrepFileJob.py +++ b/sequence_processing_pipeline/tests/test_GenPrepFileJob.py @@ -21,7 +21,8 @@ def setUp(self): self.run_id = '210518_A00953_0305_TEST' self.run_dir = join(self.working_directory_root, self.run_id) self.convert_job_path = join(self.run_dir, 'ConvertJob') - makedirs(join(self.convert_job_path, 'Reports'), exist_ok=True) + self.reports_path = join(self.convert_job_path, 'Reports') + makedirs(self.reports_path, exist_ok=True) self.qc_job_path = join(self.run_dir, 'QCJob') self.project_list = ['Project1'] makedirs(join(self.qc_job_path, self.project_list[0], @@ -43,7 +44,9 @@ def test_creation(self): sample_sheet_path, 'seqpro', [], - 'abcdabcdabcdabcdabcdabcdabcdabcd') + 'abcdabcdabcdabcdabcdabcdabcdabcd', + self.reports_path) + results = job._system_call(f'find {self.run_dir}') lines = results['stdout'].split('\n') lines = [re.sub(r'^.*?sequence_processing_pipeline\/', '', x) @@ -82,7 +85,8 @@ def test_get_prep_file_paths(self): sample_sheet_path, 'seqpro', [], - 'abcdabcdabcdabcdabcdabcdabcdabcd') + 'abcdabcdabcdabcdabcdabcdabcdabcd', + self.reports_path) # We cannot run the object and test the output that is returned from # seqpro, but we can test the helper method against canned stdout and @@ -121,7 +125,8 @@ def setUp(self): self.run_id = '210518_A00953_0305_TEST' 
         self.run_dir = join(self.working_directory_root, self.run_id)
         self.convert_job_path = join(self.run_dir, 'ConvertJob')
-        makedirs(join(self.convert_job_path, 'Reports'), exist_ok=True)
+        self.reports_path = join(self.convert_job_path, 'Reports')
+        makedirs(self.reports_path, exist_ok=True)
         self.qc_job_path = join(self.run_dir, 'QCJob')
         self.project_list = ['Project1']
         makedirs(join(self.qc_job_path, self.project_list[0],
@@ -147,7 +152,8 @@ def test_sample_sheet_replicate_file_creation(self):
             sample_sheet_path,
             'seqpro',
             [],
-            'abcdabcdabcdabcdabcdabcdabcdabcd')
+            'abcdabcdabcdabcdabcdabcdabcdabcd',
+            self.reports_path)
 
         exp = [['seqpro', '--verbose',
                ('sequence_processing_pipeline/b197f317-1c06-4619-9af3-'
@@ -196,6 +202,7 @@ def test_pre_prep_replicate_file_creation(self):
             'seqpro',
             [],
             'abcdabcdabcdabcdabcdabcdabcdabcd',
+            self.reports_path,
             is_amplicon=True)
 
         exp = [['seqpro', '--verbose',
diff --git a/sequence_processing_pipeline/tests/test_SeqCountsJob.py b/sequence_processing_pipeline/tests/test_SeqCountsJob.py
index d641c3b2..657b5a8f 100644
--- a/sequence_processing_pipeline/tests/test_SeqCountsJob.py
+++ b/sequence_processing_pipeline/tests/test_SeqCountsJob.py
@@ -2,7 +2,8 @@
 from sequence_processing_pipeline.SeqCountsJob import SeqCountsJob
 from functools import partial
 import unittest
-from json import load as json_load
+import pandas as pd
+from pandas.testing import assert_frame_equal
 
 
 class TestSeqCountsJob(unittest.TestCase):
@@ -31,8 +32,10 @@ def setUp(self):
         self.raw_fastq_dir = join(self.output_path, "TellReadJob", "Full")
         self.max_array_length = 100
         self.exp_sbatch_output = self.path("data", "seq_counts.sbatch")
-        self.exp_results = self.path("data",
-                                     "aggregate_counts_results.json")
+        self.exp_results = self.path("data", "SeqCounts.csv")
+        self.dummy_sample_sheet = self.path("data",
+                                            "tellseq_metag_dummy_sample"
+                                            "_sheet.csv")
 
     def test_creation(self):
         def compare_files(obs, exp):
@@ -64,10 +67,14 @@ def compare_files(obs, exp):
         # the output directory for a run we didn't run().
         job.log_path = self.path("data", "seq_counts_logs")
 
-        obs = json_load(open(job._aggregate_counts(), 'r'))
-        exp = json_load(open(self.exp_results, 'r'))
+        obs = pd.read_csv(job._aggregate_counts(self.dummy_sample_sheet),
+                          sep=',', dtype='str')
+        exp = pd.read_csv(self.exp_results, sep=',', dtype='str')
 
-        self.assertDictEqual(obs, exp)
+        # assert_frame_equal will raise an AssertionError if the dfs are not
+        # equal. The AssertionError message itself will have the best
+        # description of the error, so let it propagate to the user.
+        assert_frame_equal(obs, exp, check_like=True)
 
 
 if __name__ == '__main__':
diff --git a/sequence_processing_pipeline/tests/test_commands.py b/sequence_processing_pipeline/tests/test_commands.py
index ac8a4bd9..bad8e107 100644
--- a/sequence_processing_pipeline/tests/test_commands.py
+++ b/sequence_processing_pipeline/tests/test_commands.py
@@ -42,6 +42,45 @@ class MockStat:
         obs_1 = open(tmp + '/prefix-2').read()
         self.assertEqual(obs_1, exp_2)
 
+    @patch('os.stat')
+    @patch('glob.glob')
+    def test_split_similar_size_bins_odd_sample_names(self, glob, stat):
+        """
+        # to prevent issues w/filenames like the ones below from being mistaken
+        # for R1 or R2 files, use determine_orientation().
+        """
+        class MockStat:
+            st_size = 2 ** 28  # 256MB
+
+        mockglob = ['/foo/bar/Sample1_R1_001.fastq.gz',
+                    '/foo/bar/Sample2_R2_001.fastq.gz',
+                    '/foo/bar/Sample1_R2_001.fastq.gz',
+                    '/foo/baz/Sample3_R2_SRE_S2_L007_R1_001.fastq.gz',
+                    '/foo/baz/Sample3_R1_SRE_S2_L007_R2_001.fastq.gz',
+                    '/foo/bar/Sample2_R1_001.fastq.gz']
+
+        with TemporaryDirectory() as tmp:
+            exp = (2, 1073741824)
+            stat.return_value = MockStat()  # 512MB
+            glob.return_value = mockglob
+            obs = split_similar_size_bins('foo', 1, tmp + '/prefix')
+            self.assertEqual(obs, exp)
+
+            exp_1 = ('/foo/bar/Sample1_R1_001.fastq.gz\t'
+                     '/foo/bar/Sample1_R2_001.fastq.gz\t'
+                     'bar\n'
+                     '/foo/bar/Sample2_R1_001.fastq.gz\t'
+                     '/foo/bar/Sample2_R2_001.fastq.gz\t'
+                     'bar\n')
+            exp_2 = ('/foo/baz/Sample3_R1_SRE_S2_L007_R2_001.fastq.gz\t'
+                     '/foo/baz/Sample3_R2_SRE_S2_L007_R1_001.fastq.gz\t'
+                     'baz\n')
+
+            obs_1 = open(tmp + '/prefix-1').read()
+            self.assertEqual(obs_1, exp_1)
+            obs_1 = open(tmp + '/prefix-2').read()
+            self.assertEqual(obs_1, exp_2)
+
     def test_demux(self):
         with TemporaryDirectory() as tmp:
             id_map = [
diff --git a/sequence_processing_pipeline/util.py b/sequence_processing_pipeline/util.py
index e19bf98a..6a0781b5 100644
--- a/sequence_processing_pipeline/util.py
+++ b/sequence_processing_pipeline/util.py
@@ -1,22 +1,45 @@
 import re
 
-# PAIR_UNDERSCORE = (re.compile(r'_R1_'), '_R1_', '_R2_')
-# The above will truncate on the first _R1_ found, which only works when _R1_
-# or _R2_ appears exactly once in a file path. When the wet-lab incorporates
-# these same strings in their sample-names as descriptive metadata, this
-# assumption is broken. For all raw fastq files being used as input into
-# NuQCJob, we can assume they end in the following convention. Per Illumina
-# spec, all fastq files end in _001 and we preserve this convention even at
-# the cost of renaming output files from TRIntegrateJob.
-# PAIR_DOT is kept as is, but may be removed later because for the purposes of
-# SPP, no input should ever be named with dots instead of underscores.
-PAIR_UNDERSCORE = (re.compile(r'_R1_001.fastq.gz'),
-                   '_R1_001.fastq.gz', '_R2_001.fastq.gz')
+PAIR_UNDERSCORE = (re.compile(r'_R1_'), '_R1_', '_R2_')
 PAIR_DOT = (re.compile(r'\.R1\.'), '.R1.', '.R2.')
 PAIR_TESTS = (PAIR_UNDERSCORE, PAIR_DOT)
 
 
+def determine_orientation(file_name):
+    # aka forward, reverse, and indexed reads
+    orientations = ['R1', 'R2', 'I1', 'I2']
+
+    results = []
+
+    # assume orientation is always present in the file's name.
+    # assume that it is of one of the four forms above.
+    # assume that it is always the right-most occurrence of the four
+    # orientations above.
+    # assume that orientation is delimited by either '_' or '.'
+    # e.g.: '_R1_', '.I2.'.
+    # assume users can and will include any or all of the four
+    # orientations as part of their filenames as well. e.g.:
+    # ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz
+    for o in orientations:
+        variations = [f"_{o}_", f".{o}."]
+        for v in variations:
+            # rfind searches from the end of the string, rather than
+            # its beginning. It returns the position in the string
+            # where the substring begins.
+            results.append((file_name.rfind(v), o))
+
+    # the orientation will be the substring found with the maximum
+    # found value for pos. That is, it will be the substring that
+    # begins at the right-most position in the file name.
+    results.sort(reverse=True)
+
+    pos, orientation = results[0]
+
+    # if no orientations were found, then return None.
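+    # otherwise, e.g. for 'ABC_7_04_1776_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz'
+    # above, the right-most match is '_R2_', so 'R2' is returned.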
+
+    return None if pos == -1 else orientation
+
+
 def iter_paired_files(files):
     """Yield matched r1/r2 paired files"""
     files = sorted(files)
@@ -35,8 +58,19 @@ def iter_paired_files(files):
         if r2_exp not in r2_fp:
             raise ValueError(f"Cannot find '{r2_exp}' in '{r2_fp}'")
 
-        r1_prefix = r1_fp[:r1_fp.find(r1_exp)]
-        r2_prefix = r2_fp[:r2_fp.find(r2_exp)]
+        # replace find() with rfind() so that the search for R1 and R2 begins
+        # from the end of the string, not the beginning. This prevents
+        # the code from breaking when filenames include R1 and R2 as
+        # part of their name in addition to representing forward and
+        # reverse reads e.g.:
+        # LS_8_22_2014_R1_SRE_S3_L007_R1_001.trimmed.fastq.gz
+        # LS_8_22_2014_R1_SRE_S3_L007_R2_001.trimmed.fastq.gz
+        # with find(), r1_prefix and r2_prefix would have been the following:
+        # r1_prefix would be: LS_8_22_2014
+        # r2_prefix would be: LS_8_22_2014_R1_SRE_S3_L007
+
+        r1_prefix = r1_fp[:r1_fp.rfind(r1_exp)]
+        r2_prefix = r2_fp[:r2_fp.rfind(r2_exp)]
 
         if r1_prefix != r2_prefix:
             raise ValueError(f"Mismatch prefixes:\n{r1_prefix}\n"
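
A minimal usage sketch (illustrative only, not part of the patch), assuming the import path shown in the diff headers above; the filenames are the TellSeq-style examples already cited in the comments:

from sequence_processing_pipeline.util import determine_orientation

# the right-most orientation token wins, even when '_R1_' also appears in the
# sample-name portion of the filename.
assert determine_orientation(
    'LS_8_22_2014_R1_SRE_S3_L007_R2_001.fastq.gz') == 'R2'

# index reads are identified as 'I1'/'I2' so callers can filter them out.
assert determine_orientation(
    'LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz') == 'I1'

# a name with no orientation token at all yields None.
assert determine_orientation('Undetermined.fastq.gz') is None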