Updates to support proper function and testing of code in qp-klp. #110

Merged
merged 10 commits on Nov 14, 2023
19 changes: 16 additions & 3 deletions sequence_processing_pipeline/Commands.py
@@ -20,7 +20,9 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
# 72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob/Tell_Seq_15196.
# Each of these directories contain R1 and R2 fastq files. Hence, path
# is now the following:
fastq_paths = glob.glob(data_location_path + '*/*.fastq.gz')
# add one more level to account for project_names nested under ConvertJob
# dir.
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')

# convert from GB and halve as we sum R1
max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
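For illustration, a minimal sketch of the directory layout the updated glob assumes and of the bin-size arithmetic; the run-directory name is taken from the comment above, and the 10 GB bucket size is hypothetical:

import glob

data_location_path = '72dc6080-c453-418e-8a47-1ccd6d646a30/'

# one level deeper than before: ConvertJob/<project_name>/<sample>.fastq.gz,
# e.g. .../ConvertJob/Tell_Seq_15196/sampleA_R1.fastq.gz
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')

# "convert from GB and halve as we sum R1": a 10 GB bucket becomes a 5 GiB
# byte budget, because only R1 sizes are summed and R2 is assumed comparable.
max_size = int(10) * (2 ** 30) / 2    # 5368709120.0 bytes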
@@ -55,6 +57,16 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
return split_offset


def demux_cmd(id_map_fp, fp_fp, out_d, encoded, threads):
with open(id_map_fp, 'r') as f:
id_map = f.readlines()
id_map = [line.rstrip() for line in id_map]

# fp needs to be an open file handle.
with open(fp_fp, 'r') as fp:
demux(id_map, fp, out_d, encoded, threads)
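A hedged sketch of the id-map file the new demux_cmd wrapper reads; the format mirrors the line nuqc_job.sh appends per pair (echo "${i} ${r1_name} ${r2_name} ${base}" >> ${TMPDIR}/id_map), and the names below are made up:

# hypothetical id_map contents: <mux index> <R1 name> <R2 name> <output base>
example_id_map = [
    '1 sampleA_S1_L001_R1_001 sampleA_S1_L001_R2_001 sampleA',
    '2 sampleB_S2_L001_R1_001 sampleB_S2_L001_R2_001 sampleB',
]

# demux_cmd loads these lines with trailing newlines stripped and hands them,
# together with an open handle on the multiplexed seqs.fastq, to demux().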


def demux(id_map, fp, out_d, encoded, threads):
"""Split infile data based in provided map"""
delimiter = '::MUX::'
@@ -74,8 +86,9 @@ def demux(id_map, fp, out_d, encoded, threads):

# setup output locations
outdir = out_d + sep + outbase
fullname_r1 = outdir + sep + fname_r1
fullname_r2 = outdir + sep + fname_r2

fullname_r1 = outdir + sep + fname_r1 + '.fastq.gz'
fullname_r2 = outdir + sep + fname_r2 + '.fastq.gz'

os.makedirs(outdir, exist_ok=True)
current_fp_r1 = pgzip.open(fullname_r1, mode, thread=threads,
99 changes: 78 additions & 21 deletions sequence_processing_pipeline/NuQCJob.py
@@ -1,17 +1,18 @@
from metapool import KLSampleSheet, validate_and_scrub_sample_sheet
from os import stat, makedirs
from os.path import join, exists
from os.path import join, basename, dirname, exists
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.Pipeline import Pipeline
from shutil import move
import logging
from sequence_processing_pipeline.Commands import split_similar_size_bins
from sequence_processing_pipeline.util import iter_paired_files
from jinja2 import Environment, FileSystemLoader
from jinja2 import Environment, PackageLoader
import glob
import re
from json import dumps
from sys import executable


logging.basicConfig(level=logging.DEBUG)
@@ -68,18 +69,21 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
self.qiita_job_id = qiita_job_id
self.pool_size = pool_size
self.suffix = 'fastq.gz'
self.jinja_env = Environment(loader=FileSystemLoader("templates/"))

# for projects that use sequence_processing_pipeline as a dependency,
# jinja_env must be set to sequence_processing_pipeline's root path,
# rather than the project's root path.
self.jinja_env = Environment(loader=PackageLoader('sequence_processing'
'_pipeline',
'templates'))

self.counts = {}
self.known_adapters_path = known_adapters_path

self.max_file_list_size_in_gb = bucket_size

self.temp_dir = join(self.output_path, 'tmp')
makedirs(self.temp_dir, exist_ok=True)
self.batch_prefix = "hd-split-pangenome"

self.minimum_bytes = 3100

self._validate_project_data()

def _validate_project_data(self):
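For context on the loader change: FileSystemLoader('templates/') resolves relative to the caller's current working directory, while PackageLoader resolves relative to the installed sequence_processing_pipeline package, so the templates are found even when qp-klp pulls this package in as a dependency. A minimal sketch, assuming the nuqc_job.sh template name used elsewhere in this class:

from jinja2 import Environment, PackageLoader

# looks up sequence_processing_pipeline/templates/ inside the installed
# package rather than under whatever directory the caller happens to be in
jinja_env = Environment(loader=PackageLoader('sequence_processing_pipeline',
                                             'templates'))
template = jinja_env.get_template('nuqc_job.sh')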
@@ -167,33 +171,73 @@ def run(self, callback=None):
# for now.
exec_from=self.log_path,
callback=callback)

job_id = job_info['job_id']
logging.debug(f'QCJob {job_id} completed')
logging.debug(f'NuQCJob {job_id} completed')

for project in self.project_data:
project_name = project['Sample_Project']
needs_human_filtering = project['HumanFiltering']

source_dir = join(self.output_path, project_name)
pattern = f"{source_dir}/*.fastq.gz"
completed_files = list(glob.glob(pattern))

if not self._get_failed_indexes(project_name, job_id):
raise PipelineError("QCJob did not complete successfully.")

# TODO: IMPLEMENT A NEW FILTER FOR FILTERED FASTQ.GZ FILES THAT
# ARE BELOW THE MINIMUM FILE SIZE THRESHOLD INTO A NEW FOLDER
# NAMED 'ZERO-LENGTH-FILES'.

# determine where the filtered fastq files can be found and move
# the 'zero-length' files to another directory.
if needs_human_filtering is True:
filtered_directory = join(source_dir, 'filtered_sequences')
else:
filtered_directory = join(source_dir, 'trimmed_sequences')

if not exists(filtered_directory):
raise PipelineError(f"{filtered_directory} does not exist")
# create the properly named directory to move files to
# in order to preserve legacy behavior.
makedirs(filtered_directory, exist_ok=True)

# get the list of sample-names in this project.
samples_in_project = [x[0] for x in self.sample_ids
if x[1] == project_name]

files_to_move = []
regex = re.compile(r'^(.*)_S\d_L\d\d\d_R\d+_\d\d\d\.fastq\.gz$')
for fp in completed_files:
file_name = basename(fp)
substr = regex.search(file_name)
if substr is None:
raise ValueError(f"{file_name} does not follow naming "
" pattern.")
else:
# check if found substring is a member of this
# project. Note sample-name != sample-id
if substr[1] in samples_in_project:
files_to_move.append(fp)

for fp in files_to_move:
move(fp, filtered_directory)

# once fastq.gz files have been moved into the right project,
# we now need to consider the html and json fastp_reports
# files.
old_html_path = join(self.output_path, 'fastp_reports_dir', 'html')
old_json_path = join(self.output_path, 'fastp_reports_dir', 'json')

new_html_path = join(source_dir, 'fastp_reports_dir', 'html')
new_json_path = join(source_dir, 'fastp_reports_dir', 'json')

makedirs(new_html_path, exist_ok=True)
makedirs(new_json_path, exist_ok=True)

pattern = f"{old_html_path}/*.html"
completed_htmls = list(glob.glob(pattern))

for fp in completed_htmls:
move(fp, new_html_path)

pattern = f"{old_json_path}/*.json"
completed_jsons = list(glob.glob(pattern))

for fp in completed_jsons:
move(fp, new_json_path)

# now that files are separated by project as per legacy
# operation, continue normal processing.
empty_files_directory = join(source_dir, 'zero_files')
self._filter_empty_fastq_files(filtered_directory,
empty_files_directory,
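To illustrate the sample-routing regex above (the file name is hypothetical, and note that, as written, _S\d expects a single-digit sample number):

import re

regex = re.compile(r'^(.*)_S\d_L\d\d\d_R\d+_\d\d\d\.fastq\.gz$')

substr = regex.search('sampleA_S1_L001_R1_001.fastq.gz')
print(substr[1])    # 'sampleA', compared against samples_in_project
                    # (sample-names, not sample-ids)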
@@ -283,6 +327,16 @@ def _generate_job_script(self):

job_name = f'{self.qiita_job_id}_{self.job_name}'

html_path = join(self.output_path, 'fastp_reports_dir', 'html')
json_path = join(self.output_path, 'fastp_reports_dir', 'json')

# get location of python executable in this environment.
# demux script should be present in the same location.
demux_path = join(dirname(executable), 'demux')

if not exists(demux_path):
raise ValueError(f"{demux_path} does not exist.")

with open(job_script_path, mode="w", encoding="utf-8") as f:
# the job resources should come from a configuration file
f.write(template.render(job_name=job_name,
@@ -293,6 +347,9 @@
node_count=1,
cores_per_task=4,
knwn_adpt_path=self.known_adapters_path,
output_path=self.output_path))
output_path=self.output_path,
html_path=html_path,
json_path=json_path,
demux_path=demux_path))

return job_script_path
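For clarity on the demux lookup above: sys.executable is the python binary of the active environment, and a console script installed into that same environment lands in the same bin/ directory. A small sketch with an illustrative path:

from os.path import dirname, join
from sys import executable

# e.g. executable == '/opt/conda/envs/qp-klp/bin/python'   (hypothetical)
demux_path = join(dirname(executable), 'demux')
# -> '/opt/conda/envs/qp-klp/bin/demux', the entry point rendered into the
#    job script as {{demux_path}}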
46 changes: 30 additions & 16 deletions sequence_processing_pipeline/templates/nuqc_job.sh
@@ -1,9 +1,9 @@
#!/bin/bash -l
#SBATCH -J {{job_name}}
#SBATCH -p {{queue_name}}
# wall-time-limit in minutes
### wall-time-limit in minutes
#SBATCH --time {{wall_time_limit}}
#SBATCH --mem {{mem_in_gb}}gb
#SBATCH --mem {{mem_in_gb}}G
#SBATCH -N {{node_count}}
#SBATCH -c {{cores_per_task}}

@@ -37,6 +37,8 @@ if [[ -z ${TMPDIR} ]]; then
exit 1
fi

echo "MMI is ${MMI}"

conda activate human-depletion

set -x
@@ -45,20 +47,23 @@ set -e
date
hostname
echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
# output_path = output_path passed to Job objects + 'NuQCJob'
# e.g.: working-directory/ConvertJob, working-directory/QCJob...
cd {{output_path}}
### output_path = output_path passed to Job objects + 'NuQCJob'
### e.g.: working-directory/ConvertJob, working-directory/QCJob...
cd {{output_path}}/tmp

# set a temp directory, make a new unique one under it and
# make sure we clean up as we're dumping to shm
# DO NOT do this casually. Only do a clean up like this if
# you know for sure TMPDIR is what you want.
### set a temp directory, make a new unique one under it and
### make sure we clean up as we're dumping to shm
### DO NOT do this casually. Only do a clean up like this if
### you know for sure TMPDIR is what you want.

mkdir -p ${TMPDIR}
export TMPDIR=${TMPDIR}
export TMPDIR=$(mktemp -d)
echo $TMPDIR

mkdir -p {{html_path}}
mkdir -p {{json_path}}

function cleanup {
echo "Removing $TMPDIR"
rm -fr $TMPDIR
@@ -77,13 +82,21 @@ n=$(wc -l ${FILES} | cut -f 1 -d" ")

for i in $(seq 1 ${n})
do
echo "Beginning loop iteration ${i}"
line=$(head -n ${i} ${FILES} | tail -n 1)
r1=$(echo ${line} | cut -f 1 -d" ")
r2=$(echo ${line} | cut -f 2 -d" ")
base=$(echo ${line} | cut -f 3 -d" ")
r1_name=$(basename ${r1} .fastq.gz)
r2_name=$(basename ${r2} .fastq.gz)

# for now just make sure each file is saved and we can read the data inside
# to sort them out later.

s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
html_name=$(echo "$s_name.html")
json_name=$(echo "$s_name.json")

echo "${i} ${r1_name} ${r2_name} ${base}" >> ${TMPDIR}/id_map

fastp \
@@ -92,13 +105,13 @@ do
-I ${r2} \
-w 7 \
--adapter_fasta {{knwn_adpt_path}} \
--html /dev/null \
--json /dev/null \
--html {{html_path}}/${html_name} \
--json {{json_path}}/${json_name} \
--stdout | \
sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/"
done > ${TMPDIR}/seqs.fastq
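The sed expression above rewrites every FASTQ header line (lines 1, 5, 9, ...) to carry the loop index and the ::MUX:: delimiter, so demux can later route each read back to its source pair. A rough Python equivalent, for illustration only (the read name is made up):

delimiter = '::MUX::'

def tag_headers(fastq_lines, i):
    # prefix each header (every 4th line, starting at line 1) with '@<i>::MUX::'
    out = []
    for lineno, line in enumerate(fastq_lines, start=1):
        if lineno % 4 == 1 and line.startswith('@'):
            line = '@' + str(i) + delimiter + line[1:]
        out.append(line)
    return out

record = ['@A01535:10:XYZ:1:1101:1000:1000 1:N:0:ACGT', 'ACGTACGT', '+', 'FFFFFFFF']
print(tag_headers(record, 3)[0])
# @3::MUX::A01535:10:XYZ:1:1101:1000:1000 1:N:0:ACGT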

function minimap2 () {
function minimap2_runner () {
mmi=$1

echo "$(date) :: $(basename ${mmi})"
@@ -112,7 +125,7 @@ function runner () {

# with the current proposed resources, we likely do not get
# benefit for parallel gzip write
mgscripts demux \
{{demux_path}} \
--id-map ${TMPDIR}/id_map \
--infile ${TMPDIR}/seqs.fastq \
--output ${OUTPUT} \
Expand All @@ -122,11 +135,11 @@ function runner () {
export -f runner

if [[ -f ${MMI} ]]; then
minimap2 ${MMI}
minimap2_runner ${MMI}
else
for mmi in ${MMI}/*.mmi
do
minimap2 ${mmi}
minimap2_runner ${mmi}
done
fi

@@ -135,8 +148,9 @@ mkdir -p ${OUTPUT}
jobs=${SLURM_JOB_CPUS_PER_NODE}

echo "$(date) :: demux start"
# let it do its thing

seq 1 ${n} | parallel -j ${jobs} runner

echo "$(date) :: demux stop"

touch ${OUTPUT}/${SLURM_JOB_NAME}.${SLURM_JOB_ID}.completed