Modified Linear Regression to support new MLContext and added support
for Spark 2.0
Niketan Pansare committed Aug 9, 2016
commit 3a2a4cfb8a8f7e3b254e7ef7bfebe72c30013b0e
34 changes: 24 additions & 10 deletions src/main/java/org/apache/sysml/api/python/SystemML.py
@@ -321,13 +321,15 @@ class mllearn:
class BaseSystemMLEstimator(Estimator):
# TODO: Allow users to set featuresCol (with default 'features') and labelCol (with default 'label')

# Returns a model after calling fit(df) on Estimator object on JVM
def _fit(self, X):
if hasattr(X, '_jdf') and 'features' in X.columns and 'label' in X.columns:
Contributor: You should add a TODO here for users to set the name of the featuresCol and labelCol.

Contributor: Also, I'm not sure there should be a condition on label, because Estimator is a general name and can include unsupervised algorithms.

Contributor: X can be a pandas DataFrame as well, right?

Contributor (Author): Good point. I was going by the convention in MLPipeline.

self.model = self.estimator.fit(X._jdf)
return self
else:
raise Exception('Incorrect usage: Expected dataframe as input with features/label as columns')
Contributor: Can we rename Exception to a more useful error everywhere?
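Taken together, the threads above suggest a reworked `_fit`. A minimal sketch, assuming hypothetical `featuresCol`/`labelCol` class attributes, a pandas-to-Spark conversion, and built-in exception types in place of the bare `Exception` (none of this is in the commit itself):

```python
import pandas as pd
from pyspark.ml.pipeline import Estimator

class BaseSystemMLEstimator(Estimator):
    # Hypothetical user-settable defaults, per the review comments above.
    featuresCol = 'features'
    labelCol = 'label'

    def _fit(self, X):
        # Hypothetical pandas fallback: convert through the SQLContext so the
        # JVM-side estimator always receives a Spark DataFrame.
        if isinstance(X, pd.DataFrame):
            X = self.sqlCtx.createDataFrame(X)
        if not hasattr(X, '_jdf'):
            raise TypeError('Expected a Spark or pandas DataFrame, got %s'
                            % type(X).__name__)
        if self.featuresCol not in X.columns or self.labelCol not in X.columns:
            raise ValueError("Input DataFrame must contain '%s' and '%s' columns"
                             % (self.featuresCol, self.labelCol))
        self.model = self.estimator.fit(X._jdf)
        return self
```

Here `self.sqlCtx` and `self.estimator` are set by the concrete subclasses, as in the constructors further down this diff.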



# Returns a model after calling fit(X:MatrixBlock, y:MatrixBlock) on Estimator object on JVM
def fit(self, X, y=None, params=None):
if y is None:
return self._fit(X)
@@ -356,7 +358,8 @@ def fit(self, X, y=None, params=None):

def transform(self, X):
return self.predict(X)


# Returns either a DataFrame or MatrixBlock after calling transform(X:MatrixBlock, y:MatrixBlock) on Model object on JVM
def predict(self, X):
if isinstance(X, SUPPORTED_TYPES):
if self.transferUsingDF:
@@ -389,12 +392,23 @@ def predict(self, X):
else:
raise Exception('Unsupported input type')

class BaseSystemMLClassifier(BaseSystemMLEstimator):

# Scores the predicted value with ground truth 'y'
def score(self, X, y):
return metrics.accuracy_score(y, self.predict(X))
Contributor: This suggests that we should maybe start having a BaseRegressor and a BaseClassifier.

Contributor (Author): The Python part of the code is a thin API, and I think having BaseClassifier and BaseRegressor there might be overkill. But I agree we need BaseClassifier and BaseRegressor on the Scala side, since it implements the core logic, and I have updated the code accordingly.

Contributor: I was suggesting this because if we inherit from BaseSystemMLEstimator, the default score is the accuracy score, which does not apply to non-classifiers. We should either make score an abstract method and reimplement the scoring logic for all the models, or make a BaseRegressor etc. and avoid reimplementing the logic.

Contributor (Author): Fair point. I will make the necessary change in the next commit 👍
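For reference, the abstract-method alternative mentioned above could be sketched as follows; this is illustrative only, and the commit instead opts for the BaseSystemMLClassifier/BaseSystemMLRegressor split shown next:

```python
from abc import ABCMeta, abstractmethod
from pyspark.ml.pipeline import Estimator

class BaseSystemMLEstimator(Estimator):
    # Python 2-style ABC declaration, matching the vintage of this codebase.
    __metaclass__ = ABCMeta

    @abstractmethod
    def score(self, X, y):
        """Concrete estimators supply their own metric (accuracy, R^2, ...)."""
        raise NotImplementedError
```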


class BaseSystemMLRegressor(BaseSystemMLEstimator):

# Scores the predicted value with ground truth 'y'
def score(self, X, y):
return metrics.r2_score(y, self.predict(X), multioutput='variance_weighted')
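For context on multioutput='variance_weighted': scikit-learn computes one R^2 per output column and averages them weighted by each column's variance, so higher-variance targets count more. A quick runnable illustration with toy numbers:

```python
import numpy as np
from sklearn import metrics

y_true = np.array([[0.5, 1.0], [1.0, 2.0], [7.0, 8.0]])
y_pred = np.array([[0.4, 1.1], [0.9, 2.2], [8.0, 7.5]])

# One R^2 per column, averaged with weights proportional to the
# variance of each column of y_true.
print(metrics.r2_score(y_true, y_pred, multioutput='variance_weighted'))
```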


# Or we can create new Python project with package structure
class LogisticRegression(BaseSystemMLEstimator):
class LogisticRegression(BaseSystemMLClassifier):

# See https://apache.github.io/incubator-systemml/algorithms-reference for usage
def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, max_inner_iter=0, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
self.sqlCtx = sqlCtx
self.sc = sqlCtx._sc
@@ -415,8 +429,9 @@ def __init__(self, sqlCtx, penalty='l2', fit_intercept=True, max_iter=100, max_i
if solver != 'newton-cg':
raise Exception('Only newton-cg solver supported')

class LinearRegression(BaseSystemMLEstimator):
class LinearRegression(BaseSystemMLRegressor):

# See https://apache.github.io/incubator-systemml/algorithms-reference for usage
def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, solver='newton-cg', transferUsingDF=False):
self.sqlCtx = sqlCtx
self.sc = sqlCtx._sc
@@ -435,12 +450,10 @@ def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0
self.transferUsingDF = transferUsingDF
self.setOutputRawPredictionsToFalse = False

def score(self, X, y):
return metrics.r2_score(y, self.predict(X), multioutput='variance_weighted')


class SVM(BaseSystemMLEstimator):
class SVM(BaseSystemMLClassifier):

# See https://apache.github.io/incubator-systemml/algorithms-reference for usage
def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0, is_multi_class=False, transferUsingDF=False):
self.sqlCtx = sqlCtx
self.sc = sqlCtx._sc
@@ -456,13 +469,14 @@ def __init__(self, sqlCtx, fit_intercept=True, max_iter=100, tol=0.000001, C=1.0
self.transferUsingDF = transferUsingDF
self.setOutputRawPredictionsToFalse = False

class NaiveBayes(BaseSystemMLEstimator):
class NaiveBayes(BaseSystemMLClassifier):

# See https://apache.github.io/incubator-systemml/algorithms-reference for usage
def __init__(self, sqlCtx, laplace=1.0, transferUsingDF=False):
self.sqlCtx = sqlCtx
self.sc = sqlCtx._sc
self.uid = "nb"
self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc())
self.estimator.setLaplace(laplace)
self.transferUsingDF = transferUsingDF
self.setOutputRawPredictionsToFalse = False
self.setOutputRawPredictionsToFalse = False
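To make the Python API concrete, a hedged usage sketch based on the classes in this diff. The import path, the toy data, and the assumption that SUPPORTED_TYPES covers NumPy arrays are mine, not the commit's:

```python
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SQLContext
from SystemML import mllearn  # assumed import; mllearn is a class in SystemML.py

sc = SparkContext()
sqlCtx = SQLContext(sc)

# Toy binary-classification data, illustration only; label values are
# remapped on the JVM side via fillLabelMapping.
X_train = np.random.rand(100, 10)
y_train = np.random.randint(0, 2, 100)

svm = mllearn.SVM(sqlCtx, is_multi_class=False)
svm.fit(X_train, y_train)            # matrix path: fit(X, y)
y_pred = svm.predict(X_train)
print(svm.score(X_train, y_train))   # accuracy via BaseSystemMLClassifier.score
```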
71 changes: 38 additions & 33 deletions src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -23,7 +23,6 @@ import org.apache.spark.rdd.RDD
import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.ml.{ Model, Estimator }
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
@@ -32,6 +31,7 @@ import org.apache.sysml.runtime.DMLRuntimeException
import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
import org.apache.sysml.api.mlcontext._
import org.apache.sysml.api.mlcontext.ScriptFactory._
import org.apache.spark.sql._

trait HasLaplace extends Params {
final val laplace: Param[Double] = new Param[Double](this, "laplace", "Laplace smoothing specified by the user to avoid creation of 0 probabilities.")
@@ -64,58 +64,63 @@ trait HasRegParam extends Params {
final def getRegParam: Double = $(regParam)
}


trait BaseSystemMLClassifier {
trait BaseSystemMLEstimator {
def transformSchema(schema: StructType): StructType = schema

// Returns the script and variables for X and y
def getTrainingScript(isSingleNode:Boolean):(Script, String, String)

def toDouble(i:Int): java.lang.Double = {
double2Double(i.toDouble)
}

def toDouble(d:Double): java.lang.Double = {
double2Double(d)
}
}

trait BaseSystemMLEstimatorModel {
def toDouble(i:Int): java.lang.Double = {
double2Double(i.toDouble)
}
def toDouble(d:Double): java.lang.Double = {
double2Double(d)
}

def transformSchema(schema: StructType): StructType = schema

// Returns the script and variable for X
def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String)
}

trait BaseSystemMLClassifier extends BaseSystemMLEstimator {

def fit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): (MLResults, java.util.HashMap[Int, String]) = {
val isSingleNode = true
val ml = new org.apache.sysml.api.mlcontext.MLContext(sc)
val ml = new MLContext(sc)
val revLabelMapping = new java.util.HashMap[Int, String]
PredictionUtils.fillLabelMapping(y_mb, revLabelMapping)
val ret = getTrainingScript(isSingleNode)
val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb)
(ml.execute(script), revLabelMapping)
}

def fit(df: DataFrame, sc: SparkContext): (MLResults, java.util.HashMap[Int, String]) = {
def fit(df: ScriptsUtils.SparkDataType, sc: SparkContext): (MLResults, java.util.HashMap[Int, String]) = {
val isSingleNode = false
val ml = new MLContext(df.rdd.sparkContext)
val mcXin = new MatrixCharacteristics()
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, df, mcXin, false, "features")
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame], mcXin, false, "features")
val revLabelMapping = new java.util.HashMap[Int, String]
val yin = PredictionUtils.fillLabelMapping(df, revLabelMapping)
val ret = getTrainingScript(isSingleNode)
val Xbin = new BinaryBlockMatrix(Xin, mcXin)
val script = ret._1.in(ret._2, Xbin).in(ret._3, yin)
(ml.execute(script), revLabelMapping)
}

def toDouble(i:Int): java.lang.Double = {
double2Double(i.toDouble)
}
def toDouble(d:Double): java.lang.Double = {
double2Double(d)
}

}

trait BaseSystemMLClassifierModel {

def toDouble(i:Int): java.lang.Double = {
double2Double(i.toDouble)
}
def toDouble(d:Double): java.lang.Double = {
double2Double(d)
}

def transformSchema(schema: StructType): StructType = schema

// Returns the script and variable for X
def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String)
trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {

def transform(X: MatrixBlock, mloutput: MLResults, labelMapping: java.util.HashMap[Int, String], sc: SparkContext, probVar:String): MatrixBlock = {
val isSingleNode = true
@@ -131,25 +136,25 @@ trait BaseSystemMLClassifierModel {
PredictionUtils.updateLabels(isSingleNode, null, ret, null, labelMapping)
return ret
}

def transform(df: DataFrame, mloutput: MLResults, labelMapping: java.util.HashMap[Int, String], sc: SparkContext,
def transform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, labelMapping: java.util.HashMap[Int, String], sc: SparkContext,
probVar:String, outputProb:Boolean=true): DataFrame = {
val isSingleNode = false
val ml = new MLContext(sc)
val mcXin = new MatrixCharacteristics()
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(df.rdd.sparkContext, df, mcXin, false, "features")
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame], mcXin, false, "features")
val script = getPredictionScript(mloutput, isSingleNode)
val Xin_bin = new BinaryBlockMatrix(Xin, mcXin)
val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
val predLabelOut = PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, isSingleNode, sc, probVar)
val predictedDF = PredictionUtils.updateLabels(isSingleNode, predLabelOut.getDataFrame("Prediction"), null, "C1", labelMapping).select("ID", "prediction")
if(outputProb) {
val prob = modelPredict.getDataFrame(probVar, true).withColumnRenamed("C1", "probability").select("ID", "probability")
val dataset = RDDConverterUtils.addIDToDataFrame(df, df.sqlContext, "ID")
return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF))
val dataset = RDDConverterUtils.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, "ID")
return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF))
}
else {
val dataset = RDDConverterUtils.addIDToDataFrame(df, df.sqlContext, "ID")
val dataset = RDDConverterUtils.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, "ID")
return PredictionUtils.joinUsingID(dataset, predictedDF)
}

86 changes: 86 additions & 0 deletions src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysml.api.ml

import org.apache.spark.rdd.RDD
import java.io.File
import org.apache.spark.SparkContext
import org.apache.spark.ml.{ Model, Estimator }
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam }
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
import org.apache.sysml.runtime.matrix.data.MatrixBlock
import org.apache.sysml.runtime.DMLRuntimeException
import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils }
import org.apache.sysml.api.mlcontext._
import org.apache.sysml.api.mlcontext.ScriptFactory._

trait BaseSystemMLRegressor extends BaseSystemMLEstimator {

def fit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = {
val isSingleNode = true
val ml = new MLContext(sc)
val ret = getTrainingScript(isSingleNode)
val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb)
ml.execute(script)
}

def fit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = {
val isSingleNode = false
val ml = new MLContext(df.rdd.sparkContext)
val mcXin = new MatrixCharacteristics()
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame], mcXin, false, "features")
val yin = df.select("label")
val ret = getTrainingScript(isSingleNode)
val Xbin = new BinaryBlockMatrix(Xin, mcXin)
val script = ret._1.in(ret._2, Xbin).in(ret._3, yin)
ml.execute(script)
}
}

trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel {

def transform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, predictionVar:String): MatrixBlock = {
val isSingleNode = true
val ml = new MLContext(sc)
val script = getPredictionScript(mloutput, isSingleNode)
val modelPredict = ml.execute(script._1.in(script._2, X))
val ret = modelPredict.getBinaryBlockMatrix(predictionVar).getMatrixBlock

if(ret.getNumColumns != 1) {
throw new RuntimeException("Expected prediction to be a column vector")
}
return ret
}

def transform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, predictionVar:String): DataFrame = {
val isSingleNode = false
val ml = new MLContext(sc)
val mcXin = new MatrixCharacteristics()
val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame], mcXin, false, "features")
val script = getPredictionScript(mloutput, isSingleNode)
val Xin_bin = new BinaryBlockMatrix(Xin, mcXin)
val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
val predictedDF = modelPredict.getDataFrame(predictionVar).select("ID", "C1").withColumnRenamed("C1", "prediction")
val dataset = RDDConverterUtils.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, "ID")
return PredictionUtils.joinUsingID(dataset, predictedDF)
}
}