From 126864488f76a34d50ace5b5327d3af13196b897 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Wed, 18 Mar 2015 09:55:10 -0700 Subject: [PATCH] Straggler mitigation code for duplicate marking and sorting. --- .../bdgenomics/adam/cli/CalculateDepth.scala | 4 +- .../adam/models/ReferencePositionPair.scala | 60 +++++++------------ .../read/AlignmentRecordRDDFunctions.scala | 31 ++-------- .../adam/rdd/read/MarkDuplicates.scala | 2 - .../AlignmentRecordRDDFunctionsSuite.scala | 1 + 5 files changed, 29 insertions(+), 69 deletions(-) diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala index ab9bbf922d..b56165eda5 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/CalculateDepth.scala @@ -89,8 +89,8 @@ class CalculateDepth(protected val args: CalculateDepthArgs) extends ADAMSparkCo val variantNames = vcf.collect().toMap val joinedRDD: RDD[(ReferenceRegion, AlignmentRecord)] = - if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get)) - else BroadcastRegionJoin.partitionAndJoin(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get)) + if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_))) + else BroadcastRegionJoin.partitionAndJoin(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_))) val depths: RDD[(ReferenceRegion, Int)] = joinedRDD.map { case (region, record) => (region, 1) }.reduceByKey(_ + _).sortByKey() diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferencePositionPair.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferencePositionPair.scala index 1159a3e702..44ae201231 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferencePositionPair.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferencePositionPair.scala @@ -27,47 +27,27 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord import org.bdgenomics.formats.avro.AlignmentRecord object ReferencePositionPair extends Logging { - private def posForRead(read: AlignmentRecord): Option[ReferencePosition] = { - Some(RichAlignmentRecord(read).fivePrimeReferencePosition) - } - def apply(singleReadBucket: SingleReadBucket): ReferencePositionPair = CreateReferencePositionPair.time { - singleReadBucket.primaryMapped.toSeq.lift(0) match { - case None => - // No mapped reads - new ReferencePositionPair(None, None) - case Some(read1) => - val read1pos = posForRead(read1) - if (read1.getReadPaired && read1.getMateMapped) { - singleReadBucket.primaryMapped.toSeq.lift(1) match { - case None => - // Orphaned read. Missing its mate. - log.warn("%s denoted mate as mapped but mate does not exist".format(read1.getReadName)) - new ReferencePositionPair(read1pos, None) - case Some(read2) => - // Both reads are mapped - val read2pos = posForRead(read2) - if (read1pos.get.disorient.compareTo(read2pos.get.disorient) < 0) { - new ReferencePositionPair(read1pos, read2pos) - } else { - new ReferencePositionPair(read2pos, read1pos) - } - } - } else { - singleReadBucket.primaryMapped.toSeq.lift(1) match { - case None => - // Mate is not mapped... - new ReferencePositionPair(read1pos, None) - case Some(read2) => - val read2pos = posForRead(read2) - log.warn("%s claimed to not have mate but mate found".format(read1.getReadName)) - if (read1pos.get.disorient.compareTo(read2pos.get.disorient) < 0) { - new ReferencePositionPair(read1pos, read2pos) - } else { - new ReferencePositionPair(read2pos, read1pos) - } - } - } + val firstOfPair = (singleReadBucket.primaryMapped.filter(_.getFirstOfPair) ++ + singleReadBucket.unmapped.filter(_.getFirstOfPair)).toSeq + val secondOfPair = (singleReadBucket.primaryMapped.filter(_.getSecondOfPair) ++ + singleReadBucket.unmapped.filter(_.getSecondOfPair)).toSeq + + def getPos(r: AlignmentRecord): ReferencePosition = { + if (r.getReadMapped) { + new RichAlignmentRecord(r).fivePrimeReferencePosition + } else { + ReferencePosition(r.getSequence, 0L) + } + } + + if (firstOfPair.size + secondOfPair.size > 0) { + new ReferencePositionPair(firstOfPair.lift(0).map(getPos), + secondOfPair.lift(0).map(getPos)) + } else { + new ReferencePositionPair((singleReadBucket.primaryMapped ++ + singleReadBucket.unmapped).toSeq.lift(0).map(getPos), + None) } } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala index 8e0f6334b6..8b50f7fb9f 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctions.scala @@ -246,33 +246,14 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord]) log.info("Sorting reads by reference position") // NOTE: In order to keep unmapped reads from swamping a single partition - // we place them in a range of referenceIds at the end of the file. - // The referenceId is an Int and typical only a few dozen values are even used. - // These referenceId values are not stored; they are only used during sorting. - val unmappedReferenceNames = new Iterator[String] with Serializable { - var currentOffsetFromEnd = 0 - - def hasNext: Boolean = true - - def next(): String = { - currentOffsetFromEnd += 1 - if (currentOffsetFromEnd > 10000) { - currentOffsetFromEnd = 0 - } - // NB : this is really ugly - any better way to manufacture - // string values that are greater than anything else we care - // about? - "unmapped" + (Int.MaxValue - currentOffsetFromEnd).toString - } - } - - rdd.map(p => { - val referencePos = if (p.getReadMapped) { - ReferencePosition(p) + // we sort the unmapped reads by read name. we prefix with "ZZZ" to ensure + // that the read name is lexicographically "after" the contig names + rdd.keyBy(r => { + if (r.getReadMapped) { + ReferencePosition(r) } else { - ReferencePosition(unmappedReferenceNames.next(), 0) + ReferencePosition("ZZZ%s".format(r.getReadName), 0) } - (referencePos, p) }).sortByKey().map(p => p._2) } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala index c4afada9c7..64aa79e3dc 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/MarkDuplicates.scala @@ -118,9 +118,7 @@ private[rdd] object MarkDuplicates extends Serializable { } else { markReads(reads, areDups = true) } - }) - } readsAtLeftPos.flatMap(read => { read._2.allReads }) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala index 33469e92ff..37856ae9a8 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDFunctionsSuite.scala @@ -41,6 +41,7 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite { val start = random.nextInt(1000000) builder.setContig(contig).setStart(start).setEnd(start) } + builder.setReadName((0 until 20).map(i => (random.nextInt(100) + 64)).mkString) builder.build() } val rdd = sc.parallelize(reads)