-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-8598] [MLlib] Implementation of 1-sample, two-sided, Kolmogorov Smirnov Test for RDDs #6994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
13dfe4d
c659ea1
4da189b
ce8e9a1
b9cff3a
f6951b6
c18dc66
16b5c4c
0b5e8ec
4b8ba61
6a4784f
992293b
3f81ad2
9c0f1af
1226b30
9026895
3288e42
e760ebd
7e66f57
a4bc0c7
2ec2aa6
1bb44bd
a48ae7b
1f56371
0d0c201
bbb30b1
08834f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -169,6 +169,17 @@ object Statistics { | |
| KSTest.testOneSample(data, cdf) | ||
| } | ||
|
|
||
| /** | ||
| * A convenience method to conduct a one-sample, two sided Kolmogorov Smirnov test for probability | ||
| * distribution equality | ||
| * @param data an `RDD[Double]` containing the sample of data to test | ||
| * @param name a `String` name for a theoretical distribution | ||
|
||
| * @return KSTestResult object containing test statistic, p-value, and null hypothesis. | ||
| */ | ||
| def ksTest(data: RDD[Double], name: String): KSTestResult = { | ||
| KSTest.testOneSample(data, name) | ||
| } | ||
|
|
||
| /** | ||
| * Conduct a one-sample, two sided Kolmogorov Smirnov test for probability distribution equality, | ||
| * which creates only 1 distribution object per partition (useful in conjunction with Apache | ||
|
|
@@ -187,15 +198,4 @@ object Statistics { | |
| : KSTestResult = { | ||
| KSTest.testOneSampleOpt(data, distCalc) | ||
| } | ||
|
|
||
| /** | ||
| * A convenience method to conduct a one-sample, two sided Kolmogorov Smirnov test for probability | ||
| * distribution equality | ||
| * @param data an `RDD[Double]` containing the sample of data to test | ||
| * @param name a `String` name for a theoretical distribution | ||
| * @return KSTestResult object containing test statistic, p-value, and null hypothesis. | ||
| */ | ||
| def ksTest(data: RDD[Double], name: String): KSTestResult = { | ||
| KSTest.testOneSample(data, name) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,6 @@ package org.apache.spark.mllib.stat.test | |
| import org.apache.commons.math3.distribution.NormalDistribution | ||
| import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest | ||
|
|
||
| import org.apache.spark.{SparkException, Logging} | ||
| import org.apache.spark.rdd.RDD | ||
|
|
||
|
||
|
|
||
|
|
@@ -31,7 +30,7 @@ import org.apache.spark.rdd.RDD | |
| * the null hypothesis that the sample data comes from that theoretical distribution. | ||
| * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test | ||
|
||
| */ | ||
| private[stat] object KSTest { | ||
| private[stat] object KSTest { | ||
|
|
||
| // Null hypothesis for the type of KS test to be included in the result. | ||
| object NullHypothesis extends Enumeration { | ||
|
|
@@ -41,22 +40,21 @@ import org.apache.spark.rdd.RDD | |
|
|
||
| /** | ||
| * Calculate empirical cumulative distribution values needed for KS statistic | ||
| * @param dat `RDD[Double]` on which to calculate empirical cumulative distribution values | ||
| * @param data `RDD[Double]` on which to calculate empirical cumulative distribution values | ||
| * @return an RDD of (Double, Double, Double), where the first element in each tuple is the | ||
| * value, the second element is the ECDFV - 1/n, and the third element is the ECDFV, | ||
| * where ECDFV stands for empirical cumulative distribution function value | ||
| * | ||
| */ | ||
| def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = { | ||
| val n = dat.count().toDouble | ||
| dat.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i + 1) / n) } | ||
| def empirical(data: RDD[Double]): RDD[(Double, Double, Double)] = { | ||
| val n = data.count().toDouble | ||
| data.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i + 1) / n) } | ||
| } | ||
|
|
||
| /** | ||
| * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution | ||
| * @param dat `RDD[Double]` to evaluate | ||
| * @param cdf `Double => Double` function to calculate the theoretical CDF | ||
| * @return a KSTestResult summarizing the test results (pval, statistic, and null hypothesis) | ||
| * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) | ||
| */ | ||
| def testOneSample(dat: RDD[Double], cdf: Double => Double): KSTestResult = { | ||
|
||
| val empiriRDD = empirical(dat) // empirical distribution | ||
|
|
@@ -77,11 +75,11 @@ import org.apache.spark.rdd.RDD | |
| * @param dat `RDD[Double]` to evaluate | ||
| * @param distCalc a function to calculate the distance between the empirical values and the | ||
| * theoretical value | ||
| * @return a KSTestResult summarizing the test results (pval, statistic, and null hypothesis) | ||
| * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) | ||
| */ | ||
| def testOneSampleOpt(dat: RDD[Double], | ||
| distCalc: Iterator[(Double, Double, Double)] => Iterator[Double]) | ||
| : KSTestResult = { | ||
| distCalc: Iterator[(Double, Double, Double)] => Iterator[Double]) | ||
| : KSTestResult = { | ||
| val empiriRDD = empirical(dat) // empirical distribution information | ||
| val distances = empiriRDD.mapPartitions(distCalc, false) | ||
| val ksStat = distances.max | ||
|
|
@@ -91,7 +89,8 @@ import org.apache.spark.rdd.RDD | |
| /** | ||
| * Returns a function to calculate the KSTest with a standard normal distribution | ||
| * to be used with testOneSampleOpt | ||
| * @return Return a function that we can map over partitions to calculate the KS distance in each | ||
| * @return a function that we can map over partitions to calculate the KS distance for each | ||
| * observation on a per-partition basis | ||
| */ | ||
| def stdNormDistances(): (Iterator[(Double, Double, Double)]) => Iterator[Double] = { | ||
| val dist = new NormalDistribution(0, 1) | ||
|
|
@@ -107,13 +106,14 @@ import org.apache.spark.rdd.RDD | |
| * a named distribution | ||
| * @param dat the sample data that we wish to evaluate | ||
| * @param distName the name of the theoretical distribution | ||
| * @return The KS statistic and p-value associated with a two sided test | ||
| * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) | ||
| */ | ||
| def testOneSample(dat: RDD[Double], distName: String): KSTestResult = { | ||
| val distanceCalc = | ||
| distName match { | ||
| case "stdnorm" => stdNormDistances() | ||
| case _ => throw new UnsupportedOperationException() | ||
| case _ => throw new UnsupportedOperationException(s"$distName not yet supported through" + | ||
| s"convenience method. Current options are:[stdnorm].") | ||
| } | ||
|
|
||
| testOneSampleOpt(dat, distanceCalc) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mention what distributions are supported