Skip to content
Closed
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d640d9c
online lda initial checkin
hhbyyh Feb 6, 2015
043e786
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 6, 2015
26dca1b
style fix and make class private
hhbyyh Feb 6, 2015
f41c5ca
style fix
hhbyyh Feb 6, 2015
45884ab
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 8, 2015
fa408a8
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 9, 2015
0d0f3ee
replace random split with sliding
hhbyyh Feb 10, 2015
0dd3947
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Feb 10, 2015
3a06526
merge with new example
hhbyyh Feb 10, 2015
aa365d1
merge upstream master
hhbyyh Mar 2, 2015
20328d1
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 2, 2015
37af91a
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 2, 2015
581c623
separate API and adjust batch split
hhbyyh Mar 2, 2015
e271eb1
remove non ascii
hhbyyh Mar 2, 2015
4a3f27e
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 5, 2015
a570c9a
use sample to pick up batch
hhbyyh Mar 11, 2015
d86cdec
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 11, 2015
f6d47ca
Merge branch 'ldaonline' of https://github.com/hhbyyh/spark into ldao…
hhbyyh Mar 11, 2015
02d0373
fix style in comment
hhbyyh Mar 12, 2015
62405cc
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 20, 2015
8cb16a6
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 23, 2015
f367cc9
change to optimization
hhbyyh Mar 23, 2015
e7bf3b0
move to separate file
hhbyyh Mar 27, 2015
97b9e1a
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Mar 27, 2015
d19ef55
change OnlineLDA to class
hhbyyh Apr 2, 2015
b29193b
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 16, 2015
15be071
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 17, 2015
dbe3cff
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh Apr 28, 2015
b1178cf
fit into the optimizer framework
hhbyyh Apr 28, 2015
a996a82
respond to comments
hhbyyh Apr 29, 2015
61d60df
Minor cleanups:
jkbradley Apr 29, 2015
9e910d9
small fix
jkbradley Apr 29, 2015
138bfed
Merge pull request #1 from jkbradley/hhbyyh-ldaonline-update
hhbyyh Apr 29, 2015
4041723
add ut
hhbyyh Apr 29, 2015
68c2318
add a java ut
hhbyyh Apr 30, 2015
54cf8da
some style change
hhbyyh May 1, 2015
cf0007d
Merge remote-tracking branch 'upstream/master' into ldaonline
hhbyyh May 1, 2015
6149ca6
fix for setOptimizer
hhbyyh May 1, 2015
cf376ff
For private vars needed for testing, I made them private and added ac…
jkbradley May 2, 2015
1045eec
Merge pull request #2 from jkbradley/hhbyyh-ldaonline2
hhbyyh May 3, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
change to optimization
  • Loading branch information
hhbyyh committed Mar 23, 2015
commit f367cc90bcb19773b618ed3ca47b3529d8ee370c
75 changes: 41 additions & 34 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,32 +249,24 @@ class LDA private (


/**
* TODO: add API to take documents paths once tokenizer is ready.
* Learn an LDA model using the given dataset, using online variational Bayes (VB) algorithm.
*
* @param documents RDD of documents, which are term (word) count vectors paired with IDs.
* The term count vectors are "bags of words" with a fixed-size vocabulary
* (where the vocabulary size is the length of the vector).
* Document IDs must be unique and >= 0.
* @param batchNumber Number of batches. For each batch, recommendation size is [4, 16384].
* -1 for automatic batchNumber.
* @param batchNumber Number of batches to split input corpus. For each batch, recommendation
* size is [4, 16384]. -1 for automatic batchNumber.
* @return Inferred LDA model
*/
def runOnlineLDA(documents: RDD[(Long, Vector)], batchNumber: Int = -1): LDAModel = {
val D = documents.count().toInt
val batchSize =
if (batchNumber == -1) { // auto mode
if (D / 100 > 16384) 16384
else if (D / 100 < 4) 4
else D / 100
}
else {
require(batchNumber > 0, "batchNumber should be positive or -1")
D / batchNumber
}
require(batchNumber > 0 || batchNumber == -1,
s"batchNumber must be greater or -1, but was set to $batchNumber")

val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, batchSize)
(0 until onlineLDA.actualBatchNumber).map(_ => onlineLDA.next())
new LocalLDAModel(Matrices.fromBreeze(onlineLDA.lambda).transpose)
val onlineLDA = new LDA.OnlineLDAOptimizer(documents, k, batchNumber)
val model = onlineLDA.optimize()
new LocalLDAModel(Matrices.fromBreeze(model).transpose)
}

