diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index b6182be02fb56..d0a299cb894b2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -227,8 +227,8 @@ class OnlineLDAOptimizer extends LDAOptimizer { private var k: Int = 0 private var corpusSize: Long = 0 private var vocabSize: Int = 0 - private var alpha: Double = 0 - private var eta: Double = 0 + private[clustering] var alpha: Double = 0 + private[clustering] var eta: Double = 0 private var randomGenerator: java.util.Random = null // Online LDA specific parameters @@ -238,12 +238,11 @@ class OnlineLDAOptimizer extends LDAOptimizer { // internal data structure private var docs: RDD[(Long, Vector)] = null - private var lambda: BDM[Double] = null - private var Elogbeta: BDM[Double] = null - private var expElogbeta: BDM[Double] = null + private[clustering] var lambda: BDM[Double] = null // count of invocation to next, which helps deciding the weight for each iteration private var iteration: Int = 0 + private var gammaShape: Double = 100 /** * A (positive) learning parameter that downweights early iterations. Larger values make early @@ -295,7 +294,24 @@ class OnlineLDAOptimizer extends LDAOptimizer { this } - override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { + /** + * The function is for test only now. In the future, it can help support training strop/resume + */ + private[clustering] def setLambda(lambda: BDM[Double]): this.type = { + this.lambda = lambda + this + } + + /** + * Used to control the gamma distribution. Larger value produces values closer to 1.0. + */ + private[clustering] def setGammaShape(shape: Double): this.type = { + this.gammaShape = shape + this + } + + override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): + OnlineLDAOptimizer = { this.k = lda.getK this.corpusSize = docs.count() this.vocabSize = docs.first()._2.size @@ -307,26 +323,30 @@ class OnlineLDAOptimizer extends LDAOptimizer { // Initialize the variational distribution q(beta|lambda) this.lambda = getGammaMatrix(k, vocabSize) - this.Elogbeta = dirichletExpectation(lambda) - this.expElogbeta = exp(Elogbeta) this.iteration = 0 this } + override private[clustering] def next(): OnlineLDAOptimizer = { + val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) + if (batch.isEmpty()) return this + submitMiniBatch(batch) + } + + /** * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA * model, and it will update the topic distribution adaptively for the terms appearing in the * subset. */ - override private[clustering] def next(): OnlineLDAOptimizer = { + private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = { iteration += 1 - val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong()) - if (batch.isEmpty()) return this - val k = this.k val vocabSize = this.vocabSize - val expElogbeta = this.expElogbeta + val Elogbeta = dirichletExpectation(lambda) + val expElogbeta = exp(Elogbeta) val alpha = this.alpha + val gammaShape = this.gammaShape val stats: RDD[BDM[Double]] = batch.mapPartitions { docs => val stat = BDM.zeros[Double](k, vocabSize) @@ -340,7 +360,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { } // Initialize the variational distribution q(theta|gamma) for the mini-batch - var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var gammad = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k).t // 1 * K var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K var expElogthetad = exp(Elogthetad) // 1 * K val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids @@ -350,7 +370,7 @@ class OnlineLDAOptimizer extends LDAOptimizer { val ctsVector = new BDV[Double](cts).t // 1 * ids // Iterate between gamma and phi until convergence - while (meanchange > 1e-5) { + while (meanchange > 1e-3) { val lastgamma = gammad // 1*K 1 * ids ids * k gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha @@ -372,7 +392,10 @@ class OnlineLDAOptimizer extends LDAOptimizer { Iterator(stat) } - val batchResult: BDM[Double] = stats.reduce(_ += _) + val statsSum: BDM[Double] = stats.reduce(_ += _) + val batchResult = statsSum :* expElogbeta + + // Note that this is an optimization to avoid batch.count update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt) this } @@ -384,28 +407,23 @@ class OnlineLDAOptimizer extends LDAOptimizer { /** * Update lambda based on the batch submitted. batchSize can be different for each iteration. */ - private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = { + private[clustering] def update(stat: BDM[Double], iter: Int, batchSize: Int): Unit = { val tau_0 = this.getTau_0 val kappa = this.getKappa // weight of the mini-batch. val weight = math.pow(tau_0 + iter, -kappa) - // This step finishes computing the sufficient statistics for the M step - val stat = raw :* expElogbeta - // Update lambda based on documents. lambda = lambda * (1 - weight) + (stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight - Elogbeta = dirichletExpectation(lambda) - expElogbeta = exp(Elogbeta) } /** * Get a random matrix to initialize lambda */ private def getGammaMatrix(row: Int, col: Int): BDM[Double] = { - val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0) + val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape) val temp = gammaRandomGenerator.sample(row * col).toArray new BDM[Double](col, row, temp).t } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala index 41ec794146c69..6b2293ba49f51 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.clustering +import breeze.linalg.{DenseMatrix => BDM} + import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors} @@ -54,7 +56,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { } } - test("running and DistributedLDAModel") { + test("running and DistributedLDAModel with default Optimizer (EM)") { val k = 3 val topicSmoothing = 1.2 val termSmoothing = 1.2 @@ -131,6 +133,87 @@ class LDASuite extends FunSuite with MLlibTestSparkContext { assert(lda.getBeta === 3.0) assert(lda.getTopicConcentration === 3.0) } + + test("OnlineLDAOptimizer initialization") { + val lda = new LDA().setK(2) + val corpus = sc.parallelize(tinyCorpus, 2) + val op = new OnlineLDAOptimizer().initialize(corpus, lda) + op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567) + assert(op.alpha == 0.5) // default 1.0 / k + assert(op.eta == 0.5) // default 1.0 / k + assert(op.getKappa == 0.9876) + assert(op.getMiniBatchFraction == 0.123) + assert(op.getTau_0 == 567) + } + + test("OnlineLDAOptimizer one iteration") { + // run OnlineLDAOptimizer for 1 iteration to verify it's consistency with Blei-lab, + // [[https://github.com/Blei-Lab/onlineldavb]] + val k = 2 + val vocabSize = 6 + + def docs: Array[(Long, Vector)] = Array( + Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana + Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog + .zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + val corpus = sc.parallelize(docs, 2) + + // setGammaShape large so to avoid the stochastic impact. + val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40) + .setMiniBatchFraction(1) + val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op) + + val state = op.initialize(corpus, lda) + // override lambda to simulate an intermediate state + // [[ 1.1 1.2 1.3 0.9 0.8 0.7] + // [ 0.9 0.8 0.7 1.1 1.2 1.3]] + op.setLambda(new BDM[Double](k, vocabSize, + Array(1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 0.9, 1.1, 0.8, 1.2, 0.7, 1.3))) + + // run for one iteration + state.submitMiniBatch(corpus) + + // verify the result, Note this generate the identical result as + // [[https://github.com/Blei-Lab/onlineldavb]] + val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") + val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ") + assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1) + assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2) + } + + test("OnlineLDAOptimizer with toy data") { + def toydata: Array[(Long, Vector)] = Array( + Vectors.sparse(6, Array(0, 1), Array(1, 1)), + Vectors.sparse(6, Array(1, 2), Array(1, 1)), + Vectors.sparse(6, Array(0, 2), Array(1, 1)), + + Vectors.sparse(6, Array(3, 4), Array(1, 1)), + Vectors.sparse(6, Array(3, 5), Array(1, 1)), + Vectors.sparse(6, Array(4, 5), Array(1, 1)) + ).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) } + + val docs = sc.parallelize(toydata) + val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau_0(1024).setKappa(0.51) + .setGammaShape(1e10) + val lda = new LDA().setK(2) + .setDocConcentration(0.01) + .setTopicConcentration(0.01) + .setMaxIterations(100) + .setOptimizer(op) + + val ldaModel = lda.run(docs) + val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10) + val topics = topicIndices.map { case (terms, termWeights) => + terms.zip(termWeights) + } + + // check distribution for each topic, typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02) + topics.foreach(topic =>{ + val smalls = topic.filter(t => (t._2 < 0.1)).map(_._2) + assert(smalls.size == 3 && smalls.sum < 0.2) + }) + } + } private[clustering] object LDASuite {