diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala
index 12be814a150b5..d59ba49ed1ba3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DenseGmmEM.scala
@@ -21,6 +21,13 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixtureModelEM
import org.apache.spark.mllib.linalg.Vectors
+/**
+ * An example Gaussian Mixture Model EM app. Run with
+ * {{{
+ * ./bin/run-example org.apache.spark.examples.mllib.DenseGmmEM
+ * }}}
+ * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
+ */
object DenseGmmEM {
def main(args: Array[String]): Unit = {
if (args.length != 3) {
@@ -44,13 +51,15 @@ object DenseGmmEM {
.run(data)
for (i <- 0 until clusters.k) {
- println("weight=%f mu=%s sigma=\n%s\n" format
+ println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(clusters.weight(i), clusters.mu(i), clusters.sigma(i)))
}
+ println("Cluster labels:")
val (responsibilityMatrix, clusterLabels) = clusters.predict(data)
for (x <- clusterLabels.collect) {
print(" " + x)
}
+ println
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala
index 5ab654f609bd2..5c190120904b9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModelEM.scala
@@ -26,6 +26,8 @@ import org.apache.spark.mllib.stat.impl.MultivariateGaussian
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext}
import org.apache.spark.SparkContext.DoubleAccumulatorParam
+import scala.collection.mutable.IndexedSeqView
+
/**
* This class performs expectation maximization for multivariate Gaussian
* Mixture Models (GMMs). A GMM represents a composite distribution of
@@ -51,6 +53,7 @@ class GaussianMixtureModelEM private (
// Type aliases for convenience
private type DenseDoubleVector = BreezeVector[Double]
private type DenseDoubleMatrix = BreezeMatrix[Double]
+ private type VectorArrayView = IndexedSeqView[DenseDoubleVector, Array[DenseDoubleVector]]
private type ExpectationSum = (
Array[Double], // log-likelihood in index 0
@@ -80,10 +83,12 @@ class GaussianMixtureModelEM private (
// compute cluster contributions for each input point
// (U, T) => U for aggregation
- private def computeExpectation(weights: Array[Double], dists: Array[MultivariateGaussian])
+ private def computeExpectation(
+ weights: Array[Double],
+ dists: Array[MultivariateGaussian])
(model: ExpectationSum, x: DenseDoubleVector): ExpectationSum = {
val k = model._2.length
- val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(x)).toArray
+ val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(x) }
val pSum = p.sum
model._1(0) += math.log(pSum)
val xxt = x * new Transpose(x)
@@ -106,7 +111,10 @@ class GaussianMixtureModelEM private (
/** A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold */
def this() = this(2, 0.01, 100)
- /** Set the initial GMM starting point, bypassing the random initialization */
+ /** Set the initial GMM starting point, bypassing the random initialization.
+ * You must call setK() prior to calling this method, and the condition
+ * (gmm.k == this.k) must be met; failure will result in an IllegalArgumentException
+ */
def setInitialGmm(gmm: GaussianMixtureModel): this.type = {
if (gmm.k == k) {
initialGmm = Some(gmm)
@@ -156,7 +164,7 @@ class GaussianMixtureModelEM private (
/** Perform expectation maximization */
def run(data: RDD[Vector]): GaussianMixtureModel = {
- val ctx = data.sparkContext
+ val sc = data.sparkContext
// we will operate on the data as breeze data
val breezeData = data.map(u => u.toBreeze.toDenseVector).cache()
@@ -164,26 +172,22 @@ class GaussianMixtureModelEM private (
// Get length of the input vectors
val d = breezeData.first.length
- // gaussians will be array of (weight, mean, covariance) tuples.
+ // Determine initial weights and corresponding Gaussians.
// If the user supplied an initial GMM, we use those values, otherwise
// we start with uniform weights, a random mean from the data, and
// diagonal covariance matrices using component variances
- // derived from the samples
- var gaussians = initialGmm match {
- case Some(gmm) => (0 until k).map{ i =>
- (gmm.weight(i), gmm.mu(i).toBreeze.toDenseVector, gmm.sigma(i).toBreeze.toDenseMatrix)
- }.toArray
+ // derived from the samples
+ val (weights, gaussians) = initialGmm match {
+ case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map{ case(mu, sigma) =>
+ new MultivariateGaussian(mu.toBreeze.toDenseVector, sigma.toBreeze.toDenseMatrix)
+ }.toArray)
case None => {
- // For each Gaussian, we will initialize the mean as the average
- // of some random samples from the data
val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt)
-
- (0 until k).map{ i =>
- (1.0 / k,
- vectorMean(samples.slice(i * nSamples, (i + 1) * nSamples)),
- initCovariance(samples.slice(i * nSamples, (i + 1) * nSamples)))
- }.toArray
+ ((0 until k).map(_ => 1.0 / k).toArray, (0 until k).map{ i =>
+ val slice = samples.view(i * nSamples, (i + 1) * nSamples)
+ new MultivariateGaussian(vectorMean(slice), initCovariance(slice))
+ }.toArray)
}
}
@@ -192,47 +196,36 @@ class GaussianMixtureModelEM private (
var iter = 0
do {
- // pivot gaussians into weight and distribution arrays
- val weights = (0 until k).map(i => gaussians(i)._1).toArray
- val dists = (0 until k).map{ i =>
- new MultivariateGaussian(gaussians(i)._2, gaussians(i)._3)
- }.toArray
-
// create and broadcast curried cluster contribution function
- val compute = ctx.broadcast(computeExpectation(weights, dists)_)
+ val compute = sc.broadcast(computeExpectation(weights, gaussians)_)
// aggregate the cluster contribution for all sample points
- val sums = breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)
-
- // Assignments to make the code more readable
- val logLikelihood = sums._1(0)
- val W = sums._2
- val MU = sums._3
- val SIGMA = sums._4
+ val (logLikelihood, wSums, muSums, sigmaSums) =
+ breezeData.aggregate(zeroExpectationSum(k, d))(compute.value, addExpectationSums)
// Create new distributions based on the partial assignments
// (often referred to as the "M" step in literature)
- gaussians = (0 until k).map{ i =>
- val weight = W(i) / W.sum
- val mu = MU(i) / W(i)
- val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu)
- (weight, mu, sigma)
- }.toArray
-
+ val sumWeights = wSums.sum
+ for (i <- 0 until k) {
+ val mu = muSums(i) / wSums(i)
+ val sigma = sigmaSums(i) / wSums(i) - mu * new Transpose(mu)
+ weights(i) = wSums(i) / sumWeights
+ gaussians(i) = new MultivariateGaussian(mu, sigma)
+ }
+
llhp = llh // current becomes previous
- llh = logLikelihood // this is the freshly computed log-likelihood
+ llh = logLikelihood(0) // this is the freshly computed log-likelihood
iter += 1
} while(iter < maxIterations && Math.abs(llh-llhp) > convergenceTol)
// Need to convert the breeze matrices to MLlib matrices
- val weights = (0 until k).map(i => gaussians(i)._1).toArray
- val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i)._2)).toArray
- val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i)._3)).toArray
+ val means = (0 until k).map(i => Vectors.fromBreeze(gaussians(i).mu)).toArray
+ val sigmas = (0 until k).map(i => Matrices.fromBreeze(gaussians(i).sigma)).toArray
new GaussianMixtureModel(weights, means, sigmas)
}
/** Average of dense breeze vectors */
- private def vectorMean(x: Array[DenseDoubleVector]): DenseDoubleVector = {
+ private def vectorMean(x: VectorArrayView): DenseDoubleVector = {
val v = BreezeVector.zeros[Double](x(0).length)
x.foreach(xi => v += xi)
v / x.length.asInstanceOf[Double]
@@ -242,7 +235,7 @@ class GaussianMixtureModelEM private (
* Construct matrix where diagonal entries are element-wise
* variance of input vectors (computes biased variance)
*/
- private def initCovariance(x: Array[DenseDoubleVector]): DenseDoubleMatrix = {
+ private def initCovariance(x: VectorArrayView): DenseDoubleMatrix = {
val mu = vectorMean(x)
val ss = BreezeVector.zeros[Double](x(0).length)
val cov = BreezeMatrix.eye[Double](ss.length)
@@ -255,15 +248,18 @@ class GaussianMixtureModelEM private (
* Given the input vectors, return the membership value of each vector
* to all mixture components.
*/
- def predictClusters(points: RDD[Vector], mu: Array[Vector], sigma: Array[Matrix],
+ def predictClusters(
+ points: RDD[Vector],
+ mu: Array[Vector],
+ sigma: Array[Matrix],
weight: Array[Double], k: Int): RDD[Array[Double]] = {
- val ctx = points.sparkContext
- val dists = ctx.broadcast{
+ val sc = points.sparkContext
+ val dists = sc.broadcast{
(0 until k).map{ i =>
new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
}.toArray
}
- val weights = ctx.broadcast((0 until k).map(i => weight(i)).toArray)
+ val weights = sc.broadcast((0 until k).map(i => weight(i)).toArray)
points.map{ x =>
computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
}
@@ -272,11 +268,14 @@ class GaussianMixtureModelEM private (
/**
* Compute the partial assignments for each vector
*/
- def computeSoftAssignments(pt: DenseDoubleVector, dists: Array[MultivariateGaussian],
- weights: Array[Double], k: Int): Array[Double] = {
- val p = (0 until k).map(i => eps + weights(i) * dists(i).pdf(pt)).toArray
+ private def computeSoftAssignments(
+ pt: DenseDoubleVector,
+ dists: Array[MultivariateGaussian],
+ weights: Array[Double],
+ k: Int): Array[Double] = {
+ val p = weights.zip(dists).map { case (weight, dist) => eps + weight * dist.pdf(pt) }
val pSum = p.sum
- for(i<- 0 until k){
+ for (i <- 0 until k){
p(i) /= pSum
}
p
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala
index ab046a3acea05..d14adab09177e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/MultivariateGaussian.scala
@@ -18,7 +18,7 @@
package org.apache.spark.mllib.stat.impl
import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix}
-import breeze.linalg.{Transpose, det, inv}
+import breeze.linalg.{Transpose, det, pinv}
/**
* Utility class to implement the density function for multivariate Gaussian distribution.
@@ -28,7 +28,7 @@ import breeze.linalg.{Transpose, det, inv}
private[mllib] class MultivariateGaussian(
val mu: BreezeVector[Double],
val sigma: BreezeMatrix[Double]) extends Serializable {
- private val sigmaInv2 = inv(sigma) * -0.5
+ private val sigmaInv2 = pinv(sigma) * -0.5
private val U = math.pow(2.0 * math.Pi, -mu.length / 2.0) * math.pow(det(sigma), -0.5)
def pdf(x: BreezeVector[Double]): Double = {