create pr
zhengruifeng committed Oct 15, 2019
commit b97e8e855538fe01509a8090d3d7609d2ed0272c
mllib/src/main/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluator.scala
@@ -18,21 +18,23 @@
package org.apache.spark.ml.evaluation

import org.apache.spark.annotation.Since
-import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
-import org.apache.spark.ml.param.shared.{HasLabelCol, HasPredictionCol, HasWeightCol}
-import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared._
+import org.apache.spark.ml.util._
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

/**
-* Evaluator for multiclass classification, which expects two input columns: prediction and label.
+* Evaluator for multiclass classification, which expects input columns: prediction, label,
+* weight (optional) and probabilityCol (only for log-loss).
*/
@Since("1.5.0")
class MulticlassClassificationEvaluator @Since("1.5.0") (@Since("1.5.0") override val uid: String)
-extends Evaluator with HasPredictionCol with HasLabelCol
-  with HasWeightCol with DefaultParamsWritable {
+extends Evaluator with HasPredictionCol with HasLabelCol with HasWeightCol
+  with HasProbabilityCol with DefaultParamsWritable {

import MulticlassClassificationEvaluator.supportedMetricNames

@@ -71,6 +73,10 @@ class MulticlassClassificationEvaluator
@Since("3.0.0")
def setWeightCol(value: String): this.type = set(weightCol, value)

+/** @group setParam */
+@Since("3.0.0")
+def setProbabilityCol(value: String): this.type = set(probabilityCol, value)

@Since("3.0.0")
final val metricLabel: DoubleParam = new DoubleParam(this, "metricLabel",
"The class whose metric will be computed in " +
@@ -104,20 +110,51 @@ class MulticlassClassificationEvaluator

setDefault(beta -> 1.0)

@Since("3.0.0")
final val eps: DoubleParam = new DoubleParam(this, "eps",
"Log loss is undefined for p=0 or p=1, so probabilities are clipped to " +
"max(eps, min(1 - eps, p)).",
ParamValidators.inRange(0, 0.5, false, false))

/** @group getParam */
@Since("3.0.0")
def getEps: Double = $(eps)

/** @group setParam */
@Since("3.0.0")
def setEps(value: Double): this.type = set(eps, value)

setDefault(eps -> 1e-15)

@Since("2.0.0")
override def evaluate(dataset: Dataset[_]): Double = {
val schema = dataset.schema
SchemaUtils.checkColumnType(schema, $(predictionCol), DoubleType)
SchemaUtils.checkNumericType(schema, $(labelCol))

-val predictionAndLabelsWithWeights =
-  dataset.select(col($(predictionCol)), col($(labelCol)).cast(DoubleType),
-    if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)))
-val metrics = new MulticlassMetrics(predictionAndLabelsWithWeights)
+val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
+  col($(weightCol)).cast(DoubleType)
+} else {
+  lit(1.0)
+}
+
+val rdd = if ($(metricName) == "logloss") {
+  // probabilityCol is only needed to compute logloss
+  require(isDefined(probabilityCol) && $(probabilityCol).nonEmpty)
+  val p = DatasetUtils.columnToVector(dataset, $(probabilityCol))
+  dataset.select(col($(predictionCol)), col($(labelCol)).cast(DoubleType), w, p)
+    .rdd.map {
+      case Row(prediction: Double, label: Double, weight: Double, probability: Vector) =>
+        (prediction, label, weight, probability.toArray)
+    }
+} else {
+  dataset.select(col($(predictionCol)), col($(labelCol)).cast(DoubleType), w)
+    .rdd.map {
+      case Row(prediction: Double, label: Double, weight: Double) => (prediction, label, weight)
+    }
+}
+
+val metrics = new MulticlassMetrics(rdd)
$(metricName) match {
case "f1" => metrics.weightedFMeasure
case "accuracy" => metrics.accuracy
@@ -131,16 +168,14 @@ class MulticlassClassificationEvaluator
case "precisionByLabel" => metrics.precision($(metricLabel))
case "recallByLabel" => metrics.recall($(metricLabel))
case "fMeasureByLabel" => metrics.fMeasure($(metricLabel), $(beta))
case "logloss" => metrics.logloss($(eps))
}
}

@Since("1.5.0")
-override def isLargerBetter: Boolean = {
-  $(metricName) match {
-    case "weightedFalsePositiveRate" => false
-    case "falsePositiveRateByLabel" => false
-    case _ => true
-  }
+override def isLargerBetter: Boolean = $(metricName) match {
+  case "weightedFalsePositiveRate" | "falsePositiveRateByLabel" | "logloss" => false
+  case _ => true
}

@Since("1.5.0")
@@ -154,7 +189,7 @@ object MulticlassClassificationEvaluator
private val supportedMetricNames = Array("f1", "accuracy", "weightedPrecision", "weightedRecall",
"weightedTruePositiveRate", "weightedFalsePositiveRate", "weightedFMeasure",
"truePositiveRateByLabel", "falsePositiveRateByLabel", "precisionByLabel", "recallByLabel",
"fMeasureByLabel")
"fMeasureByLabel", "logloss")

@Since("1.6.0")
override def load(path: String): MulticlassClassificationEvaluator = super.load(path)
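For context, here is a minimal usage sketch of the new metric as this diff defines it. It assumes an active SparkSession named `spark`, and the row values are made up for illustration; only `setMetricName("logloss")` and `setProbabilityCol` come from the patch itself.

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.linalg.Vectors

import spark.implicits._  // assumes an active SparkSession named `spark`

// Illustrative (prediction, label, probability) rows; values are made up.
val df = Seq(
  (1.0, 1.0, Vectors.dense(0.1, 0.8, 0.1)),
  (0.0, 2.0, Vectors.dense(0.9, 0.05, 0.05)),
  (0.0, 0.0, Vectors.dense(0.8, 0.2, 0.0)),
  (1.0, 1.0, Vectors.dense(0.3, 0.65, 0.05))
).toDF("prediction", "label", "probability")

// "logloss" makes evaluate() read the probability column and apply
// eps-clipping before taking logs; lower values are better
// (isLargerBetter returns false for this metric).
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("logloss")
  .setProbabilityCol("probability")
val loss = evaluator.evaluate(df)
```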
mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.{DataFrame, Row}
/**
* Evaluator for multiclass classification.
*
-* @param predictionAndLabels an RDD of (prediction, label, weight) or
-*                            (prediction, label) tuples.
+* @param predictionAndLabels an RDD of (prediction, label, weight, probability) or
+*                            (prediction, label, weight) or (prediction, label) tuples.
*/
@Since("1.1.0")
class MulticlassMetrics @Since("1.1.0") (predictionAndLabels: RDD[_ <: Product]) {
@@ -39,17 +39,18 @@ class MulticlassMetrics
* @param predictionAndLabels a DataFrame with two double columns: prediction and label
*/
private[mllib] def this(predictionAndLabels: DataFrame) =
-this(predictionAndLabels.rdd.map {
-  case Row(prediction: Double, label: Double, weight: Double) =>
-    (prediction, label, weight)
-  case Row(prediction: Double, label: Double) =>
-    (prediction, label, 1.0)
-  case other =>
-    throw new IllegalArgumentException(s"Expected Row of tuples, got $other")
+this(predictionAndLabels.rdd.map { r =>
+  r.size match {
+    case 2 => (r.getDouble(0), r.getDouble(1), 1.0, null)
+    case 3 => (r.getDouble(0), r.getDouble(1), r.getDouble(2), null)
+    case 4 => (r.getDouble(0), r.getDouble(1), r.getDouble(2), r.getSeq[Double](3).toArray)
+    case _ => throw new IllegalArgumentException(s"Expected Row of tuples, got $r")
+  }
})

Comment from zhengruifeng (Contributor Author): Pattern matching will not work in PySpark, so I have to use r.get instead. MultilabelMetrics also deals with the DataFrame in this way.


-private val confusions = predictionAndLabels.map {
+private lazy val confusions = predictionAndLabels.map {

Comment from zhengruifeng (Contributor Author): If metricName == "logloss", the confusion matrix is not needed, so I made this computation lazy.

+  case (prediction: Double, label: Double, weight: Double, _) =>
+    ((label, prediction), weight)
  case (prediction: Double, label: Double, weight: Double) =>
    ((label, prediction), weight)
  case (prediction: Double, label: Double) =>
@@ -237,4 +238,38 @@ class MulticlassMetrics
*/
@Since("1.1.0")
lazy val labels: Array[Double] = tpByClass.keys.toArray.sorted

+/**
+ * Returns the log-loss, aka logistic loss or cross-entropy loss.
+ * @param eps Log loss is undefined for p=0 or p=1, so probabilities are
+ *            clipped to max(eps, min(1 - eps, p)).
+ */
+@Since("3.0.0")
+def logloss(eps: Double = 1e-15): Double = {
+  require(eps > 0 && eps < 0.5, s"eps must be in range (0, 0.5), but got $eps")
+  val loss1 = -math.log(eps)
+  val loss2 = -math.log(1 - eps)

Comment from a Member: -math.log1p(-eps)? Because eps is going to be very small.

+  val (lossSum, weightSum) = predictionAndLabels.map {
+    case (prediction: Double, label: Double, weight: Double, probability: Array[Double]) =>
+      require(label.toInt == label && label >= 0, s"Invalid label $label")
+      require(probability != null, "probability of each class cannot be null")
+      val p = probability(label.toInt)
+      val loss = if (p < eps) {
+        loss1
+      } else if (p > 1 - eps) {
+        loss2
+      } else {
+        -math.log(p)
+      }
+      (loss * weight, weight)
+    case other =>
+      throw new IllegalArgumentException(s"Expected quadruples, got $other")
+  }.treeReduce { case ((l1, w1), (l2, w2)) =>
+    (l1 + l2, w1 + w2)
+  }
+
+  lossSum / weightSum
+}
}
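To make the weighted log-loss arithmetic above concrete, here is a self-contained sketch of the same computation on plain Scala collections rather than an RDD, with the same clipping rule. `loglossSketch` is a hypothetical helper for illustration, not part of the patch.

```scala
// Standalone sketch of the weighted log-loss computed by logloss() above.
def loglossSketch(
    rows: Seq[(Double, Double, Array[Double])],  // (label, weight, probability)
    eps: Double = 1e-15): Double = {
  require(eps > 0 && eps < 0.5, s"eps must be in range (0, 0.5), but got $eps")
  val (lossSum, weightSum) = rows.map { case (label, weight, probability) =>
    // p is the probability the model assigned to the true class,
    // clipped to [eps, 1 - eps] so the log stays finite.
    val p = probability(label.toInt).max(eps).min(1 - eps)
    (-math.log(p) * weight, weight)
  }.reduce { case ((l1, w1), (l2, w2)) => (l1 + l2, w1 + w2) }
  lossSum / weightSum
}

// The unweighted multi-class example from the tests below: expected ~0.9682.
val rows = Seq(
  (1.0, 1.0, Array(0.1, 0.8, 0.1)),
  (2.0, 1.0, Array(0.9, 0.05, 0.05)),
  (0.0, 1.0, Array(0.8, 0.2, 0.0)),
  (1.0, 1.0, Array(0.3, 0.65, 0.05)))
println(loglossSketch(rows))  // ≈ 0.9682005730687164
```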
mllib/src/test/scala/org/apache/spark/ml/evaluation/MulticlassClassificationEvaluatorSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.ml.evaluation

import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.mllib.util.MLlibTestSparkContext
@@ -60,4 +61,23 @@ class MulticlassClassificationEvaluatorSuite
.setMetricLabel(1.0)
assert(evaluator.evaluate(predictionAndLabels) ~== 3.0 / 4 absTol 1e-5)
}

test("MulticlassClassificationEvaluator support logloss") {
val labels = Seq(1.0, 2.0, 0.0, 1.0)
val probabilities = Seq(
Vectors.dense(0.1, 0.8, 0.1),
Vectors.dense(0.9, 0.05, 0.05),
Vectors.dense(0.8, 0.2, 0.0),
Vectors.dense(0.3, 0.65, 0.05))

val df = sc.parallelize(labels.zip(probabilities)).map {
case (label, probability) =>
val prediction = probability.argmax.toDouble
(prediction, label, probability)
}.toDF("prediction", "label", "probability")

val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("logloss")
assert(evaluator.evaluate(df) ~== 0.9682005730687164 absTol 1e-5)
}
}
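A quick hand check of the expected value in this test: the probability assigned to each true label is 0.8, 0.05, 0.8 and 0.65, so the unweighted loss is (-ln 0.8 - ln 0.05 - ln 0.8 - ln 0.65) / 4 = (0.22314 + 2.99573 + 0.22314 + 0.43078) / 4 ≈ 0.96820, matching the asserted 0.9682005730687164.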
mllib/src/test/scala/org/apache/spark/mllib/evaluation/MulticlassMetricsSuite.scala
@@ -176,4 +176,82 @@ class MulticlassMetricsSuite extends SparkFunSuite with MLlibTestSparkContext {
(weight0 * f2measure0 + weight1 * f2measure1 + weight2 * f2measure2) relTol delta)
assert(metrics.labels === labels)
}

test("MulticlassMetrics supports binary class log-loss") {
/*
Using the following Python code to verify the correctness.

from sklearn.metrics import log_loss
labels = [1, 0, 0, 1]
probabilities = [[.1, .9], [.9, .1], [.8, .2], [.35, .65]]
weights = [1.5, 2.0, 1.0, 0.5]

>>> log_loss(y_true=labels, y_pred=probabilities, sample_weight=weights)
0.16145936283256573
>>> log_loss(y_true=labels, y_pred=probabilities)
0.21616187468057912
*/

val labels = Seq(1.0, 0.0, 0.0, 1.0)
val probabilities = Seq(
Array(0.1, 0.9),
Array(0.9, 0.1),
Array(0.8, 0.2),
Array(0.35, 0.65))
val weights = Seq(1.5, 2.0, 1.0, 0.5)

val rdd = sc.parallelize(labels.zip(weights).zip(probabilities)).map {
case ((label, weight), probability) =>
val prediction = probability.indexOf(probability.max).toDouble
(prediction, label, weight, probability)
}
val metrics = new MulticlassMetrics(rdd)
assert(metrics.logloss() ~== 0.16145936283256573 relTol delta)

val rdd2 = rdd.map {
case (prediction: Double, label: Double, weight: Double, probability: Array[Double]) =>
(prediction, label, 1.0, probability)
}
val metrics2 = new MulticlassMetrics(rdd2)
assert(metrics2.logloss() ~== 0.21616187468057912 relTol delta)
}
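Hand-checking the weighted case here: the true-class probabilities are 0.9, 0.9, 0.8 and 0.65, giving per-row losses -ln 0.9 = 0.10536, -ln 0.9 = 0.10536, -ln 0.8 = 0.22314 and -ln 0.65 = 0.43078; weighting by 1.5, 2.0, 1.0 and 0.5 gives (0.15804 + 0.21072 + 0.22314 + 0.21539) / 5.0 ≈ 0.16146, matching sklearn's 0.16145936283256573.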

test("MulticlassMetrics supports multi-class log-loss") {
/*
Using the following Python code to verify the correctness.

from sklearn.metrics import log_loss
labels = [1, 2, 0, 1]
probabilities = [[.1, .8, .1], [.9, .05, .05], [.8, .2, .0], [.3, .65, .05]]
weights = [1.5, 2.0, 1.0, 0.5]

>>> log_loss(y_true=labels, y_pred=probabilities, sample_weight=weights)
1.3529429766879466
>>> log_loss(y_true=labels, y_pred=probabilities)
0.9682005730687164
*/

val labels = Seq(1.0, 2.0, 0.0, 1.0)
val probabilities = Seq(
Array(0.1, 0.8, 0.1),
Array(0.9, 0.05, 0.05),
Array(0.8, 0.2, 0.0),
Array(0.3, 0.65, 0.05))
val weights = Seq(1.5, 2.0, 1.0, 0.5)

val rdd = sc.parallelize(labels.zip(weights).zip(probabilities)).map {
case ((label, weight), probability) =>
val prediction = probability.indexOf(probability.max).toDouble
(prediction, label, weight, probability)
}
val metrics = new MulticlassMetrics(rdd)
assert(metrics.logloss() ~== 1.3529429766879466 relTol delta)

val rdd2 = rdd.map {
case (prediction: Double, label: Double, weight: Double, probability: Array[Double]) =>
(prediction, label, 1.0, probability)
}
val metrics2 = new MulticlassMetrics(rdd2)
assert(metrics2.logloss() ~== 0.9682005730687164 relTol delta)
}
}