Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d640d9c
online lda initial checkin
hhbyyh Feb 6, 2015
043e786
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 6, 2015
26dca1b
style fix and make class private
hhbyyh Feb 6, 2015
f41c5ca
style fix
hhbyyh Feb 6, 2015
45884ab
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 8, 2015
fa408a8
ssMerge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 9, 2015
0d0f3ee
replace random split with sliding
hhbyyh Feb 10, 2015
0dd3947
kMerge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 10, 2015
3a06526
merge with new example
hhbyyh Feb 10, 2015
aa365d1
merge upstream master
hhbyyh Mar 2, 2015
20328d1
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 2, 2015
37af91a
iMerge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 2, 2015
581c623
seperate API and adjust batch split
hhbyyh Mar 2, 2015
e271eb1
remove non ascii
hhbyyh Mar 2, 2015
4a3f27e
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 5, 2015
a570c9a
use sample to pick up batch
hhbyyh Mar 11, 2015
d86cdec
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 11, 2015
f6d47ca
Merge branch 'ldaonline' of https://github.com/hhbyyh/spark into ldao…
hhbyyh Mar 11, 2015
02d0373
fix style in comment
hhbyyh Mar 12, 2015
62405cc
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 20, 2015
8cb16a6
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 23, 2015
f367cc9
change to optimization
hhbyyh Mar 23, 2015
e7bf3b0
move to seperate file
hhbyyh Mar 27, 2015
97b9e1a
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 27, 2015
d19ef55
change OnlineLDA to class
hhbyyh Apr 2, 2015
b29193b
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 16, 2015
15be071
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 17, 2015
dbe3cff
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 28, 2015
b1178cf
fit into the optimizer framework
hhbyyh Apr 28, 2015
a996a82
respond to comments
hhbyyh Apr 29, 2015
61d60df
Minor cleanups:
jkbradley Apr 29, 2015
9e910d9
small fix
jkbradley Apr 29, 2015
138bfed
Merge pull request #1 from jkbradley/hhbyyh-ldaonline-update
hhbyyh Apr 29, 2015
4041723
add ut
hhbyyh Apr 29, 2015
68c2318
add a java ut
hhbyyh Apr 30, 2015
54cf8da
some style change
hhbyyh May 1, 2015
cf0007d
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh May 1, 2015
6149ca6
fix for setOptimizer
hhbyyh May 1, 2015
cf376ff
For private vars needed for testing, I made them private and added ac…
jkbradley May 2, 2015
1045eec
Merge pull request #2 from jkbradley/hhbyyh-ldaonline2
hhbyyh May 3, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Random

import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, sum, normalize, kron}
import breeze.numerics.{digamma, exp, abs}
import breeze.stats.distributions.Gamma
import breeze.stats.distributions.{Gamma, RandBasis}

