Skip to content

Commit

Permalink
Merge pull request #1 from jkbradley/hhbyyh-ldaonline-update
Browse files Browse the repository at this point in the history
Minor cleanups
  • Loading branch information
hhbyyh committed Apr 29, 2015
2 parents a996a82 + 9e910d9 commit 138bfed
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 66 deletions.
37 changes: 19 additions & 18 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ class LDA private (
* If set to -1, then docConcentration is set automatically.
* (default = -1 = automatic)
*
* Automatic setting of parameter:
* - For EM: default = (50 / k) + 1.
* - The 50/k is common in LDA libraries.
* - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - For Online: default = (1.0 / k).
* - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
*
* Note: For EM optimizer, This value should be > 1.0.
* Optimizer-specific parameter settings:
* - EM
* - Value should be > 1.0
* - default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
* Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - Online
* - Value should be >= 0
* - default = (1.0 / k), following the implementation from
* [[https://github.com/Blei-Lab/onlineldavb]].
*/
def setDocConcentration(docConcentration: Double): this.type = {
this.docConcentration = docConcentration
Expand All @@ -117,8 +118,7 @@ class LDA private (
* This is the parameter to a symmetric Dirichlet distribution.
*
* Note: The topics' distributions over terms are called "beta" in the original LDA paper
* by Blei et al., but are ca
* lled "phi" in many later papers such as Asuncion et al., 2009.
* by Blei et al., but are called "phi" in many later papers such as Asuncion et al., 2009.
*/
def getTopicConcentration: Double = this.topicConcentration

Expand All @@ -134,14 +134,15 @@ class LDA private (
* If set to -1, then topicConcentration is set automatically.
* (default = -1 = automatic)
*
* Automatic setting of parameter:
* - For EM: default = 0.1 + 1.
* - The 0.1 gives a small amount of smoothing.
* - The +1 follows Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - For Online: default = (1.0 / k).
* - follows the implementation from: [[https://github.com/Blei-Lab/onlineldavb]].
*
* Note: For EM optimizer, This value should be > 1.0.
* Optimizer-specific parameter settings:
* - EM
* - Value should be > 1.0
* - default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
* Asuncion et al. (2009), who recommend a +1 adjustment for EM.
* - Online
* - Value should be >= 0
* - default = (1.0 / k), following the implementation from
* [[https://github.com/Blei-Lab/onlineldavb]].
*/
def setTopicConcentration(topicConcentration: Double): this.type = {
this.topicConcentration = topicConcentration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class EMLDAOptimizer extends LDAOptimizer {
import LDA._

/**
* Following fields will only be initialized through initialize method
* The following fields will only be initialized through the initialize() method
*/
private[clustering] var graph: Graph[TopicCounts, TokenCount] = null
private[clustering] var k: Int = 0
Expand All @@ -94,7 +94,7 @@ class EMLDAOptimizer extends LDAOptimizer {
/**
* Compute bipartite term/doc graph.
*/
private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={
override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {

val docConcentration = lda.getDocConcentration
val topicConcentration = lda.getTopicConcentration
Expand All @@ -121,7 +121,7 @@ class EMLDAOptimizer extends LDAOptimizer {

// Create vertices.
// Initially, we use random soft assignments of tokens to topics (random gamma).
def createVertices(): RDD[(VertexId, TopicCounts)] = {
val docTermVertices: RDD[(VertexId, TopicCounts)] = {
val verticesTMP: RDD[(VertexId, TopicCounts)] =
edges.mapPartitionsWithIndex { case (partIndex, partEdges) =>
val random = new Random(partIndex + randomSeed)
Expand All @@ -134,8 +134,6 @@ class EMLDAOptimizer extends LDAOptimizer {
verticesTMP.reduceByKey(_ + _)
}

val docTermVertices = createVertices()

// Partition such that edges are grouped by document
this.graph = Graph(docTermVertices, edges).partitionBy(PartitionStrategy.EdgePartition1D)
this.k = k
Expand All @@ -147,7 +145,7 @@ class EMLDAOptimizer extends LDAOptimizer {
this
}

private[clustering] override def next(): EMLDAOptimizer = {
override private[clustering] def next(): EMLDAOptimizer = {
require(graph != null, "graph is null, EMLDAOptimizer not initialized.")

val eta = topicConcentration
Expand Down Expand Up @@ -204,7 +202,7 @@ class EMLDAOptimizer extends LDAOptimizer {
graph.vertices.filter(isTermVertex).values.fold(BDV.zeros[Double](numTopics))(_ += _)
}

private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
require(graph != null, "graph is null, EMLDAOptimizer not initialized.")
this.graphCheckpointer.deleteAllCheckpoints()
new DistributedLDAModel(this, iterationTimes)
Expand All @@ -216,10 +214,10 @@ class EMLDAOptimizer extends LDAOptimizer {
* :: Experimental ::
*
* An online optimizer for LDA. The Optimizer implements the Online variational Bayes LDA
* algorithm, which processes a subset of the corpus on each iteration, and update the term-topic
* algorithm, which processes a subset of the corpus on each iteration, and updates the term-topic
* distribution adaptively.
*
* References:
* Original Online LDA paper:
* Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
*/
@Experimental
Expand All @@ -236,31 +234,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {
// Online LDA specific parameters
private var tau_0: Double = 1024
private var kappa: Double = 0.51
private var minibatchFraction: Double = 0.01
private var miniBatchFraction: Double = 0.01

// internal data structure
private var docs: RDD[(Long, Vector)] = null
private var lambda: BDM[Double] = null
private var Elogbeta: BDM[Double]= null
private var Elogbeta: BDM[Double] = null
private var expElogbeta: BDM[Double] = null

// count of invocation to next, which helps deciding the weight for each iteration
private var iteration = 0
private var iteration: Int = 0

/**
* A (positive) learning parameter that downweights early iterations. Larger values make early
* iterations count less
* iterations count less.
*/
def getTau_0: Double = this.tau_0

/**
* A (positive) learning parameter that downweights early iterations. Larger values make early
* iterations count less
* Automatic setting of parameter:
* - default = 1024, which follows the recommendation from OnlineLDA paper.
* iterations count less.
* Default: 1024, following the original Online LDA paper.
*/
def setTau_0(tau_0: Double): this.type = {
require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0")
require(tau_0 > 0, s"LDA tau_0 must be positive, but was set to $tau_0")
this.tau_0 = tau_0
this
}
Expand All @@ -273,31 +270,32 @@ class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Learning rate: exponential decay rate---should be between
* (0.5, 1.0] to guarantee asymptotic convergence.
* - default = 0.51, which follows the recommendation from OnlineLDA paper.
* Default: 0.51, based on the original Online LDA paper.
*/
def setKappa(kappa: Double): this.type = {
require(kappa >= 0 || kappa == -1.0,
s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
require(kappa >= 0, s"Online LDA kappa must be nonnegative, but was set to $kappa")
this.kappa = kappa
this
}

/**
* Mini-batch size, which controls how many documents are used in each iteration
* Mini-batch fraction, which sets the fraction of document sampled and used in each iteration
*/
def getMiniBatchFraction: Double = this.minibatchFraction
def getMiniBatchFraction: Double = this.miniBatchFraction

/**
* Mini-batch size, which controls how many documents are used in each iteration
* default = 1% from total documents.
* Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in
* each iteration.
* Default: 0.01, i.e., 1% of total documents
*/
def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
this.minibatchFraction = miniBatchFraction
require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
s"Online LDA miniBatchFraction must be in range (0,1], but was set to $miniBatchFraction")
this.miniBatchFraction = miniBatchFraction
this
}

private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer={

override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
this.k = lda.getK
this.corpusSize = docs.count()
this.vocabSize = docs.first()._2.size
Expand All @@ -320,24 +318,25 @@ class OnlineLDAOptimizer extends LDAOptimizer {
* model, and it will update the topic distribution adaptively for the terms appearing in the
* subset.
*/
private[clustering] override def next(): OnlineLDAOptimizer = {
override private[clustering] def next(): OnlineLDAOptimizer = {
iteration += 1
val batch = docs.sample(true, minibatchFraction, randomGenerator.nextLong())
if(batch.isEmpty()) return this
val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
if (batch.isEmpty()) return this

val k = this.k
val vocabSize = this.vocabSize
val expElogbeta = this.expElogbeta
val alpha = this.alpha

val stats = batch.mapPartitions(docs =>{
val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
val stat = BDM.zeros[Double](k, vocabSize)
docs.foreach(doc =>{
docs.foreach { doc =>
val termCounts = doc._2
val (ids, cts) = termCounts match {
case v: DenseVector => (((0 until v.size).toList), v.values)
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: DenseVector => ((0 until v.size).toList, v.values)
case v: SparseVector => (v.indices.toList, v.values)
case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
case v => throw new IllegalArgumentException("Online LDA does not support vector type "
+ v.getClass)
}

// Initialize the variational distribution q(theta|gamma) for the mini-batch
Expand All @@ -354,7 +353,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
while (meanchange > 1e-5) {
val lastgamma = gammad
// 1*K 1 * ids ids * k
gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
Elogthetad = digamma(gammad) - digamma(sum(gammad))
expElogthetad = exp(Elogthetad)
phinorm = expElogthetad * expElogbetad + 1e-100
Expand All @@ -364,28 +363,28 @@ class OnlineLDAOptimizer extends LDAOptimizer {
val m1 = expElogthetad.t.toDenseMatrix.t
val m2 = (ctsVector / phinorm).t.toDenseMatrix
val outerResult = kron(m1, m2) // K * ids
for (i <- 0 until ids.size) {
var i = 0
while (i < ids.size) {
stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
i += 1
}
stat
})
}
Iterator(stat)
})
}

val batchResult = stats.reduce(_ += _)
update(batchResult, iteration, (minibatchFraction * corpusSize).toInt)
val batchResult: BDM[Double] = stats.reduce(_ += _)
update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
this
}

private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
override private[clustering] def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
}

/**
* Update lambda based on the batch submitted. batchSize can be different for each iteration.
*/
private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={

private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
val tau_0 = this.getTau_0
val kappa = this.getKappa

Expand All @@ -405,17 +404,17 @@ class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Get a random matrix to initialize lambda
*/
private def getGammaMatrix(row:Int, col:Int): BDM[Double] ={
private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
val temp = gammaRandomGenerator.sample(row * col).toArray
(new BDM[Double](col, row, temp)).t
new BDM[Double](col, row, temp).t
}

/**
* For theta ~ Dir(alpha), computes E[log(theta)] given alpha. Currently the implementation
* uses digamma which is accurate but expensive.
*/
private def dirichletExpectation(alpha : BDM[Double]): BDM[Double] = {
private def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = {
val rowSum = sum(alpha(breeze.linalg.*, ::))
val digAlpha = digamma(alpha)
val digRowSum = digamma(rowSum)
Expand Down

0 comments on commit 138bfed

Please sign in to comment.