Updates to support proper function and testing of code in qp-klp. (#110)
* WIP Updates to support proper function and testing of code in qp-klp.

* WIP Updates to support NuQCJob tests

* WIP Updates based on testing w/qp-klp

Needs an additional test for the fastp_reports migration.

* Updates based on testing

* Added test for job-script generation

* Updates based on testing on qiita-rc

* Corrects error in HTML and JSON file moves

For runs with multiple projects, all html and json files would be copied
to a single project's subdirectories. This fix makes it so each html and
json file is moved to its proper location.

* Unittest for regex added (a sketch of such a test follows this list).

* update based on feedback

* Update change-dir to use env variable
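
As flagged in the regex bullet above, here is a minimal sketch of the kind of unit test that pins down the filename patterns added to NuQCJob.__init__ (fastq_regex and html_regex). The example filenames are the ones quoted in the NuQCJob.py diff below; the test-class layout is hypothetical and need not match the test actually added by this commit.

    # Sketch only: exercises the filename regexes added to NuQCJob.__init__.
    import re
    import unittest

    FASTQ_REGEX = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.fastq\.gz$')
    HTML_REGEX = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.html$')


    class FilenameRegexTests(unittest.TestCase):
        def test_fastq_regex_captures_sample_name(self):
            # group 1 should be the sample prefix, without the _S/_L/_R parts
            m = FASTQ_REGEX.search(
                'Tissue_1_Mag_Hom_DNASe_RIBO_S16_L001_R2_001.fastq.gz')
            self.assertIsNotNone(m)
            self.assertEqual(m[1], 'Tissue_1_Mag_Hom_DNASe_RIBO')

        def test_html_regex_captures_sample_name(self):
            m = HTML_REGEX.search(
                'Tissue_1_Super_Trizol_S19_L001_R1_001.html')
            self.assertIsNotNone(m)
            self.assertEqual(m[1], 'Tissue_1_Super_Trizol')

        def test_non_matching_name_is_rejected(self):
            self.assertIsNone(FASTQ_REGEX.search('README.txt'))


    if __name__ == '__main__':
        unittest.main()
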
charles-cowart authored Nov 14, 2023
1 parent 93db942 commit 0297979
Showing 7 changed files with 409 additions and 521 deletions.
19 changes: 16 additions & 3 deletions sequence_processing_pipeline/Commands.py
@@ -20,7 +20,9 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
# 72dc6080-c453-418e-8a47-1ccd6d646a30/ConvertJob/Tell_Seq_15196.
# Each of these directories contain R1 and R2 fastq files. Hence, path
# is now the following:
fastq_paths = glob.glob(data_location_path + '*/*.fastq.gz')
# add one more level to account for project_names nested under ConvertJob
# dir.
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')

# convert from GB and halve as we sum R1
max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
@@ -55,6 +57,16 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
return split_offset


def demux_cmd(id_map_fp, fp_fp, out_d, encoded, threads):
with open(id_map_fp, 'r') as f:
id_map = f.readlines()
id_map = [line.rstrip() for line in id_map]

# fp needs to be an open file handle.
with open(fp_fp, 'r') as fp:
demux(id_map, fp, out_d, encoded, threads)


def demux(id_map, fp, out_d, encoded, threads):
"""Split infile data based in provided map"""
delimiter = '::MUX::'
@@ -74,8 +86,9 @@ def demux(id_map, fp, out_d, encoded, threads):

# setup output locations
outdir = out_d + sep + outbase
fullname_r1 = outdir + sep + fname_r1
fullname_r2 = outdir + sep + fname_r2

fullname_r1 = outdir + sep + fname_r1 + '.fastq.gz'
fullname_r2 = outdir + sep + fname_r2 + '.fastq.gz'

os.makedirs(outdir, exist_ok=True)
current_fp_r1 = pgzip.open(fullname_r1, mode, thread=threads,
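
That covers the Commands.py changes. For orientation, here is a short sketch of the directory layout the new two-level glob assumes; only the Tell_Seq_15196 project name comes from the comment above, the run directory and sample filenames are illustrative.

    # Sketch only. Assumed ConvertJob layout after this change:
    #
    #   <data_location_path>/ConvertJob/Tell_Seq_15196/sampleA_S1_L001_R1_001.fastq.gz
    #   <data_location_path>/ConvertJob/Tell_Seq_15196/sampleA_S1_L001_R2_001.fastq.gz
    #
    # With project names nested under the ConvertJob dir, the old
    # single-level pattern ('*/*.fastq.gz') stops one directory short of the
    # fastq files; the extra '*/' below reaches them.
    import glob


    def find_fastqs(data_location_path):
        # data_location_path is expected to end with a path separator,
        # matching how split_similar_size_bins() builds its pattern.
        return sorted(glob.glob(data_location_path + '*/*/*.fastq.gz'))
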
117 changes: 96 additions & 21 deletions sequence_processing_pipeline/NuQCJob.py
@@ -1,17 +1,18 @@
from metapool import KLSampleSheet, validate_and_scrub_sample_sheet
from os import stat, makedirs
from os.path import join, exists
from os.path import join, basename, dirname, exists
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import PipelineError
from sequence_processing_pipeline.Pipeline import Pipeline
from shutil import move
import logging
from sequence_processing_pipeline.Commands import split_similar_size_bins
from sequence_processing_pipeline.util import iter_paired_files
from jinja2 import Environment, FileSystemLoader
from jinja2 import Environment, PackageLoader
import glob
import re
from json import dumps
from sys import executable


logging.basicConfig(level=logging.DEBUG)
@@ -68,17 +69,25 @@ def __init__(self, fastq_root_dir, output_path, sample_sheet_path,
self.qiita_job_id = qiita_job_id
self.pool_size = pool_size
self.suffix = 'fastq.gz'
self.jinja_env = Environment(loader=FileSystemLoader("templates/"))

# for projects that use sequence_processing_pipeline as a dependency,
# jinja_env must be set to sequence_processing_pipeline's root path,
# rather than the project's root path.
self.jinja_env = Environment(loader=PackageLoader('sequence_processing'
'_pipeline',
'templates'))

self.counts = {}
self.known_adapters_path = known_adapters_path

self.max_file_list_size_in_gb = bucket_size

self.temp_dir = join(self.output_path, 'tmp')
makedirs(self.temp_dir, exist_ok=True)
self.batch_prefix = "hd-split-pangenome"

self.minimum_bytes = 3100
self.fastq_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}'
r'\.fastq\.gz$')
self.html_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.html$')
self.json_regex = re.compile(r'^(.*)_S\d{1,4}_L\d{3}_R\d_\d{3}\.json')

self._validate_project_data()

@@ -136,6 +145,23 @@ def _filter_empty_fastq_files(self, filtered_directory,
logging.debug(f'moving {item}')
move(item, empty_files_directory)

def _move_helper(self, completed_files, regex, samples_in_project, dst):
files_to_move = []
for fp in completed_files:
file_name = basename(fp)
substr = regex.search(file_name)
if substr is None:
raise ValueError(f"{file_name} does not follow naming "
" pattern.")
else:
# check if found substring is a member of this
# project. Note sample-name != sample-id
if substr[1] in samples_in_project:
files_to_move.append(fp)

for fp in files_to_move:
move(fp, dst)

def run(self, callback=None):
# now a single job-script will be created to process all projects at
# the same time, and intelligently handle adapter-trimming as needed
@@ -167,33 +193,69 @@ def run(self, callback=None):
# for now.
exec_from=self.log_path,
callback=callback)

job_id = job_info['job_id']
logging.debug(f'QCJob {job_id} completed')
logging.debug(f'NuQCJob {job_id} completed')

for project in self.project_data:
project_name = project['Sample_Project']
needs_human_filtering = project['HumanFiltering']

source_dir = join(self.output_path, project_name)
pattern = f"{source_dir}/*.fastq.gz"
completed_files = list(glob.glob(pattern))

if not self._get_failed_indexes(project_name, job_id):
raise PipelineError("QCJob did not complete successfully.")

# TODO: IMPLEMENT A NEW FILTER FOR FILTERED FASTQ.GZ FILES THAT
# ARE BELOW THE MINIMUM FILE SIZE THRESHOLD INTO A NEW FOLDER
# NAMED 'ZERO-LENGTH-FILES'.

# determine where the filtered fastq files can be found and move
# the 'zero-length' files to another directory.
if needs_human_filtering is True:
filtered_directory = join(source_dir, 'filtered_sequences')
else:
filtered_directory = join(source_dir, 'trimmed_sequences')

if not exists(filtered_directory):
raise PipelineError(f"{filtered_directory} does not exist")

# create the properly named directory to move files to, in
# order to preserve legacy behavior.
makedirs(filtered_directory, exist_ok=True)

# get the list of sample-names in this project.
samples_in_project = [x[0] for x in self.sample_ids
if x[1] == project_name]

# Tissue_1_Mag_Hom_DNASe_RIBO_S16_L001_R2_001.fastq.gz
# Nislux_SLC_Trizol_DNASe_S7_L001_R2_001.fastq.gz
self._move_helper(completed_files,
self.fastq_regex,
samples_in_project,
filtered_directory)

# once fastq.gz files have been moved into the right project,
# we now need to consider the html and json fastp_reports
# files.
old_html_path = join(self.output_path, 'fastp_reports_dir', 'html')
old_json_path = join(self.output_path, 'fastp_reports_dir', 'json')

new_html_path = join(source_dir, 'fastp_reports_dir', 'html')
new_json_path = join(source_dir, 'fastp_reports_dir', 'json')

makedirs(new_html_path, exist_ok=True)
makedirs(new_json_path, exist_ok=True)

# move all html files underneath the subdirectory for this project.
pattern = f"{old_html_path}/*.html"
completed_htmls = list(glob.glob(pattern))
self._move_helper(completed_htmls,
# Tissue_1_Super_Trizol_S19_L001_R1_001.html
self.html_regex,
samples_in_project,
new_html_path)

# move all json files underneath the subdirectory for this project.
pattern = f"{old_json_path}/*.json"
completed_jsons = list(glob.glob(pattern))
self._move_helper(completed_jsons,
# Tissue_1_Super_Trizol_S19_L001_R1_001.json
self.json_regex,
samples_in_project,
new_json_path)

# now that files are separated by project as per legacy
# operation, continue normal processing.
empty_files_directory = join(source_dir, 'zero_files')
self._filter_empty_fastq_files(filtered_directory,
empty_files_directory,
@@ -283,6 +345,16 @@ def _generate_job_script(self):

job_name = f'{self.qiita_job_id}_{self.job_name}'

html_path = join(self.output_path, 'fastp_reports_dir', 'html')
json_path = join(self.output_path, 'fastp_reports_dir', 'json')

# get location of python executable in this environment.
# demux script should be present in the same location.
demux_path = join(dirname(executable), 'demux')

if not exists(demux_path):
raise ValueError(f"{demux_path} does not exist.")

with open(job_script_path, mode="w", encoding="utf-8") as f:
# the job resources should come from a configuration file
f.write(template.render(job_name=job_name,
@@ -293,6 +365,9 @@ def _generate_job_script(self):
node_count=1,
cores_per_task=4,
knwn_adpt_path=self.known_adapters_path,
output_path=self.output_path))
output_path=self.output_path,
html_path=html_path,
json_path=json_path,
demux_path=demux_path))

return job_script_path
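
One NuQCJob.py change worth spelling out before the CLI diff: FileSystemLoader("templates/") resolved the template directory against the caller's current working directory, which breaks when qp-klp imports this package and runs from its own directory, while PackageLoader resolves it against the installed sequence_processing_pipeline package itself. A minimal sketch of the difference follows; the get_template() call and the template filename are assumptions about how the template is looked up, not lines from this diff.

    # Sketch only: how the two Jinja2 loaders find 'templates/nuqc_job.sh'.
    from jinja2 import Environment, FileSystemLoader, PackageLoader

    # Old behavior: resolved against whatever directory the calling
    # process happens to run in.
    cwd_env = Environment(loader=FileSystemLoader('templates/'))

    # New behavior: resolved against the installed
    # sequence_processing_pipeline package, independent of the caller's cwd.
    pkg_env = Environment(loader=PackageLoader('sequence_processing_pipeline',
                                               'templates'))

    # Assumed lookup; NuQCJob renders this template in _generate_job_script().
    template = pkg_env.get_template('nuqc_job.sh')
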
4 changes: 2 additions & 2 deletions sequence_processing_pipeline/scripts/cli.py
@@ -1,5 +1,5 @@
import click
from sequence_processing_pipeline.Commands import demux as _demux
from sequence_processing_pipeline.Commands import demux_cmd


@click.group()
@@ -14,7 +14,7 @@ def cli():
@click.option('--encoded-id', type=str, required=True)
@click.option('--threads', type=int, required=True)
def demux(id_map, infile, output, encoded_id, threads):
_demux(id_map, infile, output, encoded_id, threads)
demux_cmd(id_map, infile, output, encoded_id, threads)


if __name__ == '__main__':
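
With demux_cmd now handling the file I/O, the click command above stays a thin wrapper around it. Below is a hedged sketch of exercising the command in-process with click's test runner; the import path is inferred from the file's location and every file path is a placeholder.

    # Sketch only: drive the demux subcommand through click's CliRunner.
    from click.testing import CliRunner
    from sequence_processing_pipeline.scripts.cli import cli

    runner = CliRunner()
    result = runner.invoke(cli, [
        'demux',
        '--id-map', '/tmp/id_map',      # "<i> <r1_name> <r2_name> <base>" lines
        '--infile', '/tmp/seqs.fastq',  # the multiplexed fastp output
        '--output', '/tmp/demuxed',
        '--encoded-id', '1',
        '--threads', '4',
    ])
    assert result.exit_code == 0, result.output
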
44 changes: 28 additions & 16 deletions sequence_processing_pipeline/templates/nuqc_job.sh
@@ -1,9 +1,9 @@
#!/bin/bash -l
#SBATCH -J {{job_name}}
#SBATCH -p {{queue_name}}
# wall-time-limit in minutes
### wall-time-limit in minutes
#SBATCH --time {{wall_time_limit}}
#SBATCH --mem {{mem_in_gb}}gb
#SBATCH --mem {{mem_in_gb}}G
#SBATCH -N {{node_count}}
#SBATCH -c {{cores_per_task}}

@@ -37,6 +37,8 @@ if [[ -z ${TMPDIR} ]]; then
exit 1
fi

echo "MMI is ${MMI}"

conda activate human-depletion

set -x
@@ -45,20 +47,23 @@ set -e
date
hostname
echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
# output_path = output_path passed to Job objects + 'NuQCJob'
# e.g.: working-directory/ConvertJob, working-directory/QCJob...
cd {{output_path}}
### output_path = output_path passed to Job objects + 'NuQCJob'
### e.g.: working-directory/ConvertJob, working-directory/QCJob...
cd ${TMPDIR}

# set a temp directory, make a new unique one under it and
# make sure we clean up as we're dumping to shm
# DO NOT do this casually. Only do a clean up like this if
# you know for sure TMPDIR is what you want.
### set a temp directory, make a new unique one under it and
### make sure we clean up as we're dumping to shm
### DO NOT do this casually. Only do a clean up like this if
### you know for sure TMPDIR is what you want.

mkdir -p ${TMPDIR}
export TMPDIR=${TMPDIR}
export TMPDIR=$(mktemp -d)
echo $TMPDIR

mkdir -p {{html_path}}
mkdir -p {{json_path}}

function cleanup {
echo "Removing $TMPDIR"
rm -fr $TMPDIR
@@ -77,13 +82,19 @@ n=$(wc -l ${FILES} | cut -f 1 -d" ")

for i in $(seq 1 ${n})
do
echo "Beginning loop iteration ${i}"
line=$(head -n ${i} ${FILES} | tail -n 1)
r1=$(echo ${line} | cut -f 1 -d" ")
r2=$(echo ${line} | cut -f 2 -d" ")
base=$(echo ${line} | cut -f 3 -d" ")
r1_name=$(basename ${r1} .fastq.gz)
r2_name=$(basename ${r2} .fastq.gz)

# for now just make sure each file is saved and we can read the data inside
# to sort them out later.
html_name=$(echo "$r1_name.html")
json_name=$(echo "$r1_name.json")

echo "${i} ${r1_name} ${r2_name} ${base}" >> ${TMPDIR}/id_map

fastp \
@@ -92,13 +103,13 @@ do
-I ${r2} \
-w 7 \
--adapter_fasta {{knwn_adpt_path}} \
--html /dev/null \
--json /dev/null \
--html {{html_path}}/${html_name} \
--json {{json_path}}/${json_name} \
--stdout | \
sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/"
done > ${TMPDIR}/seqs.fastq

function minimap2 () {
function minimap2_runner () {
mmi=$1

echo "$(date) :: $(basename ${mmi})"
@@ -112,7 +123,7 @@ function runner () {

# with the current proposed resources, we likely do not get
# benefit for parallel gzip write
mgscripts demux \
{{demux_path}} \
--id-map ${TMPDIR}/id_map \
--infile ${TMPDIR}/seqs.fastq \
--output ${OUTPUT} \
Expand All @@ -122,11 +133,11 @@ function runner () {
export -f runner

if [[ -f ${MMI} ]]; then
minimap2 ${MMI}
minimap2_runner ${MMI}
else
for mmi in ${MMI}/*.mmi
do
minimap2 ${mmi}
minimap2_runner ${mmi}
done
fi

@@ -135,8 +146,9 @@ mkdir -p ${OUTPUT}
jobs=${SLURM_JOB_CPUS_PER_NODE}

echo "$(date) :: demux start"
# let it do its thing

seq 1 ${n} | parallel -j ${jobs} runner

echo "$(date) :: demux stop"

touch ${OUTPUT}/${SLURM_JOB_NAME}.${SLURM_JOB_ID}.completed
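
The commit message mentions a test for job-script generation; the snippet below is only a sketch of that idea: render this template with representative values and confirm the per-sample fastp report paths and the demux executable end up in the generated script. All values are placeholders, and any template variables defined in collapsed parts of the file would need values as well.

    # Sketch only: render nuqc_job.sh and spot-check the new substitutions.
    from jinja2 import Environment, PackageLoader

    env = Environment(loader=PackageLoader('sequence_processing_pipeline',
                                           'templates'))
    script = env.get_template('nuqc_job.sh').render(
        job_name='0297979_NuQCJob',
        queue_name='qiita',
        wall_time_limit=240,
        mem_in_gb=8,
        node_count=1,
        cores_per_task=4,
        knwn_adpt_path='/path/to/known_adapters.fna',
        output_path='/working-dir/NuQCJob',
        html_path='/working-dir/NuQCJob/fastp_reports_dir/html',
        json_path='/working-dir/NuQCJob/fastp_reports_dir/json',
        demux_path='/path/to/env/bin/demux')

    assert 'fastp_reports_dir/html' in script   # mkdir -p {{html_path}}
    assert '/bin/demux' in script               # {{demux_path}} invocation
    assert '--html /dev/null' not in script     # fastp reports are kept now
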