replace all Array[Double] with Vector
soulmachine committed Jul 26, 2013
commit bb3dfbcc5a55a187bdd76f5c6fb51cacbc29414c
35 changes: 27 additions & 8 deletions mllib/src/main/scala/spark/mllib/math/vector/DenseVector.scala
@@ -44,7 +44,7 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e

def this(that: SparseVector) = this(that.toArray)

override def clone(): DenseVector = new DenseVector(this)
override def deepClone(): DenseVector = new DenseVector(this)

def apply(i: Int): Double = values.get(i)

@@ -60,7 +60,8 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
def toArray(): Array[Double] = this.values.toArray()

def +(that: Vector): Vector = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
val result = new DoubleMatrix(dimension)
@@ -75,7 +76,8 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
}

def +=(that: Vector): Vector = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
values.addi(thatV.values)
@@ -90,7 +92,8 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
}

def -(that: Vector): Vector = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
val result = new DoubleMatrix(dimension)
@@ -105,7 +108,8 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
}

def -=(that: Vector): Vector = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
values.subi(thatV.values)
@@ -119,7 +123,8 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
}

def *(that: Vector): Double = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
values.dot(thatV.values)
@@ -140,11 +145,25 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
invalidateCachedLength()
this
}

/** Elementwise divide. */
def / (that: Vector): Vector = {
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val result = this.deepClone()
val thatV = DenseVector.getOrConvert(that)
result.values.divi(thatV.values)
result
}

/** Elementwise divide (in place). */
def /= (that: Vector): Vector = {
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
this.values.divi(thatV.values)
this
}
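A minimal usage sketch of the two new operators (editor's illustration, not part of this commit), assuming the DenseVector(Array[Double]) constructor that the SparseVector auxiliary constructor above delegates to:

  // Out-of-place divide returns a fresh vector; in-place divide mutates the receiver.
  val a: Vector = new DenseVector(Array(2.0, 4.0, 9.0))
  val b: Vector = new DenseVector(Array(2.0, 2.0, 3.0))
  val q = a / b   // (1.0, 2.0, 3.0); a is unchanged
  a /= b          // a itself becomes (1.0, 2.0, 3.0)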

def sum(): Double = this.values.sum()

def getDistanceSquared(that: Vector): Double = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)
if(this.dimension != that.dimension)
throw new DimensionException(dimension, that.dimension)

val thatV = DenseVector.getOrConvert(that)
this.values.squaredDistance(thatV.values)
@@ -167,7 +186,7 @@ class DenseVector private (dimension: Int, array: Option[DoubleMatrix] = None) e
if (power.isInfinite() || power <= 1.0) {
throw new IllegalArgumentException("Power must be > 1 and < infinity");
} else {
val result = this.clone()
val result = this.deepClone()
result.values.addi(1.0)
logi(result.values)
result.values.divi(log(power) * norm)
27 changes: 20 additions & 7 deletions mllib/src/main/scala/spark/mllib/math/vector/SparseVector.scala
@@ -38,7 +38,7 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {

def this(that: DenseVector) = this(that.toArray)

override def clone(): SparseVector = new SparseVector(this)
override def deepClone(): SparseVector = new SparseVector(this)

def apply(i: Int): Double = values.getQuick(i)

@@ -62,7 +62,7 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {
}

def + (x: Double): Vector = {
val result = this.clone
val result = this.deepClone
if (x != 0.0) {
result.assign(Functions.plus(x))
}
@@ -87,12 +87,12 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)

val thatV = SparseVector.getOrConvert(that)
val result = this.clone
val result = this.deepClone
result.assign(thatV, Functions.minus)
}

def - (x: Double): Vector = {
val result = this.clone
val result = this.deepClone
if (x != 0.0) {
result.assign(Functions.minus(x))
}
@@ -121,7 +121,7 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {
}

def *(x: Double): Vector = {
val result = this.clone
val result = this.deepClone
result.assign(Functions.mult(x))
result
}
@@ -132,7 +132,7 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {
}

def /(x: Double): Vector = {
val result = this.clone
val result = this.deepClone
result.assign(Functions.div(x))
result
}
@@ -142,6 +142,19 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {
this
}

/** Elementwise divide. */
def / (that: Vector): Vector = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)

val result = this.deepClone
val thatV = SparseVector.getOrConvert(that)
result.assign(thatV, Functions.div)
result
}

/** Elementwise divide (in place). */
def /= (that: Vector): Vector = {
if(this.dimension != that.dimension) throw new DimensionException(dimension, that.dimension)

val thatV = SparseVector.getOrConvert(that)
this.assign(thatV, Functions.div)
this
}
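As above, an editor's sketch (not part of this commit) of the sparse variants. SparseVector.getOrConvert lets the right-hand operand be any Vector, converting a dense argument before the elementwise divide; this assumes the SparseVector(Array[Double]) constructor implied by the auxiliary constructor above:

  val s: Vector = new SparseVector(Array(6.0, 0.0, 8.0))
  val d: Vector = new DenseVector(Array(3.0, 1.0, 2.0))
  val r = s / d   // d is converted to a SparseVector first; r is (2.0, 0.0, 4.0)
  s /= d          // in-place variant; s itself becomes (2.0, 0.0, 4.0)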

def sum(): Double = this.values.zSum()

def getDistanceSquared(that: Vector): Double = {
@@ -206,7 +219,7 @@ class SparseVector(dimension: Int) extends AbstractVector(dimension) {
if (power.isInfinite() || power <= 1.0) {
throw new IllegalArgumentException("Power must be > 1 and < infinity");
} else {
val result = this.clone()
val result = this.deepClone()
result.assign(Functions.chain(Functions.div(norm), Functions.chain(Functions.lg(power), Functions.plus(1.0))))
result
}
8 changes: 8 additions & 0 deletions mllib/src/main/scala/spark/mllib/math/vector/Vector.scala
@@ -30,6 +30,10 @@ abstract class Vector extends Serializable with Equals {
* Get the dimension of the vector.
*/
def dimension: Int

/** Return a deep clone of the recipient. */
def deepClone(): Vector

/** Get the i-th element of the vector. */
def apply(i: Int): Double
/** Set the i-th element of the vector. */
@@ -65,6 +69,10 @@ abstract class Vector extends Serializable with Equals {
def / (x: Double): Vector
/** Return the original vector containing the elements of the recipient divided by the argument. */
def /= (x: Double): Vector
/** Elementwise divide. */
def / (that: Vector): Vector
/** Elementwise divide (in place). */
def /= (that: Vector): Vector

/** Return the dot product of the recipient and the argument. */
def * (that: Vector): Double
5 changes: 3 additions & 2 deletions mllib/src/main/scala/spark/mllib/optimization/Gradient.scala
@@ -18,6 +18,7 @@
package spark.mllib.optimization

import org.jblas.DoubleMatrix
import spark.mllib.math.vector.Vector

abstract class Gradient extends Serializable {
/**
@@ -27,6 +28,6 @@ abstract class Gradient extends Serializable {
* @param label - Label for this data item.
* @param weights - Vector containing the weights for every feature.
*/
def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
(DoubleMatrix, Double)
def compute(data: Vector, label: Double, weights: Vector):
(Vector, Double)
}
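To illustrate the new Vector-based contract, here is a hypothetical least-squares gradient (an editor's sketch, not part of this commit; it assumes the scalar * counterpart of the / (x: Double) operator declared on Vector):

  import spark.mllib.math.vector.Vector

  // Gradient and loss of the squared error (x * w - label)^2 for one example.
  class LeastSquaresGradient extends Gradient {
    override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
      val diff = data * weights - label   // scalar residual, via the Vector dot product
      val gradient = data * (2.0 * diff)  // d/dw of (x * w - label)^2
      (gradient, diff * diff)
    }
  }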
mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -19,6 +19,7 @@ package spark.mllib.optimization

import spark.{Logging, RDD, SparkContext}
import spark.SparkContext._
import spark.mllib.math.vector.Vector

import org.jblas.DoubleMatrix

@@ -44,37 +45,36 @@ object GradientDescent {
* loss computed for every iteration.
*/
def runMiniBatchSGD(
data: RDD[(Double, Array[Double])],
data: RDD[(Double, Vector)],
gradient: Gradient,
updater: Updater,
stepSize: Double,
numIters: Int,
initialWeights: Array[Double],
miniBatchFraction: Double=1.0) : (Array[Double], Array[Double]) = {
initialWeights: Vector,
miniBatchFraction: Double=1.0) : (Vector, Array[Double]) = {

val stochasticLossHistory = new ArrayBuffer[Double](numIters)

val nexamples: Long = data.count()
val miniBatchSize = nexamples * miniBatchFraction

// Initialize weights as a copy of the initial weight vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
var weights = initialWeights.deepClone()
var reg_val = 0.0

for (i <- 1 to numIters) {
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
case (y, features) =>
val featuresRow = new DoubleMatrix(features.length, 1, features:_*)
val (grad, loss) = gradient.compute(featuresRow, y, weights)
val (grad, loss) = gradient.compute(features, y, weights)
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
}.reduce((a, b) => (a._1 += b._1, a._2 + b._2))

stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i)
val update = updater.compute(weights, gradientSum / miniBatchSize, stepSize, i)
weights = update._1
reg_val = update._2
}

(weights.toArray, stochasticLossHistory.toArray)
(weights, stochasticLossHistory.toArray)
}
}
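An editor's sketch (not part of this commit) of driving the updated signature end to end, reusing the hypothetical LeastSquaresGradient sketched above; the two trailing numeric arguments are stepSize = 0.1 and numIters = 100:

  import spark.mllib.math.vector.{Vector, DenseVector}

  val sc = new SparkContext("local", "sgd-sketch")
  val data: RDD[(Double, Vector)] = sc.parallelize(Seq[(Double, Vector)](
    (1.0, new DenseVector(Array(1.0, 0.5))),
    (0.0, new DenseVector(Array(0.2, 0.1)))))
  val w0: Vector = new DenseVector(Array(0.0, 0.0))
  val (weights, losses) =
    GradientDescent.runMiniBatchSGD(data, new LeastSquaresGradient(), new SimpleUpdater(), 0.1, 100, w0)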
14 changes: 8 additions & 6 deletions mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -17,6 +17,8 @@

package spark.mllib.optimization

import spark.mllib.math.vector.Vector

import org.jblas.DoubleMatrix

abstract class Updater extends Serializable {
@@ -31,14 +33,14 @@ abstract class Updater extends Serializable {
* @return A tuple of 2 elements. The first element is a vector containing the updated weights,
* and the second element is the regularization value.
*/
def compute(weightsOlds: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int):
(DoubleMatrix, Double)
def compute(weightsOld: Vector, gradient: Vector, stepSize: Double, iter: Int):
(Vector, Double)
}

class SimpleUpdater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int): (DoubleMatrix, Double) = {
val normGradient = gradient.mul(stepSize / math.sqrt(iter))
(weightsOld.sub(normGradient), 0)
override def compute(weightsOld: Vector, gradient: Vector,
stepSize: Double, iter: Int): (Vector, Double) = {
val normGradient = gradient * stepSize / math.sqrt(iter)
(weightsOld - normGradient, 0)
}
}
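A small worked example of the SimpleUpdater step (editor's sketch, not part of this commit): with stepSize = 0.2 at iter = 4 the gradient is scaled by 0.2 / sqrt(4) = 0.1 before being subtracted from the weights:

  val weights: Vector = new DenseVector(Array(1.0, 1.0))
  val gradient: Vector = new DenseVector(Array(0.5, -0.5))
  // updated = (1.0 - 0.1 * 0.5, 1.0 - 0.1 * -0.5) = (0.95, 1.05); reg is 0.0
  val (updated, reg) = new SimpleUpdater().compute(weights, gradient, 0.2, 4)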
9 changes: 5 additions & 4 deletions mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -25,6 +25,7 @@ import spark.{HashPartitioner, Partitioner, SparkContext, RDD}
import spark.storage.StorageLevel
import spark.KryoRegistrator
import spark.SparkContext._
import spark.mllib.math.vector.{Vector, DenseVector}

import com.esotericsoftware.kryo.Kryo
import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
@@ -136,10 +137,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l

// Flatten and cache the two final RDDs to un-block them
val usersOut = users.join(userOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), new DenseVector(factors(i)).asInstanceOf[Vector])
}
val productsOut = products.join(productOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), new DenseVector(factors(i)).asInstanceOf[Vector])
}

usersOut.persist()
@@ -426,9 +427,9 @@ object ALS {
(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
val model = ALS.train(ratings, rank, iters, 0.01, blocks)
model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
model.userFeatures.map{ case (id, vec) => id + "," + vec.toArray.mkString(" ") }
.saveAsTextFile(outputDir + "/userFeatures")
model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
model.productFeatures.map{ case (id, vec) => id + "," + vec.toArray.mkString(" ") }
.saveAsTextFile(outputDir + "/productFeatures")
println("Final user/product features written to " + outputDir)
System.exit(0)
mllib/src/main/scala/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,20 +19,21 @@ package spark.mllib.recommendation

import spark.RDD
import spark.SparkContext._
import spark.mllib.math.vector.{Vector, DenseVector}

import org.jblas._

class MatrixFactorizationModel(
val rank: Int,
val userFeatures: RDD[(Int, Array[Double])],
val productFeatures: RDD[(Int, Array[Double])])
val userFeatures: RDD[(Int, Vector)],
val productFeatures: RDD[(Int, Vector)])
extends Serializable
{
/** Predict the rating of one user for one product. */
def predict(user: Int, product: Int): Double = {
val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
userVector.dot(productVector)
val userVector = userFeatures.lookup(user).head
val productVector = productFeatures.lookup(product).head
userVector * productVector
}

// TODO: Figure out what good bulk prediction methods would look like.
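An editor's usage sketch (not part of this commit), assuming a SparkContext sc is in scope; predict is now just the Vector dot product of the two looked-up feature vectors:

  val userFeatures: RDD[(Int, Vector)] = sc.parallelize(Seq[(Int, Vector)](
    (1, new DenseVector(Array(0.3, 0.7)))))
  val productFeatures: RDD[(Int, Vector)] = sc.parallelize(Seq[(Int, Vector)](
    (42, new DenseVector(Array(0.5, 0.2)))))
  val model = new MatrixFactorizationModel(2, userFeatures, productFeatures)
  model.predict(1, 42)  // 0.3 * 0.5 + 0.7 * 0.2 = 0.29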