@@ -131,18 +131,18 @@ object BinaryClassification {
.setNumIterations(params.numIterations)
.setUpdater(updater)
.setRegParam(params.regParam)
algorithm.run(training).clearThreshold()
algorithm.run(training)
case SVM =>
val algorithm = new SVMWithSGD()
algorithm.optimizer
.setNumIterations(params.numIterations)
.setStepSize(params.stepSize)
.setUpdater(updater)
.setRegParam(params.regParam)
algorithm.run(training).clearThreshold()
algorithm.run(training)
}

val prediction = model.predict(test.map(_.features))
val prediction = model.predictScore(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

val metrics = new BinaryClassificationMetrics(predictionAndLabel)
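
Since BinaryClassificationMetrics consumes raw scores (it builds ROC and PR curves from them), the renamed call keeps the behaviour that the old `clearThreshold()` + `predict` combination provided. A minimal before/after sketch, reusing the example's `algorithm`, `training`, and `test` values:

```scala
// Before: the threshold had to be cleared so that predict() returned raw scores.
//   val model = algorithm.run(training).clearThreshold()
//   val scoreAndLabel = model.predict(test.map(_.features)).zip(test.map(_.label))

// After: the intent is explicit at the call site.
val model = algorithm.run(training)
val scoreAndLabel = model.predictScore(test.map(_.features)).zip(test.map(_.label))
val metrics = new BinaryClassificationMetrics(scoreAndLabel)
println(s"AUC = ${metrics.areaUnderROC()}")
```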
@@ -118,7 +118,7 @@ object LinearRegression extends App {

val model = algorithm.run(training)

val prediction = model.predict(test.map(_.features))
val prediction = model.predictScore(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))

val loss = predictionAndLabel.map { case (p, l) =>
@@ -91,7 +91,7 @@ object SparseNaiveBayes {

val model = new NaiveBayes().setLambda(params.lambda).run(training)

val prediction = model.predict(test.map(_.features))
val prediction = model.predictClass(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))
val accuracy = predictionAndLabel.filter(x => x._1 == x._2).count().toDouble / numTest

@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.classification

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.GeneralizedLinearModel
import org.apache.spark.rdd.RDD

/**
* Represents a binary classification model that predicts to which of two categories an example
* belongs. The categories are represented by the double values 0.0 and 1.0.
*/
class BinaryClassificationModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {

protected var threshold: Double = 0.0

// This is only used to preserve the prior behaviour of the deprecated `predict`.
protected var useThreshold: Boolean = true

/**
* Sets the threshold that separates positive predictions from negative predictions. An example
* with a prediction score greater than or equal to this threshold is classified as positive,
* and negative otherwise. The default value is 0.0, applied to the raw prediction score.
*/
def setThreshold(threshold: Double): this.type = {
this.useThreshold = true
this.threshold = threshold
this
}

def getThreshold: Double = threshold

private def compareWithThreshold(value: Double): Double =
if (value < threshold) 0.0 else 1.0

def predictClass(testData: RDD[Vector]): RDD[Double] = {
predictScore(testData).map(compareWithThreshold)
}

def predictClass(testData: Vector): Double = {
compareWithThreshold(predictScore(testData))
}

/**
* DEPRECATED: Use predictScore(...) or predictClass(...) instead
* Clears the threshold so that `predict` will output raw prediction scores.
*/
@Deprecated
def clearThreshold(): this.type = {
this.useThreshold = false
this
}

/**
* DEPRECATED: Use predictScore(...) or predictClass(...) instead
*/
@Deprecated
override protected def predictPoint(
dataMatrix: Vector,
weightMatrix: Vector,
intercept: Double) = {
if (useThreshold) predictClass(dataMatrix)
else predictScore(dataMatrix)
}

/**
* DEPRECATED: Use predictScore(...) or predictClass(...) instead
* Predict values for the given data set using the model trained.
*
* @param testData RDD representing data points to be predicted
* @return an RDD[Double] where each entry contains the corresponding prediction
*/
@Deprecated
override def predict(testData: RDD[Vector]): RDD[Double] = {
if (useThreshold) predictClass(testData)
else predictScore(testData)
}

/**
* DEPRECATED: Use predictScore(...) or predictClass(...) instead
* Predict values for a single data point using the model trained.
*
* @param testData array representing a single data point
* @return predicted category from the trained model
*/
@Deprecated
override def predict(testData: Vector): Double = {
if (useThreshold) predictClass(testData)
else predictScore(testData)
}
}
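
A short usage sketch of the split API, assuming a LogisticRegressionModel (a subclass of this class), an existing SparkContext `sc`, and that `predictScore` (referenced above but defined outside this file) returns the raw margin:

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") // assumed sample path
val model = LogisticRegressionWithSGD.train(data, 100)

val features = data.map(_.features)
val scores = model.predictScore(features)   // raw margins, e.g. for ranking metrics
val classes = model.predictClass(features)  // 0.0/1.0 labels using the current threshold

model.setThreshold(1.0)                     // require a larger margin to predict 1.0
val stricter = model.predictClass(features)
```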
@@ -30,26 +30,26 @@ import org.apache.spark.rdd.RDD
@Experimental
trait ClassificationModel extends Serializable {
/**
* Predict values for the given data set using the model trained.
* Classify the given data set using the model trained.
*
* @param testData RDD representing data points to be predicted
* @param testData RDD representing data points to be classified
* @return an RDD[Double] where each entry contains the corresponding prediction
*/
def predict(testData: RDD[Vector]): RDD[Double]
def predictClass(testData: RDD[Vector]): RDD[Double]

/**
* Predict values for a single data point using the model trained.
* Classify a single data point using the model trained.
*
* @param testData array representing a single data point
* @return predicted category from the trained model
*/
def predict(testData: Vector): Double
def predictClass(testData: Vector): Double

/**
* Predict values for examples stored in a JavaRDD.
* @param testData JavaRDD representing data points to be predicted
* @return a JavaRDD[java.lang.Double] where each entry contains the corresponding prediction
*/
def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
def predictClass(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
predictClass(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}
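
One way to use the renamed trait, sketched under the assumption that callers migrate from `predict` to `predictClass`: a small, hypothetical helper that computes accuracy for any ClassificationModel.

```scala
import org.apache.spark.mllib.classification.ClassificationModel
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of this change: works for NaiveBayesModel,
// LogisticRegressionModel, or any other ClassificationModel.
def accuracy(model: ClassificationModel, data: RDD[LabeledPoint]): Double = {
  val predictionAndLabel = model.predictClass(data.map(_.features)).zip(data.map(_.label))
  predictionAndLabel.filter { case (p, l) => p == l }.count().toDouble / data.count()
}
```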
@@ -17,7 +17,6 @@

package org.apache.spark.mllib.classification

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.regression._
@@ -33,45 +32,34 @@ import org.apache.spark.rdd.RDD
class LogisticRegressionModel (
override val weights: Vector,
override val intercept: Double)
extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
extends BinaryClassificationModel(weights, intercept) with ProbabilisticClassificationModel {

private var threshold: Option[Double] = Some(0.5)
protected def computeProbability(value: Double) = {
1.0 / (1.0 + math.exp(-value))
}

/**
* :: Experimental ::
* Sets the threshold that separates positive predictions from negative predictions. An example
* with prediction score greater than or equal to this threshold is identified as an positive,
* and negative otherwise. The default value is 0.5.
*/
@Experimental
def setThreshold(threshold: Double): this.type = {
this.threshold = Some(threshold)
this
def predictProbability(testData: RDD[Vector]): RDD[Double] = {
predictScore(testData).map(computeProbability)
}

/**
* :: Experimental ::
* Clears the threshold so that `predict` will output raw prediction scores.
*/
@Experimental
def clearThreshold(): this.type = {
threshold = None
this
def predictProbability(testData: Vector): Double = {
computeProbability(predictScore(testData))
}

/**
* DEPRECATED: Use predictProbability(...) or predictClass(...) instead
*/
@Deprecated
override protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
intercept: Double) = {
val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
val score = 1.0 / (1.0 + math.exp(-margin))
threshold match {
case Some(t) => if (score < t) 0.0 else 1.0
case None => score
}
intercept: Double) = {
if (useThreshold) predictClass(dataMatrix)
else predictProbability(dataMatrix)
}
}
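
For reference, `computeProbability` is the logistic (sigmoid) function, so a raw-score threshold of 0.0 corresponds to the previous probability threshold of 0.5, since 1 / (1 + e^0) = 0.5. A small worked sketch with hand-picked weights (hypothetical values, and again assuming `predictScore` returns the margin):

```scala
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.linalg.Vectors

val model = new LogisticRegressionModel(Vectors.dense(1.0, -1.0), 0.0)
val x = Vectors.dense(2.0, 0.5)

// margin = 1.0 * 2.0 + (-1.0) * 0.5 + 0.0 = 1.5
val score = model.predictScore(x)
val prob  = model.predictProbability(x) // 1 / (1 + exp(-1.5)) ≈ 0.82
val label = model.predictClass(x)       // 1.0, since 1.5 >= default threshold 0.0
```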

/**
* Train a classification model for Logistic Regression using Stochastic Gradient Descent.
* Train a classification model for Logistic Regression using the limited-memory
* Broyden–Fletcher–Goldfarb–Shanno (L-BFGS) algorithm.
* NOTE: Labels used in Logistic Regression should be {0, 1}
*
* Using [[LogisticRegressionWithLBFGS]] is recommended over this.
@@ -24,6 +24,7 @@ import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.JavaRDD

/**
* Model for Naive Bayes Classifiers.
@@ -54,17 +55,36 @@ class NaiveBayesModel private[mllib] (
}
}

override def predict(testData: RDD[Vector]): RDD[Double] = {
override def predictClass(testData: RDD[Vector]): RDD[Double] = {
val bcModel = testData.context.broadcast(this)
testData.mapPartitions { iter =>
val model = bcModel.value
iter.map(model.predict)
iter.map(model.predictClass)
}
}

override def predict(testData: Vector): Double = {
override def predictClass(testData: Vector): Double = {
labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
}

/**
* DEPRECATED: Use predictClass(...) instead
*/
@Deprecated
def predict(testData: RDD[Vector]): RDD[Double] = predictClass(testData)

/**
* DEPRECATED: Use predictClass(...) instead
*/
@Deprecated
def predict(testData: Vector): Double = predictClass(testData)

/**
* DEPRECATED: Use predictClass(...) instead
*/
@Deprecated
def predict(testData: JavaRDD[Vector]): JavaRDD[java.lang.Double] =
predict(testData.rdd).toJavaRDD().asInstanceOf[JavaRDD[java.lang.Double]]
}
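
A minimal sketch of the renamed entry point, assuming an existing SparkContext `sc` and a toy two-point training set:

```scala
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.0))))
val model = new NaiveBayes().setLambda(1.0).run(training)

// New name; the deprecated predict(...) overloads above simply delegate to it.
val predicted = model.predictClass(Vectors.dense(0.0, 2.0)) // expected 1.0
```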

/**
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.classification

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

/**
* :: Experimental ::
* Represents a probabilistic classification model that provides a probability
* distribution over a set of classes, rather than only predicting a class.
*/
@Experimental
trait ProbabilisticClassificationModel extends ClassificationModel {
/**
* Return the predicted probability for each point in the given data set using the model trained.
*
* @param testData RDD representing data points to be classified
* @return an RDD[Double] where each entry contains the corresponding predicted probability
*/
def predictProbability(testData: RDD[Vector]): RDD[Double]

/**
* Return the predicted probability for a single data point using the model trained.
*
* @param testData array representing a single data point
* @return predicted probability from the trained model
*/
def predictProbability(testData: Vector): Double
}
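
A sketch of code written against the trait rather than a concrete model, under the assumption that labels are 0.0/1.0 and `predictProbability` returns the probability of class 1.0 (as LogisticRegressionModel does above):

```scala
import org.apache.spark.mllib.classification.ProbabilisticClassificationModel
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of this change: binary log-loss via the trait.
def logLoss(model: ProbabilisticClassificationModel, data: RDD[LabeledPoint]): Double = {
  val probAndLabel = model.predictProbability(data.map(_.features)).zip(data.map(_.label))
  val totalLoss = probAndLabel.map { case (p, y) =>
    val q = math.min(math.max(p, 1e-15), 1.0 - 1e-15) // clip to avoid log(0)
    -(y * math.log(q) + (1.0 - y) * math.log(1.0 - q))
  }.reduce(_ + _)
  totalLoss / data.count()
}
```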