Updates based on testing #160

Merged
merged 7 commits on Jan 2, 2025
Changes from 5 commits
63 changes: 41 additions & 22 deletions sequence_processing_pipeline/Commands.py
@@ -1,7 +1,8 @@
import glob
import gzip
import os
from sequence_processing_pipeline.util import iter_paired_files
from sequence_processing_pipeline.util import (iter_paired_files,
determine_orientation)


def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
@@ -14,21 +15,17 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
:return: The number of output-files created, size of largest bin.
'''

# SPP root paths are of the form:
# ../72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob, and contain only
# directories named after projects e.g:
# 72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob/Tell_Seq_15196.
# Each of these directories contain R1 and R2 fastq files. Hence, path
# is now the following:
# add one more level to account for project_names nested under ConvertJob
# dir.
# this will ignore the _I1_ reads that appear in the integrated result.
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')

# case-specific filter for TellSeq output directories that also contain
# _I1_ files. Ensure paths are still sorted afterwards.
fastq_paths = [x for x in fastq_paths if '_I1_001.fastq.gz' not in x]
fastq_paths = sorted(fastq_paths)
# to prevent issues w/filenames like the ones below from being mistaken
# for R1 or R2 files, use determine_orientation().
# LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz
# LS_8_22_2014_R1_SRE_S3_L007_I1_001.fastq.gz

# since the names of all fastq files are being scanned for orientation,
# collect all of them instead of mistakenly pre-filtering some files.
# fastq_paths = glob.glob(data_location_path + '/*/*_R?_*.fastq.gz')
fastq_paths = glob.glob(data_location_path + '/*/*.fastq.gz')
fastq_paths = [x for x in fastq_paths
if determine_orientation(x) in ['R1', 'R2']]

# convert from GB and halve as we sum R1
max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
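
A minimal sketch of the orientation-based filter above, assuming determine_orientation() returns a label such as 'R1', 'R2' or 'I1' derived from the filename (the helper below is a stand-in for illustration, not the pipeline's own implementation):

import re

def _orientation_of(filename):
    # take the LAST orientation token so a name like
    # 'LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz' resolves to 'I1', not 'R2'
    tokens = re.findall(r'_(R1|R2|I1)_', filename)
    return tokens[-1] if tokens else None

paths = [
    'run/Project_A/sample1_S1_L007_R1_001.fastq.gz',
    'run/Project_A/sample1_S1_L007_R2_001.fastq.gz',
    'run/Project_A/LS_8_22_2014_R2_SRE_S2_L007_I1_001.fastq.gz',
]
kept = [p for p in paths if _orientation_of(p) in ('R1', 'R2')]
# kept retains only the first two paths; the _I1_ index read is excluded
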
@@ -114,21 +111,43 @@ def demux(id_map, fp, out_d, task, maxtask):
openfps[idx] = current_fp

# setup a parser
id_ = iter(fp)
seq_id = iter(fp)
seq = iter(fp)
dumb = iter(fp)
qual = iter(fp)

for i, s, d, q in zip(id_, seq, dumb, qual):
fname_encoded, id_ = i.split(delimiter, 1)
for i, s, d, q in zip(seq_id, seq, dumb, qual):
# '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1'
# '@1', 'LH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT' # noqa
# '@baz/1'

fname_encoded, sid = i.split(delimiter, 1)

if fname_encoded not in openfps:
continue

orientation = id_[-2] # -1 is \n
current_fp = openfps[fname_encoded]
id_ = rec + id_
current_fp[orientation].write(id_)

# remove '\n' from sid and split on all whitespace.
tmp = sid.strip().split()

if len(tmp) == 1:
# sequence id line contains no optional metadata.
# don't change sid.
# -1 is \n
orientation = sid[-2]
sid = rec + sid
elif len(tmp) == 2:
sid = tmp[0]
metadata = tmp[1]
# no '\n'
orientation = sid[-1]
# hexdump confirms separator is ' ', not '\t'
sid = rec + sid + ' ' + metadata + '\n'
else:
raise ValueError(f"'{sid}' is not a recognized form")

current_fp[orientation].write(sid)
current_fp[orientation].write(s)
current_fp[orientation].write(d)
current_fp[orientation].write(q)
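
As a worked illustration of the sequence-id handling above (the id value comes from the comments in the diff; delimiter and rec are assumed here to be a tab separator and an '@' record prefix, which may differ from the constants defined elsewhere in Commands.py):

delimiter = '\t'   # assumption: separates the encoded filename from the real id
rec = '@'          # assumption: record prefix restored on output

line = '1\tLH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT\n'
fname_encoded, sid = line.split(delimiter, 1)

tmp = sid.strip().split()
if len(tmp) == 1:
    orientation = sid[-2]              # character before the trailing '\n'
    sid = rec + sid
elif len(tmp) == 2:
    sid, metadata = tmp
    orientation = sid[-1]              # no '\n' after split()
    sid = rec + sid + ' ' + metadata + '\n'
else:
    raise ValueError(f"'{sid}' is not a recognized form")

# orientation == '1'
# sid == '@LH00444:84:227CNHLT4:7:1101:41955:2443/1 BX:Z:TATGACACATGCGGCCCT\n'
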
2 changes: 2 additions & 0 deletions sequence_processing_pipeline/FastQCJob.py
@@ -278,11 +278,13 @@ def _generate_job_script(self):
lines.append(f"#SBATCH -n {self.nprocs}")
lines.append("#SBATCH --time %d" % self.wall_time_limit)
lines.append(f"#SBATCH --mem {self.jmem}")
lines.append('#SBATCH --constraint="intel"')

lines.append("#SBATCH --array 1-%d%%%d" % (
len(self.commands), self.pool_size))

lines.append("set -x")
lines.append("set +e")
lines.append('date')
lines.append('hostname')
lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}')
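
For context, the --array directive built above uses '%%' to escape a literal '%'; with made-up counts it renders like this:

commands = ['fastqc ...'] * 12    # hypothetical command list
pool_size = 8

directive = "#SBATCH --array 1-%d%%%d" % (len(commands), pool_size)
# directive == '#SBATCH --array 1-12%8'
# i.e. 12 array tasks, with at most 8 running concurrently
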
51 changes: 32 additions & 19 deletions sequence_processing_pipeline/GenPrepFileJob.py
@@ -13,7 +13,7 @@
class GenPrepFileJob(Job):
def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,
input_file_path, seqpro_path, modules_to_load,
qiita_job_id, is_amplicon=False):
qiita_job_id, reports_path, is_amplicon=False):

super().__init__(run_dir,
output_path,
@@ -31,13 +31,22 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,
self.commands = []
self.has_replicates = False
self.replicate_count = 0
self.reports_path = reports_path

# make the 'root' of your run_directory
makedirs(join(self.output_path, self.run_id), exist_ok=True)
# copy bcl-convert's Stats-equivalent directory to the
# run_directory
copytree(join(convert_job_path, 'Reports'),
join(self.output_path, self.run_id, 'Reports'))

# This directory will already exist on restarts, hence avoid
# copying.
reports_dir = join(self.output_path, self.run_id, 'Reports')

if exists(reports_dir):
self.is_restart = True
else:
self.is_restart = False
copytree(self.reports_path, reports_dir)

# extracting from either convert_job_path or qc_job_path should
# produce equal results.
@@ -52,22 +61,26 @@ def __init__(self, run_dir, convert_job_path, qc_job_path, output_path,

dst = join(self.output_path, self.run_id, project)

if self.is_amplicon:
if exists(amplicon_seq_dir):
makedirs(dst, exist_ok=True)
symlink(amplicon_seq_dir, join(dst, 'amplicon'))
else:
if exists(filtered_seq_dir):
makedirs(dst, exist_ok=True)
symlink(filtered_seq_dir, join(dst, 'filtered_sequences'))

if exists(trimmed_seq_dir):
makedirs(dst, exist_ok=True)
symlink(trimmed_seq_dir, join(dst, 'trimmed_sequences'))

if exists(fastp_rept_dir):
makedirs(dst, exist_ok=True)
symlink(fastp_rept_dir, join(dst, 'json'))
if not self.is_restart:
# these will already be created if restarted.
if self.is_amplicon:
if exists(amplicon_seq_dir):
makedirs(dst, exist_ok=True)
symlink(amplicon_seq_dir, join(dst, 'amplicon'))
else:
if exists(filtered_seq_dir):
makedirs(dst, exist_ok=True)
symlink(filtered_seq_dir, join(dst,
'filtered_sequences'))

if exists(trimmed_seq_dir):
makedirs(dst, exist_ok=True)
symlink(trimmed_seq_dir, join(dst,
'trimmed_sequences'))

if exists(fastp_rept_dir):
makedirs(dst, exist_ok=True)
symlink(fastp_rept_dir, join(dst, 'json'))
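
Condensed, the restart handling above amounts to: if the copied Reports directory already exists, treat the job as a restart and skip re-copying and re-linking. A self-contained sketch under that assumption (the helper name and paths are illustrative, not the class's actual API):

from os import makedirs, symlink
from os.path import exists, join
from shutil import copytree


def prepare_run_dir(output_path, run_id, reports_path, project_links):
    # project_links: {project_name: {link_name: source_dir}}
    reports_dir = join(output_path, run_id, 'Reports')
    is_restart = exists(reports_dir)    # Reports/ already copied => restart

    if not is_restart:
        copytree(reports_path, reports_dir)

        # symlink() raises FileExistsError if the links survive from a
        # previous attempt, so only create them on a first run
        for project, links in project_links.items():
            dst = join(output_path, run_id, project)
            for name, src in links.items():
                if exists(src):
                    makedirs(dst, exist_ok=True)
                    symlink(src, join(dst, name))

    return is_restart
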

# seqpro usage:
# seqpro path/to/run_dir path/to/sample/sheet /path/to/fresh/output_dir
137 changes: 117 additions & 20 deletions sequence_processing_pipeline/SeqCountsJob.py
@@ -1,11 +1,14 @@
from os.path import join, split
from collections import defaultdict
from .Job import Job, KISSLoader
from .PipelineError import JobFailedError
import logging
from glob import glob
from jinja2 import Environment
from metapool import load_sample_sheet
from os import walk
from json import dumps
from glob import glob
from os.path import join, split
import logging
import pandas as pd
import re


logging.basicConfig(level=logging.DEBUG)
@@ -15,7 +18,7 @@ class SeqCountsJob(Job):
def __init__(self, run_dir, output_path, queue_name,
node_count, wall_time_limit, jmem, modules_to_load,
qiita_job_id, max_array_length, files_to_count_path,
cores_per_task=4):
sample_sheet_path, cores_per_task=4):
"""
ConvertJob provides a convenient way to run bcl-convert or bcl2fastq
on a directory BCL files to generate Fastq files.
@@ -29,6 +32,7 @@ def __init__(self, run_dir, output_path, queue_name,
:param qiita_job_id: identify Torque jobs using qiita_job_id
:param max_array_length: A hard-limit for array-sizes
:param files_to_count_path: A path to a list of file-paths to count.
:param sample_sheet_path: A path to the sample-sheet.
:param cores_per_task: (Optional) # of CPU cores per node to request.
"""
super().__init__(run_dir,
@@ -50,6 +54,7 @@ def __init__(self, run_dir, output_path, queue_name,

self.job_name = (f"seq_counts_{self.qiita_job_id}")
self.files_to_count_path = files_to_count_path
self.sample_sheet_path = sample_sheet_path

with open(self.files_to_count_path, 'r') as f:
lines = f.readlines()
@@ -61,7 +66,7 @@ def run(self, callback=None):
job_script_path = self._generate_job_script()
params = ['--parsable',
f'-J {self.job_name}',
f'--array 1-{self.sample_count}']
f'--array 1-{self.file_count}']
try:
self.job_info = self.submit_job(job_script_path,
job_parameters=' '.join(params),
@@ -77,7 +82,7 @@ def run(self, callback=None):
info.insert(0, str(e))
raise JobFailedError('\n'.join(info))

self._aggregate_counts()
self._aggregate_counts(self.sample_sheet_path)

logging.debug(f'SeqCountJob {self.job_info["job_id"]} completed')

@@ -96,6 +101,7 @@ def _generate_job_script(self):
"cores_per_task": self.cores_per_task,
"queue_name": self.queue_name,
"file_count": self.file_count,
"files_to_count_path": self.files_to_count_path,
"output_path": self.output_path
}))

@@ -113,18 +119,24 @@ def parse_logs(self):

return [msg.strip() for msg in msgs]

def _aggregate_counts(self):
def extract_metadata(fp):
with open(fp, 'r') as f:
def _aggregate_counts_by_file(self):
# aggregates sequence & bp counts from a directory of log files.

def extract_metadata(log_output_file_path):
"""
extracts sequence & bp counts from individual log files.
"""
with open(log_output_file_path, 'r') as f:
lines = f.readlines()
lines = [x.strip() for x in lines]
if len(lines) != 2:
raise ValueError("error processing %s" % fp)
raise ValueError(
"error processing %s" % log_output_file_path)
_dir, _file = split(lines[0])
seq_counts, base_pairs = lines[1].split('\t')
return _dir, _file, int(seq_counts), int(base_pairs)

results = {}
results = defaultdict(dict)

for root, dirs, files in walk(self.log_path):
for _file in files:
@@ -133,15 +145,100 @@ def extract_metadata(fp):
_dir, _file, seq_counts, base_pairs = \
extract_metadata(log_output_file)

if _dir not in results:
results[_dir] = {}
results[_file] = {
'seq_counts': seq_counts,
'base_pairs': base_pairs
}

return results
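
Each per-file log is expected to contain exactly two lines: the counted file's path, then tab-separated sequence and base-pair counts. A small example of that parse (the path, barcode id and numbers are invented):

from os.path import split

log_lines = [
    '/some/dir/TellReadJob_R1_C501.fastq.gz.corrected.err_barcode_removed.fastq\n',
    '12345\t1851750\n',
]
lines = [x.strip() for x in log_lines]
_dir, _file = split(lines[0])
seq_counts, base_pairs = (int(v) for v in lines[1].split('\t'))
# _dir == '/some/dir', seq_counts == 12345, base_pairs == 1851750
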

def _aggregate_counts(self, sample_sheet_path):
"""
Aggregate results by sample_ids and write to file.
Args:
sample_sheet_path:

Returns: None
"""
def get_metadata(sample_sheet_path):
sheet = load_sample_sheet(sample_sheet_path)

lanes = []

if sheet.validate_and_scrub_sample_sheet():
results = {}

for sample in sheet.samples:
barcode_id = sample['barcode_id']
sample_id = sample['Sample_ID']
lanes.append(sample['Lane'])
results[barcode_id] = sample_id

lanes = list(set(lanes))

if len(lanes) != 1:
raise ValueError(
"More than one lane is declared in sample-sheet")

return results, lanes[0]

mapping, lane = get_metadata(sample_sheet_path)

# aggregate results by filename
by_files = self._aggregate_counts_by_file()

# aggregate results by sample. This means aggregating metadata
# across two file names.
counts = defaultdict(dict)

for _file in by_files:
if 'erroneous' in _file:
# for now, don't include sequences in erroneous files as part
# of the base counts.
continue

if re.match(r'TellReadJob_I1_C\d\d\d\.fastq.gz.corrected.err_'
r'barcode_removed.fastq', _file):
# skip indexed read files. We only want counts from R1 and R2
# files.
continue

m = re.match(r'TellReadJob_(R\d)_(C\d\d\d)\.fastq.gz.corrected.'
r'err_barcode_removed.fastq', _file)

if not m:
# if the filename doesn't match this pattern then unexpected
# behavior has occurred.
raise ValueError(
    "'%s' doesn't appear to be a valid file name." % _file)

orientation = m.group(1)
barcode_id = m.group(2)

sample_id = mapping[barcode_id]

counts[sample_id][orientation] = by_files[_file]['seq_counts']

sample_ids = []
read_counts = []
lanes = []

for sample_id in counts:
sample_ids.append(sample_id)
read_counts.append(counts[sample_id]['R1'] +
counts[sample_id]['R2'])
lanes.append(lane)

df = pd.DataFrame(data={'Sample_ID': sample_ids,
'raw_reads_r1r2': read_counts,
'Lane': lanes})

results[_dir][_file] = {'seq_counts': seq_counts,
'base_pairs': base_pairs}
df.set_index(['Sample_ID', 'Lane'], verify_integrity=True)

results_path = join(self.output_path, 'aggregate_counts.json')
# sort results into a predictable order for testing purposes
df = df.sort_values(by='Sample_ID')

with open(results_path, 'w') as f:
print(dumps(results, indent=2), file=f)
result_path = join(self.output_path, 'SeqCounts.csv')
df.to_csv(result_path, index=False, sep=",")

return results_path
return result_path
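
To make the aggregation concrete, a hedged example of how one pair of corrected-read filenames maps to a sample and a combined raw_reads_r1r2 value (barcode ids, sample ids and counts are made up):

import re
from collections import defaultdict

mapping = {'C501': 'Sample_A'}    # barcode_id -> Sample_ID from the sample sheet
by_files = {
    'TellReadJob_R1_C501.fastq.gz.corrected.err_barcode_removed.fastq':
        {'seq_counts': 100, 'base_pairs': 15000},
    'TellReadJob_R2_C501.fastq.gz.corrected.err_barcode_removed.fastq':
        {'seq_counts': 120, 'base_pairs': 18000},
}

counts = defaultdict(dict)
for _file, stats in by_files.items():
    m = re.match(r'TellReadJob_(R\d)_(C\d\d\d)\.fastq.gz.corrected.'
                 r'err_barcode_removed.fastq', _file)
    orientation, barcode_id = m.group(1), m.group(2)
    counts[mapping[barcode_id]][orientation] = stats['seq_counts']

raw_reads_r1r2 = counts['Sample_A']['R1'] + counts['Sample_A']['R2']
# raw_reads_r1r2 == 220, the value written for Sample_A in SeqCounts.csv
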