Skip to content

Commit

Permalink
Fixed style issues
Browse files Browse the repository at this point in the history
  • Loading branch information
FlytxtRnD committed Feb 2, 2015
1 parent b22532c commit 3aee84b
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 71 deletions.
26 changes: 12 additions & 14 deletions examples/src/main/python/mllib/gaussian_mixture_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@

"""
A Gaussian Mixture Model clustering program using MLlib.
"""

import sys
import random
import argparse
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.mllib.clustering import GaussianMixtureEM
from pyspark.mllib.clustering import GaussianMixture


def parseVector(line):
Expand All @@ -37,29 +35,29 @@ def parseVector(line):
"""
Parameters
----------
input_file : Input file path which contains data points
inputFile : Input file path which contains data points
k : Number of mixture components
convergenceTol : convergence_threshold. Default to 1e-3
seed : random seed
convergenceTol : Convergence threshold. Defaults to 1e-3
maxIterations : Number of EM iterations to perform. Defaults to 100
seed : Random seed
"""

parser = argparse.ArgumentParser()
parser.add_argument('input_file', help='input file')
parser.add_argument('k', type=int, help='num_of_clusters')
parser.add_argument('--convergenceTol', default=1e-3, type=float, help='convergence_threshold')
parser.add_argument('inputFile', help='Input File')
parser.add_argument('k', type=int, help='Number of clusters')
parser.add_argument('--convergenceTol', default=1e-3, type=float, help='convergence threshold')
parser.add_argument('--maxIterations', default=100, type=int, help='Number of iterations')
parser.add_argument('--seed', default=random.getrandbits(19),
type=long, help='num_of_iterations')
parser.add_argument('--maxIterations', default=100, type=int, help='max_num_of_iterations')
type=long, help='Random seed')
args = parser.parse_args()

conf = SparkConf().setAppName("GMM")
sc = SparkContext(conf=conf)

lines = sc.textFile(args.input_file)
lines = sc.textFile(args.inputFile)
data = lines.map(parseVector)
model = GaussianMixtureEM.train(data, args.k, args.convergenceTol,
args.seed, args.maxIterations)
model = GaussianMixture.train(data, args.k, args.convergenceTol,
args.maxIterations, args.seed)
for i in range(args.k):
print ("weight = ", model.weights[i], "mu = ", model.gaussians[i].mu,
"sigma = ", model.gaussians[i].sigma.toArray())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ class PythonMLLibAPI extends Serializable {
}

