diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala index a8db4bd862523..a733bdb4e4369 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GMMExpectationMaximization.scala @@ -32,7 +32,7 @@ object GMMExpectationMaximization { /** * Trains a GMM using the given parameters * - * @param data training points stores as RDD[Vector] + * @param data training points stored as RDD[Vector] * @param k the number of Gaussians in the mixture * @param maxIterations the maximum number of iterations to perform * @param delta change in log-likelihood at which convergence is considered achieved @@ -47,7 +47,7 @@ object GMMExpectationMaximization { /** * Trains a GMM using the given parameters * - * @param data training points stores as RDD[Vector] + * @param data training points stored as RDD[Vector] * @param k the number of Gaussians in the mixture * @param maxIterations the maximum number of iterations to perform */ @@ -58,7 +58,18 @@ object GMMExpectationMaximization { /** * Trains a GMM using the given parameters * - * @param data training points stores as RDD[Vector] + * @param data training points stored as RDD[Vector] + * @param k the number of Gaussians in the mixture + * @param delta change in log-likelihood at which convergence is considered achieved + */ + def train(data: RDD[Vector], k: Int, delta: Double): GaussianMixtureModel = { + new GMMExpectationMaximization().setK(k).setDelta(delta).run(data) + } + + /** + * Trains a GMM using the given parameters + * + * @param data training points stored as RDD[Vector] * @param k the number of Gaussians in the mixture */ def train(data: RDD[Vector], k: Int): GaussianMixtureModel = { @@ -127,10 +138,12 @@ class GMMExpectationMaximization private ( // C will be array of (weight, mean, covariance) tuples // we start with uniform weights, a random mean from the data, and - // identity matrices for covariance + // diagonal covariance matrices using component variances + // derived from the samples var C = (0 until k).map(i => (1.0/k, vec_mean(samples.slice(i * nSamples, (i + 1) * nSamples)), - BreezeMatrix.eye[Double](d))).toArray + init_cov(samples.slice(i * nSamples, (i + 1) * nSamples))) + ).toArray val acc_w = new Array[Accumulator[Double]](k) val acc_mu = new Array[Accumulator[DenseDoubleVector]](k) @@ -216,6 +229,19 @@ class GMMExpectationMaximization private ( v / x.length.asInstanceOf[Double] } + /** + * Construct matrix where diagonal entries are element-wise + * variance of input vectors (computes biased variance) + */ + private def init_cov(x : Array[DenseDoubleVector]) : DenseDoubleMatrix = { + val mu = vec_mean(x) + val ss = BreezeVector.zeros[Double](x(0).length) + val result = BreezeMatrix.eye[Double](ss.length) + (0 until x.length).map(i => (x(i) - mu) :^ 2.0).foreach(u => ss += u) + (0 until ss.length).foreach(i => result(i,i) = ss(i) / x.length) + result + } + /** AccumulatorParam for Dense Breeze Vectors */ private object DenseDoubleVectorAccumulatorParam extends AccumulatorParam[DenseDoubleVector] { def zero(initialVector : DenseDoubleVector) : DenseDoubleVector = {