From d00357e40cef090c20ed4089d1cc23ebdaba2918 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 24 Aug 2015 18:02:13 +0100 Subject: [PATCH 1/6] Added test for cross validation --- python/pyspark/ml/tests.py | 61 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index c151d21fd661..392c954675dc 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -264,5 +264,66 @@ def test_ngram(self): self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"]) + +class HasInducedError(Params): + + def __init__(self): + super(HasInducedError, self).__init__() + self.inducedError = Param(self, "inducedError", "Uniformly-distributed error added to feature") + + def getInducedError(self): + return self.getOrDefault(self.inducedError) + +class InducedErrorModel(Model, HasInducedError): + + def __init__(self): + super(InducedErrorModel, self).__init__() + + def _transform(self, dataset): + return dataset.withColumn("prediction", + dataset.feature + (rand(0) * self.getInducedError())) + +class InducedErrorEstimator(Estimator, HasInducedError): + + def __init__(self, inducedError=1.0): + super(InducedErrorEstimator, self).__init__() + self._set(inducedError=inducedError) + + def _fit(self, dataset): + model = InducedErrorModel() + self._copyValues(model) + return model + +class CrossValidatorTests(PySparkTestCase): + + def test_fit_regression(self): + + sqlContext = SQLContext(self.sc) + dataset = sqlContext.createDataFrame([ + (10, 10.0), + (50, 50.0), + (100, 100.0), + (500, 500.0)] * 10, + ["feature", "label"]) + + + iee = InducedErrorEstimator() + evaluator = RegressionEvaluator(metricName="rmse") + + grid = (ParamGridBuilder() + .addGrid( iee.inducedError, [100.0, 0.0, 10000.0] ) + .build()) + + cv = CrossValidator(estimator=iee0, estimatorParamMaps=grid, evaluator=evaluator) + + cvModel = cv.fit(dataset) + bestModel = cvModel.bestModel + + bestModelMetric = rmse_eval.evaluate(bestModel.transform(dataset)) + + self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model has zero induced error") + self.assertEqual(0.0, bestModelMetric, "Best model should fit exactly") + + if __name__ == "__main__": unittest.main() From 6cd4ed12e4c37e80a3f88f93cb4255a1c011f5af Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 24 Aug 2015 19:03:18 +0100 Subject: [PATCH 2/6] Added/fixed tests for cross validation --- python/pyspark/ml/tests.py | 41 ++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 392c954675dc..c3412d854af0 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -32,11 +32,14 @@ from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase from pyspark.sql import DataFrame, SQLContext +from pyspark.sql.functions import rand +from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.param import Param, Params from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed from pyspark.ml.util import keyword_only from pyspark.ml import Estimator, Model, Pipeline, Transformer from pyspark.ml.feature import * +from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel from pyspark.mllib.linalg import DenseVector @@ -296,8 +299,7 @@ def _fit(self, dataset): class CrossValidatorTests(PySparkTestCase): - def test_fit_regression(self): - + def test_fit_minimize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ (10, 10.0), @@ -306,23 +308,46 @@ def test_fit_regression(self): (500, 500.0)] * 10, ["feature", "label"]) - iee = InducedErrorEstimator() evaluator = RegressionEvaluator(metricName="rmse") grid = (ParamGridBuilder() .addGrid( iee.inducedError, [100.0, 0.0, 10000.0] ) .build()) + cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) + cvModel = cv.fit(dataset) + bestModel = cvModel.bestModel + bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) + + print bestModel.getOrDefault('inducedError'), "Best model should have zero induced error" + print bestModelMetric, "Best model should fit exactly" + self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") + self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") + + def test_fit_maximize_metric(self): + sqlContext = SQLContext(self.sc) + dataset = sqlContext.createDataFrame([ + (10, 10.0), + (50, 50.0), + (100, 100.0), + (500, 500.0)] * 10, + ["feature", "label"]) - cv = CrossValidator(estimator=iee0, estimatorParamMaps=grid, evaluator=evaluator) + iee = InducedErrorEstimator() + evaluator = RegressionEvaluator(metricName="r2") + grid = (ParamGridBuilder() + .addGrid( iee.inducedError, [100.0, 0.0, 10000.0] ) + .build()) + cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) bestModel = cvModel.bestModel + bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) - bestModelMetric = rmse_eval.evaluate(bestModel.transform(dataset)) - - self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model has zero induced error") - self.assertEqual(0.0, bestModelMetric, "Best model should fit exactly") + print bestModel.getOrDefault('inducedError'), "Best model should have zero induced error" + print bestModelMetric, "Best model should fit exactly" + self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") + self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1") if __name__ == "__main__": From 63b3835b3676d8c1c19f756d4e9dba5575ef9d3f Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 24 Aug 2015 19:24:48 +0100 Subject: [PATCH 3/6] Removed print statements --- python/pyspark/ml/tests.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index c3412d854af0..2fda3f85455e 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -319,8 +319,6 @@ def test_fit_minimize_metric(self): bestModel = cvModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) - print bestModel.getOrDefault('inducedError'), "Best model should have zero induced error" - print bestModelMetric, "Best model should fit exactly" self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") @@ -344,8 +342,6 @@ def test_fit_maximize_metric(self): bestModel = cvModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) - print bestModel.getOrDefault('inducedError'), "Best model should have zero induced error" - print bestModelMetric, "Best model should fit exactly" self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1") From 7794cf73e10f2b5c57cfff1a2ea4a175e282c33c Mon Sep 17 00:00:00 2001 From: noelsmith Date: Mon, 24 Aug 2015 19:25:33 +0100 Subject: [PATCH 4/6] Added checks for isLargerBetter() --- python/pyspark/ml/evaluation.py | 6 ++++++ python/pyspark/ml/tuning.py | 6 +++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 6b0a9ffde9f4..9594cb344ec1 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -66,6 +66,9 @@ def evaluate(self, dataset, params=None): else: raise ValueError("Params must be a param map but got %s." % type(params)) + def isLargerBetter(self): + return True + @inherit_doc class JavaEvaluator(Evaluator, JavaWrapper): @@ -85,6 +88,9 @@ def _evaluate(self, dataset): self._transfer_params_to_java() return self._java_obj.evaluate(dataset._jdf) + def isLargerBetter(self): + return self._java_obj.isLargerBetter() + @inherit_doc class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPredictionCol): diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index dcfee6a3170a..7f6bbebfe0f6 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -223,7 +223,11 @@ def _fit(self, dataset): # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, epm[j])) metrics[j] += metric - bestIndex = np.argmax(metrics) + + if eva.isLargerBetter(): + bestIndex = np.argmax(metrics) + else: + bestIndex = np.argmin(metrics) bestModel = est.fit(dataset, epm[bestIndex]) return CrossValidatorModel(bestModel) From bada4539227a3705337beea7e08bdc45183e2903 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Thu, 27 Aug 2015 08:08:50 +0100 Subject: [PATCH 5/6] Added fixes from PR notes. Fixed style test failures. --- python/pyspark/ml/evaluation.py | 6 ++++++ python/pyspark/ml/tests.py | 37 +++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 9594cb344ec1..cb3b07947e48 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -67,6 +67,11 @@ def evaluate(self, dataset, params=None): raise ValueError("Params must be a param map but got %s." % type(params)) def isLargerBetter(self): + """ + Indicates whether the metric returned by :py:meth:`evaluate` should be maximized + (True, default) or minimized (False). + A given evaluator may support multiple metrics which may be maximized or minimized. + """ return True @@ -89,6 +94,7 @@ def _evaluate(self, dataset): return self._java_obj.evaluate(dataset._jdf) def isLargerBetter(self): + self._transfer_params_to_java() return self._java_obj.isLargerBetter() diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 2fda3f85455e..60e4237293ad 100644 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -267,25 +267,27 @@ def test_ngram(self): self.assertEquals(transformedDF.head().output, ["a b c d", "b c d e"]) - class HasInducedError(Params): def __init__(self): super(HasInducedError, self).__init__() - self.inducedError = Param(self, "inducedError", "Uniformly-distributed error added to feature") + self.inducedError = Param(self, "inducedError", + "Uniformly-distributed error added to feature") def getInducedError(self): return self.getOrDefault(self.inducedError) + class InducedErrorModel(Model, HasInducedError): def __init__(self): super(InducedErrorModel, self).__init__() - + def _transform(self, dataset): - return dataset.withColumn("prediction", + return dataset.withColumn("prediction", dataset.feature + (rand(0) * self.getInducedError())) + class InducedErrorEstimator(Estimator, HasInducedError): def __init__(self, inducedError=1.0): @@ -297,14 +299,15 @@ def _fit(self, dataset): self._copyValues(model) return model + class CrossValidatorTests(PySparkTestCase): def test_fit_minimize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ - (10, 10.0), - (50, 50.0), - (100, 100.0), + (10, 10.0), + (50, 50.0), + (100, 100.0), (500, 500.0)] * 10, ["feature", "label"]) @@ -312,22 +315,23 @@ def test_fit_minimize_metric(self): evaluator = RegressionEvaluator(metricName="rmse") grid = (ParamGridBuilder() - .addGrid( iee.inducedError, [100.0, 0.0, 10000.0] ) - .build()) + .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) + .build()) cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) bestModel = cvModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) - self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") + self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), + "Best model should have zero induced error") self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0") def test_fit_maximize_metric(self): sqlContext = SQLContext(self.sc) dataset = sqlContext.createDataFrame([ - (10, 10.0), - (50, 50.0), - (100, 100.0), + (10, 10.0), + (50, 50.0), + (100, 100.0), (500, 500.0)] * 10, ["feature", "label"]) @@ -335,14 +339,15 @@ def test_fit_maximize_metric(self): evaluator = RegressionEvaluator(metricName="r2") grid = (ParamGridBuilder() - .addGrid( iee.inducedError, [100.0, 0.0, 10000.0] ) - .build()) + .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) + .build()) cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) cvModel = cv.fit(dataset) bestModel = cvModel.bestModel bestModelMetric = evaluator.evaluate(bestModel.transform(dataset)) - self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), "Best model should have zero induced error") + self.assertEqual(0.0, bestModel.getOrDefault('inducedError'), + "Best model should have zero induced error") self.assertEqual(1.0, bestModelMetric, "Best model has R-squared of 1") From 84c8a405e40d7428748e89819ba112bd87aa4869 Mon Sep 17 00:00:00 2001 From: noelsmith Date: Thu, 27 Aug 2015 23:57:16 +0100 Subject: [PATCH 6/6] Style fix - missed a line of whitespace --- python/pyspark/ml/tuning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 7f6bbebfe0f6..cae778869e9c 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -223,7 +223,7 @@ def _fit(self, dataset): # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, epm[j])) metrics[j] += metric - + if eva.isLargerBetter(): bestIndex = np.argmax(metrics) else: