Skip to content
Prev Previous commit
Next Next commit
use treeAggregate instead of aggregate
  • Loading branch information
Liquan Pei committed Aug 4, 2014
commit e93e7263d74879379257e6fff40d5efc8417f2ce
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel
import org.apache.spark.mllib.rdd.RDDFunctions._
/**
* Entry in vocabulary
*/
Expand Down Expand Up @@ -111,9 +112,9 @@ class Word2Vec(
}

/**
 * Stub with an empty body: calling it performs no work and returns Unit.
 *
 * NOTE(review): the name suggests per-partition vocabulary learning was planned;
 * confirm whether this should be implemented or removed.
 *
 * @param words input RDD of strings; not read by the current (empty) body
 */
private def learnVocabPerPartition(words: RDD[String]): Unit = {
  // No implementation yet.
}

private def createExpTable(): Array[Double] = {
val expTable = new Array[Double](EXP_TABLE_SIZE)
var i = 0
Expand Down Expand Up @@ -254,7 +255,7 @@ class Word2Vec(
val (aggSyn0, aggSyn1, _, _) =
// TODO: broadcast temp instead of serializing it directly
// or initialize the model in each executor
newSentences.aggregate((syn0Global.clone(), syn1Global.clone(), 0, 0))(
newSentences.treeAggregate((syn0Global.clone(), syn1Global.clone(), 0, 0))(
seqOp = (c, v) => (c, v) match {
case ((syn0, syn1, lastWordCount, wordCount), sentence) =>
var lwc = lastWordCount
Expand Down