Commit 1bb44bd

Author: jose.cambronero
style and doc changes. Factored out ks test into 2 separate tests
1 parent 2ec2aa6 commit 1bb44bd

File tree: 5 files changed (+47, -48 lines)


docs/mllib-statistics.md

Lines changed: 1 addition & 1 deletion
@@ -422,7 +422,7 @@ for i, result in enumerate(featureTestResults):
 
 </div>
 
-Additionally, MLlib provides a 1-sample, 2-sided implementation of the Kolmogorov-Smirnov test
+Additionally, MLlib provides a 1-sample, 2-sided implementation of the Kolmogorov-Smirnov (KS) test
 for equality of probability distributions. By providing the name of a theoretical distribution
 (currently solely supported for the normal distribution) and its parameters, or a function to
 calculate the cumulative distribution according to a given theoretical distribution, the user can
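
[Editor's note: a minimal usage sketch of the convenience API this doc paragraph describes, assuming a live SparkContext `sc`; the sample values are made up, and "norm" is the only supported distribution name at this point.]

import org.apache.spark.mllib.stat.Statistics

val data = sc.parallelize(Seq(0.2, -0.1, 1.3, 0.7, -0.5))
// test the sample against N(0, 1), passing mean and standard deviation
val testResult = Statistics.ksTest(data, "norm", 0.0, 1.0)
println(testResult) // prints the statistic, p-value, and null hypothesis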

mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala

Lines changed: 9 additions & 17 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.stat
 
+import scala.annotation.varargs
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg.distributed.RowMatrix
@@ -160,44 +162,34 @@ object Statistics {
   }
 
   /**
-   * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+   * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
    * continuous distribution. By comparing the largest difference between the empirical cumulative
    * distribution of the sample data and the theoretical distribution we can provide a test for the
    * the null hypothesis that the sample data comes from that theoretical distribution.
    * For more information on KS Test:
    * @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
    *
-   * Implementation note: We seek to implement the KS test with a minimal number of distributed
-   * passes. We sort the RDD, and then perform the following operations on a per-partition basis:
-   * calculate an empirical cumulative distribution value for each observation, and a theoretical
-   * cumulative distribution value. We know the latter to be correct, while the former will be off
-   * by a constant (how large the constant is depends on how many values precede it in other
-   * partitions).However, given that this constant simply shifts the ECDF upwards, but doesn't
-   * change its shape, and furthermore, that constant is the same within a given partition, we can
-   * pick 2 values in each partition that can potentially resolve to the largest global distance.
-   * Namely, we pick the minimum distance and the maximum distance. Additionally, we keep track of
-   * how many elements are in each partition. Once these three values have been returned for every
-   * partition, we can collect and operate locally. Locally, we can now adjust each distance by the
-   * appropriate constant (the cumulative sum of # of elements in the prior partitions divided by
-   * the data set size). Finally, we take the maximum absolute value, and this is the statistic.
    * @param data an `RDD[Double]` containing the sample of data to test
    * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
-   * @return KSTestResult object containing test statistic, p-value, and null hypothesis.
+   * @return [[org.apache.spark.mllib.stat.test.KSTestResult]] object containing test statistic,
+   *         p-value, and null hypothesis.
    */
   def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = {
     KSTest.testOneSample(data, cdf)
   }
 
   /**
-   * Convenience function to conduct a one-sample, two sided Kolmogorov Smirnov test for probability
+   * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability
    * distribution equality. Currently supports the normal distribution, taking as parameters
    * the mean and standard deviation.
    * (distName = "norm")
    * @param data an `RDD[Double]` containing the sample of data to test
    * @param distName a `String` name for a theoretical distribution
    * @param params `Double*` specifying the parameters to be used for the theoretical distribution
-   * @return KSTestResult object containing test statistic, p-value, and null hypothesis.
+   * @return [[org.apache.spark.mllib.stat.test.KSTestResult]] object containing test statistic,
+   *         p-value, and null hypothesis.
    */
+  @varargs
   def ksTest(data: RDD[Double], distName: String, params: Double*): KSTestResult = {
     KSTest.testOneSample(data, distName, params: _*)
   }
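
[Editor's note: the `@varargs` annotation added here makes the compiler emit a Java-friendly varargs forwarder for the `Double*` parameter. The function-based overload accepts any user-supplied CDF; a hedged sketch, assuming a SparkContext `sc` is in scope — the Exponential(1) CDF is just an illustrative choice.]

import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

val sample: RDD[Double] = sc.parallelize(Seq(0.1, 0.5, 1.2, 2.4, 3.7))
// CDF of Exponential(rate = 1): F(x) = 1 - exp(-x)
val expCdf: Double => Double = x => 1.0 - math.exp(-x)
val result = Statistics.ksTest(sample, expCdf)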

mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala

Lines changed: 31 additions & 27 deletions
@@ -38,28 +38,29 @@ import org.apache.spark.rdd.RDD
  * calculate an empirical cumulative distribution value for each observation, and a theoretical
  * cumulative distribution value. We know the latter to be correct, while the former will be off by
  * a constant (how large the constant is depends on how many values precede it in other partitions).
- * However, given that this constant simply shifts the ECDF upwards, but doesn't change its shape,
- * and furthermore, that constant is the same within a given partition, we can pick 2 values
- * in each partition that can potentially resolve to the largest global distance. Namely, we
- * pick the minimum distance and the maximum distance. Additionally, we keep track of how many
- * elements are in each partition. Once these three values have been returned for every partition,
- * we can collect and operate locally. Locally, we can now adjust each distance by the appropriate
- * constant (the cumulative sum of # of elements in the prior partitions divided by the data set
- * size). Finally, we take the maximum absolute value, and this is the statistic.
+ * However, given that this constant simply shifts the empirical CDF upwards, but doesn't
+ * change its shape, and furthermore, that constant is the same within a given partition, we can
+ * pick 2 values in each partition that can potentially resolve to the largest global distance.
+ * Namely, we pick the minimum distance and the maximum distance. Additionally, we keep track of how
+ * many elements are in each partition. Once these three values have been returned for every
+ * partition, we can collect and operate locally. Locally, we can now adjust each distance by the
+ * appropriate constant (the cumulative sum of number of elements in the prior partitions divided by
+ * thedata set size). Finally, we take the maximum absolute value, and this is the statistic.
  */
 private[stat] object KSTest extends Logging {
 
   // Null hypothesis for the type of KS test to be included in the result.
   object NullHypothesis extends Enumeration {
     type NullHypothesis = Value
-    val oneSampleTwoSided = Value("Sample follows theoretical distribution")
+    val OneSampleTwoSided = Value("Sample follows theoretical distribution")
   }
 
   /**
    * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
    * @param data `RDD[Double]` data on which to run test
    * @param cdf `Double => Double` function to calculate the theoretical CDF
-   * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis)
+   * @return [[org.apache.spark.mllib.stat.test.KSTestResult]] summarizing the test results
+   *         (p-value, statistic, and null hypothesis)
    */
   def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = {
     val n = data.count().toDouble
@@ -75,7 +76,8 @@ private[stat] object KSTest extends Logging {
    * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
    * @param data `RDD[Double]` data on which to run test
    * @param createDist `Unit => RealDistribution` function to create a theoretical distribution
-   * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis)
+   * @return [[org.apache.spark.mllib.stat.test.KSTestResult]] summarizing the test results
+   *         (p-value, statistic, and null hypothesis)
    */
   def testOneSample(data: RDD[Double], createDist: () => RealDistribution): KSTestResult = {
     val n = data.count().toDouble
@@ -94,15 +96,15 @@ private[stat] object KSTest extends Logging {
    * @param n `Double` the total size of the RDD
    * @param cdf `Double => Double` a function the calculates the theoretical CDF of a value
    * @return `Iterator[(Double, Double)] `Unadjusted (ie. off by a constant) potential extrema
-   *         in a partition. The first element corresponds to the (ECDF - 1/N) - CDF, the second
-   *         element corresponds to ECDF - CDF. We can then search the resulting iterator
-   *         for the minimum of the first and the maximum of the second element, and provide this
-   *         as a partition's candidate extrema
+   *         in a partition. The first element corresponds to the (empirical CDF - 1/N) - CDF,
+   *         the second element corresponds to empirical CDF - CDF. We can then search the resulting
+   *         iterator for the minimum of the first and the maximum of the second element, and provide
+   *         this as a partition's candidate extrema
    */
   private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
     : Iterator[(Double, Double)] = {
     // zip data with index (within that partition)
-    // calculate local (unadjusted) ECDF and subtract CDF
+    // calculate local (unadjusted) empirical CDF and subtract CDF
     partData.zipWithIndex.map { case (v, ix) =>
       // dp and dl are later adjusted by constant, when global info is available
       val dp = (ix + 1) / n
@@ -125,8 +127,9 @@ private[stat] object KSTest extends Logging {
    * Search the unadjusted differences in a partition and return the
    * two extrema (furthest below and furthest above CDF), along with a count of elements in that
    * partition
-   * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between ECDF and CDF
-   *                  in a partition, which come as a tuple of (ECDF - 1/N - CDF, ECDF - CDF)
+   * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between empirical CDF
+   *                  and CDFin a partition, which come as a tuple of
+   *                  (empirical CDF - 1/N - CDF, empirical CDF - CDF)
    * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
    */
   private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
@@ -140,9 +143,9 @@ private[stat] object KSTest extends Logging {
   }
 
   /**
-   * Find the global maximum distance between ECDF and CDF (i.e. the KS Statistic) after adjusting
-   * local extrema estimates from individual partitions with the amount of elements in preceding
-   * partitions
+   * Find the global maximum distance between empirical CDF and CDF (i.e. the KS statistic) after
+   * adjusting local extrema estimates from individual partitions with the amount of elements in
+   * preceding partitions
    * @param localData `Array[(Double, Double, Double)]` A local array containing the collected
    *                  results of `searchOneSampleCandidates` across all partitions
    * @param n `Double`The size of the RDD
@@ -151,8 +154,8 @@ private[stat] object KSTest extends Logging {
   private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
     : Double = {
     val initAcc = (Double.MinValue, 0.0)
-    // adjust differences based on the # of elements preceding it, which should provide
-    // the correct distance between ECDF and CDF
+    // adjust differences based on the number of elements preceding it, which should provide
+    // the correct distance between empirical CDF and CDF
     val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
       val adjConst = prevCt / n
       val dist1 = math.abs(minCand + adjConst)
@@ -169,7 +172,8 @@ private[stat] object KSTest extends Logging {
    * @param data the sample data that we wish to evaluate
    * @param distName the name of the theoretical distribution
    * @param params Variable length parameter for distribution's parameters
-   * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis)
+   * @return [[org.apache.spark.mllib.stat.test.KSTestResult]] summarizing the test results
+   *         (p-value, statistic, and null hypothesis)
    */
   @varargs
   def testOneSample(data: RDD[Double], distName: String, params: Double*): KSTestResult = {
@@ -183,21 +187,21 @@ private[stat] object KSTest extends Logging {
           new NormalDistribution(params(0), params(1))
         } else {
           // if no parameters passed in initializes to standard normal
-          logInfo("No parameters specified for Normal distribution," +
+          logInfo("No parameters specified for normal distribution," +
            "initialized to standard normal (i.e. N(0, 1))")
           new NormalDistribution(0, 1)
         }
       }
       case _ => throw new UnsupportedOperationException(s"$distName not yet supported through" +
-        s" convenience method. Current options are:[stdnorm].")
+        s" convenience method. Current options are:['norm'].")
     }
 
     testOneSample(data, distanceCalc)
   }
 
   private def evalOneSampleP(ksStat: Double, n: Long): KSTestResult = {
     val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
-    new KSTestResult(pval, ksStat, NullHypothesis.oneSampleTwoSided.toString)
+    new KSTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
   }
 }
 
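[Editor's note: a self-contained sketch of the per-partition scheme the scaladoc above describes — not the committed KSTest internals. The function name, variable names, and the empty-partition guard are mine; it assumes a SparkContext is available to build the RDD.]

import org.apache.spark.rdd.RDD

def ksStatisticSketch(data: RDD[Double], cdf: Double => Double): Double = {
  val n = data.count().toDouble
  // global sort: partitions are ordered, and sorted internally
  val sorted = data.sortBy(identity)
  // per partition: reduce the unadjusted differences to two candidate
  // extrema plus the partition's element count
  val candidates = sorted.mapPartitions { it =>
    var minDiff = Double.MaxValue
    var maxDiff = Double.MinValue
    var ct = 0.0
    it.foreach { v =>
      ct += 1
      val ecdf = ct / n // off by (# of elements in prior partitions) / n
      minDiff = math.min(minDiff, ecdf - 1 / n - cdf(v)) // (ECDF - 1/n) - CDF
      maxDiff = math.max(maxDiff, ecdf - cdf(v))         // ECDF - CDF
    }
    if (ct == 0) Iterator.empty else Iterator((minDiff, maxDiff, ct))
  }.collect()
  // locally: shift each partition's extrema by the cumulative count of
  // preceding elements and keep the largest absolute deviation
  val (stat, _) = candidates.foldLeft((Double.MinValue, 0.0)) {
    case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
      val adj = prevCt / n
      val localMax = math.max(math.abs(minCand + adj), math.abs(maxCand + adj))
      (math.max(prevMax, localMax), prevCt + ct)
  }
  stat
}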
mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala

Lines changed: 3 additions & 2 deletions
@@ -96,13 +96,14 @@ class ChiSqTestResult private[stat] (override val pValue: Double,
  * Object containing the test results for the Kolmogorov-Smirnov test.
  */
 @Experimental
-class KSTestResult private[stat] (override val pValue: Double,
+class KSTestResult private[stat] (
+    override val pValue: Double,
     override val statistic: Double,
     override val nullHypothesis: String) extends TestResult[Int] {
 
   override val degreesOfFreedom = 0
 
   override def toString: String = {
-    "Kolmogorov Smirnov test summary:\n" + super.toString
+    "Kolmogorov-Smirnov test summary:\n" + super.toString
   }
 }

mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala

Lines changed: 3 additions & 1 deletion
@@ -158,7 +158,7 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     }
   }
 
-  test("1 sample Kolmogorov-Smirnov test") {
+  test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") {
     // Create theoretical distributions
     val stdNormalDist = new NormalDistribution(0, 1)
     val expDist = new ExponentialDistribution(0.6)
@@ -215,7 +215,9 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(result3.pValue ~== referencePVal3 relTol 1e-4)
     // reject null hypothesis
     assert(result3.pValue < pThreshold)
+  }
 
+  test("1 sample Kolmogorov-Smirnov test: R implementation equivalence") {
     /*
       Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample
       > sessionInfo()
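
[Editor's note: a condensed sketch of the equivalence check the first factored-out test performs, assuming commons-math3 3.3+ on the classpath (it provides kolmogorovSmirnovStatistic) and a SparkContext `sc`; the sample size and tolerance are illustrative.]

import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KS}
import org.apache.spark.mllib.stat.Statistics

val stdNormal = new NormalDistribution(0, 1)
val sample = stdNormal.sample(100) // local Array[Double]
val rdd = sc.parallelize(sample)

val distributed = Statistics.ksTest(rdd, "norm", 0.0, 1.0)
val reference = new Math3KS().kolmogorovSmirnovStatistic(stdNormal, sample)
// the distributed statistic should agree with the local reference
assert(math.abs(distributed.statistic - reference) < 1e-7)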
