Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added predicate option to loadCoverage #1156

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
17 changes: 14 additions & 3 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -955,16 +955,27 @@ class ADAMContext private (@transient val sc: SparkContext) extends Serializable
FragmentRDD.fromRdd(records.map(fastqRecordConverter.convertFragment))
}

/**
 * Loads a file of Features and converts it to a CoverageRDD.
 *
 * Per-position coverage is carried in the score attribute of each Feature.
 *
 * @param filePath File path to load coverage from.
 * @return CoverageRDD containing an RDD of Coverage
 */
def loadCoverage(filePath: String): CoverageRDD = {
  // Delegate to the generic feature loader, then reinterpret scores as coverage.
  val features = loadFeatures(filePath)
  features.toCoverage
}

/**
 * Loads a Parquet file of Features and converts it to a CoverageRDD.
 *
 * Per-position coverage is carried in the score attribute of each Feature.
 * Only the columns needed for coverage (contigName, start, end, score) are
 * read, and an optional predicate can be pushed down into the Parquet reader
 * to avoid materializing rows outside the region of interest.
 *
 * @param filePath File path to load coverage from.
 * @param predicate An optional predicate to push down into the file.
 * @return CoverageRDD containing an RDD of Coverage
 */
def loadParquetCoverage(filePath: String,
                        predicate: Option[FilterPredicate] = None): CoverageRDD = {
  // Project only the fields required to build Coverage records.
  val proj = Projection(FeatureField.contigName, FeatureField.start, FeatureField.end, FeatureField.score)
  loadParquetFeatures(filePath, predicate = predicate, projection = Some(proj)).toCoverage
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@
*/
package org.bdgenomics.adam.rdd.features

import org.bdgenomics.adam.models.{ Coverage, SequenceDictionary, SequenceRecord }
import org.apache.parquet.filter2.dsl.Dsl._
import org.bdgenomics.adam.models.{
ReferenceRegion,
Coverage,
SequenceDictionary,
SequenceRecord
}
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ADAMFunSuite
import org.bdgenomics.formats.avro.Feature
Expand Down Expand Up @@ -54,6 +60,23 @@ class CoverageRDDSuite extends ADAMFunSuite {
assert(coverage.rdd.count == 3)
}

sparkTest("correctly filters coverage with predicate") {
  // Two features on chr1 and one on chr2; only the first overlaps the query region.
  val chr1Overlapping = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(10).setScore(3.0).build()
  val chr1Disjoint = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
  val chr2Feature = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

  val features: FeatureRDD = FeatureRDD(sc.parallelize(Seq(chr1Overlapping, chr1Disjoint, chr2Feature)))
  val coverage: CoverageRDD = features.toCoverage

  // Persist as Parquet so the predicate can be pushed down on reload.
  val outputFile = tmpLocation(".adam")
  coverage.save(outputFile, false)

  // Overlap predicate for chr1:[1, 9]: start/end ranges intersect and contig matches.
  val region = ReferenceRegion("chr1", 1, 9)
  val overlapsRegion =
    (LongColumn("end") >= region.start) &&
      (LongColumn("start") <= region.end) &&
      (BinaryColumn("contigName") === region.referenceName)

  val filtered = sc.loadParquetCoverage(outputFile, Some(overlapsRegion))
  assert(filtered.rdd.count == 1)
}

sparkTest("correctly flatmaps coverage without aggregated bins") {
val f1 = Feature.newBuilder().setContigName("chr1").setStart(1).setEnd(5).setScore(1.0).build()
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
Expand Down