Skip to content

Commit

Permalink
shm and se operation changes
Browse files Browse the repository at this point in the history
  • Loading branch information
wasade committed Dec 11, 2023
1 parent 4fd447b commit 5de964a
Showing 1 changed file with 176 additions and 70 deletions.
246 changes: 176 additions & 70 deletions sequence_processing_pipeline/templates/nuqc_job.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### as well as sbatch -c. demux threads remains fixed at 1.
### Note -c set to 4 and thread counts set to 7 during testing.
#SBATCH -c {{cores_per_task}}
#SBATCH --gres=node_jobs:1 # use 1/4 processing slots

if [[ -z "${SLURM_ARRAY_TASK_ID}" ]]; then
echo "Not operating within an array"
Expand Down Expand Up @@ -41,32 +42,31 @@ conda activate human-depletion

# Shell safety: trace every command, stop on the first error, and fail a
# pipeline if any stage fails (not just the last one).
set -x
set -e
set -o pipefail

# Log when and where this array task runs, for post-mortem debugging.
date
hostname
echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
### output_path = output_path passed to Job objects + 'NuQCJob'
### e.g.: working-directory/ConvertJob, working-directory/QCJob...
cd {{output_path}}

### set a temp directory, make a new unique one under it and
### make sure we clean up as we're dumping to shm
### DO NOT do this casually. Only do a clean up like this if
### you know for sure TMPDIR is what you want.

# NOTE(review): this TMPDIR setup looks like the pre-change side of the
# diff; a replacement further down switches to /dev/shm + mktemp. Confirm
# only one of the two setups survives in the rendered template.
export TMPDIR={{temp_dir}}/
# don't use mktemp -d to create a random temp dir.
# the one passed is unique already.
echo $TMPDIR

# Destination directories for fastp's per-sample HTML/JSON reports
# ({{...}} values are filled in by the pipeline's template renderer).
mkdir -p {{html_path}}
mkdir -p {{json_path}}

function cleanup {
    # EXIT-trap handler (see 'trap cleanup EXIT' below): remove the
    # per-job scratch directory and unset TMPDIR so a stale path cannot
    # be reused by anything that runs afterwards.
    echo "Removing $TMPDIR"
    # Guard and quote the expansion: an unset/empty TMPDIR must not turn
    # this into a bare 'rm -fr', and a path containing spaces must stay a
    # single argument (SC2086).
    if [[ -n "${TMPDIR:-}" ]]; then
        rm -fr -- "${TMPDIR}"
    fi
    unset TMPDIR
}
# Stage all scratch I/O in shared memory (/dev/shm) for speed; bail out
# early if this node does not expose it.
TMPDIR=/dev/shm
if [[ ! -d "${TMPDIR}" ]]; then
    echo "Cannot access ${TMPDIR}" >&2
    exit 1
fi
export TMPDIR
# Create the unique per-job directory under /dev/shm (mktemp honors the
# exported TMPDIR). Assignment is kept separate from 'export' so a mktemp
# failure is not masked by export's exit status under set -e (SC2155).
TMPDIR="$(mktemp -d)"
export TMPDIR
echo "${TMPDIR}"

function cleanup {
    # Invoked via the EXIT trap below: report, wipe the per-job scratch
    # space, then drop the variable so nothing reuses the dead path.
    printf 'Removing %s\n' "${TMPDIR}"
    rm -fr ${TMPDIR}
    unset TMPDIR
}
trap cleanup EXIT

export FILES=$(printf "%s-%d" ${PREFIX} ${SLURM_ARRAY_TASK_ID})
Expand All @@ -76,38 +76,59 @@ if [[ ! -f ${FILES} ]]; then
fi

# Separator inserted between the sample index and the original read name,
# and the read-pair tags used later when splitting merged streams.
delimiter=::MUX::
# Number of input triplets (r1 r2 base) listed in ${FILES} for this task.
n=$(wc -l ${FILES} | cut -f 1 -d" ")
export r1_tag=/1
export r2_tag=/2

