Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Finished SPARK-4431
  • Loading branch information
DB Tsai committed Nov 20, 2014
commit 6441f929f2e302b1c11cc53aaef10598c7397deb
98 changes: 98 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,22 @@ sealed trait Vector extends Serializable {
def copy: Vector = {
throw new NotImplementedError(s"copy is not implemented for ${this.getClass}.")
}

/**
* It will return the iterator for the active elements of dense and sparse vector as
* (index, value) pair. Note that foreach method can be overridden for better performance
* in different vector implementation.
*
* @param skippingZeros Skipping zero elements explicitly if true. It will be useful when we
* iterator through dense vector having lots of zero elements which
* we want to skip. Default is false.
* @return Iterator[(Int, Double)] where the first element in the tuple is the index,
* and the second element is the corresponding value.
*/
private[spark] def activeIterator(skippingZeros: Boolean): Iterator[(Int, Double)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need skippingZeros here because it is very easy to chain the iterator with a filter to achieve it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skippingZeros will be very useful in foreach operation, and if you use iterator -> filter -> foreach, it will not use the optimized foreach which is implemented by native while loop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the following code should have the same performance:

vec.foreach { (i, v) =>
  if (v != 0.0) {
    ...
  }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the following code,

sample.activeIterator(false).foreach {
  case (index, value) => if(value != 0.0) add(index, value)
}

It takes 61.809 for dense vector, and 54.626 for sparse vector.

The most expensive part is calling the anonymous function even when the values are zero.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, the issue is in the anonymous function. Basically, scala will convert primitive index: Int and value: Double into boxed object in order to have them in tuple. In my testing dataset, there are so many zeros explicitly, and even those values with zero have to be converted to tuple before we do the if statement. That's why it's dramatically faster if we do the if statement before calling the anonymous function.

Changing the signature of foreach into

def foreach[@specialized(Unit) U](f: (Int, Double) => U)

to take two primitive variables will solve this problem, but it will not comply the interface of foreach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tuple2[Int, Double] is specialized in Scala: https://github.com/scala/scala/blob/2.10.x/src/library/scala/Tuple2.scala, but the iterator interface still won't be high-performance for numerical computation. The iterator interface is not used in this PR, as we only need foreach. Then the best for us is defining foreach directly:

def foreach(f: (Int, Double) => Unit)

( We could implement ZippedTraversable2 but it doesn't seem to be necessary.)

or

def foreach(skipZeros = True)(f: (Int, Double) => Unit)

This is a private function. We can change it when we see more use cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right; the Tuple2[Int, Double] is specialized, and I mistakenly interpreted the bytecode.
For the flowing scala code,

def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) {
  var i = 0
  val localValuesSize = values.size
  val localValues = values
  while (i < localValuesSize) {
    f(i, localValues(i))
    i += 1
  }
}

the generated bytecode will be

  public foreach(Lscala/Function1;)V
   L0
    LINENUMBER 296 L0
    ICONST_0
    ISTORE 2
   L1
    LINENUMBER 297 L1
    GETSTATIC scala/Predef$.MODULE$ : Lscala/Predef$;
    ALOAD 0
    INVOKEVIRTUAL org/apache/spark/mllib/linalg/DenseVector.values ()[D
    INVOKEVIRTUAL scala/Predef$.doubleArrayOps ([D)Lscala/collection/mutable/ArrayOps;
    INVOKEINTERFACE scala/collection/mutable/ArrayOps.size ()I
    ISTORE 3
   L2
    LINENUMBER 298 L2
    ALOAD 0
    INVOKEVIRTUAL org/apache/spark/mllib/linalg/DenseVector.values ()[D
    ASTORE 4
   L3
    LINENUMBER 299 L3
   FRAME APPEND [I I [D]
    ILOAD 2
    ILOAD 3
    IF_ICMPGE L4
   L5
    LINENUMBER 300 L5
    ALOAD 1
    NEW scala/Tuple2$mcID$sp
    DUP
    ILOAD 2
    ALOAD 4
    ILOAD 2
    DALOAD
    INVOKESPECIAL scala/Tuple2$mcID$sp.<init> (ID)V
    INVOKEINTERFACE scala/Function1.apply (Ljava/lang/Object;)Ljava/lang/Object;
    POP
   L6
    LINENUMBER 301 L6
    ILOAD 2
    ICONST_1
    IADD
    ISTORE 2
    GOTO L3

However,

    INVOKESPECIAL scala/Tuple2$mcID$sp.<init> (ID)V
    INVOKEINTERFACE scala/Function1.apply (Ljava/lang/Object;)Ljava/lang/Object;

is expensive, so that's why checking zero in the anonymous function will slow down the whole thing.

I agree with you, the iterator is slow by nature, and we are only interested in foreach implementation. I'll remove the iterator, and just have foreach method in vector.


private[spark] def activeIterator: Iterator[(Int, Double)] = activeIterator(false)

}

/**
Expand Down Expand Up @@ -273,6 +289,47 @@ class DenseVector(val values: Array[Double]) extends Vector {
override def copy: DenseVector = {
new DenseVector(values.clone())
}

private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] {
private var i = 0
private val valuesSize = values.size

// If zeros are asked to be explicitly skipped, the parent `size` method is called to count
// the number of nonzero elements using `hasNext` and `next` methods.
override lazy val size: Int = if (skippingZeros) super.size else valuesSize

override def hasNext = {
if (skippingZeros) {
var found = false
while (!found && i < valuesSize) if (values(i) != 0.0) found = true else i += 1
}
i < valuesSize
}

override def next = {
val result = (i, values(i))
i += 1
result
}

override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option is force the return type to be Unit. Scala compiler will know how to handle f with other return types.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. In scala's range code, they have

@inline final override def foreach[@specialized(Unit) U](f: Int => U)

I'll do a bytecode analysis, and see if it will generate the same bytecode.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, the generated bytecode of both approaches are the same.

var i = 0
if (skippingZeros) {
while (i < valuesSize) {
if (values(i) != 0.0) {
f(i, values(i))
}
i += 1
}
} else {
while (i < valuesSize) {
f(i, values(i))
i += 1
}
}
}
}

}

/**
Expand Down Expand Up @@ -309,4 +366,45 @@ class SparseVector(
}

private[mllib] override def toBreeze: BV[Double] = new BSV[Double](indices, values, size)

private[spark] override def activeIterator(skippingZeros: Boolean) = new Iterator[(Int, Double)] {
private var i = 0
private val valuesSize = values.size

// If zeros are asked to be explicitly skipped, the parent `size` method is called to count
// the number of nonzero elements using `hasNext` and `next` methods.
override lazy val size: Int = if (skippingZeros) super.size else valuesSize

def hasNext = {
if (skippingZeros) {
var found = false
while (!found && i < valuesSize) if (values(i) != 0.0) found = true else i += 1
}
i < valuesSize
}

def next = {
val result = (indices(i), values(i))
i += 1
result
}

override def foreach[@specialized(Unit) U](f: ((Int, Double)) => U) {
var i = 0
if (skippingZeros) {
while (i < valuesSize) {
if (values(i) != 0.0) {
f(indices(i), values(i))
}
i += 1
}
} else {
while (i < valuesSize) {
f(indices(i), values(i))
i += 1
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,21 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
* Adds input value to position i.
*/
private[this] def add(i: Int, value: Double) = {
if (value != 0.0) {
if (currMax(i) < value) {
currMax(i) = value
}
if (currMin(i) > value) {
currMin(i) = value
}
if (currMax(i) < value) {
currMax(i) = value
}
if (currMin(i) > value) {
currMin(i) = value
}

val prevMean = currMean(i)
val diff = value - prevMean
currMean(i) = prevMean + diff / (nnz(i) + 1.0)
currM2n(i) += (value - currMean(i)) * diff
currM2(i) += value * value
currL1(i) += math.abs(value)
val prevMean = currMean(i)
val diff = value - prevMean
currMean(i) = prevMean + diff / (nnz(i) + 1.0)
currM2n(i) += (value - currMean(i)) * diff
currM2(i) += value * value
currL1(i) += math.abs(value)

nnz(i) += 1.0
}
nnz(i) += 1.0
}

/**
Expand All @@ -95,21 +93,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
require(n == sample.size, s"Dimensions mismatch when adding new sample." +
s" Expecting $n but got ${sample.size}.")

sample match {
case dv: DenseVector => {
var j = 0
while (j < dv.size) {
add(j, dv.values(j))
j += 1
}
}
case sv: SparseVector =>
var j = 0
while (j < sv.indices.size) {
add(sv.indices(j), sv.values(j))
j += 1
}
case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
sample.activeIterator(true).foreach {
case (index, value) => add(index, value)
}

totalCnt += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,80 @@ class VectorsSuite extends FunSuite {
val v = Vectors.fromBreeze(x(::, 0))
assert(v.size === x.rows)
}

test("activeIterator") {
val dv = Vectors.dense(0.0, 1.2, 3.1, 0.0)
val sv = Vectors.sparse(4, Seq((1, 1.2), (2, 3.1), (3, 0.0)))

// Testing if the size of iterator is correct when the zeros are explicitly skipped.
// The default setting will not skip any zero explicitly.
assert(dv.activeIterator.size === 4)
assert(dv.activeIterator(false).size === 4)
assert(dv.activeIterator(true).size === 2)

assert(sv.activeIterator.size === 3)
assert(sv.activeIterator(false).size === 3)
assert(sv.activeIterator(true).size === 2)

// Testing `hasNext` and `next`
val dvIter1 = dv.activeIterator(false)
assert(dvIter1.hasNext === true && dvIter1.next === (0, 0.0))
assert(dvIter1.hasNext === true && dvIter1.next === (1, 1.2))
assert(dvIter1.hasNext === true && dvIter1.next === (2, 3.1))
assert(dvIter1.hasNext === true && dvIter1.next === (3, 0.0))
assert(dvIter1.hasNext === false)

val dvIter2 = dv.activeIterator(true)
assert(dvIter2.hasNext === true && dvIter2.next === (1, 1.2))
assert(dvIter2.hasNext === true && dvIter2.next === (2, 3.1))
assert(dvIter2.hasNext === false)

val svIter1 = sv.activeIterator(false)
assert(svIter1.hasNext === true && svIter1.next === (1, 1.2))
assert(svIter1.hasNext === true && svIter1.next === (2, 3.1))
assert(svIter1.hasNext === true && svIter1.next === (3, 0.0))
assert(svIter1.hasNext === false)

val svIter2 = sv.activeIterator(true)
assert(svIter2.hasNext === true && svIter2.next === (1, 1.2))
assert(svIter2.hasNext === true && svIter2.next === (2, 3.1))
assert(svIter2.hasNext === false)

// Testing `foreach`
val dvMap1 = scala.collection.mutable.Map[Int, Double]()
dvIter1.foreach{
case (index, value) => dvMap1.put(index, value)
}
assert(dvMap1.size === 4)
assert(dvMap1.get(0) === Some(0.0))
assert(dvMap1.get(1) === Some(1.2))
assert(dvMap1.get(2) === Some(3.1))
assert(dvMap1.get(3) === Some(0.0))

val dvMap2 = scala.collection.mutable.Map[Int, Double]()
dvIter2.foreach{
case (index, value) => dvMap2.put(index, value)
}
assert(dvMap2.size === 2)
assert(dvMap2.get(1) === Some(1.2))
assert(dvMap2.get(2) === Some(3.1))

val svMap1 = scala.collection.mutable.Map[Int, Double]()
dvIter1.foreach{
case (index, value) => svMap1.put(index, value)
}
assert(svMap1.size === 4)
assert(svMap1.get(1) === Some(1.2))
assert(svMap1.get(2) === Some(3.1))
assert(svMap1.get(3) === Some(0.0))

val svMap2 = scala.collection.mutable.Map[Int, Double]()
svIter2.foreach{
case (index, value) => svMap2.put(index, value)
}
assert(svMap2.size === 2)
assert(svMap2.get(1) === Some(1.2))
assert(svMap2.get(2) === Some(3.1))

}
}