Skip to content

Commit

Permalink
Allow first metadata column for merge groups
Browse files Browse the repository at this point in the history
  • Loading branch information
chbk committed Apr 19, 2022
1 parent c0ff476 commit 8cc13c7
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 117 deletions.
12 changes: 12 additions & 0 deletions main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,18 @@ params.skip_lnc_detection =
params.containsKey('skip-feelnc') ?
true : false

params.max_cpus =
params.containsKey('max-cpus') ?
params.'max-cpus' as int : 16

params.max_memory =
params.containsKey('max-memory') ?
params.'max-memory' as nextflow.util.MemoryUnit : 64.GB

params.max_time =
params.containsKey('max-time') ?
params.'max-time' as nextflow.util.Duration : 18.h

if (!params.output) {
error += '\nNo --output provided\n'
}
Expand Down
16 changes: 12 additions & 4 deletions modules/samtools.nf
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ process SAMTOOLS_control_contigs {

process SAMTOOLS_merge_reads {

label 'cpu_16'
cpus = { bams instanceof List ? Math.min(16, params.max_cpus) : 1 }

input:
tuple val(id), path(bams), val(length), val(direction)
Expand All @@ -82,7 +82,15 @@ process SAMTOOLS_merge_reads {

shell:
merged = id + '.bam'
'''
samtools merge '!{merged}' !{bams} --threads !{task.cpus}
'''
if (bams instanceof List)
'''
samtools merge '!{merged}' !{bams} --threads !{task.cpus}
'''
else if (bams.getName() != merged)
'''
mv !{bams} '!{merged}'
'''
else
'''
'''
}
10 changes: 3 additions & 7 deletions workflows/create_channels.nf
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ workflow CREATE_CHANNELS {
).splitCsv(header: true, sep: '\t').map({ row ->
[
'prefix': row.values()[0],
'columns': row.subMap(row.keySet().collect()[1..-1])
'columns': row
]
})

Expand Down Expand Up @@ -306,17 +306,13 @@ workflow CREATE_CHANNELS {
channel_assembly_metadata =
set_metadata_groups(channel_metadata, params.assemble_by)

if (params.assemble_by) {
log_metadata_groups(channel_assembly_metadata, 'assembly')
}
log_metadata_groups(channel_assembly_metadata, 'assembly')

// each [prefix, id, [column: value, column: value]]
channel_quantification_metadata =
set_metadata_groups(channel_metadata, params.quantify_by)

if (params.quantify_by) {
log_metadata_groups(channel_quantification_metadata, 'quantification')
}
log_metadata_groups(channel_quantification_metadata, 'quantification')

// DECOMPRESS INDEX --------------------------------------------------------

Expand Down
198 changes: 92 additions & 106 deletions workflows/merge_reads.nf
Original file line number Diff line number Diff line change
Expand Up @@ -92,114 +92,100 @@ workflow MERGE_READS {

main:

if (params.assemble_by) {

// each [id, [bams], [lengths], [directions]]
channel_assembly_reads =
group_reads(
channel_aligned_reads,
channel_assembly_metadata
)

check_grouped_directions(
channel_assembly_reads,
'assembly'
// each [id, [bams], [lengths], [directions]]
channel_assembly_reads =
group_reads(
channel_aligned_reads,
channel_assembly_metadata
)

// each [id, [bams], length, direction]
channel_assembly_reads =
channel_assembly_reads.map({ group ->
[
group['id'],
group['bams'],
(group['lengths'].sum()/group['lengths'].size()).toInteger(),
group['directions'][0]
]
})

// each [id, [bams], length, direction] => [id, bam, length, direction]
SAMTOOLS_merge_reads_for_assembly(channel_assembly_reads)

// each [id, bam, length, direction]
channel_assembly_reads =
SAMTOOLS_merge_reads_for_assembly.out.map({ output ->
[
'id': output[0],
'bam': output[1],
'length': output[2],
'direction': output[3]
]
})

} else {

// each [id, bam, length, direction]
channel_assembly_reads =
channel_aligned_reads.map({ aligned ->
[
'id': aligned['prefix'],
'bam': aligned['bam'],
'length': aligned['length'],
'direction': aligned['direction']
]
})
}

if (params.quantify_by) {

// each [id, [bams], [lengths], [directions]]
channel_quantification_reads =
group_reads(
channel_aligned_reads,
channel_quantification_metadata
)

check_grouped_lengths(
channel_quantification_reads,
'quantification'
check_grouped_directions(
channel_assembly_reads,
'assembly'
)

// each [id, [bams], length, direction]
channel_assembly_reads =
channel_assembly_reads.map({ group ->
[
group['id'],
group['bams'].size() == 1 &&
(group['bams'][0].getName() =~ /^(.+?)\.bam$/)[0][1] == group['id'] ?
group['bams'][0] : group['bams'],
(group['lengths'].sum()/group['lengths'].size()).toInteger(),
group['directions'][0]
]
}).branch({ group ->
grouped: group[1] instanceof List
single: true
})

// each [id, [bams], length, direction] => [id, bam, length, direction]
SAMTOOLS_merge_reads_for_assembly(
channel_assembly_reads.grouped
)

// each [id, bam, length, direction]
channel_assembly_reads =
SAMTOOLS_merge_reads_for_assembly.out.mix(
channel_assembly_reads.single
).map({ output ->
[
'id': output[0],
'bam': output[1],
'length': output[2],
'direction': output[3]
]
})

// each [id, [bams], [lengths], [directions]]
channel_quantification_reads =
group_reads(
channel_aligned_reads,
channel_quantification_metadata
)

check_grouped_directions(
channel_quantification_reads,
'quantification'
)

// each [id, [bams], length, direction]
channel_quantification_reads =
channel_quantification_reads.map({ group ->
[
group['id'],
group['bams'],
(group['lengths'].sum()/group['lengths'].size()).toInteger(),
group['directions'][0]
]
})

// each [id, [bams], length, direction] => [id, bam, length, direction]
SAMTOOLS_merge_reads_for_quantification(channel_quantification_reads)

// each [id, bam, length, direction]
channel_quantification_reads =
SAMTOOLS_merge_reads_for_quantification.out.map({ output ->
[
'id': output[0],
'bam': output[1],
'length': output[2],
'direction': output[3]
]
})

} else {

// each [id, bam, length, direction]
channel_quantification_reads =
channel_aligned_reads.map({ aligned ->
[
'id': aligned['prefix'],
'bam': aligned['bam'],
'length': aligned['length'],
'direction': aligned['direction']
]
})
}
check_grouped_lengths(
channel_quantification_reads,
'quantification'
)

check_grouped_directions(
channel_quantification_reads,
'quantification'
)

// each [id, [bams], length, direction]
channel_quantification_reads =
channel_quantification_reads.map({ group ->
[
group['id'],
group['bams'].size() == 1 &&
(group['bams'][0].getName() =~ /^(.+?)\.bam$/)[0][1] == group['id'] ?
group['bams'][0] : group['bams'],
(group['lengths'].sum()/group['lengths'].size()).toInteger(),
group['directions'][0]
]
}).branch({ group ->
grouped: group[1] instanceof List
single: true
})

// each [id, [bams], length, direction] => [id, bam, length, direction]
SAMTOOLS_merge_reads_for_quantification(
channel_quantification_reads.grouped
)

// each [id, bam, length, direction]
channel_quantification_reads =
SAMTOOLS_merge_reads_for_quantification.out.mix(
channel_quantification_reads.single
).map({ output ->
[
'id': output[0],
'bam': output[1],
'length': output[2],
'direction': output[3]
]
})
}

0 comments on commit 8cc13c7

Please sign in to comment.