Closed
test pass
Peng Meng committed Jul 13, 2017
commit 215efc3114012ebc19af984a3d0172aecb22f255
@@ -20,6 +20,8 @@ package org.apache.spark.mllib.recommendation
import java.io.IOException
import java.lang.{Integer => JavaInteger}

import scala.collection.mutable

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.hadoop.fs.Path
@@ -31,7 +33,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.internal.Logging
import org.apache.spark.mllib.linalg.BLAS
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
@@ -286,40 +288,123 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(srcFeatures)
val dstBlocks = blockify(dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) =>
val m = srcIter.size
val n = math.min(dstIter.size, num)
val output = new Array[(Int, (Int, Double))](m * n)
val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).map {
case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val dstIdMatrix = new Array[Int](m * num)
val scoreMatrix = Array.fill[Double](m * num)(Double.MinValue)
Member
By the way, MinValue is not the most negative value, but the smallest positive value. Is that what you want here?
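
A quick check may help here, because Scala and Java differ on this name. In Scala, `Double.MinValue` is the most negative finite double, while Java's `Double.MIN_VALUE` is the smallest positive one (Scala exposes that as `Double.MinPositiveValue`). A minimal sketch, independent of the PR's code; the object name is hypothetical:

```scala
object MinValueCheck {
  // Scala's Double.MinValue: the most negative finite double.
  val scalaMin: Double = Double.MinValue
  // Java's Double.MIN_VALUE: the smallest positive double.
  val javaMin: Double = java.lang.Double.MIN_VALUE

  def main(args: Array[String]): Unit = {
    println(s"scala Double.MinValue         = $scalaMin")
    println(s"scala Double.MinPositiveValue = ${Double.MinPositiveValue}")
    println(s"java  Double.MIN_VALUE        = $javaMin")
  }
}
```

Under that reading, `Array.fill[Double](m * num)(Double.MinValue)` works as a "no score yet" sentinel, provided no real score equals it.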

val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2))

val ratings = srcFactors.transpose.multiply(dstFactors)
var i = 0
var j = 0
while (i < m) {
var k = 0
while (k < n) {
pq += dstIds(k) -> ratings(i, k)
k += 1
}
var size = pq.size
while(size > 0) {
Member
You'll need to fix up a few style things, like adding a space after while.

Member
Why not add a nonEmpty / isEmpty method for this?

Author
Do you mean adding an isEmpty method to PriorityQueue? Thanks.
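
A sketch of what an emptiness-driven drain loop could look like. It uses `java.util.PriorityQueue` as a stand-in, since Spark's `BoundedPriorityQueue` is internal; the object and method names are hypothetical, and this is not the PR's implementation:

```scala
import java.util.{Comparator, PriorityQueue}

object DrainSketch {
  // Drains a min-heap of (id, score) pairs into an array sorted by descending score.
  // java.util.PriorityQueue is a stand-in; it is not Spark's BoundedPriorityQueue.
  def drainDescending(pq: PriorityQueue[(Int, Double)]): Array[(Int, Double)] = {
    val out = new Array[(Int, Double)](pq.size())
    var pos = out.length
    while (!pq.isEmpty) { // emptiness check instead of a manual size counter
      pos -= 1
      out(pos) = pq.poll() // poll() has side effects, so keep the parentheses
    }
    out
  }

  def example(): Array[(Int, Double)] = {
    val byScore = new Comparator[(Int, Double)] {
      override def compare(a: (Int, Double), b: (Int, Double)): Int =
        java.lang.Double.compare(a._2, b._2)
    }
    val pq = new PriorityQueue[(Int, Double)](11, byScore)
    Seq(7 -> 0.2, 3 -> 0.9, 5 -> 0.5).foreach(p => pq.add(p))
    drainDescending(pq)
  }
}
```

Because the heap yields the smallest score first, filling the output from the back leaves the highest score at index 0.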

size -= 1
val factor = pq.poll
Member
poll() because it has side effects

dstIdMatrix(j + size) = factor._1
scoreMatrix(j + size) = factor._2
}
i += 1
// pq.size maybe less than num, corner case
j += num
pq.clear
Member
clear()

}
(index -> (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix)))
}
ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)(
Contributor
This is aggregating by key which in this case appears to be the "block index". What is the benefit then? Since each block will have a unique index, there would be no intermediate aggregation.
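
One way to examine this question: in the diff as shown, the key is the index of the source block, and `cartesian` pairs every source block with every destination block, so each key would occur once per destination block. A local-collections sketch of that key distribution (no Spark; the block labels and object name are hypothetical):

```scala
object BlockKeySketch {
  // Hypothetical block labels; the real blocks hold ids and a DenseMatrix.
  val srcBlocks: Seq[(String, Long)] =
    Seq("s0", "s1").zipWithIndex.map { case (b, i) => (b, i.toLong) }
  val dstBlocks: Seq[String] = Seq("d0", "d1", "d2")

  // Local analogue of srcBlocks.cartesian(dstBlocks).map { ... index -> ... }
  val keyed: Seq[(Long, (String, String))] =
    for ((src, index) <- srcBlocks; dst <- dstBlocks) yield index -> (src, dst)

  // Number of records sharing each source-block index.
  val recordsPerKey: Map[Long, Int] =
    keyed.groupBy(_._1).map { case (k, vs) => k -> vs.size }
}
```

If that reading holds, the aggregation merges one partial top list per destination block for each source block. Whether this is cheaper than the previous `topByKey` shuffle is the open question in this thread.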

(rateSum, rate) => {
Member
Braces aren't needed in these args, just put them on one line

mergeFunc(rateSum, rate, num)
},
(rateSum1, rateSum2) => {
mergeFunc(rateSum1, rateSum2, num)
}
).flatMap(value => {
Member
.flatMap { value => to avoid redundant parens

Member
Also, use case (...) instead of value to name its elements. The ._2, ._3 below is hard to understand
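
The two suggestions above (braces on `flatMap`, and a `case` pattern instead of `._2._3`) can be sketched on plain collections. This is an illustration only: the `Block` type and `expand` are hypothetical, and scores are kept in a flat row-by-row array rather than the `DenseMatrix` used in the diff:

```scala
object DestructureSketch {
  // Hypothetical stand-in for one aggregated record:
  // (blockIndex, (srcIds, dstIds laid out row by row, scores laid out row by row)).
  type Block = (Long, (Array[Int], Array[Int], Array[Double]))

  def expand(blocks: Seq[Block], num: Int): Seq[(Int, Array[(Int, Double)])] =
    blocks.flatMap { case (_, (srcIds, dstIds, scores)) =>
      srcIds.indices.map { i =>
        val offset = i * num // start of row i in the flat arrays
        val recs = Array.tabulate(num)(j => (dstIds(offset + j), scores(offset + j)))
        (srcIds(i), recs)
      }
    }
}
```

Naming the tuple elements in the pattern makes it clear which array holds source ids, destination ids, and scores.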

// to avoid corner case that the number of items is less than recommendation num
var col: Int = 0
while (col < num && value._2._3(0, col) > Double.MinValue) {
col += 1
}
val row = value._2._3.numRows
val output = new Array[(Int, Array[(Int, Double)])](row)
var i = 0
val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
srcIter.foreach { case (srcId, srcFactor) =>
dstIter.foreach { case (dstId, dstFactor) =>
// We use F2jBLAS which is faster than a call to native BLAS for vector dot product
val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
pq += dstId -> score
while (i < row) {
val factors = new Array[(Int, Double)](col)
var j = 0
while (j < col) {
factors(j) = (value._2._2(i * num + j), value._2._3(i, j))
j += 1
}
pq.foreach { case (dstId, score) =>
output(i) = (srcId, (dstId, score))
i += 1
output(i) = (value._2._1(i), factors)
i += 1
}
output.toSeq})
}

private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix),
rate: (Array[Int], Array[Int], DenseMatrix),
num: Int): (Array[Int], Array[Int], DenseMatrix) = {
if (rateSum._1 == null) {
rate
} else {
val row = rateSum._3.numRows
var i = 0
val tempIdMatrix = new Array[Int](row * num)
val tempScoreMatrix = new Array[Double](row * num)
while (i < row) {
var j = 0
var sum_index = 0
var rate_index = 0
while (j < num) {
if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
tempIdMatrix(i * num + j) = rate._2(i * num + rate_index)
Member
Might be worth storing i * num in a local to avoid recomputing it
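
A sketch of the same two-way merge with the row offset hoisted into a local. It works on flat row-by-row arrays instead of `DenseMatrix`, and all names are hypothetical. Note one deliberate difference: in the diff as shown, neither branch appears to fire when the two scores are equal, which would leave that slot unset; the sketch takes the left entry on ties so every slot is filled.

```scala
object MergeTopKSketch {
  // Merges two row-major (rows x num) top lists, each row sorted by descending
  // score, into one (rows x num) top list. Ties take the left entry.
  def merge(
      leftIds: Array[Int], leftScores: Array[Double],
      rightIds: Array[Int], rightScores: Array[Double],
      rows: Int, num: Int): (Array[Int], Array[Double]) = {
    val ids = new Array[Int](rows * num)
    val scores = new Array[Double](rows * num)
    var i = 0
    while (i < rows) {
      val offset = i * num // computed once per row instead of on every access
      var l = 0
      var r = 0
      var j = 0
      while (j < num) {
        // l + r == j < num here, so both reads stay inside row i.
        if (rightScores(offset + r) > leftScores(offset + l)) {
          ids(offset + j) = rightIds(offset + r)
          scores(offset + j) = rightScores(offset + r)
          r += 1
        } else {
          ids(offset + j) = leftIds(offset + l)
          scores(offset + j) = leftScores(offset + l)
          l += 1
        }
        j += 1
      }
      i += 1
    }
    (ids, scores)
  }
}
```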

tempScoreMatrix(i * num + j) = rate._3(i, rate_index)
rate_index += 1
} else if (rate._3(i, rate_index) < rateSum._3(i, sum_index)) {
tempIdMatrix(i * num + j) = rateSum._2(i * num + sum_index)
tempScoreMatrix(i * num + j) = rateSum._3(i, sum_index)
sum_index += 1
}
j += 1
}
pq.clear()
i += 1
}
output.toSeq
(rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix))
}
ratings.topByKey(num)(Ordering.by(_._2))
}

/**
* Blockifies features to improve the efficiency of cartesian product
* TODO: SPARK-20443 - expose blockSize as a param?
*/
private def blockify(
features: RDD[(Int, Array[Double])],
blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
def blockify(
rank: Int,
features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = {
val blockSize = 2000 // TODO: tune the block size
Contributor
So will you add a parameter for this?

Author
Yes, we have another PR to set this value (SPARK-20443).
If the blockSize is too large, an OOM is possible. In my tests, with blockSize set anywhere from 1000 to 8000, this PR performs better than master.
Performance is about the same across that range of blockSize values.
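
A rough back-of-the-envelope for the OOM remark, assuming the dominant cost is the dense score matrix produced by `srcFactors.transpose.multiply(dstFactors)` for one pair of full blocks. Factor blocks, id arrays, and JVM overhead are ignored, and the object name is hypothetical:

```scala
object BlockMemorySketch {
  // Bytes needed for one dense blockSize x blockSize matrix of doubles (8 bytes each).
  def scoreMatrixBytes(blockSize: Int): Long = 8L * blockSize * blockSize

  def main(args: Array[String]): Unit =
    Seq(1000, 2000, 4096, 8000).foreach { b =>
      println(f"blockSize=$b%5d -> ${scoreMatrixBytes(b) / 1e6}%.1f MB per block pair")
    }
}
```

The cost grows quadratically with blockSize, so doubling the block size quadruples the per-pair score matrix.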

val blockStorage = rank * blockSize
features.mapPartitions { iter =>
iter.grouped(blockSize)
iter.grouped(blockSize).map { grouped =>
val ids = mutable.ArrayBuilder.make[Int]
ids.sizeHint(blockSize)
val factors = mutable.ArrayBuilder.make[Double]
factors.sizeHint(blockStorage)
var i = 0
grouped.foreach { case (id, factor) =>
ids += id
factors ++= factor
i += 1
}
(ids.result(), new DenseMatrix(rank, i, factors.result()))
}
}
}
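
The packing step above can be tried without Spark. The sketch below mirrors the idea on a plain `Iterator`, returning a flat column-major array in place of `DenseMatrix`; the explicit `blockSize` parameter and the object name are assumptions for illustration, not part of the PR:

```scala
object BlockifySketch {
  // Packs (id, factor) pairs into blocks: ids plus a column-major rank x n array,
  // i.e. the factors of the k-th id occupy positions [k * rank, (k + 1) * rank).
  def blockify(
      rank: Int,
      features: Iterator[(Int, Array[Double])],
      blockSize: Int): Iterator[(Array[Int], Array[Double])] =
    features.grouped(blockSize).map { group =>
      val ids = group.map(_._1).toArray
      val factors = new Array[Double](rank * group.size)
      group.zipWithIndex.foreach { case ((_, f), k) =>
        require(f.length == rank, "every factor must have length rank")
        System.arraycopy(f, 0, factors, k * rank, rank)
      }
      (ids, factors)
    }
}
```

The last block may hold fewer than `blockSize` ids, which is why the column count is taken from the group rather than from `blockSize`.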
