From 2b2f5a147534f855abb620c6d70fe3c211c8a968 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 31 Mar 2015 00:22:34 +0800 Subject: [PATCH 1/5] Model import/export for IsotonicRegression --- .../mllib/regression/IsotonicRegression.scala | 73 ++++++++++++++++++- .../regression/IsotonicRegressionSuite.scala | 19 +++++ 2 files changed, 91 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index cb70852e3cc8..ce35c6f5558e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -23,9 +23,15 @@ import java.util.Arrays.binarySearch import scala.collection.mutable.ArrayBuffer +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD} +import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, SQLContext} /** * :: Experimental :: @@ -42,7 +48,7 @@ import org.apache.spark.rdd.RDD class IsotonicRegressionModel ( val boundaries: Array[Double], val predictions: Array[Double], - val isotonic: Boolean) extends Serializable { + val isotonic: Boolean) extends Serializable with Saveable { private val predictionOrd = if (isotonic) Ordering[Double] else Ordering[Double].reverse @@ -124,6 +130,71 @@ class IsotonicRegressionModel ( predictions(foundIndex) } } + + override def save(sc: SparkContext, path: String): Unit = { + val data = IsotonicRegressionModel.SaveLoadV1_0.Data(boundaries, predictions, isotonic) + IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, data) + } + + override protected def formatVersion: String = "1.0" +} + +object IsotonicRegressionModel extends 
Loader[IsotonicRegressionModel] { + + import org.apache.spark.mllib.util.Loader._ + + private object SaveLoadV1_0 { + + def thisFormatVersion: String = "1.0" + + /** Hard-code class name string in case it changes in the future */ + def thisClassName: String = "org.apache.spark.mllib.regression.IsotonicRegressionModel" + + /** Model data for model import/export */ + case class Data(boundaries: Array[Double], predictions: Array[Double], isotonic: Boolean) + + def save(sc: SparkContext, path: String, data: Data): Unit = { + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + val metadata = compact(render( + ("class" -> thisClassName) ~ ("version" -> thisFormatVersion))) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + + val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() + dataRDD.saveAsParquetFile(dataPath(path)) + } + + def load(sc: SparkContext, path: String): IsotonicRegressionModel = { + val sqlContext = new SQLContext(sc) + val dataRDD = sqlContext.parquetFile(dataPath(path)) + + checkSchema[Data](dataRDD.schema) + val dataArray = dataRDD.select("boundaries", "predictions", "isotonic").take(1) + assert(dataArray.size == 1, + s"Unable to load IsotonicRegressionModel data from: ${dataPath(path)}") + val data = dataArray(0) + val boundaries = data.getAs[Seq[Double]](0).toArray + val predictions = data.getAs[Seq[Double]](1).toArray + val isotonic = data.getAs[Boolean](2) + new IsotonicRegressionModel(boundaries, predictions, isotonic) + } + } + + override def load(sc: SparkContext, path: String): IsotonicRegressionModel = { + val (loadedClassName, version, metadata) = loadMetadata(sc, path) + val classNameV1_0 = SaveLoadV1_0.thisClassName + (loadedClassName, version) match { + case (className, "1.0") if className == classNameV1_0 => + val model = SaveLoadV1_0.load(sc, path) + model + case _ => throw new Exception( + s"IsotonicRegressionModel.load did not recognize model with (className, format version):" + + 
s"($loadedClassName, $version). Supported:\n" + + s" ($classNameV1_0, 1.0)" + ) + } + } } /** diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 7ef45248281e..0f2542b9fc4c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.{Matchers, FunSuite} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.util.Utils class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with Matchers { @@ -73,6 +74,24 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M assert(model.isotonic) } + test("model save/load") { + val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true) + + val tempDir = Utils.createTempDir() + val path = tempDir.toURI.toString + + // Save model, load it back, and compare. 
+ try { + model.save(sc, path) + val sameModel = IsotonicRegressionModel.load(sc, path) + assert(model.boundaries === sameModel.boundaries) + assert(model.predictions === sameModel.predictions) + assert(model.isotonic == model.isotonic) + } finally { + Utils.deleteRecursively(tempDir) + } + } + test("isotonic regression with size 0") { val model = runIsotonicRegression(Seq(), true) From 429ff7dd71384fcb36f355930611673ae8c84e71 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sat, 4 Apr 2015 13:47:56 -0400 Subject: [PATCH 2/5] store each interval as a record --- .../mllib/regression/IsotonicRegression.scala | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index ce35c6f5558e..2e51834f3ba9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -23,6 +23,7 @@ import java.util.Arrays.binarySearch import scala.collection.mutable.ArrayBuffer +import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -132,8 +133,9 @@ class IsotonicRegressionModel ( } override def save(sc: SparkContext, path: String): Unit = { - val data = IsotonicRegressionModel.SaveLoadV1_0.Data(boundaries, predictions, isotonic) - IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, data) + val intervals = boundaries.toList.zip(predictions.toList).toArray + val data = IsotonicRegressionModel.SaveLoadV1_0.Data(intervals) + IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, data, isotonic) } override protected def formatVersion: String = "1.0" @@ -151,43 +153,45 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { def thisClassName: String = "org.apache.spark.mllib.regression.IsotonicRegressionModel" /** Model data for model 
import/export */ - case class Data(boundaries: Array[Double], predictions: Array[Double], isotonic: Boolean) + case class Data(intervals: Array[(Double, Double)]) - def save(sc: SparkContext, path: String, data: Data): Unit = { + def save(sc: SparkContext, path: String, data: Data, isotonic: Boolean): Unit = { val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val metadata = compact(render( - ("class" -> thisClassName) ~ ("version" -> thisFormatVersion))) + ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ + ("isotonic" -> isotonic))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() dataRDD.saveAsParquetFile(dataPath(path)) } - def load(sc: SparkContext, path: String): IsotonicRegressionModel = { + def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = { val sqlContext = new SQLContext(sc) val dataRDD = sqlContext.parquetFile(dataPath(path)) checkSchema[Data](dataRDD.schema) - val dataArray = dataRDD.select("boundaries", "predictions", "isotonic").take(1) + val dataArray = dataRDD.select("intervals").take(1) assert(dataArray.size == 1, s"Unable to load IsotonicRegressionModel data from: ${dataPath(path)}") val data = dataArray(0) - val boundaries = data.getAs[Seq[Double]](0).toArray - val predictions = data.getAs[Seq[Double]](1).toArray - val isotonic = data.getAs[Boolean](2) - new IsotonicRegressionModel(boundaries, predictions, isotonic) + val intervals = data.getAs[Seq[(Double, Double)]](0) + val (boundaries, predictions) = intervals.unzip + (boundaries.toArray, predictions.toArray) } } override def load(sc: SparkContext, path: String): IsotonicRegressionModel = { + implicit val formats = DefaultFormats val (loadedClassName, version, metadata) = loadMetadata(sc, path) + val isotonic = (metadata \ "isotonic").extract[Boolean] val classNameV1_0 = SaveLoadV1_0.thisClassName (loadedClassName, version) match { case (className, "1.0") if 
className == classNameV1_0 => - val model = SaveLoadV1_0.load(sc, path) - model + val (boundaries, predictions) = SaveLoadV1_0.load(sc, path) + new IsotonicRegressionModel(boundaries, predictions, isotonic) case _ => throw new Exception( s"IsotonicRegressionModel.load did not recognize model with (className, format version):" + s"($loadedClassName, $version). Supported:\n" + From 49600cc3be22383d787d40a73cae09ed7106c083 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 9 Apr 2015 13:58:39 -0400 Subject: [PATCH 3/5] address comments --- .../mllib/regression/IsotonicRegression.scala | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 2e51834f3ba9..2144c19f12c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -133,9 +133,7 @@ class IsotonicRegressionModel ( } override def save(sc: SparkContext, path: String): Unit = { - val intervals = boundaries.toList.zip(predictions.toList).toArray - val data = IsotonicRegressionModel.SaveLoadV1_0.Data(intervals) - IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, data, isotonic) + IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic) } override protected def formatVersion: String = "1.0" @@ -153,9 +151,14 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { def thisClassName: String = "org.apache.spark.mllib.regression.IsotonicRegressionModel" /** Model data for model import/export */ - case class Data(intervals: Array[(Double, Double)]) - - def save(sc: SparkContext, path: String, data: Data, isotonic: Boolean): Unit = { + case class Data(boundary: Double, prediction: Double) + + def save( + sc: SparkContext, + path: String, + boundaries: 
Array[Double], + predictions: Array[Double], + isotonic: Boolean): Unit = { val sqlContext = new SQLContext(sc) import sqlContext.implicits._ @@ -164,8 +167,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { ("isotonic" -> isotonic))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) - val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF() - dataRDD.saveAsParquetFile(dataPath(path)) + sqlContext.createDataFrame(boundaries.toList.zip(predictions.toList) + .map { case (b, p) => Data(b, p) }).saveAsParquetFile(dataPath(path)) } def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = { @@ -173,12 +176,9 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { val dataRDD = sqlContext.parquetFile(dataPath(path)) checkSchema[Data](dataRDD.schema) - val dataArray = dataRDD.select("intervals").take(1) - assert(dataArray.size == 1, - s"Unable to load IsotonicRegressionModel data from: ${dataPath(path)}") - val data = dataArray(0) - val intervals = data.getAs[Seq[(Double, Double)]](0) - val (boundaries, predictions) = intervals.unzip + val dataArray = dataRDD.select("boundary", "prediction").collect() + val (boundaries, predictions) = dataArray.map { + x => (x.getAs[Double](0), x.getAs[Double](1)) }.toList.unzip (boundaries.toArray, predictions.toArray) } } From f80ec1b2bb3a1989e30c06c588f956062821ab05 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 21 Apr 2015 00:42:28 +0800 Subject: [PATCH 4/5] address comments --- .../spark/mllib/regression/IsotonicRegression.scala | 8 ++++---- .../spark/mllib/regression/IsotonicRegressionSuite.scala | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index 2144c19f12c0..cd9231155fb9 100644 --- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -143,7 +143,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { import org.apache.spark.mllib.util.Loader._ - private object SaveLoadV1_0 { + private object SaveLoadV1_0 { def thisFormatVersion: String = "1.0" @@ -167,7 +167,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { ("isotonic" -> isotonic))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) - sqlContext.createDataFrame(boundaries.toList.zip(predictions.toList) + sqlContext.createDataFrame(boundaries.toSeq.zip(predictions) .map { case (b, p) => Data(b, p) }).saveAsParquetFile(dataPath(path)) } @@ -177,8 +177,8 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { checkSchema[Data](dataRDD.schema) val dataArray = dataRDD.select("boundary", "prediction").collect() - val (boundaries, predictions) = dataArray.map { - x => (x.getAs[Double](0), x.getAs[Double](1)) }.toList.unzip + val (boundaries, predictions) = dataArray.map { + x => (x.getDouble(0), x.getDouble(1)) }.toList.sortBy(_._1).unzip (boundaries.toArray, predictions.toArray) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index 0f2542b9fc4c..a32318bfbaa6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -75,7 +75,9 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M } test("model save/load") { - val model = runIsotonicRegression(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18), true) + val boundaries = Array(0.0, 1.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0) + val predictions = Array(1, 2, 2, 6, 16.5, 16.5, 
17.0, 18.0) + val model = new IsotonicRegressionModel(boundaries, predictions, true) val tempDir = Utils.createTempDir() val path = tempDir.toURI.toString From 872028db838fcd29743c509422e96e928c49e149 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Tue, 21 Apr 2015 13:07:01 +0800 Subject: [PATCH 5/5] fix code style --- .../spark/mllib/regression/IsotonicRegression.scala | 11 ++++++----- .../mllib/regression/IsotonicRegressionSuite.scala | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index cd9231155fb9..1d7617046b6c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -160,15 +160,15 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { predictions: Array[Double], isotonic: Boolean): Unit = { val sqlContext = new SQLContext(sc) - import sqlContext.implicits._ val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("isotonic" -> isotonic))) sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) - sqlContext.createDataFrame(boundaries.toSeq.zip(predictions) - .map { case (b, p) => Data(b, p) }).saveAsParquetFile(dataPath(path)) + sqlContext.createDataFrame( + boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) } + ).saveAsParquetFile(dataPath(path)) } def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = { @@ -177,8 +177,9 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { checkSchema[Data](dataRDD.schema) val dataArray = dataRDD.select("boundary", "prediction").collect() - val (boundaries, predictions) = dataArray.map { - x => (x.getDouble(0), x.getDouble(1)) }.toList.sortBy(_._1).unzip + val (boundaries, predictions) = 
dataArray.map { x =>
      (x.getDouble(0), x.getDouble(1))
    }.toList.sortBy(_._1).unzip
 (boundaries.toArray, predictions.toArray) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala index a32318bfbaa6..8e12340bbd9d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/IsotonicRegressionSuite.scala @@ -88,7 +88,7 @@ class IsotonicRegressionSuite extends FunSuite with MLlibTestSparkContext with M val sameModel = IsotonicRegressionModel.load(sc, path) assert(model.boundaries === sameModel.boundaries) assert(model.predictions === sameModel.predictions) - assert(model.isotonic == model.isotonic) + assert(model.isotonic === sameModel.isotonic) } finally { Utils.deleteRecursively(tempDir) }