Skip to content

Commit

Permalink
Merge pull request #584 from fnothaft/fasta-kmer-counting
Browse files Browse the repository at this point in the history
[ADAM-583] Add k-mer counting functionality for nucleotide contig fragments
  • Loading branch information
massie committed Feb 23, 2015
2 parents 7ca3f33 + 539b320 commit 293407f
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object ADAMMain extends Logging {
FindReads,
CalculateDepth,
CountKmers,
CountContigKmers,
Transform,
Adam2Fastq,
/* TODO (nealsid): Reimplement in terms of new schema
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.cli

import java.util.logging.Level
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{ SparkContext, Logging }
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.ParquetLogger
import org.bdgenomics.formats.avro.NucleotideContigFragment
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }

object CountContigKmers extends ADAMCommandCompanion {
val commandName = "count_contig_kmers"
val commandDescription = "Counts the k-mers/q-mers from a read dataset."

def apply(cmdLine: Array[String]) = {
new CountKmers(Args4j[CountKmersArgs](cmdLine))
}
}

class CountContigKmersArgs extends Args4jBase with ParquetArgs {
@Argument(required = true, metaVar = "INPUT", usage = "The ADAM or FASTA file to count kmers from", index = 0)
var inputPath: String = null
@Argument(required = true, metaVar = "OUTPUT", usage = "Location for storing k-mer counts", index = 1)
var outputPath: String = null
@Argument(required = true, metaVar = "KMER_LENGTH", usage = "Length of k-mers", index = 2)
var kmerLength: Int = 0
@Args4jOption(required = false, name = "-printHistogram", usage = "Prints a histogram of counts.")
var printHistogram: Boolean = false
}

class CountContigKmers(protected val args: CountContigKmersArgs) extends ADAMSparkCommand[CountContigKmersArgs] with Logging {
val companion = CountContigKmers

def run(sc: SparkContext, job: Job) {

// Quiet Parquet...
ParquetLogger.hadoopLoggerLevel(Level.SEVERE)

// read from disk
var fragments: RDD[NucleotideContigFragment] = sc.loadSequence(args.inputPath)

// count kmers
val countedKmers = fragments.countKmers(args.kmerLength)

// print histogram, if requested
if (args.printHistogram) {
// cache counted kmers
countedKmers.cache()

countedKmers.map(kv => kv._2.toLong)
.countByValue()
.toSeq
.sortBy(kv => kv._1)
.foreach(println)
}

// save as text file
countedKmers.map(kv => kv._1 + ", " + kv._2)
.saveAsTextFile(args.outputPath)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd

import org.apache.spark.Partitioner
import org.bdgenomics.adam.models.{
ReferencePosition,
ReferenceRegion,
SequenceDictionary
}

/**
* Repartitions objects that are keyed by a ReferencePosition or ReferenceRegion
* into a single partition per contig.
*/
case class ReferencePartitioner(sd: SequenceDictionary) extends Partitioner {

// extract just the reference names
private val referenceNames = sd.records.map(_.name)

override def numPartitions: Int = referenceNames.length

private def partitionFromName(name: String): Int = {
// which reference is this in?
val pIdx = referenceNames.indexOf(name)

// provide debug info to user if key is bad
assert(pIdx != -1, "Reference not found in " + sd + " for key " + name)

pIdx
}

override def getPartition(key: Any): Int = key match {
case rp: ReferencePosition => {
partitionFromName(rp.referenceName)
}
case rr: ReferenceRegion => {
partitionFromName(rr.referenceName)
}
case _ => throw new IllegalArgumentException("Only ReferencePositions or ReferenceRegions can be used as a key.")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.contig

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ ReferenceRegion, SequenceDictionary }
import org.bdgenomics.adam.rdd.ReferencePartitioner
import org.bdgenomics.formats.avro.NucleotideContigFragment

private[contig] object FlankReferenceFragments extends Serializable {

def apply(rdd: RDD[NucleotideContigFragment],
sd: SequenceDictionary,
flankSize: Int): RDD[NucleotideContigFragment] = {
rdd.keyBy(ctg => ReferenceRegion(ctg).get)
.repartitionAndSortWithinPartitions(ReferencePartitioner(sd))
.mapPartitions(flank(_, flankSize))
}

def flank(iter: Iterator[(ReferenceRegion, NucleotideContigFragment)],
flankSize: Int): Iterator[NucleotideContigFragment] = {
// we need to have at least one element in the iterator
if (iter.hasNext) {
// now, we apply a window and flank adjacent segments
var lastFragment = iter.next
iter.map(f => {
// grab temp copy; we will overwrite later
val copyLastFragment = lastFragment

// are the two fragments adjacent? if so, we must add the flanking sequences
if (copyLastFragment._1.isAdjacent(f._1)) {
val lastSequence = copyLastFragment._2.getFragmentSequence
val currSequence = f._2.getFragmentSequence

// update fragments with flanking sequences
copyLastFragment._2.setFragmentSequence(lastSequence + currSequence.take(flankSize))
copyLastFragment._2.setDescription(Option(copyLastFragment._2.getDescription)
.fold("rr")(_ + "rr"))
f._2.setFragmentSequence(lastSequence.takeRight(flankSize) + currSequence)
f._2.setDescription("f")

// we must change the start position of the fragment we are appending in front of
f._2.setFragmentStartPosition(f._2.getFragmentStartPosition - flankSize.toLong)
}

// overwrite last fragment
lastFragment = f

// emit updated last fragment
copyLastFragment._2
}) ++ Iterator(lastFragment._2)
} else {
Iterator()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,36 @@ class NucleotideContigFragmentRDDFunctions(rdd: RDD[NucleotideContigFragment]) e
// variant context contains a single locus
Set(SequenceRecord.fromADAMContigFragment(elem))
}

/**
* For all adjacent records in the RDD, we extend the records so that the adjacent
* records now overlap by _n_ bases, where _n_ is the flank length.
*
* @param flankLength The length to extend adjacent records by.
* @param optSd An optional sequence dictionary. If none is provided, we recompute the
* sequence dictionary on the fly. Default is None.
* @return Returns the RDD, with all adjacent fragments extended with flanking sequence.
*/
def flankAdjacentFragments(flankLength: Int,
optSd: Option[SequenceDictionary] = None): RDD[NucleotideContigFragment] = {
FlankReferenceFragments(rdd, optSd.getOrElse(adamGetSequenceDictionary), flankLength)
}

/**
* Counts the k-mers contained in a FASTA contig.
*
* @param kmerLength The length of k-mers to count.
* @param optSd An optional sequence dictionary. If none is provided, we recompute the
* sequence dictionary on the fly. Default is None.
* @return Returns an RDD containing k-mer/count pairs.
*/
def countKmers(kmerLength: Int,
optSd: Option[SequenceDictionary] = None): RDD[(String, Long)] = {
flankAdjacentFragments(kmerLength, optSd).flatMap(r => {
// cut each read into k-mers, and attach a count of 1L
r.getFragmentSequence
.sliding(kmerLength)
.map(k => (k, 1L))
}).reduceByKey((k1: Long, k2: Long) => k1 + k2)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.contig

import org.bdgenomics.adam.models.ReferenceRegion
import org.bdgenomics.formats.avro.{ Contig, NucleotideContigFragment }
import org.scalatest.FunSuite

class FlankReferenceFragmentsSuite extends FunSuite {

test("don't put flanks on non-adjacent fragments") {
val testIter = Iterator((ReferenceRegion("chr1", 0L, 10L),
NucleotideContigFragment.newBuilder()
.setContig(Contig.newBuilder()
.setContigName("chr1")
.build())
.setFragmentSequence("AAAAATTTTT")
.setFragmentStartPosition(0L)
.build()), (ReferenceRegion("chr1", 20L, 30L),
NucleotideContigFragment.newBuilder()
.setContig(Contig.newBuilder()
.setContigName("chr1")
.build())
.setFragmentSequence("CCCCCGGGGG")
.setFragmentStartPosition(20L)
.build()))

val fragments = FlankReferenceFragments.flank(testIter, 5).toSeq

assert(fragments.size === 2)
fragments.foreach(_.getFragmentSequence.length === 10)
assert(fragments(0).getFragmentSequence === "AAAAATTTTT")
assert(fragments(0).getFragmentStartPosition === 0L)
assert(fragments(1).getFragmentSequence === "CCCCCGGGGG")
assert(fragments(1).getFragmentStartPosition === 20L)
}

test("put flanks on adjacent fragments") {
val testIter = Iterator((ReferenceRegion("chr1", 0L, 10L),
NucleotideContigFragment.newBuilder()
.setContig(Contig.newBuilder()
.setContigName("chr1")
.build())
.setFragmentSequence("AAAAATTTTT")
.setFragmentStartPosition(0L)
.build()), (ReferenceRegion("chr1", 10L, 20L),
NucleotideContigFragment.newBuilder()
.setContig(Contig.newBuilder()
.setContigName("chr1")
.build())
.setFragmentSequence("NNNNNUUUUU")
.setFragmentStartPosition(10L)
.build()), (ReferenceRegion("chr1", 20L, 30L),
NucleotideContigFragment.newBuilder()
.setContig(Contig.newBuilder()
.setContigName("chr1")
.build())
.setFragmentSequence("CCCCCGGGGG")
.setFragmentStartPosition(20L)
.build()))

val fragments = FlankReferenceFragments.flank(testIter, 5).toSeq

assert(fragments.size === 3)
assert(fragments(0).getFragmentSequence === "AAAAATTTTTNNNNN")
assert(fragments(0).getFragmentStartPosition === 0L)
assert(fragments(1).getFragmentSequence === "TTTTTNNNNNUUUUUCCCCC")
assert(fragments(1).getFragmentStartPosition === 5L)
assert(fragments(2).getFragmentSequence === "UUUUUCCCCCGGGGG")
assert(fragments(2).getFragmentStartPosition === 15L)
}
}

0 comments on commit 293407f

Please sign in to comment.