Skip to content

Commit

Permalink
Straggler mitigation code for duplicate marking and sorting.
Browse files Browse the repository at this point in the history
  • Loading branch information
fnothaft committed Mar 18, 2015
1 parent 75bb22e commit 1268644
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ class CalculateDepth(protected val args: CalculateDepthArgs) extends ADAMSparkCo
val variantNames = vcf.collect().toMap

val joinedRDD: RDD[(ReferenceRegion, AlignmentRecord)] =
if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))
else BroadcastRegionJoin.partitionAndJoin(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_).get))
if (args.cartesian) BroadcastRegionJoin.cartesianFilter(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_)))
else BroadcastRegionJoin.partitionAndJoin(variantPositions.keyBy(v => v), mappedRDD.keyBy(ReferenceRegion(_)))

val depths: RDD[(ReferenceRegion, Int)] =
joinedRDD.map { case (region, record) => (region, 1) }.reduceByKey(_ + _).sortByKey()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,27 @@ import org.bdgenomics.adam.rich.RichAlignmentRecord
import org.bdgenomics.formats.avro.AlignmentRecord

object ReferencePositionPair extends Logging {

  /**
   * Builds a ReferencePositionPair for a bucket of reads sharing one name.
   *
   * The pair's first slot holds the position of the first-of-pair read and the
   * second slot holds the position of the second-of-pair read, when those reads
   * exist (mapped or unmapped). If no read in the bucket carries a pairing
   * flag, the position of the first available read (primary mapped preferred
   * over unmapped) is used alone.
   *
   * @param singleReadBucket Bucket of primary-mapped and unmapped reads that
   *                         share a read name.
   * @return Pair of optional reference positions for the two reads of the pair.
   */
  def apply(singleReadBucket: SingleReadBucket): ReferencePositionPair = CreateReferencePositionPair.time {
    // Collect first/second-of-pair reads across both mapped and unmapped reads,
    // so unmapped mates still contribute a (sequence-keyed) position.
    val firstOfPair = (singleReadBucket.primaryMapped.filter(_.getFirstOfPair) ++
      singleReadBucket.unmapped.filter(_.getFirstOfPair)).toSeq
    val secondOfPair = (singleReadBucket.primaryMapped.filter(_.getSecondOfPair) ++
      singleReadBucket.unmapped.filter(_.getSecondOfPair)).toSeq

    // Mapped reads are keyed by their five-prime reference position; unmapped
    // reads fall back to a pseudo-position keyed by the read's sequence.
    def getPos(r: AlignmentRecord): ReferencePosition = {
      if (r.getReadMapped) {
        new RichAlignmentRecord(r).fivePrimeReferencePosition
      } else {
        ReferencePosition(r.getSequence, 0L)
      }
    }

    if (firstOfPair.size + secondOfPair.size > 0) {
      // Paired data: take the first read found in each slot, if any.
      // lift(0) yields None instead of throwing when a slot is empty.
      new ReferencePositionPair(firstOfPair.lift(0).map(getPos),
        secondOfPair.lift(0).map(getPos))
    } else {
      // Unpaired data: use the first available read, preferring primary
      // mapped reads over unmapped reads.
      new ReferencePositionPair((singleReadBucket.primaryMapped ++
        singleReadBucket.unmapped).toSeq.lift(0).map(getPos),
        None)
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,33 +246,14 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
log.info("Sorting reads by reference position")

// NOTE: In order to keep unmapped reads from swamping a single partition
// we place them in a range of referenceIds at the end of the file.
// The referenceId is an Int and typical only a few dozen values are even used.
// These referenceId values are not stored; they are only used during sorting.
val unmappedReferenceNames = new Iterator[String] with Serializable {
var currentOffsetFromEnd = 0

def hasNext: Boolean = true

def next(): String = {
currentOffsetFromEnd += 1
if (currentOffsetFromEnd > 10000) {
currentOffsetFromEnd = 0
}
// NB : this is really ugly - any better way to manufacture
// string values that are greater than anything else we care
// about?
"unmapped" + (Int.MaxValue - currentOffsetFromEnd).toString
}
}

rdd.map(p => {
val referencePos = if (p.getReadMapped) {
ReferencePosition(p)
// we sort the unmapped reads by read name. we prefix with "ZZZ" to ensure
// that the read name is lexicographically "after" the contig names
rdd.keyBy(r => {
if (r.getReadMapped) {
ReferencePosition(r)
} else {
ReferencePosition(unmappedReferenceNames.next(), 0)
ReferencePosition("ZZZ%s".format(r.getReadName), 0)
}
(referencePos, p)
}).sortByKey().map(p => p._2)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ private[rdd] object MarkDuplicates extends Serializable {
} else {
markReads(reads, areDups = true)
}

})

}

readsAtLeftPos.flatMap(read => { read._2.allReads })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
val start = random.nextInt(1000000)
builder.setContig(contig).setStart(start).setEnd(start)
}
builder.setReadName((0 until 20).map(i => (random.nextInt(100) + 64)).mkString)
builder.build()
}
val rdd = sc.parallelize(reads)
Expand Down

0 comments on commit 1268644

Please sign in to comment.