[avocado-122] Tested and reintegrated EM algorithm.
fnothaft committed Jan 8, 2015
1 parent b5718a6 commit 343b70d
Showing 9 changed files with 300 additions and 100 deletions.
package org.bdgenomics.avocado.calls.pileup
package org.bdgenomics.avocado.algorithms.em

import scala.math.pow
import breeze.stats.distributions.Binomial
import scala.math.{ abs, pow }

object EMForAlleles {

* Note: GL is currently an array of (numSnps) arrays of length (numInds),
def emForMAF(Phi: Array[Double], GL: Array[Array[(Double, Double, Double)]]): Array[Double] = {
var eps = 1.0
val tol = 0.0001
val L = Phi.length
var phi_updates = Phi
while (eps > tol) {
var Phi_next = Array.fill(L) { 0.0 }
Phi_next.indices.foreach(i => {
GL(i).foreach(l => {
Phi_next(i) += (1.0 / (2.0 * GL(i).length)) * ((1.0 * l._2 * 2.0 * phi_updates(i) * (1 - phi_updates(i)) +
2.0 * l._3 * pow(phi_updates(i), 2.0)) / (l._1 * pow(1.0 - phi_updates(i), 2.0) +
/**
 * Iteratively re-estimates an allele frequency (psi) from per-sample genotype
 * likelihoods. Each iteration builds binomial priors from the current psi, weights
 * every sample's genotype states by likelihood * prior, and averages the expected
 * state index over all chromosomes.
 *
 * NOTE(review): several lines of this definition appear truncated in this view
 * (marked below), and the final return expression is not visible; presumably the
 * converged psi is returned -- confirm against the full file.
 *
 * @param genotypeLikelihoods One array per sample; an array of length (ploidy + 1)
 *                            holds one likelihood per genotype state.
 * @param referenceFrequencyEstimate Initial value used to seed psi.
 * @param maxIterations Optional cap on the number of iterations.
 * @param targetTolerance Optional threshold; iteration continues only while the
 *                        change in psi between iterations exceeds it.
 * @throws AssertionError if neither maxIterations nor targetTolerance is defined.
 */
def emForMAF(genotypeLikelihoods: Array[Array[Double]],
referenceFrequencyEstimate: Double,
maxIterations: Option[Int] = None,
targetTolerance: Option[Double] = None): Double = {
// without at least one stopping condition the do/while loop below would never exit
assert(maxIterations.isDefined || targetTolerance.isDefined,
"Must define at least one of the iteration or tolerance limits.")

// M is the total number of chromosomes in all samples
// NOTE(review): the right-hand side below appears truncated; presumably it maps each
// sample's likelihood array to (length - 1), i.e. the sample ploidy -- confirm
val samplePloidy = - 1)
val M = samplePloidy.sum.toDouble
// distinct ploidies, so that one prior distribution can be built per ploidy
val ploidies = samplePloidy.distinct.toArray

// loop until convergence
var psi = referenceFrequencyEstimate
var lastPsi = psi
var iter = 0
do {
// carry over psi from previous iteration
lastPsi = psi

// calculate the new prior distributions per ploidy
// NOTE(review): the expression being mapped is truncated here; presumably it iterates
// over `ploidies` with `m` bound to each ploidy -- confirm
val ploidyDistributionMap = => {
// build distribution
val dist = Binomial(m, psi)

// evaluate the binomial probability of each of the m + 1 states, 0 through m
val stateArray = new Array[Double](m + 1)
(0 to m).foreach(i => {
stateArray(i) = dist.probabilityOf(i)
// per sample, calculate the contribution of each genotype state
// then, sum these contributions together and normalize
// NOTE(review): the collection being mapped is truncated here; presumably each `gls`
// is one sample's likelihood array from genotypeLikelihoods -- confirm
psi = => {
// the length of the genotype likelihood array is equal to the sample ploidy plus 1
val ploidyP1 = gls.length

// priors are looked up by (ploidy + 1); how the map is keyed is not visible here
val prior = ploidyDistributionMap(ploidyP1)

// loop to sum
var num = 0.0
var denom = 0.0
(0 until ploidyP1).foreach(i => {
val contribution = gls(i) * prior(i)
// weight by the state index i, so num / denom is the expected state index
num += i * contribution
denom += contribution

num / denom
}).sum / M

// increment iteration count
iter += 1
// keep iterating while the tolerance (if defined) is still below the last change in psi
// and the iteration limit (if defined) has not been reached; an undefined limit never stops the loop
} while (targetTolerance.fold(true)(_ < abs(psi - lastPsi)) &&
maxIterations.fold(true)(_ > iter))

package org.bdgenomics.avocado.genotyping

import breeze.stats.distributions.Binomial
import org.apache.commons.configuration.{ HierarchicalConfiguration, SubnodeConfiguration }
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.bdgenomics.adam.models.{ ReferencePosition, VariantContext }
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils
import org.bdgenomics.avocado.algorithms.em.EMForAlleles
import org.bdgenomics.avocado.models.{ AlleleObservation, Observation }
import org.bdgenomics.avocado.stats.AvocadoConfigAndStats
import org.bdgenomics.formats.avro.{ Contig, Genotype, GenotypeAllele, Variant }
import scala.annotation.tailrec
import scala.math.pow
import scala.math.{ max, min, pow }

object BiallelicGenotyper extends GenotyperCompanion {

Expand All @@ -36,15 +38,58 @@ object BiallelicGenotyper extends GenotyperCompanion {
/**
 * Builds a BiallelicGenotyper from configuration, validating the EM-related settings
 * ("maxEMIterations", "emTolerance", "referenceFrequency", "emSaturationThreshold")
 * before construction.
 *
 * NOTE(review): this definition appears truncated in this view (closing braces and
 * some branches are missing, and both pre- and post-change constructor arguments
 * appear), so only the visible checks are documented.
 *
 * @param stats Not referenced in the lines visible here.
 * @param config Configuration node the settings are read from.
 * @throws IllegalArgumentException if a setting fails validation, or if EM is enabled
 *                                  with neither an iteration limit nor a tolerance.
 */
protected def apply(stats: AvocadoConfigAndStats,
config: SubnodeConfiguration): Genotyper = {

// get finishing conditions for EM algorithms
val useEM = config.getBoolean("useEM", true)
// the iteration limit is only read (and validated) when the key is present
val maxIterations = if (config.containsKey("maxEMIterations")) {
val iterLimit = config.getInt("maxEMIterations")
if (iterLimit <= 0) {
throw new IllegalArgumentException("EM iteration limit must be greater than 0.")
} else {
// the tolerance is only read (and validated) when the key is present
val tolerance = if (config.containsKey("emTolerance")) {
val emTol = config.getDouble("emTolerance")
// NOTE(review): the strict comparisons accept exactly 0.0 and 1.0, although the
// message says "non-inclusive" -- confirm which is intended
if (emTol < 0.0 || emTol > 1.0) {
throw new IllegalArgumentException("EM tolerance must be between 0 and 1, non-inclusive.")
} else {

// EM needs at least one stopping condition when it is enabled
if (maxIterations.isEmpty && tolerance.isEmpty && useEM) {
throw new IllegalArgumentException("At least one constraint must be defined for the EM algorithm.")

// what level do we saturate the reference frequency to if we encounter underflow?
val referenceFrequency = config.getDouble("referenceFrequency", 0.999)
// NOTE(review): as above, 0.0 and 1.0 pass this check despite "non-inclusive"
if (referenceFrequency < 0.0 || referenceFrequency > 1.0) {
throw new IllegalArgumentException("Reference frequency must be between 0 and 1, non-inclusive.")
val saturationThreshold = config.getDouble("emSaturationThreshold", 0.001)
// NOTE(review): as above, 0.0 and 1.0 pass this check despite "non-inclusive"
if (saturationThreshold < 0.0 || saturationThreshold > 1.0) {
throw new IllegalArgumentException("Saturation threshold must be between 0 and 1, non-inclusive.")

new BiallelicGenotyper(config.getInt("ploidy", 2),
// NOTE(review): "useEM" defaults to false here but to true where it is read above;
// this may be a pre-change line -- confirm against the full file
config.getBoolean("useEM", false),
config.getBoolean("emitGVCF", true))
config.getBoolean("emitGVCF", true),

/**
 * Genotyper for biallelic sites, with optional EM-based compensation of per-sample
 * genotype likelihoods.
 *
 * NOTE(review): this class is only partially visible here; two versions of the
 * constructor signature appear and most of the body is missing. Only what the
 * visible lines show is documented.
 *
 * @param ploidy Ploidy used to build the binomial state priors (0 to ploidy).
 * @param useEM Whether the allele frequency is estimated via EMForAlleles.emForMAF.
 * @param emitGVCF Not referenced in the lines visible here.
 * @param estimatedReferenceFrequency Not referenced in the lines visible here;
 *                                    presumably the EM seed / fallback frequency -- confirm.
 * @param maxIterations Optional EM iteration limit.
 * @param tolerance Optional EM convergence tolerance, passed to emForMAF.
 * @param saturationThreshold The EM estimate is clamped to the range
 *                            [saturationThreshold, 1 - saturationThreshold].
 */
class BiallelicGenotyper(ploidy: Int = 2,
useEM: Boolean = false,
emitGVCF: Boolean = true) extends Genotyper with Logging {
emitGVCF: Boolean = true,
estimatedReferenceFrequency: Double = 0.999,
maxIterations: Option[Int] = Some(10),
tolerance: Option[Double] = Some(1e-3),
saturationThreshold: Double = 0.001) extends Genotyper with Logging {

val companion: GenotyperCompanion = BiallelicGenotyper

// compensate likelihoods on the basis of population statistics
val compensatedLikelihoodsPerSample = if (useEM) {
// TODO: connect up EM algorithm
val majorAlleleFrequency = if (useEM) {
// clamp the EM estimate so it never falls below saturationThreshold or rises above
// 1 - saturationThreshold
min(1.0 - saturationThreshold,
max(EMForAlleles.emForMAF(likelihoodsPerSample.flatMap(s => {
// did we have any observations from this sample?
if (s._1.size > 0) {
} else {
tolerance), saturationThreshold))
} else {
// binomial prior over the (ploidy + 1) genotype states at the estimated frequency
val distribution = Binomial(ploidy, majorAlleleFrequency)
val statePriors = (0 to ploidy).map(g => distribution.probabilityOf(g))
// NOTE(review): the collection being mapped is truncated in this view
val compensatedLikelihoodsPerSample = => {
// extract info
val (observations, likelihoods, likelihoodOtherAlt) = s

// scale each state's likelihood by its prior; this mutates `likelihoods` in place
(0 to ploidy).foreach(i => {
likelihoods(i) *= statePriors(i)
(observations, likelihoods, likelihoodOtherAlt)

// construct variant
val variant = Variant.newBuilder()
@@ -0,0 +1,77 @@
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.bdgenomics.avocado.algorithms.em

import org.scalatest.FunSuite
import scala.math.abs

class EMForAllelesSuite extends FunSuite {

/**
 * Approximate floating point equality.
 *
 * @param a First value.
 * @param b Second value.
 * @param eps Maximum allowed absolute difference (exclusive); defaults to 1e-3.
 * @return True if a and b differ by strictly less than eps.
 */
def fpEquals(a: Double, b: Double, eps: Double = 1e-3): Boolean = {
abs(a - b) < eps

// emForMAF asserts that at least one of maxIterations / targetTolerance is defined,
// so calling it with neither is expected to raise an AssertionError.
// NOTE(review): the emForMAF call inside the intercept block appears truncated in this view.
test("cannot run EM without specifying an iteration limit or target tolerance") {
intercept[AssertionError] {
1.0 - 1e-3)

// One diploid sample with all likelihood on state index 2: the expected state index is
// 2 out of M = 2 chromosomes, so psi should be 1.0 (within fpEquals' default 1e-3).
// Only maxIterations is set, so the loop runs for the full 10 iterations.
test("run EM on single sample, definite ref") {
val psi = EMForAlleles.emForMAF(Array(Array(0.0, 0.0, 1.0)),
1.0 - 1e-3,
maxIterations = Some(10))

assert(fpEquals(psi, 1.0))

// Three diploid samples whose most likely states are index 2, 1 and 0 respectively:
// roughly 2 + 1 + 0 = 3 copies out of M = 6 chromosomes, so psi is expected near 0.5.
test("run EM on three samples, mix of hom ref, het, hom alt") {
val psi = EMForAlleles.emForMAF(Array(Array(0.0000001, 0.0001, 0.1),
Array(0.0001, 0.1, 0.0001),
Array(0.1, 0.0001, 0.0000001)),
1.0 - 1e-3,
maxIterations = Some(10))

assert(fpEquals(psi, 0.5))

// Five diploid samples: four favour state index 2 and one favours state index 0,
// i.e. roughly 4 * 2 + 0 = 8 copies out of M = 10 chromosomes, so psi is expected near 0.8.
test("run EM on five samples, one hom alt, all others hom ref") {
val psi = EMForAlleles.emForMAF(Array(Array(0.0000001, 0.0001, 0.1),
Array(0.0000001, 0.0001, 0.1),
Array(0.0000001, 0.0001, 0.1),
Array(0.0000001, 0.0001, 0.1),
Array(0.1, 0.0001, 0.0000001)),
1.0 - 1e-3,
maxIterations = Some(10))

assert(fpEquals(psi, 0.8))

// Mixed ploidy: three diploid samples (arrays of length 3), one haploid (length 2) and
// one triploid (length 4), so M = 2 + 2 + 2 + 1 + 3 = 10 chromosomes.
// The most likely state indices are 2, 2, 2, 0 and 1, so G = 7 and psi is expected near 0.7.
test("run EM on five samples, with varying ploidy, M = 10, G = 7") {
val psi = EMForAlleles.emForMAF(Array(Array(0.0000001, 0.0001, 0.1),
Array(0.0000001, 0.0001, 0.1),
Array(0.0000001, 0.0001, 0.1),
Array(0.1, 0.0001),
Array(0.0001, 0.1, 0.0001, 0.0000001)),
1.0 - 1e-3,
maxIterations = Some(10))

assert(fpEquals(psi, 0.7))
biallelicGenotyper = {
tolerance = 0.001;
defPart =
