Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
hhbyyh committed Apr 29, 2015
1 parent 138bfed commit 4041723
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ class OnlineLDAOptimizer extends LDAOptimizer {
private var k: Int = 0
private var corpusSize: Long = 0
private var vocabSize: Int = 0
private var alpha: Double = 0
private var eta: Double = 0
private[clustering] var alpha: Double = 0
private[clustering] var eta: Double = 0
private var randomGenerator: java.util.Random = null

// Online LDA specific parameters
Expand All @@ -238,12 +238,11 @@ class OnlineLDAOptimizer extends LDAOptimizer {

// internal data structure
private var docs: RDD[(Long, Vector)] = null
private var lambda: BDM[Double] = null
private var Elogbeta: BDM[Double] = null
private var expElogbeta: BDM[Double] = null
private[clustering] var lambda: BDM[Double] = null

// count of invocation to next, which helps deciding the weight for each iteration
private var iteration: Int = 0
private var gammaShape: Double = 100

/**
* A (positive) learning parameter that downweights early iterations. Larger values make early
Expand Down Expand Up @@ -295,7 +294,24 @@ class OnlineLDAOptimizer extends LDAOptimizer {
this
}

override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
/**
* The function is for test only now. In the future, it can help support training strop/resume
*/
private[clustering] def setLambda(lambda: BDM[Double]): this.type = {
this.lambda = lambda
this
}

/**
* Used to control the gamma distribution. Larger value produces values closer to 1.0.
*/
private[clustering] def setGammaShape(shape: Double): this.type = {
this.gammaShape = shape
this
}

override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA):
OnlineLDAOptimizer = {
this.k = lda.getK
this.corpusSize = docs.count()
this.vocabSize = docs.first()._2.size
Expand All @@ -307,26 +323,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {

// Initialize the variational distribution q(beta|lambda)
this.lambda = getGammaMatrix(k, vocabSize)
this.Elogbeta = dirichletExpectation(lambda)
this.expElogbeta = exp(Elogbeta)
this.iteration = 0
this
}

override private[clustering] def next(): OnlineLDAOptimizer = {
val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
if (batch.isEmpty()) return this
submitMiniBatch(batch)
}


/**
* Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
* model, and it will update the topic distribution adaptively for the terms appearing in the
* subset.
*/
override private[clustering] def next(): OnlineLDAOptimizer = {
private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = {
iteration += 1
val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
if (batch.isEmpty()) return this

val k = this.k
val vocabSize = this.vocabSize
val expElogbeta = this.expElogbeta
val Elogbeta = dirichletExpectation(lambda)
val expElogbeta = exp(Elogbeta)
val alpha = this.alpha
val gammaShape = this.gammaShape

val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
val stat = BDM.zeros[Double](k, vocabSize)
Expand All @@ -340,7 +360,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
}

// Initialize the variational distribution q(theta|gamma) for the mini-batch
var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
var gammad = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k).t // 1 * K
var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K
var expElogthetad = exp(Elogthetad) // 1 * K
val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids
Expand All @@ -350,7 +370,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
val ctsVector = new BDV[Double](cts).t // 1 * ids

// Iterate between gamma and phi until convergence
while (meanchange > 1e-5) {
while (meanchange > 1e-3) {
val lastgamma = gammad
// 1*K 1 * ids ids * k
gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
Expand All @@ -372,7 +392,10 @@ class OnlineLDAOptimizer extends LDAOptimizer {
Iterator(stat)
}

val batchResult: BDM[Double] = stats.reduce(_ += _)
val statsSum: BDM[Double] = stats.reduce(_ += _)
val batchResult = statsSum :* expElogbeta

// Note that this is an optimization to avoid batch.count
update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
this
}
Expand All @@ -384,28 +407,23 @@ class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Update lambda based on the batch submitted. batchSize can be different for each iteration.
*/
private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
private[clustering] def update(stat: BDM[Double], iter: Int, batchSize: Int): Unit = {
val tau_0 = this.getTau_0
val kappa = this.getKappa

// weight of the mini-batch.
val weight = math.pow(tau_0 + iter, -kappa)

// This step finishes computing the sufficient statistics for the M step
val stat = raw :* expElogbeta

// Update lambda based on documents.
lambda = lambda * (1 - weight) +
(stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
Elogbeta = dirichletExpectation(lambda)
expElogbeta = exp(Elogbeta)
}

/**
* Get a random matrix to initialize lambda
*/
private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)
val temp = gammaRandomGenerator.sample(row * col).toArray
new BDM[Double](col, row, temp).t
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseMatrix => BDM}

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
Expand Down Expand Up @@ -54,7 +56,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
}
}

