From a50c351008ea27a527cbceac3a021f143d16e015 Mon Sep 17 00:00:00 2001 From: Frank Austin Nothaft Date: Thu, 18 May 2017 16:42:01 -0700 Subject: [PATCH] [ADAM-1539] Support locus predicate in Transform. Resolves #1539. --- .../org/bdgenomics/adam/cli/Transform.scala | 31 ++++++++++++++-- adam-cli/src/test/resources/sorted.bam | Bin 0 -> 283 bytes adam-cli/src/test/resources/sorted.bam.bai | Bin 0 -> 336 bytes .../bdgenomics/adam/cli/TransformSuite.scala | 20 ++++++++++ adam-core/pom.xml | 5 +++ .../adam/models/ReferenceRegion.scala | 35 ++++++++++++++++++ .../org/bdgenomics/adam/rdd/GenomicRDD.scala | 6 +-- .../adam/models/ReferenceRegionSuite.scala | 22 +++++++++++ pom.xml | 5 +++ 9 files changed, 117 insertions(+), 7 deletions(-) create mode 100644 adam-cli/src/test/resources/sorted.bam create mode 100644 adam-cli/src/test/resources/sorted.bam.bai diff --git a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala index e6a41396b2..bd084a63f4 100644 --- a/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala +++ b/adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala @@ -23,7 +23,7 @@ import org.apache.spark.SparkContext import org.apache.spark.storage.StorageLevel import org.bdgenomics.adam.algorithms.consensus._ import org.bdgenomics.adam.instrumentation.Timers._ -import org.bdgenomics.adam.models.SnpTable +import org.bdgenomics.adam.models.{ ReferenceRegion, SnpTable } import org.bdgenomics.adam.projections.{ AlignmentRecordField, Filter } import org.bdgenomics.adam.rdd.ADAMContext._ import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs @@ -49,8 +49,10 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs { var outputPath: String = null @Args4jOption(required = false, name = "-limit_projection", usage = "Only project necessary fields. Only works for Parquet files.") var limitProjection: Boolean = false - @Args4jOption(required = false, name = "-aligned_read_predicate", usage = "Only load aligned reads. Only works for Parquet files.") + @Args4jOption(required = false, name = "-aligned_read_predicate", usage = "Only load aligned reads. Only works for Parquet files. Exclusive of locus predicate.") var useAlignedReadPredicate: Boolean = false + @Args4jOption(required = false, name = "-locus_predicate", usage = "Only load a specific range of loci. Mutually exclusive with aligned read predicate.") + var locusPredicate: String = null @Args4jOption(required = false, name = "-sort_reads", usage = "Sort the reads by referenceId and read position") var sortReads: Boolean = false @Args4jOption(required = false, name = "-sort_lexicographically", usage = "Sort the reads lexicographically by contig name, instead of by index.") @@ -426,10 +428,20 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans "-limit_projection only applies to Parquet files, but a non-Parquet input path was specified." ) } + if (args.useAlignedReadPredicate && args.locusPredicate != null) { + throw new IllegalArgumentException( + "-aligned_read_predicate and -locus_predicate are mutually exclusive" + ) + } val aRdd: AlignmentRecordRDD = if (args.forceLoadBam) { - sc.loadBam(args.inputPath) + if (args.locusPredicate != null) { + val loci = ReferenceRegion.fromString(args.locusPredicate) + sc.loadIndexedBam(args.inputPath, loci) + } else { + sc.loadBam(args.inputPath) + } } else if (args.forceLoadFastq) { sc.loadFastq(args.inputPath, Option(args.pairedFastqFile), Option(args.fastqRecordGroup), stringency) } else if (args.forceLoadIFastq) { @@ -439,6 +451,10 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans args.limitProjection) { val pred = if (args.useAlignedReadPredicate) { Some(BooleanColumn("readMapped") === true) + } else if (args.locusPredicate != null) { + Some(ReferenceRegion.createPredicate( + ReferenceRegion.fromString(args.locusPredicate).toSeq: _* + )) } else { None } @@ -454,12 +470,19 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans optPredicate = pred, optProjection = proj) } else { - sc.loadAlignments( + val loadedReads = sc.loadAlignments( args.inputPath, optPathName2 = Option(args.pairedFastqFile), optRecordGroup = Option(args.fastqRecordGroup), stringency = stringency ) + + if (args.locusPredicate != null) { + val loci = ReferenceRegion.fromString(args.locusPredicate) + loadedReads.filterByOverlappingRegions(loci) + } else { + loadedReads + } } val rdd = aRdd.rdd val sd = aRdd.sequences diff --git a/adam-cli/src/test/resources/sorted.bam b/adam-cli/src/test/resources/sorted.bam new file mode 100644 index 0000000000000000000000000000000000000000..ea8514f8c72e05f6801010ba8cbfe63fdb373f21 GIT binary patch literal 283 zcmb2|=3rp}f&Xj_PR>jW#SFzoUs8LL6A}tO6g=elyxHiA@l&2}+qKSWpVQa!^nF;w z!S{rRjjQ+<11B>x^Na?=T@9Qb2RJ+xEEk{PSl7Xt!7MBwZL}w$Z9~sP9pN_aH#|IR z9K<}hBP19UE*(8S6=;<_nw?z?#;FbrYRdkKZVG2vkNjWApz`j-1_lqVMh1168%(VU zT{BH=T4!v`C`l?{nAGL)rXjOI&_h{ZKf|k(Ni8?dG^RBeJ2QtU`%KrIz`se0bxE}N z?&{c$JFl~?`p6x?K6&SIffHGBCcSzIy;=rmJ$6}IGA7?>-k`~d=@e-OW^ibL2mmcj BThssm literal 0 HcmV?d00001 diff --git a/adam-cli/src/test/resources/sorted.bam.bai b/adam-cli/src/test/resources/sorted.bam.bai new file mode 100644 index 0000000000000000000000000000000000000000..c79e7358b0ac0eb4d37d0f5888e858b08348f936 GIT binary patch literal 336 zcmZ>A^kigVU|?VZVoxCk21X#wz)%7vEupm6B#1n^I7BU2DVRXki)1t?2H8BAISdTaU@b8JqKkv{AOnzMgc^tp06UHq8UO$Q literal 0 HcmV?d00001 diff --git a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformSuite.scala b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformSuite.scala index 457fcb116a..7a7aa37c6d 100644 --- a/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformSuite.scala +++ b/adam-cli/src/test/scala/org/bdgenomics/adam/cli/TransformSuite.scala @@ -70,4 +70,24 @@ class TransformSuite extends ADAMFunSuite { assert(qualityScoreCounts(30) === 92899) assert(qualityScoreCounts(10) === 7101) } + + sparkTest("run locus predicate") { + // alas, copy resource does not work here... + val inputPath = testFile("sorted.bam") + val outputPath1 = tmpFile("predicate.1.adam") + val outputPath2 = tmpFile("predicate.2.adam") + val outputPath3 = tmpFile("predicate.3.adam") + Transform(Array(inputPath, outputPath1, + "-locus_predicate", "1:0-200,chr2:0-1000", + "-force_load_bam")).run(sc) + Transform(Array(outputPath1, outputPath2, + "-locus_predicate", "chr2:0-1000", + "-force_load_parquet")).run(sc) + Transform(Array(outputPath2, outputPath3, + "-locus_predicate", "chr2:0-400")).run(sc) + + assert(sc.loadAlignments(outputPath1).rdd.count === 3) + assert(sc.loadAlignments(outputPath2).rdd.count === 2) + assert(sc.loadAlignments(outputPath3).rdd.count === 1) + } } diff --git a/adam-core/pom.xml b/adam-core/pom.xml index 0a47a2f4a7..282988aaec 100644 --- a/adam-core/pom.xml +++ b/adam-core/pom.xml @@ -116,6 +116,11 @@ utils-intervalrdd_${scala.version.prefix} compile + + org.hammerlab + genomic-loci_${scala.version.prefix} + compile + com.esotericsoftware.kryo kryo diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala index d5dde4e0e8..5546d75127 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/models/ReferenceRegion.scala @@ -23,6 +23,12 @@ import org.apache.parquet.filter2.dsl.Dsl._ import org.apache.parquet.filter2.predicate.FilterPredicate import org.bdgenomics.formats.avro._ import org.bdgenomics.utils.interval.array.Interval +import org.hammerlab.genomics.loci.parsing.{ + All, + LociRange, + LociRanges, + ParsedLoci +} import scala.math.{ max, min } trait ReferenceOrdering[T <: ReferenceRegion] extends Ordering[T] { @@ -86,6 +92,35 @@ object ReferenceRegion { implicit def orderingForPositions = RegionOrdering implicit def orderingForOptionalPositions = OptionalRegionOrdering + /** + * Parses a set of comma delimited loci from a string. + * + * Acceptable strings include: + * - ctg:start-end + * - ctg:pos + * + * @param loci The string describing the loci to create reference regions for. + * @return Returns an iterable collection of reference regions. + */ + def fromString(loci: String): Iterable[ReferenceRegion] = { + + ParsedLoci(loci) match { + case All => { + throw new IllegalArgumentException("Unsupported value 'all'") + } + case LociRanges(ranges) => { + ranges.map(range => range match { + case LociRange(contigName, start, None) => { + ReferencePosition(contigName, start) + } + case LociRange(contigName, start, Some(end)) => { + ReferenceRegion(contigName, start, end) + } + }) + } + } + } + /** * Creates a reference region that starts at the beginning of a contig. * diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala index e1de5c9941..31e7556d21 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala @@ -378,14 +378,14 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] { * @return Returns a new GenomicRDD containing only data that overlaps the * querys region. */ - def filterByOverlappingRegions(querys: List[ReferenceRegion]): U = { + def filterByOverlappingRegions(querys: Iterable[ReferenceRegion]): U = { replaceRdd(rdd.filter(elem => { val regions = getReferenceRegions(elem) - querys.map(query => { + querys.exists(query => { regions.exists(_.overlaps(query)) - }).fold(false)((a, b) => a || b) + }) })) } diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/models/ReferenceRegionSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/models/ReferenceRegionSuite.scala index a8e891583a..11cdf9a3aa 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/models/ReferenceRegionSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/models/ReferenceRegionSuite.scala @@ -39,6 +39,28 @@ class ReferenceRegionSuite extends FunSuite { } } + test("can't parse all locus") { + intercept[IllegalArgumentException] { + ReferenceRegion.fromString("all") + } + intercept[IllegalArgumentException] { + ReferenceRegion.fromString("1:100-200,all") + } + } + + test("parse string into reference regions") { + val loci = ReferenceRegion.fromString("1:100,2:1000-2000") + assert(loci.size === 2) + val ctg1 = loci.filter(_.referenceName == "1") + assert(ctg1.size === 1) + assert(ctg1.head.start === 100L) + assert(ctg1.head.end === 101L) + val ctg2 = loci.filter(_.referenceName == "2") + assert(ctg2.size === 1) + assert(ctg2.head.start === 1000L) + assert(ctg2.head.end === 2000L) + } + test("contains(: ReferenceRegion)") { assert(region("chr0", 10, 100).contains(region("chr0", 50, 70))) assert(region("chr0", 10, 100).contains(region("chr0", 10, 100))) diff --git a/pom.xml b/pom.xml index 20d0d07a5a..78c1d84e77 100644 --- a/pom.xml +++ b/pom.xml @@ -398,6 +398,11 @@ + + org.hammerlab + genomic-loci_${scala.version.prefix} + 1.4.4 + org.bdgenomics.utils utils-cli_${scala.version.prefix}