From 635fb4e78433e8760150d775d41b6af9b3cba976 Mon Sep 17 00:00:00 2001 From: Oliver Pierson Date: Mon, 22 Feb 2016 21:20:48 -0500 Subject: [PATCH 1/6] fixed splits bug in QuantileDiscretizer --- .../scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 1f4cca123310..ae01f90de770 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -110,7 +110,7 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi val totalSamples = dataset.count() require(totalSamples > 0, "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") - val requiredSamples = math.max(numBins * numBins, 10000) + val requiredSamples = math.max(numBins * numBins, 10000.0) val fraction = math.min(requiredSamples / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom(seed).nextInt()).collect() } From d5dbaa251f55f7763712d5fb45682a2125a86bb8 Mon Sep 17 00:00:00 2001 From: Oliver Pierson Date: Tue, 23 Feb 2016 12:58:42 -0500 Subject: [PATCH 2/6] explicitly cast requiredSamples to Double --- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index ae01f90de770..5a29fb4dd8a4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -110,8 +110,8 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi val totalSamples = dataset.count() require(totalSamples > 0, "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") - val requiredSamples = math.max(numBins * numBins, 10000.0) - val fraction = math.min(requiredSamples / dataset.count(), 1.0) + val requiredSamples = math.max(numBins * numBins, 10000) + val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom(seed).nextInt()).collect() } From 4892fb7004a7690855e5c992667e5d27b0317107 Mon Sep 17 00:00:00 2001 From: Oliver Pierson Date: Tue, 23 Feb 2016 13:23:20 -0500 Subject: [PATCH 3/6] added minSamplesRequired parameter to QuantileDiscretizer --- .../apache/spark/ml/feature/QuantileDiscretizer.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 5a29fb4dd8a4..81c30ede49cf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -103,6 +103,13 @@ final class QuantileDiscretizer(override val uid: String) @Since("1.6.0") object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] with Logging { + + /** + * Minimum number of samples required for finding splits, regardless of number of bins. If + * the dataset has less rows than this value, the entire dataset column will be used. + */ + val minSamplesRequired: Int = 10000 + /** * Sampling from the given dataset to collect quantile statistics. */ @@ -110,7 +117,7 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi val totalSamples = dataset.count() require(totalSamples > 0, "QuantileDiscretizer requires non-empty input dataset but was given an empty input.") - val requiredSamples = math.max(numBins * numBins, 10000) + val requiredSamples = math.max(numBins * numBins, minSamplesRequired) val fraction = math.min(requiredSamples.toDouble / dataset.count(), 1.0) dataset.sample(withReplacement = false, fraction, new XORShiftRandom(seed).nextInt()).collect() } From 3b55b6023e92ef22a7f7961c4625979d9cc811c4 Mon Sep 17 00:00:00 2001 From: Oliver Pierson Date: Tue, 23 Feb 2016 16:19:38 -0500 Subject: [PATCH 4/6] test for QuantileDiscretizer on large datasets --- .../ml/feature/QuantileDiscretizerSuite.scala | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 6a2c601bbed1..975c61605ada 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -71,6 +71,26 @@ class QuantileDiscretizerSuite } } + test("Test splits on relatively large dataset") { + val sqlCtx = SQLContext.getOrCreate(sc) + import sqlCtx.implicits._ + + val datasetSize = QuantileDiscretizer.minSamplesRequired + 1 + val numBuckets = 5 + val df = sc.parallelize((1.0 to datasetSize by 1.0).map(Tuple1.apply)).toDF("input") + val discretizer = new QuantileDiscretizer() + .setInputCol("input") + .setOutputCol("result") + .setNumBuckets(numBuckets) + .setSeed(1) + + val result = discretizer.fit(df).transform(df) + val observedNumBuckets = result.select("result").distinct.count + + assert(observedNumBuckets === numBuckets, + "Observed number of buckets does not equal expected number of buckets.") + } + test("read/write") { val t = new QuantileDiscretizer() .setInputCol("myInputCol") From c0052e4bbf6fce19a57b96cdcc342874525dd091 Mon Sep 17 00:00:00 2001 From: Oliver Pierson Date: Wed, 24 Feb 2016 10:14:23 -0500 Subject: [PATCH 5/6] private-tize minSamplesRequired; updated comments --- .../org/apache/spark/ml/feature/QuantileDiscretizer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 81c30ede49cf..769f4406e2df 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -106,9 +106,9 @@ object QuantileDiscretizer extends DefaultParamsReadable[QuantileDiscretizer] wi /** * Minimum number of samples required for finding splits, regardless of number of bins. If - * the dataset has less rows than this value, the entire dataset column will be used. + * the dataset has fewer rows than this value, the entire dataset will be used. */ - val minSamplesRequired: Int = 10000 + private[spark] val minSamplesRequired: Int = 10000 /** * Sampling from the given dataset to collect quantile statistics. From abea8765f66d061fca1d5358660fb71e9a194cc2 Mon Sep 17 00:00:00 2001 From: Oliver Pierson Date: Wed, 24 Feb 2016 10:20:18 -0500 Subject: [PATCH 6/6] change QuantileDiscretizer test name to better reflect purpose --- .../org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index 975c61605ada..25fabf64d559 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -71,7 +71,7 @@ class QuantileDiscretizerSuite } } - test("Test splits on relatively large dataset") { + test("Test splits on dataset larger than minSamplesRequired") { val sqlCtx = SQLContext.getOrCreate(sc) import sqlCtx.implicits._