diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorer.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorer.scala index 37eb7a19..c67503c5 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorer.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorer.scala @@ -21,8 +21,8 @@ import org.apache.commons.configuration.{ HierarchicalConfiguration, SubnodeConf import org.apache.spark.SparkContext._ import org.apache.spark.Logging import org.apache.spark.rdd.RDD -import org.bdgenomics.adam.models.{ ReferenceMapping, ReferenceRegion } -import org.bdgenomics.adam.rdd.RegionJoin +import org.bdgenomics.adam.models.{ ReferenceMapping, ReferenceRegion, SequenceDictionary } +import org.bdgenomics.adam.rdd.ShuffleRegionJoin import org.bdgenomics.adam.rich.ReferenceMappingContext._ import org.bdgenomics.avocado.algorithms.debrujin.KmerGraph import org.bdgenomics.avocado.models.{ Observation } @@ -36,7 +36,7 @@ object ReassemblyExplorer extends ExplorerCompanion { protected def apply(stats: AvocadoConfigAndStats, config: SubnodeConfiguration): Explorer = { val kmerLength = config.getInt("kmerLength", 20) - new ReassemblyExplorer(kmerLength, stats.reference) + new ReassemblyExplorer(kmerLength, stats.reference, stats.sequenceDict, stats.contigLengths) } implicit object ContigReferenceMapping extends ReferenceMapping[(Long, NucleotideContigFragment)] with Serializable { @@ -48,7 +48,11 @@ object ReassemblyExplorer extends ExplorerCompanion { import ReassemblyExplorer._ class ReassemblyExplorer(kmerLength: Int, - reference: RDD[NucleotideContigFragment]) extends Explorer with Logging { + reference: RDD[NucleotideContigFragment], + sd: SequenceDictionary, + contigLengths: Map[String, Long]) extends Explorer with Logging { + + val totalAssembledReferenceLength = contigLengths.values.sum val companion: ExplorerCompanion = ReassemblyExplorer @@ -76,9 +80,11 @@ class ReassemblyExplorer(kmerLength: Int, // filter mapped reads, join with reference contigs, then extract contig ids // ultimately, this should use the merge-sort join, not the broadcast join // will upgrade when ADAM-534 merges. - val joinWithId = RegionJoin.partitionAndJoin(reference.context, + val joinWithId = ShuffleRegionJoin.partitionAndJoin(reference.context, refIds, - reads.filter(_.getReadMapped)) + reads.filter(_.getReadMapped), + sd, + totalAssembledReferenceLength / reads.partitions.size) .map(kv => { (kv._1._1, kv._2) }) diff --git a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/ExternalGenotyper.scala b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/ExternalGenotyper.scala index a2ca1374..f329d279 100644 --- a/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/ExternalGenotyper.scala +++ b/avocado-core/src/main/scala/org/bdgenomics/avocado/genotyping/ExternalGenotyper.scala @@ -30,7 +30,7 @@ import org.bdgenomics.formats.avro.AlignmentRecord import org.bdgenomics.adam.converters.VariantContextConverter import org.bdgenomics.adam.models.{ SAMFileHeaderWritable, VariantContext => ADAMVariantContext, ReferencePosition } import org.bdgenomics.adam.rdd.ADAMContext._ -import org.bdgenomics.adam.rdd.GenomicRegionPartitioner +import org.bdgenomics.adam.rdd.GenomicPositionPartitioner import org.bdgenomics.adam.rdd.read.AlignmentRecordContext._ import org.bdgenomics.avocado.models.{ Observation, ReadObservation } import org.bdgenomics.avocado.stats.AvocadoConfigAndStats @@ -199,7 +199,7 @@ class ExternalGenotyper(contigLengths: Map[String, Long], // key reads by position and repartition val readsByPosition = reads.keyBy(r => ReferencePosition(r.get.getReferenceName.toString, r.get.getAlignmentStart)) - .partitionBy(new GenomicRegionPartitioner(numPart, contigLengths)) + .partitionBy(GenomicPositionPartitioner(numPart, contigLengths)) if (debug) { log.info("have " + readsByPosition.count + " reads after partitioning") diff --git a/avocado-core/src/test/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorerSuite.scala b/avocado-core/src/test/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorerSuite.scala index 90f7de21..3381aa8a 100644 --- a/avocado-core/src/test/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorerSuite.scala +++ b/avocado-core/src/test/scala/org/bdgenomics/avocado/discovery/ReassemblyExplorerSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ import org.bdgenomics.adam.models.ReferenceRegion import org.bdgenomics.adam.rdd.ADAMContext._ +import org.bdgenomics.adam.rdd.read.AlignmentRecordRDDFunctions import org.bdgenomics.adam.rich.RichGenotype._ import org.bdgenomics.adam.rich.{ RichAlignmentRecord, RichGenotype } import org.bdgenomics.adam.util.SparkFunSuite @@ -85,12 +86,15 @@ class ReassemblyExplorerSuite extends SparkFunSuite { sc.loadAlignments(path) } + lazy val sd = new AlignmentRecordRDDFunctions(na12878_chr20_snp_reads).adamGetSequenceDictionary() + lazy val cl = sd.records.map(r => (r.name, r.length)).toMap + sparkTest("reassemble and call variants on real data") { val reads = na12878_chr20_snp_reads val reference = getReferenceFromReads(reads.collect.toSeq) - val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference))) + val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference)), sd, cl) val obs = re.discover(reads) val bg = new BiallelicGenotyper() val vc = bg.genotype(obs) @@ -110,7 +114,7 @@ class ReassemblyExplorerSuite extends SparkFunSuite { val reference = getReferenceFromReads(reads.collect.toSeq) - val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference))) + val re = new ReassemblyExplorer(20, sc.parallelize(Seq(reference)), sd, cl) val obs = re.discover(reads) val bg = new BiallelicGenotyper() val vc = bg.genotype(obs) diff --git a/pom.xml b/pom.xml index 21da1300..ad45d12f 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ 1.7 2.10.3 2.10 - 1.1.0 + 1.2.0 2.2.0 @@ -238,6 +238,10 @@ hadoop-client ${hadoop.version} + + com.google.guava + guava + asm asm