From 6fa1fde8e8461ba51680b4dd6a1797eb5104c266 Mon Sep 17 00:00:00 2001 From: Tim Fennell Date: Thu, 26 Oct 2023 05:47:36 -0600 Subject: [PATCH] Alternative PR to add duplicate marking to GroupReadsByUmi (#940) Add duplicate marking to GroupReadsByUmi. Also includes: - Ability to pass through secondary and supplementary reads - Defaulting passing through secondary and supplementary reads based on whether duplicate marking is activated or not Co-authored-by: Joe Vieira Co-authored-by: Nils Homer --- .github/workflows/unittests.yaml | 1 + .../umi/ConsensusCallingIterator.scala | 8 +- .../fulcrumgenomics/umi/GroupReadsByUmi.scala | 104 +++++++++++++----- .../umi/GroupReadsByUmiTest.scala | 63 ++++++++++- 4 files changed, 144 insertions(+), 32 deletions(-) diff --git a/.github/workflows/unittests.yaml b/.github/workflows/unittests.yaml index 0502aea40..d1e897ecd 100644 --- a/.github/workflows/unittests.yaml +++ b/.github/workflows/unittests.yaml @@ -24,6 +24,7 @@ jobs: - name: Code Coverage uses: codecov/codecov-action@v3.1.0 with: + token: ${{ secrets.CODECOV_TOKEN }} flags: unittests # optional fail_ci_if_error: true # optional (default = false) verbose: true # optional (default = false) diff --git a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala index 919c8d111..d99c7559c 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/ConsensusCallingIterator.scala @@ -53,10 +53,14 @@ class ConsensusCallingIterator[ConsensusRead <: SimpleRead](sourceIterator: Iter private var collectedStats: Boolean = false protected val iter: Iterator[SamRecord] = { + val filteredIterator = sourceIterator.filterNot { r => + r.secondary || r.supplementary || r.unmapped || (r.paired && r.mateUnmapped) + } + // Wrap our input iterator in a progress logging iterator if we have a progress logger val progressIterator = progress match { - case Some(p) => sourceIterator.tapEach { r => p.record(r) } - case None => sourceIterator + case Some(p) => filteredIterator.tapEach { r => p.record(r) } + case None => filteredIterator } // Then turn it into a grouping iterator diff --git a/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala b/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala index 7c335fd9a..1559b9f7c 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala @@ -26,11 +26,10 @@ */ package com.fulcrumgenomics.umi -import java.util.concurrent.atomic.AtomicLong import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.api.SamOrder.TemplateCoordinate -import com.fulcrumgenomics.bam.{Bams, Template} import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter} +import com.fulcrumgenomics.bam.{Bams, Template} import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} import com.fulcrumgenomics.commons.util.{LazyLogging, NumericCounter, SimpleCounter} import com.fulcrumgenomics.sopt.{arg, clp} @@ -39,13 +38,14 @@ import com.fulcrumgenomics.util.Metric.{Count, Proportion} import com.fulcrumgenomics.util.Sequences.countMismatches import com.fulcrumgenomics.util._ import enumeratum.EnumEntry +import htsjdk.samtools.DuplicateScoringStrategy.ScoringStrategy import htsjdk.samtools._ import htsjdk.samtools.util.SequenceUtil -import scala.collection.BufferedIterator +import java.util.concurrent.atomic.AtomicLong import scala.collection.immutable.IndexedSeq -import scala.collection.mutable import scala.collection.mutable.ListBuffer +import scala.collection.{BufferedIterator, Iterator, mutable} object GroupReadsByUmi { @@ -439,9 +439,18 @@ object Strategy extends FgBioEnum[Strategy] { | 3. The assigned UMI tag | 4. Read Name | - |During grouping, reads are filtered out if a) all reads with the same queryname are unmapped, b) any primary - |read has mapping quality < `min-map-q` (default=1), or c) the primary mappings for R1 and R2 are on different - |chromosomes and `--allow-inter-contig` has been set to false. + |If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then + |this tool will re-sort the input. The output will be written in template-coordinate order. + | + |During grouping, reads and templates are filtered out as follows: + | + |1. Templates are filtered if all reads for the template are unmapped + |2. Templates are filtered if any non-secondary, non-supplementary read has mapping quality < `min-map-q` + |3. Templates are filtered if R1 and R2 are mapped to different chromosomes and `--allow-inter-contig` is false + |4. Templates are filtered if any UMI sequence contains one or more `N` bases + |5. Templates are filtered if `--min-umi-length` is specified and the UMI does not meet the length requirement + |6. Reads are filtered out if flagged as secondary and `--include-secondary` is false + |7. Reads are filtered out if flagged as supplementary and `--include-supplementary` is false | |Grouping of UMIs is performed by one of four strategies: | @@ -470,27 +479,40 @@ object Strategy extends FgBioEnum[Strategy] { |(i.e. does not count dashes and other non-ACGT characters). This option is not implemented for reads with UMI pairs |(i.e. using the paired assigner). | - |If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then - |this tool will re-sort the input. The ouitput will be written in template-coordinate order. + |If the `--mark-duplicates` option is given, reads will also have their duplicate flag set in the BAM file. + |Each tag-family is treated separately, and a single template within the tag family is chosen to be the "unique" + |template and marked as non-duplicate, while all other templates in the tag family are then marked as duplicate. + |One limitation of duplicate-marking mode, vs. e.g. Picard MarkDuplicates, is that read pairs with one unmapped read + |are duplicate-marked independently from read pairs with both reads mapped. + | + |Several parameters have different defaults depending on whether duplicates are being marked or not (all are + |directly settable on the command line): + | + | 1. `--min-map-q` defaults to 0 in duplicate marking mode and 1 otherwise + | 2. `--include-secondary` defaults to true in duplicate marking mode and false otherwise + | 3. `--include-supplementary` defaults to true in duplicate marking mode and false otherwise """ ) class GroupReadsByUmi -( @arg(flag='i', doc="The input BAM file.") val input: PathToBam = Io.StdIn, - @arg(flag='o', doc="The output BAM file.") val output: PathToBam = Io.StdOut, - @arg(flag='f', doc="Optional output of tag family size counts.") val familySizeHistogram: Option[FilePath] = None, - @arg(flag='t', doc="The tag containing the raw UMI.") val rawTag: String = "RX", - @arg(flag='T', doc="The output tag for UMI grouping.") val assignTag: String = "MI", - @arg(flag='m', doc="Minimum mapping quality for mapped reads.") val minMapQ: Int = 1, - @arg(flag='n', doc="Include non-PF reads.") val includeNonPfReads: Boolean = false, - @arg(flag='s', doc="The UMI assignment strategy.") val strategy: Strategy, - @arg(flag='e', doc="The allowable number of edits between UMIs.") val edits: Int = 1, - @arg(flag='l', doc= """The minimum UMI length. If not specified then all UMIs must have the same length, - |otherwise discard reads with UMIs shorter than this length and allow for differing UMI lengths. - |""") - val minUmiLength: Option[Int] = None, - @arg(flag='x', doc= """ - |DEPRECATED: this option will be removed in future versions and inter-contig reads will be - |automatically processed.""") +(@arg(flag='i', doc="The input BAM file.") val input: PathToBam = Io.StdIn, + @arg(flag='o', doc="The output BAM file.") val output: PathToBam = Io.StdOut, + @arg(flag='f', doc="Optional output of tag family size counts.") val familySizeHistogram: Option[FilePath] = None, + @arg(flag='t', doc="The tag containing the raw UMI.") val rawTag: String = ConsensusTags.UmiBases, + @arg(flag='T', doc="The output tag for UMI grouping.") val assignTag: String = ConsensusTags.MolecularId, + @arg(flag='d', doc="Turn on duplicate marking mode.") val markDuplicates: Boolean = false, + @arg(flag='S', doc="Include secondary reads.") val includeSecondary: Option[Boolean] = None, + @arg(flag='U', doc="Include supplementary reads.") val includeSupplementary: Option[Boolean] = None, + @arg(flag='m', doc="Minimum mapping quality for mapped reads.") val minMapQ: Option[Int] = None, + @arg(flag='n', doc="Include non-PF reads.") val includeNonPfReads: Boolean = false, + @arg(flag='s', doc="The UMI assignment strategy.") val strategy: Strategy, + @arg(flag='e', doc="The allowable number of edits between UMIs.") val edits: Int = 1, + @arg(flag='l', doc= """The minimum UMI length. If not specified then all UMIs must have the same length, + |otherwise discard reads with UMIs shorter than this length and allow for differing UMI lengths. + |""") + val minUmiLength: Option[Int] = None, + @arg(flag='x', doc= """ + |DEPRECATED: this option will be removed in future versions and inter-contig reads will be + |automatically processed.""") @deprecated val allowInterContig: Boolean = true )extends FgBioTool with LazyLogging { import GroupReadsByUmi._ @@ -499,6 +521,10 @@ class GroupReadsByUmi private val assigner = strategy.newStrategy(this.edits) + // Give values to unset parameters that are different in duplicate marking mode + private val _minMapQ = this.minMapQ.getOrElse(if (this.markDuplicates) 0 else 1) + private val _includeSecondaryReads = this.includeSecondary.getOrElse(this.markDuplicates) + private val _includeSupplementaryReads = this.includeSupplementary.getOrElse(this.markDuplicates) /** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */ private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = { @@ -546,11 +572,12 @@ class GroupReadsByUmi // Filter and sort the input BAM file logger.info("Filtering the input.") val filteredIterator = in.iterator - .filter(r => !r.secondary && !r.supplementary) + .filter(r => this._includeSecondaryReads || !r.secondary ) + .filter(r => this._includeSupplementaryReads || !r.supplementary ) .filter(r => (includeNonPfReads || r.pf) || { filteredNonPf += 1; false }) .filter(r => (r.mapped || (r.paired && r.mateMapped)) || { filteredPoorAlignment += 1; false }) .filter(r => (allowInterContig || r.unpaired || r.refIndex == r.mateRefIndex) || { filteredPoorAlignment += 1; false }) - .filter(r => mapqOk(r, this.minMapQ) || { filteredPoorAlignment += 1; false }) + .filter(r => mapqOk(r, this._minMapQ) || { filteredPoorAlignment += 1; false }) .filter(r => !r.get[String](rawTag).exists(_.contains('N')) || { filteredNsInUmi += 1; false }) .filter { r => this.minUmiLength.forall { l => @@ -610,10 +637,15 @@ class GroupReadsByUmi // Take the next set of templates by position and assign UMIs val templates = takeNextGroup(templateCoordinateIterator, canTakeNextGroupByUmi=canTakeNextGroupByUmi) assignUmiGroups(templates) - - // Then output the records in the right order (assigned tag, read name, r1, r2) + // Then group the records in the right order (assigned tag, read name, r1, r2) val templatesByMi = templates.groupBy { t => t.r1.get.apply[String](this.assignTag) } + // If marking duplicates, assign bitflag to all duplicate reads + if (this.markDuplicates) { + templatesByMi.values.foreach(t => setDuplicateFlags(t)) + } + + // Then output the records in the right order (assigned tag, read name, r1, r2) templatesByMi.keys.toSeq.sortBy(id => (id.length, id)).foreach(tag => { templatesByMi(tag).sortBy(t => t.name).flatMap(t => t.primaryReads).foreach(rec => { out += rec @@ -731,4 +763,18 @@ class GroupReadsByUmi r1[String](this.rawTag) } } + + /** Sets the duplicate flags on all reads within all templates. */ + private def setDuplicateFlags(group: Seq[Template]): Unit = { + val nonDuplicateTemplate = group.maxBy { template => + template.primaryReads.sumBy { r => + r.mapq + DuplicateScoringStrategy.computeDuplicateScore(r.asSam, ScoringStrategy.SUM_OF_BASE_QUALITIES) + } + } + + group.foreach { template => + val flag = template ne nonDuplicateTemplate + template.allReads.foreach(_.duplicate = flag) + } + } } diff --git a/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala b/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala index beb9917ab..baec12e8e 100644 --- a/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala +++ b/src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala @@ -229,7 +229,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT val in = builder.toTempFile() val out = Files.createTempFile("umi_grouped.", ".sam") val hist = Files.createTempFile("umi_grouped.", ".histogram.txt") - val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30) + val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=Some(30)) val logs = executeFgbioTool(tool) val groups = readBamRecs(out).groupBy(_.name.charAt(0)) @@ -255,6 +255,67 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT } } + it should "correctly mark duplicates on duplicate reads in group, when flag is passed" in { + val builder = new SamBuilder(readLength = 100, sort = Some(SamOrder.Coordinate)) + // Mapping Quality is a tie breaker, so use that to our advantage here. + builder.addPair(mapq1 = 10, mapq2 = 10, name = "a01", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + builder.addPair(mapq1 = 30, mapq2 = 30, name = "a02", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + builder.addPair(mapq1 = 100, mapq2 = 10, name = "a03", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + builder.addPair(mapq1 = 0, mapq2 = 0, name = "a04", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + + val in = builder.toTempFile() + val out = Files.createTempFile("umi_grouped.", ".sam") + val hist = Files.createTempFile("umi_grouped.", ".histogram.txt") + val gr = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), strategy=Strategy.Paired, edits=1, markDuplicates=true) + + gr.markDuplicates shouldBe true + + val recs = readBamRecs(out) + recs.filter(_.name.equals("a01")).forall(_.duplicate == true) shouldBe true + recs.filter(_.name.equals("a02")).forall(_.duplicate == true) shouldBe true + recs.filter(_.name.equals("a03")).forall(_.duplicate == false) shouldBe true + recs.filter(_.name.equals("a04")).forall(_.duplicate == true) shouldBe true + } + + it should "does not mark duplicates on reads in group, when flag is not passed" in { + val builder = new SamBuilder(readLength = 100, sort = Some(SamOrder.Coordinate)) + // Mapping Quality is a tie breaker, so use that to our advantage here. + builder.addPair(mapq1 = 10, mapq2 = 10, name = "a01", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + builder.addPair(mapq1 = 30, mapq2 = 30, name = "a02", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + builder.addPair(mapq1 = 100, mapq2 = 10, name = "a03", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + builder.addPair(mapq1 = 0, mapq2 = 0, name = "a04", start1 = 100, start2 = 300, strand1 = Plus, strand2 = Minus, attrs = Map("RX" -> "ACT-ACT")) + + val in = builder.toTempFile() + val out = Files.createTempFile("umi_grouped.", ".sam") + val hist = Files.createTempFile("umi_grouped.", ".histogram.txt") + new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), strategy=Strategy.Paired, edits=1).execute() + + val recs = readBamRecs(out) + recs.filter(_.name.equals("a01")).forall(_.duplicate == false) shouldBe true + recs.filter(_.name.equals("a02")).forall(_.duplicate == false) shouldBe true + recs.filter(_.name.equals("a03")).forall(_.duplicate == false) shouldBe true + recs.filter(_.name.equals("a04")).forall(_.duplicate == false) shouldBe true + } + + it should "correctly mark duplicates on duplicate single-end reads with UMIs" in { + val builder = new SamBuilder(readLength = 100, sort = Some(SamOrder.Coordinate)) + builder.addFrag(mapq = 100, name = "a01", start = 100, attrs = Map("RX" -> "AAAAAAAA")) + builder.addFrag(mapq = 10, name = "a02", start = 100, attrs = Map("RX" -> "AAAAAAAA")) + builder.addFrag(mapq = 100, name = "a03", start = 100, attrs = Map("RX" -> "CACACACA")) + builder.addFrag(mapq = 10, name = "a04", start = 100, attrs = Map("RX" -> "CACACACC")) + + val in = builder.toTempFile() + val out = Files.createTempFile("umi_grouped.", ".sam") + val hist = Files.createTempFile("umi_grouped.", ".histogram.txt") + new GroupReadsByUmi(input = in, output = out, familySizeHistogram = Some(hist), rawTag = "RX", assignTag = "MI", strategy = Strategy.Edit, edits = 1, markDuplicates = true).execute() + + val recs = readBamRecs(out) + recs.filter(_.name.equals("a01")).forall(_.duplicate == false) shouldBe true + recs.filter(_.name.equals("a02")).forall(_.duplicate == true) shouldBe true + recs.filter(_.name.equals("a03")).forall(_.duplicate == false) shouldBe true + recs.filter(_.name.equals("a04")).forall(_.duplicate == true) shouldBe true + } + it should "correctly group reads with the paired assigner when the two UMIs are the same" in { val builder = new SamBuilder(readLength=100, sort=Some(SamOrder.Coordinate)) builder.addPair(name="a01", start1=100, start2=300, strand1=Plus, strand2=Minus, attrs=Map("RX" -> "ACT-ACT"))