# NOTE(review): this loop looks like the pre-change side of the diff
# (superseded by mux_runner below): it runs fastp per sample and streams
# all tagged reads into one interleaved ${TMPDIR}/seqs.fastq.
for i in $(seq 1 ${n})
do
echo "Beginning loop iteration ${i}"
# Pull the i-th whitespace-delimited triplet: R1 path, R2 path, base name.
line=$(head -n ${i} ${FILES} | tail -n 1)
r1=$(echo ${line} | cut -f 1 -d" ")
r2=$(echo ${line} | cut -f 2 -d" ")
base=$(echo ${line} | cut -f 3 -d" ")
r1_name=$(basename ${r1} .fastq.gz)
r2_name=$(basename ${r2} .fastq.gz)

# for now just make sure each file is saved and we can read the data inside
# to sort them out later.

s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
html_name=$(echo "$s_name.html")
json_name=$(echo "$s_name.json")

# Record the iteration-index -> sample-name mapping for the demux step.
echo "${i} ${r1_name} ${r2_name} ${base}" >> ${TMPDIR}/id_map

# Adapter/length filtering; filtered reads go to stdout and sed prefixes
# every FASTQ header ("1~4" = every 4th line starting at line 1) with
# "<i>::MUX::" so multiple samples can share one stream.
fastp \
-l {{length_limit}} \
-i ${r1} \
-I ${r2} \
-w {{cores_per_task}} \
--adapter_fasta {{knwn_adpt_path}} \
--html {{html_path}}/${html_name} \
--json {{json_path}}/${json_name} \
--stdout | \
sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/"
done > ${TMPDIR}/seqs.fastq
function mux_runner () {
    # Run fastp on each (R1, R2) pair listed in ${FILES}, tag every read
    # name with "<sample-index>::MUX::" and concatenate the results into a
    # single pair of multiplexed FASTQ files under ${TMPDIR}.
    #
    # Reads globals:  FILES, TMPDIR, delimiter, plus {{...}} template
    #                 values rendered into this script.
    # Writes:         ${TMPDIR}/id_map, ${TMPDIR}/seqs.r1.fastq,
    #                 ${TMPDIR}/seqs.r2.fastq, fastp HTML/JSON reports.
    # Side effect:    leaves ${n} (triplet count) set for later steps.
    n=$(wc -l "${FILES}" | cut -f 1 -d" ")

    jobd=${TMPDIR}
    id_map=${jobd}/id_map
    seqs_r1=${jobd}/seqs.r1.fastq
    seqs_r2=${jobd}/seqs.r2.fastq
    r1_filt=${jobd}/seqs.r1.adapter-removed.fastq
    r2_filt=${jobd}/seqs.r2.adapter-removed.fastq

    # Read the triplet list in one pass instead of re-scanning it with
    # head|tail on every iteration (which was O(n^2) in file reads).
    i=0
    while IFS=' ' read -r r1 r2 base
    do
        i=$((i + 1))
        r1_name=$(basename "${r1}" .fastq.gz)
        r2_name=$(basename "${r2}" .fastq.gz)

        s_name=$(basename "${r1}" | sed -r 's/\.fastq\.gz//')
        html_name="${s_name}.html"
        json_name="${s_name}.json"

        # Record the sample-index -> sample-name mapping for demux.
        echo "${i} ${r1_name} ${r2_name} ${base}" >> "${id_map}"

        # Adapter/length filtering; per-sample QC reports go to the
        # html/json dirs, filtered reads to scratch files.
        fastp \
            -l {{length_limit}} \
            -i "${r1}" \
            -I "${r2}" \
            -w {{cores_per_task}} \
            --adapter_fasta {{knwn_adpt_path}} \
            --html {{html_path}}/"${html_name}" \
            --json {{json_path}}/"${json_name}" \
            --out1 "${r1_filt}" \
            --out2 "${r2_filt}"

        # Prefix every FASTQ header ("1~4" = each 4th line from line 1)
        # with the sample index; tag R1 and R2 concurrently, then barrier.
        sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/" "${r1_filt}" >> "${seqs_r1}" &
        sed -r "1~4s/^@(.*)/@${i}${delimiter}\1/" "${r2_filt}" >> "${seqs_r2}" &
        wait

        rm -f -- "${r1_filt}" "${r2_filt}"
    done < "${FILES}"
}
export -f mux_runner

