From 82858c8e597dfc61db0f35f04b2c32c8499e54dc Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Tue, 18 Mar 2014 21:28:01 -0700 Subject: [PATCH] Updates to indel realigner to improve performance and accuracy. --- .../consensus/ConsensusGenerator.scala | 53 +++ .../ConsensusGeneratorFromKnowns.scala | 79 ++++ .../ConsensusGeneratorFromReads.scala | 86 ++++ .../ConsensusGeneratorFromSmithWaterman.scala | 74 +++ .../IndelRealignmentTarget.scala | 424 +++++------------- .../RealignmentTargetFinder.scala | 72 +-- .../smithwaterman/SmithWaterman.scala | 167 ++++++- .../SmithWatermanConstantGapScoring.scala | 20 +- .../SmithWatermanGapScoringFromFn.scala | 55 ++- .../bdgenomics/adam/models/Consensus.scala | 25 +- .../bdgenomics/adam/models/IndelTable.scala | 90 ++++ .../adam/models/ReferenceRegion.scala | 4 + .../adam/rdd/ADAMRDDFunctions.scala | 42 +- .../bdgenomics/adam/rdd/RealignIndels.scala | 243 +++++----- .../rdd/variation/ADAMVariationContext.scala | 2 + .../serialization/ADAMKryoRegistrator.scala | 8 +- .../org/bdgenomics/adam/util/MdTag.scala | 73 ++- .../ConsensusGeneratorFromReadsSuite.scala | 44 ++ .../IndelRealignmentTargetSuite.scala | 314 ++++--------- .../smithwaterman/SmithWatermanSuite.scala | 240 ++++++++++ .../adam/models/ConsensusSuite.scala | 7 +- .../adam/models/IndelTableSuite.scala | 81 ++++ .../adam/rdd/RealignIndelsSuite.scala | 51 +-- .../org/bdgenomics/adam/util/MdTagSuite.scala | 62 ++- 24 files changed, 1515 insertions(+), 801 deletions(-) create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReads.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromSmithWaterman.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReadsSuite.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanSuite.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/models/IndelTableSuite.scala diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala new file mode 100644 index 0000000000..a83421a2d3 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGenerator.scala @@ -0,0 +1,53 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.algorithms.consensus + +import org.bdgenomics.adam.algorithms.realignmenttarget.IndelRealignmentTarget +import org.bdgenomics.adam.models.{ Consensus, ReferenceRegion } +import org.bdgenomics.adam.rich.RichADAMRecord +import org.apache.spark.rdd.RDD + +abstract class ConsensusGenerator extends Serializable { + + /** + * Generates targets to add to initial set of indel realignment targets, if additional + * targets are necessary. + * + * @return Returns an option which wraps an RDD of indel realignment targets. + */ + def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] + + /** + * Performs any preprocessing specific to this consensus generation algorithm, e.g., + * indel normalization. + * + * @param reads Reads to preprocess. + * @return Preprocessed reads. + */ + def preprocessReadsForRealignment(reads: Iterable[RichADAMRecord], + reference: String, + region: ReferenceRegion): Iterable[RichADAMRecord] + + /** + * For all reads in this region, generates the list of consensus sequences for realignment. + * + * @param reads Reads to generate consensus sequences from. + * @return Consensus sequences to use for realignment. + */ + def findConsensus(reads: Iterable[RichADAMRecord]): Iterable[Consensus] +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala new file mode 100644 index 0000000000..61e1f94dfe --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromKnowns.scala @@ -0,0 +1,79 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.algorithms.consensus + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rdd.variation.ADAMVariationContext._ +import org.bdgenomics.adam.algorithms.realignmenttarget.IndelRealignmentTarget +import org.bdgenomics.adam.models._ +import org.bdgenomics.adam.rich.RichADAMRecord + +class ConsensusGeneratorFromKnowns(file: String, sc: SparkContext) extends ConsensusGenerator { + + val indelTable = sc.broadcast(IndelTable(file, sc)) + + /** + * Generates targets to add to initial set of indel realignment targets, if additional + * targets are necessary. + * + * @return Returns an option which wraps an RDD of indel realignment targets. 
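+   *
+   * As a rough illustration (coordinates hypothetical): a known 3 bp deletion whose
+   * reference allele starts at position 100 on chr1 becomes the target
+   * {{{
+   * val region = ReferenceRegion("chr1", 100L, 103L)
+   * val target = new IndelRealignmentTarget(Some(region), region)
+   * }}}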
+   */
+  def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = {
+    val rdd: RDD[ADAMVariantContext] = sc.adamVCFLoad(file)
+
+    Some(rdd.map(_.variant.variant)
+      .filter(v => v.getReferenceAllele.length != v.getVariantAllele.length)
+      .map(v => ReferenceRegion(v.getContig.getContigName, v.getPosition, v.getPosition + v.getReferenceAllele.length))
+      .map(r => new IndelRealignmentTarget(Some(r), r)))
+  }
+
+  /**
+   * Performs any preprocessing specific to this consensus generation algorithm, e.g.,
+   * indel normalization.
+   *
+   * @param reads Reads to preprocess.
+   * @return Preprocessed reads.
+   */
+  def preprocessReadsForRealignment(reads: Iterable[RichADAMRecord],
+                                    reference: String,
+                                    region: ReferenceRegion): Iterable[RichADAMRecord] = {
+    reads
+  }
+
+  /**
+   * For all reads in this region, generates the list of consensus sequences for realignment.
+   *
+   * @param reads Reads to generate consensus sequences from.
+   * @return Consensus sequences to use for realignment.
+   */
+  def findConsensus(reads: Iterable[RichADAMRecord]): Iterable[Consensus] = {
+    val table = indelTable.value
+
+    // get region
+    val start = reads.map(_.record.getStart.toLong).reduce(_ min _)
+    val end = reads.flatMap(_.end).reduce(_ max _)
+    val refId = reads.head.record.getContig.getContigName
+
+    val region = ReferenceRegion(refId, start, end + 1)
+
+    // get known indels overlapping this region
+    table.getIndelsInRegion(region)
+  }
+}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReads.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReads.scala
new file mode 100644
index 0000000000..b81c03add0
--- /dev/null
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReads.scala
@@ -0,0 +1,86 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.bdgenomics.adam.algorithms.consensus
+
+import org.apache.spark.rdd.RDD
+import org.bdgenomics.adam.algorithms.realignmenttarget.IndelRealignmentTarget
+import org.bdgenomics.adam.models.{ Consensus, ReferenceRegion, ReferencePosition }
+import org.bdgenomics.adam.rich.RichADAMRecord
+import org.bdgenomics.adam.rich.RichADAMRecord._
+import org.bdgenomics.adam.rich.RichCigar._
+import org.bdgenomics.adam.util.MdTag
+import org.bdgenomics.adam.util.ImplicitJavaConversions._
+import org.bdgenomics.adam.util.NormalizationUtils._
+import org.bdgenomics.formats.avro.ADAMRecord
+
+class ConsensusGeneratorFromReads extends ConsensusGenerator {
+
+  /**
+   * No targets to add if generating consensus targets from reads.
+   *
+   * @return Returns None.
+   */
+  def targetsToAdd(): Option[RDD[IndelRealignmentTarget]] = None
+
+  /**
+   * Performs read preprocessing by normalizing indels for all reads that have evidence of one
+   * indel.
+   *
+   * @param reads Reads to process.
+   * @return Reads with indels normalized if they contain a single indel.
+   */
+  def preprocessReadsForRealignment(reads: Iterable[RichADAMRecord],
+                                    reference: String,
+                                    region: ReferenceRegion): Iterable[RichADAMRecord] = {
+    reads.map(r => {
+      // if there are two alignment blocks (sequence matches) then there is a single indel in the read
+      if (r.samtoolsCigar.numAlignmentBlocks == 2) {
+        // left align this indel and update the mdtag
+        val cigar = leftAlignIndel(r)
+        val mdTag = MdTag.moveAlignment(r, cigar)
+
+        val newRead: RichADAMRecord = ADAMRecord.newBuilder(r)
+          .setCigar(cigar.toString)
+          .setMismatchingPositions(mdTag.toString())
+          .build()
+
+        newRead
+      } else {
+        r
+      }
+    })
+  }
+
+  /**
+   * Generates consensus sequences from reads with indels.
+   *
+   * @param reads Reads to generate consensus sequences from.
+   * @return Consensus sequences to use for realignment.
+   */
+  def findConsensus(reads: Iterable[RichADAMRecord]): Iterable[Consensus] = {
+    reads.filter(r => r.mdTag.isDefined)
+      .flatMap(r => {
+        // try to generate a consensus alignment - if a consensus exists, add it to our
+        // list of consensuses to test
+        Consensus.generateAlternateConsensus(r.getSequence,
+          ReferencePosition(r.getContig.getContigName,
+            r.getStart),
+          r.samtoolsCigar)
+      })
+      .toSeq
+      .distinct
+  }
+
+}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromSmithWaterman.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromSmithWaterman.scala
new file mode 100644
index 0000000000..35d96148d6
--- /dev/null
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromSmithWaterman.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.bdgenomics.adam.algorithms.consensus
+
+import org.bdgenomics.adam.algorithms.smithwaterman.SmithWatermanConstantGapScoring
+import org.bdgenomics.adam.models.ReferenceRegion
+import org.bdgenomics.adam.rich.RichADAMRecord
+import org.bdgenomics.adam.rich.RichADAMRecord._
+import org.bdgenomics.adam.rich.RichCigar._
+import org.bdgenomics.adam.util.MdTag
+import org.bdgenomics.formats.avro.ADAMRecord
+
+class ConsensusGeneratorFromSmithWaterman(wMatch: Double,
+                                          wMismatch: Double,
+                                          wInsert: Double,
+                                          wDelete: Double) extends ConsensusGeneratorFromReads {
+
+  /**
+   * Attempts realignment of all reads using Smith-Waterman. Accepts all realignments that have one
+   * or fewer indels.
+   *
+   * @param reads Reads to process.
+   * @return Reads with indels normalized if they contain a single indel.
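+   *
+   * A minimal usage sketch (gap weights illustrative, not tuned):
+   * {{{
+   * val generator = new ConsensusGeneratorFromSmithWaterman(1.0, -1.0, -2.0, -2.0)
+   * // reads whose Smith-Waterman realignment yields at most one indel are rewritten
+   * val realigned = generator.preprocessReadsForRealignment(reads, reference, region)
+   * }}}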
+ */ + override def preprocessReadsForRealignment(reads: Iterable[RichADAMRecord], + reference: String, + region: ReferenceRegion): Iterable[RichADAMRecord] = { + val rds: Iterable[RichADAMRecord] = reads.map(r => { + + val sw = new SmithWatermanConstantGapScoring(r.record.getSequence.toString, + reference, + wMatch, + wMismatch, + wInsert, + wDelete) + println("for " + r.record.getReadName + " sw to " + sw.xStart + " with " + sw.cigarX) + + // if we realign with fewer than three alignment blocks, then take the new alignment + if (sw.cigarX.numAlignmentBlocks <= 2) { + val mdTag = MdTag(r.record.getSequence.toString, + reference.drop(sw.xStart), + sw.cigarX, + region.start) + + val newRead: RichADAMRecord = ADAMRecord.newBuilder(r) + .setStart(sw.xStart + region.start) + .setCigar(sw.cigarX.toString) + .setMismatchingPositions(mdTag.toString()) + .build() + + newRead + } else { + r + } + }) + + super.preprocessReadsForRealignment(rds, reference, region) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTarget.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTarget.scala index ca16942f91..70b4ad9b08 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTarget.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTarget.scala @@ -17,13 +17,15 @@ */ package org.bdgenomics.adam.algorithms.realignmenttarget -import org.bdgenomics.formats.avro.{ ADAMPileup, ADAMRecord } -import org.bdgenomics.adam.rich.RichADAMRecord._ -import scala.collection.immutable.{ TreeSet, HashSet, NumericRange } import com.esotericsoftware.kryo.{ Kryo, Serializer } import com.esotericsoftware.kryo.io.{ Input, Output } +import net.sf.samtools.CigarOperator import org.apache.spark.Logging -import scala.util.Sorting.quickSort +import org.bdgenomics.adam.models.ReferenceRegion +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rich.RichADAMRecord +import org.bdgenomics.formats.avro.ADAMRecord +import scala.collection.immutable.TreeSet object ZippedTargetOrdering extends Ordering[(IndelRealignmentTarget, Int)] { @@ -35,7 +37,7 @@ object ZippedTargetOrdering extends Ordering[(IndelRealignmentTarget, Int)] { * @return Comparison done by starting position. */ def compare(a: (IndelRealignmentTarget, Int), b: (IndelRealignmentTarget, Int)): Int = { - a._1.getReadRange.start compare b._1.getReadRange.start + TargetOrdering.compare(a._1, b._1) } } @@ -48,37 +50,32 @@ object TargetOrdering extends Ordering[IndelRealignmentTarget] { * @param b Indel realignment target to compare. * @return Comparison done by starting position. */ - def compare(a: IndelRealignmentTarget, b: IndelRealignmentTarget): Int = a.getReadRange.start compare b.getReadRange.start + def compare(a: IndelRealignmentTarget, b: IndelRealignmentTarget): Int = a.readRange compare b.readRange /** - * Compares a read to an indel realignment target to see if it starts before the start of the indel realignment target. + * Check to see if an indel realignment target contains the given read. * * @param target Realignment target to compare. * @param read Read to compare. - * @return True if start of read is before the start of the indel alignment target. + * @return True if read alignment is contained in target span. 
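+   *
+   * For example (coordinates hypothetical): a read aligned to [105, 155) is
+   * considered contained by a target whose read range is [100, 200), since the
+   * two regions overlap.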
*/ - def lt(target: IndelRealignmentTarget, read: ADAMRecord): Boolean = target.getReadRange.start < read.getStart + def contains(target: IndelRealignmentTarget, read: ADAMRecord): Boolean = { + val reg = RichADAMRecord(read).readRegion - /** - * Check to see if an indel realignment target and a read are mapped over the same length. - * - * @param target Realignment target to compare. - * @param read Read to compare. - * @return True if read alignment span is identical to the target span. - */ - def equals(target: IndelRealignmentTarget, read: ADAMRecord): Boolean = { - (target.getReadRange.start == read.getStart) && (target.getReadRange.end == read.end.get) + reg.forall(r => target.readRange.overlaps(r)) } /** - * Check to see if an indel realignment target contains the given read. + * Compares a read to an indel realignment target to see if it starts before the start of the indel realignment target. * * @param target Realignment target to compare. * @param read Read to compare. - * @return True if read alignment is contained in target span. + * @return True if start of read is before the start of the indel alignment target. */ - def contains(target: IndelRealignmentTarget, read: ADAMRecord): Boolean = { - (target.getReadRange.start <= read.getStart) && (target.getReadRange.end >= read.end.get - 1) // -1 since read end is non-inclusive + def lt(target: IndelRealignmentTarget, read: RichADAMRecord): Boolean = { + val region = read.readRegion + + region.forall(r => target.readRange.compare(r) < 0) } /** @@ -89,280 +86,63 @@ object TargetOrdering extends Ordering[IndelRealignmentTarget] { * @return True if two targets overlap. */ def overlap(a: IndelRealignmentTarget, b: IndelRealignmentTarget): Boolean = { - // Note: the last two conditions were added for completeness; they should generally not - // be necessary although maybe in weird cases (indel on both reads in a mate pair that - // span a structural variant) and then one probably would not want to re-align these - // together. - // TODO: introduce an upper bound on re-align distance as GATK does?? - ((a.getReadRange.start >= b.getReadRange.start && a.getReadRange.start <= b.getReadRange.end) || - (a.getReadRange.end >= b.getReadRange.start && a.getReadRange.end <= b.getReadRange.start) || - (a.getReadRange.start >= b.getReadRange.start && a.getReadRange.end <= b.getReadRange.end) || - (b.getReadRange.start >= a.getReadRange.start && b.getReadRange.end <= a.getReadRange.end)) - } -} - -abstract class GenericRange(val readRange: NumericRange[Long]) { - - def getReadRange(): NumericRange[Long] = readRange - - def merge(r: GenericRange): GenericRange - - def compareRange(other: GenericRange): Int - - def compareReadRange(other: GenericRange) = { - if (readRange.start != other.getReadRange().start) - readRange.start.compareTo(other.getReadRange().start) - else - readRange.end.compareTo(other.getReadRange().end) - } -} - -object IndelRange { - val emptyRange = IndelRange( - new NumericRange.Inclusive[Long](-1, -1, 1), - new NumericRange.Inclusive[Long](-1, -1, 1)) -} - -case class IndelRange(indelRange: NumericRange[Long], override val readRange: NumericRange[Long]) extends GenericRange(readRange) with Ordered[IndelRange] { - - /** - * Merge two identical indel ranges. - * - * @param ir Indel range to merge in. - * @return Merged range. 
- */ - override def merge(ir: GenericRange): IndelRange = { - if (this == IndelRange.emptyRange) - ir - - assert(indelRange == ir.asInstanceOf[IndelRange].getIndelRange) - // do not need to check read range - read range must contain indel range, so if - // indel range is the same, read ranges will overlap - - new IndelRange(indelRange, - new NumericRange.Inclusive[Long]( - readRange.start min ir.readRange.start, - readRange.end max ir.readRange.end, - 1)) - } - - def getIndelRange(): NumericRange[Long] = indelRange - - override def compareRange(other: GenericRange): Int = - if (indelRange.start != other.asInstanceOf[IndelRange].indelRange.start) - indelRange.start.compareTo(other.asInstanceOf[IndelRange].indelRange.start) - else - indelRange.end.compareTo(other.asInstanceOf[IndelRange].indelRange.end) - - override def compare(other: IndelRange): Int = { - val cmp = compareRange(other) - if (cmp != 0) - cmp - else - super.compareReadRange(other) - } -} - -class IndelRangeSerializer extends Serializer[IndelRange] { - def write(kryo: Kryo, output: Output, obj: IndelRange) = { - output.writeLong(obj.getIndelRange().start) - output.writeLong(obj.getIndelRange().end) - output.writeLong(obj.getReadRange().start) - output.writeLong(obj.getReadRange().end) - } - - def read(kryo: Kryo, input: Input, klazz: Class[IndelRange]): IndelRange = { - val irStart = input.readLong() - val irEnd = input.readLong() - val rrStart = input.readLong() - val rrEnd = input.readLong() - new IndelRange( - new NumericRange.Inclusive[Long](irStart, irEnd, 1), - new NumericRange.Inclusive[Long](rrStart, rrEnd, 1)) - } -} - -object SNPRange { - val emptyRange = SNPRange( - -1L, - new NumericRange.Inclusive[Long](-1, -1, 1)) -} - -case class SNPRange(snpSite: Long, override val readRange: NumericRange[Long]) extends GenericRange(readRange) with Ordered[SNPRange] { - - /** - * Merge two identical SNP sites. - * - * @param sr SNP range to merge in. - * @return Merged SNP range. 
- */ - override def merge(sr: GenericRange): SNPRange = { - if (this == SNPRange.emptyRange) - sr - - assert(snpSite == sr.asInstanceOf[SNPRange].getSNPSite) - // do not need to check read range - read range must contain snp site, so if - // snp site is the same, read ranges will overlap - - new SNPRange(snpSite, - new NumericRange.Inclusive[Long]( - readRange.start min sr.readRange.start, - readRange.end max sr.readRange.end, - 1)) - } - - def getSNPSite(): Long = snpSite - - override def compare(other: SNPRange): Int = { - val cmp = compareRange(other) - if (cmp != 0) - cmp - else - super.compareReadRange(other) - } - - override def compareRange(other: GenericRange): Int = - snpSite.compareTo(other.asInstanceOf[SNPRange].snpSite) -} - -class SNPRangeSerializer extends Serializer[SNPRange] { - def write(kryo: Kryo, output: Output, obj: SNPRange) = { - output.writeLong(obj.getSNPSite()) - output.writeLong(obj.getReadRange().start) - output.writeLong(obj.getReadRange().end) - } - - def read(kryo: Kryo, input: Input, klazz: Class[SNPRange]): SNPRange = { - val SNPSite = input.readLong() - val rrStart = input.readLong() - val rrEnd = input.readLong() - new SNPRange( - SNPSite, - new NumericRange.Inclusive[Long](rrStart, rrEnd, 1)) + (a.variation.isDefined && a.variation.forall(_.overlaps(b.readRange))) || + (b.variation.isDefined && b.variation.forall(_.overlaps(a.readRange))) } } object IndelRealignmentTarget { - // threshold for determining whether a pileup contains sufficient mismatch evidence - val mismatchThreshold = 0.15 - /** - * Generates an indel realignment target from a pileup. + * Generates 1+ indel realignment targets from a single read. * - * @param rod Base pileup. - * @return Generated realignment target. + * @param read Read to use for generation. + * @param maxIndelSize Maximum allowable size of an indel. + * @return Set of generated realignment targets. */ - def apply(rod: Iterable[ADAMPileup]): IndelRealignmentTarget = { - - /** - * If we have a indel in a pileup position, generates an indel range. - * - * @param pileup Single pileup position. - * @return Indel range. 
- */ - def mapEvent(pileup: ADAMPileup): IndelRange = { - Option(pileup.getReadBase) match { - case None => { - // deletion - new IndelRange( - new NumericRange.Inclusive[Long]( - pileup.getPosition.toLong - pileup.getRangeOffset.toLong, - pileup.getPosition.toLong + pileup.getRangeLength.toLong - pileup.getRangeOffset.toLong - 1, - 1), - new NumericRange.Inclusive[Long](pileup.getReadStart.toLong, pileup.getReadEnd.toLong - 1, 1)) + def apply(read: RichADAMRecord, + maxIndelSize: Int): Seq[IndelRealignmentTarget] = { + + val region = read.readRegion.get + val refId = read.record.getContig.getContigName + var pos = List[ReferenceRegion]() + var referencePos = read.record.getStart + val cigar = read.samtoolsCigar + + cigar.getCigarElements.foreach(cigarElement => + cigarElement.getOperator match { + // INSERT + case CigarOperator.I => { + if (cigarElement.getLength <= maxIndelSize) { + pos ::= ReferenceRegion(refId, referencePos, referencePos + 1) + } } - case Some(o) => { - // insert - new IndelRange( - new NumericRange.Inclusive[Long](pileup.getPosition.toLong, pileup.getPosition.toLong, 1), - new NumericRange.Inclusive[Long](pileup.getReadStart.toLong, pileup.getReadEnd.toLong - 1, 1)) + // DELETE + case CigarOperator.D => { + if (cigarElement.getLength <= maxIndelSize) { + pos ::= ReferenceRegion(refId, referencePos, referencePos + cigarElement.getLength) + } + referencePos += cigarElement.getLength } - } - } - - /** - * If we have a point event, generates a SNPRange. - * - * @param pileup Pileup position with mismatch evidence. - * @return SNP range. - */ - def mapPoint(pileup: ADAMPileup): SNPRange = { - val range: NumericRange.Inclusive[Long] = - new NumericRange.Inclusive[Long](pileup.getReadStart.toLong, pileup.getReadEnd.toLong - 1, 1) - new SNPRange(pileup.getPosition, range) - } - - // segregate into indels, matches, and mismatches - val indels = extractIndels(rod) - val matches = extractMatches(rod) - val mismatches = extractMismatches(rod) - - // TODO: this assumes Sanger encoding; how about older data? Should there be a property somewhere? 
- // calculate the quality of the matches and the mismatches - val matchQuality: Int = - if (matches.size > 0) - matches.map(_.getSangerQuality).reduce(_ + _) - else - 0 - val mismatchQuality: Int = - if (mismatches.size > 0) - mismatches.map(_.getSangerQuality).reduce(_ + _) - else - 0 + case _ => { + if (cigarElement.getOperator.consumesReferenceBases()) { + referencePos += cigarElement.getLength + } + } + }) - // check our mismatch ratio - if we have a sufficiently high ratio of mismatch quality, generate a snp event, else just generate indel events - if (matchQuality == 0 || mismatchQuality.toDouble / matchQuality.toDouble >= mismatchThreshold) { - new IndelRealignmentTarget( - new HashSet[IndelRange]().union(indels.map(mapEvent).toSet), - new HashSet[SNPRange]().union(mismatches.map(mapPoint).toSet)) + // if we have indels, emit those targets, else emit a target for this read + if (pos.length == 0) { + Seq(new IndelRealignmentTarget(None, region)) } else { - new IndelRealignmentTarget( - new HashSet[IndelRange]().union(indels.map(mapEvent).toSet), HashSet[SNPRange]()) + pos.map(ir => new IndelRealignmentTarget(Some(ir), region)) + .toSeq } } - - def extractMismatches(rod: Iterable[ADAMPileup]): Iterable[ADAMPileup] = { - rod.filter(r => r.getRangeOffset == null && r.getNumSoftClipped == 0) - .filter(r => r.getReadBase != r.getReferenceBase) - } - - def extractMatches(rod: Iterable[ADAMPileup]): Iterable[ADAMPileup] = - rod.filter(r => r.getRangeOffset == null && r.getNumSoftClipped == 0) - .filter(r => r.getReadBase == r.getReferenceBase) - - def extractIndels(rod: Iterable[ADAMPileup]): Iterable[ADAMPileup] = - rod.filter(_.getRangeOffset != null) - - /** - * @return An empty target that has no indel nor SNP evidence. - */ - def emptyTarget(): IndelRealignmentTarget = { - new IndelRealignmentTarget(new HashSet[IndelRange](), new HashSet[SNPRange]()) - } -} - -class RangeAccumulator[T <: GenericRange](val data: List[T], val previous: T) { - def accumulate(current: T): RangeAccumulator[T] = { - if (previous == null) - new RangeAccumulator[T](data, current) - else if (previous.compareRange(current) == 0) - new RangeAccumulator[T](data, previous.merge(current).asInstanceOf[T]) - else - new RangeAccumulator[T](previous :: data, current) - } } -class IndelRealignmentTarget(val indelSet: Set[IndelRange], val snpSet: Set[SNPRange]) extends Logging { - - // the maximum range covered by either snps or indels - def readRange: NumericRange.Inclusive[Long] = { - ( - indelSet.toList.map(_.getReadRange.asInstanceOf[NumericRange.Inclusive[Long]]) ++ - snpSet.toList.map(_.getReadRange.asInstanceOf[NumericRange.Inclusive[Long]])).reduce( - (a: NumericRange.Inclusive[Long], b: NumericRange.Inclusive[Long]) => - new NumericRange.Inclusive[Long]((a.start min b.start), (a.end max b.end), 1)) - } +class IndelRealignmentTarget(val variation: Option[ReferenceRegion], + val readRange: ReferenceRegion) extends Logging { /** * Merges two indel realignment targets. @@ -371,63 +151,61 @@ class IndelRealignmentTarget(val indelSet: Set[IndelRange], val snpSet: Set[SNPR * @return Merged target. */ def merge(target: IndelRealignmentTarget): IndelRealignmentTarget = { - - // TODO: this is unnecessarily wasteful; if the sets themselves - // were sorted (requires refactoring) we could achieve the same - // in a single merge (as in mergesort) operation. 
This should - // be done once correctness has been established - val currentIndelSet = indelSet.union(target.getIndelSet()).toArray - quickSort(currentIndelSet) - - val accumulator: RangeAccumulator[IndelRange] = new RangeAccumulator[IndelRange](List(), null) - val newIndelSetAccumulated: RangeAccumulator[IndelRange] = currentIndelSet.foldLeft(accumulator) { - (acc, elem) => acc.accumulate(elem) + assert(readRange.isAdjacent(target.readRange) || readRange.overlaps(target.readRange), + "Targets do not overlap, and therefore cannot be merged.") + + val newVar = if (variation.isDefined && target.variation.isDefined) { + Some(variation.get.hull(target.variation.get)) + } else if (variation.isDefined) { + variation + } else if (target.variation.isDefined) { + target.variation + } else { + None } - if (newIndelSetAccumulated.previous == null) // without the if we end up with a singleton set with null as element - new IndelRealignmentTarget(newIndelSetAccumulated.data.toSet, snpSet ++ target.getSNPSet) - else - new IndelRealignmentTarget(newIndelSetAccumulated.data.toSet + newIndelSetAccumulated.previous, snpSet ++ target.getSNPSet) + new IndelRealignmentTarget(newVar, readRange.merge(target.readRange)) } - def isEmpty(): Boolean = { - indelSet.isEmpty && snpSet.isEmpty + def isEmpty: Boolean = { + variation.isEmpty } +} - def getReadRange(): NumericRange[Long] = { - if ((snpSet != null || indelSet != null) - && (readRange == null)) - log.warn("snpSet or indelSet non-empty but readRange empty!") - readRange - } +class TargetSetSerializer extends Serializer[TargetSet] { - def getSortKey(): Long = { - if (readRange != null) - readRange.start - else if (!getIndelSet().isEmpty && getSNPSet().isEmpty) - getIndelSet().head.getReadRange().start - else if (getIndelSet().isEmpty && !getSNPSet().isEmpty) - getSNPSet().head.getReadRange().start - else { - log.error("unknown sort key for IndelRealignmentTarget") - -1.toLong - } + def write(kryo: Kryo, output: Output, obj: TargetSet) = { + kryo.writeClassAndObject(output, obj.set.toList) + } + def read(kryo: Kryo, input: Input, klazz: Class[TargetSet]): TargetSet = { + new TargetSet(new TreeSet()(TargetOrdering) + .union(kryo.readClassAndObject(input).asInstanceOf[List[IndelRealignmentTarget]].toSet)) } +} - protected[realignmenttarget] def getSNPSet(): Set[SNPRange] = snpSet +class ZippedTargetSetSerializer extends Serializer[ZippedTargetSet] { - protected[realignmenttarget] def getIndelSet(): Set[IndelRange] = indelSet + def write(kryo: Kryo, output: Output, obj: ZippedTargetSet) = { + kryo.writeClassAndObject(output, obj.set.toList) + } + def read(kryo: Kryo, input: Input, klazz: Class[ZippedTargetSet]): ZippedTargetSet = { + new ZippedTargetSet(new TreeSet()(ZippedTargetOrdering) + .union(kryo.readClassAndObject(input).asInstanceOf[List[(IndelRealignmentTarget, Int)]].toSet)) + } } -class TreeSetSerializer extends Serializer[TreeSet[IndelRealignmentTarget]] { - - def write(kryo: Kryo, output: Output, obj: TreeSet[IndelRealignmentTarget]) = { - kryo.writeClassAndObject(output, obj.toList) +object TargetSet { + def apply(): TargetSet = { + new TargetSet(TreeSet[IndelRealignmentTarget]()(TargetOrdering)) } +} - def read(kryo: Kryo, input: Input, klazz: Class[TreeSet[IndelRealignmentTarget]]): TreeSet[IndelRealignmentTarget] = { - new TreeSet()(TargetOrdering).union(kryo.readClassAndObject(input).asInstanceOf[List[IndelRealignmentTarget]].toSet) - } +// These two case classes are needed to get around some serialization issues +case class TargetSet(set: 
TreeSet[IndelRealignmentTarget]) extends Serializable { } + +case class ZippedTargetSet(set: TreeSet[(IndelRealignmentTarget, Int)]) extends Serializable { +} + diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/RealignmentTargetFinder.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/RealignmentTargetFinder.scala index 5e3e8cdb5c..f3f3aa1e98 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/RealignmentTargetFinder.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/realignmenttarget/RealignmentTargetFinder.scala @@ -17,13 +17,11 @@ */ package org.bdgenomics.adam.algorithms.realignmenttarget -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext._ import org.apache.spark.Logging -import org.bdgenomics.formats.avro.{ ADAMRecord, ADAMPileup } +import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.rich.RichADAMRecord import scala.annotation.tailrec import scala.collection.immutable.TreeSet -import org.bdgenomics.adam.rdd.ADAMContext._ object RealignmentTargetFinder { @@ -33,8 +31,10 @@ object RealignmentTargetFinder { * @param rdd RDD of reads to use in generating realignment targets. * @return Sorted set of realignment targets. */ - def apply(rdd: RDD[ADAMRecord]): TreeSet[IndelRealignmentTarget] = { - new RealignmentTargetFinder().findTargets(rdd) + def apply(rdd: RDD[RichADAMRecord], + maxIndelSize: Int = 500, + maxTargetSize: Int = 3000): TreeSet[IndelRealignmentTarget] = { + new RealignmentTargetFinder().findTargets(rdd, maxIndelSize, maxTargetSize).set } } @@ -43,23 +43,27 @@ class RealignmentTargetFinder extends Serializable with Logging { /** * Joins two sorted sets of targets together. Is tail call recursive. * - * @param first A sorted set of realignment targets. This set must be ordered ahead of the second set. + * @note This function should not be called in a context where target set serialization is needed. + * Instead, call joinTargets(TargetSet, TargetSet), which wraps this function. + * + * @param first A sorted set of realignment targets. This set must be ordered ahead of the + * second set. * @param second A sorted set of realignment targets. * @return A merged set of targets. */ - // TODO: it seems that the old way of merging allows for duplicate targets (say two copies of the same indel - // that have been generated from two different reads that therefore have different read ranges) - // That should be fixed now, see the change in merging. @tailrec protected final def joinTargets( first: TreeSet[IndelRealignmentTarget], second: TreeSet[IndelRealignmentTarget]): TreeSet[IndelRealignmentTarget] = { - if (!first.isEmpty && second.isEmpty) + if (first.isEmpty && second.isEmpty) { + TreeSet[IndelRealignmentTarget]()(TargetOrdering) + } else if (second.isEmpty) { first - else if (first.isEmpty && !second.isEmpty) + } else if (first.isEmpty) { second - else { - // if the two sets overlap, we must merge their head and tail elements, else we can just blindly append + } else { + // if the two sets overlap, we must merge their head and tail elements, + // else we can just blindly append if (!TargetOrdering.overlap(first.last, second.head)) { first.union(second) } else { @@ -69,33 +73,47 @@ class RealignmentTargetFinder extends Serializable with Logging { } } + /** + * Wrapper for joinTargets(TreeSet[IndelRealignmentTarget], TreeSet[IndelRealignmentTarget]) + * for contexts where serialization is needed. 
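+   * A usage sketch (variable names hypothetical; the first set must be ordered
+   * ahead of the second):
+   * {{{
+   * val merged: TargetSet = joinTargets(leftTargets, rightTargets)
+   * }}}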
+ * + * @param first A sorted set of realignment targets. This set must be ordered ahead of the + * second set. + * @param second A sorted set of realignment targets. + * @return A merged set of targets. + */ + def joinTargets(first: TargetSet, + second: TargetSet): TargetSet = { + new TargetSet(joinTargets(first.set, second.set)) + } + /** * Finds indel targets over a set of reads. * * @param reads An RDD containing reads to generate indel realignment targets from. * @return An ordered set of indel realignment targets. */ - def findTargets(reads: RDD[ADAMRecord]): TreeSet[IndelRealignmentTarget] = { + def findTargets(reads: RDD[RichADAMRecord], + maxIndelSize: Int = 500, + maxTargetSize: Int = 3000): TargetSet = { - // generate pileups from reads - val rods: RDD[Iterable[ADAMPileup]] = reads.adamRecords2Pileup(true) - .groupBy(_.getPosition).map(_._2) - - def createTreeSet(target: IndelRealignmentTarget): TreeSet[IndelRealignmentTarget] = { + def createTargetSet(target: IndelRealignmentTarget): TargetSet = { val tmp = new TreeSet()(TargetOrdering) - tmp + target + new TargetSet(tmp + target) } /* for each rod, generate an indel realignment target. we then filter out all "empty" targets: these * are targets which do not show snp/indel evidence. we order these targets by reference position, and * merge targets who have overlapping positions */ - val targetSet = rods.map(IndelRealignmentTarget(_)) - .filter(!_.isEmpty) - .keyBy(_.getSortKey()) - .sortByKey() - .map(x => createTreeSet(x._2)).collect() - .fold(new TreeSet()(TargetOrdering))(joinTargets) + val targets = reads.flatMap(IndelRealignmentTarget(_, maxIndelSize)) + .filter(t => !t.isEmpty) + + val targetSet: TargetSet = TargetSet(targets.mapPartitions(iter => iter.toArray.sorted(TargetOrdering).toIterator) + .map(createTargetSet) + .fold(TargetSet())((t1: TargetSet, t2: TargetSet) => joinTargets(t1, t2)) + .set.filter(_.readRange.length <= maxTargetSize)) + targetSet } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWaterman.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWaterman.scala index 6318c6302a..49f78c0ce8 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWaterman.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWaterman.scala @@ -17,19 +17,168 @@ */ package org.bdgenomics.adam.algorithms.smithwaterman -import net.sf.samtools.Cigar +import net.sf.samtools.{ Cigar, TextCigarCodec } +import scala.annotation.tailrec -abstract class SmithWaterman(xSequence: String, ySequence: String) { +private[smithwaterman] object SmithWaterman { + val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton +} + +abstract class SmithWaterman(xSequence: String, ySequence: String) extends Serializable { + + lazy val (scoringMatrix, moveMatrix) = buildScoringMatrix() + lazy val (cigarX, cigarY, xStart, yStart) = trackback(scoringMatrix, moveMatrix) + + /** + * Builds Smith-Waterman scoring matrix. + * + * @return 2D array of doubles, along with move direction at each point. + * + * @note To work with the move function, expected move directions are: + * + * * I: move in I coordinate + * * J: move in J coordinate + * * B: move in both I and J + * * T: terminate move + * + * @see move + */ + private[smithwaterman] def buildScoringMatrix(): (Array[Array[Double]], Array[Array[Char]]) + + /** + * Finds coordinates of a matrix with highest value. + * + * @param matrix Matrix to score. 
+ * @return Tuple of (i, j) coordinates. + */ + private[smithwaterman] final def maxCoordinates(matrix: Array[Array[Double]]): (Int, Int) = { + def maxInCol(col: Array[Double]): (Double, Int) = { + def takeMax(a: (Double, Int), b: (Double, Int)): (Double, Int) = { + if (a._1 > b._1) { + a + } else { + b + } + } + + val c: Array[(Double, Int)] = col.zipWithIndex + + c.reduce(takeMax) + } + + def maxCol(cols: Array[(Double, Int)]): (Int, Int) = { + def takeMax(a: (Double, Int, Int), b: (Double, Int, Int)): (Double, Int, Int) = { + if (a._1 > b._1) { + a + } else { + b + } + } + + val c: Array[((Double, Int), Int)] = cols.zipWithIndex + + val m: (Double, Int, Int) = c.map(kv => (kv._1._1, kv._1._2, kv._2)) + .reduce(takeMax) + + (m._2, m._3) + } + + maxCol(matrix.map(maxInCol)) + } + + /** + * Converts a reversed non-numeric CIGAR into a normal CIGAR. + * + * @note A reversed non-numeric CIGAR is a CIGAR where each alignment block + * has length = 1, and the alignment block ordering goes from end-to-beginning. E.g., + * the equivalent of the CIGAR 4M2D1M would be MDDMMMM. + * + * @param nnc Reversed non-numeric CIGAR. + * @return A normal CIGAR. + */ + private[smithwaterman] def cigarFromRNNCigar(nnc: String): String = { + + @tailrec def buildCigar(last: Char, runCount: Int, nnc: String, cigar: String): String = { + if (nnc.length == 0) { + (runCount.toString + last) + cigar + } else { + val (next, nrc, nc) = if (nnc.head == last) { + (last, runCount + 1, cigar) + } else { + (nnc.head, 1, (runCount.toString + last) + cigar) + } + + buildCigar(next, nrc, nnc.drop(1), nc) + } + } + + buildCigar(nnc.head, 1, nnc.drop(1), "") + } + + /** + * Recursive function to do backtrack. + * + * @param matrix Matrix to track back upon. + * @param i Current position in x sequence. + * @param j Current position in y sequence. + * @param cX Current reversed non-numeric CIGAR for the X sequence. + * @param cY Current reversed non-numeric CIGAR for the Y sequence. + * @return Returns the alignment CIGAR for the X and Y sequences, along with start indices. + * + * @note To work with the move function, expected move directions are: + * + * * I: move in I coordinate + * * J: move in J coordinate + * * B: move in both I and J + * * T: terminate move + * + * @see buildScoringMatrix + */ + @tailrec private[smithwaterman] final def move(matrix: Array[Array[Char]], + i: Int, + j: Int, + cX: String, + cY: String): (String, String, Int, Int) = { + if (matrix(i)(j) == 'T') { + // return if told to terminate + (cigarFromRNNCigar(cX), cigarFromRNNCigar(cY), i, j) + } else { + // find next move + val (in, jn, cXn, cYn) = if (matrix(i)(j) == 'B') { + (i - 1, j - 1, cX + "M", cY + "M") + } else if (matrix(i)(j) == 'J') { + (i - 1, j, cX + "I", cY + "D") + } else { + (i, j - 1, cX + "D", cY + "I") + } + + // recurse + move(matrix, in, jn, cXn, cYn) + } + } - var max = 0.0 - var maxX = 0 - var maxY = 0 + /** + * Runs trackback on scoring matrix. + * + * @param scoreMatrix Scored matrix to track back on. + * @param moveMatrix Move matrix to track back on. + * @return Tuple of Cigar for X, Y. 
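+   *
+   * Usage sketch (sequences and weights illustrative): the trackback runs when the
+   * lazy alignment values are first read:
+   * {{{
+   * val sw = new SmithWatermanConstantGapScoring("ACACACTA", "AGCACACA",
+   *                                              1.0, -1.0, -2.0, -2.0)
+   * val readCigar = sw.cigarX // CIGAR of the x sequence against y
+   * val start = sw.xStart     // offset where the local alignment starts
+   * }}}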
+ */ + private[smithwaterman] def trackback(scoreMatrix: Array[Array[Double]], + moveMatrix: Array[Array[Char]]): (Cigar, Cigar, Int, Int) = { + assert(scoreMatrix.length == xSequence.length + 1) + assert(scoreMatrix.forall(_.length == ySequence.length + 1)) + assert(moveMatrix.length == xSequence.length + 1) + assert(moveMatrix.forall(_.length == ySequence.length + 1)) - lazy val scoringMatrix = buildScoringMatrix - lazy val (cigarX, cigarY, alignmentX, alignmentY) = trackback + // get the position of the max scored box - start trackback here + val (sx, sy) = maxCoordinates(scoreMatrix) - def buildScoringMatrix(): Array[Array[Double]] + // run trackback + val (cX, cY, xI, yI) = move(moveMatrix, sy, sx, "", "") - protected def trackback(): (Cigar, Cigar, String, String) + // get cigars and return + (SmithWaterman.CIGAR_CODEC.decode(cX), SmithWaterman.CIGAR_CODEC.decode(cY), xI, yI) + } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanConstantGapScoring.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanConstantGapScoring.scala index 93fdabcc30..1b8d2c7302 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanConstantGapScoring.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanConstantGapScoring.scala @@ -20,11 +20,11 @@ package org.bdgenomics.adam.algorithms.smithwaterman object SmithWatermanConstantGapScoring { protected def constantGapFn(wMatch: Double, wDelete: Double, wInsert: Double, wMismatch: Double)(x: Int, y: Int, i: Char, j: Char): Double = { - if (x == y) { + if (i == j) { wMatch - } else if (x == '_') { + } else if (i == '_') { wDelete - } else if (y == '_') { + } else if (j == '_') { wInsert } else { wMismatch @@ -33,11 +33,11 @@ object SmithWatermanConstantGapScoring { } -abstract class SmithWatermanConstantGapScoring(xSequence: String, - ySequence: String, - wMatch: Double, - wMismatch: Double, - wInsert: Double, - wDelete: Double) - extends SmithWatermanGapScoringFromFn(xSequence, ySequence, SmithWatermanConstantGapScoring.constantGapFn(wMatch, wMismatch, wInsert, wDelete)) { +class SmithWatermanConstantGapScoring(xSequence: String, + ySequence: String, + wMatch: Double, + wMismatch: Double, + wInsert: Double, + wDelete: Double) + extends SmithWatermanGapScoringFromFn(xSequence, ySequence, SmithWatermanConstantGapScoring.constantGapFn(wMatch, wInsert, wDelete, wMismatch)) { } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanGapScoringFromFn.scala b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanGapScoringFromFn.scala index 273a270bbd..946acd3dae 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanGapScoringFromFn.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanGapScoringFromFn.scala @@ -22,44 +22,51 @@ abstract class SmithWatermanGapScoringFromFn(xSequence: String, scoreFn: (Int, Int, Char, Char) => Double) extends SmithWaterman(xSequence, ySequence) { - def buildScoringMatrix(): Array[Array[Double]] = { + def buildScoringMatrix(): (Array[Array[Double]], Array[Array[Char]]) = { - val y = ySequence.length + 1 - val x = xSequence.length + 1 + val y = ySequence.length + val x = xSequence.length - var matrix = new Array[Array[Double]](x) - for (i <- 0 until x) { - matrix(i) = new Array[Double](y) + val scoreMatrix = new 
Array[Array[Double]](x + 1) + val moveMatrix = new Array[Array[Char]](x + 1) + for (i <- 0 to x) { + scoreMatrix(i) = new Array[Double](y + 1) + moveMatrix(i) = new Array[Char](y + 1) } // set row/col 0 to 0 - for (i <- 0 until x) { - matrix(i)(0) = 0.0 + for (i <- 0 to x) { + scoreMatrix(i)(0) = 0.0 + moveMatrix(i)(0) = 'T' } - for (j <- 0 until y) { - matrix(0)(j) = 0.0 + for (j <- 0 to y) { + scoreMatrix(0)(j) = 0.0 + moveMatrix(0)(j) = 'T' } // score matrix - for (i <- 1 until x) { - for (j <- i until y) { - val m = matrix(i - 1)(j - 1) + scoreFn(i, j, xSequence(i), ySequence(j)) - val d = matrix(i - 1)(j) + scoreFn(i, j, xSequence(i), '_') - val in = matrix(i)(j - 1) + scoreFn(i, j, '_', ySequence(j)) - val update = (d max in) max (m max 0) + for (i <- 1 to x) { + for (j <- 1 to y) { + val m = scoreMatrix(i - 1)(j - 1) + scoreFn(i, j, xSequence(i - 1), ySequence(j - 1)) + val d = scoreMatrix(i - 1)(j) + scoreFn(i, j, xSequence(i - 1), '_') + val in = scoreMatrix(i)(j - 1) + scoreFn(i, j, '_', ySequence(j - 1)) - matrix(i)(j) = update - - // check if new max and update - if (update > max) { - maxX = i - maxY = j - max = update + val (scoreUpdate, moveUpdate) = if (m >= d && m >= in && m > 0.0) { + (m, 'B') + } else if (d >= in && d > 0.0) { + (d, 'J') + } else if (in > 0.0) { + (in, 'I') + } else { + (0.0, 'T') } + + scoreMatrix(i)(j) = scoreUpdate + moveMatrix(i)(j) = moveUpdate } } - matrix + (scoreMatrix, moveMatrix) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/Consensus.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/Consensus.scala index 476602ee8e..78c09aa08f 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/Consensus.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/Consensus.scala @@ -17,22 +17,21 @@ */ package org.bdgenomics.adam.models -import scala.collection.immutable.NumericRange import net.sf.samtools.{ Cigar, CigarOperator } import org.bdgenomics.adam.util.ImplicitJavaConversions._ object Consensus { - def generateAlternateConsensus(sequence: String, start: Long, cigar: Cigar): Option[Consensus] = { + def generateAlternateConsensus(sequence: String, start: ReferencePosition, cigar: Cigar): Option[Consensus] = { var readPos = 0 - var referencePos = start + var referencePos = start.pos - if (cigar.getCigarElements.filter(elem => elem.getOperator == CigarOperator.I || elem.getOperator == CigarOperator.D).length == 1) { + if (cigar.getCigarElements.count(elem => elem.getOperator == CigarOperator.I || elem.getOperator == CigarOperator.D) == 1) { cigar.getCigarElements.foreach(cigarElement => { cigarElement.getOperator match { - case CigarOperator.I => return Some(new Consensus(sequence.substring(readPos, readPos + cigarElement.getLength), referencePos to referencePos)) - case CigarOperator.D => return Some(new Consensus("", referencePos until (referencePos + cigarElement.getLength))) + case CigarOperator.I => return Some(new Consensus(sequence.substring(readPos, readPos + cigarElement.getLength), ReferenceRegion(start.referenceName, referencePos, referencePos + 1))) + case CigarOperator.D => return Some(new Consensus("", ReferenceRegion(start.referenceName, referencePos, referencePos + cigarElement.getLength + 1))) case _ => { if (cigarElement.getOperator.consumesReadBases && cigarElement.getOperator.consumesReferenceBases) { readPos += cigarElement.getLength @@ -51,13 +50,21 @@ object Consensus { } -case class Consensus(consensus: String, index: NumericRange[Long]) { +case class Consensus(consensus: String, 
index: ReferenceRegion) { def insertIntoReference(reference: String, refStart: Long, refEnd: Long): String = { - if (index.head < refStart || index.head > refEnd || index.end < refStart || index.end > refEnd) { + if (index.start < refStart || index.start > refEnd || index.end - 1 < refStart || index.end - 1 > refEnd) { throw new IllegalArgumentException("Consensus and reference do not overlap: " + index + " vs. " + refStart + " to " + refEnd) } else { - reference.substring(0, (index.head - refStart).toInt) + consensus + reference.substring((index.end - refStart).toInt) + reference.substring(0, (index.start - refStart).toInt) + consensus + reference.substring((index.end - 1 - refStart).toInt) + } + } + + override def toString: String = { + if (index.start + 1 != index.end) { + "Deletion over " + index.toString + } else { + "Inserted " + consensus + " at " + index.toString } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala new file mode 100644 index 0000000000..d120ab8411 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/IndelTable.scala @@ -0,0 +1,90 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.models + +import org.apache.spark.{ Logging, SparkContext } +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.bdgenomics.adam.rdd.variation.ADAMVariationContext._ +import org.bdgenomics.formats.avro.ADAMVariant + +class IndelTable(private val table: Map[String, Iterable[Consensus]]) extends Serializable with Logging { + log.info("Indel table has %s contigs and %s entries".format(table.size, + table.values.map(_.size).sum)) + + /** + * Returns all known indels within the given reference region. If none are known, returns an empty Seq. + * + * @param region Region to look for known indels. + * @return Returns a sequence of consensuses. + */ + def getIndelsInRegion(region: ReferenceRegion): Seq[Consensus] = { + if (table.contains(region.referenceName)) { + val bucket = table(region.referenceName) + + bucket.filter(_.index.overlaps(region)).toSeq + } else { + Seq() + } + } +} + +object IndelTable { + + /** + * Creates an indel table from a file containing known indels. + * + * @param knownIndelsFile Path to file with known indels. + * @param sc SparkContext to use for loading. + * @return Returns a table with the known indels populated. + */ + def apply(knownIndelsFile: String, sc: SparkContext): IndelTable = { + val rdd: RDD[ADAMVariantContext] = sc.adamVCFLoad(knownIndelsFile) + apply(rdd.map(_.variant.variant)) + } + + /** + * Creates an indel table from an RDD containing known variants. + * + * @param variants RDD of variants. 
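+   *                 Only variants whose reference and alternate alleles differ in
+   *                 length are kept. For instance (coordinates hypothetical), a
+   *                 record with REF=ACC, ALT=A at position 10 is a two base deletion
+   *                 and is stored as Consensus("", ReferenceRegion(contig, 11L, 13L)).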
+   * @return Returns a table with known indels populated.
+   */
+  def apply(variants: RDD[ADAMVariant]): IndelTable = {
+    val consensus: Map[String, Iterable[Consensus]] = variants.filter(v => v.getReferenceAllele.length != v.getVariantAllele.length)
+      .map(v => {
+        val referenceName = v.getContig.getContigName.toString
+        val consensus = if (v.getReferenceAllele.length > v.getVariantAllele.length) {
+          // deletion
+          val deletionLength = v.getReferenceAllele.length - v.getVariantAllele.length
+          val start = v.getPosition + v.getVariantAllele.length
+
+          Consensus("", ReferenceRegion(referenceName, start, start + deletionLength))
+        } else {
+          // insertion
+          val start = v.getPosition + v.getReferenceAllele.length
+
+          Consensus(v.getVariantAllele.toString.drop(v.getReferenceAllele.length), ReferenceRegion(referenceName, start, start + 1))
+        }
+
+        (referenceName, consensus)
+      }).groupByKey()
+      .collect()
+      .toMap
+
+    new IndelTable(consensus)
+  }
+}
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala
index 6bf08fc63c..ea01cc9f6a 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala
@@ -199,6 +199,10 @@ case class ReferenceRegion(referenceName: String, start: Long, end: Long) extend
       start.compareTo(that.start)
     else
       end.compareTo(that.end)
+
+  def length(): Long = {
+    end - start
+  }
 }
 
 class ReferenceRegionSerializer extends Serializer[ReferenceRegion] {
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala
index 3fe86e8175..9dfaeb1183 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMRDDFunctions.scala
@@ -31,6 +31,10 @@ import org.bdgenomics.formats.avro.{
   ADAMRecord,
   ADAMNucleotideContigFragment
 }
+import org.bdgenomics.adam.algorithms.consensus.{
+  ConsensusGenerator,
+  ConsensusGeneratorFromReads
+}
 import org.bdgenomics.adam.converters.ADAMRecordConverter
 import org.bdgenomics.adam.models.{
   ADAMRod,
@@ -55,7 +59,7 @@ import org.bdgenomics.adam.util.{
   ADAMBAMOutputFormat,
   ADAMSAMOutputFormat
 }
-import parquet.avro.{ AvroParquetOutputFormat, AvroWriteSupport }
+import parquet.avro.AvroParquetOutputFormat
 import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
@@ -167,8 +171,8 @@ class ADAMRecordRDDFunctions(rdd: RDD[ADAMRecord]) extends ADAMSequenceDictionar
 
     // attach header to output format
     asSam match {
-      case true  => ADAMSAMOutputFormat.addHeader(convertRecords.first.get.getHeader())
-      case false => ADAMBAMOutputFormat.addHeader(convertRecords.first.get.getHeader())
+      case true  => ADAMSAMOutputFormat.addHeader(header)
+      case false => ADAMBAMOutputFormat.addHeader(header)
     }
 
     // write file to disk
@@ -306,8 +310,28 @@ class ADAMRecordRDDFunctions(rdd: RDD[ADAMRecord]) extends ADAMSequenceDictionar
     BaseQualityRecalibration(rdd, knownSnps)
   }
 
-  def adamRealignIndels(): RDD[ADAMRecord] = {
-    RealignIndels(rdd)
+  /**
+   * Realigns indels using a consensus-based heuristic.
+   *
+   * @see RealignIndels
+   *
+   * @param consensusModel Model to use for generating consensus sequences to realign against.
+   * Defaults to deriving consensus sequences from the reads themselves.
+   * @param isSorted If the input data is sorted, setting this parameter to true avoids a second sort.
+   * @param maxIndelSize The size of the largest indel to use for realignment.
 // Returns a tuple of (failedQualityMetrics, passedQualityMetrics)
@@ -378,7 +402,7 @@ class ADAMRecordRDDFunctions(rdd: RDD[ADAMRecord]) extends ADAMSequenceDictionar
 * @return A sequence containing the rods in this bucket.
 */
 def bucketedReadsToRods(bucket: (ReferencePosition, Iterable[ADAMRecord])): Iterable[ADAMRod] = {
- val (bucketStart, bucketReads) = bucket
+ val (_, bucketReads) = bucket
 bucketReads.flatMap(pp.readToPileups)
 .groupBy(ReferencePosition(_))
@@ -490,7 +514,7 @@ class ADAMRodRDDFunctions(rdd: RDD[ADAMRod]) extends Serializable with Logging {
 * @return Rods split up by samples and _not_ grouped together.
 */
 def adamSplitRodsBySamples(): RDD[ADAMRod] = {
- rdd.flatMap(_.splitBySamples)
+ rdd.flatMap(_.splitBySamples())
 }
 /**
@@ -500,7 +524,7 @@ class ADAMRodRDDFunctions(rdd: RDD[ADAMRod]) extends Serializable with Logging {
 * @return Rods split up by samples and grouped together by position.
 */
 def adamDivideRodsBySamples(): RDD[(ReferencePosition, List[ADAMRod])] = {
- rdd.keyBy(_.position).map(r => (r._1, r._2.splitBySamples))
+ rdd.keyBy(_.position).map(r => (r._1, r._2.splitBySamples()))
 }
 /**
@@ -531,7 +555,7 @@ class ADAMRodRDDFunctions(rdd: RDD[ADAMRod]) extends Serializable with Logging {
 val totalBases: Long = rdd.map(_.pileups.length.toLong).reduce(_ + _)
 // coverage is the total count of bases, over the total number of loci
- totalBases.toDouble / rdd.count.toDouble
+ totalBases.toDouble / rdd.count().toDouble
 }
 }
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RealignIndels.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RealignIndels.scala
index ace8d0609a..11015b1105 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RealignIndels.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/RealignIndels.scala
@@ -17,27 +17,35 @@
 */
 package org.bdgenomics.adam.rdd
-import org.apache.spark.rdd.RDD
+import net.sf.samtools.{ Cigar, CigarOperator, CigarElement }
 import org.apache.spark.Logging
-import org.bdgenomics.formats.avro.ADAMRecord
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.bdgenomics.adam.algorithms.consensus.{
+ ConsensusGenerator,
+ ConsensusGeneratorFromReads
+}
 import org.bdgenomics.adam.algorithms.realignmenttarget.{
 RealignmentTargetFinder,
 IndelRealignmentTarget,
 TargetOrdering,
- ZippedTargetOrdering
+ ZippedTargetOrdering,
+ ZippedTargetSet
+}
+import org.bdgenomics.adam.models.{
+ Consensus,
+ ReferencePosition,
+ ReferenceRegion
 }
-import scala.collection.immutable.TreeSet
-import scala.annotation.tailrec
-import scala.collection.mutable.Map
-import net.sf.samtools.{ Cigar, CigarOperator, CigarElement }
-import scala.collection.immutable.NumericRange
-import org.bdgenomics.adam.models.Consensus
-import 
org.bdgenomics.adam.util.ImplicitJavaConversions._ import org.bdgenomics.adam.rich.RichADAMRecord import org.bdgenomics.adam.rich.RichADAMRecord._ -import org.bdgenomics.adam.rich.RichCigar._ import org.bdgenomics.adam.util.MdTag -import org.bdgenomics.adam.util.NormalizationUtils._ +import org.bdgenomics.adam.util.ImplicitJavaConversions._ +import org.bdgenomics.formats.avro.ADAMRecord +import scala.annotation.tailrec +import scala.collection.immutable.{ NumericRange, TreeSet } +import scala.collection.mutable.Map +import scala.util.Random private[rdd] object RealignIndels { @@ -47,8 +55,19 @@ private[rdd] object RealignIndels { * @param rdd RDD of reads to realign. * @return RDD of realigned reads. */ - def apply(rdd: RDD[ADAMRecord]): RDD[ADAMRecord] = { - new RealignIndels().realignIndels(rdd) + def apply(rdd: RDD[ADAMRecord], + consensusModel: ConsensusGenerator = new ConsensusGeneratorFromReads, + dataIsSorted: Boolean = false, + maxIndelSize: Int = 500, + maxConsensusNumber: Int = 30, + lodThreshold: Double = 5.0, + maxTargetSize: Int = 3000): RDD[ADAMRecord] = { + new RealignIndels(consensusModel, + dataIsSorted, + maxIndelSize, + maxConsensusNumber, + lodThreshold, + maxTargetSize).realignIndels(rdd) } /** @@ -56,7 +75,6 @@ private[rdd] object RealignIndels { * target and should be realigned, else returns the "empty" target (denoted by a negative index). * * @note Generally, this function shouldn't be called directly---for most cases, prefer mapTargets. - * * @param read Read to check. * @param targets Sorted set of realignment targets. * @return If overlapping target is found, returns that target. Else, returns the "empty" target. @@ -73,7 +91,7 @@ private[rdd] object RealignIndels { } else { // else, return an empty target (negative index) // to prevent key skew, split up by max indel alignment length - (-1 - (read.record.getStart() / 3000L)).toInt + (-1 - (read.record.getStart / 3000L)).toInt } } else { // split the set and recurse @@ -87,10 +105,27 @@ private[rdd] object RealignIndels { } } + /** + * This method wraps mapToTarget(RichADAMRecord, TreeSet[Tuple2[IndelRealignmentTarget, Int]]) for + * serialization purposes. + * + * @param read Read to check. + * @param targets Wrapped zipped indel realignment target. + * @return Target if an overlapping target is found, else the empty target. + * + * @see mapTargets + */ + def mapToTarget(read: RichADAMRecord, + targets: ZippedTargetSet): Int = { + mapToTarget(read, targets.set) + } + /** * Method to map a target index to an indel realignment target. * * @note Generally, this function shouldn't be called directly---for most cases, prefer mapTargets. + * @note This function should not be called in a context where target set serialization is needed. + * Instead, call mapToTarget(Int, ZippedTargetSet), which wraps this function. * * @param targetIndex Index of target. * @param targets Set of realignment targets. 
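+ * @return Returns Some(target) when targetIndex is a valid (non-negative) index, or None for
+ * the empty target.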
@@ -98,14 +133,29 @@ private[rdd] object RealignIndels {
 *
 * @see mapTargets
 */
- def mapToTarget(targetIndex: Int, targets: TreeSet[(IndelRealignmentTarget, Int)]): IndelRealignmentTarget = {
+ def mapToTargetUnpacked(targetIndex: Int,
+ targets: TreeSet[Tuple2[IndelRealignmentTarget, Int]]): Option[IndelRealignmentTarget] = {
 if (targetIndex < 0) {
- IndelRealignmentTarget.emptyTarget()
+ None
 } else {
- targets.filter(p => p._2 == targetIndex).head._1
+ Some(targets.filter(p => p._2 == targetIndex).head._1)
 }
 }
+ /**
+ * Wrapper for mapToTargetUnpacked(Int, TreeSet[Tuple2[IndelRealignmentTarget, Int]]) for
+ * contexts where serialization is needed.
+ *
+ * @param targetIndex Index of target.
+ * @param targets Set of realignment targets.
+ * @return The indel realignment target at this index, if the index is valid.
+ *
+ * @see mapTargets
+ */
+ def mapToTarget(targetIndex: Int, targets: ZippedTargetSet): Option[IndelRealignmentTarget] = {
+ mapToTargetUnpacked(targetIndex, targets.set)
+ }
+
 /**
 * Maps reads to targets. Wraps both mapToTarget functions together and handles target index creation and broadcast.
 *
@@ -120,11 +170,12 @@
 *
 * @see mapToTarget
 */
- def mapTargets(rich_rdd: RDD[RichADAMRecord], targets: TreeSet[IndelRealignmentTarget]): RDD[(IndelRealignmentTarget, Iterable[RichADAMRecord])] = {
+ def mapTargets(rich_rdd: RDD[RichADAMRecord], targets: TreeSet[IndelRealignmentTarget]): RDD[(Option[IndelRealignmentTarget], Iterable[RichADAMRecord])] = {
 val tmpZippedTargets = targets.zip(0 until targets.count(t => true))
- var tmpZippedTargets2 = new TreeSet[(IndelRealignmentTarget, Int)]()(ZippedTargetOrdering)
+ var tmpZippedTargets2 = new TreeSet[Tuple2[IndelRealignmentTarget, Int]]()(ZippedTargetOrdering)
 tmpZippedTargets.foreach(t => tmpZippedTargets2 = tmpZippedTargets2 + t)
- val zippedTargets = tmpZippedTargets2
+
+ val zippedTargets = new ZippedTargetSet(tmpZippedTargets2)
 // group reads by target
 val broadcastTargets = rich_rdd.context.broadcast(zippedTargets)
@@ -143,11 +194,12 @@
 /**
 * From a set of reads, returns the reference sequence that they overlap.
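+ * @return A tuple of (reference sequence, reference start, reference end), stitched together
+ * from the MD tag derived reference of each read.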
*/ - def getReferenceFromReads(reads: Seq[RichADAMRecord]): (String, Long, Long) = { + def getReferenceFromReads(reads: Iterable[RichADAMRecord]): (String, Long, Long) = { // get reference and range from a single read val readRefs = reads.map((r: RichADAMRecord) => { (r.mdTag.get.getReference(r), r.getStart.toLong to r.end.get) }) + .toSeq .sortBy(_._2.head) // fold over sequences and append - sequence is sorted at start @@ -168,62 +220,12 @@ private[rdd] object RealignIndels { import RealignIndels._ -private[rdd] class RealignIndels extends Serializable with Logging { - - // parameter for longest indel to realign - val maxIndelSize = 3000 - - // max number of consensuses to evaluate - TODO: not currently used - val maxConsensusNumber = 30 - - // log-odds threshold for entropy improvement for accepting a realignment - val lodThreshold = 5.0 - - def findConsensus(reads: Iterable[RichADAMRecord]): Tuple3[List[RichADAMRecord], List[RichADAMRecord], List[Consensus]] = { - var realignedReads = List[RichADAMRecord]() - var readsToClean = List[RichADAMRecord]() - var consensus = List[Consensus]() - - // loop across reads and triage/generate consensus sequences - reads.foreach(r => { - var cigar: Cigar = null - var mdTag: MdTag = null - - // if there are two alignment blocks (sequence matches) then there is a single indel in the read - if (r.samtoolsCigar.numAlignmentBlocks == 2) { - // left align this indel and update the mdtag - cigar = leftAlignIndel(r) - mdTag = MdTag.moveAlignment(r, cigar) - } - - mdTag = if (mdTag == null) r.mdTag.get else mdTag - - if (mdTag.hasMismatches) { - val newRead: RichADAMRecord = - if (cigar != null) { - ADAMRecord.newBuilder(r).setCigar(cigar.toString).setMismatchingPositions(mdTag.toString).build() - } else { - ADAMRecord.newBuilder(r).build() - } - // we clean all reads that have mismatches - readsToClean = newRead :: readsToClean - - // try to generate a consensus alignment - if a consensus exists, add it to our list of consensuses to test - consensus = Consensus.generateAlternateConsensus(newRead.getSequence, newRead.getStart, newRead.samtoolsCigar) match { - case Some(o) => o :: consensus - case None => consensus - } - } else { - // if the read does not have mismatches, then we needn't process it further - realignedReads = new RichADAMRecord(r) :: realignedReads - } - }) - - // TODO: check whether this improves or harms performance - consensus = consensus.distinct - - new Tuple3(realignedReads, readsToClean, consensus) - } +private[rdd] class RealignIndels(val consensusModel: ConsensusGenerator = new ConsensusGeneratorFromReads, + val dataIsSorted: Boolean = false, + val maxIndelSize: Int = 500, + val maxConsensusNumber: Int = 30, + val lodThreshold: Double = 5.0, + val maxTargetSize: Int = 3000) extends Serializable with Logging { /** * Given a target group with an indel realignment target and a group of reads to realign, this method @@ -233,21 +235,32 @@ private[rdd] class RealignIndels extends Serializable with Logging { * @return A sequence of reads which have either been realigned if there is a sufficiently good alternative * consensus, or not realigned if there is not a sufficiently good consensus. 
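+ *
+ * As a descriptive sketch of the acceptance rule applied below (not new API), a realignment
+ * is only finalized when
+ * {{{
+ * (totalMismatchSumPreCleaning - bestConsensusMismatchSum) / 10.0 > lodThreshold
+ * }}}
+ * i.e., when the summed Phred mismatch quality drops by more than 10 * lodThreshold.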
*/ - def realignTargetGroup(targetGroup: (IndelRealignmentTarget, Iterable[RichADAMRecord])): Iterable[RichADAMRecord] = { + def realignTargetGroup(targetGroup: (Option[IndelRealignmentTarget], Iterable[RichADAMRecord])): Iterable[RichADAMRecord] = { val (target, reads) = targetGroup - var (realignedReads, readsToClean, consensus) = findConsensus(reads) + var realignedReads = reads.filter(r => r.mdTag.isDefined && !r.mdTag.get.hasMismatches) + + // get reference from reads + val (reference, refStart, refEnd) = getReferenceFromReads(reads.map(r => new RichADAMRecord(r))) + val refRegion = ReferenceRegion(reads.head.record.getContig.getContigName, refStart, refEnd) + + // preprocess reads and get consensus + val readsToClean = consensusModel.preprocessReadsForRealignment(reads.filter(r => !r.mdTag.isDefined || r.mdTag.get.hasMismatches), + reference, + refRegion) + var consensus = consensusModel.findConsensus(readsToClean) + + // reduce count of consensus sequences + if (consensus.size > maxConsensusNumber) { + val r = new Random() + consensus = r.shuffle(consensus).take(maxConsensusNumber) + } if (target.isEmpty) { // if the indel realignment target is empty, do not realign reads } else { - var mismatchSum = 0L - - if (readsToClean.length > 0 && consensus.length > 0) { - - // get reference from reads - val (reference, refStart, refEnd) = getReferenceFromReads(reads.map(r => new RichADAMRecord(r)).toSeq) + if (readsToClean.size > 0 && consensus.size > 0) { // do not check realigned reads - they must match val totalMismatchSumPreCleaning = readsToClean.map(sumMismatchQuality(_)).reduce(_ + _) @@ -266,7 +279,7 @@ private[rdd] class RealignIndels extends Serializable with Logging { // evaluate all reads against the new consensus val sweptValues = readsToClean.map(r => { - val (qual, pos) = sweepReadOverReferenceForQuality(r.getSequence, consensusSequence, r.qualityScores.map(_.toInt)) + val (qual, pos) = sweepReadOverReferenceForQuality(r.getSequence, consensusSequence, r.qualityScores) val originalQual = sumMismatchQuality(r) // if the read's mismatch quality improves over the original alignment, save @@ -306,7 +319,7 @@ private[rdd] class RealignIndels extends Serializable with Logging { if ((totalMismatchSumPreCleaning - bestConsensusMismatchSum).toDouble / 10.0 > lodThreshold) { // if we see a sufficient improvement, realign the reads - val cleanedReads: List[RichADAMRecord] = readsToClean.map(r => { + val cleanedReads: Iterable[RichADAMRecord] = readsToClean.map(r => { val builder: ADAMRecord.Builder = ADAMRecord.newBuilder(r) val remapping = bestMappings(r) @@ -320,17 +333,17 @@ private[rdd] class RealignIndels extends Serializable with Logging { builder.setStart(refStart + remapping) // recompute cigar - val newCigar: Cigar = if (refStart + remapping >= bestConsensus.index.head && refStart + remapping <= bestConsensus.index.end) { + val newCigar: Cigar = if (refStart + remapping >= bestConsensus.index.start && refStart + remapping <= bestConsensus.index.end - 1) { // if element overlaps with consensus indel, modify cigar with indel - val (idElement, endLength) = if (bestConsensus.index.head == bestConsensus.index.end) { + val (idElement, endLength) = if (bestConsensus.index.start == bestConsensus.index.end - 1) { (new CigarElement(bestConsensus.consensus.length, CigarOperator.I), - r.getSequence.length - bestConsensus.consensus.length - (bestConsensus.index.head - (refStart + remapping))) + r.getSequence.length - bestConsensus.consensus.length - (bestConsensus.index.start - (refStart + 
remapping)))
+ } else {
- (new CigarElement((bestConsensus.index.end - bestConsensus.index.head).toInt, CigarOperator.D),
- r.getSequence.length - (bestConsensus.index.head - (refStart + remapping)))
+ (new CigarElement((bestConsensus.index.end - 1 - bestConsensus.index.start).toInt, CigarOperator.D),
+ r.getSequence.length - (bestConsensus.index.start - (refStart + remapping)))
 }
- val cigarElements = List[CigarElement](new CigarElement((refStart + remapping - bestConsensus.index.head).toInt, CigarOperator.M),
+ val cigarElements = List[CigarElement](new CigarElement((refStart + remapping - bestConsensus.index.start).toInt, CigarOperator.M),
 idElement,
 new CigarElement(endLength.toInt, CigarOperator.M))
@@ -341,22 +354,20 @@
 }
 // update mdtag and cigar
- builder.setMismatchingPositions(MdTag.moveAlignment(r, newCigar, reference.drop(remapping), refStart + remapping).toString)
+ builder.setMismatchingPositions(MdTag.moveAlignment(r, newCigar, reference.drop(remapping), refStart + remapping).toString())
 builder.setCigar(newCigar.toString)
 new RichADAMRecord(builder.build())
- // TODO: fix mismatchingPositions string
 } else
 new RichADAMRecord(builder.build())
 })
- realignedReads = cleanedReads ::: realignedReads
+ realignedReads = cleanedReads ++ realignedReads
 } else {
- realignedReads = readsToClean ::: realignedReads
+ realignedReads = readsToClean ++ realignedReads
 }
 }
 // return all reads that we cleaned and all reads that were initially realigned
- //readsToClean //::: realignedReads
 realignedReads
 }
 }
@@ -392,9 +403,9 @@
 }
 /**
- * Sums the mismatch quality of a read against a reference. Mismatch quality is defined as the sum of the base
- * quality for all bases in the read that do not match the reference. This method ignores the cigar string, which
- * treats indels as causing mismatches.
+ * Sums the mismatch quality of a read against a reference. Mismatch quality is defined as the sum
+ * of the base quality for all bases in the read that do not match the reference. This method
+ * ignores the cigar string, which treats indels as causing mismatches.
 *
 * @param read Read to evaluate.
 * @param reference Reference sequence to look for mismatches against.
@@ -415,7 +426,8 @@
 }
 /**
- * Given a read, sums the mismatch quality against it's current alignment position. Does NOT ignore cigar.
+ * Given a read, sums the mismatch quality against its current alignment position.
+ * Does NOT ignore cigar.
 *
 * @param read Read over which to sum mismatch quality.
 * @return Mismatch quality of read for current alignment.
@@ -423,22 +435,35 @@
 def sumMismatchQuality(read: ADAMRecord): Int = {
 sumMismatchQualityIgnoreCigar(read.getSequence,
 read.mdTag.get.getReference(read),
- read.qualityScores.map(_.toInt))
+ read.qualityScores)
 }
 /**
- * Performs realignment for an RDD of reads. This includes target generation, read/target classification,
- * and read realignment.
+ * Performs realignment for an RDD of reads. This includes target generation, read/target
+ * classification, and read realignment.
 *
 * @param rdd Reads to realign.
 * @return Realigned read. 
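+ *
+ * As a descriptive sketch of the stages implemented below (stage names are descriptive,
+ * not API):
+ * {{{
+ * // 1. filter to mapped reads; sort by ReferencePosition unless dataIsSorted
+ * // 2. RealignmentTargetFinder(rich_rdd, maxIndelSize, maxTargetSize)
+ * // 3. mapTargets: group reads with the realignment target they overlap
+ * // 4. realignTargetGroup: score consensuses and emit (re)aligned reads
+ * }}}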
*/ def realignIndels(rdd: RDD[ADAMRecord]): RDD[ADAMRecord] = { + val sortedRdd = if (dataIsSorted) { + rdd.filter(r => r.getReadMapped) + } else { + val sr = rdd.filter(r => r.getReadMapped) + .keyBy(r => ReferencePosition(r).get) + .sortByKey() + + sr.map(kv => kv._2) + } + // we only want to convert once so let's get it over with - val rich_rdd = rdd.map(new RichADAMRecord(_)) + val rich_rdd = sortedRdd.map(new RichADAMRecord(_)) + // find realignment targets log.info("Generating realignment targets...") - val targets: TreeSet[IndelRealignmentTarget] = RealignmentTargetFinder(rdd) + val targets: TreeSet[IndelRealignmentTarget] = RealignmentTargetFinder(rich_rdd, + maxIndelSize, + maxTargetSize) // map reads to targets log.info("Grouping reads by target...") diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVariationContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVariationContext.scala index c694e0c4aa..90ab11e130 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVariationContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variation/ADAMVariationContext.scala @@ -55,8 +55,10 @@ class ADAMVariationContext(sc: SparkContext) extends Serializable with Logging { def adamVCFAnnotationLoad(filePath: String, dict: Option[SequenceDictionary] = None): RDD[ADAMDatabaseVariantAnnotation] = { log.info("Reading VCF file from %s".format(filePath)) + val job = HadoopUtil.newJob(sc) val vcc = new VariantContextConverter(dict) + val records = sc.newAPIHadoopFile( filePath, classOf[VCFInputFormat], classOf[LongWritable], classOf[VariantContextWritable], diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala index 423fa2b3c3..6e891ccad9 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala @@ -25,9 +25,8 @@ import org.bdgenomics.formats.avro._ import org.bdgenomics.adam.models._ import it.unimi.dsi.fastutil.io.{ FastByteArrayInputStream, FastByteArrayOutputStream } import org.apache.spark.serializer.KryoRegistrator -import org.bdgenomics.adam.algorithms.realignmenttarget._ -import scala.collection.immutable.TreeSet import scala.reflect.ClassTag +import org.bdgenomics.adam.algorithms.realignmenttarget._ case class InputStreamWithDecoder(size: Int) { val buffer = new Array[Byte](size) @@ -79,9 +78,8 @@ class ADAMKryoRegistrator extends KryoRegistrator { kryo.register(classOf[ReferencePosition], new ReferencePositionSerializer) kryo.register(classOf[ReferencePositionPair], new ReferencePositionPairSerializer) kryo.register(classOf[SingleReadBucket], new SingleReadBucketSerializer) - kryo.register(classOf[IndelRange], new IndelRangeSerializer()) - kryo.register(classOf[SNPRange], new SNPRangeSerializer) kryo.register(classOf[IndelRealignmentTarget]) - kryo.register(classOf[TreeSet[IndelRealignmentTarget]], new TreeSetSerializer) + kryo.register(classOf[TargetSet], new TargetSetSerializer) + kryo.register(classOf[ZippedTargetSet], new ZippedTargetSetSerializer) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/util/MdTag.scala b/adam-core/src/main/scala/org/bdgenomics/adam/util/MdTag.scala index 16cb535d9d..92bcef3785 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/util/MdTag.scala +++ 
b/adam-core/src/main/scala/org/bdgenomics/adam/util/MdTag.scala @@ -17,16 +17,14 @@ */ package org.bdgenomics.adam.util -import scala.collection.immutable -import scala.collection.immutable.NumericRange -import scala.util.matching.Regex -import net.sf.samtools.{ Cigar, CigarOperator } import org.bdgenomics.adam.models.ReferencePosition - -//import org.bdgenomics.adam.util.ImplicitJavaConversions._ import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rich.RichADAMRecord import org.bdgenomics.adam.rich.RichADAMRecord._ +import org.bdgenomics.adam.rich.RichADAMRecord +import net.sf.samtools.{ Cigar, CigarOperator } +import scala.collection.immutable +import scala.collection.immutable.NumericRange +import scala.util.matching.Regex object MdTagEvent extends Enumeration { val Match, Mismatch, Delete = Value @@ -234,6 +232,57 @@ object MdTag { def moveAlignment(read: RichADAMRecord, newCigar: Cigar, newReference: String, newAlignmentStart: Long): MdTag = { moveAlignment(newReference, read.record.getSequence, newCigar, newAlignmentStart) } + + def apply(read: String, reference: String, cigar: Cigar, start: Long): MdTag = { + var matchCount = 0 + var delCount = 0 + var string = "" + var readPos = 0 + var refPos = 0 + + // loop over all cigar elements + cigar.getCigarElements.foreach(cigarElement => { + cigarElement.getOperator match { + case CigarOperator.M => { + for (i <- 0 until cigarElement.getLength) { + if (read(readPos) == reference(refPos)) { + matchCount += 1 + } else { + string += matchCount.toString + reference(refPos) + matchCount = 0 + } + readPos += 1 + refPos += 1 + delCount = 0 + } + } + case CigarOperator.D => { + for (i <- 0 until cigarElement.getLength) { + if (delCount == 0) { + string += matchCount.toString + "^" + } + string += reference(refPos) + + matchCount = 0 + delCount += 1 + refPos += 1 + } + } + case _ => { + if (cigarElement.getOperator.consumesReadBases) { + readPos += cigarElement.getLength + } + if (cigarElement.getOperator.consumesReferenceBases) { + throw new IllegalArgumentException("Cannot handle operator: " + cigarElement.getOperator) + } + } + } + }) + + string += matchCount.toString + + apply(string, start) + } } class MdTag( @@ -280,7 +329,7 @@ class MdTag( * * @return True if this read has mismatches. We do not return true if the read has no mismatches but has deletions. */ - def hasMismatches(): Boolean = { + def hasMismatches: Boolean = { !mismatches.isEmpty } @@ -333,7 +382,7 @@ class MdTag( cigarElement.getOperator match { case CigarOperator.M => { // if we are a match, loop over bases in element - for (i <- (0 until cigarElement.getLength)) { + for (i <- 0 until cigarElement.getLength) { // if a mismatch, get from the mismatch set, else pull from read if (mismatches.contains(referencePos)) { reference += { @@ -352,7 +401,7 @@ class MdTag( } case CigarOperator.D => { // if a delete, get from the delete pool - for (i <- (0 until cigarElement.getLength)) { + for (i <- 0 until cigarElement.getLength) { reference += { deletes.get(referencePos) match { case Some(base) => base @@ -381,7 +430,7 @@ class MdTag( /** * Converts an MdTag object to a properly formatted MD string. 
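+ * For example, the MD string "10A5^AC6" encodes ten matching bases, a mismatch whose reference
+ * base is A, five more matches, a two base deletion of AC, and six final matches.
+ *
+ * As a round-trip sketch (illustrative; assumes net.sf.samtools.TextCigarCodec for parsing the
+ * cigar string, which is not otherwise used in this file):
+ * {{{
+ * val cigar = TextCigarCodec.getSingleton.decode("4M")
+ * MdTag("ACGT", "ACTT", cigar, 0L).toString // "2T1"
+ * }}}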
 *
- * @return MD string corresponding to [0-9]+(([A-Z]|\^[A-Z]+)[0-9]+)
+ * @return MD string corresponding to [0-9]+(([A-Z]|\^[A-Z]+)[0-9]+)*
 * @see http://zenfractal.com/2013/06/19/playing-with-matches/
 */
 override def toString(): String = {
@@ -454,5 +503,5 @@
 case _ => false
 }
 def canEqual(other: Any): Boolean = other.isInstanceOf[MdTag]
- override def hashCode: Int = toString.hashCode
+ override def hashCode: Int = toString().hashCode
 }
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReadsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReadsSuite.scala
new file mode 100644
index 0000000000..0f466b7559
--- /dev/null
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/consensus/ConsensusGeneratorFromReadsSuite.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.bdgenomics.adam.algorithms.consensus
+
+import org.apache.spark.rdd.RDD
+import org.bdgenomics.adam.predicates.UniqueMappedReadPredicate
+import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.adam.rich.RichADAMRecord
+import org.bdgenomics.adam.util.SparkFunSuite
+import org.bdgenomics.formats.avro.ADAMRecord
+
+class ConsensusGeneratorFromReadsSuite extends SparkFunSuite {
+
+ val cg = new ConsensusGeneratorFromReads
+
+ def artificial_reads: RDD[ADAMRecord] = {
+ val path = ClassLoader.getSystemClassLoader.getResource("artificial.sam").getFile
+ sc.adamLoad[ADAMRecord, UniqueMappedReadPredicate](path)
+ }
+
+ sparkTest("checking search for consensus list for artificial reads") {
+ val consensus = cg.findConsensus(artificial_reads.map(new RichADAMRecord(_))
+ .collect()
+ .toSeq)
+
+ assert(consensus.size === 2)
+ }
+}
+
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTargetSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTargetSuite.scala
index da83166ad0..67491027cc 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTargetSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/realignmenttarget/IndelRealignmentTargetSuite.scala
@@ -18,47 +18,30 @@
 package org.bdgenomics.adam.algorithms.realignmenttarget
-import org.bdgenomics.adam.util.SparkFunSuite
-import org.bdgenomics.formats.avro.{ ADAMPileup, ADAMRecord }
 import org.apache.spark.rdd.RDD
+import org.bdgenomics.adam.models.ReferenceRegion
+import org.bdgenomics.adam.predicates.UniqueMappedReadPredicate
 import org.bdgenomics.adam.rdd.ADAMContext._
-import org.apache.spark.SparkContext._
-import scala.collection.immutable.{ NumericRange, TreeSet }
-import org.apache.spark.TaskContext
-import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rich.RichADAMRecord +import org.bdgenomics.adam.util.SparkFunSuite +import org.bdgenomics.formats.avro.{ ADAMRecord, ADAMContig } class IndelRealignmentTargetSuite extends SparkFunSuite { // Note: this can't be lazy vals because Spark won't find the RDDs after the first test - def mason_reads: RDD[ADAMRecord] = { + def mason_reads: RDD[RichADAMRecord] = { val path = ClassLoader.getSystemClassLoader.getResource("small_realignment_targets.sam").getFile - val reads: RDD[ADAMRecord] = sc.adamLoad(path) - reads + sc.adamLoad[ADAMRecord, UniqueMappedReadPredicate](path).map(RichADAMRecord(_)) } - def mason_rods: RDD[Iterable[ADAMPileup]] = { - mason_reads.adamRecords2Pileup() - .groupBy(_.getPosition) // this we just do to match behaviour in IndelRealignerTargetFinder - .sortByKey(ascending = true, numPartitions = 1) - .map(_._2) - } - - def artificial_reads: RDD[ADAMRecord] = { + def artificial_reads: RDD[RichADAMRecord] = { val path = ClassLoader.getSystemClassLoader.getResource("artificial.sam").getFile - val reads: RDD[ADAMRecord] = sc.adamLoad(path) - reads - } - - def artificial_rods: RDD[Iterable[ADAMPileup]] = { - artificial_reads.adamRecords2Pileup() - .groupBy(_.getPosition) // this we just do to match behaviour in IndelRealignerTargetFinder - .sortByKey(ascending = true, numPartitions = 1) - .map(_._2) + sc.adamLoad[ADAMRecord, UniqueMappedReadPredicate](path).map(RichADAMRecord(_)) } - def make_read(start: Long, cigar: String, mdtag: String, length: Int, id: Int = 0): ADAMRecord = { + def make_read(start: Long, cigar: String, mdtag: String, length: Int, id: Int = 0): RichADAMRecord = { val sequence: String = "A" * length - ADAMRecord.newBuilder() + RichADAMRecord(ADAMRecord.newBuilder() .setReadName("read" + id.toString) .setStart(start) .setReadMapped(true) @@ -67,196 +50,131 @@ class IndelRealignmentTargetSuite extends SparkFunSuite { .setReadNegativeStrand(false) .setMapq(60) .setQual(sequence) // no typo, we just don't care + .setContig(ADAMContig.newBuilder() + .setContigName("1") + .build()) .setMismatchingPositions(mdtag) - .build() - } - - def make_pileup(reads: RDD[ADAMRecord]): Array[Iterable[ADAMPileup]] = { - reads.adamRecords2Pileup().groupBy(_.getPosition).sortByKey().map(_._2).collect() - } - - sparkTest("checking simple ranges") { - val range1: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](1, 4, 1), - new NumericRange.Inclusive[Long](1, 10, 1)) - val range2: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](1, 4, 1), - new NumericRange.Inclusive[Long](40, 50, 1)) - val range3: SNPRange = new SNPRange(5, new NumericRange.Inclusive[Long](40, 50, 1)) - - assert(range1 != range2) - assert(range1.compareRange(range2) === 0) - assert(range1.compare(range2) === -1) - assert(range1.compareReadRange(range2) === -1) - assert(range2.compareReadRange(range3) === 0) - assert(range1.merge(range2).getReadRange().start === 1) - assert(range1.merge(range2).getReadRange().end === 50) + .build()) } sparkTest("checking simple realignment target") { - val range1: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](1, 4, 1), - new NumericRange.Inclusive[Long](1, 10, 1)) - val range2: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](1, 4, 1), - new NumericRange.Inclusive[Long](40, 50, 1)) - val range3: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](6, 14, 1), - new NumericRange.Inclusive[Long](80, 90, 1)) - val range4: IndelRange = new IndelRange(new 
NumericRange.Inclusive[Long](2, 4, 1), - new NumericRange.Inclusive[Long](60, 70, 1)) - - val indelRanges1 = (range1 :: range2 :: List()).toSet - val target1 = new IndelRealignmentTarget(indelRanges1, Set.empty[SNPRange]) - val indelRanges2 = (range3 :: range4 :: List()).toSet - val target2 = new IndelRealignmentTarget(indelRanges2, Set.empty[SNPRange]) + val target1 = new IndelRealignmentTarget(Some(ReferenceRegion("1", 1, 10)), + ReferenceRegion("1", 1, 51)) + val target2 = new IndelRealignmentTarget(None, + ReferenceRegion("1", 60, 91)) assert(target1.readRange.start === 1) - assert(target1.readRange.end === 50) + assert(target1.readRange.end === 51) assert(TargetOrdering.overlap(target1, target1) === true) assert(TargetOrdering.overlap(target1, target2) === false) - assert(target2.getReadRange().start === 60) - assert(target2.getReadRange().end === 90) + assert(target2.readRange.start === 60) + assert(target2.readRange.end === 91) + assert(!target1.isEmpty) + assert(target2.isEmpty) } sparkTest("creating simple target from read with deletion") { val read = make_read(3L, "2M3D2M", "2^AAA2", 4) - val read_rdd: RDD[ADAMRecord] = sc.makeRDD(Seq(read), 1) + val read_rdd: RDD[RichADAMRecord] = sc.makeRDD(Seq(read), 1) val targets = RealignmentTargetFinder(read_rdd) assert(targets != null) assert(targets.size === 1) - assert(targets.head.getIndelSet().head.indelRange.start === 5) - assert(targets.head.getIndelSet().head.indelRange.end === 7) - assert(targets.head.getIndelSet().head.readRange.start === 3) - assert(targets.head.getIndelSet().head.readRange.end === 9) + assert(targets.head.variation.get.start === 5) + assert(targets.head.variation.get.end === 8) + assert(targets.head.readRange.start === 3) + assert(targets.head.readRange.end === 10) } sparkTest("creating simple target from read with insertion") { val read = make_read(3L, "2M3I2M", "4", 7) - val read_rdd: RDD[ADAMRecord] = sc.makeRDD(Seq(read), 1) + val read_rdd: RDD[RichADAMRecord] = sc.makeRDD(Seq(read), 1) val targets = RealignmentTargetFinder(read_rdd) assert(targets != null) assert(targets.size === 1) - assert(targets.head.getIndelSet().head.indelRange.start === 5) - assert(targets.head.getIndelSet().head.indelRange.end === 5) - assert(targets.head.getIndelSet().head.readRange.start === 3) - assert(targets.head.getIndelSet().head.readRange.end === 6) + assert(targets.head.variation.get.start === 5) + assert(targets.head.variation.get.end === 6) + assert(targets.head.readRange.start === 3) + assert(targets.head.readRange.end === 7) } - sparkTest("joining simple realignment targets") { - val range1: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](10, 15, 1), - new NumericRange.Inclusive[Long](1, 20, 1)) - val range2: IndelRange = new IndelRange(new NumericRange.Inclusive[Long](10, 15, 1), - new NumericRange.Inclusive[Long](6, 25, 1)) - val target1 = new IndelRealignmentTarget((range1 :: List()).toSet, Set.empty[SNPRange]) - val target2 = new IndelRealignmentTarget((range2 :: List()).toSet, Set.empty[SNPRange]) + sparkTest("joining simple realignment targets on same chr") { + val target1 = new IndelRealignmentTarget(Some(ReferenceRegion("1", 10, 16)), + ReferenceRegion("1", 1, 21)) + val target2 = new IndelRealignmentTarget(Some(ReferenceRegion("1", 10, 16)), + ReferenceRegion("1", 6, 26)) val merged_target = target1.merge(target2) - assert(merged_target.getReadRange().start === 1) - assert(merged_target.getReadRange().end === 25) - assert(merged_target.getIndelSet().toArray.apply(0).indelRange.start === 10) - 
assert(merged_target.getIndelSet().toArray.apply(0).indelRange.end === 15) - assert(merged_target.getIndelSet().toArray.apply(0).readRange.start === 1) - assert(merged_target.getIndelSet().toArray.apply(0).readRange.end === 25) + assert(merged_target.readRange.start === 1) + assert(merged_target.readRange.end === 26) + assert(merged_target.variation.get.start === 10) + assert(merged_target.variation.get.end === 16) + } + + sparkTest("joining simple realignment targets on different chr throws exception") { + val target1 = new IndelRealignmentTarget(Some(ReferenceRegion("1", 10, 16)), + ReferenceRegion("1", 1, 21)) + val target2 = new IndelRealignmentTarget(Some(ReferenceRegion("2", 10, 16)), + ReferenceRegion("2", 6, 26)) + + intercept[AssertionError] { + val merged_target = target1.merge(target2) + } } sparkTest("creating targets from three intersecting reads, same indel") { val read1 = make_read(1L, "4M3D2M", "4^AAA2", 6) val read2 = make_read(2L, "3M3D2M", "3^AAA2", 5) val read3 = make_read(3L, "2M3D2M", "2^AAA2", 4) - val read_rdd: RDD[ADAMRecord] = sc.makeRDD(Seq(read1, read2, read3), 1) + val read_rdd: RDD[RichADAMRecord] = sc.makeRDD(Seq(read1, read2, read3), 1) val targets = RealignmentTargetFinder(read_rdd) assert(targets != null) assert(targets.size === 1) - assert(targets.head.getIndelSet().head.indelRange.start === 5) - assert(targets.head.getIndelSet().head.indelRange.end === 7) - assert(targets.head.getIndelSet().head.readRange.start === 1) - assert(targets.head.getIndelSet().head.readRange.end === 9) + assert(targets.head.variation.get.start === 5) + assert(targets.head.variation.get.end === 8) + assert(targets.head.readRange.start === 1) + assert(targets.head.readRange.end === 10) } sparkTest("creating targets from three intersecting reads, two different indel") { val read1 = make_read(1L, "2M2D4M", "2^AA4", 6, 0) val read2 = make_read(1L, "2M2D2M2D2M", "2^AA2^AA2", 6, 1) val read3 = make_read(5L, "2M2D4M", "2^AA4", 6, 2) - val read_rdd: RDD[ADAMRecord] = sc.makeRDD(Seq(read1, read2, read3), 1) + + val read_rdd: RDD[RichADAMRecord] = sc.makeRDD(Seq(read1, read2, read3), 1) val targets = RealignmentTargetFinder(read_rdd) + assert(targets != null) assert(targets.size === 1) - val indels = targets.head.indelSet.toArray - assert(indels(0).indelRange.start === 3) - assert(indels(0).indelRange.end === 4) - assert(indels(0).readRange.start === 1) - assert(indels(0).readRange.end === 10) - assert(indels(1).indelRange.start === 7) - assert(indels(1).indelRange.end === 8) - assert(indels(1).readRange.start === 1) - assert(indels(1).readRange.end === 12) - assert(targets.head.getReadRange().start === 1) - assert(targets.head.getReadRange().end === 12) + assert(targets.head.variation.get.start === 3) + assert(targets.head.variation.get.end === 9) + assert(targets.head.readRange.start === 1) + assert(targets.head.readRange.end === 13) } sparkTest("creating targets from two disjoint reads") { val read1 = make_read(1L, "2M2D2M", "2^AA2", 4) val read2 = make_read(7L, "2M2D2M", "2^AA2", 4) - val read_rdd: RDD[ADAMRecord] = sc.makeRDD(Seq(read1, read2), 1) + val read_rdd: RDD[RichADAMRecord] = sc.makeRDD(Seq(read1, read2), 1) val targets = RealignmentTargetFinder(read_rdd).toArray assert(targets != null) assert(targets.size === 2) - assert(targets(0).getIndelSet().head.indelRange.start === 3) - assert(targets(0).getIndelSet().head.indelRange.end === 4) - assert(targets(0).getIndelSet().head.readRange.start === 1) - assert(targets(0).getIndelSet().head.readRange.end === 6) - 
assert(targets(1).getIndelSet().head.indelRange.start === 9) - assert(targets(1).getIndelSet().head.indelRange.end === 10) - assert(targets(1).getIndelSet().head.readRange.start === 7) - assert(targets(1).getIndelSet().head.readRange.end === 12) - } - - sparkTest("extracting matches, mismatches and indels from mason reads") { - - val extracted_rods: RDD[Tuple3[Iterable[ADAMPileup], Iterable[ADAMPileup], Iterable[ADAMPileup]]] = - mason_rods.map(x => Tuple3(IndelRealignmentTarget.extractIndels(x), IndelRealignmentTarget.extractMatches(x), IndelRealignmentTarget.extractMismatches(x))) - val extracted_rods_collected: Array[Tuple3[Iterable[ADAMPileup], Iterable[ADAMPileup], Iterable[ADAMPileup]]] = extracted_rods.collect() - - // the first read has CIGAR 100M and MD 92T7 - assert(extracted_rods_collected.slice(0, 100).forall(x => x._1.size == 0)) - assert(extracted_rods_collected.slice(0, 92).forall(x => x._2.size == 1)) - assert(extracted_rods_collected.slice(0, 92).forall(x => x._3.size == 0)) - assert(extracted_rods_collected(92)._2.size === 0) - assert(extracted_rods_collected(92)._3.size === 1) - assert(extracted_rods_collected.slice(93, 100).forall(x => x._2.size == 1)) - assert(extracted_rods_collected.slice(93, 100).forall(x => x._3.size == 0)) - // the second read has CIGAR 32M1D33M1I34M and MD 0G24A6^T67 - assert(extracted_rods_collected.slice(100, 132).forall(x => x._1.size == 0)) - // first the SNP at the beginning - assert(extracted_rods_collected(100)._2.size === 0) - assert(extracted_rods_collected(100)._3.size === 1) - // now a few matches - assert(extracted_rods_collected.slice(101, 125).forall(x => x._2.size == 1)) - assert(extracted_rods_collected.slice(101, 125).forall(x => x._3.size == 0)) - // another SNP - assert(extracted_rods_collected(125)._2.size === 0) - assert(extracted_rods_collected(125)._3.size === 1) - // a few more matches - assert(extracted_rods_collected.slice(126, 132).forall(x => x._2.size == 1)) - assert(extracted_rods_collected.slice(126, 132).forall(x => x._3.size == 0)) - // now comes the deletion of T - assert(extracted_rods_collected(132)._1.size === 1) - assert(extracted_rods_collected(132)._2.size === 0) - assert(extracted_rods_collected(132)._3.size === 0) - // now 33 more matches - assert(extracted_rods_collected.slice(133, 166).forall(x => x._1.size == 0)) - assert(extracted_rods_collected.slice(133, 166).forall(x => x._2.size == 1)) - assert(extracted_rods_collected.slice(133, 166).forall(x => x._3.size == 0)) - // now one insertion - assert(extracted_rods_collected(166)._1.size === 1) - assert(extracted_rods_collected(166)._2.size === 1) - assert(extracted_rods_collected(166)._3.size === 0) - // TODO: add read with more insertions, overlapping reads + assert(targets(0).variation.get.start === 3) + assert(targets(0).variation.get.end === 5) + assert(targets(0).readRange.start === 1) + assert(targets(0).readRange.end === 7) + assert(targets(1).variation.get.start === 9) + assert(targets(1).variation.get.end === 11) + assert(targets(1).readRange.start === 7) + assert(targets(1).readRange.end === 13) } sparkTest("creating targets for artificial reads: one-by-one") { def check_indel(target: IndelRealignmentTarget, read: ADAMRecord): Boolean = { - val indelRange: NumericRange[Long] = target.indelSet.head.getIndelRange() + val indelRange: ReferenceRegion = target.variation.get read.getStart.toLong match { - case 5L => ((indelRange.start == 34) && (indelRange.end == 43)) - case 10L => ((indelRange.start == 54) && (indelRange.end == 63)) - case 15L => 
((indelRange.start == 34) && (indelRange.end == 43)) - case 20L => ((indelRange.start == 54) && (indelRange.end == 63)) - case 25L => ((indelRange.start == 34) && (indelRange.end == 43)) + case 5L => (indelRange.start == 34) && (indelRange.end == 44) + case 10L => (indelRange.start == 54) && (indelRange.end == 64) + case 15L => (indelRange.start == 34) && (indelRange.end == 44) + case 20L => (indelRange.start == 54) && (indelRange.end == 64) + case 25L => (indelRange.start == 34) && (indelRange.end == 44) case _ => false } } @@ -264,84 +182,38 @@ class IndelRealignmentTargetSuite extends SparkFunSuite { val reads = artificial_reads.collect() reads.foreach( read => { - val read_rdd: RDD[ADAMRecord] = sc.makeRDD(Seq(read), 1) + val read_rdd: RDD[RichADAMRecord] = sc.makeRDD(Seq(read), 1) val targets = RealignmentTargetFinder(read_rdd) if (read.getStart < 105) { assert(targets != null) assert(targets.size === 1) // the later read mates do not have indels - assert(targets.head.getIndelSet().head.readRange.start === read.getStart) - assert(targets.head.getIndelSet().head.readRange.end === read.end.get - 1) + assert(targets.head.readRange.start === read.getStart) + assert(targets.head.readRange.end === read.end.get) assert(check_indel(targets.head, read)) } }) } sparkTest("creating targets for artificial reads: all-at-once (merged)") { - val artificial_pileup = make_pileup(artificial_reads) - assert(artificial_pileup.size > 0) val targets_collected: Array[IndelRealignmentTarget] = RealignmentTargetFinder(artificial_reads).toArray - // there are no SNPs in the artificial reads - val only_SNPs = targets_collected.filter(_.getSNPSet() != Set.empty) - // TODO: it seems that mismatches all create separate SNP targets? - //assert(only_SNPs.size == 0) - // there are two indels (deletions) in the reads - val only_indels = targets_collected.filter(_.getIndelSet() != Set.empty) - assert(only_indels.size === 1) - assert(only_indels.head.getIndelSet().size == 2) - assert(only_indels.head.getReadRange().start === 5) - assert(only_indels.head.getReadRange().end === 94) - val indelsets = only_indels.head.getIndelSet().toArray - // NOTE: this assumes the set is in sorted order, which seems to be the case - assert(indelsets(0).getIndelRange().start === 34) - assert(indelsets(0).getIndelRange().end === 43) - assert(indelsets(1).getIndelRange().start === 54) - assert(indelsets(1).getIndelRange().end === 63) - // - } - sparkTest("creating SNP targets for mason reads") { - val targets_collected: Array[IndelRealignmentTarget] = RealignmentTargetFinder(mason_reads).toArray - assert(targets_collected.size > 0) - - // first look at SNPs - val only_SNPs = targets_collected.filter(_.getSNPSet() != Set.empty) //.collect() - // the first read has a single SNP - assert(only_SNPs(0).getSNPSet().size === 1) - assert(only_SNPs(0).getSNPSet().head.getSNPSite() === 701384) - // the second read has two SNPS - assert(only_SNPs(1).getSNPSet().size === 2) - assert(only_SNPs(1).getSNPSet().head.getSNPSite() === 702257) - assert(only_SNPs(1).getSNPSet().toIndexedSeq(1).getSNPSite() === 702282) - // the third has a single SNP - assert(only_SNPs(2).getSNPSet().size === 1) - assert(only_SNPs(2).getSNPSet().head.getSNPSite() === 807733) - // the last read has two SNPs - assert(only_SNPs(4).getSNPSet().size === 2) - assert(only_SNPs(4).getSNPSet().head.getSNPSite() === 869673) - assert(only_SNPs(4).getSNPSet().toIndexedSeq(1).getSNPSite() === 869572) + assert(targets_collected.size === 1) + assert(targets_collected.head.readRange.start === 
5) + assert(targets_collected.head.readRange.end === 95) + assert(targets_collected.head.variation.get.start === 34) + assert(targets_collected.head.variation.get.end === 64) } sparkTest("creating indel targets for mason reads") { - object IndelRangeOrdering extends Ordering[IndelRange] { - def compare(x: IndelRange, y: IndelRange): Int = x.getIndelRange().start compare y.getIndelRange().start - } - val targets_collected: Array[IndelRealignmentTarget] = RealignmentTargetFinder(mason_reads).toArray - assert(targets_collected.size > 0) - val only_indels = targets_collected.filter(_.getIndelSet() != Set.empty) // the first read has no indels // the second read has a one-base deletion and a one-base insertion - assert(only_indels(0).getIndelSet().size === 2) - val tmp1 = new TreeSet()(IndelRangeOrdering).union(only_indels(0).getIndelSet()) - assert(tmp1.toIndexedSeq(0).getIndelRange().start == 702289 && tmp1.toIndexedSeq(0).getIndelRange().end == 702289) - assert(tmp1.toIndexedSeq(1).getIndelRange().start == 702323 && tmp1.toIndexedSeq(1).getIndelRange().end == 702323) + assert(targets_collected(0).variation.get.start == 702289 && targets_collected(0).variation.get.end == 702324) // the third read has a one base deletion - assert(only_indels(1).getIndelSet().size === 1) - assert(only_indels(1).getIndelSet().head.getIndelRange().start == 807755 && only_indels(1).getIndelSet().head.getIndelRange().end == 807755) + assert(targets_collected(1).variation.get.start == 807755 && targets_collected(1).variation.get.end == 807756) // read 7 has a single 4 bp deletion - assert(only_indels(5).getIndelSet().size === 1) - assert(only_indels(5).getIndelSet().head.getIndelRange().length === 4) - assert(only_indels(5).getIndelSet().head.getIndelRange().start == 869644 && only_indels(5).getIndelSet().head.getIndelRange().end == 869647) + assert(targets_collected(5).variation.get.length === 4) + assert(targets_collected(5).variation.get.start == 869644 && targets_collected(5).variation.get.end == 869648) } } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanSuite.scala new file mode 100644 index 0000000000..c0b1d2b078 --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/algorithms/smithwaterman/SmithWatermanSuite.scala @@ -0,0 +1,240 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.algorithms.smithwaterman + +import org.scalatest.FunSuite +import scala.math.abs + +class SmithWatermanSuite extends FunSuite { + + val epsilon = 1e-6 + def fpEquals(a: Double, b: Double): Boolean = { + abs(a - b) < epsilon + } + + test("gather max position from simple scoring matrix") { + val c0 = Array(0.0, 0.0, 0.0, 0.0, 0.0) + val c1 = Array(0.0, 1.0, 1.0, 1.0, 1.0) + val c2 = Array(0.0, 1.0, 2.0, 2.0, 2.0) + val c3 = Array(0.0, 1.0, 2.0, 3.0, 3.0) + val c4 = Array(0.0, 1.0, 2.0, 3.0, 4.0) + val matrix = Array(c0, c1, c2, c3, c4) + + val sw = new SmithWatermanConstantGapScoring("AAAA", "AAAA", 1.0, 0.0, -1.0, -1.0) + + val max = sw.maxCoordinates(matrix) + assert(max._1 === 4) + assert(max._2 === 4) + } + + test("gather max position from irregular scoring matrix") { + // _ A C G T + val c0 = Array(0.0, 0.0, 0.0, 0.0, 0.0) // _ + val c1 = Array(0.0, 1.0, 1.0, 1.0, 1.0) // A + val c2 = Array(0.0, 0.0, 2.0, 1.0, 1.0) // C + val c3 = Array(0.0, 1.0, 1.0, 2.0, 1.0) // A + val c4 = Array(0.0, 0.0, 1.0, 1.0, 3.0) // T + val c5 = Array(0.0, 0.0, 0.0, 0.0, 2.0) // G + val c6 = Array(0.0, 1.0, 0.0, 0.0, 1.0) // A + val matrix = Array(c0, c1, c2, c3, c4, c5, c6) + + val sw = new SmithWatermanConstantGapScoring("ACGT", "ACATGA", 1.0, 0.0, -1.0, -1.0) + + val max = sw.maxCoordinates(matrix) + assert(max._1 === 4) + assert(max._2 === 4) + } + + test("gather max position from irregular scoring matrix with deletions") { + // _ A C G A + val c0 = Array(0.0, 0.0, 0.0, 0.0, 0.0) // _ + val c1 = Array(0.0, 1.0, 1.0, 1.0, 1.0) // A + val c2 = Array(0.0, 0.5, 2.0, 1.0, 1.0) // C + val c3 = Array(0.0, 1.0, 1.5, 2.0, 1.5) // A + val c4 = Array(0.0, 0.5, 1.0, 1.5, 2.0) // T + val c5 = Array(0.0, 0.5, 0.5, 2.0, 1.5) // G + val c6 = Array(0.0, 1.0, 0.5, 1.5, 3.0) // A + val matrix = Array(c0, c1, c2, c3, c4, c5, c6) + + val sw = new SmithWatermanConstantGapScoring("ACGA", "ACATGA", 1.0, 0.0, -0.5, -0.5) + + val max = sw.maxCoordinates(matrix) + assert(max._1 === 4) + assert(max._2 === 6) + } + + test("score simple alignment with constant gap") { + val c0 = Array(0.0, 0.0, 0.0, 0.0, 0.0) + val c1 = Array(0.0, 1.0, 1.0, 1.0, 1.0) + val c2 = Array(0.0, 1.0, 2.0, 2.0, 2.0) + val c3 = Array(0.0, 1.0, 2.0, 3.0, 3.0) + val c4 = Array(0.0, 1.0, 2.0, 3.0, 4.0) + val matrix = Array(c0, c1, c2, c3, c4) + + val sw = new SmithWatermanConstantGapScoring("AAAA", "AAAA", 1.0, 0.0, -1.0, -1.0) + + val (swMatrix, _) = sw.buildScoringMatrix() + for (j <- 0 to 4) { + for (i <- 0 to 4) { + assert(fpEquals(swMatrix(i)(j), matrix(i)(j))) + } + } + } + + test("score irregular scoring matrix") { + // _ A C G T + val c0 = Array(0.0, 0.0, 0.0, 0.0, 0.0) // _ + val c1 = Array(0.0, 1.0, 0.0, 0.0, 0.0) // A + val c2 = Array(0.0, 0.0, 2.0, 1.0, 0.0) // C + val c3 = Array(0.0, 1.0, 1.0, 2.0, 1.0) // A + val c4 = Array(0.0, 0.0, 1.0, 1.0, 3.0) // T + val c5 = Array(0.0, 0.0, 0.0, 2.0, 2.0) // G + val c6 = Array(0.0, 1.0, 0.0, 1.0, 2.0) // A + val matrix = Array(c0, c1, c2, c3, c4, c5, c6) + + val sw = new SmithWatermanConstantGapScoring("ACATGA", "ACGT", 1.0, 0.0, -1.0, -1.0) + + val (swMatrix, _) = sw.buildScoringMatrix() + for (i <- 0 to 6) { + for (j <- 0 to 4) { + assert(fpEquals(swMatrix(i)(j), matrix(i)(j))) + } + } + } + + test("score irregular scoring matrix with indel") { + // _ A C G A + val c0 = Array(0.0, 0.0, 0.0, 0.0, 0.0) // _ + val c1 = Array(0.0, 1.0, 0.5, 0.0, 1.0) // A + val c2 = Array(0.0, 0.5, 2.0, 1.5, 1.0) // C + val c3 = Array(0.0, 1.0, 1.5, 2.0, 2.5) // A + val c4 = Array(0.0, 0.5, 1.0, 
1.5, 2.0) // T
+ val c5 = Array(0.0, 0.0, 0.5, 2.0, 1.5) // G
+ val c6 = Array(0.0, 1.0, 0.5, 1.5, 3.0) // A
+ val matrix = Array(c0, c1, c2, c3, c4, c5, c6)
+
+ val sw = new SmithWatermanConstantGapScoring("ACATGA", "ACGA", 1.0, 0.0, -0.5, -0.5)
+
+ val (swMatrix, _) = sw.buildScoringMatrix()
+ for (i <- 0 to 6) {
+ for (j <- 0 to 4) {
+ assert(fpEquals(swMatrix(i)(j), matrix(i)(j)))
+ }
+ }
+ }
+
+ val swh = new SmithWatermanConstantGapScoring("", "", 0.0, 0.0, 0.0, 0.0)
+
+ test("can unroll cigars correctly") {
+ assert(swh.cigarFromRNNCigar("MDDMMMM") === "4M2D1M")
+ assert(swh.cigarFromRNNCigar("MMMIIMM") === "2M2I3M")
+ assert(swh.cigarFromRNNCigar("MMMMMMMM") === "8M")
+ }
+
+ test("execute simple traceback") {
+ val c0 = Array('T', 'T', 'T', 'T', 'T')
+ val c1 = Array('T', 'B', 'B', 'B', 'B')
+ val c2 = Array('T', 'B', 'B', 'B', 'B')
+ val c3 = Array('T', 'B', 'B', 'B', 'B')
+ val c4 = Array('T', 'B', 'B', 'B', 'B')
+ val matrix = Array(c0, c1, c2, c3, c4)
+
+ val (cx, cy, _, _) = swh.move(matrix, 4, 4, "", "")
+
+ assert(cx === "4M")
+ assert(cy === "4M")
+ }
+
+ test("execute traceback with indel") {
+ // _ A C G A
+ val c0 = Array('T', 'T', 'T', 'T', 'T') // _
+ val c1 = Array('T', 'B', 'J', 'T', 'B') // A
+ val c2 = Array('T', 'I', 'B', 'B', 'B') // C
+ val c3 = Array('T', 'B', 'J', 'I', 'B') // A
+ val c4 = Array('T', 'I', 'J', 'B', 'B') // T
+ val c5 = Array('T', 'T', 'I', 'B', 'B') // G
+ val c6 = Array('T', 'B', 'J', 'B', 'B') // A
+ val matrix = Array(c0, c1, c2, c3, c4, c5, c6)
+
+ val (cx, cy, _, _) = swh.move(matrix, 6, 4, "", "")
+
+ assert(cx === "2M2I2M")
+ assert(cy === "2M2D2M")
+ }
+
+ test("run end to end smith waterman for simple reads") {
+ val sw = new SmithWatermanConstantGapScoring("AAAA", "AAAA", 1.0, 0.0, -1.0, -1.0)
+
+ assert(sw.cigarX.toString === "4M")
+ assert(sw.cigarY.toString === "4M")
+ }
+
+ test("run end to end smith waterman for short sequences with indel") {
+ val sw = new SmithWatermanConstantGapScoring("ACATGA", "ACGA", 1.0, 0.0, -0.333, -0.333)
+
+ assert(sw.cigarX.toString === "2M2I2M")
+ assert(sw.cigarY.toString === "2M2D2M")
+ }
+
+ test("run end to end smith waterman for longer sequences with snp") {
+ // ATTAGACTACTTAATATACAGATTTACCCCAATAGA
+ // ATTAGACTACTTAATATACAGAATTACCCCAATAGA
+ val sw = new SmithWatermanConstantGapScoring("ATTAGACTACTTAATATACAGATTTACCCCAATAGA",
+ "ATTAGACTACTTAATATACAGAATTACCCCAATAGA",
+ 1.0, 0.0, -0.333, -0.333)
+
+ assert(sw.cigarX.toString === "36M")
+ assert(sw.cigarY.toString === "36M")
+ }
+
+ test("run end to end smith waterman for longer sequences with short indel") {
+ // ATTAGACTACTTAATATACAGATTTACCCCAATAGA
+ // ATTAGACTACTTAATATACAGA__TACCCCAATAGA
+ val sw = new SmithWatermanConstantGapScoring("ATTAGACTACTTAATATACAGATTTACCCCAATAGA",
+ "ATTAGACTACTTAATATACAGATACCCCAATAGA",
+ 1.0, 0.0, -0.333, -0.333)
+
+ assert(sw.cigarX.toString === "22M2I12M")
+ assert(sw.cigarY.toString === "22M2D12M")
+ }
+
+ test("run end to end smith waterman for shorter sequence in longer sequence") {
+ // ATTAGACTACTTAATATACAGATTTACCCCAATAGA
+ // ACTTAATATACAGATTTACC
+ val sw = new SmithWatermanConstantGapScoring("ATTAGACTACTTAATATACAGATTTACCCCAATAGA",
+ "ACTTAATATACAGATTTACC",
+ 1.0, 0.0, -0.333, -0.333)
+
+ assert(sw.cigarX.toString === "20M")
+ assert(sw.cigarY.toString === "20M")
+ assert(sw.xStart === 8)
+ }
+
+ test("run end to end smith waterman for shorter sequence in longer sequence, with indel") {
+ // ATTAGACTACTTAATATACAGATTTACCCCAATAGA
+ // ACTTAATAT__AGATTTACC
+ val sw = new 
SmithWatermanConstantGapScoring("ATTAGACTACTTAATATACAGATTTACCCCAATAGA", + "ACTTAATATAGATTTACC", + 1.0, 0.0, -0.333, -0.333) + + assert(sw.cigarX.toString === "9M2I9M") + assert(sw.cigarY.toString === "9M2D9M") + assert(sw.xStart === 8) + } + +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/ConsensusSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/ConsensusSuite.scala index ffb9fcace2..8f782bc7e1 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/models/ConsensusSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/models/ConsensusSuite.scala @@ -18,12 +18,11 @@ package org.bdgenomics.adam.models import org.scalatest.FunSuite -import net.sf.samtools.Cigar class ConsensusSuite extends FunSuite { test("test the insertion of a consensus insertion into a reference") { - val c = Consensus("TCGA", 10L to 10L) + val c = Consensus("TCGA", ReferenceRegion("0", 10L, 11L)) val ref = "AAAAAAAAAA" @@ -33,7 +32,7 @@ class ConsensusSuite extends FunSuite { } test("test the insertion of a consensus deletion into a reference") { - val c = Consensus("", 10L to 15L) + val c = Consensus("", ReferenceRegion("0", 10L, 16L)) val ref = "AAAAATTTTT" @@ -44,7 +43,7 @@ class ConsensusSuite extends FunSuite { test("inserting empty consensus returns the reference") { val ref = "AAAAAAAAAAAAA" - val c = new Consensus("", 0L to 0L) + val c = new Consensus("", ReferenceRegion("0", 0L, 1L)) val co = c.insertIntoReference(ref, 0, ref.length) diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/IndelTableSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/IndelTableSuite.scala new file mode 100644 index 0000000000..e9ddd96c04 --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/models/IndelTableSuite.scala @@ -0,0 +1,81 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.models + +import org.bdgenomics.adam.util.SparkFunSuite +import org.bdgenomics.formats.avro.{ ADAMContig, ADAMVariant } + +class IndelTableSuite extends SparkFunSuite { + + // indel table containing a 1 bp deletion at chr1, pos 1000 + val indelTable = new IndelTable(Map("1" -> Iterable(Consensus("", ReferenceRegion("1", 1000L, 1002L))))) + + test("check for indels in a region with known indels") { + assert(indelTable.getIndelsInRegion(ReferenceRegion("1", 0L, 2000L)).length === 1) + } + + test("check for indels in a contig that doesn't exist") { + assert(indelTable.getIndelsInRegion(ReferenceRegion("0", 0L, 0L)).length === 0) + } + + test("check for indels in a region without known indels") { + assert(indelTable.getIndelsInRegion(ReferenceRegion("1", 1002L, 1005L)).length === 0) + } + + sparkTest("build indel table from rdd of variants") { + val ctg1 = ADAMContig.newBuilder() + .setContigName("1") + .build() + val ctg2 = ADAMContig.newBuilder() + .setContigName("2") + .build() + val ins = ADAMVariant.newBuilder() + .setContig(ctg1) + .setPosition(1000L) + .setReferenceAllele("A") + .setVariantAllele("ATT") + .build() + val del = ADAMVariant.newBuilder() + .setContig(ctg2) + .setPosition(50L) + .setReferenceAllele("ACAT") + .setVariantAllele("A") + .build() + + val rdd = sc.parallelize(Seq(ins, del)) + + val table = IndelTable(rdd) + + // check insert + val insT = table.getIndelsInRegion(ReferenceRegion("1", 1000L, 1010L)) + assert(insT.length === 1) + assert(insT.head.consensus === "TT") + assert(insT.head.index.referenceName === "1") + assert(insT.head.index.start === 1001) + assert(insT.head.index.end === 1002) + + // check delete + val delT = table.getIndelsInRegion(ReferenceRegion("2", 40L, 60L)) + assert(delT.length === 1) + assert(delT.head.consensus === "") + assert(delT.head.index.referenceName === "2") + assert(delT.head.index.start === 51) + assert(delT.head.index.end === 54) + } + +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/RealignIndelsSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/RealignIndelsSuite.scala index c993019702..c7f7babd56 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/RealignIndelsSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/RealignIndelsSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD import org.bdgenomics.formats.avro.ADAMRecord import org.bdgenomics.adam.algorithms.realignmenttarget.RealignmentTargetFinder import org.bdgenomics.adam.algorithms.realignmenttarget.IndelRealignmentTarget -import org.bdgenomics.adam.models.Consensus +import org.bdgenomics.adam.models.{ Consensus, ReferencePosition } import org.bdgenomics.adam.rich.RichADAMRecord class RealignIndelsSuite extends SparkFunSuite { @@ -53,8 +53,8 @@ class RealignIndelsSuite extends SparkFunSuite { } sparkTest("checking mapping to targets for artificial reads") { - val targets = RealignmentTargetFinder(artificial_reads) - assert(targets.size == 1) + val targets = RealignmentTargetFinder(artificial_reads.map(RichADAMRecord(_))) + assert(targets.size === 1) val rr = artificial_reads.map(RichADAMRecord(_)) val readsMappedToTarget = RealignIndels.mapTargets(rr, targets).map(kv => { val (t, r) = kv @@ -62,16 +62,15 @@ class RealignIndelsSuite extends SparkFunSuite { (t, r.map(r => r.record)) }).collect() - assert(readsMappedToTarget.size == 2) + assert(readsMappedToTarget.size === 2) readsMappedToTarget.forall { - case (target: IndelRealignmentTarget, reads: Seq[ADAMRecord]) => reads.forall { 
+ case (target: Option[IndelRealignmentTarget], reads: Seq[ADAMRecord]) => reads.forall { read => { if (read.getStart <= 25) { - var result: Boolean = (2 == target.indelSet.size.toInt) - result = result && (target.getReadRange().start.toLong <= read.getStart.toLong) - result && (target.getReadRange().end >= read.end.get - 1L) + val result = target.get.readRange.start <= read.getStart.toLong + result && (target.get.readRange.end >= read.end.get) } else { target.isEmpty } @@ -86,7 +85,7 @@ class RealignIndelsSuite extends SparkFunSuite { // similar to realignTargetGroup() in RealignIndels artificial_reads.collect().toList.foreach(r => { if (r.mdTag.get.hasMismatches) { - consensus = Consensus.generateAlternateConsensus(r.getSequence, r.getStart, r.samtoolsCigar) match { + consensus = Consensus.generateAlternateConsensus(r.getSequence, ReferencePosition("0", r.getStart), r.samtoolsCigar) match { case Some(o) => o :: consensus case None => consensus } @@ -96,16 +95,16 @@ class RealignIndelsSuite extends SparkFunSuite { assert(consensus.length > 0) // Note: it seems that consensus ranges are non-inclusive assert(consensus.get(0).index.start === 34) - assert(consensus.get(0).index.end === 44) + assert(consensus.get(0).index.end === 45) assert(consensus.get(0).consensus === "") assert(consensus.get(1).index.start === 54) - assert(consensus.get(1).index.end === 64) + assert(consensus.get(1).index.end === 65) assert(consensus.get(1).consensus === "") // TODO: add check with insertions, how about SNPs } sparkTest("checking extraction of reference from reads") { - def checkReference(readReference: Tuple3[String, Long, Long]) { + def checkReference(readReference: (String, Long, Long)) { // the first three lines of artificial.fasta val refStr = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAGGGGGGGGGGAAAAAAAAAAGGGGGGGGGGAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" val startIndex = Math.min(readReference._2.toInt, 120) @@ -113,13 +112,15 @@ class RealignIndelsSuite extends SparkFunSuite { assert(readReference._1 === refStr.substring(startIndex, stopIndex)) } - val targets = RealignmentTargetFinder(artificial_reads) + val targets = RealignmentTargetFinder(artificial_reads.map(RichADAMRecord(_))) val rr = artificial_reads.map(RichADAMRecord(_)) - val readsMappedToTarget: Array[Tuple2[IndelRealignmentTarget, Iterable[ADAMRecord]]] = RealignIndels.mapTargets(rr, targets).map(kv => { - val (t, r) = kv + val readsMappedToTarget: Array[(IndelRealignmentTarget, Iterable[ADAMRecord])] = RealignIndels.mapTargets(rr, targets) + .filter(_._1.isDefined) + .map(kv => { + val (t, r) = kv - (t, r.map(r => r.record)) - }).collect() + (t.get, r.map(r => r.record)) + }).collect() val readReference = readsMappedToTarget.map { case (target, reads) => { @@ -134,14 +135,6 @@ class RealignIndelsSuite extends SparkFunSuite { assert(readReference != null) } - sparkTest("checking search for consensus list for artitifical reads") { - val (realignedReads, readsToClean, consensus) = (new RealignIndels()).findConsensus(artificial_reads.map(new RichADAMRecord(_)) - .collect() - .toSeq) - - assert(consensus.length === 2) - } - sparkTest("checking realigned reads for artificial input") { val artificial_realigned_reads_collected = artificial_realigned_reads.collect() val gatk_artificial_realigned_reads_collected = gatk_artificial_realigned_reads.collect() @@ -163,7 +156,7 @@ class RealignIndelsSuite extends SparkFunSuite { } sparkTest("test 
mismatch quality scoring") { - val ri = new RealignIndels + val ri = new RealignIndels() val read = "AAAAAAAA" val ref = "AAGGGGAA" val qScores = Seq(40, 40, 40, 40, 40, 40, 40, 40) @@ -172,7 +165,7 @@ class RealignIndelsSuite extends SparkFunSuite { } sparkTest("test mismatch quality scoring for no mismatches") { - val ri = new RealignIndels + val ri = new RealignIndels() val read = "AAAAAAAA" val qScores = Seq(40, 40, 40, 40, 40, 40, 40, 40) @@ -180,8 +173,8 @@ class RealignIndelsSuite extends SparkFunSuite { } sparkTest("test mismatch quality scoring after unpacking read") { - val ri = new RealignIndels - val read = artificial_reads.first + val ri = new RealignIndels() + val read = artificial_reads.first() assert(ri.sumMismatchQuality(read) === 800) } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/util/MdTagSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/util/MdTagSuite.scala index cd311924ff..b0d5b081f2 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/util/MdTagSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/util/MdTagSuite.scala @@ -24,6 +24,8 @@ import org.bdgenomics.adam.rich.RichADAMRecord._ class MdTagSuite extends FunSuite { + val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton + test("null md tag") { MdTag(null, 0L) } @@ -241,8 +243,6 @@ class MdTagSuite extends FunSuite { .setMismatchingPositions("27G0G0^GGGGGGGGAA8G0G0G0G0G0G0G0G0G0G13") .build() - val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton - val newCigar = CIGAR_CODEC.decode("27M10D33M") val newTag = MdTag.moveAlignment(read, newCigar) @@ -259,8 +259,6 @@ class MdTagSuite extends FunSuite { .setMismatchingPositions("27G0G0^GGGGGGGGAA8G0G0G0G0G0G0G0G0G0G13") .build() - val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton - val newCigar = CIGAR_CODEC.decode("60M") val newTag = MdTag.moveAlignment(read, newCigar, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", 100L) @@ -279,8 +277,6 @@ class MdTagSuite extends FunSuite { .setMismatchingPositions("27G0G0^GGGGGGGGAA8G0G0G0G0G0G0G0G0G0G13") .build() - val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton - val newCigar = CIGAR_CODEC.decode("60M") val newTag = MdTag.moveAlignment(read, newCigar, "GGAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", 100L) @@ -299,8 +295,6 @@ class MdTagSuite extends FunSuite { .setMismatchingPositions("27G0G0^GGGGGGGGAA8G0G0G0G0G0G0G0G0G0G13") .build() - val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton - val newCigar = CIGAR_CODEC.decode("10M10D50M") val newTag = MdTag.moveAlignment(read, newCigar, "AAAAAAAAAAGGGGGGGGGGAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", 100L) @@ -319,8 +313,6 @@ class MdTagSuite extends FunSuite { .setMismatchingPositions("27G0G0^GGGGGGGGAA8G0G0G0G0G0G0G0G0G0G13") .build() - val CIGAR_CODEC: TextCigarCodec = TextCigarCodec.getSingleton - val newCigar = CIGAR_CODEC.decode("10I50M") val newTag = MdTag.moveAlignment(read, newCigar, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", 100L) @@ -330,4 +322,54 @@ class MdTagSuite extends FunSuite { assert(newTag.end() === 149L) } + test("create new md tag from read vs. reference, perfect match") { + val read = "ACCATAGA" + val reference = "ACCATAGA" + val cigar = CIGAR_CODEC.decode("8M") + val start = 0L + + val tag = MdTag(read, reference, cigar, start) + + assert(tag.toString === "8") + } + + test("create new md tag from read vs. 
reference, perfect alignment match, 1 mismatch") { + val read = "ACCATAGA" + val reference = "ACAATAGA" + val cigar = CIGAR_CODEC.decode("8M") + val start = 0L + + val tag = MdTag(read, reference, cigar, start) + + assert(tag.toString === "2A5") + assert(tag.start === 0L) + assert(tag.end === 7L) + } + + test("create new md tag from read vs. reference, alignment with deletion") { + val read = "ACCATAGA" + val reference = "ACCATTTAGA" + val cigar = CIGAR_CODEC.decode("5M2D3M") + val start = 5L + + val tag = MdTag(read, reference, cigar, start) + + assert(tag.toString === "5^TT3") + assert(tag.start === 5L) + assert(tag.end === 14L) + } + + test("create new md tag from read vs. reference, alignment with insert") { + val read = "ACCCATAGA" + val reference = "ACCATAGA" + val cigar = CIGAR_CODEC.decode("3M1I5M") + val start = 10L + + val tag = MdTag(read, reference, cigar, start) + + assert(tag.toString === "8") + assert(tag.start === 10L) + assert(tag.end === 17L) + } + }
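
The expected matrices hard-coded in SmithWatermanSuite above all encode the standard constant-gap Smith-Waterman recurrence: each cell is the maximum of zero, the diagonal neighbor plus the match/mismatch score, and either adjacent neighbor plus the (negative) gap score, with row 0 and column 0 pinned to zero. The sketch below re-derives one fixture independently under that assumption; buildMatrix and swMax are hypothetical names, not the SmithWatermanConstantGapScoring API under test.

  // Fill the (x.length + 1) x (y.length + 1) scoring matrix, floored at 0.0.
  def buildMatrix(x: String, y: String,
                  matchScore: Double = 1.0,
                  mismatchScore: Double = 0.0,
                  gapScore: Double = -1.0): Array[Array[Double]] = {
    val h = Array.ofDim[Double](x.length + 1, y.length + 1)
    for (i <- 1 to x.length; j <- 1 to y.length) {
      val s = if (x(i - 1) == y(j - 1)) matchScore else mismatchScore
      h(i)(j) = Seq(0.0,
        h(i - 1)(j - 1) + s,    // diagonal: align x(i - 1) with y(j - 1)
        h(i - 1)(j) + gapScore, // gap in y
        h(i)(j - 1) + gapScore  // gap in x
      ).max
    }
    h
  }

  // Highest-scoring cell, as (row, column, score); traceback starts here.
  def swMax(h: Array[Array[Double]]): (Int, Int, Double) = {
    var best = (0, 0, 0.0)
    for (i <- h.indices; j <- h(i).indices) {
      if (h(i)(j) > best._3) best = (i, j, h(i)(j))
    }
    best
  }

With the defaults above, buildMatrix("ACATGA", "ACGT") reproduces row for row the expected c0 through c6 of "score irregular scoring matrix", and swMax locates its maximum of 3.0 at row 4, column 4. The sketch reports (row, column); the suite's maxCoordinates has its own coordinate ordering, on which this makes no claim.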
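
The "can unroll cigars correctly" test pairs each unrolled cigar with its run-length form in reverse order, which is consistent with a traceback that walks from the matrix maximum back toward the origin and therefore emits operators backwards (reading "RNN" as reversed, non-normalized is an assumption here). A minimal sketch of that unrolling; rleCigar is a hypothetical name, not the cigarFromRNNCigar implementation:

  def rleCigar(rnn: String): String = {
    val sb = new StringBuilder
    var last = ' '
    var count = 0
    for (op <- rnn.reverse) { // undo the traceback's reversed order
      if (op == last) {
        count += 1
      } else {
        if (count > 0) sb.append(count).append(last)
        last = op
        count = 1
      }
    }
    if (count > 0) sb.append(count).append(last)
    sb.toString
  }

On the fixtures above, rleCigar("MDDMMMM") yields "4M2D1M" and rleCigar("MMMIIMM") yields "2M2I3M".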
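
The IndelTableSuite assertions encode a coordinate convention for turning VCF-style variants into consensuses: the event starts one base after the anchor position, an insertion keeps only the inserted bases, and a deletion keeps an empty consensus spanning the deleted reference bases. A sketch of that convention, assuming the simple anchored alleles used in the suite (consensusFor is a hypothetical helper, not the IndelTable API):

  def consensusFor(pos: Long, ref: String, alt: String): (String, Long, Long) = {
    if (alt.length > ref.length) {
      // insertion: e.g. A -> ATT keeps "TT" at a point one past the anchor
      (alt.drop(ref.length), pos + 1, pos + 2)
    } else {
      // deletion: e.g. ACAT -> A spans the three deleted reference bases
      val delLen = ref.length - alt.length
      ("", pos + 1, pos + 1 + delLen)
    }
  }

consensusFor(1000L, "A", "ATT") gives ("TT", 1001, 1002) and consensusFor(50L, "ACAT", "A") gives ("", 51, 54), matching the insert and delete checks above.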
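
The new MdTag creation tests pin down how an MD string is assembled from a read, a reference, and a cigar: runs of matching bases become counts, a mismatch emits the reference base, a deletion emits '^' plus the deleted reference bases, and insertions consume read bases without appearing at all (which is why the 3M1I5M case collapses to "8"). A sketch of that assembly for M/I/D operators only, with the cigar pre-parsed to sidestep string handling (mdFrom is a hypothetical helper; the suite's actual entry point is MdTag(read, reference, cigar, start)):

  def mdFrom(read: String, ref: String, cigar: Seq[(Int, Char)]): String = {
    val sb = new StringBuilder
    var r = 0    // index into the read
    var t = 0    // index into the reference
    var run = 0  // length of the current run of matches
    def flush(): Unit = { sb.append(run); run = 0 }
    for ((len, op) <- cigar) op match {
      case 'M' =>
        for (_ <- 0 until len) {
          if (read(r) == ref(t)) run += 1
          else { flush(); sb.append(ref(t)) } // mismatch: emit reference base
          r += 1; t += 1
        }
      case 'I' => r += len // read-only bases, invisible to the MD string
      case 'D' =>
        flush(); sb.append('^').append(ref.substring(t, t + len)); t += len
    }
    flush()
    sb.toString
  }

mdFrom("ACCATAGA", "ACAATAGA", Seq((8, 'M'))) gives "2A5", mdFrom("ACCATAGA", "ACCATTTAGA", Seq((5, 'M'), (2, 'D'), (3, 'M'))) gives "5^TT3", and mdFrom("ACCCATAGA", "ACCATAGA", Seq((3, 'M'), (1, 'I'), (5, 'M'))) gives "8", matching the fixtures above. Flushing a zero-length run between adjacent events also reproduces the "27G0G0^GGGGGGGGAA..." shape seen in the moveAlignment tests.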