
Commit dca0d9a

hhbyyh authored and jkbradley committed
[SPARK-14322][MLLIB] Use treeAggregate instead of reduce in OnlineLDAOptimizer
## What changes were proposed in this pull request?

jira: https://issues.apache.org/jira/browse/SPARK-14322

OnlineLDAOptimizer uses RDD.reduce in two places where it could use treeAggregate. This can cause scalability issues. This should be an easy fix. This is also a bug, since reduce modifies its first argument, so we should use aggregate or treeAggregate instead. See this line: https://github.com/apache/spark/blob/f12f11e578169b47e3f8b18b299948c0670ba585/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala#L452 and a few lines below it.

## How was this patch tested?

Unit tests.

Author: Yuhao Yang <hhbyyh@gmail.com>

Closes #12106 from hhbyyh/ldaTreeReduce.

(cherry picked from commit 8cffcb6)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
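The hazard this patch fixes can be reproduced outside Spark with plain Scala collections. The sketch below (a hypothetical `addInPlace` helper, not code from the patch) shows why an in-place `+=` is unsafe as a `reduce` operator, and why supplying a fresh zero value, as `aggregate`/`treeAggregate` do, makes in-place accumulation safe:

```scala
// Hypothetical illustration of the reduce-mutation bug (not from the patch):
// an in-place "+=" used as the reduce operator mutates its first argument.
def addInPlace(a: Array[Double], b: Array[Double]): Array[Double] = {
  for (i <- a.indices) a(i) += b(i)
  a
}

val data = Seq(Array(1.0, 2.0), Array(3.0, 4.0))
val badSum = data.reduce(addInPlace)
// badSum is Array(4.0, 6.0), but data.head has been mutated to the same
// values: the input was silently corrupted. On an RDD this can corrupt
// cached partition data.

val data2 = Seq(Array(1.0, 2.0), Array(3.0, 4.0))
// foldLeft (like Spark's aggregate/treeAggregate) supplies a fresh zero
// value that the fold owns, so in-place accumulation never touches inputs.
val goodSum = data2.foldLeft(Array.fill(2)(0.0))(addInPlace)
// goodSum is Array(4.0, 6.0) and data2 is unchanged.
```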
1 parent cfe9f02 commit dca0d9a

File tree

1 file changed (+3, −2 lines)


mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -449,10 +449,11 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
       }
       Iterator((stat, gammaPart))
     }
-    val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
+    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
+      _ += _, _ += _)
     expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
-      stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)
+      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
     val batchResult = statsSum :* expElogbeta.t

     // Note that this is an optimization to avoid batch.count
```
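For reference, the shape of the `treeAggregate` call introduced above can be sketched as a small standalone helper (the `sumStats` name is hypothetical; this assumes a live SparkContext and Breeze on the classpath, so it is a sketch rather than the exact patched code):

```scala
import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.rdd.RDD

// Hypothetical helper mirroring the patched line. The zero value is a fresh
// matrix owned by the aggregation, so the in-place `+=` operators are safe,
// and partial sums are combined in a tree (default depth 2) instead of
// being pulled one by one to the driver as RDD.reduce effectively does.
def sumStats(stats: RDD[BDM[Double]], k: Int, vocabSize: Int): BDM[Double] =
  stats.treeAggregate(BDM.zeros[Double](k, vocabSize))(
    seqOp = _ += _,  // merge one record into the per-partition accumulator
    combOp = _ += _) // merge two partition accumulators
```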
