-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SPARK-4156 [MLLIB] EM algorithm for GMMs #3022
Changes from 8 commits
c15405c
5c96c57
c1a8e16
719d8cc
86fb382
e6ea805
676e523
8aaa17d
9770261
e7d413b
dc9c742
97044cf
f407b4c
b99ecc4
2df336b
c3b8ce0
d695034
9be2534
8b633f3
42b2142
20ebca1
cff73e0
308c8ad
227ad66
578c2d1
1de73f3
b97fe00
9b2fc2a
acf1fba
709e4bf
aaa8f25
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.examples.mllib | ||
|
||
import org.apache.spark.{SparkConf, SparkContext} | ||
import org.apache.spark.mllib.clustering.GaussianMixtureModel | ||
import org.apache.spark.mllib.clustering.GMMExpectationMaximization | ||
import org.apache.spark.mllib.linalg.Vectors | ||
|
||
object DenseGmmEM { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add documentation similar to other examples (e.g., DenseKMeans.scala) |
||
def main(args: Array[String]): Unit = { | ||
if( args.length != 3 ) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. scala style: should use this spacing: "if (args.length != 3)" |
||
println("usage: DenseGmmEM <input file> <k> <delta>") | ||
} else { | ||
run(args(0), args(1).toInt, args(2).toDouble) | ||
} | ||
} | ||
|
||
def run(inputFile: String, k: Int, tol: Double) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "tol" --> "delta" (or whatever delta is changed to; see other comment about delta) |
||
val conf = new SparkConf().setAppName("Spark EM Sample") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gaussian Mixture Model EM example. |
||
val ctx = new SparkContext(conf) | ||
|
||
val data = ctx.textFile(inputFile).map(line => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. scala style: For multi-line map() calls, use braces:
This occurs in several other places in this PR. Could you please fix those too? |
||
Vectors.dense(line.trim.split(' ').map(_.toDouble))).cache() | ||
|
||
val clusters = GMMExpectationMaximization.train(data, k, tol) | ||
|
||
for(i <- 0 until clusters.k) { | ||
println("w=%f mu=%s sigma=\n%s\n" format (clusters.w(i), clusters.mu(i), clusters.sigma(i))) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,283 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.mllib.clustering | ||
|
||
import breeze.linalg.{DenseVector => BreezeVector, DenseMatrix => BreezeMatrix} | ||
import breeze.linalg.{Transpose, det, inv} | ||
import org.apache.spark.rdd.RDD | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. space between groups of imports |
||
import org.apache.spark.mllib.linalg.{Matrices, Vector, Vectors} | ||
import org.apache.spark.{Accumulator, AccumulatorParam, SparkContext} | ||
import org.apache.spark.SparkContext.DoubleAccumulatorParam | ||
|
||
/** | ||
* Expectation-Maximization for multivariate Gaussian Mixture Models. | ||
* | ||
*/ | ||
object GMMExpectationMaximization { | ||
/** | ||
* Trains a GMM using the given parameters | ||
* | ||
* @param data training points stored as RDD[Vector] | ||
* @param k the number of Gaussians in the mixture | ||
* @param maxIterations the maximum number of iterations to perform | ||
* @param delta change in log-likelihood at which convergence is considered achieved | ||
*/ | ||
def train(data: RDD[Vector], k: Int, maxIterations: Int, delta: Double): GaussianMixtureModel = { | ||
new GMMExpectationMaximization().setK(k) | ||
.setMaxIterations(maxIterations) | ||
.setDelta(delta) | ||
.run(data) | ||
} | ||
|
||
/** | ||
* Trains a GMM using the given parameters | ||
* | ||
* @param data training points stored as RDD[Vector] | ||
* @param k the number of Gaussians in the mixture | ||
* @param maxIterations the maximum number of iterations to perform | ||
*/ | ||
def train(data: RDD[Vector], k: Int, maxIterations: Int): GaussianMixtureModel = { | ||
new GMMExpectationMaximization().setK(k).setMaxIterations(maxIterations).run(data) | ||
} | ||
|
||
/** | ||
* Trains a GMM using the given parameters | ||
* | ||
* @param data training points stored as RDD[Vector] | ||
* @param k the number of Gaussians in the mixture | ||
* @param delta change in log-likelihood at which convergence is considered achieved | ||
*/ | ||
def train(data: RDD[Vector], k: Int, delta: Double): GaussianMixtureModel = { | ||
new GMMExpectationMaximization().setK(k).setDelta(delta).run(data) | ||
} | ||
|
||
/** | ||
* Trains a GMM using the given parameters | ||
* | ||
* @param data training points stored as RDD[Vector] | ||
* @param k the number of Gaussians in the mixture | ||
*/ | ||
def train(data: RDD[Vector], k: Int): GaussianMixtureModel = { | ||
new GMMExpectationMaximization().setK(k).run(data) | ||
} | ||
} | ||
|
||
/** | ||
* This class performs multivariate Gaussian expectation maximization. It will | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be great if you could add a sentence or two explaining what GMMs are. |
||
* maximize the log-likelihood for a mixture of k Gaussians, iterating until | ||
* the log-likelihood changes by less than delta, or until it has reached | ||
* the max number of iterations. | ||
*/ | ||
class GMMExpectationMaximization private ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be good to rename this class to "GaussianMixtureModelEM" so that its name is closer to the name of the model it produces. |
||
private var k: Int, | ||
private var delta: Double, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would recommend using "convergenceTol" since that is already used elsewhere (e.g., in LBFGS). |
||
private var maxIterations: Int) extends Serializable { | ||
|
||
// Type aliases for convenience | ||
private type DenseDoubleVector = BreezeVector[Double] | ||
private type DenseDoubleMatrix = BreezeMatrix[Double] | ||
|
||
// number of samples per cluster to use when initializing Gaussians | ||
private val nSamples = 5; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no semicolon |
||
|
||
// A default instance, 2 Gaussians, 100 iterations, 0.01 log-likelihood threshold | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use "/** ... */" for comment so it is part of the generated documentation. |
||
def this() = this(2, 0.01, 100) | ||
|
||
/** Set the number of Gaussians in the mixture model. Default: 2 */ | ||
def setK(k: Int): this.type = { | ||
this.k = k | ||
this | ||
} | ||
|
||
/** Set the maximum number of iterations to run. Default: 100 */ | ||
def setMaxIterations(maxIterations: Int): this.type = { | ||
this.maxIterations = maxIterations | ||
this | ||
} | ||
|
||
/** | ||
* Set the largest change in log-likelihood at which convergence is | ||
* considered to have occurred. | ||
*/ | ||
def setDelta(delta: Double): this.type = { | ||
this.delta = delta | ||
this | ||
} | ||
|
||
/** Machine precision value used to ensure matrix conditioning */ | ||
private val eps = math.pow(2.0, -52) | ||
|
||
/** Perform expectation maximization */ | ||
def run(data: RDD[Vector]): GaussianMixtureModel = { | ||
val ctx = data.sparkContext | ||
|
||
// we will operate on the data as breeze data | ||
val breezeData = data.map{ u => u.toBreeze.toDenseVector }.cache() | ||
|
||
// Get length of the input vectors | ||
val d = breezeData.first.length | ||
|
||
// For each Gaussian, we will initialize the mean as the average | ||
// of some random samples from the data | ||
val samples = breezeData.takeSample(true, k * nSamples, scala.util.Random.nextInt) | ||
|
||
// C will be array of (weight, mean, covariance) tuples | ||
// we start with uniform weights, a random mean from the data, and | ||
// diagonal covariance matrices using component variances | ||
// derived from the samples | ||
var C = (0 until k).map(i => (1.0/k, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. spaces around "/" operator Also, it might be good to use a more descriptive variable name like "centers" |
||
vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)), | ||
init_cov(samples.slice(i * nSamples, (i + 1) * nSamples))) | ||
).toArray | ||
|
||
val acc_w = new Array[Accumulator[Double]](k) | ||
val acc_mu = new Array[Accumulator[DenseDoubleVector]](k) | ||
val acc_sigma = new Array[Accumulator[DenseDoubleMatrix]](k) | ||
|
||
var llh = Double.MinValue // current log-likelihood | ||
var llhp = 0.0 // previous log-likelihood | ||
|
||
var i, iter = 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i is not used (The for loops have implicit declarations of other "i" vars.) |
||
do { | ||
// reset accumulators | ||
for(i <- 0 until k){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. scala style: |
||
acc_w(i) = ctx.accumulator(0.0) | ||
acc_mu(i) = ctx.accumulator( | ||
BreezeVector.zeros[Double](d))(DenseDoubleVectorAccumulatorParam) | ||
acc_sigma(i) = ctx.accumulator( | ||
BreezeMatrix.zeros[Double](d,d))(DenseDoubleMatrixAccumulatorParam) | ||
} | ||
|
||
val log_likelihood = ctx.accumulator(0.0) | ||
|
||
// broadcast the current weights and distributions to all nodes | ||
val dists = ctx.broadcast((0 until k).map(i => | ||
new MultivariateGaussian(C(i)._2, C(i)._3)).toArray) | ||
val weights = ctx.broadcast((0 until k).map(i => C(i)._1).toArray) | ||
|
||
// calculate partial assignments for each sample in the data | ||
// (often referred to as the "E" step in literature) | ||
breezeData.foreach(x => { | ||
val p = (0 until k).map(i => | ||
eps + weights.value(i) * dists.value(i).pdf(x)).toArray | ||
val norm = sum(p) | ||
|
||
log_likelihood += math.log(norm) | ||
|
||
// accumulate weighted sums | ||
val xxt = x * new Transpose(x) | ||
for(i <- 0 until k){ | ||
p(i) /= norm | ||
acc_w(i) += p(i) | ||
acc_mu(i) += x * p(i) | ||
acc_sigma(i) += xxt * p(i) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extra whitespace after } |
||
}) | ||
|
||
// Collect the computed sums | ||
val W = (0 until k).map(i => acc_w(i).value).toArray | ||
val MU = (0 until k).map(i => acc_mu(i).value).toArray | ||
val SIGMA = (0 until k).map(i => acc_sigma(i).value).toArray | ||
|
||
// Create new distributions based on the partial assignments | ||
// (often referred to as the "M" step in literature) | ||
C = (0 until k).map(i => { | ||
val weight = W(i) / sum(W) | ||
val mu = MU(i) / W(i) | ||
val sigma = SIGMA(i) / W(i) - mu * new Transpose(mu) | ||
(weight, mu, sigma) | ||
}).toArray | ||
|
||
llhp = llh; // current becomes previous | ||
llh = log_likelihood.value // this is the freshly computed log-likelihood | ||
iter += 1 | ||
} while(iter < maxIterations && Math.abs(llh-llhp) > delta) | ||
|
||
// Need to convert the breeze matrices to MLlib matrices | ||
val weights = (0 until k).map(i => C(i)._1).toArray | ||
val means = (0 until k).map(i => Vectors.fromBreeze(C(i)._2)).toArray | ||
val sigmas = (0 until k).map(i => Matrices.fromBreeze(C(i)._3)).toArray | ||
new GaussianMixtureModel(weights, means, sigmas) | ||
} | ||
|
||
/** Sum the values in array of doubles */ | ||
private def sum(x : Array[Double]) : Double = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should be able to write |
||
var s : Double = 0.0 | ||
(0 until x.length).foreach(j => s += x(j)) | ||
s | ||
} | ||
|
||
/** Average of dense breeze vectors */ | ||
private def vec_mean(x : Array[DenseDoubleVector]) : DenseDoubleVector = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can probably write There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not work; the compiler can not find a suitable implicit conversion for the array of vectors when attempting x.sum There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, OK, good to know! |
||
val v = BreezeVector.zeros[Double](x(0).length) | ||
(0 until x.length).foreach(j => v += x(j)) | ||
v / x.length.asInstanceOf[Double] | ||
} | ||
|
||
/** | ||
* Construct matrix where diagonal entries are element-wise | ||
* variance of input vectors (computes biased variance) | ||
*/ | ||
private def init_cov(x : Array[DenseDoubleVector]) : DenseDoubleMatrix = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here and elsewhere, use camelCase naming convention |
||
val mu = vec_mean(x) | ||
val ss = BreezeVector.zeros[Double](x(0).length) | ||
val result = BreezeMatrix.eye[Double](ss.length) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "result" --> "cov" (or something more descriptive) |
||
(0 until x.length).map(i => (x(i) - mu) :^ 2.0).foreach(u => ss += u) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you simplify this to be:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here again, sum method on array of vectors fails to compile. |
||
(0 until ss.length).foreach(i => result(i,i) = ss(i) / x.length) | ||
result | ||
} | ||
|
||
/** AccumulatorParam for Dense Breeze Vectors */ | ||
private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { | ||
def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = { | ||
BreezeVector.zeros[Double](initialVector.length) | ||
} | ||
|
||
def addInPlace(a : DenseDoubleVector, b : DenseDoubleVector) : DenseDoubleVector = { | ||
a += b | ||
} | ||
} | ||
|
||
/** AccumulatorParam for Dense Breeze Matrices */ | ||
private object DenseDoubleMatrixAccumulatorParam extends AccumulatorParam[DenseDoubleMatrix] { | ||
def zero(initialVector : DenseDoubleMatrix) : DenseDoubleMatrix = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no space before ":" in method declaration (here and elsewhere) Also, "initialVector" --> "initialMatrix" |
||
BreezeMatrix.zeros[Double](initialVector.rows, initialVector.cols) | ||
} | ||
|
||
def addInPlace(a : DenseDoubleMatrix, b : DenseDoubleMatrix) : DenseDoubleMatrix = { | ||
a += b | ||
} | ||
} | ||
|
||
/** | ||
* Utility class to implement the density function for multivariate Gaussian distribution. | ||
* Breeze provides this functionality, but it requires the Apache Commons Math library, | ||
* so this class is here so-as to not introduce a new dependency in Spark. | ||
*/ | ||
private class MultivariateGaussian(val mu : DenseDoubleVector, val sigma : DenseDoubleMatrix) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please move this class to a new folder mllib/stat/impl/ ? There are a few PRs introducing standard distributions. I think we should keep them private for now but collect them in stat/impl/. Later on, we can standardize the APIs and make them public classes. |
||
extends Serializable { | ||
private val sigma_inv_2 = inv(sigma) * -0.5 | ||
private val U = math.pow(2.0*math.Pi, -mu.length/2.0) * math.pow(det(sigma), -0.5) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. put spaces around operators "*" and "/" |
||
|
||
def pdf(x : DenseDoubleVector) : Double = { | ||
val delta = x - mu | ||
val delta_t = new Transpose(delta) | ||
U * math.exp(delta_t * sigma_inv_2 * delta) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.mllib.clustering | ||
|
||
import org.apache.spark.mllib.linalg.Matrix | ||
import org.apache.spark.mllib.linalg.Vector | ||
|
||
/** | ||
* Multivariate Gaussian mixture model consisting of k Gaussians, where points are drawn | ||
* from each Gaussian i=1..k with probability w(i); mu(i) and sigma(i) are the respective | ||
* mean and covariance for each Gaussian distribution i=1..k. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would describe parameters using the param syntax. E.g.:
|
||
*/ | ||
class GaussianMixtureModel( | ||
val w: Array[Double], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be good to use a more explicit parameter name than "w." Maybe "weight" or "weights?" |
||
val mu: Array[Vector], | ||
val sigma: Array[Matrix]) extends Serializable { | ||
|
||
/** Number of gaussians in mixture */ | ||
def k: Int = w.length; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for this import