Closed
Changes from 1 commit
Commits
40 commits
d640d9c
online lda initial checkin
hhbyyh Feb 6, 2015
043e786
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 6, 2015
26dca1b
style fix and make class private
hhbyyh Feb 6, 2015
f41c5ca
style fix
hhbyyh Feb 6, 2015
45884ab
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 8, 2015
fa408a8
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 9, 2015
0d0f3ee
replace random split with sliding
hhbyyh Feb 10, 2015
0dd3947
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 10, 2015
3a06526
merge with new example
hhbyyh Feb 10, 2015
aa365d1
merge upstream master
hhbyyh Mar 2, 2015
20328d1
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 2, 2015
37af91a
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 2, 2015
581c623
separate API and adjust batch split
hhbyyh Mar 2, 2015
e271eb1
remove non ascii
hhbyyh Mar 2, 2015
4a3f27e
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 5, 2015
a570c9a
use sample to pick up batch
hhbyyh Mar 11, 2015
d86cdec
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 11, 2015
f6d47ca
Merge branch 'ldaonline' of https://github.com/hhbyyh/spark into ldao…
hhbyyh Mar 11, 2015
02d0373
fix style in comment
hhbyyh Mar 12, 2015
62405cc
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 20, 2015
8cb16a6
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 23, 2015
f367cc9
change to optimization
hhbyyh Mar 23, 2015
e7bf3b0
move to separate file
hhbyyh Mar 27, 2015
97b9e1a
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 27, 2015
d19ef55
change OnlineLDA to class
hhbyyh Apr 2, 2015
b29193b
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 16, 2015
15be071
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 17, 2015
dbe3cff
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 28, 2015
b1178cf
fit into the optimizer framework
hhbyyh Apr 28, 2015
a996a82
respond to comments
hhbyyh Apr 29, 2015
61d60df
Minor cleanups:
jkbradley Apr 29, 2015
9e910d9
small fix
jkbradley Apr 29, 2015
138bfed
Merge pull request #1 from jkbradley/hhbyyh-ldaonline-update
hhbyyh Apr 29, 2015
4041723
add ut
hhbyyh Apr 29, 2015
68c2318
add a java ut
hhbyyh Apr 30, 2015
54cf8da
some style change
hhbyyh May 1, 2015
cf0007d
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh May 1, 2015
6149ca6
fix for setOptimizer
hhbyyh May 1, 2015
cf376ff
For private vars needed for testing, I made them private and added ac…
jkbradley May 2, 2015
1045eec
Merge pull request #2 from jkbradley/hhbyyh-ldaonline2
hhbyyh May 3, 2015
add ut
hhbyyh committed Apr 29, 2015
commit 4041723d4fe0dcfab845c9d2a2c72be2ed87895e
@@ -227,8 +227,8 @@ class OnlineLDAOptimizer extends LDAOptimizer {
private var k: Int = 0
private var corpusSize: Long = 0
private var vocabSize: Int = 0
private var alpha: Double = 0
private var eta: Double = 0
private[clustering] var alpha: Double = 0
private[clustering] var eta: Double = 0
private var randomGenerator: java.util.Random = null

// Online LDA specific parameters
@@ -238,12 +238,11 @@ class OnlineLDAOptimizer extends LDAOptimizer {

// internal data structure
private var docs: RDD[(Long, Vector)] = null
private var lambda: BDM[Double] = null
private var Elogbeta: BDM[Double] = null
private var expElogbeta: BDM[Double] = null
private[clustering] var lambda: BDM[Double] = null

// count of invocations of next(), which helps decide the weight for each iteration
private var iteration: Int = 0
private var gammaShape: Double = 100

/**
* A (positive) learning parameter that downweights early iterations. Larger values make early
@@ -295,7 +294,24 @@ class OnlineLDAOptimizer extends LDAOptimizer {
this
}

override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
/**
* The function is for test only now. In the future, it can help support training strop/resume
Member:
typo: "strop" -> "stop"

*/
private[clustering] def setLambda(lambda: BDM[Double]): this.type = {
this.lambda = lambda
this
}

/**
* Used to control the gamma distribution. Larger value produces values closer to 1.0.
*/
private[clustering] def setGammaShape(shape: Double): this.type = {
this.gammaShape = shape
this
}
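
These two setters are test hooks: setLambda overrides the randomly initialized topic-term matrix with a known starting point, and a very large gammaShape makes the Gamma draws in the per-document inference essentially deterministic. A minimal sketch of how a test might combine them, mirroring the unit test added later in this diff (the names corpus and knownLambda and the parameter values are placeholders):

// Sketch only; these methods are private[clustering], so the caller must live in
// the org.apache.spark.mllib.clustering package (as LDASuite does).
val op = new OnlineLDAOptimizer()
  .setMiniBatchFraction(1.0)   // submit the whole corpus on each call
  .setGammaShape(1e40)         // huge shape => per-document gamma init is essentially deterministic
val lda = new LDA().setK(2).setMaxIterations(1).setOptimizer(op)

op.initialize(corpus, lda)     // sets k, vocabSize and a random initial lambda
op.setLambda(knownLambda)      // knownLambda: BDM[Double] of size k x vocabSize
op.submitMiniBatch(corpus)     // one deterministic update; inspect op.lambda afterwards
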

override private[clustering] def initialize(docs: RDD[(Long, Vector)], lda: LDA):
Member:
scala style: If this can't fit on 1 line (100 chars), then put 1 argument per line:

override private[clustering] def initialize(
    docs: RDD[(Long, Vector)],
    lda: LDA): OnlineLDAOptimizer = {

OnlineLDAOptimizer = {
this.k = lda.getK
this.corpusSize = docs.count()
this.vocabSize = docs.first()._2.size
@@ -307,26 +323,30 @@ class OnlineLDAOptimizer extends LDAOptimizer {

// Initialize the variational distribution q(beta|lambda)
this.lambda = getGammaMatrix(k, vocabSize)
this.Elogbeta = dirichletExpectation(lambda)
this.expElogbeta = exp(Elogbeta)
this.iteration = 0
this
}
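
For reference, the Elogbeta / expElogbeta values dropped from the instance state here (and recomputed per mini-batch in submitMiniBatch below) are the usual Dirichlet expectations, with \psi the digamma function:

E[\log \beta_{kw} \mid \lambda_k] = \psi(\lambda_{kw}) - \psi\Big(\sum_{w'} \lambda_{kw'}\Big),
\qquad \mathrm{expElogbeta}_{kw} = \exp\big(E[\log \beta_{kw}]\big)
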

override private[clustering] def next(): OnlineLDAOptimizer = {
val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
if (batch.isEmpty()) return this
submitMiniBatch(batch)
}
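
After this change, next() only draws a random mini-batch (roughly miniBatchFraction of the corpus, sampled with replacement) and delegates the variational update to submitMiniBatch, so a test can call submitMiniBatch directly with a chosen batch. A rough sketch of how the surrounding optimizer framework is assumed to drive these calls; the driver loop below is illustrative only and not part of this diff:

// Assumed shape of the LDA.run driver over an LDAOptimizer (docs and lda are placeholders).
val state = new OnlineLDAOptimizer()
  .setMiniBatchFraction(0.05)            // each next() samples roughly 5% of the corpus
  .initialize(docs, lda)
var iter = 0
while (iter < lda.getMaxIterations) {
  state.next()                           // sample a mini-batch, run the E-step, update lambda
  iter += 1
}
// lda.run(docs) with setOptimizer(...) wraps a loop like this and returns the fitted model
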


Member:
scala style: remove extra newline

/**
* Submit a subset (like 1%, decided by the miniBatchFraction) of the corpus to the Online LDA
* model, and it will update the topic distribution adaptively for the terms appearing in the
* subset.
*/
override private[clustering] def next(): OnlineLDAOptimizer = {
private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = {
iteration += 1
val batch = docs.sample(withReplacement = true, miniBatchFraction, randomGenerator.nextLong())
if (batch.isEmpty()) return this

val k = this.k
val vocabSize = this.vocabSize
val expElogbeta = this.expElogbeta
val Elogbeta = dirichletExpectation(lambda)
val expElogbeta = exp(Elogbeta)
val alpha = this.alpha
val gammaShape = this.gammaShape

val stats: RDD[BDM[Double]] = batch.mapPartitions { docs =>
val stat = BDM.zeros[Double](k, vocabSize)
@@ -340,7 +360,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
}

// Initialize the variational distribution q(theta|gamma) for the mini-batch
var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
var gammad = new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k).t // 1 * K
var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K
var expElogthetad = exp(Elogthetad) // 1 * K
val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids
@@ -350,7 +370,7 @@ class OnlineLDAOptimizer extends LDAOptimizer {
val ctsVector = new BDV[Double](cts).t // 1 * ids

// Iterate between gamma and phi until convergence
while (meanchange > 1e-5) {
while (meanchange > 1e-3) {
val lastgamma = gammad
// 1*K 1 * ids ids * k
gammad = (expElogthetad :* ((ctsVector / phinorm) * expElogbetad.t)) + alpha
@@ -372,7 +392,10 @@ class OnlineLDAOptimizer extends LDAOptimizer {
Iterator(stat)
}

val batchResult: BDM[Double] = stats.reduce(_ += _)
val statsSum: BDM[Double] = stats.reduce(_ += _)
val batchResult = statsSum :* expElogbeta

// Note that this is an optimization to avoid batch.count
update(batchResult, iteration, (miniBatchFraction * corpusSize).toInt)
this
}
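
For readers cross-checking against the Blei-lab onlineldavb code referenced by the new tests: the mapPartitions block above is the per-document E-step. With n_dw the term counts of document d, the while loop iterates the fixed point

\gamma_{dk} \leftarrow \alpha + \exp\big(E[\log\theta_{dk}]\big) \sum_{w} \frac{n_{dw}\,\exp\big(E[\log\beta_{kw}]\big)}{\phi_{dw}},
\qquad \phi_{dw} = \sum_{k} \exp\big(E[\log\theta_{dk}]\big)\,\exp\big(E[\log\beta_{kw}]\big)

until gamma_d stabilizes, and the elementwise product statsSum :* expElogbeta (moved here from update(), since expElogbeta is now computed per batch) completes the per-topic, per-term sufficient statistics

\hat{s}_{kw} = \exp\big(E[\log\beta_{kw}]\big) \sum_{d \in B} \frac{n_{dw}\,\exp\big(E[\log\theta_{dk}]\big)}{\phi_{dw}}

Passing (miniBatchFraction * corpusSize).toInt as the batch size uses the expected rather than the exact batch size, which, as the comment notes, avoids an extra batch.count() job.
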
@@ -384,28 +407,23 @@ class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Update lambda based on the batch submitted. batchSize can be different for each iteration.
*/
private def update(raw: BDM[Double], iter: Int, batchSize: Int): Unit = {
private[clustering] def update(stat: BDM[Double], iter: Int, batchSize: Int): Unit = {
val tau_0 = this.getTau_0
val kappa = this.getKappa

// weight of the mini-batch.
val weight = math.pow(tau_0 + iter, -kappa)

// This step finishes computing the sufficient statistics for the M step
val stat = raw :* expElogbeta

// Update lambda based on documents.
lambda = lambda * (1 - weight) +
(stat * (corpusSize.toDouble / batchSize.toDouble) + eta) * weight
Elogbeta = dirichletExpectation(lambda)
expElogbeta = exp(Elogbeta)
}
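
weight is the decaying step size of online variational Bayes, so the assignment above is

\rho_t = (\tau_0 + t)^{-\kappa}, \qquad
\lambda \leftarrow (1 - \rho_t)\,\lambda + \rho_t \Big(\eta + \frac{D}{|B_t|}\,\hat{s}\Big)

with D = corpusSize and |B_t| = batchSize. A larger tau_0 downweights early iterations and a larger kappa makes the step size decay faster. Since Elogbeta and expElogbeta are no longer cached fields, they are recomputed from the updated lambda at the start of the next submitMiniBatch call rather than here.
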

/**
* Get a random matrix to initialize lambda
*/
private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
val gammaRandomGenerator = new Gamma(100, 1.0 / 100.0)
val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)
val temp = gammaRandomGenerator.sample(row * col).toArray
new BDM[Double](col, row, temp).t
}
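
Every entry of the initial lambda is drawn from Gamma(gammaShape, 1 / gammaShape), which has mean 1 and variance 1 / gammaShape, so the default shape of 100 gives mild jitter around 1.0 while the huge shapes used in the new tests (1e10, 1e40) make the initialization effectively deterministic. A quick standalone check with breeze (a sketch, not part of the patch):

import breeze.stats.distributions.Gamma

// Gamma(shape, 1/shape): mean = shape * (1/shape) = 1.0, variance = 1/shape.
val loose = new Gamma(100.0, 1.0 / 100.0).sample(5)  // modest jitter around 1.0
val tight = new Gamma(1e10, 1.0 / 1e10).sample(5)    // ~1.0 to about 5 decimal places (std dev 1e-5)
println(loose.mkString(", "))
println(tight.mkString(", "))
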
@@ -17,6 +17,8 @@

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseMatrix => BDM}

import org.scalatest.FunSuite

import org.apache.spark.mllib.linalg.{Vector, DenseMatrix, Matrix, Vectors}
@@ -54,7 +56,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
}
}

test("running and DistributedLDAModel") {
test("running and DistributedLDAModel with default Optimizer (EM)") {
val k = 3
val topicSmoothing = 1.2
val termSmoothing = 1.2
@@ -131,6 +133,87 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
assert(lda.getBeta === 3.0)
assert(lda.getTopicConcentration === 3.0)
}

test("OnlineLDAOptimizer initialization") {
val lda = new LDA().setK(2)
val corpus = sc.parallelize(tinyCorpus, 2)
val op = new OnlineLDAOptimizer().initialize(corpus, lda)
op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567)
assert(op.alpha == 0.5) // default 1.0 / k
assert(op.eta == 0.5) // default 1.0 / k
assert(op.getKappa == 0.9876)
assert(op.getMiniBatchFraction == 0.123)
assert(op.getTau_0 == 567)
}

test("OnlineLDAOptimizer one iteration") {
// run OnlineLDAOptimizer for 1 iteration to verify its consistency with Blei-lab,
// [[https://github.com/Blei-Lab/onlineldavb]]
val k = 2
val vocabSize = 6

def docs: Array[(Long, Vector)] = Array(
Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana
Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog
.zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
val corpus = sc.parallelize(docs, 2)

// set gammaShape large to avoid stochastic impact.
val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40)
.setMiniBatchFraction(1)
val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op)

val state = op.initialize(corpus, lda)
// override lambda to simulate an intermediate state
// [[ 1.1 1.2 1.3 0.9 0.8 0.7]
// [ 0.9 0.8 0.7 1.1 1.2 1.3]]
op.setLambda(new BDM[Double](k, vocabSize,
Array(1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 0.9, 1.1, 0.8, 1.2, 0.7, 1.3)))

// run for one iteration
state.submitMiniBatch(corpus)

// verify the result. Note this generates the identical result as
// [[https://github.com/Blei-Lab/onlineldavb]]
val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1)
assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2)
}

test("OnlineLDAOptimizer with toy data") {
def toydata: Array[(Long, Vector)] = Array(
Vectors.sparse(6, Array(0, 1), Array(1, 1)),
Vectors.sparse(6, Array(1, 2), Array(1, 1)),
Vectors.sparse(6, Array(0, 2), Array(1, 1)),

Vectors.sparse(6, Array(3, 4), Array(1, 1)),
Vectors.sparse(6, Array(3, 5), Array(1, 1)),
Vectors.sparse(6, Array(4, 5), Array(1, 1))
).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }

val docs = sc.parallelize(toydata)
val op = new OnlineLDAOptimizer().setMiniBatchFraction(1).setTau_0(1024).setKappa(0.51)
.setGammaShape(1e10)
val lda = new LDA().setK(2)
.setDocConcentration(0.01)
.setTopicConcentration(0.01)
.setMaxIterations(100)
.setOptimizer(op)

val ldaModel = lda.run(docs)
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val topics = topicIndices.map { case (terms, termWeights) =>
terms.zip(termWeights)
}

// check distribution for each topic; a typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02)
topics.foreach(topic =>{
val smalls = topic.filter(t => (t._2 < 0.1)).map(_._2)
assert(smalls.size == 3 && smalls.sum < 0.2)
})
}
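
The toy corpus is built from two disjoint triples of co-occurring terms ({0, 1, 2} and {3, 4, 5}), so with k = 2 each learned topic should put almost all of its mass on one triple; the assertion checks that the three off-triple weights are each below 0.1 and together sum to less than 0.2. A small debugging aid one might add while tuning this test (a sketch; topics is the local value defined above):

// Print each topic's term weights to eyeball the split between the two word groups.
topics.zipWithIndex.foreach { case (topic, i) =>
  println(s"topic $i: " + topic.map { case (term, w) => f"$term -> $w%.3f" }.mkString(", "))
}
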

}

private[clustering] object LDASuite {