diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala index 1eafb884e8..4c9c89217c 100644 --- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala +++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualities.scala @@ -35,8 +35,8 @@ private[rdd] object RecalibrateBaseQualities extends Serializable with Logging { val rdd = poorRdd.map(new RichADAMRecord(_)) // initialize the covariates println("Instantiating covariates...") - val qualByRG = new QualByRG(rdd) - val otherCovars = List(new DiscreteCycle(rdd), new BaseContext(rdd)) + val qualByRG = new QualByRG() + val otherCovars = List(new DiscreteCycle(), new BaseContext()) println("Creating object...") val recalibrator = new RecalibrateBaseQualities(qualByRG, otherCovars) println("Computing table...") @@ -52,7 +52,6 @@ private[rdd] class RecalibrateBaseQualities(val qualCovar: QualByRG, val covars: def computeTable(rdd: RDD[RichADAMRecord], dbsnp: SparkBroadcast[SnpTable]): RecalTable = { def addCovariates(table: RecalTable, covar: ReadCovariates): RecalTable = { - //log.info("Aggregating covarates for read "+covar.read.record.getReadName.toString) table + covar } diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala index 8e015d3b6d..cf65228cc6 100644 --- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala +++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariates.scala @@ -28,27 +28,35 @@ object ReadCovariates { } class ReadCovariates(val read: RichADAMRecord, qualByRG: QualByRG, covars: List[StandardCovariate], - val dbSNP: SnpTable) extends Iterator[BaseCovariates] with Serializable { + val dbSNP: 
SnpTable, val minQuality:Int = 2) extends Iterator[BaseCovariates] with Serializable { - val startOffset = read.qualityScores.takeWhile(_ <= 2).size - val endOffset = read.qualityScores.size - read.qualityScores.reverseIterator.takeWhile(_ <= 2).size - val qualCovar: Array[Int] = qualByRG(read, startOffset, endOffset) - val requestedCovars: List[Array[Int]] = covars.map(covar => covar(read, startOffset, endOffset)) + def isLowQualityBase(qual : Byte) : Boolean = { + qual.toInt <= minQuality + } + + val qualityStartOffset = read.qualityScores.takeWhile(isLowQualityBase).size + val qualityEndOffset = read.qualityScores.size - read.qualityScores.reverseIterator.takeWhile(isLowQualityBase).size + + + val qualCovar: Array[Int] = qualByRG(read, qualityStartOffset, qualityEndOffset) + val requestedCovars: List[Array[Int]] = covars.map(covar => covar(read, qualityStartOffset, qualityEndOffset)) + + var readOffset = qualityStartOffset - var iter_position = startOffset - override def hasNext: Boolean = iter_position < endOffset + override def hasNext: Boolean = readOffset < qualityEndOffset override def next(): BaseCovariates = { - val offset = (iter_position - startOffset).toInt - val mismatch = read.isMismatchAtReadOffset(offset) + val baseCovarOffset = readOffset - qualityStartOffset + val mismatch = read.isMismatchAtReadOffset(readOffset) // FIXME: why does empty mismatch mean it should be masked? 
- val isMasked = dbSNP.isMaskedAtReadOffset(read, offset) || mismatch.isEmpty + val isMasked = dbSNP.isMaskedAtReadOffset(read, readOffset) || mismatch.isEmpty // getOrElse because reads without an MD tag can appear during *application* of recal table val isMismatch = mismatch.getOrElse(false) - iter_position += 1 - new BaseCovariates(qualCovar(offset), requestedCovars.map(v => v(offset)).toArray, - read.qualityScores(offset), isMismatch, isMasked) + val qualityScore = read.qualityScores(readOffset) + readOffset += 1 + new BaseCovariates(qualCovar(baseCovarOffset), requestedCovars.map(v => v(baseCovarOffset)).toArray, + qualityScore, isMismatch, isMasked) } } diff --git a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala index f990d37e66..8a788acac7 100644 --- a/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala +++ b/adam-core/src/main/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/StandardCovariate.scala @@ -22,7 +22,7 @@ import edu.berkeley.cs.amplab.adam.rich.RichADAMRecord._ import org.apache.spark.rdd.RDD // this class is required, not just standard. Baked in to recalibration. 
-class QualByRG(rdd: RDD[RichADAMRecord]) extends Serializable { +class QualByRG() extends Serializable { def apply(read: RichADAMRecord, start: Int, end: Int): Array[Int] = { val rg_offset = RecalUtil.Constants.MAX_REASONABLE_QSCORE * read.getRecordGroupId @@ -33,9 +33,10 @@ class QualByRG(rdd: RDD[RichADAMRecord]) extends Serializable { trait StandardCovariate extends Serializable { def apply(read: RichADAMRecord, start: Int, end: Int): Array[Int] // get the covariate for all the bases of the read + } -case class DiscreteCycle(args: RDD[RichADAMRecord]) extends StandardCovariate { +case class DiscreteCycle() extends StandardCovariate { // this is a special-case of the GATK's Cycle covariate for discrete technologies. // Not to be used for 454 or ion torrent (which are flow cycles) def apply(read: RichADAMRecord, startOffset: Int, endOffset: Int): Array[Int] = { @@ -46,10 +47,7 @@ case class DiscreteCycle(args: RDD[RichADAMRecord]) extends StandardCovariate { } } -case class BaseContext(records: RDD[RichADAMRecord], size: Int) extends StandardCovariate { - def this(_s: Int) = this(null, _s) - - def this(_r: RDD[RichADAMRecord]) = this(_r, 2) +case class BaseContext(size: Int = 2) extends StandardCovariate { val BASES = Array('A'.toByte, 'C'.toByte, 'G'.toByte, 'T'.toByte) val COMPL = Array('T'.toByte, 'G'.toByte, 'C'.toByte, 'A'.toByte) diff --git a/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala b/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala index d89819aed2..6b028f2793 100644 --- a/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala +++ b/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/RecalibrateBaseQualitiesSuite.scala @@ -391,9 +391,8 @@ class RecalibrateBaseQualitiesSuite extends SparkFunSuite { val rec2 = ADAMRecord.newBuilder().setRecordGroupId(rg2).setQual(qualStr(qual2)).build() val rec3 = 
ADAMRecord.newBuilder().setRecordGroupId(rg3).setQual(qualStr(qual3)).build() val records = List(rec1, rec2, rec3) - val recRDD = sc.makeRDD(records, 1).map(new RichADAMRecord(_)) - System.out.println(recRDD.first()) - val qualByRG = new QualByRG(recRDD) + + val qualByRG = new QualByRG() val intervals = List((0, 29), (6, 29), (0, 21), (0, 20)) for (interval <- intervals) { assert(qualByRG(rec1, interval._1, interval._2).deep == qual1.slice(interval._1, interval._2).toArray.deep) diff --git a/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariatesSuite.scala b/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariatesSuite.scala new file mode 100644 index 0000000000..6c23a5dcdd --- /dev/null +++ b/adam-core/src/test/scala/edu/berkeley/cs/amplab/adam/rdd/recalibration/ReadCovariatesSuite.scala @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2014. Mount Sinai School of Medicine + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package edu.berkeley.cs.amplab.adam.rdd.recalibration
+
+import edu.berkeley.cs.amplab.adam.util.SparkFunSuite
+import edu.berkeley.cs.amplab.adam.avro.ADAMRecord
+import edu.berkeley.cs.amplab.adam.models.SnpTable
+
+/**
+ * Checks the per-base values produced by ReadCovariates for a read whose
+ * quality string begins and ends with two '#' characters.
+ */
+class ReadCovariatesSuite extends SparkFunSuite {
+
+  /**
+   * Builds the mapped 10-base read shared by both tests.
+   *
+   * @param cigar CIGAR string of the alignment
+   * @param mdTag mismatching-positions string of the alignment
+   */
+  private def buildRead(cigar: String, mdTag: String): ADAMRecord =
+    ADAMRecord.newBuilder()
+      .setRecordGroupId(0)
+      .setReadMapped(true).setStart(10000)
+      .setReferenceName("1")
+      .setCigar(cigar)
+      .setMismatchingPositions(mdTag)
+      .setSequence("CTACCCTAAC")
+      .setQual("##LKLPPQ##")
+      .build()
+
+  // A new instance per call: each assertion group below starts from the first base.
+  private def covariatesOf(read: ADAMRecord) =
+    ReadCovariates(read, new QualByRG(), List(new BaseContext(2)), SnpTable())
+
+  /**
+   * Asserts that the first base returned has quality 43, that every later base
+   * has qual equal to qualByRG, and that the fourth base returned has
+   * quality 47 and is flagged as a mismatch.
+   */
+  private def checkQualityOffsets(read: ADAMRecord): Unit = {
+    val fromStart = covariatesOf(read)
+    val leadingBase = fromStart.next()
+    assert(leadingBase.qual === 43)
+    fromStart.foreach(bc => assert(bc.qual === bc.qualByRG))
+
+    val fourthBase = covariatesOf(read).drop(3).next()
+    assert(fourthBase.qual === 47)
+    assert(fourthBase.isMismatch === true)
+  }
+
+  test("Test Quality Offset") {
+    checkQualityOffsets(buildRead("10M", "5C4"))
+  }
+
+  test("Test ReadCovar on SoftClipped Read") {
+    checkQualityOffsets(buildRead("2S6M2S", "3C2"))
+  }
+}