updates
jkbradley committed Aug 19, 2015
commit d99d5be9439ea29e8fd13b600ad0c8334b0a171b
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.clustering

-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, normalize, sum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax, argtopk, normalize, sum}
import breeze.numerics.{exp, lgamma}
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats
@@ -27,7 +27,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
-import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId}
+import org.apache.spark.graphx._
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
@@ -412,7 +412,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
Loader.checkSchema[Data](dataFrame.schema)
val topics = dataFrame.collect()
val vocabSize = topics(0).getAs[Vector](0).size
-val k = topics.size
+val k = topics.length

val brzTopics = BDM.zeros[Double](vocabSize, k)
topics.foreach { case Row(vec: Vector, ind: Int) =>
@@ -575,6 +575,42 @@ class DistributedLDAModel private[clustering] (
}
}

/**
* Return the top topic for each (doc, term) pair. I.e., for each document, what is the most
* likely topic generating each term?
*
* @return RDD of (doc ID, assignment of top topic index for each term),
* where the assignment is specified via a pair of zippable arrays
* (term indices, topic indices). Note that terms will be omitted if not present in
* the document.
*/
lazy val topTopicAssignments: RDD[(Long, Array[Int], Array[Int])] = {
// For reference, compare the below code with the core part of EMLDAOptimizer.next().
val eta = topicConcentration
val W = vocabSize
val alpha = docConcentration(0)
val N_k = globalTopicTotals
val sendMsg: EdgeContext[TopicCounts, TokenCount, (Array[Int], Array[Int])] => Unit =
(edgeContext) => {
// E-STEP: Compute gamma_{wjk} (smoothed topic distributions).
val scaledTopicDistribution: TopicCounts =
computePTopic(edgeContext.srcAttr, edgeContext.dstAttr, N_k, W, eta, alpha)
// For this (doc j, term w), send top topic k to doc vertex.
val topTopic: Int = argmax(scaledTopicDistribution)
val term: Int = edgeContext.dstId.toInt // ok to assume # terms < Int.MaxValue
edgeContext.sendToSrc((Array(term), Array(topTopic)))
}
val mergeMsg: ((Array[Int], Array[Int]), (Array[Int], Array[Int])) => (Array[Int], Array[Int]) =
(terms_topics0, terms_topics1) => {
(terms_topics0._1 ++ terms_topics1._1, terms_topics0._2 ++ terms_topics1._2)
}
// Aggregate the (term, top topic) pairs sent to each document vertex.
graph.aggregateMessages[(Array[Int], Array[Int])](sendMsg, mergeMsg).filter(isDocumentVertex)
.map { case (docID: Long, (terms: Array[Int], topics: Array[Int])) =>
Review comment (Contributor): fix indentation
(docID, terms, topics)
}
}
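A brief usage sketch of the new API (illustrative only, not part of this patch; `model` stands for an already-trained DistributedLDAModel):

// Sketch: zip the parallel (term indices, topic indices) arrays per document.
model.topTopicAssignments.take(3).foreach { case (docID, terms, topics) =>
  terms.zip(topics).foreach { case (term, topic) =>
    println(s"doc $docID: term $term -> topic $topic")
  }
}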

// TODO
// override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

@@ -797,10 +833,9 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
val classNameV1_0 = SaveLoadV1_0.thisClassName

val model = (loadedClassName, loadedVersion) match {
-case (className, "1.0") if className == classNameV1_0 => {
+case (className, "1.0") if className == classNameV1_0 =>
DistributedLDAModel.SaveLoadV1_0.load(sc, path, vocabSize, docConcentration,
topicConcentration, iterationTimes.toArray, gammaShape)
-}
case _ => throw new Exception(
s"DistributedLDAModel.load did not recognize model with (className, format version):" +
s"($loadedClassName, $loadedVersion). Supported: ($classNameV1_0, 1.0)")
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -166,7 +166,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
edgeContext.sendToDst((false, scaledTopicDistribution))
edgeContext.sendToSrc((false, scaledTopicDistribution))
}
-// This is a hack to detect whether we could modify the values in-place.
+// The Boolean is a hack to detect whether we could modify the values in-place.
// TODO: Add zero/seqOp/combOp option to aggregateMessages. (SPARK-5438)
// TODO: Add zero/seqOp/combOp option to aggregateMessages. (SPARK-5438)
val mergeMsg: ((Boolean, TopicCounts), (Boolean, TopicCounts)) => (Boolean, TopicCounts) =
(m0, m1) => {
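The hunk is truncated here. For context, a sketch of how the ownership flag permits in-place accumulation (an illustration under stated assumptions: TopicCounts is a Breeze dense vector, and += mutates its left operand and returns it):

val sum =
  if (m0._1) m0._2 += m1._2       // m0's vector is a private copy: mutate it in place
  else if (m1._1) m1._2 += m0._2  // m1's vector is a private copy: mutate it in place
  else m0._2 + m1._2              // neither is owned: allocate a fresh vector
(true, sum)                       // the merged result is always safe to mutate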
mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -135,17 +135,35 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}

// Top 3 documents per topic
-model.topDocumentsPerTopic(3).zip(topDocsByTopicDistributions(3)).foreach {case (t1, t2) =>
+model.topDocumentsPerTopic(3).zip(topDocsByTopicDistributions(3)).foreach { case (t1, t2) =>
assert(t1._1 === t2._1)
assert(t1._2 === t2._2)
}

// All documents per topic
val q = tinyCorpus.length
-model.topDocumentsPerTopic(q).zip(topDocsByTopicDistributions(q)).foreach {case (t1, t2) =>
+model.topDocumentsPerTopic(q).zip(topDocsByTopicDistributions(q)).foreach { case (t1, t2) =>
assert(t1._1 === t2._1)
assert(t1._2 === t2._2)
}

// Check: topTopicAssignments
// Make sure it assigns a topic to each term appearing in each doc.
val topTopicAssignments: Array[(Array[Int], Array[Int])] =
model.topTopicAssignments.collect().sortBy(_._1).map(x => (x._2, x._3))
assert(topTopicAssignments.length === tinyCorpus.length)
topTopicAssignments.zip(tinyCorpus.map(_._2)).foreach { case ((inds, vals), doc) =>
assert(inds.length === doc.numNonzeros)
doc.foreachActive((i, _) => assert(inds.contains(i)))
}
// Compare with manually computed top topic assignments.
// P(topic k | term w, doc j) \propto P(term w | topic k) * P(topic k | doc j)
// (tinyCorpus doc IDs are assumed 0-based and contiguous, matching the sorted array above.)
val topics = model.topicsMatrix
topicDistributions.foreach { case (docID, dist) =>
val (terms, topicInds) = topTopicAssignments(docID.toInt)
terms.zip(topicInds).foreach { case (term, topic) =>
// The assigned topic should maximize P(term w | topic k) * P(topic k | doc j).
val expectedTopic = Range(0, model.k).maxBy(k => topics(term, k) * dist(k))
assert(topic === expectedTopic)
}
}
}

test("vertex indexing") {