function minimap2_runner () {
mmi=$1
Expand All @@ -116,39 +137,124 @@ function minimap2_runner () {
minimap2 -2 -ax sr -t {{cores_per_task}} ${mmi} ${TMPDIR}/seqs.fastq | \
samtools fastq -@ 1 -f 12 -F 256 > ${TMPDIR}/seqs_new.fastq
mv ${TMPDIR}/seqs_new.fastq ${TMPDIR}/seqs.fastq
}

# NOTE(review): this function appears to interleave the pre- and post-change
# sides of a diff. The lines through "--threads 1" are the old single-file
# demux invocation; everything from "fcurrent=${1}" down is the replacement
# PE/SE host-depletion body. Only one of the two should survive.
function runner () {
i=${1}

# with the current proposed resources, we likely do not get
# benefit for parallel gzip write
{{demux_path}} \
--id-map ${TMPDIR}/id_map \
--infile ${TMPDIR}/seqs.fastq \
--output ${OUTPUT} \
--encoded-id ${i} \
--threads 1
# ${1} is the current minimap2 reference index (.mmi) to filter against.
fcurrent=${1}

jobd=${TMPDIR}

# Nothing to do if the mux step recorded no samples.
id_map=${jobd}/id_map
if [[ ! -f ${id_map} ]]; then
echo "No samples..."
return
fi

seqs_r1=${jobd}/seqs.r1.fastq
seqs_r2=${jobd}/seqs.r2.fastq
seqs_mmpe_r1=${jobd}/seqs.mmpe.r1.fastq
seqs_mmpe_r2=${jobd}/seqs.mmpe.r2.fastq
seqs_mmpese=${jobd}/seqs.mmpese.fastq
seqs_mmpese_r1=${jobd}/seqs.mmpese.r1.fastq
seqs_mmpese_r2=${jobd}/seqs.mmpese.r2.fastq

# PE operation
# Map pairs against the reference; keep only pairs where BOTH mates are
# unmapped (-f 12), drop secondary alignments (-F 256), and append /1 //2
# suffixes to read names (-N) for the later split.
minimap2 -2 -ax sr -t 12 \
${fcurrent} \
${seqs_r1} \
${seqs_r2} | \
samtools fastq \
-@ 4 \
-f 12 \
-F 256 \
-N \
-1 ${seqs_mmpe_r1} \
-2 ${seqs_mmpe_r2}

rm ${seqs_r1} &
rm ${seqs_r2} &
# no need to block

# SE operation
# run r1/r2 serially to avoid minimap2 picking up on interleve
# Remap the PE survivors one mate at a time; keep only unmapped reads
# (-f 4) and write them into one combined stream (-0).
minimap2 -2 -ax sr -t 12 \
${fcurrent} \
<(cat ${seqs_mmpe_r1} ${seqs_mmpe_r2}) | \
samtools fastq \
-@ 4 \
-f 4 \
-F 256 \
-0 ${seqs_mmpese}

rm ${seqs_mmpe_r1} &
rm ${seqs_mmpe_r2} &
# no need to block

# Split the combined stream back into R1/R2 using the /1 and /2 tags.
# NOTE(review): assumes the splitter binary was built beforehand — confirm
# the working directory contains splitter/target/release/splitter.
./splitter/target/release/splitter ${seqs_mmpese} ${seqs_mmpese_r1} ${delimiter} ${r1_tag} &
./splitter/target/release/splitter ${seqs_mmpese} ${seqs_mmpese_r2} ${delimiter} ${r2_tag} &
wait

rm ${seqs_mmpese} &

# Re-pair R1/R2 and drop orphans; -t sizes fastq_pair's hash table.
fastq_pair -t 50000000 ${seqs_mmpese_r1} ${seqs_mmpese_r2}
rm ${seqs_mmpese_r1}.single.fq &
rm ${seqs_mmpese_r2}.single.fq &
rm ${seqs_mmpese_r1} &
rm ${seqs_mmpese_r2} &
wait

# Surviving (host-depleted) pairs become the inputs for the next pass.
mv ${seqs_mmpese_r1}.paired.fq ${seqs_r1}
mv ${seqs_mmpese_r2}.paired.fq ${seqs_r2}
}
export -f runner
export -f minimap2_runner

# NOTE(review): this region interleaves the pre-change dispatch (the
# if/else over ${MMI} with its minimap2_runner loop) and the new
# demux_runner definition; the unmatched 'fi' near the bottom is a diff
# artifact and would be a syntax error as written.
if [[ -f ${MMI} ]]; then
minimap2_runner ${MMI}
else
for mmi in ${MMI}/*.mmi
# Fan the multiplexed, host-depleted reads back out into per-sample files
# using SLURM_CPUS_PER_TASK+1 background demux workers (seq 0..n is
# inclusive at both ends).
function demux_runner () {
n_demux_jobs=${SLURM_CPUS_PER_TASK}
jobd=${TMPDIR}
id_map=${jobd}/id_map
seqs_r1=${jobd}/seqs.r1.fastq
seqs_r2=${jobd}/seqs.r2.fastq

# NOTE(review): duplicate assignment — id_map was already set above.
id_map=${jobd}/id_map
if [[ ! -f ${id_map} ]]; then
echo "No samples..."
return
fi

for idx in $(seq 0 ${n_demux_jobs})
do
# NOTE(review): old-loop remnant from the diff (${mmi} comes from the
# pre-change for-loop above, not from this function).
minimap2_runner ${mmi}
# Each worker receives its index and the worker count; presumably it
# uses them to select its slice of samples — confirm against the
# {{demux_path}} tool's CLI.
python {{demux_path}} \
${id_map} \
<(cat ${seqs_r1} ${seqs_r2}) \
${OUTPUT} \
${idx} \
${n_demux_jobs} &
done
fi
wait
}
export -f demux_runner

mkdir -p ${OUTPUT}
# Build the list of minimap2 reference indexes: every *.mmi inside the
# ${MMI} directory, or ${MMI} itself when it is a single file.
mmi_files=${TMPDIR}/mmi-files
if [[ -d ${MMI} ]]; then
/bin/ls -1 ${MMI}/*.mmi > ${mmi_files}
else
echo ${MMI} > ${mmi_files}
fi
n_files=$(wc -l ${mmi_files} | cut -f 1 -d" ")

jobs=${SLURM_JOB_CPUS_PER_NODE}
# NOTE(review): the functions above are defined with underscores
# (mux_runner, minimap2_runner, demux_runner) but are invoked here with
# hyphens; as written these calls would fail with "command not found" —
# confirm the intended spelling.
mux-runner

echo "$(date) :: demux start"
# process with minimap2
# Host-deplete the multiplexed reads against each reference in turn; the
# current .mmi path is exported as ${fcurrent} for the runner.
for idx in $(seq 1 ${n_files})
do
fcurrent=$(head -n ${idx} ${mmi_files} | tail -n 1)
export fcurrent
echo "$(date) :: $(basename ${fcurrent})"
minimap2-runner
done

# NOTE(review): the next two lines look like the pre-change side of the
# diff (GNU parallel over the old 'runner'); the demux-runner invocation
# below supersedes them.
seq 1 ${n} | parallel -j ${jobs} runner
mkdir -p ${OUTPUT}

echo "$(date) :: demux start"
demux-runner
echo "$(date) :: demux stop"

# Sentinel file signalling successful completion of this array task.
touch ${OUTPUT}/${SLURM_JOB_NAME}.${SLURM_JOB_ID}.completed

0 comments on commit 5de964a

Please sign in to comment.