[SPARK-8598] [MLlib] Implementation of 1-sample, two-sided, Kolmogorov Smirnov Test for RDDs #6994
Changes to the `Statistics` object:

```diff
@@ -23,7 +23,7 @@ import org.apache.spark.mllib.linalg.distributed.RowMatrix
 import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.stat.correlation.Correlations
-import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult}
+import org.apache.spark.mllib.stat.test.{ChiSqTest, ChiSqTestResult, KSTest, KSTestResult}
 import org.apache.spark.rdd.RDD

 /**
```
@@ -158,4 +158,47 @@ object Statistics

```scala
  def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
    ChiSqTest.chiSquaredFeatures(data)
  }

  /**
   * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
   * continuous distribution. By comparing the largest difference between the empirical cumulative
   * distribution of the sample data and the theoretical distribution, we can provide a test for
   * the null hypothesis that the sample data comes from that theoretical distribution.
   * For more information on the KS test:
   * @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
   *
   * Implementation note: We seek to implement the KS test with a minimal number of distributed
   * passes. We sort the RDD, and then perform the following operations on a per-partition basis:
   * calculate an empirical cumulative distribution value for each observation, and a theoretical
   * cumulative distribution value. We know the latter to be correct, while the former will be off
   * by a constant (how large the constant is depends on how many values precede it in other
   * partitions). However, given that this constant simply shifts the ECDF upwards but doesn't
   * change its shape, and furthermore, that the constant is the same within a given partition, we
   * can pick 2 values in each partition that can potentially resolve to the largest global
   * distance. Namely, we pick the minimum distance and the maximum distance. Additionally, we
   * keep track of how many elements are in each partition. Once these three values have been
   * returned for every partition, we can collect and operate locally. Locally, we can now adjust
   * each distance by the appropriate constant (the cumulative sum of the number of elements in
   * the prior partitions divided by the data set size). Finally, we take the maximum absolute
   * value, and this is the statistic.
   * @param data an `RDD[Double]` containing the sample of data to test
   * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
   * @return a `KSTestResult` object containing the test statistic, p-value, and null hypothesis
   */
  def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = {
    KSTest.testOneSample(data, cdf)
  }

  /**
   * Convenience function to conduct a one-sample, two-sided Kolmogorov-Smirnov test for
   * probability distribution equality. Currently supports the normal distribution, taking as
   * parameters the mean and standard deviation (distName = "norm").
   * @param data an `RDD[Double]` containing the sample of data to test
   * @param distName a `String` name for a theoretical distribution
   * @param params `Double*` specifying the parameters to be used for the theoretical distribution
   * @return a `KSTestResult` object containing the test statistic, p-value, and null hypothesis
   */
  def ksTest(data: RDD[Double], distName: String, params: Double*): KSTestResult = {
    KSTest.testOneSample(data, distName, params: _*)
  }
}
```
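For illustration, a minimal usage sketch of the new API (hypothetical setup; assumes an existing `SparkContext` named `sc`):

```scala
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD

// Hypothetical sample: 1000 standard-normal draws.
val data: RDD[Double] = sc.parallelize(Seq.fill(1000)(scala.util.Random.nextGaussian()))

// Overload 1: supply any theoretical CDF directly as a function.
// Here, the CDF of Uniform(0, 1), purely as an example of a custom CDF.
val uniformResult = Statistics.ksTest(data, (x: Double) => math.max(0.0, math.min(1.0, x)))

// Overload 2: name a supported distribution; "norm" takes mean and standard deviation.
val normResult = Statistics.ksTest(data, "norm", 0.0, 1.0)
```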
New file defining the `KSTest` object:

@@ -0,0 +1,203 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.mllib.stat.test

import scala.annotation.varargs

import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD

/**
 * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
 * continuous distribution. By comparing the largest difference between the empirical cumulative
 * distribution of the sample data and the theoretical distribution, we can provide a test for
 * the null hypothesis that the sample data comes from that theoretical distribution.
 * For more information on the KS test:
 * @see [[https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test]]
 *
 * Implementation note: We seek to implement the KS test with a minimal number of distributed
 * passes. We sort the RDD, and then perform the following operations on a per-partition basis:
 * calculate an empirical cumulative distribution value for each observation, and a theoretical
 * cumulative distribution value. We know the latter to be correct, while the former will be off
 * by a constant (how large the constant is depends on how many values precede it in other
 * partitions). However, given that this constant simply shifts the ECDF upwards but doesn't
 * change its shape, and furthermore, that the constant is the same within a given partition, we
 * can pick 2 values in each partition that can potentially resolve to the largest global
 * distance. Namely, we pick the minimum distance and the maximum distance. Additionally, we keep
 * track of how many elements are in each partition. Once these three values have been returned
 * for every partition, we can collect and operate locally. Locally, we can now adjust each
 * distance by the appropriate constant (the cumulative sum of the number of elements in the
 * prior partitions divided by the data set size). Finally, we take the maximum absolute value,
 * and this is the statistic.
 */
private[stat] object KSTest extends Logging {

  // Null hypothesis for the type of KS test to be included in the result.
  object NullHypothesis extends Enumeration {
    type NullHypothesis = Value
    val oneSampleTwoSided = Value("Sample follows theoretical distribution")
  }

  /**
   * Runs a KS test for one sample of data, comparing it to a theoretical distribution.
   * @param data `RDD[Double]` data on which to run the test
   * @param cdf `Double => Double` function to calculate the theoretical CDF
   * @return [[KSTestResult]] summarizing the test results (p-value, statistic, null hypothesis)
   */
  def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = {
    val n = data.count().toDouble
    val localData = data.sortBy(x => x).mapPartitions { part =>
      val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
      searchOneSampleCandidates(partDiffs) // candidates: local extrema
    }.collect()
    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
    evalOneSampleP(ksStat, n.toLong)
  }

  /**
   * Runs a KS test for one sample of data, comparing it to a theoretical distribution.
   * @param data `RDD[Double]` data on which to run the test
   * @param createDist `() => RealDistribution` function to create a theoretical distribution
   * @return [[KSTestResult]] summarizing the test results (p-value, statistic, null hypothesis)
   */
  def testOneSample(data: RDD[Double], createDist: () => RealDistribution): KSTestResult = {
    val n = data.count().toDouble
    val localData = data.sortBy(x => x).mapPartitions { part =>
      val partDiffs = oneSampleDifferences(part, n, createDist) // local distances
      searchOneSampleCandidates(partDiffs) // candidates: local extrema
    }.collect()
    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
    evalOneSampleP(ksStat, n.toLong)
  }

  /**
   * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
   * partition.
   * @param partData `Iterator[Double]` 1 partition of a sorted RDD
   * @param n `Double` the total size of the RDD
   * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value
   * @return `Iterator[(Double, Double)]` unadjusted (i.e. off by a constant) potential extrema
   *         in a partition. The first element corresponds to (ECDF - 1/N) - CDF, the second
   *         element corresponds to ECDF - CDF. We can then search the resulting iterator for the
   *         minimum of the first and the maximum of the second element, and provide this as a
   *         partition's candidate extrema
   */
  private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double)
    : Iterator[(Double, Double)] = {
    // zip data with index (within that partition)
    // calculate local (unadjusted) ECDF and subtract CDF
    partData.zipWithIndex.map { case (v, ix) =>
      // dp and dl are later adjusted by a constant, when global info is available
      val dp = (ix + 1) / n
      val dl = ix / n
      val cdfVal = cdf(v)
      (dl - cdfVal, dp - cdfVal)
    }
  }

  private def oneSampleDifferences(
      partData: Iterator[Double],
      n: Double,
      createDist: () => RealDistribution)
    : Iterator[(Double, Double)] = {
    val dist = createDist()
    oneSampleDifferences(partData, n, x => dist.cumulativeProbability(x))
  }
```
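To make the per-observation distances concrete, here is a minimal local sketch of the same `dl`/`dp` computation on a tiny sorted sample (plain Scala, no Spark; the data and CDF are hypothetical):

```scala
// Sorted sample of n = 4 observations in a single partition, tested against
// the CDF of Uniform(0, 1), i.e. cdf(x) = x on [0, 1].
val sample = Array(0.1, 0.2, 0.6, 0.8)
val n = sample.length.toDouble
val cdf = (x: Double) => x

val diffs = sample.zipWithIndex.map { case (v, ix) =>
  val dp = (ix + 1) / n // ECDF just after its step at v
  val dl = ix / n       // ECDF just before its step at v
  (dl - cdf(v), dp - cdf(v))
}
// diffs (up to floating point):
//   (-0.1, 0.15), (0.05, 0.3), (-0.1, 0.15), (-0.05, 0.2)
// This partition's candidates: the minimum of the first components (-0.1),
// the maximum of the second components (0.3), and the count (4).
```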
```scala
  /**
   * Search the unadjusted differences in a partition and return the two extrema (furthest below
   * and furthest above the CDF), along with a count of elements in that partition.
   * @param partDiffs `Iterator[(Double, Double)]` the unadjusted differences between ECDF and CDF
   *                  in a partition, which come as a tuple of (ECDF - 1/N - CDF, ECDF - CDF)
   * @return `Iterator[(Double, Double, Double)]` the local extrema and a count of elements
   */
  private def searchOneSampleCandidates(partDiffs: Iterator[(Double, Double)])
    : Iterator[(Double, Double, Double)] = {
    val initAcc = (Double.MaxValue, Double.MinValue, 0.0)
    val pResults = partDiffs.foldLeft(initAcc) { case ((pMin, pMax, pCt), (dl, dp)) =>
      (math.min(pMin, dl), math.max(pMax, dp), pCt + 1)
    }
    // an empty partition produces no candidates
    val results = if (pResults == initAcc) Array[(Double, Double, Double)]() else Array(pResults)
    results.iterator
  }

  /**
   * Find the global maximum distance between ECDF and CDF (i.e. the KS statistic) after adjusting
   * local extrema estimates from individual partitions with the number of elements in preceding
   * partitions.
   * @param localData `Array[(Double, Double, Double)]` a local array containing the collected
   *                  results of `searchOneSampleCandidates` across all partitions
   * @param n `Double` the size of the RDD
   * @return the one-sample Kolmogorov-Smirnov statistic
   */
  private def searchOneSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
    : Double = {
    val initAcc = (Double.MinValue, 0.0)
    // adjust differences based on the # of elements preceding it, which should provide
    // the correct distance between ECDF and CDF
    val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
      val adjConst = prevCt / n
      val dist1 = math.abs(minCand + adjConst)
      val dist2 = math.abs(maxCand + adjConst)
      val maxVal = Array(prevMax, dist1, dist2).max
      (maxVal, prevCt + ct)
    }
    results._1
  }
```
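Mirroring that fold with toy numbers: suppose the n = 4 sample from the previous sketch is split across two partitions, (0.1, 0.2) and (0.6, 0.8). Partition 1's local indices restart at 0, so its raw distances sit 2/4 = 0.5 below their true values until adjusted. A minimal local sketch (hypothetical values):

```scala
// (minCand, maxCand, count) per partition, as searchOneSampleCandidates would produce
// against cdf(x) = x: partition 0 yields (-0.1, 0.3, 2), partition 1 yields (-0.6, -0.3, 2).
val candidates = Array((-0.1, 0.3, 2.0), (-0.6, -0.3, 2.0))
val n = 4.0
val (ksStat, _) = candidates.foldLeft((Double.MinValue, 0.0)) {
  case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
    val adjConst = prevCt / n // elements in earlier partitions, divided by n
    val dist1 = math.abs(minCand + adjConst)
    val dist2 = math.abs(maxCand + adjConst)
    (Array(prevMax, dist1, dist2).max, prevCt + ct)
}
// ksStat == 0.3, the same statistic the single-partition computation gives.
```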
```scala
  /**
   * A convenience function that allows running the KS test for one sample of data against
   * a named distribution.
   * @param data the sample data that we wish to evaluate
   * @param distName the name of the theoretical distribution
   * @param params variable-length parameter specifying the distribution's parameters
   * @return [[KSTestResult]] summarizing the test results (p-value, statistic, null hypothesis)
   */
  @varargs
  def testOneSample(data: RDD[Double], distName: String, params: Double*): KSTestResult = {
    val distanceCalc =
      distName match {
        case "norm" => () => {
          if (params.nonEmpty) {
            // if parameters are passed, there can only be 2 of them
            require(params.length == 2, "Normal distribution requires mean and standard " +
              "deviation as parameters")
            new NormalDistribution(params(0), params(1))
          } else {
            // if no parameters are passed, initialize to the standard normal
            logInfo("No parameters specified for Normal distribution, " +
              "initialized to standard normal (i.e. N(0, 1))")
            new NormalDistribution(0, 1)
          }
        }
        case _ => throw new UnsupportedOperationException(s"$distName not yet supported through" +
          s" convenience method. Current options are: [norm].")
      }

    testOneSample(data, distanceCalc)
  }
```
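Since `KSTest` is `private[stat]`, callers go through the `Statistics.ksTest` wrappers shown earlier. Given the defaulting behavior above, the following two calls should be equivalent (a sketch; `data` as in the earlier example):

```scala
// Explicit standard-normal parameters:
val r1 = Statistics.ksTest(data, "norm", 0.0, 1.0)
// No parameters: logs an info message and defaults to N(0, 1):
val r2 = Statistics.ksTest(data, "norm")
```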
```scala
  private def evalOneSampleP(ksStat: Double, n: Long): KSTestResult = {
    val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
    new KSTestResult(pval, ksStat, NullHypothesis.oneSampleTwoSided.toString)
  }
}
```
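As a sanity check, the distributed statistic and p-value can be compared against commons-math's local one-sample implementation on the collected data. A sketch, only sensible when `data` is small enough to collect to the driver:

```scala
import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest

val localSample: Array[Double] = data.collect()
val ksTester = new KolmogorovSmirnovTest()
val stdNormal = new NormalDistribution(0, 1)
// Local reference values; these should closely agree with the result of
// Statistics.ksTest(data, "norm", 0.0, 1.0), since evalOneSampleP uses the same cdf.
val localStat = ksTester.kolmogorovSmirnovStatistic(stdNormal, localSample)
val localPVal = ksTester.kolmogorovSmirnovTest(stdNormal, localSample)
```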
Additions to the test result classes (the context is the end of `ChiSqTestResult`):

@@ -90,3 +90,19 @@ class ChiSqTestResult private[stat] (override val pValue: Double,

```scala
    super.toString
  }
}

/**
 * :: Experimental ::
 * Object containing the test results for the Kolmogorov-Smirnov test.
 */
@Experimental
class KSTestResult private[stat] (override val pValue: Double,
    override val statistic: Double,
    override val nullHypothesis: String) extends TestResult[Int] {

  // the KS test does not use degrees of freedom; 0 serves as a placeholder
  override val degreesOfFreedom = 0

  override def toString: String = {
    "Kolmogorov-Smirnov test summary:\n" + super.toString
  }
}
```
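For reference, a sketch of consuming the returned result; the field names come from the `TestResult` trait, and the null-hypothesis string from the `NullHypothesis` enumeration above:

```scala
val result = Statistics.ksTest(data, "norm", 0.0, 1.0)
println(result) // "Kolmogorov-Smirnov test summary: ..." via toString
val stat: Double = result.statistic    // the KS statistic
val p: Double = result.pValue          // the two-sided p-value
val h0: String = result.nullHypothesis // "Sample follows theoretical distribution"
```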
Review comment: `Kolmogorov-Smirnov` -> `Kolmogorov-Smirnov (KS)`. Otherwise, we use `KS` without definition.