@@ -363,9 +363,9 @@ class ALSModel private[ml] (
363363 * relatively efficient, the approach implemented here is significantly more efficient.
364364 *
365365 * This approach groups factors into blocks and computes the top-k elements per block,
366- * using Level 1 BLAS (dot ) and an efficient [[BoundedPriorityQueue ]]. It then computes the
367- * global top-k by aggregating the per block top-k elements with a [[ TopByKeyAggregator ]].
368- * This significantly reduces the size of intermediate and shuffle data.
366+ * using a simple dot product (instead of gemm) and an efficient [[BoundedPriorityQueue]].
367+ * It then computes the global top-k by aggregating the per-block top-k elements with
368+ * a [[TopByKeyAggregator]]. This significantly reduces the size of intermediate and shuffle data.
369369 * This is the DataFrame equivalent to the approach used in
370370 * [[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]].
371371 *
@@ -397,20 +397,18 @@ class ALSModel private[ml] (
397397 val pq = new BoundedPriorityQueue [(Int , Float )](num)(Ordering .by(_._2))
398398 srcIter.foreach { case (srcId, srcFactor) =>
399399 dstIter.foreach { case (dstId, dstFactor) =>
400- /**
400+ /*
401401 * The below code is equivalent to
402- * val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)
403- * Compared with BLAS.dot, the hand-written version used below is more efficient than
404- * a call to the native BLAS backend and the same performance as the fallback
405- * F2jBLAS backend.
402+ * `val score = blas.sdot(rank, srcFactor, 1, dstFactor, 1)`
403+ * This handwritten version is as efficient as, or more efficient than, BLAS calls in this case.
406404 */
407405 var score = 0.0f
408406 var k = 0
409407 while (k < rank) {
410408 score += srcFactor(k) * dstFactor(k)
411409 k += 1
412410 }
413- pq += { ( dstId, score) }
411+ pq += dstId -> score
414412 }
415413 val pqIter = pq.iterator
416414 var i = 0
@@ -434,15 +432,16 @@ class ALSModel private[ml] (
434432 .add(dstOutputColumn, IntegerType )
435433 .add(" rating" , FloatType )
436434 )
437- recs.select($" id" as srcOutputColumn , $" recommendations" cast arrayType)
435+ recs.select($" id" .as( srcOutputColumn) , $" recommendations" . cast( arrayType) )
438436 }
439437
440438 /**
441439 * Blockifies factors to improve the efficiency of the cross join.
440+ * TODO: SPARK-20443 - expose blockSize as a param?
442441 */
443442 private def blockify (
444- factors : Dataset [(Int , Array [Float ])],
445- /* TODO make blockSize a param? */ blockSize : Int = 4096 ): Dataset [Seq [(Int , Array [Float ])]] = {
443+ factors : Dataset [(Int , Array [Float ])],
444+ blockSize : Int = 4096 ): Dataset [Seq [(Int , Array [Float ])]] = {
446445 import factors .sparkSession .implicits ._
447446 factors.mapPartitions(_.grouped(blockSize))
448447 }
0 commit comments