From 5bffd3d29ba0c601e91cfb0818b35eed1c8230ff Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 24 Nov 2014 15:45:04 -0800 Subject: [PATCH 1/6] first commit --- .../spark/mllib/feature/StandardScaler.scala | 55 ++++++++++++------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index 4dfd1f0ab813..0343941ce6d4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -17,11 +17,9 @@ package org.apache.spark.mllib.feature -import breeze.linalg.{DenseVector => BDV, SparseVector => BSV} - import org.apache.spark.Logging import org.apache.spark.annotation.Experimental -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.rdd.RDDFunctions._ import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer import org.apache.spark.rdd.RDD @@ -77,8 +75,8 @@ class StandardScalerModel private[mllib] ( require(mean.size == variance.size) - private lazy val factor: BDV[Double] = { - val f = BDV.zeros[Double](variance.size) + private lazy val factor: Array[Double] = { + val f = Array.ofDim[Double](variance.size) var i = 0 while (i < f.size) { f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0 @@ -87,6 +85,8 @@ class StandardScalerModel private[mllib] ( f } + private lazy val shift: Array[Double] = mean.toArray + /** * Applies standardization transformation on a vector. 
* @@ -96,31 +96,48 @@ class StandardScalerModel private[mllib] ( */ override def transform(vector: Vector): Vector = { require(mean.size == vector.size) + val localFactor = factor if (withMean) { - vector.toBreeze match { - case dv: BDV[Double] => - val output = vector.toBreeze.copy + val localShift = shift + vector match { + case dv: DenseVector => + val values = dv.values.clone() var i = 0 - while (i < output.length) { - output(i) = (output(i) - mean(i)) * (if (withStd) factor(i) else 1.0) - i += 1 + if(withStd) { + while (i < values.length) { + values(i) = (values(i) - localShift(i)) * localFactor(i) + i += 1 + } + } else { + while (i < values.length) { + values(i) -= localShift(i) + i += 1 + } } - Vectors.fromBreeze(output) + Vectors.dense(values) case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } } else if (withStd) { - vector.toBreeze match { - case dv: BDV[Double] => Vectors.fromBreeze(dv :* factor) - case sv: BSV[Double] => + vector match { + case dv: DenseVector => + val values = dv.values.clone() + var i = 0 + while(i < values.length) { + values(i) *= localFactor(i) + i += 1 + } + Vectors.dense(values) + case sv: SparseVector => // For sparse vector, the `index` array inside sparse vector object will not be changed, // so we can re-use it to save memory. 
- val output = new BSV[Double](sv.index, sv.data.clone(), sv.length) + val indices = sv.indices + val values = sv.values.clone() var i = 0 - while (i < output.data.length) { - output.data(i) *= factor(output.index(i)) + while (i < indices.length) { + values(i) *= localFactor(indices(i)) i += 1 } - Vectors.fromBreeze(output) + Vectors.sparse(sv.size, indices, values) case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } } else { From fc795e4d7f8792ce4ad70b7ecb41531556669983 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 24 Nov 2014 16:06:22 -0800 Subject: [PATCH 2/6] update --- .../scala/org/apache/spark/mllib/feature/StandardScaler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index 0343941ce6d4..ac3851b8703f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -96,7 +96,6 @@ class StandardScalerModel private[mllib] ( */ override def transform(vector: Vector): Vector = { require(mean.size == vector.size) - val localFactor = factor if (withMean) { val localShift = shift vector match { @@ -104,6 +103,7 @@ class StandardScalerModel private[mllib] ( val values = dv.values.clone() var i = 0 if(withStd) { + val localFactor = factor while (i < values.length) { values(i) = (values(i) - localShift(i)) * localFactor(i) i += 1 @@ -118,6 +118,7 @@ class StandardScalerModel private[mllib] ( case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } } else if (withStd) { + val localFactor = factor vector match { case dv: DenseVector => val values = dv.values.clone() From 9c51eefd381c89d3b35e5fdde8b5a607ed1545f0 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 24 Nov 2014 16:43:24 -0800 Subject: [PATCH 3/6] style --- 
.../scala/org/apache/spark/mllib/feature/StandardScaler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index ac3851b8703f..94b30276add0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -102,7 +102,7 @@ class StandardScalerModel private[mllib] ( case dv: DenseVector => val values = dv.values.clone() var i = 0 - if(withStd) { + if (withStd) { val localFactor = factor while (i < values.length) { values(i) = (values(i) - localShift(i)) * localFactor(i) From cdb5cefc89548e6d040b06765cce7b684c7854a9 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 24 Nov 2014 17:18:30 -0800 Subject: [PATCH 4/6] small change --- .../apache/spark/mllib/feature/StandardScaler.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index 94b30276add0..70193d4c89d1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -101,15 +101,16 @@ class StandardScalerModel private[mllib] ( vector match { case dv: DenseVector => val values = dv.values.clone() + val size = values.size var i = 0 if (withStd) { val localFactor = factor - while (i < values.length) { + while (i < size) { values(i) = (values(i) - localShift(i)) * localFactor(i) i += 1 } } else { - while (i < values.length) { + while (i < size) { values(i) -= localShift(i) i += 1 } @@ -122,8 +123,9 @@ class StandardScalerModel private[mllib] ( vector match { case dv: DenseVector => val values = dv.values.clone() + val size = values.size var i = 0 - while(i < values.length) 
{ + while(i < size) { values(i) *= localFactor(i) i += 1 } @@ -133,8 +135,9 @@ // so we can re-use it to save memory. val indices = sv.indices val values = sv.values.clone() + val size = values.size var i = 0 - while (i < indices.length) { + while (i < size) { values(i) *= localFactor(indices(i)) i += 1 } From daf2b0670a1e2311f50f3312aa688bcd9b26b099 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 24 Nov 2014 21:27:30 -0800 Subject: [PATCH 5/6] Address the feedback --- .../spark/mllib/feature/StandardScaler.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index 70193d4c89d1..bebd8cc1f3c4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -85,7 +85,7 @@ class StandardScalerModel private[mllib] ( f } - private lazy val shift: Array[Double] = mean.toArray + private val shift: Array[Double] = mean.toArray /** * Applies standardization transformation on a vector. @@ -97,19 +97,24 @@ class StandardScalerModel private[mllib] ( override def transform(vector: Vector): Vector = { require(mean.size == vector.size) if (withMean) { + // By default, Scala generates Java methods for member variables. So every time when + // the member variables are accessed, `invokespecial` will be called which is expensive. + // This can be avoided by having a local reference to `shift`. val localShift = shift vector match { case dv: DenseVector => val values = dv.values.clone() val size = values.size - var i = 0 if (withStd) { + // Having a local reference of `factor` to avoid overhead as the comment before.
val localFactor = factor + var i = 0 while (i < size) { values(i) = (values(i) - localShift(i)) * localFactor(i) i += 1 } } else { + var i = 0 while (i < size) { values(i) -= localShift(i) i += 1 } @@ -119,6 +124,7 @@ case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) } } else if (withStd) { + // Having a local reference of `factor` to avoid overhead as the comment before. val localFactor = factor vector match { case dv: DenseVector => @@ -135,9 +141,9 @@ // so we can re-use it to save memory. val indices = sv.indices val values = sv.values.clone() - val size = values.size + val nnz = values.size var i = 0 - while (i < size) { + while (i < nnz) { values(i) *= localFactor(indices(i)) i += 1 } From 85885a98125e41a323212499eb7a8c6895f8c252 Mon Sep 17 00:00:00 2001 From: DB Tsai Date: Mon, 24 Nov 2014 23:57:14 -0800 Subject: [PATCH 6/6] revert to have lazy in shift array. --- .../org/apache/spark/mllib/feature/StandardScaler.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index bebd8cc1f3c4..8c4c5db5258d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -85,7 +85,10 @@ class StandardScalerModel private[mllib] ( f } - private val shift: Array[Double] = mean.toArray + // Since `shift` will only be used in the `withMean` branch, we have it as + // `lazy val` so it will be evaluated in that branch. Note that we don't + // want to create this array multiple times in the `transform` function. + private lazy val shift: Array[Double] = mean.toArray /** * Applies standardization transformation on a vector.