Merge pull request #412 from maxulysse/dsl2_csv_magic
add CI for --skip_markduplicates
maxulysse authored Aug 4, 2021
2 parents 474706c + 4d55e71 commit 2fc03e5
Showing 12 changed files with 184 additions and 99 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -25,7 +25,7 @@ jobs:
# Nextflow versions: check pipeline minimum and current latest
nxf_ver: ['21.04.0', '']
engine: ['docker']
test: ['default', 'aligner', 'gatk4_spark', 'targeted', 'tumor_normal_pair', 'variant_calling', 'annotation']
test: ['default', 'aligner', 'gatk4_spark', 'targeted', 'skip_markduplicates', 'tumor_normal_pair', 'variant_calling', 'annotation']
steps:
- name: Check out pipeline code
uses: actions/checkout@v2
5 changes: 5 additions & 0 deletions conf/modules.config
@@ -97,6 +97,11 @@ params {
args2 = 'sort'
publish_files = false
}
'samtools_index_mapping' {
publish_by_meta = true
publish_files = ['bai':'mapped']
publish_dir = 'preprocessing'
}
// MARKDUPLICATES
'markduplicates' {
args = 'REMOVE_DUPLICATES=false VALIDATION_STRINGENCY=LENIENT'
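
For context, the new 'samtools_index_mapping' entry follows the nf-core DSL2 modules.config pattern: each key holds an options map that a (sub)workflow hands to a module via addParams. A minimal sketch of the wiring, assuming sarek's params.modules map and module path (names here are hypothetical):

    // hypothetical wiring for the options block added above
    def modules = params.modules.clone()
    samtools_index_mapping_options = modules['samtools_index_mapping']
    include { SAMTOOLS_INDEX } from './modules/local/samtools/index/main' addParams(options: samtools_index_mapping_options)

With publish_by_meta = true and publish_files = ['bai':'mapped'], the module's publishing helper should then place the generated .bai files under results/preprocessing/<sample>/mapped/ (assuming the standard nf-core saveFiles machinery).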
7 changes: 7 additions & 0 deletions conf/test.config
@@ -44,9 +44,16 @@ profiles {
pair {
params.input = 'https://raw.githubusercontent.com/nf-core/test-datasets/sarek/testdata/csv/tiny-manta-https.csv'
}
prepare_recalibration {
params.input = 'https://raw.githubusercontent.com/nf-core/test-datasets/sarek/testdata/csv/tiny-mapped-normal-https.csv'
params.step = 'prepare_recalibration'
}
save_bam_mapped {
params.save_bam_mapped = true
}
skip_markduplicates {
params.skip_markduplicates = true
}
split_fastq {
params.split_fastq = 2
}
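
Since each new test profile only overrides parameters, it composes with the base test profile on the command line; given the invocation pattern used in tests/*.yml below and the CI matrix entry added above, the new checks presumably run as:

    nextflow run main.nf -profile test,skip_markduplicates,docker
    nextflow run main.nf -profile test,prepare_recalibration,docker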
2 changes: 1 addition & 1 deletion modules/local/gatk4/baserecalibrator/main.nf
@@ -23,8 +23,8 @@ process GATK4_BASERECALIBRATOR {
path fasta
path fai
path dict
path knownSitesTBI
path knownSites
path knownSites_tbi

output:
tuple val(meta), path("*.table"), emit: table
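
Because these are positional path inputs, renaming knownSitesTBI to knownSites_tbi and moving it after knownSites changes the process's calling convention: every call site must now pass the known-sites file before its index, which is exactly what the matching prepare_recalibration.nf change below does:

    // call site updated to match the new positional input order
    BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi)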
8 changes: 4 additions & 4 deletions subworkflows/local/build_indices.nf
@@ -65,7 +65,7 @@ workflow BUILD_INDICES {

result_dbsnp_tbi = Channel.empty()
version_dbsnp_tbi = Channel.empty()
if (!(params.dbsnp_tbi) && params.dbsnp && ('mapping' in step || 'prepare_recalibration' in step || 'controlfreec' in tools || 'haplotypecaller' in tools|| 'mutect2' in tools || 'tnscope' in tools)) {
if (!(params.dbsnp_tbi) && params.dbsnp && ('mapping' in step || 'preparerecalibration' in step || 'controlfreec' in tools || 'haplotypecaller' in tools|| 'mutect2' in tools || 'tnscope' in tools)) {
dbsnp_id = dbsnp.map {it -> [[id:"$it.baseName"], it]}
(result_dbsnp_tbi, version_dbsnp_tbi) = TABIX_DBSNP(dbsnp_id)
result_dbsnp_tbi = result_dbsnp_tbi.map {meta, tbi -> [tbi]}
@@ -81,14 +81,14 @@

result_germline_resource_tbi = Channel.empty()
version_germline_resource_tbi = Channel.empty()
if (!(params.germline_resource_tbi) && params.germline_resource && 'mutect2' in tools){
if (!(params.germline_resource_tbi) && params.germline_resource && 'mutect2' in tools) {
germline_resource_id = germline_resource.map {it -> [[id:"$it.baseName"], it]}
(result_germline_resource_tbi, version_germline_resource_tbi) = TABIX_GERMLINE_RESOURCE(germline_resource_id)
}

result_known_indels_tbi = Channel.empty()
version_known_indels_tbi = Channel.empty()
if (!(params.known_indels_tbi) && params.known_indels && ('mapping' in step || 'prepare_recalibration' in step)){
if (!(params.known_indels_tbi) && params.known_indels && ('mapping' in step || 'preparerecalibration' in step)) {
known_indels_id = known_indels.map {it -> [[id:"$it.baseName"], it]}
(result_known_indels_tbi, version_known_indels_tbi) = TABIX_KNOWN_INDELS(known_indels_id)
result_known_indels_tbi = result_known_indels_tbi.map {meta, tbi -> [tbi]}
@@ -101,7 +101,7 @@

result_pon_tbi = Channel.empty()
version_pon_tbi = Channel.empty()
if (!(params.pon_tbi) && params.pon && ('tnscope' in tools || 'mutect2' in tools)){
if (!(params.pon_tbi) && params.pon && ('tnscope' in tools || 'mutect2' in tools)) {
pon_id = pon.map {it -> [[id:"$it.baseName"], it]}
(result_pon_tbi, version_pon_tbi) = TABIX_PON(pon_id)
}
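
The .map {it -> [[id:"$it.baseName"], it]} lines wrap bare reference files in the [ meta, file ] tuple shape the nf-core tabix modules expect, and the meta map is stripped again straight after. A minimal sketch of the round trip (channel contents hypothetical):

    dbsnp_id = dbsnp.map { it -> [[id: it.baseName], it] }          // [ [id:'dbsnp'], dbsnp.vcf.gz ]
    (result_dbsnp_tbi, version_dbsnp_tbi) = TABIX_DBSNP(dbsnp_id)   // module emits [ meta, tbi ]
    result_dbsnp_tbi = result_dbsnp_tbi.map { meta, tbi -> [tbi] }  // drop the meta for downstream use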
6 changes: 3 additions & 3 deletions subworkflows/local/mapping_csv.nf
@@ -6,13 +6,13 @@

workflow MAPPING_CSV {
take:
bam_mapped // channel: [mandatory] meta, bam, bai
bam_indexed // channel: [mandatory] meta, bam, bai
save_bam_mapped // boolean: [mandatory] save_bam_mapped
skip_markduplicates // boolean: [mandatory] skip_markduplicates

main:
if (save_bam_mapped) {
csv_bam_mapped = bam_mapped.map { meta, bam, bai -> [meta] }
csv_bam_mapped = bam_indexed.map { meta, bam, bai -> [meta] }
// Creating csv files to restart from this step
csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta ->
patient = meta.patient[0]
@@ -26,7 +26,7 @@ workflow MAPPING_CSV {
}

if (skip_markduplicates) {
csv_bam_mapped = bam_mapped.map { meta, bam, bai -> [meta] }
csv_bam_mapped = bam_indexed.map { meta, bam, bai -> [meta] }
// Creating csv files to restart from this step
csv_bam_mapped.collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta ->
patient = meta.patient[0]
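
Both branches rely on Nextflow's collectFile to write one restart CSV per sample into ${params.outdir}/preprocessing/csv. A minimal sketch of the idiom with a hypothetical meta map and a simplified header (the real column list lives in the elided lines above):

    Channel.of([patient: 'p1', sample: 's1', gender: 'XX', status: 0])
        .collectFile(storeDir: "${params.outdir}/preprocessing/csv") { meta ->
            // the closure returns [ filename, content ]; lines for the same filename are appended
            ["${meta.sample}.csv",
             "patient,gender,status,sample\n${meta.patient},${meta.gender},${meta.status},${meta.sample}\n"]
        }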
57 changes: 38 additions & 19 deletions subworkflows/nf-core/mapping.nf
@@ -4,44 +4,46 @@
========================================================================================
*/

params.seqkit_split2_options = [:]
params.bwamem1_mem_options = [:]
params.bwamem1_mem_tumor_options = [:]
params.bwamem2_mem_options = [:]
params.bwamem2_mem_tumor_options = [:]
params.merge_bam_options = [:]
params.samtools_index_options = [:]
params.seqkit_split2_options = [:]

include { SEQKIT_SPLIT2 } from '../../modules/nf-core/modules/seqkit/split2/main.nf' addParams(options: params.seqkit_split2_options)
include { BWAMEM2_MEM as BWAMEM2_MEM_T } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_tumor_options)
include { BWAMEM2_MEM } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_options)
include { BWA_MEM as BWAMEM1_MEM } from '../../modules/local/bwa/mem/main' addParams(options: params.bwamem1_mem_options)
include { BWA_MEM as BWAMEM1_MEM_T } from '../../modules/local/bwa/mem/main' addParams(options: params.bwamem1_mem_tumor_options)
include { BWAMEM2_MEM } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_options)
include { BWAMEM2_MEM as BWAMEM2_MEM_T } from '../../modules/local/bwamem2/mem/main' addParams(options: params.bwamem2_mem_tumor_options)
include { SAMTOOLS_INDEX } from '../../modules/local/samtools/index/main' addParams(options: params.samtools_index_options)
include { SAMTOOLS_MERGE } from '../../modules/nf-core/modules/samtools/merge/main' addParams(options: params.merge_bam_options)
include { SEQKIT_SPLIT2 } from '../../modules/nf-core/modules/seqkit/split2/main.nf' addParams(options: params.seqkit_split2_options)

workflow MAPPING {
take:
aligner // string: [mandatory] "bwa-mem" or "bwa-mem2"
bwa // channel: [mandatory] bwa
fai // channel: [mandatory] fai
fasta // channel: [mandatory] fasta
reads_input // channel: [mandatory] meta, reads_input
aligner // string: [mandatory] "bwa-mem" or "bwa-mem2"
bwa // channel: [mandatory] bwa
fai // channel: [mandatory] fai
fasta // channel: [mandatory] fasta
reads_input // channel: [mandatory] meta, reads_input
skip_markduplicates // boolean: true/false

main:

bam_mapped_index = Channel.empty()
bam_reports = Channel.empty()

bam_indexed = Channel.empty()

if(params.split_fastq > 1){
if (params.split_fastq > 1) {
reads_input_split = SEQKIT_SPLIT2(reads_input).reads.map{
key, reads ->
//TODO: maybe this can be replaced by a regex to include part_001 etc.

//sorts the list of split fastq files by part number:
//[R1.part_001, R2.part_001, R1.part_002, R2.part_002, R1.part_003, R2.part_003, ...]
//TODO: determine whether an uneven number of parts is possible (e.g. for unfiltered or unpaired reads), in which case remainder: true would be needed
return [key, reads.sort{ a,b -> a.getName().tokenize('.')[ a.getName().tokenize('.').size() - 3] <=> b.getName().tokenize('.')[ b.getName().tokenize('.').size() - 3]}
.collate(2)]
return [key, reads.sort{ a,b -> a.getName().tokenize('.')[ a.getName().tokenize('.').size() - 3] <=> b.getName().tokenize('.')[ b.getName().tokenize('.').size() - 3]}.collate(2)]
}.transpose()
}else{
} else {
reads_input_split = reads_input
}

@@ -77,14 +79,31 @@ workflow MAPPING {
bam_bwa.map{ meta, bam ->
meta.remove('read_group')
meta.id = meta.sample
// groupKey makes sure that the correct group can advance as soon as it is complete
// and not stall the workflow until all pieces are mapped
def groupKey = groupKey(meta, meta.numLanes * params.split_fastq)
tuple(groupKey, bam)
[meta, bam]
}.groupTuple() //groupKey above is somehow makes sure, the workflow doesn't stall until all pieces are mapped, but that the correct group can advance as soon as it is complete
}.groupTuple()
.set{bam_mapped}

// STEP 1.5: MERGING AND INDEXING BAM FROM MULTIPLE LANES // MarkDuplicates can take care of this
// MarkDuplicates can handle multiple BAMs as input, so no merging/indexing is done at this step,
// unless MarkDuplicates is being skipped

if (skip_markduplicates) {
bam_mapped.branch{
single: it[1].size() == 1
multiple: it[1].size() > 1
}.set{ bam_to_merge }

SAMTOOLS_MERGE(bam_to_merge.multiple)
bam_merged = bam_to_merge.single.mix(SAMTOOLS_MERGE.out.bam)

SAMTOOLS_INDEX(bam_merged)
bam_indexed = bam_merged.join(SAMTOOLS_INDEX.out.bai)
}

emit:
bam = bam_mapped
bam = bam_mapped
bam_indexed = bam_indexed
}
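
The groupKey(meta, meta.numLanes * params.split_fastq) call tells groupTuple exactly how many BAM pieces to expect per sample, so each group is emitted as soon as its last piece is mapped rather than after the whole channel completes. A minimal sketch of the idiom (two hypothetical lanes, no fastq splitting):

    Channel.of([[id: 's1', numLanes: 2], 'lane1.bam'],
               [[id: 's1', numLanes: 2], 'lane2.bam'])
        .map { meta, bam -> tuple(groupKey(meta, meta.numLanes), bam) }
        .groupTuple()   // emits [ key, [lane1.bam, lane2.bam] ] once both items have arrived
        .view()

The skip_markduplicates branch then uses the complementary branch/mix/join pattern: single-BAM samples bypass SAMTOOLS_MERGE entirely, the two branches are recombined with mix, and join pairs each BAM with its fresh index by meta key.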
24 changes: 8 additions & 16 deletions subworkflows/nf-core/prepare_recalibration.nf
@@ -24,30 +24,23 @@ workflow PREPARE_RECALIBRATION {
known_sites // channel: [optional] known_sites
known_sites_tbi // channel: [optional] known_sites_tbi
no_intervals // value: [mandatory] no_intervals
known_indels
dbsnp

main:
cram_markduplicates.combine(intervals)
.map{ meta, cram, crai, intervals ->
new_meta = meta.clone()
new_meta.id = meta.sample + "_" + intervals.baseName
[new_meta, cram, crai, intervals]
}
.set{cram_markduplicates_intervals}
.map{ meta, cram, crai, intervals ->
new_meta = meta.clone()
new_meta.id = meta.sample + "_" + intervals.baseName
[new_meta, cram, crai, intervals]
}.set{cram_markduplicates_intervals}

if(use_gatk_spark){
if (use_gatk_spark) {
BASERECALIBRATOR_SPARK(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi)
table_baserecalibrator = BASERECALIBRATOR_SPARK.out.table
}else{
BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites_tbi, known_sites)
} else {
BASERECALIBRATOR(cram_markduplicates_intervals, fasta, fai, dict, known_sites, known_sites_tbi)
table_baserecalibrator = BASERECALIBRATOR.out.table
}

//num_intervals = intervals.toList().size.view() //Integer.valueOf()
//.view()
//println(intervals.toList().getClass()) //.value.getClass())

//STEP 3.5: MERGING RECALIBRATION TABLES
if (no_intervals) {
table_baserecalibrator.map { meta, table ->
Expand All @@ -62,7 +55,6 @@ workflow PREPARE_RECALIBRATION {

GATHERBQSRREPORTS(recaltable)
table_bqsr = GATHERBQSRREPORTS.out.table

}

emit:
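
The meta.clone() in the interval fan-out is load-bearing: Groovy maps are passed by reference, so assigning meta.id directly would mutate the single map shared by every parallel interval copy. A minimal sketch of the combine-and-clone idiom (inputs hypothetical):

    Channel.of([[sample: 's1'], file('s1.cram'), file('s1.crai')])
        .combine(Channel.of(file('chr1.bed'), file('chr2.bed')))
        .map { meta, cram, crai, bed ->
            def new_meta = meta.clone()   // copy, so each interval owns its own meta
            new_meta.id = "${meta.sample}_${bed.baseName}"
            [new_meta, cram, crai, bed]
        }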
35 changes: 13 additions & 22 deletions subworkflows/nf-core/qc_markduplicates.nf
@@ -16,7 +16,6 @@ params.samtools_index_options = [:]
include { GATK4_MARKDUPLICATES } from '../../modules/local/gatk4/markduplicates/main' addParams(options: params.markduplicates_options)
include { GATK4_MARKDUPLICATES_SPARK } from '../../modules/local/gatk4/markduplicatesspark/main' addParams(options: params.markduplicatesspark_options)
include { GATK4_ESTIMATELIBRARYCOMPLEXITY } from '../../modules/local/gatk4/estimatelibrarycomplexity/main' addParams(options: params.estimatelibrarycomplexity_options)
include { SAMTOOLS_MERGE } from '../../modules/nf-core/modules/samtools/merge/main' addParams(options: params.merge_bam_options)
include { QUALIMAP_BAMQC } from '../../modules/local/qualimap/bamqc/main' addParams(options: params.qualimap_bamqc_options)
include { SAMTOOLS_STATS } from '../../modules/local/samtools/stats/main' addParams(options: params.samtools_stats_options)
include { SAMTOOLS_VIEW as SAMTOOLS_BAM_TO_CRAM } from '../../modules/local/samtools/view/main.nf' addParams(options: params.samtools_view_options)
@@ -25,51 +24,43 @@ include { SAMTOOLS_INDEX } from '../../modules/loca

workflow QC_MARKDUPLICATES {
take:
bam_mapped // channel: [mandatory] meta, bam, bai
bam_mapped // channel: [mandatory] meta, bam
bam_indexed // channel: [mandatory] meta, bam, bai
use_gatk_spark // value: [mandatory] use gatk spark
save_metrics // value: [mandatory] save metrics
fasta // channel: [mandatory] fasta
fai // channel: [mandatory] fai
dict // channel: [mandatory] dict
skip_markduplicates // boolean: true/false
skip_bamqc // boolean: true/false
skip_samtools // boolean: true/false
target_bed // channel: [optional] target_bed

main:

report_markduplicates = Channel.empty()
if(params.skip_markduplicates){

bam_mapped.branch{
single: it[1].size() == 1
multiple: it[1].size() > 1
}.set{ bam_to_merge }

SAMTOOLS_MERGE(bam_to_merge.multiple)
bam_merged = bam_to_merge.single.mix(SAMTOOLS_MERGE.out.bam)

SAMTOOLS_INDEX(bam_merged)
bam_markduplicates = bam_merged.join(SAMTOOLS_INDEX.out.bai)
cram_markduplicates = SAMTOOLS_BAM_TO_CRAM(bam_markduplicates, fasta, fai)
} else{

if (skip_markduplicates) {
bam_markduplicates = bam_indexed
SAMTOOLS_BAM_TO_CRAM(bam_markduplicates, fasta, fai)
cram_markduplicates = SAMTOOLS_BAM_TO_CRAM.out.cram
} else {
if (use_gatk_spark) {

//If BAMQC should be run on the MarkDuplicates output, don't let MarkDuplicatesSpark convert to CRAM; use its BAM output instead
if(!skip_bamqc){
if (!skip_bamqc) {
GATK4_MARKDUPLICATES_SPARK(bam_mapped, fasta, fai, dict, "bam")
SAMTOOLS_INDEX(GATK4_MARKDUPLICATES_SPARK.out.output)
bam_markduplicates = GATK4_MARKDUPLICATES_SPARK.out.output.join(SAMTOOLS_INDEX.out.bai)

SAMTOOLS_BAM_TO_CRAM_SPARK(bam_markduplicates, fasta, fai)
cram_markduplicates = SAMTOOLS_BAM_TO_CRAM_SPARK.out.cram
}else{
} else {
GATK4_MARKDUPLICATES_SPARK(bam_mapped, fasta, fai, dict, "cram")
SAMTOOLS_INDEX(GATK4_MARKDUPLICATES_SPARK.out.output)
cram_markduplicates = GATK4_MARKDUPLICATES_SPARK.out.output.join(SAMTOOLS_INDEX.out.crai)
}

if(save_metrics){
if (save_metrics) {
GATK4_ESTIMATELIBRARYCOMPLEXITY(bam_mapped, fasta, fai, dict)
report_markduplicates = GATK4_ESTIMATELIBRARYCOMPLEXITY.out.metrics
}
Expand All @@ -88,12 +79,12 @@ workflow QC_MARKDUPLICATES {
//if !skip_markduplicates, then QC tools are run on duplicate marked bams
//After bamqc finishes, convert to cram for further analysis
qualimap_bamqc = Channel.empty()
if(!skip_bamqc){
if (!skip_bamqc && !skip_markduplicates) {
//TODO: after adding CI tests, allow bamqc on mapped bams if no duplicate marking is done
QUALIMAP_BAMQC(bam_markduplicates, target_bed, params.target_bed)
qualimap_bamqc = QUALIMAP_BAMQC.out
}


samtools_stats = Channel.empty()
if (!skip_samtools) {
SAMTOOLS_STATS(cram_markduplicates, fasta)
3 changes: 3 additions & 0 deletions tests/test_annotation.yml
@@ -6,6 +6,7 @@
files:
- path: results/annotation/1234N/1234N_snpEff.ann.gz
- path: results/annotation/1234N/1234N_snpEff.ann.gz.tbi
- path: results/multiqc
- name: Run VEP
command: nextflow run main.nf -profile test,annotation,docker --tools vep
tags:
Expand All @@ -14,6 +15,7 @@
files:
- path: results/annotation/1234N/1234N_VEP.ann.gz
- path: results/annotation/1234N/1234N_VEP.ann.gz.tbi
- path: results/multiqc
- name: Run snpEff followed by VEP
command: nextflow run main.nf -profile test,annotation,docker --tools merge
tags:
@@ -26,3 +28,4 @@
- path: results/annotation/1234N/1234N_snpEff.ann.gz.tbi
- path: results/annotation/1234N/1234N_snpEff_VEP.ann.gz
- path: results/annotation/1234N/1234N_snpEff_VEP.ann.gz.tbi
- path: results/multiqc