From 35063ab150c46989a8f1a78732fa9a25a98eb9f9 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 10 Mar 2016 17:53:47 +0800 Subject: [PATCH 1/7] Initial version of GeneralizedLinearRegressionSummary --- .../IterativelyReweightedLeastSquares.scala | 9 +- .../GeneralizedLinearRegression.scala | 280 ++++++++++++++- .../GeneralizedLinearRegressionSuite.scala | 327 ++++++++++++++++++ 3 files changed, 610 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index 6aa44e6ba723..edb8166d95ae 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -26,10 +26,14 @@ import org.apache.spark.rdd.RDD * Model fitted by [[IterativelyReweightedLeastSquares]]. * @param coefficients model coefficients * @param intercept model intercept + * @param diagInvAtWA diagonal of matrix (A^T * W * A)^-1 in the last iteration + * @param numIterations number of iterations */ private[ml] class IterativelyReweightedLeastSquaresModel( val coefficients: DenseVector, - val intercept: Double) extends Serializable + val intercept: Double, + val diagInvAtWA: DenseVector, + val numIterations: Int) extends Serializable /** * Implements the method of iteratively reweighted least squares (IRLS) which is used to solve @@ -103,6 +107,7 @@ private[ml] class IterativelyReweightedLeastSquares( } - new IterativelyReweightedLeastSquaresModel(model.coefficients, model.intercept) + new IterativelyReweightedLeastSquaresModel(model.coefficients, + model.intercept, model.diagInvAtWA, iter) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index de1dff942114..a8b7ca9c3ab7 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.regression -import breeze.stats.distributions.{Gaussian => GD} +import breeze.stats.distributions.{Gaussian => GD, StudentsT} import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkException} @@ -217,7 +217,17 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val model = copyValues( new GeneralizedLinearRegressionModel(uid, wlsModel.coefficients, wlsModel.intercept) .setParent(this)) - return model + // Handle possible missing or invalid prediction columns + val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol() + val trainingSummary = new GeneralizedLinearRegressionSummary( + summaryModel.transform(dataset), + predictionColName, + familyObj, + linkObj, + model, + wlsModel.diagInvAtWA.toArray, + 1) + return model.setSummary(trainingSummary) } // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS). @@ -229,7 +239,18 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val model = copyValues( new GeneralizedLinearRegressionModel(uid, irlsModel.coefficients, irlsModel.intercept) .setParent(this)) - model + // Handle possible missing or invalid prediction columns + val (summaryModel, predictionColName) = model.findSummaryModelAndPredictionCol() + val trainingSummary = new GeneralizedLinearRegressionSummary( + summaryModel.transform(dataset), + predictionColName, + familyObj, + linkObj, + model, + irlsModel.diagInvAtWA.toArray, + irlsModel.numIterations) + + model.setSummary(trainingSummary) } @Since("2.0.0") @@ -318,6 +339,14 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine /** The variance of the endogenous variable's mean, given the value mu. */ def variance(mu: Double): Double + /** + * Deviance of (y, mu) pair. + * Deviance is usually defined as twice the loglikelihood ratio. + */ + def deviance(y: Double, mu: Double, weight: Double): Double + + def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double + /** Trim the fitted value so that it will be in valid range. */ def project(mu: Double): Double = mu } @@ -348,7 +377,17 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def initialize(y: Double, weight: Double): Double = y - def variance(mu: Double): Double = 1.0 + override def variance(mu: Double): Double = 1.0 + + override def deviance(y: Double, mu: Double, weight: Double): Double = { + weight * math.pow(y - mu, 2.0) + } + + override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { + val nobs = predictions.count() + val wt = predictions.map(x => math.log(x._3)).sum() + nobs * (math.log(deviance / nobs * 2.0 * 3.141593) + 1.0) + 2.0 - wt + } override def project(mu: Double): Double = { if (mu.isNegInfinity) { @@ -378,6 +417,17 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def variance(mu: Double): Double = mu * (1.0 - mu) + override def deviance(y: Double, mu: Double, weight: Double): Double = { + val my = 1.0 - y + 2.0 * weight * (y * math.log(math.max(y, 1.0) / mu) + my * math.log(math.max(my, 1.0) / (1.0 - mu))) + } + + override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { + predictions.map { case (y: Double, mu: Double, weight: Double) => + weight * breeze.stats.distributions.Binomial(1, mu).logProbabilityOf(math.round(y).toInt) + }.sum() * (-2.0) + } + override def project(mu: Double): Double = { if (mu < epsilon) { epsilon @@ -405,6 +455,16 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def variance(mu: Double): Double = mu + override def deviance(y: Double, mu: Double, weight: Double): Double = { + 2 * weight * (y * math.log(y / mu) - (y - mu)) + } + + override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { + predictions.map { case (y: Double, mu: Double, weight: Double) => + weight * breeze.stats.distributions.Poisson(mu).logProbabilityOf(y.toInt) + }.sum() * (-2.0) + } + override def project(mu: Double): Double = { if (mu < epsilon) { epsilon @@ -432,6 +492,18 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def variance(mu: Double): Double = math.pow(mu, 2.0) + override def deviance(y: Double, mu: Double, weight: Double): Double = { + val x = if (y == 0.0) 1.0 else y / mu + -2.0 * weight * (math.log(x) - (y - mu)/mu) + } + + override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { + val disp = deviance / weightSum + predictions.map { case (y: Double, mu: Double, weight: Double) => + weight * breeze.stats.distributions.Gamma(1.0 / disp, mu * disp).logPdf(y) + }.sum() * (-2.0) + 2.0 + } + override def project(mu: Double): Double = { if (mu < epsilon) { epsilon @@ -573,6 +645,36 @@ class GeneralizedLinearRegressionModel private[ml] ( familyAndLink.fitted(eta) } + private var trainingSummary: Option[GeneralizedLinearRegressionSummary] = None + + def summary: GeneralizedLinearRegressionSummary = trainingSummary match { + case Some(summ) => summ + case None => + throw new SparkException( + "No training summary available for this GeneralizedLinearRegressionModel", + new NullPointerException()) + } + + private[regression] def setSummary(summary: GeneralizedLinearRegressionSummary): this.type = { + this.trainingSummary = Some(summary) + this + } + + /** + * If the prediction column is set returns the current model and prediction column, + * otherwise generates a new column and sets it as the prediction column on a new copy + * of the current model. + */ + private[regression] def findSummaryModelAndPredictionCol(): ( + GeneralizedLinearRegressionModel, String) = { + $(predictionCol) match { + case "" => + val predictionColName = "prediction_" + java.util.UUID.randomUUID.toString() + (copy(ParamMap.empty).setPredictionCol(predictionColName), predictionColName) + case p => (this, p) + } + } + @Since("2.0.0") override def copy(extra: ParamMap): GeneralizedLinearRegressionModel = { copyValues(new GeneralizedLinearRegressionModel(uid, coefficients, intercept), extra) @@ -633,3 +735,173 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr } } } + +class GeneralizedLinearRegressionSummary private[regression] ( + @transient val predictions: DataFrame, + val predictionCol: String, + val family: GeneralizedLinearRegression.Family, + val link: GeneralizedLinearRegression.Link, + val model: GeneralizedLinearRegressionModel, + private val diagInvAtWA: Array[Double], + val numIterations: Int) extends Serializable { + + import GeneralizedLinearRegression._ + + /** Number of instances in DataFrame predictions */ + lazy val numInstances: Long = predictions.count() + + lazy val rank: Long = if (model.getFitIntercept) { + model.coefficients.size + 1 + } else { + model.coefficients.size + } + + /** Degrees of freedom */ + lazy val degreesOfFreedom: Long = { + numInstances - rank + } + + // df.residual + lazy val residualDegreeOfFreedom: Long = degreesOfFreedom + + // df.null + lazy val residualDegreeOfFreedomNull: Long = if (model.getFitIntercept) { + numInstances - 1 + } else { + numInstances + } + + lazy val devianceResidualsMax: Double = { + devianceResiduals.select(max(col("devianceResiduals"))).first().getDouble(0) + } + + private lazy val devianceResiduals: DataFrame = { + val drUDF = udf { (y: Double, mu: Double, weight: Double) => + val r = math.sqrt(math.max(family.deviance(y, mu, weight), 0.0)) + if (y > mu) { + r + } else { + -1.0 * r + } + } + val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) + predictions.withColumn("devianceResiduals", drUDF(col(model.getLabelCol), col(predictionCol), w)) + } + + private lazy val pearsonResiduals: DataFrame = { + val prUDF = udf { mu: Double => family.variance(mu) } + val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) + predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol)) + .multiply(sqrt(w)).divide(sqrt(prUDF(col(model.getPredictionCol))))).as("pearsonResiduals") + } + + private lazy val workingResiduals: DataFrame = { + val wrUDF = udf { (y: Double, mu: Double) => + (y - mu) * link.deriv(mu) + } + predictions.withColumn("workingResiduals", wrUDF(col(model.getLabelCol), col(predictionCol))) + } + + private lazy val responseResiduals: DataFrame = { + predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol))).as("responseResiduals") + } + + def residuals(residualsType: String = "deviance"): DataFrame = { + residualsType match { + case "deviance" => devianceResiduals + case "pearson" => pearsonResiduals + case "working" => workingResiduals + case "response" => responseResiduals + case other => throw new UnsupportedOperationException( + s"The residuals type $other is not supported by Generialized Linear Regression.") + } + } + + lazy val nullDeviance: Double = { + val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) + + val wtdmu: Double = if (model.getFitIntercept) { + val t1 = predictions.select(sum(w.multiply(col(model.getLabelCol)))).first().getDouble(0) + val t2 = predictions.select(sum(w)).first().getDouble(0) + t1 / t2 + } else { + link.unlink(0.0) + } + + predictions.select(col(model.getLabelCol), w).rdd.map { + case Row(y: Double, weight: Double) => + family.deviance(y, wtdmu, weight) + }.sum() + } + + lazy val deviance: Double = { + val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) + predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { + case Row(label: Double, pred: Double, weight: Double) => + family.deviance(label, pred, weight) + }.sum() + } + + lazy val dispersion: Double = if ( + model.getFamily == Binomial.name || model.getFamily == Poisson.name) { + 1.0 + } else { + val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) + val rss = predictions.select(col(model.getPredictionCol), col(model.getLabelCol), w).rdd.map { + case Row(pred: Double, label: Double, weight: Double) => + math.pow(label - pred, 2.0) / family.variance(pred) * weight + }.sum() + + rss / degreesOfFreedom + } + + lazy val aic: Double = { + val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) + val weightSum = predictions.select(w).agg(sum(w)).first().getAs[Double](0) + val t = predictions.select(col(model.getPredictionCol), col(model.getLabelCol), w).rdd.map { + case Row(pred: Double, label: Double, weight: Double) => + (label, pred, weight) + } + family.aic(t, deviance, weightSum) + 2 * rank + } + + /** + * Standard error of estimated coefficients and intercept. + */ + lazy val coefficientStandardErrors: Array[Double] = { + diagInvAtWA.map(_ * dispersion).map(math.sqrt(_)) + } + + /** + * T-statistic of estimated coefficients and intercept. + */ + lazy val tValues: Array[Double] = { + if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { + throw new UnsupportedOperationException( + "No t-statistic available for this LinearRegressionModel") + } else { + val estimate = if (model.getFitIntercept) { + Array.concat(model.coefficients.toArray, Array(model.intercept)) + } else { + model.coefficients.toArray + } + estimate.zip(coefficientStandardErrors).map { x => x._1 / x._2 } + } + } + + /** + * Two-sided p-value of estimated coefficients and intercept. + */ + lazy val pValues: Array[Double] = { + if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { + throw new UnsupportedOperationException( + "No p-value available for this LinearRegressionModel") + } else { + if (model.getFamily == Binomial.name || model.getFamily == Poisson.name) { + tValues.map { x => 2.0 * (1.0 - GD(0.0, 1.0).cdf(math.abs(x))) } + } else { + tValues.map { x => 2.0 * (1.0 - StudentsT(degreesOfFreedom.toDouble).cdf(math.abs(x))) } + } + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 618304ad19ab..23945f147efd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.ml.regression import scala.util.Random import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.mllib.classification.LogisticRegressionSuite._ @@ -29,6 +30,7 @@ import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions._ class GeneralizedLinearRegressionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { @@ -466,6 +468,331 @@ class GeneralizedLinearRegressionSuite } } + test("glm summary: gaussian family with weight") { + /* + R code: + + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b <- c(17, 19, 23, 29) + w <- c(1, 2, 3, 4) + df <- as.data.frame(cbind(A, b)) + */ + val datasetWithWeight = sqlContext.createDataFrame(sc.parallelize(Seq( + Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(29.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2)) + /* + R code: + + model <- glm(formula = "b ~ .", family="gaussian", data = df, weights = w) + summary(model) + + Deviance Residuals: + 1 2 3 4 + 1.920 -1.358 -1.109 0.960 + + Coefficients: + Estimate Std. Error t value Pr(>|t|) + (Intercept) 18.080 9.608 1.882 0.311 + V1 6.080 5.556 1.094 0.471 + V2 -0.600 1.960 -0.306 0.811 + + (Dispersion parameter for gaussian family taken to be 7.68) + + Null deviance: 202.00 on 3 degrees of freedom + Residual deviance: 7.68 on 1 degrees of freedom + AIC: 18.783 + + Number of Fisher Scoring iterations: 2 + */ + val trainer = new GeneralizedLinearRegression() + .setWeightCol("weight") + + val model = trainer.fit(datasetWithWeight) + + val coefficientsR = Vectors.dense(Array(6.080, -0.600)) + val interceptR = 18.080 + val devianceResidualsR = Array(1.920, -1.358, -1.109, 0.960) + val seCoefR = Array(5.556, 1.960, 9.608) + val tValsR = Array(1.094, -0.306, 1.882) + val pValsR = Array(0.471, 0.811, 0.311) + val dispersionR = 7.68 + val nullDevianceR = 202.00 + val residualDevianceR = 7.68 + val residualDegreeOfFreedomNullR = 3 + val residualDegreeOfFreedomR = 1 + val aicR = 18.783 + + val summary = model.summary + val devianceResiduals = summary.residuals() + .select(col("devianceResiduals")) + .collect() + .map(_.getAs[Double](0)) + + assert(model.coefficients ~== coefficientsR absTol 1E-3) + assert(model.intercept ~== interceptR absTol 1E-3) + devianceResiduals.zip(devianceResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + assert(summary.dispersion ~== dispersionR absTol 1E-3) + assert(summary.nullDeviance ~== nullDevianceR absTol 1E-3) + assert(summary.deviance ~== residualDevianceR absTol 1E-3) + assert(summary.residualDegreeOfFreedom === residualDegreeOfFreedomR) + assert(summary.residualDegreeOfFreedomNull === residualDegreeOfFreedomNullR) + assert(summary.aic ~== aicR absTol 1E-3) + } + + test("glm summary: binomial family with weight") { + /* + R code: + + A <- matrix(c(0, 1, 2, 3, 5, 2, 1, 3), 4, 2) + b <- c(1, 0, 1, 0) + w <- c(1, 2, 3, 4) + df <- as.data.frame(cbind(A, b)) + */ + val datasetWithWeight = sqlContext.createDataFrame(sc.parallelize(Seq( + Instance(1.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)), + Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)), + Instance(0.0, 4.0, Vectors.dense(3.0, 3.0)) + ), 2)) + /* + R code: + + model <- glm(formula = "b ~ . -1", family="binomial", data = df, weights = w) + summary(model) + + Deviance Residuals: + 1 2 3 4 + 1.273 -1.437 2.533 -1.556 + + Coefficients: + Estimate Std. Error z value Pr(>|z|) + V1 -0.30217 0.46242 -0.653 0.513 + V2 -0.04452 0.37124 -0.120 0.905 + + (Dispersion parameter for binomial family taken to be 1) + + Null deviance: 13.863 on 4 degrees of freedom + Residual deviance: 12.524 on 2 degrees of freedom + AIC: 16.524 + + Number of Fisher Scoring iterations: 5 + */ + val trainer = new GeneralizedLinearRegression() + .setFamily("binomial") + .setWeightCol("weight") + .setFitIntercept(false) + + val model = trainer.fit(datasetWithWeight) + + val coefficientsR = Vectors.dense(Array(-0.30217, -0.04452)) + val interceptR = 0.0 + val devianceResidualsR = Array(1.273, -1.437, 2.533, -1.556) + val seCoefR = Array(0.46242, 0.37124) + val tValsR = Array(-0.653, -0.120) + val pValsR = Array(0.513, 0.905) + val dispersionR = 1.0 + val nullDevianceR = 13.863 + val residualDevianceR = 12.524 + val residualDegreeOfFreedomNullR = 4 + val residualDegreeOfFreedomR = 2 + val aicR = 16.524 + + val summary = model.summary + val devianceResiduals = summary.residuals() + .select(col("devianceResiduals")) + .collect() + .map(_.getAs[Double](0)) + + assert(model.coefficients ~== coefficientsR absTol 1E-3) + assert(model.intercept ~== interceptR absTol 1E-3) + devianceResiduals.zip(devianceResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + assert(summary.dispersion ~== dispersionR absTol 1E-3) + assert(summary.nullDeviance ~== nullDevianceR absTol 1E-3) + assert(summary.deviance ~== residualDevianceR absTol 1E-3) + assert(summary.residualDegreeOfFreedom === residualDegreeOfFreedomR) + assert(summary.residualDegreeOfFreedomNull === residualDegreeOfFreedomNullR) + assert(summary.aic ~== aicR absTol 1E-3) + } + + test("glm summary: poisson family with weight") { + /* + R code: + + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b <- c(2, 8, 3, 9) + w <- c(1, 2, 3, 4) + df <- as.data.frame(cbind(A, b)) + */ + val datasetWithWeight = sqlContext.createDataFrame(sc.parallelize(Seq( + Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2)) + /* + R code: + + model <- glm(formula = "b ~ .", family="poisson", data = df, weights = w) + summary(model) + + Deviance Residuals: + 1 2 3 4 + -0.28952 0.11048 0.14839 -0.07268 + + Coefficients: + Estimate Std. Error z value Pr(>|z|) + (Intercept) 6.2999 1.6086 3.916 8.99e-05 *** + V1 3.3241 1.0184 3.264 0.00110 ** + V2 -1.0818 0.3522 -3.071 0.00213 ** + --- + Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1 + + (Dispersion parameter for poisson family taken to be 1) + + Null deviance: 15.38066 on 3 degrees of freedom + Residual deviance: 0.12333 on 1 degrees of freedom + AIC: 41.803 + + Number of Fisher Scoring iterations: 3 + */ + val trainer = new GeneralizedLinearRegression() + .setFamily("poisson") + .setWeightCol("weight") + .setFitIntercept(true) + + val model = trainer.fit(datasetWithWeight) + + val coefficientsR = Vectors.dense(Array(3.3241, -1.0818)) + val interceptR = 6.2999 + val devianceResidualsR = Array(-0.28952, 0.11048, 0.14839, -0.07268) + val seCoefR = Array(1.0184, 0.3522, 1.6086) + val tValsR = Array(3.264, -3.071, 3.916) + val pValsR = Array(0.00110, 0.00213, 0.00009) + val dispersionR = 1.0 + val nullDevianceR = 15.38066 + val residualDevianceR = 0.12333 + val residualDegreeOfFreedomNullR = 3 + val residualDegreeOfFreedomR = 1 + val aicR = 41.803 + + val summary = model.summary + val devianceResiduals = summary.residuals() + .select(col("devianceResiduals")) + .collect() + .map(_.getAs[Double](0)) + + assert(model.coefficients ~== coefficientsR absTol 1E-3) + assert(model.intercept ~== interceptR absTol 1E-3) + devianceResiduals.zip(devianceResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + assert(summary.dispersion ~== dispersionR absTol 1E-3) + assert(summary.nullDeviance ~== nullDevianceR absTol 1E-3) + assert(summary.deviance ~== residualDevianceR absTol 1E-3) + assert(summary.residualDegreeOfFreedom === residualDegreeOfFreedomR) + assert(summary.residualDegreeOfFreedomNull === residualDegreeOfFreedomNullR) + assert(summary.aic ~== aicR absTol 1E-3) + } + + test("glm summary: gamma family with weight") { + /* + R code: + + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b <- c(2, 8, 3, 9) + w <- c(1, 2, 3, 4) + df <- as.data.frame(cbind(A, b)) + */ + val datasetWithWeight = sqlContext.createDataFrame(sc.parallelize(Seq( + Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), + Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), + Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), + Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) + ), 2)) + /* + R code: + + model <- glm(formula = "b ~ .", family="Gamma", data = df, weights = w) + summary(model) + + Deviance Residuals: + 1 2 3 4 + -0.26343 0.05761 0.12818 -0.03484 + + Coefficients: + Estimate Std. Error t value Pr(>|t|) + (Intercept) -0.81511 0.23449 -3.476 0.178 + V1 -0.72730 0.16137 -4.507 0.139 + V2 0.23894 0.05481 4.359 0.144 + + (Dispersion parameter for Gamma family taken to be 0.07986091) + + Null deviance: 2.937462 on 3 degrees of freedom + Residual deviance: 0.090358 on 1 degrees of freedom + AIC: 23.202 + + Number of Fisher Scoring iterations: 4 + */ + val trainer = new GeneralizedLinearRegression() + .setFamily("gamma") + .setWeightCol("weight") + + val model = trainer.fit(datasetWithWeight) + + println(model.coefficients) + println(model.intercept) + + val coefficientsR = Vectors.dense(Array(-0.72730, 0.23894)) + val interceptR = -0.81511 + val devianceResidualsR = Array(-0.26343, 0.05761, 0.12818, -0.03484) + val seCoefR = Array(0.16137, 0.05481, 0.23449) + val tValsR = Array(-4.507, 4.359, -3.476) + val pValsR = Array(0.139, 0.144, 0.178) + val dispersionR = 0.07986091 + val nullDevianceR = 2.937462 + val residualDevianceR = 0.090358 + val residualDegreeOfFreedomNullR = 3 + val residualDegreeOfFreedomR = 1 + val aicR = 23.202 + + val summary = model.summary + val devianceResiduals = summary.residuals() + .select(col("devianceResiduals")) + .collect() + .map(_.getAs[Double](0)) + + assert(model.coefficients ~== coefficientsR absTol 1E-3) + assert(model.intercept ~== interceptR absTol 1E-3) + devianceResiduals.zip(devianceResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => + assert(x._1 ~== x._2 absTol 1E-3) } + summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + summary.pValues.zip(pValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } + assert(summary.dispersion ~== dispersionR absTol 1E-3) + assert(summary.nullDeviance ~== nullDevianceR absTol 1E-3) + assert(summary.deviance ~== residualDevianceR absTol 1E-3) + assert(summary.residualDegreeOfFreedom === residualDegreeOfFreedomR) + assert(summary.residualDegreeOfFreedomNull === residualDegreeOfFreedomNullR) + assert(summary.aic ~== aicR absTol 1E-3) + } + test("read/write") { def checkModelData( model: GeneralizedLinearRegressionModel, From 6bb7cbe2dfb38220c4912eb333c6ae88c932754d Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Fri, 11 Mar 2016 18:19:05 +0800 Subject: [PATCH 2/7] add test cases & update API doc --- .../GeneralizedLinearRegression.scala | 49 +++++-- .../GeneralizedLinearRegressionSuite.scala | 136 +++++++++++++++++- 2 files changed, 172 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index a8b7ca9c3ab7..74e7fdf3dfe2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -736,6 +736,20 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr } } +/** + * :: Experimental :: + * Summarizing Generalized Linear regression Fits. + * + * @param predictions predictions outputted by the model's `transform` method + * @param predictionCol field in "predictions" which gives the prediction value of each instance + * @param family the family object of the model + * @param link the link object of the model + * @param model the model that should be summarized + * @param diagInvAtWA diagonal of matrix (A^T * W * A)^-1 in the last iteration + * @param numIterations number of iterations + */ +@Since("2.0.0") +@Experimental class GeneralizedLinearRegressionSummary private[regression] ( @transient val predictions: DataFrame, val predictionCol: String, @@ -750,6 +764,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** Number of instances in DataFrame predictions */ lazy val numInstances: Long = predictions.count() + /** The numeric rank of the fitted linear model */ lazy val rank: Long = if (model.getFitIntercept) { model.coefficients.size + 1 } else { @@ -761,20 +776,16 @@ class GeneralizedLinearRegressionSummary private[regression] ( numInstances - rank } - // df.residual + /** The residual degrees of freedom */ lazy val residualDegreeOfFreedom: Long = degreesOfFreedom - // df.null + /** The residual degrees of freedom for the null model */ lazy val residualDegreeOfFreedomNull: Long = if (model.getFitIntercept) { numInstances - 1 } else { numInstances } - lazy val devianceResidualsMax: Double = { - devianceResiduals.select(max(col("devianceResiduals"))).first().getDouble(0) - } - private lazy val devianceResiduals: DataFrame = { val drUDF = udf { (y: Double, mu: Double, weight: Double) => val r = math.sqrt(math.max(family.deviance(y, mu, weight), 0.0)) @@ -785,27 +796,32 @@ class GeneralizedLinearRegressionSummary private[regression] ( } } val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) - predictions.withColumn("devianceResiduals", drUDF(col(model.getLabelCol), col(predictionCol), w)) + predictions.select(drUDF(col(model.getLabelCol), col(predictionCol), w).as("devianceResiduals")) } private lazy val pearsonResiduals: DataFrame = { val prUDF = udf { mu: Double => family.variance(mu) } val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol)) - .multiply(sqrt(w)).divide(sqrt(prUDF(col(model.getPredictionCol))))).as("pearsonResiduals") + .multiply(sqrt(w)).divide(sqrt(prUDF(col(model.getPredictionCol)))).as("pearsonResiduals")) } private lazy val workingResiduals: DataFrame = { val wrUDF = udf { (y: Double, mu: Double) => (y - mu) * link.deriv(mu) } - predictions.withColumn("workingResiduals", wrUDF(col(model.getLabelCol), col(predictionCol))) + predictions.select(wrUDF(col(model.getLabelCol), col(predictionCol)).as("workingResiduals")) } private lazy val responseResiduals: DataFrame = { - predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol))).as("responseResiduals") + predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol)).as("responseResiduals")) } + /** + * Get the residuals of the fitted model by type. + * @param residualsType The type of residuals which should be returned. + * Supported options: deviance(default), pearson, working and response. + */ def residuals(residualsType: String = "deviance"): DataFrame = { residualsType match { case "deviance" => devianceResiduals @@ -817,6 +833,9 @@ class GeneralizedLinearRegressionSummary private[regression] ( } } + /** + * The deviance for the null model. + */ lazy val nullDeviance: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) @@ -834,6 +853,9 @@ class GeneralizedLinearRegressionSummary private[regression] ( }.sum() } + /** + * The deviance for the fitted model. + */ lazy val deviance: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { @@ -842,6 +864,12 @@ class GeneralizedLinearRegressionSummary private[regression] ( }.sum() } + /** + * The dispersion of the fitted model. + * It is taken as 1.0 for the "binomial" and "poisson" families, and otherwise + * estimated by the residual Pearson's Chi-Squared statistic(which is defined as + * sum of the squares of the Pearson residuals) divided by the residual degrees of freedom. + */ lazy val dispersion: Double = if ( model.getFamily == Binomial.name || model.getFamily == Poisson.name) { 1.0 @@ -855,6 +883,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( rss / degreesOfFreedom } + /** Akaike's "An Information Criterion" for the fitted model */ lazy val aic: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) val weightSum = predictions.select(w).agg(sum(w)).first().getAs[Double](0) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 23945f147efd..c7ba2cc3be70 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -506,6 +506,18 @@ class GeneralizedLinearRegressionSuite AIC: 18.783 Number of Fisher Scoring iterations: 2 + + residuals(model, type="pearson") + 1 2 3 4 + 1.920000 -1.357645 -1.108513 0.960000 + + residuals(model, type="working") + 1 2 3 4 + 1.92 -0.96 -0.64 0.48 + + residuals(model, type="response") + 1 2 3 4 + 1.92 -0.96 -0.64 0.48 */ val trainer = new GeneralizedLinearRegression() .setWeightCol("weight") @@ -515,6 +527,9 @@ class GeneralizedLinearRegressionSuite val coefficientsR = Vectors.dense(Array(6.080, -0.600)) val interceptR = 18.080 val devianceResidualsR = Array(1.920, -1.358, -1.109, 0.960) + val pearsonResidualsR = Array(1.920000, -1.357645, -1.108513, 0.960000) + val workingResidualsR = Array(1.92, -0.96, -0.64, 0.48) + val responseResidualsR = Array(1.92, -0.96, -0.64, 0.48) val seCoefR = Array(5.556, 1.960, 9.608) val tValsR = Array(1.094, -0.306, 1.882) val pValsR = Array(0.471, 0.811, 0.311) @@ -526,15 +541,34 @@ class GeneralizedLinearRegressionSuite val aicR = 18.783 val summary = model.summary + val devianceResiduals = summary.residuals() .select(col("devianceResiduals")) .collect() .map(_.getAs[Double](0)) + val pearsonResiduals = summary.residuals("pearson") + .select(col("pearsonResiduals")) + .collect() + .map(_.getAs[Double](0)) + val workingResiduals = summary.residuals("working") + .select(col("workingResiduals")) + .collect() + .map(_.getAs[Double](0)) + val responseResiduals = summary.residuals("response") + .select(col("responseResiduals")) + .collect() + .map(_.getAs[Double](0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) devianceResiduals.zip(devianceResidualsR).foreach { x => assert(x._1 ~== x._2 absTol 1E-3) } + pearsonResiduals.zip(pearsonResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + workingResiduals.zip(workingResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + responseResiduals.zip(responseResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } @@ -584,6 +618,18 @@ class GeneralizedLinearRegressionSuite AIC: 16.524 Number of Fisher Scoring iterations: 5 + + residuals(model, type="pearson") + 1 2 3 4 + 1.117731 -1.162962 2.395838 -1.189005 + + residuals(model, type="working") + 1 2 3 4 + 2.249324 -1.676240 2.913346 -1.353433 + + residuals(model, type="response") + 1 2 3 4 + 0.5554219 -0.4034267 0.6567520 -0.2611382 */ val trainer = new GeneralizedLinearRegression() .setFamily("binomial") @@ -595,6 +641,9 @@ class GeneralizedLinearRegressionSuite val coefficientsR = Vectors.dense(Array(-0.30217, -0.04452)) val interceptR = 0.0 val devianceResidualsR = Array(1.273, -1.437, 2.533, -1.556) + val pearsonResidualsR = Array(1.117731, -1.162962, 2.395838, -1.189005) + val workingResidualsR = Array(2.249324, -1.676240, 2.913346, -1.353433) + val responseResidualsR = Array(0.5554219, -0.4034267, 0.6567520, -0.2611382) val seCoefR = Array(0.46242, 0.37124) val tValsR = Array(-0.653, -0.120) val pValsR = Array(0.513, 0.905) @@ -610,11 +659,29 @@ class GeneralizedLinearRegressionSuite .select(col("devianceResiduals")) .collect() .map(_.getAs[Double](0)) + val pearsonResiduals = summary.residuals("pearson") + .select(col("pearsonResiduals")) + .collect() + .map(_.getAs[Double](0)) + val workingResiduals = summary.residuals("working") + .select(col("workingResiduals")) + .collect() + .map(_.getAs[Double](0)) + val responseResiduals = summary.residuals("response") + .select(col("responseResiduals")) + .collect() + .map(_.getAs[Double](0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) devianceResiduals.zip(devianceResidualsR).foreach { x => assert(x._1 ~== x._2 absTol 1E-3) } + pearsonResiduals.zip(pearsonResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + workingResiduals.zip(workingResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + responseResiduals.zip(responseResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } @@ -667,6 +734,18 @@ class GeneralizedLinearRegressionSuite AIC: 41.803 Number of Fisher Scoring iterations: 3 + + residuals(model, type="pearson") + 1 2 3 4 + -0.28043145 0.11099310 0.14963714 -0.07253611 + + residuals(model, type="working") + 1 2 3 4 + -0.17960679 0.02813593 0.05113852 -0.01201650 + + residuals(model, type="response") + 1 2 3 4 + -0.4378554 0.2189277 0.1459518 -0.1094638 */ val trainer = new GeneralizedLinearRegression() .setFamily("poisson") @@ -678,6 +757,9 @@ class GeneralizedLinearRegressionSuite val coefficientsR = Vectors.dense(Array(3.3241, -1.0818)) val interceptR = 6.2999 val devianceResidualsR = Array(-0.28952, 0.11048, 0.14839, -0.07268) + val pearsonResidualsR = Array(-0.28043145, 0.11099310, 0.14963714, -0.07253611) + val workingResidualsR = Array(-0.17960679, 0.02813593, 0.05113852, -0.01201650) + val responseResidualsR = Array(-0.4378554, 0.2189277, 0.1459518, -0.1094638) val seCoefR = Array(1.0184, 0.3522, 1.6086) val tValsR = Array(3.264, -3.071, 3.916) val pValsR = Array(0.00110, 0.00213, 0.00009) @@ -693,11 +775,29 @@ class GeneralizedLinearRegressionSuite .select(col("devianceResiduals")) .collect() .map(_.getAs[Double](0)) + val pearsonResiduals = summary.residuals("pearson") + .select(col("pearsonResiduals")) + .collect() + .map(_.getAs[Double](0)) + val workingResiduals = summary.residuals("working") + .select(col("workingResiduals")) + .collect() + .map(_.getAs[Double](0)) + val responseResiduals = summary.residuals("response") + .select(col("responseResiduals")) + .collect() + .map(_.getAs[Double](0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) devianceResiduals.zip(devianceResidualsR).foreach { x => assert(x._1 ~== x._2 absTol 1E-3) } + pearsonResiduals.zip(pearsonResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + workingResiduals.zip(workingResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + responseResiduals.zip(responseResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } @@ -748,6 +848,18 @@ class GeneralizedLinearRegressionSuite AIC: 23.202 Number of Fisher Scoring iterations: 4 + + residuals(model, type="pearson") + 1 2 3 4 + -0.24082508 0.05839241 0.13135766 -0.03463621 + + residuals(model, type="working") + 1 2 3 4 + 0.091414181 -0.005374314 -0.027196998 0.001890910 + + residuals(model, type="response") + 1 2 3 4 + -0.6344390 0.3172195 0.2114797 -0.1586097 */ val trainer = new GeneralizedLinearRegression() .setFamily("gamma") @@ -755,12 +867,12 @@ class GeneralizedLinearRegressionSuite val model = trainer.fit(datasetWithWeight) - println(model.coefficients) - println(model.intercept) - val coefficientsR = Vectors.dense(Array(-0.72730, 0.23894)) val interceptR = -0.81511 val devianceResidualsR = Array(-0.26343, 0.05761, 0.12818, -0.03484) + val pearsonResidualsR = Array(-0.24082508, 0.05839241, 0.13135766, -0.03463621) + val workingResidualsR = Array(0.091414181, -0.005374314, -0.027196998, 0.001890910) + val responseResidualsR = Array(-0.6344390, 0.3172195, 0.2114797, -0.1586097) val seCoefR = Array(0.16137, 0.05481, 0.23449) val tValsR = Array(-4.507, 4.359, -3.476) val pValsR = Array(0.139, 0.144, 0.178) @@ -776,11 +888,29 @@ class GeneralizedLinearRegressionSuite .select(col("devianceResiduals")) .collect() .map(_.getAs[Double](0)) + val pearsonResiduals = summary.residuals("pearson") + .select(col("pearsonResiduals")) + .collect() + .map(_.getAs[Double](0)) + val workingResiduals = summary.residuals("working") + .select(col("workingResiduals")) + .collect() + .map(_.getAs[Double](0)) + val responseResiduals = summary.residuals("response") + .select(col("responseResiduals")) + .collect() + .map(_.getAs[Double](0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) devianceResiduals.zip(devianceResidualsR).foreach { x => assert(x._1 ~== x._2 absTol 1E-3) } + pearsonResiduals.zip(pearsonResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + workingResiduals.zip(workingResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } + responseResiduals.zip(responseResidualsR).foreach { x => + assert(x._1 ~== x._2 absTol 1E-3) } summary.coefficientStandardErrors.zip(seCoefR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } summary.tValues.zip(tValsR).foreach{ x => assert(x._1 ~== x._2 absTol 1E-3) } From 5d4c87b9ba9a88f2f8aee985e5f918bf02600ae9 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 14 Mar 2016 16:39:54 +0800 Subject: [PATCH 3/7] update AIC calculation --- .../GeneralizedLinearRegression.scala | 125 +++++++++--------- 1 file changed, 61 insertions(+), 64 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 74e7fdf3dfe2..3f14c3d4585a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -339,13 +339,12 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine /** The variance of the endogenous variable's mean, given the value mu. */ def variance(mu: Double): Double - /** - * Deviance of (y, mu) pair. - * Deviance is usually defined as twice the loglikelihood ratio. - */ + /** Deviance of (y, mu) pair. */ def deviance(y: Double, mu: Double, weight: Double): Double - def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double + /** Akaike's 'An Information Criterion'(AIC) value of the family. */ + def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, + numInstances: Double, weightSum: Double): Double /** Trim the fitted value so that it will be in valid range. */ def project(mu: Double): Double = mu @@ -383,10 +382,13 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine weight * math.pow(y - mu, 2.0) } - override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { - val nobs = predictions.count() + override def aic( + predictions: RDD[(Double, Double, Double)], + deviance: Double, + numInstances: Double, + weightSum: Double): Double = { val wt = predictions.map(x => math.log(x._3)).sum() - nobs * (math.log(deviance / nobs * 2.0 * 3.141593) + 1.0) + 2.0 - wt + numInstances * (math.log(deviance / numInstances * 2.0 * math.Pi) + 1.0) + 2.0 - wt } override def project(mu: Double): Double = { @@ -419,13 +421,18 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def deviance(y: Double, mu: Double, weight: Double): Double = { val my = 1.0 - y - 2.0 * weight * (y * math.log(math.max(y, 1.0) / mu) + my * math.log(math.max(my, 1.0) / (1.0 - mu))) + 2.0 * weight * (y * math.log(math.max(y, 1.0) / mu) + + my * math.log(math.max(my, 1.0) / (1.0 - mu))) } - override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { - predictions.map { case (y: Double, mu: Double, weight: Double) => + override def aic( + predictions: RDD[(Double, Double, Double)], + deviance: Double, + numInstances: Double, + weightSum: Double): Double = { + -2.0 * predictions.map { case (y: Double, mu: Double, weight: Double) => weight * breeze.stats.distributions.Binomial(1, mu).logProbabilityOf(math.round(y).toInt) - }.sum() * (-2.0) + }.sum() } override def project(mu: Double): Double = { @@ -459,10 +466,14 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine 2 * weight * (y * math.log(y / mu) - (y - mu)) } - override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { - predictions.map { case (y: Double, mu: Double, weight: Double) => + override def aic( + predictions: RDD[(Double, Double, Double)], + deviance: Double, + numInstances: Double, + weightSum: Double): Double = { + -2.0 * predictions.map { case (y: Double, mu: Double, weight: Double) => weight * breeze.stats.distributions.Poisson(mu).logProbabilityOf(y.toInt) - }.sum() * (-2.0) + }.sum() } override def project(mu: Double): Double = { @@ -497,11 +508,15 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine -2.0 * weight * (math.log(x) - (y - mu)/mu) } - override def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, weightSum: Double): Double = { + override def aic( + predictions: RDD[(Double, Double, Double)], + deviance: Double, + numInstances: Double, + weightSum: Double): Double = { val disp = deviance / weightSum - predictions.map { case (y: Double, mu: Double, weight: Double) => + -2.0 * predictions.map { case (y: Double, mu: Double, weight: Double) => weight * breeze.stats.distributions.Gamma(1.0 / disp, mu * disp).logPdf(y) - }.sum() * (-2.0) + 2.0 + }.sum() + 2.0 } override def project(mu: Double): Double = { @@ -647,6 +662,11 @@ class GeneralizedLinearRegressionModel private[ml] ( private var trainingSummary: Option[GeneralizedLinearRegressionSummary] = None + /** + * Gets R-like summary of model on training set. An exception is + * thrown if `trainingSummary == None`. + */ + @Since("2.0.0") def summary: GeneralizedLinearRegressionSummary = trainingSummary match { case Some(summ) => summ case None => @@ -789,32 +809,27 @@ class GeneralizedLinearRegressionSummary private[regression] ( private lazy val devianceResiduals: DataFrame = { val drUDF = udf { (y: Double, mu: Double, weight: Double) => val r = math.sqrt(math.max(family.deviance(y, mu, weight), 0.0)) - if (y > mu) { - r - } else { - -1.0 * r - } + if (y > mu) r else -1.0 * r } val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) - predictions.select(drUDF(col(model.getLabelCol), col(predictionCol), w).as("devianceResiduals")) + predictions.select( + drUDF(col(model.getLabelCol), col(predictionCol), w).as("devianceResiduals")) } private lazy val pearsonResiduals: DataFrame = { val prUDF = udf { mu: Double => family.variance(mu) } val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) - predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol)) - .multiply(sqrt(w)).divide(sqrt(prUDF(col(model.getPredictionCol)))).as("pearsonResiduals")) + predictions.select(col(model.getLabelCol).minus(col(predictionCol)) + .multiply(sqrt(w)).divide(sqrt(prUDF(col(predictionCol)))).as("pearsonResiduals")) } private lazy val workingResiduals: DataFrame = { - val wrUDF = udf { (y: Double, mu: Double) => - (y - mu) * link.deriv(mu) - } + val wrUDF = udf { (y: Double, mu: Double) => (y - mu) * link.deriv(mu) } predictions.select(wrUDF(col(model.getLabelCol), col(predictionCol)).as("workingResiduals")) } private lazy val responseResiduals: DataFrame = { - predictions.select(col(model.getLabelCol).minus(col(model.getPredictionCol)).as("responseResiduals")) + predictions.select(col(model.getLabelCol).minus(col(predictionCol)).as("responseResiduals")) } /** @@ -829,7 +844,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( case "working" => workingResiduals case "response" => responseResiduals case other => throw new UnsupportedOperationException( - s"The residuals type $other is not supported by Generialized Linear Regression.") + s"The residuals type $other is not supported by Generalized Linear Regression.") } } @@ -838,15 +853,12 @@ class GeneralizedLinearRegressionSummary private[regression] ( */ lazy val nullDeviance: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) - val wtdmu: Double = if (model.getFitIntercept) { - val t1 = predictions.select(sum(w.multiply(col(model.getLabelCol)))).first().getDouble(0) - val t2 = predictions.select(sum(w)).first().getDouble(0) - t1 / t2 + val agg = predictions.agg(sum(w.multiply(col(model.getLabelCol))), sum(w)).first() + agg.getDouble(0) / agg.getDouble(1) } else { link.unlink(0.0) } - predictions.select(col(model.getLabelCol), w).rdd.map { case Row(y: Double, weight: Double) => family.deviance(y, wtdmu, weight) @@ -874,63 +886,48 @@ class GeneralizedLinearRegressionSummary private[regression] ( model.getFamily == Binomial.name || model.getFamily == Poisson.name) { 1.0 } else { - val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) - val rss = predictions.select(col(model.getPredictionCol), col(model.getLabelCol), w).rdd.map { - case Row(pred: Double, label: Double, weight: Double) => - math.pow(label - pred, 2.0) / family.variance(pred) * weight - }.sum() - + val rss = pearsonResiduals.agg(sum(pow(col("pearsonResiduals"), 2.0))).first().getDouble(0) rss / degreesOfFreedom } - /** Akaike's "An Information Criterion" for the fitted model */ + /** Akaike's "An Information Criterion"(AIC) for the fitted model */ lazy val aic: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) val weightSum = predictions.select(w).agg(sum(w)).first().getAs[Double](0) - val t = predictions.select(col(model.getPredictionCol), col(model.getLabelCol), w).rdd.map { - case Row(pred: Double, label: Double, weight: Double) => + val t = predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { + case Row(label: Double, pred: Double, weight: Double) => (label, pred, weight) } - family.aic(t, deviance, weightSum) + 2 * rank + family.aic(t, deviance, numInstances, weightSum) + 2 * rank } /** * Standard error of estimated coefficients and intercept. */ lazy val coefficientStandardErrors: Array[Double] = { - diagInvAtWA.map(_ * dispersion).map(math.sqrt(_)) + diagInvAtWA.map(_ * dispersion).map(math.sqrt) } /** * T-statistic of estimated coefficients and intercept. */ lazy val tValues: Array[Double] = { - if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { - throw new UnsupportedOperationException( - "No t-statistic available for this LinearRegressionModel") + val estimate = if (model.getFitIntercept) { + Array.concat(model.coefficients.toArray, Array(model.intercept)) } else { - val estimate = if (model.getFitIntercept) { - Array.concat(model.coefficients.toArray, Array(model.intercept)) - } else { - model.coefficients.toArray - } - estimate.zip(coefficientStandardErrors).map { x => x._1 / x._2 } + model.coefficients.toArray } + estimate.zip(coefficientStandardErrors).map { x => x._1 / x._2 } } /** * Two-sided p-value of estimated coefficients and intercept. */ lazy val pValues: Array[Double] = { - if (diagInvAtWA.length == 1 && diagInvAtWA(0) == 0) { - throw new UnsupportedOperationException( - "No p-value available for this LinearRegressionModel") + if (model.getFamily == Binomial.name || model.getFamily == Poisson.name) { + tValues.map { x => 2.0 * (1.0 - GD(0.0, 1.0).cdf(math.abs(x))) } } else { - if (model.getFamily == Binomial.name || model.getFamily == Poisson.name) { - tValues.map { x => 2.0 * (1.0 - GD(0.0, 1.0).cdf(math.abs(x))) } - } else { - tValues.map { x => 2.0 * (1.0 - StudentsT(degreesOfFreedom.toDouble).cdf(math.abs(x))) } - } + tValues.map { x => 2.0 * (1.0 - StudentsT(degreesOfFreedom.toDouble).cdf(math.abs(x))) } } } } From fba11123984af8b0e2482281e39c60df59170306 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 14 Mar 2016 17:37:44 +0800 Subject: [PATCH 4/7] fix typos --- .../spark/ml/regression/GeneralizedLinearRegressionSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index c7ba2cc3be70..a2df2f491d31 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -725,7 +725,7 @@ class GeneralizedLinearRegressionSuite V1 3.3241 1.0184 3.264 0.00110 ** V2 -1.0818 0.3522 -3.071 0.00213 ** --- - Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1 + Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1 (Dispersion parameter for poisson family taken to be 1) From fd55fd2a21aff73c571d2ad15419c1ec5fe31820 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 15 Mar 2016 18:42:40 +0800 Subject: [PATCH 5/7] Address comments --- .../IterativelyReweightedLeastSquares.scala | 4 +- .../GeneralizedLinearRegression.scala | 77 ++++++++++--------- .../GeneralizedLinearRegressionSuite.scala | 32 ++++---- 3 files changed, 60 insertions(+), 53 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index edb8166d95ae..fe82324ab25c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -107,7 +107,7 @@ private[ml] class IterativelyReweightedLeastSquares( } - new IterativelyReweightedLeastSquaresModel(model.coefficients, - model.intercept, model.diagInvAtWA, iter) + new IterativelyReweightedLeastSquaresModel( + model.coefficients, model.intercept, model.diagInvAtWA, iter) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 3f14c3d4585a..69fe3556168d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -17,7 +17,7 @@ package org.apache.spark.ml.regression -import breeze.stats.distributions.{Gaussian => GD, StudentsT} +import breeze.stats.{distributions => dist} import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkException} @@ -222,8 +222,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val trainingSummary = new GeneralizedLinearRegressionSummary( summaryModel.transform(dataset), predictionColName, - familyObj, - linkObj, model, wlsModel.diagInvAtWA.toArray, 1) @@ -244,8 +242,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val trainingSummary = new GeneralizedLinearRegressionSummary( summaryModel.transform(dataset), predictionColName, - familyObj, - linkObj, model, irlsModel.diagInvAtWA.toArray, irlsModel.numIterations) @@ -342,9 +338,18 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine /** Deviance of (y, mu) pair. */ def deviance(y: Double, mu: Double, weight: Double): Double - /** Akaike's 'An Information Criterion'(AIC) value of the family. */ - def aic(predictions: RDD[(Double, Double, Double)], deviance: Double, - numInstances: Double, weightSum: Double): Double + /** + * Akaike's 'An Information Criterion'(AIC) value of the family. + * @param predictions an RDD of (y, mu, weight) of instances + * @param deviance the deviance for the fitted model + * @param numInstances number of instances in DataFrame predictions + * @param weightSum weights sum of instances in DataFrame predictions + */ + def aic( + predictions: RDD[(Double, Double, Double)], + deviance: Double, + numInstances: Double, + weightSum: Double): Double /** Trim the fitted value so that it will be in valid range. */ def project(mu: Double): Double = mu @@ -379,7 +384,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def variance(mu: Double): Double = 1.0 override def deviance(y: Double, mu: Double, weight: Double): Double = { - weight * math.pow(y - mu, 2.0) + weight * (y - mu) * (y - mu) } override def aic( @@ -431,7 +436,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine numInstances: Double, weightSum: Double): Double = { -2.0 * predictions.map { case (y: Double, mu: Double, weight: Double) => - weight * breeze.stats.distributions.Binomial(1, mu).logProbabilityOf(math.round(y).toInt) + weight * dist.Binomial(1, mu).logProbabilityOf(math.round(y).toInt) }.sum() } @@ -463,7 +468,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def variance(mu: Double): Double = mu override def deviance(y: Double, mu: Double, weight: Double): Double = { - 2 * weight * (y * math.log(y / mu) - (y - mu)) + 2.0 * weight * (y * math.log(y / mu) - (y - mu)) } override def aic( @@ -472,7 +477,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine numInstances: Double, weightSum: Double): Double = { -2.0 * predictions.map { case (y: Double, mu: Double, weight: Double) => - weight * breeze.stats.distributions.Poisson(mu).logProbabilityOf(y.toInt) + weight * dist.Poisson(mu).logProbabilityOf(y.toInt) }.sum() } @@ -501,11 +506,10 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine y } - override def variance(mu: Double): Double = math.pow(mu, 2.0) + override def variance(mu: Double): Double = mu * mu override def deviance(y: Double, mu: Double, weight: Double): Double = { - val x = if (y == 0.0) 1.0 else y / mu - -2.0 * weight * (math.log(x) - (y - mu)/mu) + -2.0 * weight * (math.log(y / mu) - (y - mu)/mu) } override def aic( @@ -515,7 +519,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine weightSum: Double): Double = { val disp = deviance / weightSum -2.0 * predictions.map { case (y: Double, mu: Double, weight: Double) => - weight * breeze.stats.distributions.Gamma(1.0 / disp, mu * disp).logPdf(y) + weight * dist.Gamma(1.0 / disp, mu * disp).logPdf(y) }.sum() + 2.0 } @@ -606,11 +610,13 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine private[ml] object Probit extends Link("probit") { - override def link(mu: Double): Double = GD(0.0, 1.0).icdf(mu) + override def link(mu: Double): Double = dist.Gaussian(0.0, 1.0).icdf(mu) - override def deriv(mu: Double): Double = 1.0 / GD(0.0, 1.0).pdf(GD(0.0, 1.0).icdf(mu)) + override def deriv(mu: Double): Double = { + 1.0 / dist.Gaussian(0.0, 1.0).pdf(dist.Gaussian(0.0, 1.0).icdf(mu)) + } - override def unlink(eta: Double): Double = GD(0.0, 1.0).cdf(eta) + override def unlink(eta: Double): Double = dist.Gaussian(0.0, 1.0).cdf(eta) } private[ml] object CLogLog extends Link("cloglog") { @@ -628,7 +634,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine override def deriv(mu: Double): Double = 1.0 / (2.0 * math.sqrt(mu)) - override def unlink(eta: Double): Double = math.pow(eta, 2.0) + override def unlink(eta: Double): Double = eta * eta } } @@ -667,12 +673,10 @@ class GeneralizedLinearRegressionModel private[ml] ( * thrown if `trainingSummary == None`. */ @Since("2.0.0") - def summary: GeneralizedLinearRegressionSummary = trainingSummary match { - case Some(summ) => summ - case None => - throw new SparkException( - "No training summary available for this GeneralizedLinearRegressionModel", - new NullPointerException()) + def summary: GeneralizedLinearRegressionSummary = trainingSummary.getOrElse { + throw new SparkException( + "No training summary available for this GeneralizedLinearRegressionModel", + new RuntimeException()) } private[regression] def setSummary(summary: GeneralizedLinearRegressionSummary): this.type = { @@ -685,8 +689,8 @@ class GeneralizedLinearRegressionModel private[ml] ( * otherwise generates a new column and sets it as the prediction column on a new copy * of the current model. */ - private[regression] def findSummaryModelAndPredictionCol(): ( - GeneralizedLinearRegressionModel, String) = { + private[regression] def findSummaryModelAndPredictionCol() + : (GeneralizedLinearRegressionModel, String) = { $(predictionCol) match { case "" => val predictionColName = "prediction_" + java.util.UUID.randomUUID.toString() @@ -762,8 +766,6 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr * * @param predictions predictions outputted by the model's `transform` method * @param predictionCol field in "predictions" which gives the prediction value of each instance - * @param family the family object of the model - * @param link the link object of the model * @param model the model that should be summarized * @param diagInvAtWA diagonal of matrix (A^T * W * A)^-1 in the last iteration * @param numIterations number of iterations @@ -773,14 +775,19 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr class GeneralizedLinearRegressionSummary private[regression] ( @transient val predictions: DataFrame, val predictionCol: String, - val family: GeneralizedLinearRegression.Family, - val link: GeneralizedLinearRegression.Link, val model: GeneralizedLinearRegressionModel, private val diagInvAtWA: Array[Double], val numIterations: Int) extends Serializable { import GeneralizedLinearRegression._ + lazy val family = Family.fromName(model.getFamily) + lazy val link = if (model.isDefined(model.getParam("link"))) { + Link.fromName(model.getLink) + } else { + family.defaultLink + } + /** Number of instances in DataFrame predictions */ lazy val numInstances: Long = predictions.count() @@ -893,7 +900,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** Akaike's "An Information Criterion"(AIC) for the fitted model */ lazy val aic: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) - val weightSum = predictions.select(w).agg(sum(w)).first().getAs[Double](0) + val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0) val t = predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { case Row(label: Double, pred: Double, weight: Double) => (label, pred, weight) @@ -925,9 +932,9 @@ class GeneralizedLinearRegressionSummary private[regression] ( */ lazy val pValues: Array[Double] = { if (model.getFamily == Binomial.name || model.getFamily == Poisson.name) { - tValues.map { x => 2.0 * (1.0 - GD(0.0, 1.0).cdf(math.abs(x))) } + tValues.map { x => 2.0 * (1.0 - dist.Gaussian(0.0, 1.0).cdf(math.abs(x))) } } else { - tValues.map { x => 2.0 * (1.0 - StudentsT(degreesOfFreedom.toDouble).cdf(math.abs(x))) } + tValues.map { x => 2.0 * (1.0 - dist.StudentsT(degreesOfFreedom.toDouble).cdf(math.abs(x))) } } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index a2df2f491d31..6d570f7bde07 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -545,19 +545,19 @@ class GeneralizedLinearRegressionSuite val devianceResiduals = summary.residuals() .select(col("devianceResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val pearsonResiduals = summary.residuals("pearson") .select(col("pearsonResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val workingResiduals = summary.residuals("working") .select(col("workingResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val responseResiduals = summary.residuals("response") .select(col("responseResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) @@ -658,19 +658,19 @@ class GeneralizedLinearRegressionSuite val devianceResiduals = summary.residuals() .select(col("devianceResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val pearsonResiduals = summary.residuals("pearson") .select(col("pearsonResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val workingResiduals = summary.residuals("working") .select(col("workingResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val responseResiduals = summary.residuals("response") .select(col("responseResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) @@ -774,19 +774,19 @@ class GeneralizedLinearRegressionSuite val devianceResiduals = summary.residuals() .select(col("devianceResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val pearsonResiduals = summary.residuals("pearson") .select(col("pearsonResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val workingResiduals = summary.residuals("working") .select(col("workingResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val responseResiduals = summary.residuals("response") .select(col("responseResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) @@ -887,19 +887,19 @@ class GeneralizedLinearRegressionSuite val devianceResiduals = summary.residuals() .select(col("devianceResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val pearsonResiduals = summary.residuals("pearson") .select(col("pearsonResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val workingResiduals = summary.residuals("working") .select(col("workingResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) val responseResiduals = summary.residuals("response") .select(col("responseResiduals")) .collect() - .map(_.getAs[Double](0)) + .map(_.getDouble(0)) assert(model.coefficients ~== coefficientsR absTol 1E-3) assert(model.intercept ~== interceptR absTol 1E-3) From 49fcb71c33101ee1c47a81c667484dd8426d131c Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 16 Mar 2016 11:45:30 +0800 Subject: [PATCH 6/7] update docs --- .../GeneralizedLinearRegression.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 69fe3556168d..929b54d042b0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -339,11 +339,11 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine def deviance(y: Double, mu: Double, weight: Double): Double /** - * Akaike's 'An Information Criterion'(AIC) value of the family. - * @param predictions an RDD of (y, mu, weight) of instances - * @param deviance the deviance for the fitted model - * @param numInstances number of instances in DataFrame predictions - * @param weightSum weights sum of instances in DataFrame predictions + * Akaike's 'An Information Criterion'(AIC) value of the family for a given dataset. + * @param predictions an RDD of (y, mu, weight) of instances in evaluation dataset + * @param deviance the deviance for the fitted model in evaluation dataset + * @param numInstances number of instances in evaluation dataset + * @param weightSum weights sum of instances in evaluation dataset */ def aic( predictions: RDD[(Double, Double, Double)], @@ -839,12 +839,17 @@ class GeneralizedLinearRegressionSummary private[regression] ( predictions.select(col(model.getLabelCol).minus(col(predictionCol)).as("responseResiduals")) } + /** + * Get the default residuals(deviance residuals) of the fitted model. + */ + def residuals(): DataFrame = devianceResiduals + /** * Get the residuals of the fitted model by type. * @param residualsType The type of residuals which should be returned. - * Supported options: deviance(default), pearson, working and response. + * Supported options: deviance, pearson, working and response. */ - def residuals(residualsType: String = "deviance"): DataFrame = { + def residuals(residualsType: String): DataFrame = { residualsType match { case "deviance" => devianceResiduals case "pearson" => pearsonResiduals @@ -897,7 +902,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( rss / degreesOfFreedom } - /** Akaike's "An Information Criterion"(AIC) for the fitted model */ + /** Akaike's "An Information Criterion"(AIC) for the fitted model. */ lazy val aic: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0) From f89cdf01d94faab7d6f9372df35033afe695f8aa Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 16 Mar 2016 12:03:47 +0800 Subject: [PATCH 7/7] add Since --- .../GeneralizedLinearRegression.scala | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 929b54d042b0..b4e47c80735c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -773,25 +773,26 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr @Since("2.0.0") @Experimental class GeneralizedLinearRegressionSummary private[regression] ( - @transient val predictions: DataFrame, - val predictionCol: String, - val model: GeneralizedLinearRegressionModel, + @Since("2.0.0") @transient val predictions: DataFrame, + @Since("2.0.0") val predictionCol: String, + @Since("2.0.0") val model: GeneralizedLinearRegressionModel, private val diagInvAtWA: Array[Double], - val numIterations: Int) extends Serializable { + @Since("2.0.0") val numIterations: Int) extends Serializable { import GeneralizedLinearRegression._ - lazy val family = Family.fromName(model.getFamily) - lazy val link = if (model.isDefined(model.getParam("link"))) { + private lazy val family = Family.fromName(model.getFamily) + private lazy val link = if (model.isDefined(model.getParam("link"))) { Link.fromName(model.getLink) } else { family.defaultLink } /** Number of instances in DataFrame predictions */ - lazy val numInstances: Long = predictions.count() + private lazy val numInstances: Long = predictions.count() /** The numeric rank of the fitted linear model */ + @Since("2.0.0") lazy val rank: Long = if (model.getFitIntercept) { model.coefficients.size + 1 } else { @@ -799,14 +800,17 @@ class GeneralizedLinearRegressionSummary private[regression] ( } /** Degrees of freedom */ + @Since("2.0.0") lazy val degreesOfFreedom: Long = { numInstances - rank } /** The residual degrees of freedom */ + @Since("2.0.0") lazy val residualDegreeOfFreedom: Long = degreesOfFreedom /** The residual degrees of freedom for the null model */ + @Since("2.0.0") lazy val residualDegreeOfFreedomNull: Long = if (model.getFitIntercept) { numInstances - 1 } else { @@ -842,6 +846,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** * Get the default residuals(deviance residuals) of the fitted model. */ + @Since("2.0.0") def residuals(): DataFrame = devianceResiduals /** @@ -849,6 +854,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( * @param residualsType The type of residuals which should be returned. * Supported options: deviance, pearson, working and response. */ + @Since("2.0.0") def residuals(residualsType: String): DataFrame = { residualsType match { case "deviance" => devianceResiduals @@ -863,6 +869,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** * The deviance for the null model. */ + @Since("2.0.0") lazy val nullDeviance: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) val wtdmu: Double = if (model.getFitIntercept) { @@ -880,6 +887,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** * The deviance for the fitted model. */ + @Since("2.0.0") lazy val deviance: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) predictions.select(col(model.getLabelCol), col(predictionCol), w).rdd.map { @@ -894,6 +902,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( * estimated by the residual Pearson's Chi-Squared statistic(which is defined as * sum of the squares of the Pearson residuals) divided by the residual degrees of freedom. */ + @Since("2.0.0") lazy val dispersion: Double = if ( model.getFamily == Binomial.name || model.getFamily == Poisson.name) { 1.0 @@ -903,6 +912,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( } /** Akaike's "An Information Criterion"(AIC) for the fitted model. */ + @Since("2.0.0") lazy val aic: Double = { val w = if (model.getWeightCol.isEmpty) lit(1.0) else col(model.getWeightCol) val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0) @@ -916,6 +926,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** * Standard error of estimated coefficients and intercept. */ + @Since("2.0.0") lazy val coefficientStandardErrors: Array[Double] = { diagInvAtWA.map(_ * dispersion).map(math.sqrt) } @@ -923,6 +934,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** * T-statistic of estimated coefficients and intercept. */ + @Since("2.0.0") lazy val tValues: Array[Double] = { val estimate = if (model.getFitIntercept) { Array.concat(model.coefficients.toArray, Array(model.intercept)) @@ -935,6 +947,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** * Two-sided p-value of estimated coefficients and intercept. */ + @Since("2.0.0") lazy val pValues: Array[Double] = { if (model.getFamily == Binomial.name || model.getFamily == Poisson.name) { tValues.map { x => 2.0 * (1.0 - dist.Gaussian(0.0, 1.0).cdf(math.abs(x))) }