diff --git a/examples/src/main/python/mllib/gaussian_mixture_model.py b/examples/src/main/python/mllib/gaussian_mixture_model.py index 327afa4a9e9d7..ee52e14bfaf17 100644 --- a/examples/src/main/python/mllib/gaussian_mixture_model.py +++ b/examples/src/main/python/mllib/gaussian_mixture_model.py @@ -17,16 +17,14 @@ """ A Gaussian Mixture Model clustering program using MLlib. - """ - import sys import random import argparse import numpy as np from pyspark import SparkConf, SparkContext -from pyspark.mllib.clustering import GaussianMixtureEM +from pyspark.mllib.clustering import GaussianMixture def parseVector(line): @@ -37,29 +35,29 @@ def parseVector(line): """ Parameters ---------- - input_file : Input file path which contains data points + inputFile : Input file path which contains data points k : Number of mixture components - convergenceTol : convergence_threshold. Default to 1e-3 - seed : random seed + convergenceTol : Convergence threshold. Default to 1e-3 maxIterations : Number of EM iterations to perform. 
Default to 100 + seed : Random seed """ parser = argparse.ArgumentParser() - parser.add_argument('input_file', help='input file') - parser.add_argument('k', type=int, help='num_of_clusters') - parser.add_argument('--convergenceTol', default=1e-3, type=float, help='convergence_threshold') + parser.add_argument('inputFile', help='Input File') + parser.add_argument('k', type=int, help='Number of clusters') + parser.add_argument('--convergenceTol', default=1e-3, type=float, help='convergence threshold') + parser.add_argument('--maxIterations', default=100, type=int, help='Number of iterations') parser.add_argument('--seed', default=random.getrandbits(19), - type=long, help='num_of_iterations') - parser.add_argument('--maxIterations', default=100, type=int, help='max_num_of_iterations') + type=long, help='Random seed') args = parser.parse_args() conf = SparkConf().setAppName("GMM") sc = SparkContext(conf=conf) - lines = sc.textFile(args.input_file) + lines = sc.textFile(args.inputFile) data = lines.map(parseVector) - model = GaussianMixtureEM.train(data, args.k, args.convergenceTol, - args.seed, args.maxIterations) + model = GaussianMixture.train(data, args.k, args.convergenceTol, + args.maxIterations, args.seed) for i in range(args.k): print ("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu, "sigma = ", model.gaussians[i].sigma.toArray()) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f973ce1f2b738..6382f2a156ec9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -261,7 +261,7 @@ class PythonMLLibAPI extends Serializable { } /** - * Java stub for Python mllib KMeans.train() + * Java stub for Python mllib KMeans.run() */ def trainKMeansModel( data: JavaRDD[Vector], @@ -286,22 +286,23 @@ class PythonMLLibAPI 
extends Serializable { } /** - * Java stub for Python mllib GaussianMixtureEM.train() + * Java stub for Python mllib GaussianMixture.run() * Returns a list containing weights, mean and covariance of each mixture component. */ - def trainGaussianMixtureEM( + def trainGaussianMixture( data: JavaRDD[Vector], k: Int, convergenceTol: Double, - seed: Long, - maxIterations: Int): JList[Object] = { - val gmmAlg = new GaussianMixtureEM() + maxIterations: Int, + seed: Long): JList[Object] = { + val gmmAlg = new GaussianMixture() .setK(k) .setConvergenceTol(convergenceTol) - .setSeed(seed) .setMaxIterations(maxIterations) + .setSeed(seed) try { val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) + var wtArray:Array[Double] = Array() var muArray:Array[Vector] = Array() var siArray :Array[Matrix] = Array() @@ -310,6 +311,7 @@ class PythonMLLibAPI extends Serializable { muArray = muArray ++ Array(model.gaussians(i).mu) siArray = siArray ++ Array(model.gaussians(i).sigma) } + List(wtArray, muArray, siArray).map(_.asInstanceOf[Object]).asJava } finally { data.rdd.unpersist(blocking = false) @@ -319,12 +321,11 @@ class PythonMLLibAPI extends Serializable { /** * Java stub for Python mllib GaussianMixtureModel.predictSoft() */ - def predictGMM( + def predictSoftGMM( data: JavaRDD[Vector], wt: Object, mu: Array[Object], si: Array[Object]): RDD[Array[Double]] = { - try { val weight = wt.asInstanceOf[Array[Double]] val mean = mu.map(_.asInstanceOf[DenseVector]) val sigma = si.map(_.asInstanceOf[DenseMatrix]) @@ -332,10 +333,7 @@ class PythonMLLibAPI extends Serializable { i => new MultivariateGaussian(mean(i),sigma(i)) } val model = new GaussianMixtureModel(weight, gaussians) - model.predictSoft(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) - } finally { - data.rdd.unpersist(blocking = false) - } + model.predictSoft(data) } /** diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index c0104c99c56c5..02cd8e1646a8e 100644 --- 
a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -17,19 +17,20 @@ from numpy import array +from pyspark import RDD from pyspark import SparkContext from pyspark.mllib.common import callMLlibFunc, callJavaFunc from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector +from pyspark.mllib.stat.distribution import MultivariateGaussian -__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixtureEM', - 'MultiVariateGaussian'] +__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture'] class KMeansModel(object): """A clustering model derived from the k-means method. - >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2) + >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2) >>> model = KMeans.train( ... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random") >>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0])) @@ -94,8 +95,8 @@ class GaussianMixtureModel(object): >>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1, ... 0.9,0.8,0.75,0.935, - ... -0.83,-0.68,-0.91,-0.76 ]).reshape(6,2)) - >>> model = GaussianMixtureEM.train(clusterdata_1, 3, convergenceTol=0.0001, + ... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2)) + >>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001, ... maxIterations=50, seed=10) >>> labels = model.predict(clusterdata_1).collect() >>> labels[0]==labels[1] @@ -108,8 +109,8 @@ class GaussianMixtureModel(object): ... -5.2211, -5.0602, 4.7118, ... 6.8989, 3.4592, 4.6322, ... 5.7048, 4.6567, 5.5026, - ... 4.5605, 5.2043, 6.2734]).reshape(5,3)) - >>> model = GaussianMixtureEM.train(clusterdata_2, 2, convergenceTol=0.0001, + ... 4.5605, 5.2043, 6.2734]).reshape(5, 3)) + >>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001, ... 
maxIterations=150, seed=10) >>> labels = model.predict(clusterdata_2).collect() >>> labels[0]==labels[1]==labels[2] @@ -123,46 +124,65 @@ def __init__(self, weights, gaussians): self.gaussians = gaussians self.k = len(self.weights) - def predict(self, X): + def predict(self, x): """ - Find the cluster to which the points in X has maximum membership + Find the cluster to which the points in 'x' have maximum membership in this model. - Returns an RDD of cluster labels. - """ - cluster_labels = self.predictSoft(X).map(lambda x: x.index(max(x))) - return cluster_labels - def predictSoft(self, X): - """ - Find the membership of each point in X to all mixture components. - Returns an RDD of array of double values. - """ - means_temp = () - sigmas_temp = () - for i in range(self.k): - means_temp = means_temp + (self.gaussians[i].mu,) - sigmas_temp = sigmas_temp + (self.gaussians[i].sigma,) - membership_matrix = callMLlibFunc("predictGMM", X.map(_convert_to_vector), - self.weights, means_temp, sigmas_temp) - return membership_matrix + Parameters + ---------- + x : RDD of data points + Returns + ------- + cluster_labels : RDD of cluster labels. + Raises + ------ + TypeError : If 'x' is not an RDD. + """ + if isinstance(x, RDD): + cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z))) + return cluster_labels + else: + raise TypeError("x should be represented by an RDD, but got %s." % type(x)) -class MultiVariateGaussian(object): + def predictSoft(self, x): + """ + Find the membership of each point in 'x' to all mixture components. - def __init__(self, mu, sigma): - self.mu = mu - self.sigma = sigma + Parameters + ---------- + x : RDD of data points + Returns + ------- + membership_matrix : RDD of array of double values. + """ + means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians]) + membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector), + self.weights, means, sigmas) + return membership_matrix -class GaussianMixtureEM(object): +class GaussianMixture(object): + """ + Estimate model parameters with the expectation-maximization algorithm. 
+ + Parameters + ---------- + rdd - RDD of data points + k - Number of components + convergenceTol - Threshold value to check the convergence criteria. Defaults to 1e-3 + maxIterations - Number of iterations. Defaults to 100 + seed - Random seed + """ @classmethod - def train(cls, rdd, k, convergenceTol=1e-3, seed=None, maxIterations=100): + def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None): """Train a Gaussian Mixture clustering model.""" - weight, mu, sigma = callMLlibFunc("trainGaussianMixtureEM", + weight, mu, sigma = callMLlibFunc("trainGaussianMixture", rdd.map(_convert_to_vector), k, - convergenceTol, seed, maxIterations) - mvg_obj = array([MultiVariateGaussian(mu[i], sigma[i]) for i in range(k)]) + convergenceTol, maxIterations, seed) + mvg_obj = array([MultivariateGaussian(mu[i], sigma[i]) for i in range(k)]) return GaussianMixtureModel(weight, mvg_obj) diff --git a/python/pyspark/mllib/stat/__init__.py b/python/pyspark/mllib/stat/__init__.py index 799d260c096b1..b686d955a0080 100644 --- a/python/pyspark/mllib/stat/__init__.py +++ b/python/pyspark/mllib/stat/__init__.py @@ -20,5 +20,6 @@ """ from pyspark.mllib.stat._statistics import * +from pyspark.mllib.stat.distribution import MultivariateGaussian -__all__ = ["Statistics", "MultivariateStatisticalSummary"] +__all__ = ["Statistics", "MultivariateStatisticalSummary", "MultivariateGaussian"] diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 4ac6f37cdd0c6..1a583f705289c 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -168,28 +168,28 @@ def test_kmeans_deterministic(self): self.assertTrue(array_equal(c1, c2)) def test_gmm(self): - from pyspark.mllib.clustering import GaussianMixtureEM + from pyspark.mllib.clustering import GaussianMixture data = self.sc.parallelize([ [1, 2], [8, 9], [-4, -3], [-6, -7], ]) - clusters = GaussianMixtureEM.train(data, 2, convergenceTol=0.001, - seed=56, maxIterations=100) + clusters = 
GaussianMixture.train(data, 2, convergenceTol=0.001, + maxIterations=100, seed=56) labels = clusters.predict(data).collect() self.assertEquals(labels[0], labels[1]) self.assertEquals(labels[2], labels[3]) def test_gmm_deterministic(self): - from pyspark.mllib.clustering import GaussianMixtureEM + from pyspark.mllib.clustering import GaussianMixture X = range(0, 100, 10) Y = range(0, 100, 10) data = self.sc.parallelize([[x, y] for x, y in zip(X, Y)]) - clusters1 = GaussianMixtureEM.train(data, 5, convergenceTol=0.001, - seed=63, maxIterations=100) - clusters2 = GaussianMixtureEM.train(data, 5, convergenceTol=0.001, - seed=63, maxIterations=100) + clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001, + maxIterations=100, seed=63) + clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001, + maxIterations=100, seed=63) for c1, c2 in zip(clusters1.weights, clusters2.weights): self.assertEquals(round(c1, 7), round(c2, 7))