mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.clustering

-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argtopk, normalize, sum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax, argtopk, normalize, sum}
import breeze.numerics.{exp, lgamma}
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats
@@ -412,7 +412,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
Loader.checkSchema[Data](dataFrame.schema)
val topics = dataFrame.collect()
val vocabSize = topics(0).getAs[Vector](0).size
-val k = topics.size
+val k = topics.length

val brzTopics = BDM.zeros[Double](vocabSize, k)
topics.foreach { case Row(vec: Vector, ind: Int) =>
@@ -575,6 +575,48 @@ class DistributedLDAModel private[clustering] (
}
}

/**
* Return the top topic for each (doc, term) pair. I.e., for each document, what is the most
* likely topic generating each term?
*
* @return RDD of (doc ID, assignment of top topic index for each term),
* where the assignment is specified via a pair of zippable arrays
* (term indices, topic indices). Note that terms will be omitted if not present in
* the document.
*/
lazy val topicAssignments: RDD[(Long, Array[Int], Array[Int])] = {
// For reference, compare the below code with the core part of EMLDAOptimizer.next().
val eta = topicConcentration
val W = vocabSize
val alpha = docConcentration(0)
val N_k = globalTopicTotals
val sendMsg: EdgeContext[TopicCounts, TokenCount, (Array[Int], Array[Int])] => Unit =
(edgeContext) => {
// E-STEP: Compute gamma_{wjk} (smoothed topic distributions).
val scaledTopicDistribution: TopicCounts =
computePTopic(edgeContext.srcAttr, edgeContext.dstAttr, N_k, W, eta, alpha)
// For this (doc j, term w), send top topic k to doc vertex.
val topTopic: Int = argmax(scaledTopicDistribution)
val term: Int = index2term(edgeContext.dstId)
edgeContext.sendToSrc((Array(term), Array(topTopic)))
}
val mergeMsg: ((Array[Int], Array[Int]), (Array[Int], Array[Int])) => (Array[Int], Array[Int]) =
(terms_topics0, terms_topics1) => {
(terms_topics0._1 ++ terms_topics1._1, terms_topics0._2 ++ terms_topics1._2)
}
// M-STEP: Aggregation computes new N_{kj}, N_{wk} counts.
graph.aggregateMessages[(Array[Int], Array[Int])](sendMsg, mergeMsg).filter(isDocumentVertex)
.map { case (docID: Long, (terms: Array[Int], topics: Array[Int])) =>
Review comment (Contributor): fix indentation
val (sortedTerms, sortedTopics) = terms.zip(topics).sortBy(_._1).unzip
Review comment (Contributor): Leave a TODO because zip is not efficient.
(docID, sortedTerms.toArray, sortedTopics.toArray)
}
}

/** Java-friendly version of [[topicAssignments]] */
lazy val javaTopicAssignments: JavaRDD[(java.lang.Long, Array[Int], Array[Int])] = {
topicAssignments.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[Int])]].toJavaRDD()
}

// TODO
// override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

@@ -797,10 +839,9 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
val classNameV1_0 = SaveLoadV1_0.thisClassName

val model = (loadedClassName, loadedVersion) match {
-case (className, "1.0") if className == classNameV1_0 => {
+case (className, "1.0") if className == classNameV1_0 =>
DistributedLDAModel.SaveLoadV1_0.load(sc, path, vocabSize, docConcentration,
topicConcentration, iterationTimes.toArray, gammaShape)
-}
case _ => throw new Exception(
s"DistributedLDAModel.load did not recognize model with (className, format version):" +
s"($loadedClassName, $loadedVersion). Supported: ($classNameV1_0, 1.0)")
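The new topicAssignments API above returns, for each document, two parallel arrays that zip into (term index, top topic index) pairs, sorted by term index. A minimal usage sketch, assuming a trained model: DistributedLDAModel is already in scope (the variable names here are illustrative, not part of this patch):

import org.apache.spark.rdd.RDD

// Assumes `model` is a DistributedLDAModel produced by LDA with the EM optimizer.
val assignments: RDD[(Long, Array[Int], Array[Int])] = model.topicAssignments
assignments.take(3).foreach { case (docID, termIndices, topicIndices) =>
  // The two arrays are parallel and sorted by term index, so zipping them
  // yields one (term -> most likely topic) pair per distinct term in the doc.
  termIndices.zip(topicIndices).foreach { case (term, topic) =>
    println(s"doc $docID: term $term -> topic $topic")
  }
}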
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -166,7 +166,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
edgeContext.sendToDst((false, scaledTopicDistribution))
edgeContext.sendToSrc((false, scaledTopicDistribution))
}
-// This is a hack to detect whether we could modify the values in-place.
+// The Boolean is a hack to detect whether we could modify the values in-place.
// TODO: Add zero/seqOp/combOp option to aggregateMessages. (SPARK-5438)
val mergeMsg: ((Boolean, TopicCounts), (Boolean, TopicCounts)) => (Boolean, TopicCounts) =
(m0, m1) => {
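For context on the reworded comment above: aggregateMessages may pass mergeMsg either a fresh message built by sendMsg or an accumulator produced by an earlier merge, and a vector may only be mutated if this code created it. sendMsg tags its messages false because the same vector is sent to both endpoints; the Boolean marks owned vectors, letting most merges accumulate in place instead of allocating. A sketch of the pattern, assuming the collapsed merge body follows the usual flag-guarded reuse (TopicCounts is a Breeze dense vector of per-topic counts):

import breeze.linalg.{DenseVector => BDV}

// `true` tags a scratch vector created inside this aggregation,
// which is therefore safe to mutate in place.
val mergeMsg: ((Boolean, BDV[Double]), (Boolean, BDV[Double])) => (Boolean, BDV[Double]) =
  (m0, m1) => {
    val sum = if (m0._1) {
      m0._2 += m1._2 // m0 is owned: accumulate into it, no new allocation
    } else if (m1._1) {
      m1._2 += m0._2 // m1 is the owned scratch vector instead
    } else {
      m0._2 + m1._2 // neither may be mutated: allocate a fresh vector
    }
    (true, sum) // the result is always an owned scratch vector
  }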
mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
@@ -134,6 +134,13 @@ public Boolean call(Tuple2<Long, Vector> tuple2) {
double[] topicWeights = topTopics._3();
assertEquals(3, topicIndices.length);
assertEquals(3, topicWeights.length);

// Check: topTopicAssignments
Tuple3<Long, int[], int[]> topicAssignment = model.javaTopicAssignments().first();
Long docId2 = topicAssignment._1();
int[] termIndices2 = topicAssignment._2();
int[] topicIndices2 = topicAssignment._3();
assertEquals(termIndices2.length, topicIndices2.length);
}

@Test
mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -135,17 +135,34 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
}

// Top 3 documents per topic
-model.topDocumentsPerTopic(3).zip(topDocsByTopicDistributions(3)).foreach {case (t1, t2) =>
+model.topDocumentsPerTopic(3).zip(topDocsByTopicDistributions(3)).foreach { case (t1, t2) =>
assert(t1._1 === t2._1)
assert(t1._2 === t2._2)
}

// All documents per topic
val q = tinyCorpus.length
-model.topDocumentsPerTopic(q).zip(topDocsByTopicDistributions(q)).foreach {case (t1, t2) =>
+model.topDocumentsPerTopic(q).zip(topDocsByTopicDistributions(q)).foreach { case (t1, t2) =>
assert(t1._1 === t2._1)
assert(t1._2 === t2._2)
}

// Check: topTopicAssignments
// Make sure it assigns a topic to each term appearing in each doc.
val topTopicAssignments: Map[Long, (Array[Int], Array[Int])] =
model.topicAssignments.collect().map(x => x._1 -> (x._2, x._3)).toMap
assert(topTopicAssignments.keys.max < tinyCorpus.length)
tinyCorpus.foreach { case (docID: Long, doc: Vector) =>
if (topTopicAssignments.contains(docID)) {
val (inds, vals) = topTopicAssignments(docID)
assert(inds.length === doc.numNonzeros)
// For each "term" in the actual doc, check that it has a topic assigned.
doc.foreachActive((term, wcnt) => assert(wcnt === 0 || inds.contains(term)))
} else {
assert(doc.numNonzeros === 0)
}
}
}

test("vertex indexing") {