/**
* Java stub for Python mllib KMeans.train()
* Java stub for Python mllib KMeans.run()
*/
def trainKMeansModel(
data: JavaRDD[Vector],
Expand All @@ -286,22 +286,23 @@ class PythonMLLibAPI extends Serializable {
}

/**
* Java stub for Python mllib GaussianMixtureEM.train()
* Java stub for Python mllib GaussianMixture.run()
* Returns a list containing weights, mean and covariance of each mixture component.
*/
def trainGaussianMixtureEM(
def trainGaussianMixture(
data: JavaRDD[Vector],
k: Int,
convergenceTol: Double,
seed: Long,
maxIterations: Int): JList[Object] = {
val gmmAlg = new GaussianMixtureEM()
maxIterations: Int,
seed: Long): JList[Object] = {
val gmmAlg = new GaussianMixture()
.setK(k)
.setConvergenceTol(convergenceTol)
.setSeed(seed)
.setMaxIterations(maxIterations)
.setSeed(seed)
try {
val model = gmmAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))

var wtArray:Array[Double] = Array()
var muArray:Array[Vector] = Array()
var siArray :Array[Matrix] = Array()
Expand All @@ -310,6 +311,7 @@ class PythonMLLibAPI extends Serializable {
muArray = muArray ++ Array(model.gaussians(i).mu)
siArray = siArray ++ Array(model.gaussians(i).sigma)
}

List(wtArray, muArray, siArray).map(_.asInstanceOf[Object]).asJava
} finally {
data.rdd.unpersist(blocking = false)
Expand All @@ -319,23 +321,19 @@ class PythonMLLibAPI extends Serializable {
/**
* Java stub for Python mllib GaussianMixtureModel.predictSoft()
*/
def predictGMM(
def predictSoftGMM(
data: JavaRDD[Vector],
wt: Object,
mu: Array[Object],
si: Array[Object]): RDD[Array[Double]] = {
try {
val weight = wt.asInstanceOf[Array[Double]]
val mean = mu.map(_.asInstanceOf[DenseVector])
val sigma = si.map(_.asInstanceOf[DenseMatrix])
val gaussians = Array.tabulate(weight.length){
i => new MultivariateGaussian(mean(i),sigma(i))
}
val model = new GaussianMixtureModel(weight, gaussians)
model.predictSoft(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
} finally {
data.rdd.unpersist(blocking = false)
}
model.predictSoft(data)
}

/**
Expand Down
85 changes: 50 additions & 35 deletions python/pyspark/mllib/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,20 @@

from numpy import array

from pyspark import RDD
from pyspark import SparkContext
from pyspark.mllib.common import callMLlibFunc, callJavaFunc
from pyspark.mllib.linalg import DenseVector, SparseVector, _convert_to_vector
from pyspark.mllib.stat.distribution import MultivariateGaussian

__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixtureEM',
'MultiVariateGaussian']
__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture']


class KMeansModel(object):

"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4, 2)
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
Expand Down Expand Up @@ -94,8 +95,8 @@ class GaussianMixtureModel(object):
>>> clusterdata_1 = sc.parallelize(array([-0.1,-0.05,-0.01,-0.1,
... 0.9,0.8,0.75,0.935,
... -0.83,-0.68,-0.91,-0.76 ]).reshape(6,2))
>>> model = GaussianMixtureEM.train(clusterdata_1, 3, convergenceTol=0.0001,
... -0.83,-0.68,-0.91,-0.76 ]).reshape(6, 2))
>>> model = GaussianMixture.train(clusterdata_1, 3, convergenceTol=0.0001,
... maxIterations=50, seed=10)
>>> labels = model.predict(clusterdata_1).collect()
>>> labels[0]==labels[1]
Expand All @@ -108,8 +109,8 @@ class GaussianMixtureModel(object):
... -5.2211, -5.0602, 4.7118,
... 6.8989, 3.4592, 4.6322,
... 5.7048, 4.6567, 5.5026,
... 4.5605, 5.2043, 6.2734]).reshape(5,3))
>>> model = GaussianMixtureEM.train(clusterdata_2, 2, convergenceTol=0.0001,
... 4.5605, 5.2043, 6.2734]).reshape(5, 3))
>>> model = GaussianMixture.train(clusterdata_2, 2, convergenceTol=0.0001,
... maxIterations=150, seed=10)
>>> labels = model.predict(clusterdata_2).collect()
>>> labels[0]==labels[1]==labels[2]
Expand All @@ -123,46 +124,60 @@ def __init__(self, weights, gaussians):
self.gaussians = gaussians
self.k = len(self.weights)

def predict(self, X):
def predict(self, x):
"""
Find the cluster to which the points in X has maximum membership
Find the cluster to which the points in 'x' have maximum membership
in this model.
Returns an RDD of cluster labels.
"""
cluster_labels = self.predictSoft(X).map(lambda x: x.index(max(x)))
return cluster_labels
def predictSoft(self, X):
"""
Find the membership of each point in X to all mixture components.
Returns an RDD of array of double values.
"""
means_temp = ()
sigmas_temp = ()
for i in range(self.k):
means_temp = means_temp + (self.gaussians[i].mu,)
sigmas_temp = sigmas_temp + (self.gaussians[i].sigma,)
membership_matrix = callMLlibFunc("predictGMM", X.map(_convert_to_vector),
self.weights, means_temp, sigmas_temp)
return membership_matrix
Parameters
----------
x : RDD of data points
Returns
-------
cluster_labels : RDD of cluster labels.
"""
if isinstance(x, RDD):
cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
return cluster_labels

class MultiVariateGaussian(object):
def predictSoft(self, x):
"""
Find the membership of each point in 'x' to all mixture components.
def __init__(self, mu, sigma):
self.mu = mu
self.sigma = sigma
Parameters
----------
x : RDD of data points
Returns
-------
membership_matrix : RDD of array of double values.
"""
means, sigmas = zip(*[(g.mu, g.sigma) for g in self.gaussians])
membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
self.weights, means, sigmas)
return membership_matrix

class GaussianMixtureEM(object):

class GaussianMixture(object):
"""
Estimate model parameters with the expectation-maximization algorithm.
Parameters
----------
data - RDD of data points
k - Number of components
convergenceTol - Threshold value to check the convergence criteria. Defaults to 1e-3
maxIterations - Number of iterations. Defaults to 100
seed - Random Seed
"""
@classmethod
def train(cls, rdd, k, convergenceTol=1e-3, seed=None, maxIterations=100):
def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None):
"""Train a Gaussian Mixture clustering model."""
weight, mu, sigma = callMLlibFunc("trainGaussianMixtureEM",
weight, mu, sigma = callMLlibFunc("trainGaussianMixture",
rdd.map(_convert_to_vector), k,
convergenceTol, seed, maxIterations)
mvg_obj = array([MultiVariateGaussian(mu[i], sigma[i]) for i in range(k)])
convergenceTol, maxIterations, seed)
mvg_obj = array([MultivariateGaussian(mu[i], sigma[i]) for i in range(k)])
return GaussianMixtureModel(weight, mvg_obj)


Expand Down
3 changes: 2 additions & 1 deletion python/pyspark/mllib/stat/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@
"""

from pyspark.mllib.stat._statistics import *
from pyspark.mllib.stat.distribution import MultivariateGaussian

__all__ = ["Statistics", "MultivariateStatisticalSummary"]
__all__ = ["Statistics", "MultivariateStatisticalSummary", "MultivariateGaussian"]
16 changes: 8 additions & 8 deletions python/pyspark/mllib/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,28 +168,28 @@ def test_kmeans_deterministic(self):
self.assertTrue(array_equal(c1, c2))

def test_gmm(self):
from pyspark.mllib.clustering import GaussianMixtureEM
from pyspark.mllib.clustering import GaussianMixture
data = self.sc.parallelize([
[1, 2],
[8, 9],
[-4, -3],
[-6, -7],
])
clusters = GaussianMixtureEM.train(data, 2, convergenceTol=0.001,
seed=56, maxIterations=100)
clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
maxIterations=100, seed=56)
labels = clusters.predict(data).collect()
self.assertEquals(labels[0], labels[1])
self.assertEquals(labels[2], labels[3])

def test_gmm_deterministic(self):
from pyspark.mllib.clustering import GaussianMixtureEM
from pyspark.mllib.clustering import GaussianMixture
X = range(0, 100, 10)
Y = range(0, 100, 10)
data = self.sc.parallelize([[x, y] for x, y in zip(X, Y)])
clusters1 = GaussianMixtureEM.train(data, 5, convergenceTol=0.001,
seed=63, maxIterations=100)
clusters2 = GaussianMixtureEM.train(data, 5, convergenceTol=0.001,
seed=63, maxIterations=100)
clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
maxIterations=100, seed=63)
clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
maxIterations=100, seed=63)
for c1, c2 in zip(clusters1.weights, clusters2.weights):
self.assertEquals(round(c1, 7), round(c2, 7))

Expand Down

0 comments on commit 3aee84b

Please sign in to comment.