test("running and DistributedLDAModel") {
test("running and DistributedLDAModel with default Optimizer (EM)") {
val k = 3
val topicSmoothing = 1.2
val termSmoothing = 1.2
Expand Down Expand Up @@ -131,6 +133,87 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
assert(lda.getBeta === 3.0)
assert(lda.getTopicConcentration === 3.0)
}

test("OnlineLDAOptimizer initialization") {
val lda = new LDA().setK(2)
val corpus = sc.parallelize(tinyCorpus, 2)
val op = new OnlineLDAOptimizer().initialize(corpus, lda)
op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567)
assert(op.alpha == 0.5) // default 1.0 / k
assert(op.eta == 0.5) // default 1.0 / k
assert(op.getKappa == 0.9876)
assert(op.getMiniBatchFraction == 0.123)
assert(op.getTau_0 == 567)
}

test("OnlineLDAOptimizer one iteration") {
// run OnlineLDAOptimizer for 1 iteration to verify it's consistency with Blei-lab,
// [[https://github.com/Blei-Lab/onlineldavb]]
val k = 2
val vocabSize = 6

def docs: Array[(Long, Vector)] = Array(
Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana
Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog
.zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
val corpus = sc.parallelize(docs, 2)

// setGammaShape large so to avoid the stochastic impact.
val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40)
.setMiniBatchFraction(1)
val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op)

val state = op.initialize(corpus, lda)
// override lambda to simulate an intermediate state
// [[ 1.1 1.2 1.3 0.9 0.8 0.7]
// [ 0.9 0.8 0.7 1.1 1.2 1.3]]
op.setLambda(new BDM[Double](k, vocabSize,
Array(1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 0.9, 1.1, 0.8, 1.2, 0.7, 1.3)))

// run for one iteration
state.submitMiniBatch(corpus)

// verify the result, Note this generate the identical result as
// [[https://github.com/Blei-Lab/onlineldavb]]
val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1)
assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2)
}

test("OnlineLDAOptimizer with toy data") {
def toydata: Array[(Long, Vector)] = Array(
Vectors.sparse(6, Array(0, 1), Array(1, 1)),
Vectors.sparse(6, Array(1, 2), Array(1, 1)),
Vectors.sparse(6, Array(0, 2), Array(1, 1)),

Vectors.sparse(6, Array(3, 4), Array(1, 1)),
Vectors.sparse(6, Array(3, 5), Array(1, 1)),
Vectors.sparse(6, Array(4, 5), Array(1, 1))
).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }

val docs = sc.parallelize(toydata)
val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau_0(1024).setKappa(0.51)
.setGammaShape(1e10)
val lda = new LDA().setK(2)
.setDocConcentration(0.01)
.setTopicConcentration(0.01)
.setMaxIterations(100)
.setOptimizer(op)

val ldaModel = lda.run(docs)
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val topics = topicIndices.map { case (terms, termWeights) =>
terms.zip(termWeights)
}

// check distribution for each topic, typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02)
topics.foreach(topic =>{
val smalls = topic.filter(t => (t._2 < 0.1)).map(_._2)
assert(smalls.size == 3 && smalls.sum < 0.2)
})
}

}

private[clustering] object LDASuite {
Expand Down

0 comments on commit 4041723

Please sign in to comment.