import org.apache.spark.annotation.Experimental
import org.apache.spark.graphx._
Expand Down Expand Up @@ -227,20 +227,37 @@ class OnlineLDAOptimizer extends LDAOptimizer {
private var k: Int = 0
private var corpusSize: Long = 0
private var vocabSize: Int = 0
private[clustering] var alpha: Double = 0
private[clustering] var eta: Double = 0

/** alias for docConcentration */
private var alpha: Double = 0

/** (private[clustering] for debugging) Get docConcentration */
private[clustering] def getAlpha: Double = alpha

/** alias for topicConcentration */
private var eta: Double = 0

/** (private[clustering] for debugging) Get topicConcentration */
private[clustering] def getEta: Double = eta

private var randomGenerator: java.util.Random = null

// Online LDA specific parameters
// Learning rate is: (tau_0 + t)^{-kappa}
private var tau_0: Double = 1024
private var kappa: Double = 0.51
private var miniBatchFraction: Double = 0.01
private var miniBatchFraction: Double = 0.05

// internal data structure
private var docs: RDD[(Long, Vector)] = null
private[clustering] var lambda: BDM[Double] = null

// count of invocation to next, which helps deciding the weight for each iteration
/** Dirichlet parameter for the posterior over topics */
private var lambda: BDM[Double] = null

/** (private[clustering] for debugging) Get parameter for topics */
private[clustering] def getLambda: BDM[Double] = lambda

/** Current iteration (count of invocations of [[next()]]) */
private var iteration: Int = 0
private var gammaShape: Double = 100

Expand Down Expand Up @@ -285,7 +302,12 @@ class OnlineLDAOptimizer extends LDAOptimizer {
/**
* Mini-batch fraction in (0, 1], which sets the fraction of document sampled and used in
* each iteration.
* Default: 0.01, i.e., 1% of total documents
*
* Note that this should be adjusted in synch with [[LDA.setMaxIterations()]]
* so the entire corpus is used. Specifically, set both so that
* maxIterations * miniBatchFraction >= 1.
*
* Default: 0.05, i.e., 5% of total documents.
*/
def setMiniBatchFraction(miniBatchFraction: Double): this.type = {
require(miniBatchFraction > 0.0 && miniBatchFraction <= 1.0,
Expand All @@ -295,15 +317,20 @@ class OnlineLDAOptimizer extends LDAOptimizer {
}

/**
* The function is for test only now. In the future, it can help support training stop/resume
* (private[clustering])
* Set the Dirichlet parameter for the posterior over topics.
* This is only used for testing now. In the future, it can help support training stop/resume.
*/
private[clustering] def setLambda(lambda: BDM[Double]): this.type = {
this.lambda = lambda
this
}

/**
* Used to control the gamma distribution. Larger value produces values closer to 1.0.
* (private[clustering])
* Used for random initialization of the variational parameters.
* Larger value produces values closer to 1.0.
* This is only used for testing currently.
*/
private[clustering] def setGammaShape(shape: Double): this.type = {
this.gammaShape = shape
Expand Down Expand Up @@ -380,12 +407,11 @@ class OnlineLDAOptimizer extends LDAOptimizer {
meanchange = sum(abs(gammad - lastgamma)) / k
}

val m1 = expElogthetad.t.toDenseMatrix.t
val m2 = (ctsVector / phinorm).t.toDenseMatrix
val outerResult = kron(m1, m2) // K * ids
val m1 = expElogthetad.t
val m2 = (ctsVector / phinorm).t.toDenseVector
var i = 0
while (i < ids.size) {
stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
stat(::, ids(i)) := stat(::, ids(i)) + m1 * m2(i)
i += 1
}
}
Expand Down Expand Up @@ -423,7 +449,9 @@ class OnlineLDAOptimizer extends LDAOptimizer {
* Get a random matrix to initialize lambda
*/
private def getGammaMatrix(row: Int, col: Int): BDM[Double] = {
val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)
val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(
randomGenerator.nextLong()))
val gammaRandomGenerator = new Gamma(gammaShape, 1.0 / gammaShape)(randBasis)
val temp = gammaRandomGenerator.sample(row * col).toArray
new BDM[Double](col, row, temp).t
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Serializable;
import java.util.ArrayList;

import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

import org.junit.After;
Expand All @@ -30,6 +29,7 @@
import org.junit.Test;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
Expand Down Expand Up @@ -148,6 +148,6 @@ public void OnlineOptimizerCompatibility() {
private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics();
private static Tuple2<int[], double[]>[] tinyTopicDescription =
LDASuite$.MODULE$.tinyTopicDescription();
JavaPairRDD<Long, Vector> corpus;
private JavaPairRDD<Long, Vector> corpus;

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {

// Check: describeTopics() with all terms
val fullTopicSummary = model.describeTopics()
assert(fullTopicSummary.size === tinyK)
assert(fullTopicSummary.length === tinyK)
fullTopicSummary.zip(tinyTopicDescription).foreach {
case ((algTerms, algTermWeights), (terms, termWeights)) =>
assert(algTerms === terms)
Expand Down Expand Up @@ -101,7 +101,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
// Check: per-doc topic distributions
val topicDistributions = model.topicDistributions.collect()
// Ensure all documents are covered.
assert(topicDistributions.size === tinyCorpus.size)
assert(topicDistributions.length === tinyCorpus.length)
assert(tinyCorpus.map(_._1).toSet === topicDistributions.map(_._1).toSet)
// Ensure we have proper distributions
topicDistributions.foreach { case (docId, topicDistribution) =>
Expand Down Expand Up @@ -139,8 +139,8 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
val corpus = sc.parallelize(tinyCorpus, 2)
val op = new OnlineLDAOptimizer().initialize(corpus, lda)
op.setKappa(0.9876).setMiniBatchFraction(0.123).setTau_0(567)
assert(op.alpha == 0.5) // default 1.0 / k
assert(op.eta == 0.5) // default 1.0 / k
assert(op.getAlpha == 0.5) // default 1.0 / k
assert(op.getEta == 0.5) // default 1.0 / k
assert(op.getKappa == 0.9876)
assert(op.getMiniBatchFraction == 0.123)
assert(op.getTau_0 == 567)
Expand All @@ -154,14 +154,14 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {

def docs: Array[(Long, Vector)] = Array(
Vectors.sparse(vocabSize, Array(0, 1, 2), Array(1, 1, 1)), // apple, orange, banana
Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1))) // tiger, cat, dog
.zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
Vectors.sparse(vocabSize, Array(3, 4, 5), Array(1, 1, 1)) // tiger, cat, dog
).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
val corpus = sc.parallelize(docs, 2)

// setGammaShape large so to avoid the stochastic impact.
// Set GammaShape large to avoid the stochastic impact.
val op = new OnlineLDAOptimizer().setTau_0(1024).setKappa(0.51).setGammaShape(1e40)
.setMiniBatchFraction(1)
val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op)
val lda = new LDA().setK(k).setMaxIterations(1).setOptimizer(op).setSeed(12345)

val state = op.initialize(corpus, lda)
// override lambda to simulate an intermediate state
Expand All @@ -175,8 +175,8 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {

// verify the result, Note this generate the identical result as
// [[https://github.com/Blei-Lab/onlineldavb]]
val topic1 = op.lambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
val topic2 = op.lambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
val topic1 = op.getLambda(0, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
val topic2 = op.getLambda(1, ::).inner.toArray.map("%.4f".format(_)).mkString(", ")
assert("1.1101, 1.2076, 1.3050, 0.8899, 0.7924, 0.6950" == topic1)
assert("0.8899, 0.7924, 0.6950, 1.1101, 1.2076, 1.3050" == topic2)
}
Expand All @@ -186,7 +186,6 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
Vectors.sparse(6, Array(0, 1), Array(1, 1)),
Vectors.sparse(6, Array(1, 2), Array(1, 1)),
Vectors.sparse(6, Array(0, 2), Array(1, 1)),

Vectors.sparse(6, Array(3, 4), Array(1, 1)),
Vectors.sparse(6, Array(3, 5), Array(1, 1)),
Vectors.sparse(6, Array(4, 5), Array(1, 1))
Expand All @@ -200,6 +199,7 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
.setTopicConcentration(0.01)
.setMaxIterations(100)
.setOptimizer(op)
.setSeed(12345)

val ldaModel = lda.run(docs)
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
Expand All @@ -208,10 +208,10 @@ class LDASuite extends FunSuite with MLlibTestSparkContext {
}

// check distribution for each topic, typical distribution is (0.3, 0.3, 0.3, 0.02, 0.02, 0.02)
topics.foreach(topic =>{
val smalls = topic.filter(t => (t._2 < 0.1)).map(_._2)
assert(smalls.size == 3 && smalls.sum < 0.2)
})
topics.foreach { topic =>
val smalls = topic.filter(t => t._2 < 0.1).map(_._2)
assert(smalls.length == 3 && smalls.sum < 0.2)
}
}

}
Expand Down