From b1fce67705ab49815a9707938972a7d1c0a5f2f4 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Fri, 20 Jan 2017 08:34:50 -0800 Subject: [PATCH] [ADAM-1358] Refactor BQSR to improve performance and legibility. Resolves #1358. * Adds instrumentation to BQSR. * Changed SnpTable to remove RichVariant conversion, use VariantRDD API. * Refactoring SnpTable to eliminate per-residue costly masked site lookup. * Restructuring core of SnpTable around an array to improve GC performance. Additionally, wrote custom serializer to improve serialization performance. * Added test suite for SnpTable, to test table creation. * Refactored SnpTable to use an IntervalArray-like approach. This approach improves masked site lookup performance by 50%. * Added tests to SnpTableSuite to cover lookup case, and reenabled tests in BaseQualityRecalibrationSuite. * Adding unit test coverage to covariates * Revert "[ADAM-775] Allow all IUPAC codes in BQSR" This reverts commit 207eebaf3dbf6f01e5b8398b2af388ee70c862aa. * Pulled Seq allocation for base check out into an immutable set. * Rewrote dinuc covariate. 50% improvement in performance. * Rewrite main BQSR aggregate as reduce by key * Added tests to recalibrator, recalibration table. * Major refactor of BQSR tables. * Starting to factor out the QualityScore class * Refactoring CovariateKey to reduce size in memory * Eliminated `org.bdgenomics.adam.rich.DecadentRead` (partially resolves #577) * Refactor CovariateKey to store record group ID instead of record group name. * Removed `org.bdgenomics.adam.models.QualityScore`. * Split multi-class files into one class per file (excepting private classes) to improve navigability. * Scaladoc all the recalibrators! You get a scaladoc! And you get a scaladoc! 
--- .../org/bdgenomics/adam/cli/Transform.scala | 28 +- .../adam/instrumentation/Timers.scala | 13 +- .../bdgenomics/adam/models/QualityScore.scala | 90 ----- .../org/bdgenomics/adam/models/SnpTable.scala | 186 +++++++-- .../adam/rdd/read/AlignmentRecordRDD.scala | 15 +- .../rdd/read/recalibration/Aggregate.scala | 75 ++++ .../BaseQualityRecalibration.scala | 377 ++++++++++++++---- .../rdd/read/recalibration/Covariate.scala | 122 +----- .../rdd/read/recalibration/CovariateKey.scala | 65 +++ .../read/recalibration/CovariateSpace.scala | 118 ++++++ .../read/recalibration/CycleCovariate.scala | 58 ++- .../read/recalibration/DinucCovariate.scala | 100 +++-- .../rdd/read/recalibration/Observation.scala | 91 +++++ .../read/recalibration/ObservationTable.scala | 136 +------ .../recalibration/RecalibrationTable.scala | 271 +++++++++++++ .../rdd/read/recalibration/Recalibrator.scala | 209 +++------- .../bdgenomics/adam/rich/DecadentRead.scala | 190 --------- .../adam/rich/ReferenceSequenceContext.scala | 40 -- .../adam/rich/RichAlignmentRecord.scala | 102 ----- .../serialization/ADAMKryoRegistrator.scala | 12 +- adam-core/src/test/resources/bqsr1.vcf | 1 + .../adam/models/SnpTableSuite.scala | 119 ++++++ .../BaseQualityRecalibrationSuite.scala | 41 +- .../recalibration/CycleCovariateSuite.scala | 166 ++++++++ .../recalibration/DinucCovariateSuite.scala | 96 +++++ .../RecalibrationTableSuite.scala | 45 +++ .../recalibration/RecalibratorSuite.scala | 99 +++++ .../adam/rich/DecadentReadSuite.scala | 155 ------- .../adam/rich/RichAlignmentRecordSuite.scala | 52 --- docs/source/50_cli.md | 2 - 30 files changed, 1870 insertions(+), 1204 deletions(-) delete mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/models/QualityScore.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Aggregate.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateKey.scala create mode 100644 
adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateSpace.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Observation.scala create mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTable.scala delete mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rich/DecadentRead.scala delete mode 100644 adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceSequenceContext.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/models/SnpTableSuite.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariateSuite.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariateSuite.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTableSuite.scala create mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibratorSuite.scala delete mode 100644 adam-core/src/test/scala/org/bdgenomics/adam/rich/DecadentReadSuite.scala diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala index d0a64d89ea..4a2999564c 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala @@ -59,10 +59,10 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { var markDuplicates: Boolean = false @Args4jOption(required = false, name = "-recalibrate_base_qualities", usage = "Recalibrate the base quality scores (ILLUMINA only)") var recalibrateBaseQualities: Boolean = false + @Args4jOption(required = false, name = "-min_acceptable_quality", usage = "Minimum acceptable quality for recalibrating a base in a read. 
Default is 5.") + var minAcceptableQuality: Int = 5 @Args4jOption(required = false, name = "-stringency", usage = "Stringency level for various checks; can be SILENT, LENIENT, or STRICT. Defaults to LENIENT") var stringency: String = "LENIENT" - @Args4jOption(required = false, name = "-dump_observations", usage = "Local path to dump BQSR observations to. Outputs CSV format.") - var observationsPath: String = null @Args4jOption(required = false, name = "-known_snps", usage = "Sites-only VCF giving location of known SNPs") var knownSnpsFile: String = null @Args4jOption(required = false, name = "-realign_indels", usage = "Locally realign indels present in reads.") @@ -218,25 +218,25 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans log.info("Recalibrating base qualities") // bqsr is a two pass algorithm, so cache the rdd if requested - if (args.cache) { - rdd.rdd.persist(sl) + val optSl = if (args.cache) { + Some(sl) + } else { + None } // create the known sites file, if one is available val knownSnps: SnpTable = createKnownSnpsTable(rdd.rdd.context) + val broadcastedSnps = BroadcastingKnownSnps.time { + rdd.rdd.context.broadcast(knownSnps) + } // run bqsr val bqsredRdd = rdd.recalibateBaseQualities( - rdd.rdd.context.broadcast(knownSnps), - Option(args.observationsPath), - stringency + broadcastedSnps, + args.minAcceptableQuality, + optSl ) - // if we cached the input, unpersist it, as it is never reused after bqsr - if (args.cache) { - rdd.rdd.unpersist() - } - bqsredRdd } else { rdd @@ -477,7 +477,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans isSorted = args.sortReads || args.sortLexicographically) } - private def createKnownSnpsTable(sc: SparkContext): SnpTable = CreateKnownSnpsTable.time { - Option(args.knownSnpsFile).fold(SnpTable())(f => SnpTable(sc.loadVariants(f).rdd.map(new RichVariant(_)))) + private def createKnownSnpsTable(sc: SparkContext): SnpTable = { + 
Option(args.knownSnpsFile).fold(SnpTable())(f => SnpTable(sc.loadVariants(f))) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala index af20ed8836..2a0eb56503 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/instrumentation/Timers.scala @@ -49,13 +49,24 @@ object Timers extends Metrics { // Recalibrate Base Qualities val BQSRInDriver = timer("Base Quality Recalibration") - val CreateKnownSnpsTable = timer("Create Known SNPs Table") + val CreatingKnownSnpsTable = timer("Creating Known SNPs Table") + val CollectingSnps = timer("Collecting SNPs") + val BroadcastingKnownSnps = timer("Broadcasting known SNPs") val ComputeCovariates = timer("Compute Covariates") + val ObservingRead = timer("Observing covariates per read") + val ReadCovariates = timer("Computing covariates per read base") + val ComputingDinucCovariate = timer("Computing dinuc covariate") + val ComputingCycleCovariate = timer("Computing cycle covariate") + val ReadResidues = timer("Splitting read into residues") + val CheckingForMask = timer("Checking if residue is masked") val ObservationAccumulatorComb = timer("Observation Accumulator: comb") val ObservationAccumulatorSeq = timer("Observation Accumulator: seq") val RecalibrateRead = timer("Recalibrate Read") val ComputeQualityScore = timer("Compute Quality Score") val GetExtraValues = timer("Get Extra Values") + val CreatingRecalibrationTable = timer("Creating recalibration table") + val InvertingRecalibrationTable = timer("Inverting recalibration table") + val QueryingRecalibrationTable = timer("Querying recalibration table") // Realign Indels val RealignIndelsInDriver = timer("Realign Indels") diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/QualityScore.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/QualityScore.scala 
deleted file mode 100644 index 0863a4800b..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/QualityScore.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.bdgenomics.adam.models - -import org.bdgenomics.adam.util.PhredUtils - -/** - * Model describing a single Quality score. - * - * @param phred The phred score. - */ -private[adam] case class QualityScore(phred: Int) extends Ordered[QualityScore] with Serializable { - - // Valid range of phred + 33 is described by the regex "[!-~]". - require(phred + 33 >= '!'.toInt && phred + 33 <= '~'.toInt, "Phred %s out of range".format(phred)) - - /** - * @return This quality score as a success probability. - */ - def successProbability: Double = PhredUtils.phredToSuccessProbability(phred) - - /** - * @return This quality score as an error probability. - */ - def errorProbability: Double = PhredUtils.phredToErrorProbability(phred) - - /** - * @return Returns this quality score as a visible ASCII character. Assumes - * the Illumina encoding (Phred 0 == 33 == '!'). 
- */ - def toChar: Char = (phred + 33).toChar - - override def compare(that: QualityScore) = this.phred compare that.phred - - override def toString = "Q%02d".format(phred) - - override def equals(other: Any): Boolean = other match { - case that: QualityScore => this.phred == that.phred - case _ => false - } - - override def hashCode: Int = phred -} - -/** - * Companion object for building quality score objects. - */ -private[adam] object QualityScore { - - /** - * The lowest quality score. - */ - val zero = QualityScore(0) - - /** - * @param quals A seq of quality scores to encode. - * @return Returns this seq of quality scores as a visible ASCII string. - * Assumes the Illumina encoding (Phred 0 == 33 == '!'). - */ - def toString(quals: Seq[QualityScore]): String = { - String.valueOf(quals.map(_.toChar).toArray) - } - - /** - * Builds a quality score given an error probability. - * - * @param p The error probability to translate into a phred scaled quality - * score. - * @return Returns a QualityScore equivalent to this error probability, but - * rounded to the nearest int. 
- */ - def fromErrorProbability(p: Double) = { - QualityScore(PhredUtils.errorProbabilityToPhred(p)) - } -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/SnpTable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/SnpTable.scala index f687b5afec..cfc071c3b6 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/SnpTable.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/SnpTable.scala @@ -17,52 +17,114 @@ */ package org.bdgenomics.adam.models -import org.bdgenomics.adam.rich.RichVariant -import org.bdgenomics.adam.rich.DecadentRead._ -import org.bdgenomics.utils.misc.Logging +import com.esotericsoftware.kryo.io.{ Input, Output } +import com.esotericsoftware.kryo.{ Kryo, Serializer } +import org.apache.spark.rdd.MetricsContext._ import org.apache.spark.rdd.RDD -import scala.collection.immutable._ -import scala.collection.mutable +import org.bdgenomics.adam.instrumentation.Timers._ +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.utils.misc.Logging +import scala.annotation.tailrec +import scala.math.{ max, min } /** * A table containing all of the SNPs in a known variation dataset. * - * @param table A map between a contig name and a set containing all coordinates - * where a point variant is known to exist. + * @param indices A map of contig names to the (first, last) index in the + * site array that contain data from this contig. + * @param sites An array containing positions that have masked SNPs. Sorted by + * contig name and then position. */ -class SnpTable(private val table: Map[String, Set[Long]]) extends Serializable with Logging { - log.info("SNP table has %s contigs and %s entries".format( - table.size, - table.values.map(_.size).sum)) +class SnpTable private[models] ( + private[models] val indices: Map[String, (Int, Int)], + private[models] val sites: Array[Long]) extends Serializable with Logging { - /** - * Is there a known SNP at the reference location of this Residue? 
- */ - def isMasked(residue: Residue): Boolean = - contains(residue.referencePosition) + private val midpoints: Map[String, Int] = { + @tailrec def pow2ceil(length: Int, i: Int = 1): Int = { + if (2 * i >= length) { + i + } else { + pow2ceil(length, 2 * i) + } + } - /** - * Is there a known SNP at the given reference location? - */ - def contains(location: ReferencePosition): Boolean = { - val bucket = table.get(location.referenceName) - if (bucket.isEmpty) unknownContigWarning(location.referenceName) - bucket.exists(_.contains(location.pos)) + indices.mapValues(p => { + val (start, end) = p + pow2ceil(end - start + 1) + }) } - private val unknownContigs = new mutable.HashSet[String] + @tailrec private def binarySearch(rr: ReferenceRegion, + offset: Int, + length: Int, + step: Int, + idx: Int = 0): Option[Int] = { + if (length == 0) { + None + } else if (rr.start <= sites(offset + idx) && rr.end > sites(offset + idx)) { + // if we've satisfied this last condition, then the read is overlapping the + // current index and we have a hit + Some(offset + idx) + } else if (step == 0) { + None + } else { + val stepIdx = idx + step + val nextIdx: Int = if (stepIdx >= length || + rr.end <= sites(offset + stepIdx)) { + idx + } else { + stepIdx + } + binarySearch(rr, offset, length, step / 2, nextIdx) + } + } + + @tailrec private def extendForward(rr: ReferenceRegion, + offset: Int, + idx: Int, + list: List[Long] = List.empty): List[Long] = { + if (idx < offset) { + list + } else { + if (rr.start > sites(idx)) { + list + } else { + extendForward(rr, offset, idx - 1, sites(idx) :: list) + } + } + } - private def unknownContigWarning(contig: String) = { - // This is synchronized to avoid a data race. Multiple threads may - // race to update `unknownContigs`, e.g. when running with a Spark - // master of `local[N]`. 
- synchronized { - if (!unknownContigs.contains(contig)) { - unknownContigs += contig - log.warn("Contig has no entries in known SNPs table: %s".format(contig)) + @tailrec private def extendBackwards(rr: ReferenceRegion, + end: Int, + idx: Int, + list: List[Long]): Set[Long] = { + if (idx > end) { + list.toSet + } else { + if (rr.end <= sites(idx)) { + list.toSet + } else { + extendBackwards(rr, end, idx + 1, sites(idx) :: list) } } } + + /** + * Returns the positions of all known SNPs overlapping the given region. + */ + private[adam] def maskedSites(rr: ReferenceRegion): Set[Long] = CheckingForMask.time { + val optRange = indices.get(rr.referenceName) + + optRange.flatMap(range => { + val (offset, end) = range + val optIdx = binarySearch(rr, offset, end - offset + 1, midpoints(rr.referenceName)) + + optIdx.map(idx => { + extendBackwards(rr, end, idx + 1, extendForward(rr, offset, idx)) + .map(_.toLong) + }) + }).getOrElse(Set.empty) + } } /** @@ -76,20 +138,64 @@ object SnpTable { * @return An empty SNP table. */ def apply(): SnpTable = { - new SnpTable(Map[String, Set[Long]]()) + new SnpTable(Map.empty, + Array.empty) } /** - * Creates a SNP Table from an RDD of RichVariants. + * Creates a SNP Table from a VariantRDD. * * @param variants The variants to populate the table from. * @return Returns a new SNPTable containing the input variants. 
*/ - def apply(variants: RDD[RichVariant]): SnpTable = { - val positions = variants.map(variant => (variant.variant.getContigName, - variant.variant.getStart)).collect() - val table = new mutable.HashMap[String, mutable.HashSet[Long]] - positions.foreach(tup => table.getOrElseUpdate(tup._1, { new mutable.HashSet[Long] }) += tup._2) - new SnpTable(table.mapValues(_.toSet).toMap) + def apply(variants: VariantRDD): SnpTable = CreatingKnownSnpsTable.time { + val (indices, positions) = CollectingSnps.time { + val sortedVariants = variants.sort + .rdd + .cache() + + val contigIndices = sortedVariants.map(_.getContigName) + .zipWithIndex + .mapValues(v => (v.toInt, v.toInt)) + .reduceByKeyLocally((p1, p2) => { + (min(p1._1, p2._1), max(p1._2, p2._2)) + }).toMap + val sites = sortedVariants.map(_.getStart: Long).collect() + + // unpersist the cached variants + sortedVariants.unpersist() + + (contigIndices, sites) + } + new SnpTable(indices, positions) + } +} + +private[adam] class SnpTableSerializer extends Serializer[SnpTable] { + + def write(kryo: Kryo, output: Output, obj: SnpTable) { + output.writeInt(obj.indices.size) + obj.indices.foreach(kv => { + val (contigName, (lowerBound, upperBound)) = kv + output.writeString(contigName) + output.writeInt(lowerBound) + output.writeInt(upperBound) + }) + output.writeInt(obj.sites.length) + obj.sites.foreach(output.writeLong(_)) + } + + def read(kryo: Kryo, input: Input, klazz: Class[SnpTable]): SnpTable = { + val indicesSize = input.readInt() + val indices = new Array[(String, (Int, Int))](indicesSize) + (0 until indicesSize).foreach(i => { + indices(i) = (input.readString(), (input.readInt(), input.readInt())) + }) + val sitesSize = input.readInt() + val sites = new Array[Long](sitesSize) + (0 until sitesSize).foreach(i => { + sites(i) = input.readLong() + }) + new SnpTable(indices.toMap, sites) } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala 
b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala index f37ad9bbef..e17b436f6e 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDD.scala @@ -629,15 +629,20 @@ case class AlignmentRecordRDD( * known SNPs to mask true variation during the recalibration process. * * @param knownSnps A table of known SNPs to mask valid variants. - * @param observationDumpFile An optional local path to dump recalibration - * observations to. + * @param minAcceptableQuality The minimum quality score to recalibrate. + * @param optStorageLevel An optional storage level to set for the output + * of the first stage of BQSR. Defaults to StorageLevel.MEMORY_ONLY. * @return Returns an RDD of recalibrated reads. */ def recalibateBaseQualities( knownSnps: Broadcast[SnpTable], - observationDumpFile: Option[String] = None, - validationStringency: ValidationStringency = ValidationStringency.LENIENT): AlignmentRecordRDD = BQSRInDriver.time { - replaceRdd(BaseQualityRecalibration(rdd, knownSnps, observationDumpFile, validationStringency)) + minAcceptableQuality: Int = 5, + optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY)): AlignmentRecordRDD = BQSRInDriver.time { + replaceRdd(BaseQualityRecalibration(rdd, + knownSnps, + recordGroups, + (minAcceptableQuality + 33).toChar, + optStorageLevel)) } /** diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Aggregate.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Aggregate.scala new file mode 100644 index 0000000000..3068a411df --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Aggregate.scala @@ -0,0 +1,75 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.adam.util.PhredUtils + +/** + * A class representing the aggregated mismatch count in a covariate. + * + * @param total The total number of bases in this covariate. + * @param mismatches The total number of mismatching bases in this covariate. + * @param expectedMismatches The expected number of bases that we would see + * mismatching, given the quality scores assigned to the bases in this + * covariate. + */ +private[recalibration] class Aggregate( + total: Long, // number of total observations + mismatches: Long, // number of mismatches observed + val expectedMismatches: Double // expected number of mismatches based on reported quality scores + ) extends Observation(total, mismatches) { + + require(expectedMismatches <= total) + + /** + * @return Returns the probability of a base in this covariate being an error, + * given the number of observed bases and the expected number of errors. + */ + def reportedErrorProbability: Double = expectedMismatches / total.toDouble + + /** + * @param that Another aggregate to merge with. + * @return Returns the sum total of bases, errors, and expected errors between + * these two aggregates. 
+ */ + def +(that: Aggregate): Aggregate = + new Aggregate( + this.total + that.total, + this.mismatches + that.mismatches, + this.expectedMismatches + that.expectedMismatches + ) +} + +private[recalibration] object Aggregate { + val empty: Aggregate = new Aggregate(0, 0, 0) + + /** + * @param key The error covariate that generated bases. Used to get the + * predicted error rate from the quality score. + * @param value The observation with the number of observed bases and errors. + * @return Returns an aggregate that uses the predicted error rate from the + * error covariate and the number of bases from the observation to say + * how many bases are expected to be in error. + */ + def apply(key: CovariateKey, value: Observation) = { + new Aggregate(value.total, + value.mismatches, + PhredUtils.phredToErrorProbability(key.quality.toInt - 33) * value.total) + } +} + diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibration.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibration.scala index b0e5ddf90f..dfcb919366 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibration.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibration.scala @@ -17,115 +17,336 @@ */ package org.bdgenomics.adam.rdd.read.recalibration -import htsjdk.samtools.ValidationStringency -import java.io._ -import org.bdgenomics.utils.misc.Logging +import htsjdk.samtools.{ + CigarElement, + CigarOperator, + TextCigarCodec, + ValidationStringency +} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.{ QualityScore, SnpTable } -import org.bdgenomics.adam.rich.DecadentRead -import org.bdgenomics.adam.rich.DecadentRead._ +import org.apache.spark.storage.StorageLevel +import org.bdgenomics.adam.models.{ + MdTag, + RecordGroupDictionary, + ReferenceRegion, + SnpTable 
+} +import org.bdgenomics.adam.instrumentation.Timers._ import org.bdgenomics.formats.avro.AlignmentRecord +import org.bdgenomics.utils.misc.Logging +import scala.annotation.tailrec /** * The algorithm proceeds in two phases. First, we make a pass over the reads * to collect statistics and build the recalibration tables. Then, we perform * a second pass over the reads to apply the recalibration and assign adjusted * quality scores. + * + * @param input The reads to recalibrate. + * @param knownSnps A broadcast variable containing the locations of any known + * SNPs to ignore during recalibration table generation. + * @param recordGroups The record groups that the reads in this dataset are from. + * @param minAcceptableAsciiPhred The minimum acceptable Phred score to attempt + * recalibrating, expressed as an ASCII character with Illumina (33) encodings. + * Defaults to '&' (Phred 5). + * @param optStorageLevel An optional storage level to apply if caching the + * output of the first stage of BQSR. */ private class BaseQualityRecalibration( - val input: RDD[(Option[DecadentRead], Option[AlignmentRecord])], + val input: RDD[AlignmentRecord], val knownSnps: Broadcast[SnpTable], - val dumpObservationTableFile: Option[String] = None) + val recordGroups: RecordGroupDictionary, + val minAcceptableAsciiPhred: Char = (5 + 33).toChar, + val optStorageLevel: Option[StorageLevel] = None) extends Serializable with Logging { - // Additional covariates to use when computing the correction - // TODO: parameterize - val covariates = CovariateSpace(new CycleCovariate, new DinucCovariate) - - // Bases with quality less than this will be skipped and left alone - // TODO: parameterize - val minAcceptableQuality = QualityScore(5) + /** + * An RDD representing the input reads along with any error covariates that + * the bases in the read map to. 
+ */ + val dataset: RDD[(AlignmentRecord, Array[CovariateKey])] = { + val covRdd = input.map(read => { + val covariates = if (BaseQualityRecalibration.shouldIncludeRead(read)) { + BaseQualityRecalibration.observe(read, knownSnps, recordGroups) + } else { + Array.empty[CovariateKey] + } + (read, covariates) + }) - // Debug: Log the visited/skipped residues to bqsr-visits.dump - val enableVisitLogging = false + optStorageLevel.fold(covRdd)(sl => { + log.info("User requested %s persistance for covariate RDD.".format(sl)) + covRdd.persist(sl) + }) + } - val dataset: RDD[(CovariateKey, Residue)] = { - def shouldIncludeRead(read: DecadentRead) = - read.isCanonicalRecord && - read.record.record.getQual != null && - read.alignmentQuality.exists(_ > QualityScore.zero) && - read.passedQualityChecks + /** + * A table containing the error frequency observations for the given reads. + */ + val observed: ObservationTable = { + val observations = dataset.flatMap(p => p._2).flatMap(cov => { + if (cov.shouldInclude) { + Some((cov.toDefault, Observation(cov.isMismatch))) + } else { + None + } + }).reduceByKeyLocally(_ + _) + new ObservationTable(observations) + } - def shouldIncludeResidue(residue: Residue) = - residue.quality > QualityScore.zero && - residue.isRegularBase && - !residue.isInsertion && - !knownSnps.value.isMasked(residue) + /** + * The input reads but with the quality scores of the reads replaced with + * the quality scores given by the observation table. + */ + val result: RDD[AlignmentRecord] = { + val recalibrator = Recalibrator(observed, minAcceptableAsciiPhred) + dataset.map(p => recalibrator(p._1, p._2)) + } +} - def observe(read: DecadentRead): Seq[(CovariateKey, Residue)] = - covariates(read).zip(read.residues). 
- filter { case (key, residue) => shouldIncludeResidue(residue) } +private[read] object BaseQualityRecalibration { - input.flatMap(_._1).filter(shouldIncludeRead).flatMap(observe) + /** + * @param read The read to check to see if it is a proper quality read. + * @return We consider a read valid for use in generating the recalibration + * table if it is a "proper" read. That is to say, it is the canonical read + * (mapped as a primary alignment with non-zero mapQ and not marked as a + * duplicate), it did not fail vendor checks, and the quality and CIGAR + * are defined. + */ + private def shouldIncludeRead(read: AlignmentRecord) = { + (read.getReadMapped && read.getPrimaryAlignment && !read.getDuplicateRead) && + read.getQual != null && + (read.getMapq != null && read.getMapq > 0) && + (read.getCigar != null && read.getCigar != "*") && + !read.getFailedVendorQualityChecks } - if (enableVisitLogging) { - input.cache() - dataset.cache() - dumpVisits("bqsr-visits.dump") - } + /** + * @param read The read to generate residue inclusion/mismatch flags for. + * @param maskedSites The set of positions overlapped by this read that + * contain known SNPs. This is the product of a join-like operation. + * @return Returns two arrays that have the length of the read. The first + * array contains true at an index if the base at this index should be + * included in recalibration (non-zero quality, not in INDEL, not an N, + * not masked) and the second array contains true if the base is a + * mismatch. 
+ */ + private def computeResiduesToInclude(read: AlignmentRecord, + maskedSites: Set[Long]): (Array[Boolean], Array[Boolean]) = { + val readSequence = read.getSequence.toArray + val readQualities = read.getQual.toArray + val shouldInclude = new Array[Boolean](readSequence.length) + val isMismatch = new Array[Boolean](readSequence.length) + val readCigar = TextCigarCodec.decode(read.getCigar) + val readCigarIterator = readCigar.iterator + val firstCigarElement = readCigarIterator.next - val observed: ObservationTable = { - dataset. - map { case (key, residue) => (key, Observation(residue.isSNP)) }. - aggregate(ObservationAccumulator(covariates))(_ += _, _ ++= _).result - } + @tailrec def shouldIncludeResidue(shouldInclude: Array[Boolean], + isMismatch: Array[Boolean], + currentPos: Long, + readSequence: Array[Char], + readQualities: Array[Char], + readCigar: java.util.Iterator[CigarElement], + currentCigarElem: CigarElement, + optMd: Option[MdTag], + maskedSites: Set[Long], + cigarIdx: Int, + baseIdx: Int) { + if (baseIdx < readSequence.length) { + val currentCigarOp = currentCigarElem.getOperator + val (newPos, + newCigarElem, + newCigarIdx, + newBaseIdx) = if (currentCigarOp == CigarOperator.H) { + // we must either be at the start or the end of the read + // if we are at the end, we will have exited the loop already + // so just check to make sure we're at base 0 + assert(baseIdx == 0) + (currentPos, readCigar.next, 0, 0) + } else if (currentCigarOp == CigarOperator.N || + currentCigarOp == CigarOperator.D) { + (currentPos + currentCigarElem.getLength, + readCigar.next, + 0, + baseIdx) + } else if (currentCigarOp == CigarOperator.I || + currentCigarOp == CigarOperator.S) { - dumpObservationTableFile.foreach(p => { - val writer = new PrintWriter(new File(p)) + // soft clip at start needs "unclipped start pos" + val pos = if (currentCigarOp == CigarOperator.S && + baseIdx == 0) { + currentPos - currentCigarElem.getLength + } else { + currentPos + } - 
writer.write(observed.toCSV) - writer.close() - }) + // set all the overlapping bases to be ignored + val opLength = currentCigarElem.getLength + var localIdx = baseIdx + val baseIdxEnd = baseIdx + opLength - 1 + while (localIdx < baseIdxEnd) { + shouldInclude(localIdx) = false + isMismatch(localIdx) = false + localIdx += 1 + } - val result: RDD[AlignmentRecord] = { - val recalibrator = Recalibrator(observed, minAcceptableQuality) - input.map(recalibrator(_)) + // if we have an insertion, we have already advanced + // the start position in the match preceding the insertion + // if we have a soft clip, we have no need to advance the + // start position, since a soft clip implies that this base + // is not part of the local alignment + if (readCigar.hasNext) { + (currentPos, readCigar.next, 0, baseIdxEnd + 1) + } else { + // we cannot pop off the next cigar element when we are at the + // end of the read, so fake it + assert(baseIdxEnd >= readSequence.length - 1, baseIdxEnd) + (currentPos, currentCigarElem, 0, baseIdxEnd + 1) + } + } else { + shouldInclude(baseIdx) = ((readSequence(baseIdx) != 'N') && + !maskedSites(currentPos) && + readQualities(baseIdx) > '!') + isMismatch(baseIdx) = currentCigarOp match { + case CigarOperator.X => true + case CigarOperator.M => { + // TODO: allow user to broadcast a ReferenceFile + require(optMd.isDefined, + "Cigar M was seen for read with undefined MD tag.") + !optMd.get.isMatch(currentPos) + } + case _ => { + // we fall through to this case IFF we see a Cigar = + // however, the compiler can't check this since we + // if/else out of the non-X/M cases above (DHSIN, disregard P) + // so, we treat it as a case _ + false + } + } + + // are we at the end of this cigar block?
if so, grab a new operator + val (nextCigarElem, + nextCigarIdx) = if (cigarIdx == currentCigarElem.getLength - 1) { + if (readCigar.hasNext) { + (readCigar.next, 0) + } else { + // we cannot pop off the next cigar element when we are at the + // end of the read, so fake it + assert(baseIdx >= readSequence.length - 1, baseIdx.toString) + (currentCigarElem, cigarIdx + 1) + } + } else { + (currentCigarElem, cigarIdx + 1) + } + + (currentPos + 1L, + nextCigarElem, + nextCigarIdx, + baseIdx + 1) + } + + // call recursively + shouldIncludeResidue(shouldInclude, + isMismatch, + newPos, + readSequence, + readQualities, + readCigar, + newCigarElem, + optMd, + maskedSites, + newCigarIdx, + newBaseIdx) + } + } + + shouldIncludeResidue(shouldInclude, isMismatch, + read.getStart, + readSequence, + readQualities, + readCigarIterator, + firstCigarElement, + Option(read.getMismatchingPositions) + .map(MdTag(_, read.getStart, readCigar)), + maskedSites, + 0, 0) + (shouldInclude, isMismatch) } - private def dumpVisits(filename: String) = { - def readId(read: DecadentRead): String = - read.name + - (if (read.isNegativeRead) "-" else "+") + - (if (read.record.getReadInFragment == 0) "1" else "") + - (if (read.record.getReadInFragment == 1) "2" else "") - - val readLengths = - input.flatMap(_._1).map(read => (readId(read), read.residues.length)).collectAsMap() - - val visited = dataset. - map { case (key, residue) => (readId(residue.read), Seq(residue.offset)) }. - reduceByKeyLocally((left, right) => left ++ right) - - val outf = new java.io.File(filename) - val writer = new java.io.PrintWriter(outf) - visited.foreach { - case (readName, visited) => - val length = readLengths(readName) - val buf = Array.fill[Char](length)('O') - visited.foreach { idx => buf(idx) = 'X' } - writer.println(readName + "\t" + String.valueOf(buf)) + /** + * Observes the error covariates contained in a read. + * + * @param read The read to observe. 
+ * @param recordGroups A record group dictionary containing the read group + * this read is from. + * @param maskedSites The known SNP loci that this read covers. + * @return Returns an array of CovariateKeys that describe the per-base + * error covariates seen in this read. + * + * @note This method was split out from the other observe method to make it + * simpler to write unit tests for this package. Similarly, that is why + * this method is package private, while the other is private. + */ + private[recalibration] def observe( + read: AlignmentRecord, + recordGroups: RecordGroupDictionary, + maskedSites: Set[Long] = Set.empty): Array[CovariateKey] = ObservingRead.time { + val (toInclude, isMismatch) = ReadResidues.time { + computeResiduesToInclude(read, maskedSites) + } + ReadCovariates.time { + CovariateSpace(read, + toInclude, + isMismatch, + recordGroups) } - writer.close() } -} -private[read] object BaseQualityRecalibration { + /** + * Observes the error covariates contained in a read. + * + * @param read The read to observe. + * @param knownSnps A broadcast variable containing all known SNP loci to mask. + * @param recordGroups A record group dictionary containing the read group + * this read is from. + * @return Returns an array of CovariateKeys that describe the per-base + * error covariates seen in this read. + */ + private def observe( + read: AlignmentRecord, + knownSnps: Broadcast[SnpTable], + recordGroups: RecordGroupDictionary): Array[CovariateKey] = ObservingRead.time { + val maskedSites = knownSnps.value + .maskedSites(ReferenceRegion.unstranded(read)) + observe(read, recordGroups, maskedSites) + } + + /** + * Runs base quality score recalibration. + * + * @param rdd The reads to recalibrate. + * @param knownSnps A broadcast table of known variation to mask during the + * recalibration process. + * @param recordGroups The read groups that generated these reads. 
+ * @param minAcceptableQuality The minimum quality score to attempt to + * recalibrate. + * @param optStorageLevel An optional storage level to apply if caching the + * output of the first stage of BQSR. + */ def apply( rdd: RDD[AlignmentRecord], knownSnps: Broadcast[SnpTable], - observationDumpFile: Option[String] = None, - validationStringency: ValidationStringency = ValidationStringency.STRICT): RDD[AlignmentRecord] = - new BaseQualityRecalibration(cloy(rdd, validationStringency), knownSnps).result + recordGroups: RecordGroupDictionary, + minAcceptableQuality: Int, + optStorageLevel: Option[StorageLevel]): RDD[AlignmentRecord] = { + require(minAcceptableQuality >= 0 && minAcceptableQuality < 93, + "MinAcceptableQuality (%d) must be non-negative and less than 93.".format(minAcceptableQuality)) + new BaseQualityRecalibration(rdd, + knownSnps, + recordGroups, + (minAcceptableQuality + 33).toChar, + optStorageLevel).result + } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Covariate.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Covariate.scala index 0d127cf11c..e86b724cec 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Covariate.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Covariate.scala @@ -17,123 +17,37 @@ */ package org.bdgenomics.adam.rdd.read.recalibration -import org.bdgenomics.adam.models.QualityScore -import org.bdgenomics.adam.rich.DecadentRead +import org.bdgenomics.formats.avro.AlignmentRecord /** * A Covariate represents a predictor, also known as a "feature" or * "independent variable". * - * @note Concrete implementations of Covariate should inherit from - * AbstractCovariate, not Covariate. + * @tparam T The type of this feature. */ -private[recalibration] trait Covariate { - type Value +private[recalibration] abstract class Covariate[T] { /** * Given a read, computes the value of this covariate for each residue in the * read.
* - * The returned values must be in the same order as the residues. A value - * of None means this covariate does not apply to the corresponding residue. - * - * Example: The DinucCovariate returns a pair of bases for each residue, - * except for bases at the start of a read, for which it returns None. + * @param read The read to observe. + * @return The covariates corresponding to each base in this read. */ - def compute(read: DecadentRead): Seq[Option[Value]] - - def apply(read: DecadentRead): Seq[Option[Value]] = compute(read) - - // Format the provided Value to be compatible with GATK's CSV output - def toCSV(option: Option[Value]): String = option match { - case None => "(none)" - case Some(value) => value.toString - } - - // A short name for this covariate, used in CSV output header - def csvFieldName: String -} - -private[recalibration] abstract class AbstractCovariate[ValueT] extends Covariate with Serializable { - override type Value = ValueT -} - -/** - * Represents a tuple containing a value for each covariate. - * - * The values for mandatory covariates are stored in member fields and optional - * covariate values are in `extras`. - */ -private[adam] class CovariateKey( - val readGroup: String, - val quality: QualityScore, - val extras: Seq[Option[Covariate#Value]]) extends Serializable { - - def containsNone: Boolean = extras.exists(_.isEmpty) - - override def toString: String = { - def parts: Seq[Any] = Seq(readGroup, quality) ++ extras - "[" + parts.mkString(", ") + "]" - } - - override def equals(other: Any) = other match { - case that: CovariateKey => - this.readGroup == that.readGroup && this.quality == that.quality && this.extras == that.extras - case _ => false - } - - override val hashCode: Int = { - 41 * ( - 41 * ( - 41 + readGroup.hashCode - ) + quality.hashCode - ) + extras.hashCode - } -} - -/** - * Represents the abstract space of all possible CovariateKeys for the given set - * of Covariates. 
- */ -private[adam] class CovariateSpace(val extras: IndexedSeq[Covariate]) extends Serializable { - // Computes the covariate values for all residues in this read - def apply(read: DecadentRead): Seq[CovariateKey] = { - // Ask each 'extra' covariate to compute its values for this read - val extraVals = extras.map(cov => { - val result = cov(read) - // Each covariate must return a value per Residue - assert(result.size == read.residues.size) - result - }) - - // Construct the CovariateKeys - read.residues.zipWithIndex.map { - case (residue, residueIdx) => - val residueExtras = extraVals.map(_(residueIdx)) - new CovariateKey(read.readGroup, residue.quality, residueExtras) - } - } - - // Format the provided key to be compatible with GATK's CSV output - def toCSV(key: CovariateKey): Seq[String] = { - val extraFields: Seq[String] = extras.zip(key.extras).map { - case (cov, value) => cov.toCSV(value.asInstanceOf[Option[cov.Value]]) - } - Seq(key.readGroup, key.quality.phred.toString) ++ extraFields - } + def compute(read: AlignmentRecord): Array[T] - def csvHeader: Seq[String] = Seq("ReadGroup", "ReportedQ") ++ extras.map(_.csvFieldName) - - override def equals(other: Any): Boolean = other match { - case that: CovariateSpace => this.extras == that.extras - case _ => false + /** + * Format the provided covariate value to be compatible with GATK's CSV output. + * + * @param cov A covariate value to render. + * @return Returns the covariate value rendered as a single CSV cell. + */ + def toCSV(cov: T): String = { + cov.toString } - override def hashCode = extras.hashCode - -} - -private[recalibration] object CovariateSpace { - def apply(extras: Covariate*): CovariateSpace = - new CovariateSpace(extras.toIndexedSeq) + /** + * A short name for this covariate, used in CSV output header. 
+ */ + val csvFieldName: String } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateKey.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateKey.scala new file mode 100644 index 0000000000..7e6c6f9dd2 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateKey.scala @@ -0,0 +1,65 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read.recalibration + +/** + * Represents a tuple containing a value for each covariate. + * + * @param readGroupId The ID of the read group that bases in this error + * covariate came from. + * @param quality The quality score of bases in this covariate. + * @param cycle The sequencer cycle that bases in this covariate came from. + * @param dinucPrev The nucleotide preceding this base. + * @param dinucCurr The nucleotide that was observed. + * @param shouldInclude Whether this base should be included in the + * recalibration table. + * @param isMismatch Whether this base was a mismatch against the reference. 
+ */ +private[adam] case class CovariateKey( + readGroupId: Int, + quality: Char, + cycle: Int, + dinucPrev: Char, + dinucCurr: Char, + shouldInclude: Boolean = true, + isMismatch: Boolean = false) { + + /** + * @return Returns a new CovariateKey with the shouldInclude and isMismatch + * fields reset to their defaults. + */ + def toDefault: CovariateKey = { + copy(shouldInclude = true, isMismatch = false) + } + + /** + * @return Returns true if both the observed and preceding bases were 'N'. + * + * @note This is required to render CSV formatting that matches the GATK. + */ + def containsNone: Boolean = { + dinucPrev == 'N' && dinucCurr == 'N' + } + + /** + * @return Packages the dinucleotide covariate as a tuple. + */ + def dinuc: (Char, Char) = { + (dinucPrev, dinucCurr) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateSpace.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateSpace.scala new file mode 100644 index 0000000000..5bc5dfd641 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CovariateSpace.scala @@ -0,0 +1,118 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.adam.models.RecordGroupDictionary +import org.bdgenomics.formats.avro.AlignmentRecord + +/** + * Represents the space of all possible CovariateKeys for the given set + * of Covariates. + */ +private[adam] object CovariateSpace extends Serializable { + + private val cycle = new CycleCovariate + private val dinuc = new DinucCovariate + + /** + * Generates an array of observed covariates for a given read. + * + * @param read The read to generate covariates for. + * @param recordGroups A record group dictionary containing the record group + * that generated this read. + * @return Returns an array of error covariates. + * + * @note This method is provided solely as a convenience method for testing. + */ + private[recalibration] def apply( + read: AlignmentRecord, + recordGroups: RecordGroupDictionary): Array[CovariateKey] = { + apply(read, + Array.fill(read.getSequence.length) { true }, + Array.fill(read.getSequence.length) { false }, + recordGroups) + } + + /** + * Generates an array of observed covariates for a given read. + * + * @param read The read to generate covariates for. + * @param toInclude An array indicating whether each base in the read should + * be included in the generated observation table. + * @param isMismatch An array indicating whether the base was a mismatch + * against the reference genome. + * @param recordGroups A record group dictionary containing the record group + * that generated this read. + * @return Returns an array of error covariates, one per base in the read. 
+ */ + def apply(read: AlignmentRecord, + toInclude: Array[Boolean], + isMismatch: Array[Boolean], + recordGroups: RecordGroupDictionary): Array[CovariateKey] = { + val cycles = cycle.compute(read) + val dinucs = dinuc.compute(read) + assert(cycles.length == dinucs.length) + assert(cycles.length == toInclude.length) + assert(cycles.length == isMismatch.length) + + // Construct the CovariateKeys + val readLength = cycles.length + val covariateArray = new Array[CovariateKey](readLength) + val qualities = read.getQual + var idx = 0 + while (idx < readLength) { + val residueCycle = cycles(idx) + val (residueDinucPrev, residueDinucCurr) = dinucs(idx) + covariateArray(idx) = new CovariateKey( + recordGroups.getIndex(read.getRecordGroupName), + qualities(idx), + residueCycle, + residueDinucPrev, + residueDinucCurr, + shouldInclude = toInclude(idx), + isMismatch = isMismatch(idx)) + idx += 1 + } + covariateArray + } + + /** + * Formats a given covariate to match the GATK's CSV output. + * + * @param key The error covariate to render. + * @param recordGroups A dictionary mapping recordGroupIds to record groups. + * @return Returns a Seq containing CSV cells for a single row of the CSV file. + */ + def toCSV(key: CovariateKey, + recordGroups: RecordGroupDictionary): Seq[String] = { + Seq(recordGroups.recordGroups(key.readGroupId).recordGroupName, + (key.quality.toInt - 33).toString, + cycle.toCSV(key.cycle), + dinuc.toCSV((key.dinucPrev, key.dinucCurr))) + } + + /** + * @return The CSV header line as a Seq with the header field per column. 
+ */ + def csvHeader: Seq[String] = { + Seq("ReadGroup", + "ReportedQ", + cycle.csvFieldName, + dinuc.csvFieldName) + } +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariate.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariate.scala index ff180cfc0f..fc3acfe308 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariate.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariate.scala @@ -17,39 +17,57 @@ */ package org.bdgenomics.adam.rdd.read.recalibration -import org.bdgenomics.adam.rich.DecadentRead +import org.bdgenomics.adam.instrumentation.Timers._ +import org.bdgenomics.formats.avro.AlignmentRecord -// This is based on the CycleCovariate in GATK 1.6. -private[adam] class CycleCovariate extends AbstractCovariate[Int] { - def compute(read: DecadentRead): Seq[Option[Int]] = { +/** + * A covariate representing sequencer base errors that are correlated with + * sequencer cycle. Maps first of pair reads into positive cycles and + * second of pair reads into negative cycles. + */ +private[adam] class CycleCovariate extends Covariate[Int] { + + /** + * @param read The read to compute cycle covariates for. + * @return Returns an integer array where the array elements indicate the + * sequencer cycle that a base was from. 
+ */ + def compute(read: AlignmentRecord): Array[Int] = ComputingCycleCovariate.time { val (initial, increment) = initialization(read) - read.residues.indices.map(pos => Some(initial + increment * pos)) + val seqLength = read.getSequence.length + val cycleArray = new Array[Int](seqLength) + var idx = 0 + var currVal = initial + while (idx < seqLength) { + cycleArray(idx) = currVal + idx += 1 + currVal += increment + } + cycleArray } - // Returns (initialValue, increment) - private def initialization(read: DecadentRead): (Int, Int) = { - if (!read.isNegativeRead) { - if (read.isSecondOfPair) { + /** + * @param read The read to generate cycle values for. Used to get the + * pairing, strand, and read length. + * @return Returns (initialValue, increment) + */ + private def initialization(read: AlignmentRecord): (Int, Int) = { + if (!read.getReadNegativeStrand) { + if (read.getReadInFragment != 0) { (-1, -1) } else { (1, 1) } } else { - if (read.isSecondOfPair) { - (-read.residues.length, 1) + val readLength = read.getSequence.length + if (read.getReadInFragment != 0) { + (-readLength, 1) } else { - (read.residues.length, -1) + (readLength, -1) } } } - override def csvFieldName: String = "Cycle" - - override def equals(other: Any) = other match { - case that: CycleCovariate => true - case _ => false - } - - override def hashCode = 0x83EFAB61 + val csvFieldName: String = "Cycle" } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariate.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariate.scala index cc6c5e8445..5eeb81bf39 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariate.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariate.scala @@ -17,13 +17,24 @@ */ package org.bdgenomics.adam.rdd.read.recalibration -import org.bdgenomics.adam.rich.DecadentRead +import org.bdgenomics.adam.instrumentation.Timers._ +import 
org.bdgenomics.formats.avro.AlignmentRecord +import scala.annotation.tailrec + +/** + * An error covariate that tracks quality score estimation errors that are + * correlated with two nucleotides appearing in sequence in a read. + */ +private[adam] class DinucCovariate extends Covariate[(Char, Char)] { + + /** + * @param read The read to compute the covariate for. + * @return Returns an array of dinucleotides. + */ + def compute(read: AlignmentRecord): Array[(Char, Char)] = ComputingDinucCovariate.time { + val sequence = read.getSequence + if (read.getReadNegativeStrand) { -// TODO: should inherit from something like AbstractCovariate[(DNABase, DNABase)] -private[adam] class DinucCovariate extends AbstractCovariate[(Char, Char)] { - def compute(read: DecadentRead): Seq[Option[(Char, Char)]] = { - val sequence = read.residues.map(_.base) - if (read.isNegativeRead) { /* Use the reverse-complement of the sequence to get back the original * sequence as it was read by the sequencing machine. The sequencer * always reads from the 5' to the 3' end of each strand, but the output @@ -31,46 +42,73 @@ private[adam] class DinucCovariate extends AbstractCovariate[(Char, Char)] { * use the reverse-complement if this read was originally from the * complementary strand. 
*/ - dinucs(complement(sequence.reverse)).reverse + revDinucs(complement(sequence)) } else { - dinucs(sequence) + fwdDinucs(sequence) } } - private def dinucs(sequence: Seq[Char]): Seq[Option[(Char, Char)]] = { - sequence.zipWithIndex.map { - case (current, index) => - assert(Seq('A', 'C', 'T', 'G', 'N').contains(current)) - def previous = sequence(index - 1) - if (index > 0 && previous != 'N' && current != 'N') { - Some((previous, current)) + private[recalibration] def fwdDinucs(sequence: String): Array[(Char, Char)] = { + val array = new Array[(Char, Char)](sequence.length) + dinucs(sequence, false, array) + } + + private[recalibration] def revDinucs(sequence: String): Array[(Char, Char)] = { + val array = new Array[(Char, Char)](sequence.length) + dinucs(sequence, true, array) + } + + @tailrec private def dinucs(sequence: String, + swap: Boolean, + array: Array[(Char, Char)], + idx: Int = 0): Array[(Char, Char)] = { + if (idx < 0 || idx >= sequence.length) { + array + } else { + val current = sequence(idx) + // previously, this was implemented as a lookup into a set + // unrolling this and not using a set is 40% faster + // + // this is ugly, but as they say, + // "At 50, everyone has the face they deserve" + require(current == 'A' || + current == 'C' || + current == 'G' || + current == 'T' || + current == 'N', + "Saw invalid base %s. 
Accepted bases are A,C,G,T,N.".format(current)) + val elem = if ((!swap && idx > 0) || (swap && idx < sequence.length - 1)) { + val previous = if (swap) { + sequence(idx + 1) } else { - None + sequence(idx - 1) } + if (previous != 'N' && current != 'N') { + (previous, current) + } else { + ('N', 'N') + } + } else { + ('N', 'N') + } + array(idx) = elem + dinucs(sequence, swap, array, idx = idx + 1) } } - private def complement(sequence: Seq[Char]): Seq[Char] = { + override def toCSV(cov: (Char, Char)): String = { + "%s%s".format(cov._1, cov._2) + } + + private def complement(sequence: String): String = { sequence.map { case 'A' => 'T' case 'T' => 'A' case 'C' => 'G' case 'G' => 'C' - case 'W' | 'S' | 'Y' | 'R' | 'M' | 'K' | 'B' | 'D' | 'V' | 'H' | 'N' => 'N' + case _ => 'N' } } - override def toCSV(option: Option[Value]): String = option match { - case None => "NN" - case Some(value) => "%s%s".format(value._1, value._2) - } - - override def csvFieldName: String = "Dinuc" - - override def equals(other: Any) = other match { - case that: DinucCovariate => true - case _ => false - } - - override def hashCode = 0x9EAC50CB + val csvFieldName: String = "Dinuc" } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Observation.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Observation.scala new file mode 100644 index 0000000000..65d3067d85 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Observation.scala @@ -0,0 +1,91 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.adam.util.PhredUtils + +/** + * An empirical frequency count of mismatches from the reference. + * + * @param total The total number of bases observed. + * @param mismatches The number of mismatching bases observed. + */ +private[adam] case class Observation(total: Long, mismatches: Long) { + require(mismatches >= 0 && mismatches <= total) + + /** + * @param that An observation to merge with. + * @return Returns a new observation that contains the sum values across + * both input observations. + */ + def +(that: Observation) = { + new Observation(this.total + that.total, this.mismatches + that.mismatches) + } + + /** + * @return Returns the empirically estimated probability of a mismatch, as a + * Phred scaled int. + */ + def empiricalQuality: Int = { + PhredUtils.errorProbabilityToPhred(bayesianErrorProbability()) + } + + /** + * Estimates the probability of a mismatch under a Bayesian model with + * Binomial likelihood and Beta(a, b) prior. When a = b = 1, this is also + * known as "Laplace's rule of succession". + * + * TODO: Beta(1, 1) is the safest choice, but maybe Beta(1/2, 1/2) is more + * accurate? + * + * @param a Beta distribution alpha parameter. + * @param b Beta distribution beta parameter. + * @return Returns the bayesian error probability of a base in this class + * being an error.
+ */ + def bayesianErrorProbability(a: Double = 1.0, + b: Double = 1.0): Double = { + (a + mismatches) / (a + b + total) + } + + /** + * @return Format as string compatible with GATK's CSV output + */ + def toCSV: Seq[String] = { + Seq(total.toString, mismatches.toString, empiricalQuality.toString) + } + + override def toString: String = { + "%s / %s (%s)".format(mismatches, total, empiricalQuality) + } +} + +private[recalibration] object Observation { + val empty = new Observation(0, 0) + + /** + * @param isMismatch Whether this observed base was a mismatch against the + * reference or not. + * @return Returns a new observation with one base observed, and either one + * or zero observed mismatches. + */ + def apply(isMismatch: Boolean) = { + new Observation(1, if (isMismatch) 1 else 0) + } +} + diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/ObservationTable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/ObservationTable.scala index d605fef991..4b7095829d 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/ObservationTable.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/ObservationTable.scala @@ -17,139 +17,35 @@ */ package org.bdgenomics.adam.rdd.read.recalibration -import org.bdgenomics.adam.instrumentation.Timers._ -import org.bdgenomics.adam.models.QualityScore -import scala.collection.mutable +import org.bdgenomics.adam.models.RecordGroupDictionary /** - * An empirical frequency count of mismatches from the reference. + * Table containing the empirical frequency of mismatches for each set of + * covariate values. * - * This is used in ObservationTable, which maps from CovariateKey to Observation. 
- */ -private[adam] class Observation(val total: Long, val mismatches: Long) extends Serializable { - require(mismatches >= 0 && mismatches <= total) - - def this(that: Observation) = this(that.total, that.mismatches) - - def +(that: Observation) = - new Observation(this.total + that.total, this.mismatches + that.mismatches) - - /** - * Empirically estimated probability of a mismatch. - */ - def empiricalErrorProbability: Double = - bayesianErrorProbability - - /** - * Empirically estimated probability of a mismatch, as a QualityScore. - */ - def empiricalQuality: QualityScore = - QualityScore.fromErrorProbability(empiricalErrorProbability) - - /** - * Estimates the probability of a mismatch under a Bayesian model with - * Binomial likelihood and Beta(a, b) prior. When a = b = 1, this is also - * known as "Laplace's rule of succession". - * - * TODO: Beta(1, 1) is the safest choice, but maybe Beta(1/2, 1/2) is more - * accurate? - */ - def bayesianErrorProbability: Double = bayesianErrorProbability(1, 1) - def bayesianErrorProbability(a: Double, b: Double): Double = (a + mismatches) / (a + b + total) - - // Format as string compatible with GATK's CSV output - def toCSV: Seq[String] = Seq(total.toString, mismatches.toString, empiricalQuality.phred.toString) - - override def toString: String = - "%s / %s (%s)".format(mismatches, total, empiricalQuality) - - override def equals(other: Any): Boolean = other match { - case that: Observation => this.total == that.total && this.mismatches == that.mismatches - case _ => false - } - - override def hashCode: Int = { - 41 * ( - 41 + total.hashCode - ) + mismatches.hashCode - } - -} - -private[recalibration] object Observation { - val empty = new Observation(0, 0) - - def apply(isMismatch: Boolean) = new Observation(1, if (isMismatch) 1 else 0) -} - -private[adam] class Aggregate private ( - total: Long, // number of total observations - mismatches: Long, // number of mismatches observed - val expectedMismatches: Double // 
expected number of mismatches based on reported quality scores - ) extends Observation(total, mismatches) { - - require(expectedMismatches <= total) - - def reportedErrorProbability: Double = expectedMismatches / total.toDouble - - def +(that: Aggregate): Aggregate = - new Aggregate( - this.total + that.total, - this.mismatches + that.mismatches, - this.expectedMismatches + that.expectedMismatches - ) -} - -private[recalibration] object Aggregate { - val empty: Aggregate = new Aggregate(0, 0, 0) - - def apply(key: CovariateKey, value: Observation) = - new Aggregate(value.total, value.mismatches, key.quality.errorProbability * value.total) -} - -/** - * Table containing the empirical frequency of mismatches for each set of covariate values. + * @param entries The error covariate → observed error frequency mapping. */ private[adam] class ObservationTable( - val space: CovariateSpace, - val entries: Map[CovariateKey, Observation]) extends Serializable { + val entries: scala.collection.Map[CovariateKey, Observation]) extends Serializable { override def toString = entries.map { case (k, v) => "%s\t%s".format(k, v) }.mkString("\n") - // Format as CSV compatible with GATK's output - def toCSV: String = { + /** + * @param recordGroups The record groups that generated the reads in this table. + * @return Return this table as CSV. 
+ */ + def toCSV(recordGroups: RecordGroupDictionary): String = { val rows = entries.map { case (key, obs) => - space.toCSV(key) ++ obs.toCSV ++ (if (key.containsNone) Seq("**") else Seq()) + (CovariateSpace.toCSV(key, recordGroups) ++ + obs.toCSV ++ + (if (key.containsNone) Seq("**") else Seq())) } (Seq(csvHeader) ++ rows).map(_.mkString(",")).mkString("\n") } - def csvHeader: Seq[String] = space.csvHeader ++ Seq("TotalCount", "MismatchCount", "EmpiricalQ", "IsSkipped") -} - -private[adam] class ObservationAccumulator(val space: CovariateSpace) extends Serializable { - private val entries = mutable.HashMap[CovariateKey, Observation]() - - def +=(that: (CovariateKey, Observation)): ObservationAccumulator = ObservationAccumulatorSeq.time { - accum(that._1, that._2) - } - - def ++=(that: ObservationAccumulator): ObservationAccumulator = ObservationAccumulatorComb.time { - if (this.space != that.space) - throw new IllegalArgumentException("Can only combine observations with matching CovariateSpaces") - that.entries.foreach { case (k, v) => accum(k, v) } - this + private def csvHeader: Seq[String] = { + (CovariateSpace.csvHeader ++ + Seq("TotalCount", "MismatchCount", "EmpiricalQ", "IsSkipped")) } - - def accum(key: CovariateKey, value: Observation): ObservationAccumulator = { - entries(key) = value + entries.getOrElse(key, Observation.empty) - this - } - - def result: ObservationTable = new ObservationTable(space, entries.toMap) -} - -private[recalibration] object ObservationAccumulator { - def apply(space: CovariateSpace) = new ObservationAccumulator(space) } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTable.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTable.scala new file mode 100644 index 0000000000..e6f3bf95f5 --- /dev/null +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTable.scala @@ -0,0 +1,271 @@ +/** + * Licensed to Big Data 
Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.adam.instrumentation.Timers._ +import org.bdgenomics.adam.util.PhredUtils +import scala.math.{ exp, log } + +/** + * A table containing the final error covariate to recalibrated phred mappings. + * + * This is generated by collecting the error covariates in an observation table, + * which is then turned into a TempRecalibrationTable, which is then inverted. + * This is necessitated by the hierarchical process through which the final + * quality scores are calculated. + * + * @param table A table mapping covariate keys to Phred scores, with the Phred + * scores encoded in ASCII using Illumina (33) encodings. + */ +private[adam] case class RecalibrationTable private[recalibration] ( + private val table: Map[CovariateKey, Char]) { + + /** + * @param covariates The covariates corresponding to all of the bases in a read. + * @return Returns an array of Phred-scaled quality scores representing the + * unfiltered quality scores in a read after recalibration. These scores are + * unfiltered in the sense that the read may decide to omit the recalibrated + * scores if the base was a low quality base. 
+ */ + def apply(covariates: Array[CovariateKey]): Array[Char] = { + val numCovariates = covariates.length + val newQuals = new Array[Char](numCovariates) + QueryingRecalibrationTable.time { + var idx = 0 + while (idx < numCovariates) { + val key = covariates(idx) + newQuals(idx) = table.getOrElse(key, key.quality) + idx += 1 + } + } + newQuals + } +} + +/** + * A temporary table to be used for generating the BQSR recalibration table. + * + * In the prior implementation of BQSR, this class was the recalibration table. + * However, the downside of that implementation was the covariate → quality + * mapping required a recursive calculation inside an inner + * loop many times. Given that we expect the number of bases to be orders and + * orders of magnitude larger than the number of covariate bins, we've + * refactored the calculation so that we generate this temporary table, which we + * then query using all the known covariate keys. The results of these queries + * go into a new RecalibrationTable class, which does a constant lookup in a + * map. Additionally, a virtue of this approach is that we don't need to throw + * out any of the code used to compute the recalibration scores. + * + * @param tempTable A table mapping read group IDs to error aggregates and quality tables. + * @param maxQualScore The maximum quality score to recalibrate to. + */ +private case class TempRecalibrationTable( + val tempTable: Map[Int, (Aggregate, QualityTable)], + val maxQualScore: Int = 50) { + + private val maxLogP = log(PhredUtils.phredToErrorProbability(maxQualScore)) + + /** + * Looks up the recalibrated Phred of an error covariate by walking the table + * hierarchy for that covariate. + * + * @param key The error covariate to look up. + * @return Returns an ASCII/Illumina (33) encoded quality score for this error + * covariate. 
+ */ + def lookup(key: CovariateKey): Char = { + val globalEntry = tempTable(key.readGroupId) + val globalDelta = computeGlobalDelta(globalEntry._1) + val residueLogP = log(PhredUtils.phredToErrorProbability(key.quality.toInt - 33)) + val qualityEntry = getQualityEntry(key.quality, globalEntry) + val qualityDelta = computeQualityDelta(qualityEntry, residueLogP + globalDelta) + val extrasDelta = computeExtrasDelta(qualityEntry, + key, + residueLogP + globalDelta + qualityDelta) + val correctedLogP = residueLogP + globalDelta + qualityDelta + extrasDelta + qualityFromLogP(correctedLogP) + } + + /** + * @param logP A log probability. + * @return This probability as a Phred scaled, ASCII/Illumina 33 encoded char. + */ + private def qualityFromLogP(logP: Double): Char = { + val boundedLogP = math.min(0.0, math.max(maxLogP, logP)) + (PhredUtils.errorProbabilityToPhred(exp(boundedLogP)) + 33).toChar + } + + /** + * @param globalEntry The aggregated global empirical error estimate for a + * single read group. + * @return Returns the log scaled delta between the empirical error rate and + * the error rate predicted from the quality scores of all the observed + * bases. + */ + private def computeGlobalDelta( + globalEntry: Aggregate): Double = { + log(globalEntry.bayesianErrorProbability()) - log(globalEntry.reportedErrorProbability) + } + + /** + * @param quality The quality score to look up in the global table for a read + * group. + * @param globalEntry The entry for this read group in the global table. + * @return Returns the error aggregate and extra covariates table + * corresponding to a single base with a given quality score in a given + * read group. + */ + private def getQualityEntry( + quality: Char, + globalEntry: (Aggregate, QualityTable)): (Aggregate, ExtrasTable) = { + globalEntry._2.table(quality) + } + + /** + * @param qualityEntry The error aggregate/extra covariate pair corresponding + * to a single base with a given quality score in a single read group. 
+ * @param offset The log error probability of this quality bucket, given the + * corrected empirical error rate of all bases in this read group. + * @return Returns the log scaled delta between the predicted and measured error rate + * for this bucket. + */ + private def computeQualityDelta(qualityEntry: (Aggregate, ExtrasTable), + offset: Double): Double = { + log(qualityEntry._1.bayesianErrorProbability()) - offset + } + + /** + * @param qualityEntry The error aggregate/extra covariate pair corresponding + * to a single base with a given quality score in a single read group. + * @param key The covariate key describing the error covariates that this + * read base maps into. + * @param offset The log error probability of this quality bucket, given the + * corrected empirical error rate of all bases in this read group. + * @return Returns the sum of the log scaled deltas between the predicted and + * measured error rates for the cycle and dinuc covariates of this key. + */ + private def computeExtrasDelta(qualityEntry: (Aggregate, ExtrasTable), + key: CovariateKey, + offset: Double): Double = { + def tableContribution(aggregate: Aggregate): Double = { + log(aggregate.bayesianErrorProbability()) - offset + } + + // Returns sum(delta for each extra covariate) + val extrasTables = qualityEntry._2 + (tableContribution(extrasTables.cycleTable(key.cycle)) + + tableContribution(extrasTables.dinucTable(key.dinuc))) + } +} + +private[recalibration] object RecalibrationTable { + + /** + * Generates a recalibration table by mapping an observation table + * into the hierarchical structure used by BQSR and then inverting said table. + * + * @param observed The observed covariates along with their empirical error + * rate estimates. + * @return Returns a fully inverted recalibration table. 
+ */ + def apply(observed: ObservationTable): RecalibrationTable = { + val globalTable: Map[Int, (Aggregate, QualityTable)] = observed.entries + .groupBy(_._1.readGroupId) + .map(globalEntry => { + (globalEntry._1, + (aggregateObservations(globalEntry._2), + computeQualityTable(globalEntry))) + }) + + // make a temp table to query + val tt = new TempRecalibrationTable(globalTable) + + // take all the covariates from the observation table, and query + // them against the recalibration table + val recalibrationQualityMappings = InvertingRecalibrationTable.time { + observed.entries + .keys + .map(key => (key, tt.lookup(key))) + .toMap + } + + RecalibrationTable(recalibrationQualityMappings) + } + + private def computeQualityTable( + globalEntry: (Int, scala.collection.Map[CovariateKey, Observation])): QualityTable = { + QualityTable(globalEntry._2.groupBy(_._1.quality).map(qualityEntry => { + val extras = computeExtrasTable(qualityEntry._2) + (qualityEntry._1, (aggregateObservations(qualityEntry._2), extras)) + })) + } + + private def computeExtrasTable( + table: scala.collection.Map[CovariateKey, Observation]): ExtrasTable = { + def makeTable[T](fn: CovariateKey => T): scala.collection.Map[T, Aggregate] = { + table.groupBy(kv => fn(kv._1)).map(extraEntry => { + (extraEntry._1, aggregateObservations(extraEntry._2)) + }).map(identity) + } + + ExtrasTable(makeTable[Int]((ck: CovariateKey) => ck.cycle), + makeTable[(Char, Char)]((ck: CovariateKey) => ck.dinuc)) + } + + /** + * Aggregates observations over a table. + * + * Assumes that it is called on a non-empty table, as it is called on the + * output of a groupBy. + * + * @param observations The map of observations to group by. + * @return Returns the aggregated base/mismatch observations over a group of + * observed bases. 
+ */ + private def aggregateObservations( + observations: scala.collection.Map[CovariateKey, Observation]): Aggregate = { + assert(observations.nonEmpty) + observations.map(p => { + val (oldKey, obs) = p + Aggregate(oldKey, obs) + }).reduce(_ + _) + } +} + +/** + * A table containing all of the quality scores observed in a single read group. + * + * @param table A table mapping ASCII encoded quality scores to an error + * aggregate and to a table of extra covariates. + */ +private case class QualityTable( + table: scala.collection.Map[Char, (Aggregate, ExtrasTable)]) { +} + +/** + * A table containing all of the extra covariates observed in a single quality + * bucket for a single read group. + * + * @param cycleTable The error aggregates corresponding to a sequencer cycle. + * @param dinucTable The error aggregates corresponding to an individual pair + * of nucleotides. + */ +private case class ExtrasTable( + cycleTable: scala.collection.Map[Int, Aggregate], + dinucTable: scala.collection.Map[(Char, Char), Aggregate]) { +} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Recalibrator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Recalibrator.scala index a608a0c3a6..312cf8ecef 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Recalibrator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/recalibration/Recalibrator.scala @@ -17,160 +17,79 @@ */ package org.bdgenomics.adam.rdd.read.recalibration -import org.bdgenomics.adam.models.QualityScore -import org.bdgenomics.adam.rich.DecadentRead.Residue -import org.bdgenomics.adam.rich.RichAlignmentRecord._ -import org.bdgenomics.adam.rich.DecadentRead -import org.bdgenomics.formats.avro.AlignmentRecord +import java.lang.StringBuilder import org.bdgenomics.adam.instrumentation.Timers._ -import scala.math.{ exp, log } - -private[recalibration] class Recalibrator(val table: RecalibrationTable, val minAcceptableQuality: 
QualityScore) - extends Serializable { - - def apply(r: (Option[DecadentRead], Option[AlignmentRecord])): AlignmentRecord = RecalibrateRead.time { - r._1.fold(r._2.get)(read => { - val record: AlignmentRecord = read.record - if (record.getQual != null) { - AlignmentRecord.newBuilder(record) - .setQual(QualityScore.toString(computeQual(read))) - .setOrigQual(record.getQual) - .build() - } else { - record - } - }) - } +import org.bdgenomics.adam.util.PhredUtils +import org.bdgenomics.formats.avro.AlignmentRecord - def computeQual(read: DecadentRead): Seq[QualityScore] = ComputeQualityScore.time { - val origQuals = read.residues.map(_.quality) - val newQuals = table(read) - origQuals.zip(newQuals).map { - case (origQ, newQ) => - // Keep original quality score if below recalibration threshold - if (origQ >= minAcceptableQuality) newQ else origQ +/** + * The engine that recalibrates a read given a table of recalibrated qualities. + * + * @param table A table mapping error covariates back to finalized qualities. + * @param minAcceptableAsciiPhred The minimum quality score to attempt to + * recalibrate, encoded as an ASCII character on the Illumina (33) scale. + */ +private[recalibration] case class Recalibrator( + table: RecalibrationTable, + minAcceptableAsciiPhred: Char) { + + /** + * Rewrites the qualities for a read, given the recalibration table. + * + * @param record The read to recalibrate. + * @param covariates An array containing the covariate that each base in this + * read mapped to. + * @return Returns a recalibrated read if the read has defined qualities and + * if the covariates for this read were observed. 
+ */ + def apply(record: AlignmentRecord, + covariates: Array[CovariateKey]): AlignmentRecord = RecalibrateRead.time { + if (record.getQual != null && + covariates.nonEmpty) { + AlignmentRecord.newBuilder(record) + .setQual(computeQual(record, covariates)) + .setOrigQual(record.getQual) + .build() + } else { + record } } -} - -private[recalibration] object Recalibrator { - def apply(observed: ObservationTable, minAcceptableQuality: QualityScore): Recalibrator = { - new Recalibrator(RecalibrationTable(observed), minAcceptableQuality) - } -} - -private[recalibration] class RecalibrationTable( - // covariates for this recalibration - val covariates: CovariateSpace, - // marginal and quality scores by read group, - val globalTable: Map[String, (Aggregate, QualityTable)]) - extends (DecadentRead => Seq[QualityScore]) with Serializable { - - // TODO: parameterize? - val maxQualScore = QualityScore(50) - - val maxLogP = log(maxQualScore.errorProbability) - def apply(read: DecadentRead): Seq[QualityScore] = { - val globalEntry: Option[(Aggregate, QualityTable)] = globalTable.get(read.readGroup) - val globalDelta = computeGlobalDelta(globalEntry) - val extraValues: IndexedSeq[Seq[Option[Covariate#Value]]] = getExtraValues(read) - read.residues.zipWithIndex.map(lookup(_, globalEntry, globalDelta, extraValues)) - } - - def lookup(residueWithIndex: (Residue, Int), globalEntry: Option[(Aggregate, QualityTable)], globalDelta: Double, - extraValues: IndexedSeq[Seq[Option[Covariate#Value]]]): QualityScore = { - val (residue, index) = residueWithIndex - val residueLogP = log(residue.quality.errorProbability) - val qualityEntry: Option[(Aggregate, ExtrasTables)] = getQualityEntry(residue.quality, globalEntry) - val qualityDelta = computeQualityDelta(qualityEntry, residueLogP + globalDelta) - val extrasDelta = computeExtrasDelta(qualityEntry, index, extraValues, residueLogP + globalDelta + qualityDelta) - val correctedLogP = residueLogP + globalDelta + qualityDelta + extrasDelta - 
qualityFromLogP(correctedLogP) - } - - def qualityFromLogP(logP: Double): QualityScore = { - val boundedLogP = math.min(0.0, math.max(maxLogP, logP)) - QualityScore.fromErrorProbability(exp(boundedLogP)) - } - - def computeGlobalDelta(globalEntry: Option[(Aggregate, QualityTable)]): Double = { - globalEntry.map(bucket => log(bucket._1.empiricalErrorProbability) - log(bucket._1.reportedErrorProbability)). - getOrElse(0.0) - } - - def getQualityEntry( - quality: QualityScore, - globalEntry: Option[(Aggregate, QualityTable)]): Option[(Aggregate, ExtrasTables)] = { - globalEntry.flatMap(_._2.table.get(quality)) - } - - def computeQualityDelta(qualityEntry: Option[(Aggregate, ExtrasTables)], offset: Double): Double = { - qualityEntry.map(bucket => log(bucket._1.empiricalErrorProbability) - offset). - getOrElse(0.0) - } - - def computeExtrasDelta(maybeQualityEntry: Option[(Aggregate, ExtrasTables)], residueIndex: Int, - extraValues: IndexedSeq[Seq[Option[Covariate#Value]]], offset: Double): Double = { - // Returns sum(delta for each extra covariate) - maybeQualityEntry.map(qualityEntry => { - val extrasTables = qualityEntry._2.extrasTables - assert(extrasTables.size == extraValues.size) - var extrasDelta = 0.0 - var index = 0 - extraValues.foreach(residueValues => { - val table = extrasTables(index) - extrasDelta += table.get(residueValues(residueIndex)). - map(aggregate => log(aggregate.empiricalErrorProbability) - offset). - getOrElse(0.0) - index += 1 - }) - extrasDelta - }).getOrElse(0.0) - } + /** + * @param read The read to recalibrate. + * @param covariates An array containing the covariate that each base in this + * read mapped to. + * @return Returns the new quality scores for this read by querying the error + * covariate table. 
+ */ + private def computeQual(read: AlignmentRecord, + covariates: Array[CovariateKey]): String = ComputeQualityScore.time { + val origQuals = read.getQual + val quals = new StringBuilder(origQuals) + val newQuals = table(covariates.map(_.toDefault)) + var idx = 0 + while (idx < origQuals.length) { + // Keep original quality score if below recalibration threshold + if (origQuals(idx) >= minAcceptableAsciiPhred) { + quals.setCharAt(idx, newQuals(idx)) + } + idx += 1 + } - def getExtraValues(read: DecadentRead): IndexedSeq[Seq[Option[Covariate#Value]]] = GetExtraValues.time { - covariates.extras.map(extra => extra(read)) + quals.toString } - } -object RecalibrationTable { - - def apply(observed: ObservationTable): RecalibrationTable = { - // The ".map(identity)" calls are needed to force the result to be serializable. - val globalTable: Map[String, (Aggregate, QualityTable)] = observed.entries.groupBy(_._1.readGroup).map(globalEntry => { - (globalEntry._1, (aggregateObservations(globalEntry._2), new QualityTable(computeQualityTable(globalEntry, observed.space)))) - }).map(identity) - new RecalibrationTable(observed.space, globalTable) - } - - def computeQualityTable( - globalEntry: (String, Map[CovariateKey, Observation]), - space: CovariateSpace): Map[QualityScore, (Aggregate, ExtrasTables)] = { - globalEntry._2.groupBy(_._1.quality).map(qualityEntry => { - (qualityEntry._1, (aggregateObservations(qualityEntry._2), new ExtrasTables(computeExtrasTables(qualityEntry._2, space)))) - }).map(identity) - } - - def computeExtrasTables( - table: Map[CovariateKey, Observation], - space: CovariateSpace): IndexedSeq[Map[Option[Covariate#Value], Aggregate]] = { - Range(0, space.extras.length).map(index => { - table.groupBy(_._1.extras(index)).map(extraEntry => { - (extraEntry._1, aggregateObservations(extraEntry._2)) - }).map(identity) - }) - } +private[recalibration] object Recalibrator { - def aggregateObservations[K](observations: Map[CovariateKey, Observation]): Aggregate 
= { - observations.map { case (oldKey, obs) => Aggregate(oldKey, obs) }.fold(Aggregate.empty)(_ + _) + /** + * @param observed The observation table to invert to build the recalibrator. + * @param minAcceptableAsciiPhred The minimum quality score to attempt to + * recalibrate, encoded as an ASCII character on the Illumina (33) scale. + * @return Returns a recalibrator gained by inverting the observation table. + */ + def apply(observed: ObservationTable, + minAcceptableAsciiPhred: Char): Recalibrator = CreatingRecalibrationTable.time { + Recalibrator(RecalibrationTable(observed), minAcceptableAsciiPhred) } - } - -private[recalibration] class QualityTable( - val table: Map[QualityScore, (Aggregate, ExtrasTables)]) extends Serializable - -private[recalibration] class ExtrasTables( - val extrasTables: IndexedSeq[Map[Option[Covariate#Value], Aggregate]]) extends Serializable diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rich/DecadentRead.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rich/DecadentRead.scala deleted file mode 100644 index 2318429440..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rich/DecadentRead.scala +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.bdgenomics.adam.rich - -import htsjdk.samtools.ValidationStringency -import org.bdgenomics.utils.misc.Logging -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.{ - MdTag, - ReferencePosition, - QualityScore -} -import org.bdgenomics.adam.rich.RichAlignmentRecord._ -import org.bdgenomics.formats.avro.AlignmentRecord - -@deprecated("Use RichAlignmentRecord wherever possible in new development.", since = "0.18.0") -private[adam] object DecadentRead extends Logging with Serializable { - type Residue = DecadentRead#Residue - - // Constructors - def apply(record: AlignmentRecord): DecadentRead = DecadentRead(RichAlignmentRecord(record)) - - def apply(rich: RichAlignmentRecord): DecadentRead = { - try { - new DecadentRead(rich) - } catch { - case exc: Exception => - val msg = "Error \"%s\" while constructing DecadentRead from Read(%s)".format(exc.getMessage, rich.record) - throw new IllegalArgumentException(msg, exc) - } - } - - /** - * cloy (verb) - * 1. To fill to loathing; to surfeit. - * 2. To clog, to glut, or satisfy, as the appetite; to satiate. - * 3. To fill up or choke up; to stop up. - */ - def cloy( - rdd: RDD[AlignmentRecord], - strictness: ValidationStringency = ValidationStringency.STRICT): RDD[(Option[DecadentRead], Option[AlignmentRecord])] = { - rdd.map(r => { - try { - val dr = DecadentRead.apply(r) - (Some(dr), None) - } catch { - case e: Throwable => { - if (strictness == ValidationStringency.STRICT) { - throw e - } else { - log.warn("Converting read %s to decadent read failed with %s. Skipping...".format( - r, e - )) - (None, Some(r)) - } - } - } - }) - } - - // The inevitable counterpart of the above. 
- implicit def decay(rdd: RDD[DecadentRead]): RDD[AlignmentRecord] = rdd.map(_.record) -} - -@deprecated("Use RichAlignmentRecord wherever possible in new development.", since = "0.18.0") -private[adam] class DecadentRead(val record: RichAlignmentRecord) extends Logging { - // Should have quality scores for all residues - require(record.getQual == null || - record.getSequence.length == record.qualityScores.length, "sequence and qualityScores must be same length") - - // MapQ should be valid - require(record.getMapq == null || (record.getMapq >= 0 && record.getMapq <= 93), "MapQ must be in [0, 255]") - - // Alignment must be valid - require(!record.getReadMapped || record.getStart >= 0, "Invalid alignment start index") - - // Sanity check on referencePositions - require(record.referencePositions.length == record.getSequence.length, s"Reference positions are not the same length as the sequence, ${record.referencePositions.length} != ${record.getSequence.length}") - - /** - * In biochemistry and molecular biology, a "residue" refers to a specific - * monomer within a polymeric chain, such as DNA. - */ - class Residue private[DecadentRead] (val offset: Int) { - def read = DecadentRead.this - - /** - * Nucleotide at this offset. - * - * TODO: Return values of meaningful type, e.g. `DNABase`. 
- */ - def base: Char = read.baseSequence(offset) - - def quality = QualityScore(record.qualityScores(offset)) - - def isRegularBase: Boolean = base match { - case 'A' | 'C' | 'T' | 'G' | 'U' => true - // 2-base alternatives in http://www.bioinformatics.org/sms/iupac.html - case 'R' | 'Y' | 'S' | 'W' | 'K' | 'M' => true - // 3-base alternatives in http://www.bioinformatics.org/sms/iupac.html - case 'B' | 'D' | 'H' | 'V' => true - case 'N' => false - case unk => throw new IllegalArgumentException("Encountered unexpected base '%s'".format(unk)) - } - - def isMismatch(includeInsertions: Boolean = true): Boolean = - assumingAligned(record.isMismatchAtReadOffset(offset).getOrElse(includeInsertions)) - - def isSNP: Boolean = isMismatch(false) - - def isInsertion: Boolean = - assumingAligned(record.isMismatchAtReadOffset(offset).isEmpty) - - def referencePositionOption: Option[ReferencePosition] = - assumingAligned( - record.readOffsetToReferencePosition(offset) - ) - - def referenceSequenceContext: Option[ReferenceSequenceContext] = - assumingAligned(record.readOffsetToReferenceSequenceContext(offset)) - - def referencePosition: ReferencePosition = - referencePositionOption.getOrElse( - throw new IllegalArgumentException("Residue has no reference location (may be an insertion)") - ) - } - - lazy val readGroup: String = record.getRecordGroupName - - private lazy val baseSequence: String = record.getSequence - - lazy val residues: IndexedSeq[Residue] = Range(0, baseSequence.length).map(new Residue(_)) - - def name: String = record.getReadName - - def isAligned: Boolean = record.getReadMapped - - def alignmentQuality: Option[QualityScore] = assumingAligned { - if (record.getMapq == null || record.getMapq == 255) { - None - } else { - Some(QualityScore(record.getMapq)) - } - } - - def ensureAligned: Boolean = - isAligned || (throw new IllegalArgumentException("Read has not been aligned to a reference")) - - def isPrimaryAlignment: Boolean = isAligned && 
record.getPrimaryAlignment - - def isDuplicate: Boolean = record.getDuplicateRead - - def isPaired: Boolean = record.getReadPaired - - def isFirstOfPair: Boolean = isPaired && record.getReadInFragment == 0 - - def isSecondOfPair: Boolean = isPaired && record.getReadInFragment == 1 - - def isNegativeRead: Boolean = record.getReadNegativeStrand - - // Is this the most representative record for this read? - def isCanonicalRecord: Boolean = isPrimaryAlignment && !isDuplicate - - def passedQualityChecks: Boolean = !record.getFailedVendorQualityChecks - - def mismatchesOption: Option[MdTag] = record.mdTag - - def mismatches: MdTag = - mismatchesOption.getOrElse(throw new IllegalArgumentException("Read has no MD tag")) - - private def assumingAligned[T](func: => T): T = { - ensureAligned - func - } -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceSequenceContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceSequenceContext.scala deleted file mode 100644 index 5363a6aa03..0000000000 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rich/ReferenceSequenceContext.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.bdgenomics.adam.rich - -import htsjdk.samtools.CigarElement -import org.bdgenomics.adam.models.ReferencePosition - -/** - * Represents information on the reference relative to a particular residue - * - * @param pos The position this residue is aligned at. - * @param referenceBase The base that is in the reference at this position - * @param cigarElement The CIGAR element covering this residue in the - * read-to-reference alignment. - * @param cigarElementOffset The position of this base within the CIGAR - * element. - */ -@deprecated("don't use ReferenceSequenceContext in new development", - since = "0.21.0") -private[adam] case class ReferenceSequenceContext( - pos: Option[ReferencePosition], - referenceBase: Option[Char], - cigarElement: CigarElement, - cigarElementOffset: Int) { -} diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rich/RichAlignmentRecord.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rich/RichAlignmentRecord.scala index 172b2251b9..cf8859a9ec 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rich/RichAlignmentRecord.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rich/RichAlignmentRecord.scala @@ -153,106 +153,4 @@ case class RichAlignmentRecord(record: AlignmentRecord) { mdTag.map(!_.isMatch(pos)) } } - - /** - * @param offset The index into the read sequence. - * @return If the read is not aligned, returns a None. Else, returns a wrapped - * boolean stating whether this read is a sequence mismatch against the - * reference at the given offset within the read? - */ - def isMismatchAtReadOffset(offset: Int): Option[Boolean] = { - // careful about offsets that are within an insertion! 
- if (referencePositions.isEmpty) { - None - } else { - readOffsetToReferencePosition(offset) - .flatMap(pos => isMismatchAtReferencePosition(pos)) - } - } - - private def getReferenceContext( - readOffset: Int, - referencePosition: Long, - cigarElem: CigarElement, - elemOffset: Int): ReferenceSequenceContext = { - val position = if (record.getReadMapped) { - Some(ReferencePosition(record.getContigName, referencePosition)) - } else { - None - } - - def getReferenceBase(cigarElement: CigarElement, - refPos: Long, - readPos: Int): Option[Char] = { - mdTag.flatMap(tag => { - cigarElement.getOperator match { - case CigarOperator.M => - if (!tag.isMatch(refPos)) { - tag.mismatchedBase(refPos) - } else { - Some(record.getSequence()(readPos)) - } - case CigarOperator.D => - // if a delete, get from the delete pool - tag.deletedBase(refPos) - case _ => None - } - }) - } - - val referenceBase = getReferenceBase(cigarElem, referencePosition, readOffset) - ReferenceSequenceContext(position, referenceBase, cigarElem, elemOffset) - } - - private[rich] lazy val referencePositions: Seq[Option[ReferencePosition]] = { - referenceContexts.map(ref => ref.flatMap(_.pos)) - } - - private[rich] lazy val referenceContexts: Seq[Option[ReferenceSequenceContext]] = { - if (record.getReadMapped) { - val resultTuple = samtoolsCigar.getCigarElements.foldLeft((unclippedStart, List[Option[ReferenceSequenceContext]]()))((runningPos, elem) => { - // runningPos is a tuple, the first element holds the starting position of the next CigarOperator - // and the second element is the list of positions up to this point - val op = elem.getOperator - val currentRefPos = runningPos._1 - val resultAccum = runningPos._2 - val advanceReference = op.consumesReferenceBases || op == CigarOperator.S - val newRefPos = currentRefPos + (if (advanceReference) elem.getLength else 0) - val resultParts: Seq[Option[ReferenceSequenceContext]] = - if (op.consumesReadBases) { - val range = NumericRange(currentRefPos, 
currentRefPos + elem.getLength, 1L) - range.zipWithIndex.map(kv => - if (advanceReference) - Some(getReferenceContext(resultAccum.size + kv._2, kv._1, elem, kv._2)) - else None) - } else { - Seq.empty - } - (newRefPos, resultAccum ++ resultParts) - }) - val results = resultTuple._2 - results.toIndexedSeq - } else { - qualityScores.map(t => None) - } - } - - private[rich] def readOffsetToReferencePosition(offset: Int): Option[ReferencePosition] = { - if (record.getReadMapped) { - referencePositions(offset) - } else { - None - } - } - - @deprecated("don't use ReferenceSequenceContext in new development", - since = "0.21.0") - private[rich] def readOffsetToReferenceSequenceContext( - offset: Int): Option[ReferenceSequenceContext] = { - if (record.getReadMapped) { - referenceContexts(offset) - } else { - None - } - } } diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala index 19ef367a30..9369237538 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/serialization/ADAMKryoRegistrator.scala @@ -164,7 +164,6 @@ class ADAMKryoRegistrator extends KryoRegistrator { kryo.register(classOf[org.bdgenomics.adam.models.MdTag]) kryo.register(classOf[org.bdgenomics.adam.models.MultiContigNonoverlappingRegions]) kryo.register(classOf[org.bdgenomics.adam.models.NonoverlappingRegions]) - kryo.register(classOf[org.bdgenomics.adam.models.QualityScore]) kryo.register(classOf[org.bdgenomics.adam.models.RecordGroup]) kryo.register(classOf[org.bdgenomics.adam.models.RecordGroupDictionary]) kryo.register(classOf[org.bdgenomics.adam.models.ReferencePosition], @@ -173,7 +172,8 @@ class ADAMKryoRegistrator extends KryoRegistrator { kryo.register(classOf[org.bdgenomics.adam.models.SAMFileHeaderWritable]) 
kryo.register(classOf[org.bdgenomics.adam.models.SequenceDictionary]) kryo.register(classOf[org.bdgenomics.adam.models.SequenceRecord]) - kryo.register(classOf[org.bdgenomics.adam.models.SnpTable]) + kryo.register(classOf[org.bdgenomics.adam.models.SnpTable], + new org.bdgenomics.adam.models.SnpTableSerializer) kryo.register(classOf[org.bdgenomics.adam.models.VariantContext], new org.bdgenomics.adam.models.VariantContextSerializer) @@ -216,16 +216,13 @@ class ADAMKryoRegistrator extends KryoRegistrator { new org.bdgenomics.adam.rdd.read.realignment.ZippedTargetSetSerializer) // org.bdgenomics.adam.rdd.read.recalibration. - kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CovariateSpace]) + kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CovariateKey]) kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CycleCovariate]) kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.DinucCovariate]) - kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.CovariateKey]) - kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.ObservationAccumulator]) + kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.RecalibrationTable]) kryo.register(classOf[org.bdgenomics.adam.rdd.read.recalibration.Observation]) // org.bdgenomics.adam.rich - kryo.register(classOf[org.bdgenomics.adam.rich.DecadentRead]) - kryo.register(classOf[org.bdgenomics.adam.rich.ReferenceSequenceContext]) kryo.register(classOf[org.bdgenomics.adam.rich.RichAlignmentRecord]) kryo.register(classOf[org.bdgenomics.adam.rich.RichVariant]) @@ -308,6 +305,7 @@ class ADAMKryoRegistrator extends KryoRegistrator { kryo.register(classOf[scala.Array[org.bdgenomics.adam.models.ReferenceRegion]]) kryo.register(classOf[scala.Array[org.bdgenomics.adam.models.SequenceRecord]]) kryo.register(classOf[scala.Array[org.bdgenomics.adam.models.VariantContext]]) + 
kryo.register(classOf[scala.Array[org.bdgenomics.adam.rdd.read.recalibration.CovariateKey]]) kryo.register(classOf[scala.Array[org.bdgenomics.adam.rich.RichAlignmentRecord]]) kryo.register(classOf[scala.Array[scala.collection.Seq[_]]]) kryo.register(classOf[scala.Array[Int]]) diff --git a/adam-core/src/test/resources/bqsr1.vcf b/adam-core/src/test/resources/bqsr1.vcf index 572edfcf19..defc76d7f8 100644 --- a/adam-core/src/test/resources/bqsr1.vcf +++ b/adam-core/src/test/resources/bqsr1.vcf @@ -1,6 +1,7 @@ ##fileformat=VCFv4.1 ##INFO= ##INFO= +##contig= #CHROM POS ID REF ALT QUAL FILTER INFO 22 16050612 rs2186463 C G . . WGT=3 22 16050612 rs146752890 C G . . WGT=1 diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/SnpTableSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/SnpTableSuite.scala new file mode 100644 index 0000000000..5ef4694723 --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/models/SnpTableSuite.scala @@ -0,0 +1,119 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.models + +import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rdd.variant.VariantRDD +import org.bdgenomics.adam.util.ADAMFunSuite +import org.bdgenomics.formats.avro.Variant + +class SnpTableSuite extends ADAMFunSuite { + + test("create an empty snp table") { + val table = SnpTable() + assert(table.indices.isEmpty) + assert(table.sites.isEmpty) + } + + sparkTest("create a snp table from variants on multiple contigs") { + val inputPath = testFile("random.vcf") + val table = SnpTable(sc.loadVariants(inputPath)) + assert(table.indices.size === 3) + assert(table.indices("1") === (0, 2)) + assert(table.indices("2") === (3, 3)) + assert(table.indices("13") === (4, 5)) + assert(table.sites.length === 6) + assert(table.sites(0) === 14396L) + assert(table.sites(1) === 14521L) + assert(table.sites(2) === 63734L) + assert(table.sites(3) === 19189L) + assert(table.sites(4) === 752720L) + assert(table.sites(5) === 752790L) + } + + sparkTest("create a snp table from a larger set of variants") { + val inputPath = testFile("bqsr1.vcf") + val variants = sc.loadVariants(inputPath) + val numVariants = variants.rdd.count + val table = SnpTable(variants) + assert(table.indices.size === 1) + assert(table.indices("22") === (0, numVariants - 1)) + assert(table.sites.length === numVariants) + val variantsByPos = variants.rdd + .map(v => v.getStart.toInt) + .collect + .sorted + table.sites + .zip(variantsByPos) + .foreach(p => { + assert(p._1 === p._2) + }) + } + + def lookUpVariants(rdd: VariantRDD): SnpTable = { + val table = SnpTable(rdd) + val variants = rdd.rdd.collect + + variants.foreach(v => { + val sites = table.maskedSites(ReferenceRegion(v)) + assert(sites.size === 1) + }) + + table + } + + sparkTest("perform lookups on multi-contig snp table") { + val inputPath = testFile("random.vcf") + val variants = sc.loadVariants(inputPath) + val table = lookUpVariants(variants) + + val s1 = table.maskedSites(ReferenceRegion("1", 
14390L, 14530L)) + assert(s1.size === 2) + assert(s1(14396L)) + assert(s1(14521L)) + + val s2 = table.maskedSites(ReferenceRegion("13", 752700L, 752800L)) + assert(s2.size === 2) + assert(s2(752720L)) + assert(s2(752790L)) + } + + sparkTest("perform lookups on larger snp table") { + val inputPath = testFile("bqsr1.vcf") + val variants = sc.loadVariants(inputPath) + val table = lookUpVariants(variants) + + val s1 = table.maskedSites(ReferenceRegion("22", 16050670L, 16050690L)) + assert(s1.size === 2) + assert(s1(16050677L)) + assert(s1(16050682L)) + + val s2 = table.maskedSites(ReferenceRegion("22", 16050960L, 16050999L)) + assert(s2.size === 3) + assert(s2(16050966L)) + assert(s2(16050983L)) + assert(s2(16050993L)) + + val s3 = table.maskedSites(ReferenceRegion("22", 16052230L, 16052280L)) + assert(s3.size === 4) + assert(s3(16052238L)) + assert(s3(16052239L)) + assert(s3(16052249L)) + assert(s3(16052270L)) + } +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibrationSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibrationSuite.scala index 28e055411e..2aec319f23 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibrationSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/BaseQualityRecalibrationSuite.scala @@ -19,32 +19,57 @@ package org.bdgenomics.adam.rdd.read.recalibration import java.io.File import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel import org.bdgenomics.adam.models.SnpTable import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rich.DecadentRead._ -import org.bdgenomics.adam.rich.RichVariant import org.bdgenomics.adam.util.ADAMFunSuite import org.bdgenomics.formats.avro.AlignmentRecord +import scala.io.Source class BaseQualityRecalibrationSuite extends ADAMFunSuite { - ignore("BQSR Test Input #1 w/ VCF Sites") { + def 
testBqsr(optSl: Option[StorageLevel]) { val readsFilepath = testFile("bqsr1.sam") val snpsFilepath = testFile("bqsr1.vcf") val obsFilepath = testFile("bqsr1-ref.observed") - val reads: RDD[AlignmentRecord] = sc.loadAlignments(readsFilepath).rdd - val variants: RDD[RichVariant] = sc.loadVariants(snpsFilepath).rdd.map(new RichVariant(_)) + val rdd = sc.loadAlignments(readsFilepath) + val reads: RDD[AlignmentRecord] = rdd.rdd + val variants = sc.loadVariants(snpsFilepath) val snps = sc.broadcast(SnpTable(variants)) - val bqsr = new BaseQualityRecalibration(cloy(reads), snps) + val bqsr = new BaseQualityRecalibration(reads, + snps, + rdd.recordGroups, + optStorageLevel = optSl) // Sanity checks assert(bqsr.result.count == reads.count) // Compare the ObservationTables - val referenceObs: Seq[String] = scala.io.Source.fromFile(new File(obsFilepath)).getLines().filter(_.length > 0).toSeq.sortWith((kv1, kv2) => kv1.compare(kv2) < 0) - val testObs: Seq[String] = bqsr.observed.toCSV.split('\n').filter(_.length > 0).toSeq.sortWith((kv1, kv2) => kv1.compare(kv2) < 0) + val referenceObs: Seq[String] = Source.fromFile(new File(obsFilepath)) + .getLines() + .filter(_.length > 0) + .toSeq + .sortWith((kv1, kv2) => kv1.compare(kv2) < 0) + val testObs: Seq[String] = bqsr.observed + .toCSV(rdd.recordGroups) + .split('\n') + .filter(_.length > 0) + .toSeq + .sortWith((kv1, kv2) => kv1.compare(kv2) < 0) referenceObs.zip(testObs).foreach(p => assert(p._1 === p._2)) } + + sparkTest("BQSR Test Input #1 w/ VCF Sites without caching") { + testBqsr(None) + } + + sparkTest("BQSR Test Input #1 w/ VCF Sites with caching") { + testBqsr(Some(StorageLevel.MEMORY_ONLY)) + } + + sparkTest("BQSR Test Input #1 w/ VCF Sites with serialized caching") { + testBqsr(Some(StorageLevel.MEMORY_ONLY_SER)) /* serialized level, so this case differs from the "with caching" test above */ + } } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariateSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariateSuite.scala new
file mode 100644 index 0000000000..3b31dec133 --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/CycleCovariateSuite.scala @@ -0,0 +1,166 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.formats.avro.AlignmentRecord +import org.scalatest.FunSuite + +class CycleCovariateSuite extends FunSuite { + + val cc = new CycleCovariate + + test("compute covariates for an unpaired read on the negative strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("AGCCTNGT") + .setQual("********") + .setReadNegativeStrand(true) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .build + val covariates = cc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === 8) + assert(covariates(1) === 7) + assert(covariates(2) === 6) + assert(covariates(3) === 5) + assert(covariates(4) === 4) + assert(covariates(5) === 3) + assert(covariates(6) === 2) + assert(covariates(7) === 1) + } + + test("compute covariates for a first-of-pair read on the negative strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("AGCCTNGT") + .setQual("********") + .setReadNegativeStrand(true) + 
.setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .setReadPaired(true) + .setReadInFragment(0) + .build + val covariates = cc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === 8) + assert(covariates(1) === 7) + assert(covariates(2) === 6) + assert(covariates(3) === 5) + assert(covariates(4) === 4) + assert(covariates(5) === 3) + assert(covariates(6) === 2) + assert(covariates(7) === 1) + } + + test("compute covariates for a second-of-pair read on the negative strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("AGCCTNGT") + .setQual("********") + .setReadNegativeStrand(true) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .setReadPaired(true) + .setReadInFragment(1) + .build + val covariates = cc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === -8) + assert(covariates(1) === -7) + assert(covariates(2) === -6) + assert(covariates(3) === -5) + assert(covariates(4) === -4) + assert(covariates(5) === -3) + assert(covariates(6) === -2) + assert(covariates(7) === -1) + } + + test("compute covariates for an unpaired read on the positive strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("ACNAGGCT") + .setQual("********") + .setReadNegativeStrand(false) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .build + val covariates = cc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === 1) + assert(covariates(1) === 2) + assert(covariates(2) === 3) + assert(covariates(3) === 4) + assert(covariates(4) === 5) + assert(covariates(5) === 6) + assert(covariates(6) === 7) + assert(covariates(7) === 8) + } + + test("compute covariates for a first-of-pair read on the positive strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("ACNAGGCT") + .setQual("********") + .setReadNegativeStrand(false) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .setReadPaired(true) + 
.setReadInFragment(0) + .build + val covariates = cc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === 1) + assert(covariates(1) === 2) + assert(covariates(2) === 3) + assert(covariates(3) === 4) + assert(covariates(4) === 5) + assert(covariates(5) === 6) + assert(covariates(6) === 7) + assert(covariates(7) === 8) + } + + test("compute covariates for a second-of-pair read on the positive strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("ACNAGGCT") + .setQual("********") + .setReadNegativeStrand(false) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .setReadPaired(true) + .setReadInFragment(1) + .build + val covariates = cc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === -1) + assert(covariates(1) === -2) + assert(covariates(2) === -3) + assert(covariates(3) === -4) + assert(covariates(4) === -5) + assert(covariates(5) === -6) + assert(covariates(6) === -7) + assert(covariates(7) === -8) + } +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariateSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariateSuite.scala new file mode 100644 index 0000000000..2651d5bc95 --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/DinucCovariateSuite.scala @@ -0,0 +1,96 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.formats.avro.AlignmentRecord +import org.scalatest.FunSuite + +class DinucCovariateSuite extends FunSuite { + + val dc = new DinucCovariate + + test("computing dinucleotide pairs for a single base sequence should return (N,N)") { + val dinucs = dc.fwdDinucs("A") + assert(dinucs.size === 1) + assert(dinucs(0) === ('N', 'N')) + } + + test("compute dinucleotide pairs for a string of all valid bases") { + val dinucs = dc.fwdDinucs("AGCGT") + assert(dinucs.size === 5) + assert(dinucs(0) === ('N', 'N')) + assert(dinucs(1) === ('A', 'G')) + assert(dinucs(2) === ('G', 'C')) + assert(dinucs(3) === ('C', 'G')) + assert(dinucs(4) === ('G', 'T')) + } + + test("compute dinucleotide pairs for a string with an N") { + val dinucs = dc.fwdDinucs("AGNGT") + assert(dinucs.size === 5) + assert(dinucs(0) === ('N', 'N')) + assert(dinucs(1) === ('A', 'G')) + assert(dinucs(2) === ('N', 'N')) + assert(dinucs(3) === ('N', 'N')) + assert(dinucs(4) === ('G', 'T')) + } + + test("compute covariates for a read on the negative strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("AGCCTNGT") + .setQual("********") + .setReadNegativeStrand(true) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .build + val covariates = dc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === ('C', 'T')) + assert(covariates(1) === ('G', 'C')) + assert(covariates(2) === ('G', 'G')) + assert(covariates(3) === ('A', 'G')) + assert(covariates(4) === ('N', 'N')) + 
assert(covariates(5) === ('N', 'N')) + assert(covariates(6) === ('A', 'C')) + assert(covariates(7) === ('N', 'N')) + } + + test("compute covariates for a read on the positive strand") { + val read = AlignmentRecord.newBuilder() + .setSequence("ACNAGGCT") + .setQual("********") + .setReadNegativeStrand(false) + .setReadMapped(true) + .setStart(10L) + .setMapq(50) + .setCigar("8M") + .build + val covariates = dc.compute(read) + assert(covariates.size === 8) + assert(covariates(0) === ('N', 'N')) + assert(covariates(1) === ('A', 'C')) + assert(covariates(2) === ('N', 'N')) + assert(covariates(3) === ('N', 'N')) + assert(covariates(4) === ('A', 'G')) + assert(covariates(5) === ('G', 'G')) + assert(covariates(6) === ('G', 'C')) + assert(covariates(7) === ('C', 'T')) + } +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTableSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTableSuite.scala new file mode 100644 index 0000000000..79785d779f --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibrationTableSuite.scala @@ -0,0 +1,45 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.formats.avro.AlignmentRecord +import org.scalatest.FunSuite + +class RecalibrationTableSuite extends FunSuite { + + val observedCovariates = Map((CovariateKey(0, + (50 + 33).toChar, + 2, + 'A', + 'C') -> new Aggregate(1000000, 1, 10.0)), + (CovariateKey(0, + (40 + 33).toChar, + 1, + 'N', + 'N') -> new Aggregate(100000, 1, 10.0))) + val table = RecalibrationTable(new ObservationTable( + observedCovariates)) + + test("look up quality scores in table") { + val scores = table(observedCovariates.map(_._1).toArray) + + assert(scores.size === 2) + assert(scores(0) === (50 + 33).toChar) + assert(scores(1) === (47 + 33).toChar) + } +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibratorSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibratorSuite.scala new file mode 100644 index 0000000000..d8b657eb22 --- /dev/null +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/recalibration/RecalibratorSuite.scala @@ -0,0 +1,99 @@ +/** + * Licensed to Big Data Genomics (BDG) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The BDG licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.bdgenomics.adam.rdd.read.recalibration + +import org.bdgenomics.adam.models.{ + RecordGroup, + RecordGroupDictionary +} +import org.bdgenomics.formats.avro.AlignmentRecord +import org.scalatest.FunSuite + +class RecalibratorSuite extends FunSuite { + + val table = RecalibrationTable(new ObservationTable( + Map((CovariateKey(0, + (50 + 33).toChar, + 2, + 'A', + 'C') -> new Aggregate(1000000, 1, 10.0)), + (CovariateKey(0, + (40 + 33).toChar, + 1, + 'N', + 'N') -> new Aggregate(100000, 1, 10.0))))) + val rgd = RecordGroupDictionary(Seq(RecordGroup("s", "rg0"))) + + val read = AlignmentRecord.newBuilder + .setContigName("chr1") + .setRecordGroupName("rg0") + .setStart(10L) + .setEnd(12L) + .setSequence("AC") + .setReadNegativeStrand(false) + .setQual(Seq(40, 50).map(i => (i + 33).toChar).mkString) + .setDuplicateRead(false) + .setReadMapped(true) + .setReadPaired(false) + .setReadInFragment(0) + .setPrimaryAlignment(true) + .setCigar("2M") + .setMismatchingPositions("2") + .setMapq(40) + .build + + val hiRecalibrator = Recalibrator(table, (48 + 33).toChar) + val lowRecalibrator = Recalibrator(table, (40 + 33).toChar) + + test("don't replace quality if quality was null") { + val qualFreeRead = AlignmentRecord.newBuilder(read) + .setQual(null) + .build + val recalibratedRead = lowRecalibrator(qualFreeRead, + Array.empty) + assert(recalibratedRead.getQual === null) + assert(recalibratedRead.getOrigQual === null) + } + + test("if no covariates, return alignment") { + val emptyRead = AlignmentRecord.newBuilder + .setReadName("emptyRead") + .build + val notRecalibratedRead = lowRecalibrator(emptyRead, Array.empty) + assert(emptyRead === notRecalibratedRead) + } + + test("skip recalibration if base is below quality threshold") { + val recalibratedRead = hiRecalibrator(read, + BaseQualityRecalibration.observe(read, rgd)) + val expectedRead = AlignmentRecord.newBuilder(read) + .setOrigQual(read.getQual) + .build + assert(recalibratedRead === expectedRead) + } 
+ + test("recalibrate changed bases above quality threshold") { + val recalibratedRead = lowRecalibrator(read, + BaseQualityRecalibration.observe(read, rgd)) + val expectedRead = AlignmentRecord.newBuilder(read) + .setQual(Seq(47, 50).map(i => (i + 33).toChar).mkString) + .setOrigQual(read.getQual) + .build + assert(recalibratedRead === expectedRead) + } +} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rich/DecadentReadSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rich/DecadentReadSuite.scala deleted file mode 100644 index 5ae3155d12..0000000000 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rich/DecadentReadSuite.scala +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to Big Data Genomics (BDG) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The BDG licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.bdgenomics.adam.rich - -import htsjdk.samtools.ValidationStringency -import org.apache.spark.SparkException -import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.ReferencePosition -import org.bdgenomics.adam.util.ADAMFunSuite -import org.bdgenomics.formats.avro.{ AlignmentRecord, Contig } - -class DecadentReadSuite extends ADAMFunSuite { - - test("reference position of decadent read") { - val contig = Contig.newBuilder - .setContigName("chr1") - .build - - val hardClippedRead = RichAlignmentRecord(AlignmentRecord - .newBuilder() - .setReadMapped(true) - .setStart(1000) - .setContigName(contig.getContigName) - .setMismatchingPositions("10") - .setSequence("AACCTTGGC") - .setQual("FFFFFFFFF") - .setCigar("9M1H").build()) - - val record = DecadentRead(hardClippedRead) - assert(record.residues.size === 9) - - val residueSeq = record.residues - assert(residueSeq.head.referencePosition === ReferencePosition("chr1", 1000)) - } - - test("reference position of decadent read with insertions") { - val contig = Contig.newBuilder - .setContigName("chr1") - .build - - val hardClippedRead = RichAlignmentRecord(AlignmentRecord - .newBuilder() - .setReadMapped(true) - .setStart(1000) - .setContigName(contig.getContigName) - .setMismatchingPositions("1TT10") - .setSequence("ATTGGGGGGGGGG") - .setQual("FFFFFFFFFFFFF") - .setCigar("1M2I10M").build()) - - val record = DecadentRead(hardClippedRead) - assert(record.residues.size === 13) - - val residueSeq = record.residues - assert(residueSeq.head.referencePosition === ReferencePosition("chr1", 1000)) - } - - test("build a decadent read from a read with null qual") { - val contig = Contig.newBuilder - .setContigName("chr1") - .build - - val hardClippedRead = RichAlignmentRecord(AlignmentRecord - .newBuilder() - .setReadMapped(true) - .setStart(1000) - .setContigName(contig.getContigName) - .setMismatchingPositions("10") - .setSequence("AACCTTGGC") - .setCigar("9M1H").build()) - - val record = 
DecadentRead(hardClippedRead) - assert(record.residues.size === 9) - } - - test("converting bad read should fail") { - val readBad = AlignmentRecord.newBuilder() - .setContigName("1") - .setStart(248262648L) - .setEnd(248262721L) - .setMapq(23) - .setSequence("GATCTTTTCAACAGTTACAGCAGAAAGTTTTCATGGAGAAATGGAATCACACTTCAAATGATTTCATTTTGTTGGG") - .setQual("IBBHEFFEKFCKFHFACKFIJFJDCFHFEEDJBCHIFIDDBCGJDBBJAJBJFCIDCACHBDEBHADDDADDAED;") - .setCigar("4S1M1D71M") - .setReadMapped(true) - .setMismatchingPositions("3^C71") - .build() - - intercept[IllegalArgumentException] { - DecadentRead(readBad) - } - } - - def badGoodReadRDD: RDD[AlignmentRecord] = { - val readBad = AlignmentRecord.newBuilder() - .setContigName("1") - .setStart(248262648L) - .setEnd(248262721L) - .setMapq(23) - .setSequence("GATCTTTTCAACAGTTACAGCAGAAAGTTTTCATGGAGAAATGGAATCACACTTCAAATGATTTCATTTTGTTGGG") - .setQual("IBBHEFFEKFCKFHFACKFIJFJDCFHFEEDJBCHIFIDDBCGJDBBJAJBJFCIDCACHBDEBHADDDADDAED;") - .setCigar("4S1M1D71M") - .setReadMapped(true) - .setMismatchingPositions("3^C71") - .build() - val readGood = AlignmentRecord.newBuilder() - .setContigName("1") - .setStart(248262648L) - .setEnd(248262721L) - .setMapq(23) - .setSequence("GATCTTTTCAACAGTTACAGCAGAAAGTTTTCATGGAGAAATGGAATCACACTTCAAATGATTTCATTTTGTTGGG") - .setQual("IBBHEFFEKFCKFHFACKFIJFJDCFHFEEDJBCHIFIDDBCGJDBBJAJBJFCIDCACHBDEBHADDDADDAED;") - .setCigar("4S1M1D71M") - .setReadMapped(true) - .setMismatchingPositions("1^C71") - .build() - sc.parallelize(Seq(readBad, readGood)) - } - - sparkTest("convert an RDD that has an bad read in it with loose validation") { - val rdd = badGoodReadRDD - val decadent = DecadentRead.cloy(rdd, ValidationStringency.LENIENT) - .repartition(2) - .collect() - assert(decadent.size === 2) - assert(decadent.count(_._1.isDefined) === 1) - assert(decadent.count(_._1.isEmpty) === 1) - assert(decadent.count(_._2.isDefined) === 1) - assert(decadent.count(_._2.isEmpty) === 1) - } - - sparkTest("converting an RDD that has an bad read in 
it with strict validation will throw an error") { - val rdd = badGoodReadRDD - intercept[SparkException] { - // need count to force computation - DecadentRead.cloy(rdd).count - } - } -} diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rich/RichAlignmentRecordSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rich/RichAlignmentRecordSuite.scala index d9e5ab4174..58bb3a6ff9 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rich/RichAlignmentRecordSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rich/RichAlignmentRecordSuite.scala @@ -43,13 +43,6 @@ class RichAlignmentRecordSuite extends FunSuite { assert(recordWithHardClipping.unclippedEnd == 20L) } - test("Cigar Clipping Sequence") { - val contig = Contig.newBuilder.setContigName("chr1").build - val softClippedRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(100).setCigar("10S90M").setContigName(contig.getContigName).build() - assert(softClippedRead.referencePositions(0).map(_.pos) == Some(90L)) - - } - test("tags contains optional fields") { val contig = Contig.newBuilder.setContigName("chr1").build val rec = AlignmentRecord.newBuilder().setAttributes("XX:i:3\tYY:Z:foo").setContigName(contig.getContigName).build() @@ -58,51 +51,6 @@ class RichAlignmentRecordSuite extends FunSuite { assert(rec.tags(1) === Attribute("YY", TagType.String, "foo")) } - test("Reference Positions") { - - val contig = Contig.newBuilder.setContigName("chr1").build - - val hardClippedRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("90M10H").setContigName(contig.getContigName).build() - assert(hardClippedRead.referencePositions.length == 90) - assert(hardClippedRead.referencePositions(0).map(_.pos) == Some(1000L)) - - val softClippedRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("10S90M").setContigName(contig.getContigName).build() - assert(softClippedRead.referencePositions.length == 100) - 
assert(softClippedRead.referencePositions(0).map(_.pos) == Some(990L)) - assert(softClippedRead.referencePositions(10).map(_.pos) == Some(1000L)) - - val doubleMatchNonsenseRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("10M10M").setContigName(contig.getContigName).build() - Range(0, 20).foreach(i => assert(doubleMatchNonsenseRead.referencePositions(i).map(_.pos) == Some(1000 + i))) - - val deletionRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("5M5D10M").setContigName(contig.getContigName).build() - assert(deletionRead.referencePositions.length == 15) - assert(deletionRead.referencePositions(0).map(_.pos) == Some(1000L)) - assert(deletionRead.referencePositions(5).map(_.pos) == Some(1010L)) - - val insertionRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("10M2I10M").setContigName(contig.getContigName).build() - assert(insertionRead.referencePositions.length == 22) - assert(insertionRead.referencePositions(0).map(_.pos) == Some(1000L)) - assert(insertionRead.referencePositions(10).map(_.pos) == None) - assert(insertionRead.referencePositions(12).map(_.pos) == Some(1010L)) - - val indelRead = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("10M3D10M2I").setContigName(contig.getContigName).build() - assert(indelRead.referencePositions.length == 22) - assert(indelRead.referencePositions(0).map(_.pos) == Some(1000L)) - assert(indelRead.referencePositions(10).map(_.pos) == Some(1013L)) - assert(indelRead.referencePositions(20).map(_.pos) == None) - - val hg00096read = AlignmentRecord.newBuilder().setReadMapped(true).setStart(1000).setCigar("1S28M1D32M1I15M1D23M").setContigName(contig.getContigName).build() - assert(hg00096read.referencePositions.length == 100) - assert(hg00096read.referencePositions(0).map(_.pos) == Some(999L)) - assert(hg00096read.referencePositions(1).map(_.pos) == Some(1000L)) - 
assert(hg00096read.referencePositions(29).map(_.pos) == Some(1029L)) - assert(hg00096read.referencePositions(61).map(_.pos) == None) - assert(hg00096read.referencePositions(62).map(_.pos) == Some(1061L)) - assert(hg00096read.referencePositions(78).map(_.pos) == Some(1078L)) - assert(hg00096read.referencePositions(99).map(_.pos) == Some(1099L)) - - } - test("read overlap unmapped read") { val unmappedRead = AlignmentRecord.newBuilder().setReadMapped(false).setStart(0L).setCigar("10M").setEnd(10L).build() diff --git a/docs/source/50_cli.md b/docs/source/50_cli.md index 67457838e3..2a1dd81741 100644 --- a/docs/source/50_cli.md +++ b/docs/source/50_cli.md @@ -150,8 +150,6 @@ options fall into several general categories: sites are treated as correct observations. If BQSR is run, this option should be passed, along with a path to a known variation database (e.g., [dbSNP](https://www.ncbi.nlm.nih.gov/projects/SNP/)). {#known-snps} - * `-dump_observations`: If provided, a path to dump the recalibration table - in CSV format. * Indel realignment options: Indel realignment is run with the `-realign_indels` flag. Additionally, the Indel realignment engine takes the following options: * `-known_indels`: Path to a VCF file/Parquet variant file containing known