/** Java-friendly version of [[run()]] */
Expand Down Expand Up @@ -437,39 +429,54 @@ private[clustering] object LDA {
private[clustering] class OnlineLDAOptimizer(
private val documents: RDD[(Long, Vector)],
private val k: Int,
private val batchSize: Int) extends Serializable{
private val batchNumber: Int) extends Serializable{

private val vocabSize = documents.first._2.size
private val D = documents.count().toInt
val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt
private val batchSize =
if (batchNumber == -1) { // auto mode
if (D / 100 > 16384) 16384
else if (D / 100 < 4) 4
else D / 100
}
else {
D / batchNumber
}

// Initialize the variational distribution q(beta|lambda)
var lambda = getGammaMatrix(k, vocabSize) // K * V
private var lambda = getGammaMatrix(k, vocabSize) // K * V
private var Elogbeta = dirichlet_expectation(lambda) // K * V
private var expElogbeta = exp(Elogbeta) // K * V

private var batchId = 0
def next(): Unit = {
require(batchId < actualBatchNumber)
// weight of the mini-batch. 1024 down weights early iterations
val weight = math.pow(1024 + batchId, -0.5)
val batch = documents.sample(true, batchSize.toDouble / D)
batch.cache()
// Given a mini-batch of documents, estimates the parameters gamma controlling the
// variational distribution over the topic weights for each document in the mini-batch.
var stat = BDM.zeros[Double](k, vocabSize)
stat = batch.aggregate(stat)(seqOp, _ += _)
stat = stat :* expElogbeta
def optimize(): BDM[Double] = {
val actualBatchNumber = Math.ceil(D.toDouble / batchSize).toInt
for(i <- 1 to actualBatchNumber){
val batch = documents.sample(true, batchSize.toDouble / D)

// Given a mini-batch of documents, estimates the parameters gamma controlling the
// variational distribution over the topic weights for each document in the mini-batch.
var stat = BDM.zeros[Double](k, vocabSize)
stat = batch.treeAggregate(stat)(gradient, _ += _)
update(stat, i)
}
lambda
}

private def update(raw: BDM[Double], iter:Int): Unit ={
// weight of the mini-batch. 1024 helps down weights early iterations
val weight = math.pow(1024 + iter, -0.5)

// This step finishes computing the sufficient statistics for the M step
val stat = raw :* expElogbeta

// Update lambda based on documents.
lambda = lambda * (1 - weight) + (stat * D.toDouble / batchSize.toDouble + 1.0 / k) * weight
Elogbeta = dirichlet_expectation(lambda)
expElogbeta = exp(Elogbeta)
batchId += 1
}

// for each document d update that document's gamma and phi
private def seqOp(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = {
private def gradient(stat: BDM[Double], doc: (Long, Vector)): BDM[Double] = {
val termCounts = doc._2
val (ids, cts) = termCounts match {
case v: DenseVector => (((0 until v.size).toList), v.values)
Expand All @@ -488,7 +495,7 @@ private[clustering] object LDA {
val ctsVector = new BDV[Double](cts).t // 1 * ids

// Iterate between gamma and phi until convergence
while (meanchange > 1e-6) {
while (meanchange > 1e-5) {
val lastgamma = gammad
// 1*K 1 * ids ids * k
gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + 1.0/k
Expand Down