From ff17423bd714592d38b69df426382838216cd133 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 25 Jul 2014 12:31:35 -0700 Subject: [PATCH 01/14] WIP --- .../mllib/stat/test/ChiSquaredTest.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala new file mode 100644 index 000000000000..c01791b824ea --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +class ChiSquaredTest { + +} From 6598379979e1ed69de6956ebf56ad0b7b47029bf Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 25 Jul 2014 15:29:08 -0700 Subject: [PATCH 02/14] API and code structure. --- .../apache/spark/mllib/stat/Statistics.scala | 12 +++- .../mllib/stat/test/ChiSquaredTest.scala | 17 +++++- .../spark/mllib/stat/test/TestResult.scala | 60 +++++++++++++++++++ 3 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 68f3867ba6c1..6bc6388d5f5f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -20,10 +20,12 @@ package org.apache.spark.mllib.stat import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.stat.correlation.Correlations +import org.apache.spark.mllib.stat.test.{ChiSquaredTest, ChiSquaredTestResult} import org.apache.spark.rdd.RDD /** - * API for statistical functions in MLlib + * :: Experimental :: + * API for statistical functions in MLLib */ @Experimental object Statistics { @@ -75,4 +77,12 @@ object Statistics { * specified method. 
*/ def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method) + + def chiSquared(x: RDD[Double], y: RDD[Double], method: String): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(x, y, method) + } + + def chiSquared(x: RDD[Double], y: RDD[Double]): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(x, y) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index c01791b824ea..e79934c56ede 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -17,6 +17,21 @@ package org.apache.spark.mllib.stat.test -class ChiSquaredTest { +import org.apache.spark.rdd.RDD + +private[stat] object ChiSquaredTest { + + val PEARSON = "pearson" + + def chiSquared(x: RDD[Double], + y: RDD[Double], + method: String = PEARSON): ChiSquaredTestResult = { + method match { + case PEARSON => chiSquaredPearson(x, y) + case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") + } + } + + private def chiSquaredPearson(x: RDD[Double], y: RDD[Double]): ChiSquaredTestResult = ??? } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala new file mode 100644 index 000000000000..e4c613614c67 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.mllib.stat.test + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Trait for hypothesis test results. + */ +@Experimental +trait TestResult { + + def pValue: Double + + def degreesOfFreedom: Array[Int] + + def statistic: Double + + /** + * Returns a String explaining the hypothesis test result. + * Specific classes implementing this trait should override this method to output test-specific + * information. 
+ */ + def explain: String = { + s"pValue = $pValue \n" + + s"degrees of freedom = $degreesOfFreedom \n" + + s"statistic = $statistic" + } +} + +/** + * :: Experimental :: + */ +@Experimental +class ChiSquaredTestResult(override val pValue: Double, + override val degreesOfFreedom: Array[Int], + override val statistic: Double, + val method: String) extends TestResult { + + override def explain: String = { + "Chi squared test summary: \n" + + s"method: $method \n" + + super.explain + } +} From 706d436aea3db8b8cf15db0bcccb25e19c121a78 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 25 Jul 2014 15:38:07 -0700 Subject: [PATCH 03/14] Added API for RDD[Vector] --- .../org/apache/spark/mllib/stat/Statistics.scala | 6 ++++++ .../apache/spark/mllib/stat/test/ChiSquaredTest.scala | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 6bc6388d5f5f..2d5d2657bc1d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -85,4 +85,10 @@ object Statistics { def chiSquared(x: RDD[Double], y: RDD[Double]): ChiSquaredTestResult = { ChiSquaredTest.chiSquared(x, y) } + + def chiSquared(X: RDD[Vector], method: String): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(X, method) + } + + def chiSquared(X: RDD[Vector]): ChiSquaredTestResult = ChiSquaredTest.chiSquared(X) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index e79934c56ede..34dee8026bbe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.stat.test import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vector private[stat] object ChiSquaredTest { @@ -32,6 +33,16 @@ private[stat] object ChiSquaredTest { } } + def chiSquared(X: RDD[Vector], + method: String = PEARSON): ChiSquaredTestResult = { + method match { + case PEARSON => chiSquaredPearson(X) + case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") + } + } + private def chiSquaredPearson(x: RDD[Double], y: RDD[Double]): ChiSquaredTestResult = ??? + private def chiSquaredPearson(X: RDD[Vector]): ChiSquaredTestResult = ??? 
+ } From 3d615828a913b341c9fc7afe6e371f3950d591ab Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 25 Jul 2014 15:54:23 -0700 Subject: [PATCH 04/14] input names --- .../apache/spark/mllib/stat/Statistics.scala | 19 ++++++++++++------- .../mllib/stat/test/ChiSquaredTest.scala | 16 ++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 2d5d2657bc1d..37a93a89b577 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -78,17 +78,22 @@ object Statistics { */ def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method) - def chiSquared(x: RDD[Double], y: RDD[Double], method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(x, y, method) + // Technically these should be RDD[Long] since the data should be counts + def chiSquared(expected: RDD[Double], + observed: RDD[Double], + method: String): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(expected, observed, method) } - def chiSquared(x: RDD[Double], y: RDD[Double]): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(x, y) + def chiSquared(expected: RDD[Double], observed: RDD[Double]): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(expected, observed) } - def chiSquared(X: RDD[Vector], method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(X, method) + // Same here. It should be something like RDD[Array[Long]] for counts instead, but I don't know + // if we should be consistent about how a "matrix" is presented + def chiSquared(counts: RDD[Vector], method: String): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(counts, method) } - def chiSquared(X: RDD[Vector]): ChiSquaredTestResult = ChiSquaredTest.chiSquared(X) + def chiSquared(counts: RDD[Vector]): ChiSquaredTestResult = ChiSquaredTest.chiSquared(counts) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index 34dee8026bbe..5be8c1bb695e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -24,25 +24,25 @@ private[stat] object ChiSquaredTest { val PEARSON = "pearson" - def chiSquared(x: RDD[Double], - y: RDD[Double], + def chiSquared(expected: RDD[Double], + observed: RDD[Double], method: String = PEARSON): ChiSquaredTestResult = { method match { - case PEARSON => chiSquaredPearson(x, y) + case PEARSON => chiSquaredPearson(expected, observed) case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") } } - def chiSquared(X: RDD[Vector], - method: String = PEARSON): ChiSquaredTestResult = { + def chiSquared(counts: RDD[Vector], method: String = PEARSON): ChiSquaredTestResult = { method match { - case PEARSON => chiSquaredPearson(X) + case PEARSON => chiSquaredPearson(counts) case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") } } - private def chiSquaredPearson(x: RDD[Double], y: RDD[Double]): ChiSquaredTestResult = ??? + private def chiSquaredPearson(expected: RDD[Double], + observed: RDD[Double]): ChiSquaredTestResult = ??? - private def chiSquaredPearson(X: RDD[Vector]): ChiSquaredTestResult = ??? 
+ private def chiSquaredPearson(counts: RDD[Vector]): ChiSquaredTestResult = ??? } From e6b83f35375701f71f699697a83236e7e0c76d6c Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 1 Aug 2014 13:33:04 -0700 Subject: [PATCH 05/14] reviewer comments --- .../org/apache/spark/mllib/stat/test/TestResult.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala index e4c613614c67..780f4dcad60f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -36,8 +36,8 @@ trait TestResult { * Specific classes implementing this trait should override this method to output test-specific * information. */ - def explain: String = { - s"pValue = $pValue \n" + + override def toString: String = { + s"pValue = $pValue \n" + // TODO explain what pValue is s"degrees of freedom = $degreesOfFreedom \n" + s"statistic = $statistic" } @@ -47,14 +47,14 @@ trait TestResult { * :: Experimental :: */ @Experimental -class ChiSquaredTestResult(override val pValue: Double, +case class ChiSquaredTestResult(override val pValue: Double, override val degreesOfFreedom: Array[Int], override val statistic: Double, val method: String) extends TestResult { - override def explain: String = { + override def toString: String = { "Chi squared test summary: \n" + s"method: $method \n" + - super.explain + super.toString } } From 4e4e36199aa81d9d1628322c499e40556fbdc6ef Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 1 Aug 2014 19:15:57 -0700 Subject: [PATCH 06/14] WIP --- .../mllib/stat/test/ChiSquaredTest.scala | 43 +++++++++++++++++-- .../mllib/stat/HypothesisTestSuite.scala | 28 ++++++++++++ 2 files changed, 67 insertions(+), 4 deletions(-) create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index 5be8c1bb695e..638d65686dd3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -17,8 +17,10 @@ package org.apache.spark.mllib.stat.test +import cern.jet.stat.Probability.chiSquareComplemented + import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{DenseVector, Vector} private[stat] object ChiSquaredTest { @@ -40,9 +42,42 @@ private[stat] object ChiSquaredTest { } } - private def chiSquaredPearson(expected: RDD[Double], - observed: RDD[Double]): ChiSquaredTestResult = ??? 
+ private def chiSquaredPearson(x: RDD[Double], + y: RDD[Double]): ChiSquaredTestResult = { + val mat: RDD[Vector] = x.zip(y).map { case (xi, yi) => new DenseVector(Array(xi, yi)) } + chiSquaredPearson(mat) + } + + private def chiSquaredPearson(counts: RDD[Vector]): ChiSquaredTestResult = { + val numCols = counts.first.size + val colSums = new Array[Double](numCols) + var result = (colSums, 0) // second value is for count of vectors in the RDD + + // Make two passes over the RDD with the first pass for collecting column sums + // TODO check that the counts are all non-negative in this pass + counts.aggregate(result)( + (sums, vector) => ((sums._1, vector.toArray).zipped.map(_ + _), sums._2 + 1), // seqOp + (sums1, sums2) => ((sums1._1, sums2._1).zipped.map(_ + _), sums1._2 + sums2._2)) // combOp + + val total = colSums.sum - private def chiSquaredPearson(counts: RDD[Vector]): ChiSquaredTestResult = ??? + // Second pass to compute chi-squared statistic + val statistic = counts.aggregate(0.0)(rowStatistic(colSums, total), _ + _) + val df = (numCols - 1) * (result._2 - 1) + val pValue = chiSquareComplemented(statistic, df) + new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON) + } + + // curried function to be used as seqOp in the aggregate operation to collect statistic + private def rowStatistic(colSums: Array[Double], total: Double) = { + (statistic: Double, vector: Vector) => { + val arr = vector.toArray + val rowSum = arr.sum + (arr, colSums).zipped.foldLeft(statistic) { case (stat, (observed, colSum)) => + val expected = rowSum * colSum / total + stat + (observed - expected) * (observed - expected) / expected + } + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala new file mode 100644 index 000000000000..8a7e4ca7ebf9 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.stat + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.LocalSparkContext + +class HypothesisTestSuite extends FunSuite with LocalSparkContext { + test("chi squared") { + + } +} From bc7eb2eeba4e2ccf10b891e4ce59db55823cea3b Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 1 Aug 2014 20:48:05 -0700 Subject: [PATCH 07/14] unit passed; still need docs and some refactoring --- .../apache/spark/mllib/stat/Statistics.scala | 18 ++++++++----- .../mllib/stat/test/ChiSquaredTest.scala | 27 ++++++++++++------- .../spark/mllib/stat/test/TestResult.scala | 2 +- .../mllib/stat/HypothesisTestSuite.scala | 9 ++++++- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index e53b17527f1e..58afc76c3d34 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -91,22 +91,26 @@ object Statistics { @Experimental def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method) - // Technically these should be RDD[Long] since the data should be counts - def chiSquared(expected: RDD[Double], - observed: RDD[Double], - method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(expected, observed, method) + // Technically input should be RDD[Long] since the data should be counts + @Experimental + def chiSquared(x: RDD[Double], y: RDD[Double], method: String): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(x, y, method) } + @Experimental def chiSquared(expected: RDD[Double], observed: RDD[Double]): ChiSquaredTestResult = { ChiSquaredTest.chiSquared(expected, observed) } // Same here. 
It should be something like RDD[Array[Long]] for counts instead, but I don't know // if we should be consistent about how a "matrix" is presented + @Experimental def chiSquared(counts: RDD[Vector], method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(counts, method) + ChiSquaredTest.chiSquaredMatrix(counts, method) } - def chiSquared(counts: RDD[Vector]): ChiSquaredTestResult = ChiSquaredTest.chiSquared(counts) + @Experimental + def chiSquared(counts: RDD[Vector]): ChiSquaredTestResult = { + ChiSquaredTest.chiSquaredMatrix(counts) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index 638d65686dd3..8d7cafde4072 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.stat.test -import cern.jet.stat.Probability.chiSquareComplemented +import cern.jet.stat.Probability.chiSquare import org.apache.spark.rdd.RDD import org.apache.spark.mllib.linalg.{DenseVector, Vector} @@ -35,7 +35,7 @@ private[stat] object ChiSquaredTest { } } - def chiSquared(counts: RDD[Vector], method: String = PEARSON): ChiSquaredTestResult = { + def chiSquaredMatrix(counts: RDD[Vector], method: String = PEARSON): ChiSquaredTestResult = { method match { case PEARSON => chiSquaredPearson(counts) case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") @@ -48,23 +48,29 @@ private[stat] object ChiSquaredTest { chiSquaredPearson(mat) } + // Makes two passes over the RDD total private def chiSquaredPearson(counts: RDD[Vector]): ChiSquaredTestResult = { val numCols = counts.first.size - val colSums = new Array[Double](numCols) - var result = (colSums, 0) // second value is for count of vectors in the RDD - // Make two passes over the RDD with the first pass for collecting column sums - // TODO check that the counts are all non-negative in this pass - counts.aggregate(result)( - (sums, vector) => ((sums._1, vector.toArray).zipped.map(_ + _), sums._2 + 1), // seqOp + // first pass for collecting column sums + val result = counts.aggregate((new Array[Double](numCols), 0))( + (sums, vector) => { + val arr = vector.toArray + // check that the counts are all non-negative and finite in this pass + if (!arr.forall( i => !i.isNaN && !i.isInfinite && i >= 0.0)) { + throw new IllegalArgumentException("All input entries must be nonnegative and finite.") + } + ((sums._1, arr).zipped.map(_ + _), sums._2 + 1) + }, //seqOp (sums1, sums2) => ((sums1._1, sums2._1).zipped.map(_ + _), sums1._2 + sums2._2)) // combOp + val colSums = result._1 val total = colSums.sum // Second pass to compute chi-squared statistic val statistic = counts.aggregate(0.0)(rowStatistic(colSums, total), _ + _) val df = (numCols - 1) * (result._2 - 1) - val pValue = chiSquareComplemented(statistic, df) + val pValue = chiSquare(statistic, df) new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON) } @@ -76,7 +82,8 @@ private[stat] object ChiSquaredTest { val rowSum = arr.sum (arr, colSums).zipped.foldLeft(statistic) { case (stat, (observed, colSum)) => val expected = rowSum * colSum / total - stat + (observed - expected) * (observed - expected) / expected + val r = stat + (observed - expected) * (observed - expected) / expected + r } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala index 780f4dcad60f..a07dfcfe9219 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -38,7 +38,7 @@ trait TestResult { */ override def toString: String = { s"pValue = $pValue \n" + // TODO explain what pValue is - s"degrees of freedom = $degreesOfFreedom \n" + + s"degrees of freedom = ${degreesOfFreedom.mkString} \n" + s"statistic = $statistic" } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 8a7e4ca7ebf9..0ab2f6cacf39 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -20,9 +20,16 @@ package org.apache.spark.mllib.stat import org.scalatest.FunSuite import org.apache.spark.mllib.util.LocalSparkContext +import org.apache.spark.mllib.util.TestingUtils._ class HypothesisTestSuite extends FunSuite with LocalSparkContext { test("chi squared") { - + val x = sc.parallelize(Array(2.0, 23.0, 53.0)) + val y = sc.parallelize(Array(53.0, 76.0, 1.0)) + val c = Statistics.chiSquared(x, y) + assert(c.statistic ~= 120.2546 absTol 1e-3) + + val bad = sc.parallelize(Array(2.0, -23.0, 53.0)) + intercept[Exception](Statistics.chiSquared(bad, y)) } } From 56860820cc55370a4aa2186b395412bbe57ce865 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Tue, 5 Aug 2014 19:16:34 -0700 Subject: [PATCH 08/14] facelift --- .../apache/spark/mllib/stat/Statistics.scala | 63 +++++++- .../mllib/stat/test/ChiSquaredTest.scala | 134 ++++++++++++++---- .../spark/mllib/stat/test/TestResult.scala | 27 +++- .../mllib/stat/HypothesisTestSuite.scala | 99 +++++++++++-- 4 files changed, 275 insertions(+), 48 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 58afc76c3d34..39f4a3a10c3a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -91,24 +91,73 @@ object Statistics { @Experimental def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method) - // Technically input should be RDD[Long] since the data should be counts + /** + * :: Experimental :: + * Conduct the Chi-squared goodness of fit test of the observed data against the + * expected distribution. + * + * Note: the two input RDDs need to have the same number of partitions and the same number of + * elements in each partition. + * + * @param observed RDD[Double] containing the observed counts. + * @param expected RDD[Double] containing the expected counts. If the observed total differs from + * the expected total, this RDD is rescaled to sum up to the observed total. + * @param method String specifying the method to use for the Chi-squared test. + * Supported: `pearson` (default) + * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, + * the method used, and the null hypothesis. 
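   * A minimal usage sketch (illustrative only, not part of this patch's docs; assumes an
   * existing SparkContext `sc`, and reuses the counts exercised in HypothesisTestSuite):
   * {{{
   *   val observed = sc.parallelize(Array(4.0, 6.0, 5.0))
   *   val expected = sc.parallelize(Array(5.0, 5.0, 5.0))
   *   val result = Statistics.chiSquared(observed, expected, "pearson")
   *   println(result)  // summary with statistic, degrees of freedom, p-value and method
   * }}}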
+ */ @Experimental - def chiSquared(x: RDD[Double], y: RDD[Double], method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(x, y, method) + def chiSquared(observed: RDD[Double], + expected: RDD[Double], + method: String): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(observed, expected, method) } + /** + * :: Experimental :: + * Conduct the Chi-squared goodness of fit test of the observed data against the + * expected distribution. + * + * Note: the two input RDDs need to have the same number of partitions and the same number of + * elements in each partition. + * + * @param observed RDD[Double] containing the observed counts. + * @param expected RDD[Double] containing the expected counts. If the observed total differs from + * the expected total, this RDD is rescaled to sum up to the observed total. + * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, + * the method used, and the null hypothesis. + */ @Experimental - def chiSquared(expected: RDD[Double], observed: RDD[Double]): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(expected, observed) + def chiSquared(observed: RDD[Double], expected: RDD[Double]): ChiSquaredTestResult = { + ChiSquaredTest.chiSquared(observed, expected) } - // Same here. It should be something like RDD[Array[Long]] for counts instead, but I don't know - // if we should be consistent about how a "matrix" is presented + /** + * :: Experimental :: + * Conduct the Chi-squared independence test between the columns in the input matrix. + * + * @param counts RDD[Vector] containing observations with rows representing categories and columns + * representing separate trials for which independence between trials is assessed. + * @param method String specifying the method to use for the Chi-squared test. + * Supported: `pearson` (default) + * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, + * the method used, and the null hypothesis. + */ @Experimental def chiSquared(counts: RDD[Vector], method: String): ChiSquaredTestResult = { ChiSquaredTest.chiSquaredMatrix(counts, method) } + /** + * :: Experimental :: + * Conduct the Chi-squared independence test between the columns in the input matrix. + * + * @param counts RDD[Vector] containing observations with rows representing categories and columns + * representing separate trials for which independence between trials is assessed. + * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, + * the method used, and the null hypothesis. 
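   * A minimal usage sketch (illustrative only, not part of this patch's docs; assumes an
   * existing SparkContext `sc` and `org.apache.spark.mllib.linalg.Vectors`, reusing the
   * contingency table from HypothesisTestSuite):
   * {{{
   *   val contingencyTable = sc.parallelize(Seq(
   *     Vectors.dense(40.0, 56.0, 31.0, 30.0),
   *     Vectors.dense(24.0, 32.0, 10.0, 15.0),
   *     Vectors.dense(29.0, 42.0, 0.0, 12.0)))
   *   val result = Statistics.chiSquared(contingencyTable)
   *   // result.statistic, result.degreesOfFreedom and result.pValue hold the test output
   * }}}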
+ */ @Experimental def chiSquared(counts: RDD[Vector]): ChiSquaredTestResult = { ChiSquaredTest.chiSquaredMatrix(counts) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index 8d7cafde4072..86088ba4efe9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -17,72 +17,152 @@ package org.apache.spark.mllib.stat.test -import cern.jet.stat.Probability.chiSquare +import cern.jet.stat.Probability.chiSquareComplemented +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.{DenseVector, Vector} -private[stat] object ChiSquaredTest { +/** + * Conduct the Chi-squared test for the input RDDs using the specified method. + * Goodness-of-fit test is conducted on two RDD[Double]s, whereas test of independence is conducted + * on an input of type RDD[Vector] in which independence between columns is assessed. + * + * Supported methods for goodness of fit: `pearson` (default) + * Supported methods for independence: `pearson` (default) + * + * More information on Chi-squared test: http://en.wikipedia.org/wiki/Chi-squared_test + * More information on Pearson's chi-squared test: + * http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test + * + */ +private[stat] object ChiSquaredTest extends Logging { val PEARSON = "pearson" - def chiSquared(expected: RDD[Double], - observed: RDD[Double], + object NullHypothesis extends Enumeration { + type NullHypothesis = Value + val goodnessOfFit = Value("observed follows the same distribution as expected.") + val independence = Value("observations in each column are statistically independent.") + } + + val zeroExpectedError = new IllegalArgumentException("Chi square statistic cannot be computed" + + " for input RDD due to nonpositive entries in the expected contingency table.") + + // delegator method for goodness of fit test + def chiSquared(observed: RDD[Double], + expected: RDD[Double], method: String = PEARSON): ChiSquaredTestResult = { method match { - case PEARSON => chiSquaredPearson(expected, observed) + case PEARSON => chiSquaredPearson(observed, expected) case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") } } + // delegator method for independence test def chiSquaredMatrix(counts: RDD[Vector], method: String = PEARSON): ChiSquaredTestResult = { method match { + // Yates' correction doesn't really apply here case PEARSON => chiSquaredPearson(counts) case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") } } - private def chiSquaredPearson(x: RDD[Double], - y: RDD[Double]): ChiSquaredTestResult = { - val mat: RDD[Vector] = x.zip(y).map { case (xi, yi) => new DenseVector(Array(xi, yi)) } - chiSquaredPearson(mat) + // Equation for computing Pearson's chi-squared statistic + private def pearson = (observed: Double, expected: Double) => { + val dev = observed - expected + dev * dev / expected + } + + /* + * Pearon's goodness of fit test. This can be easily made abstract to support other methods. + * Makes two passes over both input RDDs. 
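   * For intuition, a worked example matching the case exercised in HypothesisTestSuite:
   * with observed counts (4, 6, 5) and uniform expected counts (5, 5, 5), the statistic is
   * (4-5)^2/5 + (6-5)^2/5 + (5-5)^2/5 = 0.4, with 3 - 1 = 2 degrees of freedom.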
+ */ + private def chiSquaredPearson(observed: RDD[Double], + expected: RDD[Double]): ChiSquaredTestResult = { + + // compute the scaling factor and count for the input RDDs and check positivity in one pass + val observedStats = observed.stats() + if (observedStats.min < 0.0) { + throw new IllegalArgumentException("Values in observed must be nonnegative.") + } + val expectedStats = expected.stats() + if (expectedStats.min <= 0.0) { + throw new IllegalArgumentException("Values in expected must be positive.") + } + if (observedStats.count != expectedStats.count) { + throw new IllegalArgumentException("observed and expected must be of the same size.") + } + + val expScaled = if (math.abs(observedStats.sum - expectedStats.sum) < 1e-7) { + // No scaling needed since both RDDs have the same total + expected + } else { + expected.map(_ * observedStats.sum / expectedStats.sum) + } + + // Second pass to compute chi-squared statistic + val statistic = observed.zip(expScaled).aggregate(0.0)({ case (sum, (obs, exp)) => { + sum + pearson(obs, exp) + }}, _ + _) + val df = observedStats.count - 1 + val pValue = chiSquareComplemented(df, statistic) + new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON, + NullHypothesis.goodnessOfFit.toString) } - // Makes two passes over the RDD total + /* + * Pearon's independence test. This can be easily made abstract to support other methods. + * Makes two passes over the input RDD. + */ private def chiSquaredPearson(counts: RDD[Vector]): ChiSquaredTestResult = { + val numCols = counts.first.size // first pass for collecting column sums - val result = counts.aggregate((new Array[Double](numCols), 0))( - (sums, vector) => { + case class SumNCount(colSums: Array[Double], numRows: Long) + + val result = counts.aggregate(new SumNCount(new Array[Double](numCols), 0L))( + (sumNCount, vector) => { val arr = vector.toArray // check that the counts are all non-negative and finite in this pass - if (!arr.forall( i => !i.isNaN && !i.isInfinite && i >= 0.0)) { - throw new IllegalArgumentException("All input entries must be nonnegative and finite.") + if (!arr.forall(i => !i.isNaN && !i.isInfinite && i >= 0.0)) { + throw new IllegalArgumentException("Values in the input RDD must be nonnegative.") } - ((sums._1, arr).zipped.map(_ + _), sums._2 + 1) - }, //seqOp - (sums1, sums2) => ((sums1._1, sums2._1).zipped.map(_ + _), sums1._2 + sums2._2)) // combOp + new SumNCount((sumNCount.colSums, arr).zipped.map(_ + _), sumNCount.numRows + 1) + }, (sums1, sums2) => { + new SumNCount((sums1.colSums, sums2.colSums).zipped.map(_ + _), + sums1.numRows + sums2.numRows) + }) - val colSums = result._1 + val colSums = result.colSums + if (!colSums.forall(_ > 0.0)) { + throw zeroExpectedError + } val total = colSums.sum // Second pass to compute chi-squared statistic - val statistic = counts.aggregate(0.0)(rowStatistic(colSums, total), _ + _) - val df = (numCols - 1) * (result._2 - 1) - val pValue = chiSquare(statistic, df) - - new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON) + val statistic = counts.aggregate(0.0)(rowStatistic(colSums, total, pearson), _ + _) + val df = (numCols - 1) * (result.numRows - 1) + val pValue = chiSquareComplemented(df, statistic) + new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON, + NullHypothesis.independence.toString) } - // curried function to be used as seqOp in the aggregate operation to collect statistic - private def rowStatistic(colSums: Array[Double], total: Double) = { + // returns function to be used as seqOp in the 
aggregate operation to collect statistic + private def rowStatistic(colSums: Array[Double], + total: Double, + chiSquared: (Double, Double) => Double) = { (statistic: Double, vector: Vector) => { val arr = vector.toArray val rowSum = arr.sum + if (rowSum == 0.0) { // rowSum >= 0.0 as ensured by the nonnegative check + throw zeroExpectedError + } (arr, colSums).zipped.foldLeft(statistic) { case (stat, (observed, colSum)) => val expected = rowSum * colSum / total - val r = stat + (observed - expected) * (observed - expected) / expected + val r = stat + chiSquared(observed, expected) r } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala index a07dfcfe9219..050a771e52a4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.mllib.stat.test import org.apache.spark.annotation.Experimental @@ -27,34 +28,48 @@ trait TestResult { def pValue: Double - def degreesOfFreedom: Array[Int] + def degreesOfFreedom: Array[Long] def statistic: Double /** - * Returns a String explaining the hypothesis test result. + * String explaining the hypothesis test result. * Specific classes implementing this trait should override this method to output test-specific * information. */ override def toString: String = { - s"pValue = $pValue \n" + // TODO explain what pValue is + + val pValueExplain = if (pValue <= 0.01) { + "Very strong presumption against null hypothesis." + } else if (0.01 < pValue && pValue <= 0.05) { + "Strong presumption against null hypothesis." + } else if (0.05 < pValue && pValue <= 0.01) { + "Low presumption against null hypothesis." + } else { + "No presumption against null hypothesis." + } + s"degrees of freedom = ${degreesOfFreedom.mkString} \n" + - s"statistic = $statistic" + s"statistic = $statistic \n" + + s"pValue = $pValue \n" + pValueExplain } } /** * :: Experimental :: + * Object containing the test results for the chi squared hypothesis test. 
*/ @Experimental case class ChiSquaredTestResult(override val pValue: Double, - override val degreesOfFreedom: Array[Int], + override val degreesOfFreedom: Array[Long], override val statistic: Double, - val method: String) extends TestResult { + val method: String, + val nullHypothesis: String) extends TestResult { override def toString: String = { "Chi squared test summary: \n" + s"method: $method \n" + + s"null hypothesis: $nullHypothesis \n" + super.toString } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 0ab2f6cacf39..f51513d4f3e4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -19,17 +19,100 @@ package org.apache.spark.mllib.stat import org.scalatest.FunSuite +import org.apache.spark.SparkException +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.stat.test.ChiSquaredTest import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.util.TestingUtils._ class HypothesisTestSuite extends FunSuite with LocalSparkContext { - test("chi squared") { - val x = sc.parallelize(Array(2.0, 23.0, 53.0)) - val y = sc.parallelize(Array(53.0, 76.0, 1.0)) - val c = Statistics.chiSquared(x, y) - assert(c.statistic ~= 120.2546 absTol 1e-3) - - val bad = sc.parallelize(Array(2.0, -23.0, 53.0)) - intercept[Exception](Statistics.chiSquared(bad, y)) + + test("chi squared pearson goodness of fit") { + + // check that number of partitions does not affect results + for (numParts <- List(1, 2, 3, 4, 5)) { + val observed = sc.parallelize(Array[Double](4, 6, 5), numParts) + val expected = sc.parallelize(Array[Double](5, 5, 5), numParts) + val default = Statistics.chiSquared(observed, expected) + val pearson = Statistics.chiSquared(observed, expected, "pearson") + + // Results validated against the R command `chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3))` + assert(default.statistic === 0.4) + assert(default.degreesOfFreedom === Array(2)) + assert(default.pValue ~= 0.8187 absTol 1e-3) + assert(default.method === ChiSquaredTest.PEARSON) + assert(default.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + assert(pearson.statistic === 0.4) + assert(pearson.degreesOfFreedom === Array(2)) + assert(pearson.pValue ~= 0.8187 absTol 1e-3) + assert(pearson.method === ChiSquaredTest.PEARSON) + assert(pearson.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + + // different expected and observed sum + val observed1 = sc.parallelize(Array[Double](21, 38, 43, 80), numParts) + val expected1 = sc.parallelize(Array[Double](3, 5, 7, 20), numParts) + val c1 = Statistics.chiSquared(observed1, expected1) + + // Results validated against the R command + // `chisq.test(c(21, 38, 43, 80), p=c(3/35, 1/7, 1/5, 4/7))` + assert(c1.statistic ~= 14.1429 absTol 1e-3) + assert(c1.degreesOfFreedom === Array(3)) + assert(c1.pValue ~= 0.002717 absTol 1e-6) + assert(c1.method === ChiSquaredTest.PEARSON) + assert(c1.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + } + + // different sized RDDs + val observed = sc.parallelize(Array(1.0, 2.0, 3.0)) + val expected = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0)) + intercept[IllegalArgumentException](Statistics.chiSquared(observed, expected)) + + // negative counts in observed + val negObs = sc.parallelize(Array(1.0, 2.0, 3.0, -4.0)) + 
intercept[IllegalArgumentException](Statistics.chiSquared(negObs, expected)) + + // count = 0.0 in expected + val zeroExpected = sc.parallelize(Array(1.0, 0.0, 3.0)) + intercept[IllegalArgumentException](Statistics.chiSquared(observed, zeroExpected)) + } + + test("chi squared pearson independence") { + + val data = Seq( + Vectors.dense(40.0, 56.0, 31.0, 30.0), + Vectors.dense(24.0, 32.0, 10.0, 15.0), + Vectors.dense(29.0, 42.0, 0.0, 12.0) + ) + val chi = Statistics.chiSquared(sc.parallelize(data)) + assert(chi.statistic ~= 21.9958 absTol 1e-3) + assert(chi.degreesOfFreedom === Array(6)) + assert(chi.pValue ~= 0.001213 absTol 1e-6) + assert(chi.method === ChiSquaredTest.PEARSON) + assert(chi.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) + + // Negative counts + val negCounts = Seq( + Vectors.dense(4.0, 5.0, 3.0, 3.0), + Vectors.dense(0.0, -3.0, 0.0, 5.0), + Vectors.dense(9.0, 0.0, 0.0, 1.0) + ) + intercept[SparkException](Statistics.chiSquared(sc.parallelize(negCounts))) + + // Row sum = 0.0 + val rowZero = Seq( + Vectors.dense(4.0, 5.0, 3.0, 3.0), + Vectors.dense(0.0, 0.0, 0.0, 0.0), + Vectors.dense(9.0, 0.0, 0.0, 1.0) + ) + intercept[SparkException](Statistics.chiSquared(sc.parallelize(rowZero))) + + // Column sum = 0.0 + val colZero = Seq( + Vectors.dense(1.0, 0.0, 0.0, 2.0), + Vectors.dense(4.0, 5.0, 0.0, 3.0), + Vectors.dense(9.0, 0.0, 0.0, 1.0) + ) + // IllegalArgumentException thrown here since it's thrown on driver, not inside a task + intercept[IllegalArgumentException](Statistics.chiSquared(sc.parallelize(colZero))) } } From 7eea80bf38558fc61be399bfefa772a41aedb652 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Thu, 7 Aug 2014 13:32:34 -0700 Subject: [PATCH 09/14] WIP --- .../apache/spark/mllib/stat/Statistics.scala | 60 ++--- .../mllib/stat/test/ChiSquaredTest.scala | 223 ++++++++++-------- .../spark/mllib/stat/test/TestResult.scala | 23 +- .../mllib/stat/HypothesisTestSuite.scala | 123 +++++----- 4 files changed, 220 insertions(+), 209 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 39f4a3a10c3a..16395d7b35c1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -19,6 +19,7 @@ package org.apache.spark.mllib.stat import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Matrix, Vector} +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.correlation.Correlations import org.apache.spark.mllib.stat.test.{ChiSquaredTest, ChiSquaredTestResult} import org.apache.spark.rdd.RDD @@ -93,73 +94,46 @@ object Statistics { /** * :: Experimental :: - * Conduct the Chi-squared goodness of fit test of the observed data against the + * Conduct Pearson's chi-squared goodness of fit test of the observed data against the * expected distribution. * - * Note: the two input RDDs need to have the same number of partitions and the same number of - * elements in each partition. + * Note: the two input Vectors need to have the same size. * - * @param observed RDD[Double] containing the observed counts. - * @param expected RDD[Double] containing the expected counts. If the observed total differs from - * the expected total, this RDD is rescaled to sum up to the observed total. - * @param method String specifying the method to use for the Chi-squared test. 
- * Supported: `pearson` (default) + * @param observed Vector containing the observed categorical counts/relative frequencies. + * @param expected Vector containing the expected categorical counts/relative frequencies. + * `expected` is rescaled if the `expected` sum differs from the `observed` sum. * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, * the method used, and the null hypothesis. */ @Experimental - def chiSquared(observed: RDD[Double], - expected: RDD[Double], - method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(observed, expected, method) - } + def chiSqTest(observed: Vector, + expected: Vector): ChiSquaredTestResult = ChiSquaredTest.chiSquared(observed, expected) /** * :: Experimental :: - * Conduct the Chi-squared goodness of fit test of the observed data against the - * expected distribution. + * Conduct Pearson's chi-squared goodness of fit test of the observed data against the uniform + * distribution, with each category having an expected frequency of `1 / observed.size`. * - * Note: the two input RDDs need to have the same number of partitions and the same number of - * elements in each partition. - * - * @param observed RDD[Double] containing the observed counts. - * @param expected RDD[Double] containing the expected counts. If the observed total differs from - * the expected total, this RDD is rescaled to sum up to the observed total. + * @param observed Vector containing the observed categorical counts/relative frequencies. * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, * the method used, and the null hypothesis. */ @Experimental - def chiSquared(observed: RDD[Double], expected: RDD[Double]): ChiSquaredTestResult = { - ChiSquaredTest.chiSquared(observed, expected) - } + def chiSqTest(observed: Vector): ChiSquaredTestResult = ChiSquaredTest.chiSquared(observed) /** * :: Experimental :: - * Conduct the Chi-squared independence test between the columns in the input matrix. - * - * @param counts RDD[Vector] containing observations with rows representing categories and columns - * representing separate trials for which independence between trials is assessed. - * @param method String specifying the method to use for the Chi-squared test. - * Supported: `pearson` (default) - * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, - * the method used, and the null hypothesis. + * TODO */ @Experimental - def chiSquared(counts: RDD[Vector], method: String): ChiSquaredTestResult = { - ChiSquaredTest.chiSquaredMatrix(counts, method) - } + def chiSqTest(counts: Matrix): ChiSquaredTestResult = ChiSquaredTest.chiSquaredMatrix(counts) /** * :: Experimental :: - * Conduct the Chi-squared independence test between the columns in the input matrix. - * - * @param counts RDD[Vector] containing observations with rows representing categories and columns - * representing separate trials for which independence between trials is assessed. - * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, - * the method used, and the null hypothesis. 
+ * TODO */ @Experimental - def chiSquared(counts: RDD[Vector]): ChiSquaredTestResult = { - ChiSquaredTest.chiSquaredMatrix(counts) + def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSquaredTestResult] = { + ChiSquaredTest.chiSquaredFeatures(data) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index 86088ba4efe9..7d6561e23fd5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -17,17 +17,19 @@ package org.apache.spark.mllib.stat.test +import breeze.linalg.{DenseMatrix => BDM} import cern.jet.stat.Probability.chiSquareComplemented import org.apache.spark.Logging -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors} +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD /** * Conduct the Chi-squared test for the input RDDs using the specified method. * Goodness-of-fit test is conducted on two RDD[Double]s, whereas test of independence is conducted - * on an input of type RDD[Vector] in which independence between columns is assessed. + * on an input of type RDD[Vector] or RDD[LabeledPoint] in which independence between columns is + * assessed. * * Supported methods for goodness of fit: `pearson` (default) * Supported methods for independence: `pearson` (default) @@ -39,7 +41,16 @@ import org.apache.spark.rdd.RDD */ private[stat] object ChiSquaredTest extends Logging { - val PEARSON = "pearson" + /** + * @param name String name for the method. + * @param chiSqFunc Function for computing the statistic given the observed and expected counts. + */ + case class Method(name: String, chiSqFunc: (Double, Double) => Double) + + val PEARSON = new Method("pearson", (observed: Double, expected: Double) => { + val dev = observed - expected + dev * dev / expected + }) object NullHypothesis extends Enumeration { type NullHypothesis = Value @@ -47,124 +58,146 @@ private[stat] object ChiSquaredTest extends Logging { val independence = Value("observations in each column are statistically independent.") } - val zeroExpectedError = new IllegalArgumentException("Chi square statistic cannot be computed" - + " for input RDD due to nonpositive entries in the expected contingency table.") - - // delegator method for goodness of fit test - def chiSquared(observed: RDD[Double], - expected: RDD[Double], - method: String = PEARSON): ChiSquaredTestResult = { - method match { - case PEARSON => chiSquaredPearson(observed, expected) + private def methodFromString(methodName: String): Method = { + methodName match { + case PEARSON.name => PEARSON case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") } } - // delegator method for independence test - def chiSquaredMatrix(counts: RDD[Vector], method: String = PEARSON): ChiSquaredTestResult = { - method match { - // Yates' correction doesn't really apply here - case PEARSON => chiSquaredPearson(counts) - case _ => throw new IllegalArgumentException("Unrecognized method for Chi squared test.") + /** + * Conduct Pearson's independence test for each feature against the label across the input RDD. + * + * @param data RDD of LabeledPoints. 
+ * @return Array[ChiSquareTestResult] containing + */ + def chiSquaredFeatures(data: RDD[LabeledPoint], + methodName: String = PEARSON.name): Array[ChiSquaredTestResult] = { + val numCols = data.first().features.size + val results = new Array[ChiSquaredTestResult](numCols) + var labels = Array[Double]() + var col = 0 + while (col < numCols) { + val featureVLabel = data.map(p => (p.label, p.features(col))) + // The following block of code can be cleaned up and made public as + // chiSquared(data: RDD[(V1, V2)]) + val pairCounts = featureVLabel.countByValue() + if (labels.size == 0) { + // Do this only once since labels are invariant across features. + labels = pairCounts.keys.map(_._1).toArray + } + val featureValues = pairCounts.keys.map(_._2).toArray + val numCols = labels.size + val numRows = featureValues.size + val contingency = new BDM(numRows, numCols, new Array[Double](numRows * numCols)) + for (((label, feature), count) <- pairCounts) { + val col = labels.indexOf(label) + val row = featureValues.indexOf(feature) + contingency(row, col) += count + } + results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName) + col += 1 } - } - - // Equation for computing Pearson's chi-squared statistic - private def pearson = (observed: Double, expected: Double) => { - val dev = observed - expected - dev * dev / expected + results } /* * Pearon's goodness of fit test. This can be easily made abstract to support other methods. - * Makes two passes over both input RDDs. */ - private def chiSquaredPearson(observed: RDD[Double], - expected: RDD[Double]): ChiSquaredTestResult = { + def chiSquared(observed: Vector, + expected: Vector = Vectors.dense(Array[Double]()), + methodName: String = PEARSON.name): ChiSquaredTestResult = { - // compute the scaling factor and count for the input RDDs and check positivity in one pass - val observedStats = observed.stats() - if (observedStats.min < 0.0) { - throw new IllegalArgumentException("Values in observed must be nonnegative.") - } - val expectedStats = expected.stats() - if (expectedStats.min <= 0.0) { - throw new IllegalArgumentException("Values in expected must be positive.") - } - if (observedStats.count != expectedStats.count) { + // Validate input arguments + val method = methodFromString(methodName) + if (expected.size != 0 && observed.size != expected.size) { throw new IllegalArgumentException("observed and expected must be of the same size.") } + val size = observed.size + // Avoid calling toArray on input vectors to avoid memory blow up + // (esp if size = Int.MaxValue for a SparseVector). 
+ // Check positivity and collect sums + var obsSum = 0.0 + var expSum = if (expected.size == 0.0) 1.0 else 0.0 + var i = 0 + while (i < size) { + val obs = observed(i) + if (obs < 0.0) { + throw new IllegalArgumentException("Values in observed must be nonnegative.") + } + obsSum += obs + if (expected.size > 0) { + val exp = expected(i) + if (exp <= 0.0) { + throw new IllegalArgumentException("Values in expected must be positive.") + } + expSum += exp + } + i += 1 + } - val expScaled = if (math.abs(observedStats.sum - expectedStats.sum) < 1e-7) { - // No scaling needed since both RDDs have the same total - expected + // Determine the scaling factor for expected + val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum + val getExpected: (Int) => Double = if (expected.size == 0) { + // Assume uniform distribution + if (scale == 1.0) _ => 1.0 / size else _ => scale / size } else { - expected.map(_ * observedStats.sum / expectedStats.sum) + if (scale == 1.0) (i: Int) => expected(i) else (i: Int) => scale * expected(i) } - // Second pass to compute chi-squared statistic - val statistic = observed.zip(expScaled).aggregate(0.0)({ case (sum, (obs, exp)) => { - sum + pearson(obs, exp) - }}, _ + _) - val df = observedStats.count - 1 + // compute chi-squared statistic + var statistic = 0.0 + i = 0 + while (i < observed.size) { + val obs = observed(i) + if (obs != 0.0) { + statistic += method.chiSqFunc(obs, getExpected(i)) + } + } + val df = size - 1 val pValue = chiSquareComplemented(df, statistic) - new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON, - NullHypothesis.goodnessOfFit.toString) + new ChiSquaredTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString) } /* * Pearon's independence test. This can be easily made abstract to support other methods. - * Makes two passes over the input RDD. 
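   * For each cell of the contingency table, the expected count under independence is
   * rowSum * colSum / total; the statistic sums the per-cell contribution (for the default
   * Pearson method, (observed - expected)^2 / expected) over all cells, with
   * (numRows - 1) * (numCols - 1) degrees of freedom.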
*/ - private def chiSquaredPearson(counts: RDD[Vector]): ChiSquaredTestResult = { - - val numCols = counts.first.size - - // first pass for collecting column sums - case class SumNCount(colSums: Array[Double], numRows: Long) - - val result = counts.aggregate(new SumNCount(new Array[Double](numCols), 0L))( - (sumNCount, vector) => { - val arr = vector.toArray - // check that the counts are all non-negative and finite in this pass - if (!arr.forall(i => !i.isNaN && !i.isInfinite && i >= 0.0)) { - throw new IllegalArgumentException("Values in the input RDD must be nonnegative.") - } - new SumNCount((sumNCount.colSums, arr).zipped.map(_ + _), sumNCount.numRows + 1) - }, (sums1, sums2) => { - new SumNCount((sums1.colSums, sums2.colSums).zipped.map(_ + _), - sums1.numRows + sums2.numRows) - }) - - val colSums = result.colSums - if (!colSums.forall(_ > 0.0)) { - throw zeroExpectedError + def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSquaredTestResult = { + val method = methodFromString(methodName) + val numRows = counts.numRows + val numCols = counts.numCols + + // get row and column sums + val colSums = new Array[Double](numCols) + val rowSums = new Array[Double](numRows) + val colMajorArr = counts.toArray + var i = 0 + while (i < colMajorArr.size) { + val elem = colMajorArr(i) + if (elem < 0.0) { + throw new IllegalArgumentException("Contingency table cannot contain negative entries.") + } + colSums(i / numRows) += elem + rowSums(i % numRows) += elem + i += 1 + } + if (!colSums.forall(_ > 0.0) || !rowSums.forall(_ > 0.0)) { + throw new IllegalArgumentException("Chi square statistic cannot be computed for input matrix due to " + + "0.0 entries in the expected contingency table.") } val total = colSums.sum + // second pass to collect statistic + var statistic = 0.0 + i = 0 + while (i < colMajorArr.size) { + val expected = colSums(i / numRows) * rowSums(i % numRows) / total + statistic += method.chiSqFunc(colMajorArr(i), expected) + } + // Second pass to compute chi-squared statistic - val statistic = counts.aggregate(0.0)(rowStatistic(colSums, total, pearson), _ + _) - val df = (numCols - 1) * (result.numRows - 1) + val df = (numCols - 1) * (numRows - 1) val pValue = chiSquareComplemented(df, statistic) - new ChiSquaredTestResult(pValue, Array(df), statistic, PEARSON, - NullHypothesis.independence.toString) - } - - // returns function to be used as seqOp in the aggregate operation to collect statistic - private def rowStatistic(colSums: Array[Double], - total: Double, - chiSquared: (Double, Double) => Double) = { - (statistic: Double, vector: Vector) => { - val arr = vector.toArray - val rowSum = arr.sum - if (rowSum == 0.0) { // rowSum >= 0.0 as ensured by the nonnegative check - throw zeroExpectedError - } - (arr, colSums).zipped.foldLeft(statistic) { case (stat, (observed, colSum)) => - val expected = rowSum * colSum / total - val r = stat + chiSquared(observed, expected) - r - } - } + new ChiSquaredTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala index 050a771e52a4..2a7998b02610 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -22,14 +22,26 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: * Trait for hypothesis test results. 
+ * @tparam DF Return type of `degreesOfFreedom` */ @Experimental -trait TestResult { +trait TestResult[DF] { + /** + * + */ def pValue: Double - def degreesOfFreedom: Array[Long] + /** + * + * @return + */ + def degreesOfFreedom: DF + /** + * + * @return + */ def statistic: Double /** @@ -39,6 +51,7 @@ trait TestResult { */ override def toString: String = { + // String explaining what the p-value indicates. val pValueExplain = if (pValue <= 0.01) { "Very strong presumption against null hypothesis." } else if (0.01 < pValue && pValue <= 0.05) { @@ -49,7 +62,7 @@ trait TestResult { "No presumption against null hypothesis." } - s"degrees of freedom = ${degreesOfFreedom.mkString} \n" + + s"degrees of freedom = ${degreesOfFreedom.toString} \n" + s"statistic = $statistic \n" + s"pValue = $pValue \n" + pValueExplain } @@ -61,10 +74,10 @@ trait TestResult { */ @Experimental case class ChiSquaredTestResult(override val pValue: Double, - override val degreesOfFreedom: Array[Long], + override val degreesOfFreedom: Int, override val statistic: Double, val method: String, - val nullHypothesis: String) extends TestResult { + val nullHypothesis: String) extends TestResult[Int] { override def toString: String = { "Chi squared test summary: \n" + diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index f51513d4f3e4..071db28b0ba4 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat import org.scalatest.FunSuite import org.apache.spark.SparkException -import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.linalg.{Matrices, DenseVector, Vectors} import org.apache.spark.mllib.stat.test.ChiSquaredTest import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -29,61 +29,50 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { test("chi squared pearson goodness of fit") { - // check that number of partitions does not affect results - for (numParts <- List(1, 2, 3, 4, 5)) { - val observed = sc.parallelize(Array[Double](4, 6, 5), numParts) - val expected = sc.parallelize(Array[Double](5, 5, 5), numParts) - val default = Statistics.chiSquared(observed, expected) - val pearson = Statistics.chiSquared(observed, expected, "pearson") - - // Results validated against the R command `chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3))` - assert(default.statistic === 0.4) - assert(default.degreesOfFreedom === Array(2)) - assert(default.pValue ~= 0.8187 absTol 1e-3) - assert(default.method === ChiSquaredTest.PEARSON) - assert(default.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) - assert(pearson.statistic === 0.4) - assert(pearson.degreesOfFreedom === Array(2)) - assert(pearson.pValue ~= 0.8187 absTol 1e-3) - assert(pearson.method === ChiSquaredTest.PEARSON) - assert(pearson.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) - - // different expected and observed sum - val observed1 = sc.parallelize(Array[Double](21, 38, 43, 80), numParts) - val expected1 = sc.parallelize(Array[Double](3, 5, 7, 20), numParts) - val c1 = Statistics.chiSquared(observed1, expected1) - - // Results validated against the R command - // `chisq.test(c(21, 38, 43, 80), p=c(3/35, 1/7, 1/5, 4/7))` - assert(c1.statistic ~= 14.1429 absTol 1e-3) - 
assert(c1.degreesOfFreedom === Array(3)) - assert(c1.pValue ~= 0.002717 absTol 1e-6) - assert(c1.method === ChiSquaredTest.PEARSON) - assert(c1.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) - } - - // different sized RDDs - val observed = sc.parallelize(Array(1.0, 2.0, 3.0)) - val expected = sc.parallelize(Array(1.0, 2.0, 3.0, 4.0)) - intercept[IllegalArgumentException](Statistics.chiSquared(observed, expected)) + val observed = new DenseVector(Array[Double](4, 6, 5)) + val pearson = Statistics.chiSqTest(observed) + + // Results validated against the R command `chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3))` + assert(pearson.statistic === 0.4) + assert(pearson.degreesOfFreedom === Array(2)) + assert(pearson.pValue ~= 0.8187 absTol 1e-3) + assert(pearson.method === ChiSquaredTest.PEARSON) + assert(pearson.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + + // different expected and observed sum + val observed1 = new DenseVector(Array[Double](21, 38, 43, 80)) + val expected1 = new DenseVector(Array[Double](3, 5, 7, 20)) + val c1 = Statistics.chiSqTest(observed1, expected1) + + // Results validated against the R command + // `chisq.test(c(21, 38, 43, 80), p=c(3/35, 1/7, 1/5, 4/7))` + assert(c1.statistic ~= 14.1429 absTol 1e-3) + assert(c1.degreesOfFreedom === Array(3)) + assert(c1.pValue ~= 0.002717 absTol 1e-6) + assert(c1.method === ChiSquaredTest.PEARSON) + assert(c1.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + + // Vectors with different sizes + val observed3 = new DenseVector(Array(1.0, 2.0, 3.0)) + val expected3 = new DenseVector(Array(1.0, 2.0, 3.0, 4.0)) + intercept[IllegalArgumentException](Statistics.chiSqTest(observed3, expected3)) // negative counts in observed - val negObs = sc.parallelize(Array(1.0, 2.0, 3.0, -4.0)) - intercept[IllegalArgumentException](Statistics.chiSquared(negObs, expected)) + val negObs = new DenseVector(Array(1.0, 2.0, 3.0, -4.0)) + intercept[IllegalArgumentException](Statistics.chiSqTest(negObs, expected1)) // count = 0.0 in expected - val zeroExpected = sc.parallelize(Array(1.0, 0.0, 3.0)) - intercept[IllegalArgumentException](Statistics.chiSquared(observed, zeroExpected)) + val zeroExpected = new DenseVector(Array(1.0, 0.0, 3.0)) + intercept[IllegalArgumentException](Statistics.chiSqTest(observed, zeroExpected)) } test("chi squared pearson independence") { - val data = Seq( - Vectors.dense(40.0, 56.0, 31.0, 30.0), - Vectors.dense(24.0, 32.0, 10.0, 15.0), - Vectors.dense(29.0, 42.0, 0.0, 12.0) - ) - val chi = Statistics.chiSquared(sc.parallelize(data)) + val data = Array( + 40.0, 56.0, 31.0, 30.0, + 24.0, 32.0, 10.0, 15.0, + 29.0, 42.0, 0.0, 12.0) + val chi = Statistics.chiSqTest(Matrices.dense(3, 4, data)) assert(chi.statistic ~= 21.9958 absTol 1e-3) assert(chi.degreesOfFreedom === Array(6)) assert(chi.pValue ~= 0.001213 absTol 1e-6) @@ -91,28 +80,30 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { assert(chi.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) // Negative counts - val negCounts = Seq( - Vectors.dense(4.0, 5.0, 3.0, 3.0), - Vectors.dense(0.0, -3.0, 0.0, 5.0), - Vectors.dense(9.0, 0.0, 0.0, 1.0) - ) - intercept[SparkException](Statistics.chiSquared(sc.parallelize(negCounts))) + val negCounts = Array( + 4.0, 5.0, 3.0, 3.0, + 0.0, -3.0, 0.0, 5.0, + 9.0, 0.0, 0.0, 1.0) + intercept[SparkException](Statistics.chiSqTest(Matrices.dense(3, 4, negCounts))) // Row sum = 0.0 - val rowZero = Seq( - Vectors.dense(4.0, 5.0, 3.0, 3.0), - 
Vectors.dense(0.0, 0.0, 0.0, 0.0), - Vectors.dense(9.0, 0.0, 0.0, 1.0) - ) - intercept[SparkException](Statistics.chiSquared(sc.parallelize(rowZero))) + val rowZero = Array( + 4.0, 5.0, 3.0, 3.0, + 0.0, 0.0, 0.0, 0.0, + 9.0, 0.0, 0.0, 1.0) + intercept[SparkException](Statistics.chiSqTest(Matrices.dense(3, 4, rowZero))) // Column sum = 0.0 - val colZero = Seq( - Vectors.dense(1.0, 0.0, 0.0, 2.0), - Vectors.dense(4.0, 5.0, 0.0, 3.0), - Vectors.dense(9.0, 0.0, 0.0, 1.0) - ) + val colZero = Array( + 1.0, 0.0, 0.0, 2.0, + 4.0, 5.0, 0.0, 3.0, + 9.0, 0.0, 0.0, 1.0) // IllegalArgumentException thrown here since it's thrown on driver, not inside a task - intercept[IllegalArgumentException](Statistics.chiSquared(sc.parallelize(colZero))) + intercept[IllegalArgumentException](Statistics.chiSqTest(Matrices.dense(3, 4, colZero))) } + + test("chi squared pearson features") { + + } + } From c39eeb5d4b885f32f6defa06976dccbd06c33c0b Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Thu, 7 Aug 2014 15:28:18 -0700 Subject: [PATCH 10/14] units passed with updated API --- .../apache/spark/mllib/stat/Statistics.scala | 19 +++- .../mllib/stat/test/ChiSquaredTest.scala | 54 ++++++----- .../mllib/stat/HypothesisTestSuite.scala | 97 +++++++++++-------- 3 files changed, 106 insertions(+), 64 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 16395d7b35c1..82c43cd27041 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -98,6 +98,8 @@ object Statistics { * expected distribution. * * Note: the two input Vectors need to have the same size. + * `observed` cannot contain negative values. + * `expected` cannot contain nonpositive values. * * @param observed Vector containing the observed categorical counts/relative frequencies. * @param expected Vector containing the expected categorical counts/relative frequencies. @@ -114,6 +116,8 @@ object Statistics { * Conduct Pearson's chi-squared goodness of fit test of the observed data against the uniform * distribution, with each category having an expected frequency of `1 / observed.size`. * + * Note: `observed` cannot contain negative values. + * * @param observed Vector containing the observed categorical counts/relative frequencies. * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, * the method used, and the null hypothesis. @@ -123,14 +127,25 @@ object Statistics { /** * :: Experimental :: - * TODO + * Conduct Pearson's independence test on the input contingency matrix, which cannot contain + * negative entries or columns or rows that sum up to 0. + * + * @param counts The contingency matrix. + * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, + * the method used, and the null hypothesis. */ @Experimental def chiSqTest(counts: Matrix): ChiSquaredTestResult = ChiSquaredTest.chiSquaredMatrix(counts) /** * :: Experimental :: - * TODO + * Conduct Pearson's independence test for every feature against the label across the input RDD. + * For each feature, the (feature, label) pairs are converted into a contingency matrix for which + * the chi-squared statistic is computed. + * + * @param data an `RDD[LabeledPoint]` containing the Labeled dataset. + * @return an array containing the ChiSquaredTestResult for every feature against the label. 
+   *         The order of the elements in the returned array reflects the order of input features.
    */
   @Experimental
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSquaredTestResult] = {
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala
index 7d6561e23fd5..cdf0c8ab6642 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala
@@ -27,17 +27,16 @@ import org.apache.spark.rdd.RDD
 
 /**
  * Conduct the Chi-squared test for the input RDDs using the specified method.
- * Goodness-of-fit test is conducted on two RDD[Double]s, whereas test of independence is conducted
- * on an input of type RDD[Vector] or RDD[LabeledPoint] in which independence between columns is
- * assessed.
+ * Goodness-of-fit test is conducted on two `Vectors`, whereas test of independence is conducted
+ * on an input of type `Matrix` in which independence between columns is assessed.
+ * We also provide a method for computing the chi-squared statistic between each feature and the
+ * label for an input `RDD[LabeledPoint]`, returning an `Array[ChiSquaredTestResult]` of size equal
+ * to the number of features in the input RDD.
  *
  * Supported methods for goodness of fit: `pearson` (default)
  * Supported methods for independence: `pearson` (default)
  *
  * More information on Chi-squared test: http://en.wikipedia.org/wiki/Chi-squared_test
- * More information on Pearson's chi-squared test:
- *   http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test
- *
  */
 private[stat] object ChiSquaredTest extends Logging {
 
@@ -47,17 +46,20 @@ private[stat] object ChiSquaredTest extends Logging {
    */
   case class Method(name: String, chiSqFunc: (Double, Double) => Double)
 
+  // Pearson's chi-squared test: http://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test
   val PEARSON = new Method("pearson", (observed: Double, expected: Double) => {
     val dev = observed - expected
     dev * dev / expected
   })
 
+  // Null hypothesis for the two different types of chi-squared tests to be included in the result.
   object NullHypothesis extends Enumeration {
     type NullHypothesis = Value
     val goodnessOfFit = Value("observed follows the same distribution as expected.")
     val independence = Value("observations in each column are statistically independent.")
   }
 
+  // Method identification based on input methodName string
   private def methodFromString(methodName: String): Method = {
     methodName match {
       case PEARSON.name => PEARSON
@@ -67,9 +69,9 @@ private[stat] object ChiSquaredTest extends Logging {
 
   /**
    * Conduct Pearson's independence test for each feature against the label across the input RDD.
-   *
-   * @param data RDD of LabeledPoints.
-   * @return Array[ChiSquareTestResult] containing
+   * The contingency table is constructed from the raw (feature, label) pairs and used to conduct
+   * the independence test.
+   * Returns an array containing the ChiSquaredTestResult for every feature against the label.
    */
   def chiSquaredFeatures(data: RDD[LabeledPoint],
       methodName: String = PEARSON.name): Array[ChiSquaredTestResult] = {
@@ -102,7 +104,8 @@ private[stat] object ChiSquaredTest extends Logging {
   }
 
   /*
-   * Pearon's goodness of fit test. This can be easily made abstract to support other methods.
+   * Pearson's goodness of fit test on the input observed and expected counts/relative frequencies.
+   * Uniform distribution is assumed when `expected` is not passed in.
*/ def chiSquared(observed: Vector, expected: Vector = Vectors.dense(Array[Double]()), @@ -147,20 +150,23 @@ private[stat] object ChiSquaredTest extends Logging { // compute chi-squared statistic var statistic = 0.0 - i = 0 - while (i < observed.size) { - val obs = observed(i) + var j = 0 + while (j < observed.size) { + val obs = observed(j) if (obs != 0.0) { - statistic += method.chiSqFunc(obs, getExpected(i)) + statistic += method.chiSqFunc(obs, getExpected(j)) } + j += 1 } val df = size - 1 val pValue = chiSquareComplemented(df, statistic) - new ChiSquaredTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString) + new ChiSquaredTestResult(pValue, df, statistic, PEARSON.name, + NullHypothesis.goodnessOfFit.toString) } /* - * Pearon's independence test. This can be easily made abstract to support other methods. + * Pearon's independence test on the input contingency matrix. + * TODO: optimize for SparseMatrix when it becomes supported. */ def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSquaredTestResult = { val method = methodFromString(methodName) @@ -182,22 +188,24 @@ private[stat] object ChiSquaredTest extends Logging { i += 1 } if (!colSums.forall(_ > 0.0) || !rowSums.forall(_ > 0.0)) { - throw new IllegalArgumentException("Chi square statistic cannot be computed for input matrix due to " - + "0.0 entries in the expected contingency table.") + throw new IllegalArgumentException("Chi square statistic cannot be computed for input matrix " + + "due to 0.0 entries in the expected contingency table.") } val total = colSums.sum // second pass to collect statistic var statistic = 0.0 - i = 0 - while (i < colMajorArr.size) { - val expected = colSums(i / numRows) * rowSums(i % numRows) / total - statistic += method.chiSqFunc(colMajorArr(i), expected) + var j = 0 + while (j < colMajorArr.size) { + val expected = colSums(j / numRows) * rowSums(j % numRows) / total + statistic += method.chiSqFunc(colMajorArr(j), expected) + j += 1 } // Second pass to compute chi-squared statistic val df = (numCols - 1) * (numRows - 1) val pValue = chiSquareComplemented(df, statistic) - new ChiSquaredTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString) + new ChiSquaredTestResult(pValue, df, statistic, methodName, + NullHypothesis.independence.toString) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 071db28b0ba4..226b23ae0a01 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -19,8 +19,8 @@ package org.apache.spark.mllib.stat import org.scalatest.FunSuite -import org.apache.spark.SparkException -import org.apache.spark.mllib.linalg.{Matrices, DenseVector, Vectors} +import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors} +import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.test.ChiSquaredTest import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -34,23 +34,28 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { // Results validated against the R command `chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3))` assert(pearson.statistic === 0.4) - assert(pearson.degreesOfFreedom === Array(2)) - assert(pearson.pValue ~= 0.8187 absTol 1e-3) - assert(pearson.method === 
ChiSquaredTest.PEARSON) + assert(pearson.degreesOfFreedom === 2) + assert(pearson.pValue ~= 0.8187 relTol 1e-4) + assert(pearson.method === ChiSquaredTest.PEARSON.name) assert(pearson.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) // different expected and observed sum val observed1 = new DenseVector(Array[Double](21, 38, 43, 80)) val expected1 = new DenseVector(Array[Double](3, 5, 7, 20)) - val c1 = Statistics.chiSqTest(observed1, expected1) + val pearson1 = Statistics.chiSqTest(observed1, expected1) // Results validated against the R command // `chisq.test(c(21, 38, 43, 80), p=c(3/35, 1/7, 1/5, 4/7))` - assert(c1.statistic ~= 14.1429 absTol 1e-3) - assert(c1.degreesOfFreedom === Array(3)) - assert(c1.pValue ~= 0.002717 absTol 1e-6) - assert(c1.method === ChiSquaredTest.PEARSON) - assert(c1.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + assert(pearson1.statistic ~= 14.1429 relTol 1e-4) + assert(pearson1.degreesOfFreedom === 3) + assert(pearson1.pValue ~= 0.002717 relTol 1e-4) + assert(pearson1.method === ChiSquaredTest.PEARSON.name) + assert(pearson1.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + + // SparseVector representation to make sure memory doesn't blow up + // Commented out because it takes too long for unit tests. Should be run as part of perf test. + // val observed2 = new SparseVector(Int.MaxValue, Array(1000005), Array[Double](10.0)) + // val pearson2 = Statistics.chiSqTest(observed2) // Vectors with different sizes val observed3 = new DenseVector(Array(1.0, 2.0, 3.0)) @@ -66,44 +71,58 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { intercept[IllegalArgumentException](Statistics.chiSqTest(observed, zeroExpected)) } - test("chi squared pearson independence") { - - val data = Array( - 40.0, 56.0, 31.0, 30.0, - 24.0, 32.0, 10.0, 15.0, - 29.0, 42.0, 0.0, 12.0) + test("chi squared pearson matrix independence") { + val data = Array(40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0) + // [[40.0, 56.0, 31.0, 30.0], + // [24.0, 32.0, 10.0, 15.0], + // [29.0, 42.0, 0.0, 12.0]] val chi = Statistics.chiSqTest(Matrices.dense(3, 4, data)) - assert(chi.statistic ~= 21.9958 absTol 1e-3) - assert(chi.degreesOfFreedom === Array(6)) - assert(chi.pValue ~= 0.001213 absTol 1e-6) - assert(chi.method === ChiSquaredTest.PEARSON) + // Results validated against R command + // `chisq.test(rbind(c(40, 56, 31, 30),c(24, 32, 10, 15), c(29, 42, 0, 12)))` + assert(chi.statistic ~= 21.9958 relTol 1e-4) + assert(chi.degreesOfFreedom === 6) + assert(chi.pValue ~= 0.001213 relTol 1e-4) + assert(chi.method === ChiSquaredTest.PEARSON.name) assert(chi.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) // Negative counts - val negCounts = Array( - 4.0, 5.0, 3.0, 3.0, - 0.0, -3.0, 0.0, 5.0, - 9.0, 0.0, 0.0, 1.0) - intercept[SparkException](Statistics.chiSqTest(Matrices.dense(3, 4, negCounts))) + val negCounts = Array(4.0, 5.0, 3.0, -3.0) + intercept[IllegalArgumentException](Statistics.chiSqTest(Matrices.dense(2, 2, negCounts))) // Row sum = 0.0 - val rowZero = Array( - 4.0, 5.0, 3.0, 3.0, - 0.0, 0.0, 0.0, 0.0, - 9.0, 0.0, 0.0, 1.0) - intercept[SparkException](Statistics.chiSqTest(Matrices.dense(3, 4, rowZero))) + val rowZero = Array(0.0, 1.0, 0.0, 2.0) + intercept[IllegalArgumentException](Statistics.chiSqTest(Matrices.dense(2, 2, rowZero))) // Column sum = 0.0 - val colZero = Array( - 1.0, 0.0, 0.0, 2.0, - 4.0, 5.0, 0.0, 3.0, - 9.0, 0.0, 0.0, 1.0) + val colZero = 
Array(0.0, 0.0, 2.0, 2.0) // IllegalArgumentException thrown here since it's thrown on driver, not inside a task - intercept[IllegalArgumentException](Statistics.chiSqTest(Matrices.dense(3, 4, colZero))) + intercept[IllegalArgumentException](Statistics.chiSqTest(Matrices.dense(2, 2, colZero))) } - test("chi squared pearson features") { - + test("chi squared pearson RDD[LabeledPoint]") { + // labels: 1.0 (2 / 6), 0.0 (4 / 6) + // feature1: 0.5 (1 / 6), 1.5 (2 / 6), 3.5 (3 / 6) + // feature2: 10.0 (1 / 6), 20.0 (1 / 6), 30.0 (2 / 6), 40.0 (2 / 6) + val data = Array(new LabeledPoint(0.0, Vectors.dense(0.5, 10.0)), + new LabeledPoint(0.0, Vectors.dense(1.5, 20.0)), + new LabeledPoint(1.0, Vectors.dense(1.5, 30.0)), + new LabeledPoint(0.0, Vectors.dense(3.5, 30.0)), + new LabeledPoint(0.0, Vectors.dense(3.5, 40.0)), + new LabeledPoint(1.0, Vectors.dense(3.5, 40.0))) + for (numParts <- List(2, 4, 6, 8)) { + val chi = Statistics.chiSqTest(sc.parallelize(data, numParts)) + val feature1 = chi(0) + assert(feature1.statistic === 0.75) + assert(feature1.degreesOfFreedom === 2) + assert(feature1.pValue ~= 0.6873 relTol 1e-4) + assert(feature1.method === ChiSquaredTest.PEARSON.name) + assert(feature1.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) + val feature2 = chi(1) + assert(feature2.statistic === 1.5) + assert(feature2.degreesOfFreedom === 3) + assert(feature2.pValue ~= 0.6823 relTol 1e-4) + assert(feature2.method === ChiSquaredTest.PEARSON.name) + assert(feature2.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) + } } - } From 80d03e2caf48b5b2227a48c0520ea51fe92f2a9d Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 8 Aug 2014 13:05:14 -0700 Subject: [PATCH 11/14] Reviewer comments. --- .../apache/spark/mllib/stat/Statistics.scala | 18 +++---- .../mllib/stat/test/ChiSquaredTest.scala | 47 +++++++++++------- .../spark/mllib/stat/test/TestResult.scala | 12 ++--- .../mllib/stat/HypothesisTestSuite.scala | 48 ++++++++++--------- 4 files changed, 70 insertions(+), 55 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 82c43cd27041..795171144d82 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -21,7 +21,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.stat.correlation.Correlations -import org.apache.spark.mllib.stat.test.{ChiSquaredTest, ChiSquaredTestResult} +import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSquaredTestResult} import org.apache.spark.rdd.RDD /** @@ -108,8 +108,9 @@ object Statistics { * the method used, and the null hypothesis. */ @Experimental - def chiSqTest(observed: Vector, - expected: Vector): ChiSquaredTestResult = ChiSquaredTest.chiSquared(observed, expected) + def chiSqTest(observed: Vector, expected: Vector): ChiSquaredTestResult = { + ChiSqTest.chiSquared(observed, expected) + } /** * :: Experimental :: @@ -123,19 +124,19 @@ object Statistics { * the method used, and the null hypothesis. 
*/ @Experimental - def chiSqTest(observed: Vector): ChiSquaredTestResult = ChiSquaredTest.chiSquared(observed) + def chiSqTest(observed: Vector): ChiSquaredTestResult = ChiSqTest.chiSquared(observed) /** * :: Experimental :: * Conduct Pearson's independence test on the input contingency matrix, which cannot contain * negative entries or columns or rows that sum up to 0. * - * @param counts The contingency matrix. + * @param observed The contingency matrix (containing either counts or relative frequencies). * @return ChiSquaredTest object containing the test statistic, degrees of freedom, p-value, * the method used, and the null hypothesis. */ @Experimental - def chiSqTest(counts: Matrix): ChiSquaredTestResult = ChiSquaredTest.chiSquaredMatrix(counts) + def chiSqTest(observed: Matrix): ChiSquaredTestResult = ChiSqTest.chiSquaredMatrix(observed) /** * :: Experimental :: @@ -143,12 +144,13 @@ object Statistics { * For each feature, the (feature, label) pairs are converted into a contingency matrix for which * the chi-squared statistic is computed. * - * @param data an `RDD[LabeledPoint]` containing the Labeled dataset. + * @param data an `RDD[LabeledPoint]` containing the labeled dataset with categorical features. + * Real-valued features will be treated as categorical for each distinct value. * @return an array containing the ChiSquaredTestResult for every feature against the label. * The order of the elements in the returned array reflects the order of input features. */ @Experimental def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSquaredTestResult] = { - ChiSquaredTest.chiSquaredFeatures(data) + ChiSqTest.chiSquaredFeatures(data) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index cdf0c8ab6642..bda0823f3647 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -26,7 +26,7 @@ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.rdd.RDD /** - * Conduct the Chi-squared test for the input RDDs using the specified method. + * Conduct the chi-squared test for the input RDDs using the specified method. * Goodness-of-fit test is conducted on two `Vectors`, whereas test of independence is conducted * on an input of type `Matrix` in which independence between columns is assessed. * We also provide a method for computing the chi-squared statistic between each feature and the @@ -38,7 +38,7 @@ import org.apache.spark.rdd.RDD * * More information on Chi-squared test: http://en.wikipedia.org/wiki/Chi-squared_test */ -private[stat] object ChiSquaredTest extends Logging { +private[stat] object ChiSqTest extends Logging { /** * @param name String name for the method. 
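For reference, a minimal usage sketch of the Statistics.chiSqTest overloads documented above (not taken from any commit in this series; the inputs mirror the cases exercised in HypothesisTestSuite, and `sc` is assumed to be an existing SparkContext):

import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics

// Goodness of fit against the uniform distribution; R's
// chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3)) gives statistic 0.4, df 2, p-value ~0.8187.
val goodnessOfFit = Statistics.chiSqTest(Vectors.dense(4.0, 6.0, 5.0))

// Independence test on a 3 x 4 contingency matrix (values passed in column-major order).
val contingency = Matrices.dense(3, 4,
  Array(40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0))
val independence = Statistics.chiSqTest(contingency)

// Independence test of each categorical feature against the label; one result per feature.
val labeled = sc.parallelize(Seq(
  new LabeledPoint(0.0, Vectors.dense(0.5, 10.0)),
  new LabeledPoint(1.0, Vectors.dense(1.5, 30.0))))
val perFeatureResults = Statistics.chiSqTest(labeled)
perFeatureResults.foreach(println)  // each toString reports df, statistic, p-value, null hypothesis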
@@ -78,27 +78,38 @@ private[stat] object ChiSquaredTest extends Logging { val numCols = data.first().features.size val results = new Array[ChiSquaredTestResult](numCols) var labels = Array[Double]() - var col = 0 - while (col < numCols) { - val featureVLabel = data.map(p => (p.label, p.features(col))) + // At most 100 columns at a time + val batchSize = 100 + var batch = 0 + while (batch * batchSize < numCols) { // The following block of code can be cleaned up and made public as // chiSquared(data: RDD[(V1, V2)]) - val pairCounts = featureVLabel.countByValue() + val startCol = batch * batchSize + val endCol = startCol + math.min(batchSize, numCols - startCol) + val pairCounts = data.flatMap { p => + // assume dense vectors + p.features.toArray.slice(startCol, endCol).zipWithIndex.map { case (feature, col) => + (col, feature, p.label) + } + }.countByValue() + if (labels.size == 0) { - // Do this only once since labels are invariant across features. - labels = pairCounts.keys.map(_._1).toArray + // Do this only once for the first column since labels are invariant across features. + labels = pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct } - val featureValues = pairCounts.keys.map(_._2).toArray - val numCols = labels.size - val numRows = featureValues.size - val contingency = new BDM(numRows, numCols, new Array[Double](numRows * numCols)) - for (((label, feature), count) <- pairCounts) { - val col = labels.indexOf(label) - val row = featureValues.indexOf(feature) - contingency(row, col) += count + val numLabels = labels.size + pairCounts.keys.groupBy(_._1).map { case (col, keys) => + val features = keys.map(_._2).toArray.distinct + val numRows = features.size + val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels)) + keys.foreach { case (_, feature, label) => + val i = features.indexOf(feature) + val j = labels.indexOf(label) + contingency(i, j) += pairCounts((col, feature, label)) + } + results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName) } - results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName) - col += 1 + batch += 1 } results } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala index 2a7998b02610..a39fd4012efc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -22,25 +22,25 @@ import org.apache.spark.annotation.Experimental /** * :: Experimental :: * Trait for hypothesis test results. - * @tparam DF Return type of `degreesOfFreedom` + * @tparam DF Return type of `degreesOfFreedom`. */ @Experimental trait TestResult[DF] { /** - * + * The probability of obtaining a test statistic result at least as extreme as the one that was + * actually observed, assuming that the null hypothesis is true. */ def pValue: Double /** - * - * @return + * Returns the degree(s) of freedom of the hypothesis test. + * Return type should be Number(e.g. Int, Double) or tuples of Numbers for toString compatibility. */ def degreesOfFreedom: DF /** - * - * @return + * Test statistic. 
*/ def statistic: Double diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index 226b23ae0a01..cefcc943127c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.FunSuite import org.apache.spark.mllib.linalg.{DenseVector, Matrices, Vectors} import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.stat.test.ChiSquaredTest +import org.apache.spark.mllib.stat.test.ChiSqTest import org.apache.spark.mllib.util.LocalSparkContext import org.apache.spark.mllib.util.TestingUtils._ @@ -35,9 +35,9 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { // Results validated against the R command `chisq.test(c(4, 6, 5), p=c(1/3, 1/3, 1/3))` assert(pearson.statistic === 0.4) assert(pearson.degreesOfFreedom === 2) - assert(pearson.pValue ~= 0.8187 relTol 1e-4) - assert(pearson.method === ChiSquaredTest.PEARSON.name) - assert(pearson.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) + assert(pearson.pValue ~== 0.8187 relTol 1e-4) + assert(pearson.method === ChiSqTest.PEARSON.name) + assert(pearson.nullHypothesis === ChiSqTest.NullHypothesis.goodnessOfFit.toString) // different expected and observed sum val observed1 = new DenseVector(Array[Double](21, 38, 43, 80)) @@ -46,16 +46,11 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { // Results validated against the R command // `chisq.test(c(21, 38, 43, 80), p=c(3/35, 1/7, 1/5, 4/7))` - assert(pearson1.statistic ~= 14.1429 relTol 1e-4) + assert(pearson1.statistic ~== 14.1429 relTol 1e-4) assert(pearson1.degreesOfFreedom === 3) - assert(pearson1.pValue ~= 0.002717 relTol 1e-4) - assert(pearson1.method === ChiSquaredTest.PEARSON.name) - assert(pearson1.nullHypothesis === ChiSquaredTest.NullHypothesis.goodnessOfFit.toString) - - // SparseVector representation to make sure memory doesn't blow up - // Commented out because it takes too long for unit tests. Should be run as part of perf test. 
- // val observed2 = new SparseVector(Int.MaxValue, Array(1000005), Array[Double](10.0)) - // val pearson2 = Statistics.chiSqTest(observed2) + assert(pearson1.pValue ~== 0.002717 relTol 1e-4) + assert(pearson1.method === ChiSqTest.PEARSON.name) + assert(pearson1.nullHypothesis === ChiSqTest.NullHypothesis.goodnessOfFit.toString) // Vectors with different sizes val observed3 = new DenseVector(Array(1.0, 2.0, 3.0)) @@ -79,11 +74,11 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { val chi = Statistics.chiSqTest(Matrices.dense(3, 4, data)) // Results validated against R command // `chisq.test(rbind(c(40, 56, 31, 30),c(24, 32, 10, 15), c(29, 42, 0, 12)))` - assert(chi.statistic ~= 21.9958 relTol 1e-4) + assert(chi.statistic ~== 21.9958 relTol 1e-4) assert(chi.degreesOfFreedom === 6) - assert(chi.pValue ~= 0.001213 relTol 1e-4) - assert(chi.method === ChiSquaredTest.PEARSON.name) - assert(chi.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) + assert(chi.pValue ~== 0.001213 relTol 1e-4) + assert(chi.method === ChiSqTest.PEARSON.name) + assert(chi.nullHypothesis === ChiSqTest.NullHypothesis.independence.toString) // Negative counts val negCounts = Array(4.0, 5.0, 3.0, -3.0) @@ -114,15 +109,22 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { val feature1 = chi(0) assert(feature1.statistic === 0.75) assert(feature1.degreesOfFreedom === 2) - assert(feature1.pValue ~= 0.6873 relTol 1e-4) - assert(feature1.method === ChiSquaredTest.PEARSON.name) - assert(feature1.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) + assert(feature1.pValue ~== 0.6873 relTol 1e-4) + assert(feature1.method === ChiSqTest.PEARSON.name) + assert(feature1.nullHypothesis === ChiSqTest.NullHypothesis.independence.toString) val feature2 = chi(1) assert(feature2.statistic === 1.5) assert(feature2.degreesOfFreedom === 3) - assert(feature2.pValue ~= 0.6823 relTol 1e-4) - assert(feature2.method === ChiSquaredTest.PEARSON.name) - assert(feature2.nullHypothesis === ChiSquaredTest.NullHypothesis.independence.toString) + assert(feature2.pValue ~== 0.6823 relTol 1e-4) + assert(feature2.method === ChiSqTest.PEARSON.name) + assert(feature2.nullHypothesis === ChiSqTest.NullHypothesis.independence.toString) } + + // Test that the right number of results is returned + val numCols = 321 + val sparseData = Array(new LabeledPoint(0.0, Vectors.sparse(numCols, Seq((100, 2.0)))), + new LabeledPoint(0.0, Vectors.sparse(numCols, Seq((200, 1.0))))) + val chi = Statistics.chiSqTest(sc.parallelize(sparseData)) + assert(chi.size === numCols) } } From 7dde711d2efc3d1b4af51168dd0c97fce4ee932c Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Fri, 8 Aug 2014 13:58:59 -0700 Subject: [PATCH 12/14] ChiSqTestResult renaming and changed to Class --- .../org/apache/spark/mllib/stat/Statistics.scala | 10 +++++----- .../spark/mllib/stat/test/ChiSquaredTest.scala | 14 ++++++-------- .../apache/spark/mllib/stat/test/TestResult.scala | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala index 795171144d82..cf8679610e19 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala @@ -21,7 +21,7 @@ import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Matrix, Vector} import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.stat.correlation.Correlations -import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSquaredTestResult} +import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult} import org.apache.spark.rdd.RDD /** @@ -108,7 +108,7 @@ object Statistics { * the method used, and the null hypothesis. */ @Experimental - def chiSqTest(observed: Vector, expected: Vector): ChiSquaredTestResult = { + def chiSqTest(observed: Vector, expected: Vector): ChiSqTestResult = { ChiSqTest.chiSquared(observed, expected) } @@ -124,7 +124,7 @@ object Statistics { * the method used, and the null hypothesis. */ @Experimental - def chiSqTest(observed: Vector): ChiSquaredTestResult = ChiSqTest.chiSquared(observed) + def chiSqTest(observed: Vector): ChiSqTestResult = ChiSqTest.chiSquared(observed) /** * :: Experimental :: @@ -136,7 +136,7 @@ object Statistics { * the method used, and the null hypothesis. */ @Experimental - def chiSqTest(observed: Matrix): ChiSquaredTestResult = ChiSqTest.chiSquaredMatrix(observed) + def chiSqTest(observed: Matrix): ChiSqTestResult = ChiSqTest.chiSquaredMatrix(observed) /** * :: Experimental :: @@ -150,7 +150,7 @@ object Statistics { * The order of the elements in the returned array reflects the order of input features. */ @Experimental - def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSquaredTestResult] = { + def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { ChiSqTest.chiSquaredFeatures(data) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala index bda0823f3647..abe12cacc84a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala @@ -74,9 +74,9 @@ private[stat] object ChiSqTest extends Logging { * Returns an array containing the ChiSquaredTestResult for every feature against the label. */ def chiSquaredFeatures(data: RDD[LabeledPoint], - methodName: String = PEARSON.name): Array[ChiSquaredTestResult] = { + methodName: String = PEARSON.name): Array[ChiSqTestResult] = { val numCols = data.first().features.size - val results = new Array[ChiSquaredTestResult](numCols) + val results = new Array[ChiSqTestResult](numCols) var labels = Array[Double]() // At most 100 columns at a time val batchSize = 100 @@ -120,7 +120,7 @@ private[stat] object ChiSqTest extends Logging { */ def chiSquared(observed: Vector, expected: Vector = Vectors.dense(Array[Double]()), - methodName: String = PEARSON.name): ChiSquaredTestResult = { + methodName: String = PEARSON.name): ChiSqTestResult = { // Validate input arguments val method = methodFromString(methodName) @@ -171,15 +171,14 @@ private[stat] object ChiSqTest extends Logging { } val df = size - 1 val pValue = chiSquareComplemented(df, statistic) - new ChiSquaredTestResult(pValue, df, statistic, PEARSON.name, - NullHypothesis.goodnessOfFit.toString) + new ChiSqTestResult(pValue, df, statistic, PEARSON.name, NullHypothesis.goodnessOfFit.toString) } /* * Pearon's independence test on the input contingency matrix. * TODO: optimize for SparseMatrix when it becomes supported. 
*/ - def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSquaredTestResult = { + def chiSquaredMatrix(counts: Matrix, methodName:String = PEARSON.name): ChiSqTestResult = { val method = methodFromString(methodName) val numRows = counts.numRows val numCols = counts.numCols @@ -216,7 +215,6 @@ private[stat] object ChiSqTest extends Logging { // Second pass to compute chi-squared statistic val df = (numCols - 1) * (numRows - 1) val pValue = chiSquareComplemented(df, statistic) - new ChiSquaredTestResult(pValue, df, statistic, methodName, - NullHypothesis.independence.toString) + new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala index a39fd4012efc..2f278621335e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala @@ -73,7 +73,7 @@ trait TestResult[DF] { * Object containing the test results for the chi squared hypothesis test. */ @Experimental -case class ChiSquaredTestResult(override val pValue: Double, +class ChiSqTestResult(override val pValue: Double, override val degreesOfFreedom: Int, override val statistic: Double, val method: String, From e95e485caa7b9efa3fd245997615684a277559f6 Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 11 Aug 2014 14:28:08 -0700 Subject: [PATCH 13/14] reviewer comments. --- .../{ChiSquaredTest.scala => ChiSqTest.scala} | 95 ++++++++++--------- .../mllib/stat/HypothesisTestSuite.scala | 13 ++- 2 files changed, 59 insertions(+), 49 deletions(-) rename mllib/src/main/scala/org/apache/spark/mllib/stat/test/{ChiSquaredTest.scala => ChiSqTest.scala} (75%) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala similarity index 75% rename from mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala rename to mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index abe12cacc84a..2ad2179c0aaf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSquaredTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -77,7 +77,7 @@ private[stat] object ChiSqTest extends Logging { methodName: String = PEARSON.name): Array[ChiSqTestResult] = { val numCols = data.first().features.size val results = new Array[ChiSqTestResult](numCols) - var labels = Array[Double]() + var labels: Map[Double, Int] = null // At most 100 columns at a time val batchSize = 100 var batch = 0 @@ -93,18 +93,19 @@ private[stat] object ChiSqTest extends Logging { } }.countByValue() - if (labels.size == 0) { + if (labels == null) { // Do this only once for the first column since labels are invariant across features. 
- labels = pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct + labels = + pairCounts.keys.filter(_._1 == startCol).map(_._3).toArray.distinct.zipWithIndex.toMap } val numLabels = labels.size pairCounts.keys.groupBy(_._1).map { case (col, keys) => - val features = keys.map(_._2).toArray.distinct + val features = keys.map(_._2).toArray.distinct.zipWithIndex.toMap val numRows = features.size val contingency = new BDM(numRows, numLabels, new Array[Double](numRows * numLabels)) keys.foreach { case (_, feature, label) => - val i = features.indexOf(feature) - val j = labels.indexOf(label) + val i = features(feature) + val j = labels(label) contingency(i, j) += pairCounts((col, feature, label)) } results(col) = chiSquaredMatrix(Matrices.fromBreeze(contingency), methodName) @@ -128,46 +129,40 @@ private[stat] object ChiSqTest extends Logging { throw new IllegalArgumentException("observed and expected must be of the same size.") } val size = observed.size - // Avoid calling toArray on input vectors to avoid memory blow up - // (esp if size = Int.MaxValue for a SparseVector). - // Check positivity and collect sums - var obsSum = 0.0 - var expSum = if (expected.size == 0.0) 1.0 else 0.0 - var i = 0 - while (i < size) { - val obs = observed(i) - if (obs < 0.0) { - throw new IllegalArgumentException("Values in observed must be nonnegative.") - } - obsSum += obs - if (expected.size > 0) { - val exp = expected(i) - if (exp <= 0.0) { - throw new IllegalArgumentException("Values in expected must be positive.") - } - expSum += exp - } - i += 1 + if (size > 1000) { + logWarning("Chi-squared approximation may not be accurate due to low expected frequencies " + + s" as a result of a large number of categories: $size.") + } + val obsArr = observed.toArray + val expArr = if (expected.size == 0) Array.tabulate(size)(_ => 1.0 / size) else expected.toArray + if (!obsArr.forall(_ >= 0.0)) { + throw new IllegalArgumentException("Negative entries disallowed in the observed vector.") + } + if (expected.size != 0 && ! 
expArr.forall(_ >= 0.0)) {
+      throw new IllegalArgumentException("Negative entries disallowed in the expected vector.")
     }
 
     // Determine the scaling factor for expected
-    val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum
-    val getExpected: (Int) => Double = if (expected.size == 0) {
-      // Assume uniform distribution
-      if (scale == 1.0) _ => 1.0 / size else _ => scale / size
-    } else {
-      if (scale == 1.0) (i: Int) => expected(i) else (i: Int) => scale * expected(i)
-    }
+    val obsSum = obsArr.sum
+    val expSum = if (expected.size == 0.0) 1.0 else expArr.sum
+    val scale = if (math.abs(obsSum - expSum) < 1e-7) 1.0 else obsSum / expSum
 
     // compute chi-squared statistic
-    var statistic = 0.0
-    var j = 0
-    while (j < observed.size) {
-      val obs = observed(j)
-      if (obs != 0.0) {
-        statistic += method.chiSqFunc(obs, getExpected(j))
+    val statistic = obsArr.zip(expArr).foldLeft(0.0) { case (stat, (obs, exp)) =>
+      if (exp == 0.0) {
+        if (obs == 0.0) {
+          throw new IllegalArgumentException("Chi-squared statistic undefined for input vectors due"
+            + " to 0.0 values in both observed and expected.")
+        } else {
+          return new ChiSqTestResult(Double.PositiveInfinity, size - 1, Double.PositiveInfinity,
+            PEARSON.name, NullHypothesis.goodnessOfFit.toString)
+        }
+      }
+      if (scale == 1.0) {
+        stat + method.chiSqFunc(obs, exp)
+      } else {
+        stat + method.chiSqFunc(obs, exp * scale)
       }
-      j += 1
     }
     val df = size - 1
     val pValue = chiSquareComplemented(df, statistic)
@@ -197,22 +192,28 @@ private[stat] object ChiSqTest extends Logging {
       rowSums(i % numRows) += elem
       i += 1
     }
-    if (!colSums.forall(_ > 0.0) || !rowSums.forall(_ > 0.0)) {
-      throw new IllegalArgumentException("Chi square statistic cannot be computed for input matrix " +
-        "due to 0.0 entries in the expected contingency table.")
-    }
     val total = colSums.sum
 
     // second pass to collect statistic
     var statistic = 0.0
    var j = 0
     while (j < colMajorArr.size) {
-      val expected = colSums(j / numRows) * rowSums(j % numRows) / total
+      val col = j / numRows
+      val colSum = colSums(col)
+      if (colSum == 0.0) {
+        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to "
+          + s"0 sum in column [$col].")
+      }
+      val row = j % numRows
+      val rowSum = rowSums(row)
+      if (rowSum == 0.0) {
+        throw new IllegalArgumentException("Chi-squared statistic undefined for input matrix due to "
+          + s"0 sum in row [$row].")
+      }
+      val expected = colSum * rowSum / total
       statistic += method.chiSqFunc(colMajorArr(j), expected)
       j += 1
     }
-
-    // Second pass to compute chi-squared statistic
     val df = (numCols - 1) * (numRows - 1)
     val pValue = chiSquareComplemented(df, statistic)
     new ChiSqTestResult(pValue, df, statistic, methodName, NullHypothesis.independence.toString)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
index cefcc943127c..b47f1b75f1e7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala
@@ -61,9 +61,18 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext {
     val negObs = new DenseVector(Array(1.0, 2.0, 3.0, -4.0))
     intercept[IllegalArgumentException](Statistics.chiSqTest(negObs, expected1))
 
-    // count = 0.0 in expected
+    // count = 0.0 in expected but not observed
     val zeroExpected = new DenseVector(Array(1.0, 0.0, 3.0))
-    intercept[IllegalArgumentException](Statistics.chiSqTest(observed, zeroExpected))
+    
val inf = Statistics.chiSqTest(observed, zeroExpected) + assert(inf.statistic === Double.PositiveInfinity) + assert(inf.degreesOfFreedom === 2) + assert(inf.pValue === Double.PositiveInfinity) + assert(inf.method === ChiSqTest.PEARSON.name) + assert(inf.nullHypothesis === ChiSqTest.NullHypothesis.goodnessOfFit.toString) + + // 0.0 in expected and observed simultaneously + val zeroObserved = new DenseVector(Array(2.0, 0.0, 1.0)) + intercept[IllegalArgumentException](Statistics.chiSqTest(zeroObserved, zeroExpected)) } test("chi squared pearson matrix independence") { From cafb3a773cb7efd1280c62dfc4749d0c23ec8e2b Mon Sep 17 00:00:00 2001 From: Doris Xin Date: Mon, 11 Aug 2014 16:10:52 -0700 Subject: [PATCH 14/14] fixed p-value for extreme case. --- .../scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala | 4 ++-- .../org/apache/spark/mllib/stat/HypothesisTestSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index 2ad2179c0aaf..8f6752737402 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -154,8 +154,8 @@ private[stat] object ChiSqTest extends Logging { throw new IllegalArgumentException("Chi-squared statistic undefined for input vectors due" + " to 0.0 values in both observed and expected.") } else { - return new ChiSqTestResult(Double.PositiveInfinity, size - 1, Double.PositiveInfinity, - PEARSON.name, NullHypothesis.goodnessOfFit.toString) + return new ChiSqTestResult(0.0, size - 1, Double.PositiveInfinity, PEARSON.name, + NullHypothesis.goodnessOfFit.toString) } } if (scale == 1.0) { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala index b47f1b75f1e7..5bd0521298c1 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala @@ -66,7 +66,7 @@ class HypothesisTestSuite extends FunSuite with LocalSparkContext { val inf = Statistics.chiSqTest(observed, zeroExpected) assert(inf.statistic === Double.PositiveInfinity) assert(inf.degreesOfFreedom === 2) - assert(inf.pValue === Double.PositiveInfinity) + assert(inf.pValue === 0.0) assert(inf.method === ChiSqTest.PEARSON.name) assert(inf.nullHypothesis === ChiSqTest.NullHypothesis.goodnessOfFit.toString)
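To make the boundary case pinned down by this last commit concrete, a minimal sketch (not part of any patch above) using the same vectors as the updated assertions in HypothesisTestSuite:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.Statistics

// A category with expected count 0.0 but a nonzero observed count drives the Pearson
// statistic to infinity; the result now reports pValue = 0.0 instead of Double.PositiveInfinity.
val observed = Vectors.dense(4.0, 6.0, 5.0)
val zeroExpected = Vectors.dense(1.0, 0.0, 3.0)
val result = Statistics.chiSqTest(observed, zeroExpected)
assert(result.statistic == Double.PositiveInfinity)
assert(result.pValue == 0.0)

// If a category is 0.0 in both observed and expected, the statistic is undefined and an
// IllegalArgumentException is thrown instead (see the zeroObserved case in the test suite).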