modify and clean the unit tests
lu-wang-dl committed May 4, 2018
commit c7a14bb14132ce8de084192b02fcdaa1922986a9
mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala
@@ -105,7 +105,7 @@ private[spark] object SchemaUtils {
 
   /**
    * Check whether the given column in the schema is one of the supporting vector type: Vector,
-   * Array[Dloat]. Array[Double]
+   * Array[Float]. Array[Double]
    * @param schema input schema
    * @param colName column name
    */
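The check documented above accepts a features column typed as Vector, Array[Float], or Array[Double]. A minimal standalone sketch of that rule (a hypothetical helper, not the actual SchemaUtils code):

```scala
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, StructType}

// Hypothetical stand-in for the documented check: true iff the column holds
// a Vector UDT or an array of Float/Double (element nullability ignored).
def isSupportedVectorType(schema: StructType, colName: String): Boolean =
  schema(colName).dataType match {
    case dt if dt == SQLDataTypes.VectorType => true
    case ArrayType(FloatType, _) | ArrayType(DoubleType, _) => true
    case _ => false
  }
```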
mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -17,15 +17,16 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
+
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.clustering.DistanceMeasure
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType}
+import org.apache.spark.sql.{DataFrame, Dataset}
 
 class BisectingKMeansSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
@@ -186,37 +187,24 @@ class BisectingKMeansSuite
   }
 
   test("BisectingKMeans with Array input") {
-    val featuresColNameD = "array_double_features"
-    val featuresColNameF = "array_float_features"
-    val doubleUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Double](features.size)(0.0)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
-    }
-    val floatUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Float](features.size)(0.0f)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
+    def trainTransformAndComputeCost(dataset: Dataset[_]): (DataFrame, Double) = {
+      val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      (model.transform(dataset), model.computeCost(dataset))
     }
-    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
-      .drop("features")
-    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
-      .drop("features")
-    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
-    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
-
-    val bkmD = new BisectingKMeans()
-      .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)
-    val bkmF = new BisectingKMeans()
-      .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1)
-    val modelD = bkmD.fit(newdatasetD)
-    val modelF = bkmF.fit(newdatasetF)
-    val transformedD = modelD.transform(newdatasetD)
-    val transformedF = modelF.transform(newdatasetF)
-    val predictDifference = transformedD.select("prediction")
+
+    val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (transformed, trueCost) = trainTransformAndComputeCost(dataset)
+    val (transformedD, doubleArrayCost) = trainTransformAndComputeCost(newDatasetD)
+    val (transformedF, floatArrayCost) = trainTransformAndComputeCost(newDatasetF)
+
+    val predictDifferenceD = transformed.select("prediction")
+      .except(transformedD.select("prediction"))
+    assert(predictDifferenceD.count() == 0)
+    val predictDifferenceF = transformed.select("prediction")
       .except(transformedF.select("prediction"))
-    assert(predictDifference.count() == 0)
-    assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) )
+    assert(predictDifferenceF.count() == 0)
+    assert(trueCost ~== doubleArrayCost absTol 1e-6)
+    assert(trueCost ~== floatArrayCost absTol 1e-6)
   }
 }

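The rewritten test compares predictions with DataFrame.except: the result holds the distinct rows of the left side that are absent from the right, so a zero count means every prediction row from the Vector run also appears in the array run. The Dataset[_] parameter of the new helper is presumably also why the suites now import scala.language.existentials. A self-contained toy illustration of the except idiom (invented values, not the suite's data):

```scala
import org.apache.spark.sql.SparkSession

object ExceptIdiomDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("except-demo").getOrCreate()
    import spark.implicits._

    // Prediction columns that agree as sets: except yields no rows.
    val a = Seq(0, 1, 1, 2).toDF("prediction")
    val b = Seq(2, 1, 0).toDF("prediction")
    assert(a.except(b).count() == 0)

    // Any disagreement surfaces as a non-empty difference.
    val c = Seq(0, 3).toDF("prediction")
    assert(a.except(c).count() > 0)

    spark.stop()
  }
}
```

Note that except has set semantics, so duplicate rows collapse; a strict row-by-row comparison would need the rows joined on an identifier.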
mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -17,17 +17,16 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.stat.distribution.MultivariateGaussian
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Dataset, Row}
-import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType}
-
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
   with DefaultReadWriteTest {
@@ -260,39 +259,29 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
   }
 
   test("GaussianMixture with Array input") {
-    val featuresColNameD = "array_double_features"
-    val featuresColNameF = "array_float_features"
-    val doubleUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Double](features.size)(0.0)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
-    }
-    val floatUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Float](features.size)(0.0f)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
+    def trainAndTransform(dataset: Dataset[_]): DataFrame = {
+      val model = new GaussianMixture().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      model.transform(dataset)
     }
-    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
-      .drop("features")
-    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
-      .drop("features")
-    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
-    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
-
-    val gmD = new GaussianMixture().setK(k).setMaxIter(1)
-      .setFeaturesCol(featuresColNameD).setSeed(1)
-    val gmF = new GaussianMixture().setK(k).setMaxIter(1)
-      .setFeaturesCol(featuresColNameF).setSeed(1)
-    val modelD = gmD.fit(newdatasetD)
-    val modelF = gmF.fit(newdatasetF)
-    val transformedD = modelD.transform(newdatasetD)
-    val transformedF = modelF.transform(newdatasetF)
-    val predictDifference = transformedD.select("prediction")
+
+    val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val transformed = trainAndTransform(dataset)
+    val transformedD = trainAndTransform(newDatasetD)
+    val transformedF = trainAndTransform(newDatasetF)
+
+    val predictDifferenceD = transformed.select("prediction")
+      .except(transformedD.select("prediction"))
+    assert(predictDifferenceD.count() == 0)
+    val predictDifferenceF = transformed.select("prediction")
      .except(transformedF.select("prediction"))
-    assert(predictDifference.count() == 0)
-    val probabilityDifference = transformedD.select("probability")
+    assert(predictDifferenceF.count() == 0)
+
+    val probabilityDifferenceD = transformed.select("probability")
+      .except(transformedD.select("probability"))
+    assert(probabilityDifferenceD.count() == 0)
+    val probabilityDifferenceF = transformed.select("probability")
       .except(transformedF.select("probability"))
-    assert(probabilityDifference.count() == 0)
+    assert(probabilityDifferenceF.count() == 0)
   }
 }
 
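The GaussianMixture variant additionally diffs the probability column, and except compares those Vector values exactly, with no tolerance: the assertion only holds if the identically seeded runs produce value-for-value equal probability vectors on the float-truncated inputs. A toy sketch of how strict that match is (invented numbers):

```scala
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object ProbabilityDiffDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("prob-demo").getOrCreate()
    import spark.implicits._

    // except on a Vector column requires value-for-value equality.
    val p1 = Seq(Tuple1(Vectors.dense(0.7, 0.3))).toDF("probability")
    val p2 = Seq(Tuple1(Vectors.dense(0.7, 0.3))).toDF("probability")
    assert(p1.except(p2).count() == 0)

    // The slightest numeric drift is reported as a difference.
    val p3 = Seq(Tuple1(Vectors.dense(0.7000001, 0.2999999))).toDF("probability")
    assert(p1.except(p3).count() == 1)

    spark.stop()
  }
}
```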
mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
 import scala.util.Random
 
 import org.dmg.pmml.{ClusteringModel, PMML}
@@ -25,13 +26,11 @@ import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util._
-import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans,
-  KMeansModel => MLlibKMeansModel}
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vectors => MLlibVectors}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType}
 
 private[clustering] case class TestRow(features: Vector)

@@ -202,38 +201,24 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
   }
 
   test("KMean with Array input") {
-    val featuresColNameD = "array_double_features"
-    val featuresColNameF = "array_float_features"
-
-    val doubleUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Double](features.size)(0.0)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
-    }
-    val floatUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Float](features.size)(0.0f)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
+    def trainTransformAndComputeCost(dataset: Dataset[_]): (DataFrame, Double) = {
+      val model = new KMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      (model.transform(dataset), model.computeCost(dataset))
     }
 
-    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
-      .drop("features")
-    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
-      .drop("features")
-    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
-    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
-
-    val kmeansD = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)
-    val kmeansF = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1)
-    val modelD = kmeansD.fit(newdatasetD)
-    val modelF = kmeansF.fit(newdatasetF)
-    val transformedD = modelD.transform(newdatasetD)
-    val transformedF = modelF.transform(newdatasetF)
-
-    val predictDifference = transformedD.select("prediction")
+    val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (transformed, trueCost) = trainTransformAndComputeCost(dataset)
+    val (transformedD, doubleArrayCost) = trainTransformAndComputeCost(newDatasetD)
+    val (transformedF, floatArrayCost) = trainTransformAndComputeCost(newDatasetF)
+
+    val predictDifferenceD = transformed.select("prediction")
+      .except(transformedD.select("prediction"))
+    assert(predictDifferenceD.count() == 0)
+    val predictDifferenceF = transformed.select("prediction")
       .except(transformedF.select("prediction"))
-    assert(predictDifference.count() == 0)
-    assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) )
+    assert(predictDifferenceF.count() == 0)
+    assert(trueCost ~== doubleArrayCost absTol 1e-6)
+    assert(trueCost ~== floatArrayCost absTol 1e-6)
   }
 
 
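Costs, by contrast, are no longer compared with bare == (the old assertion demanded bit-identical arithmetic between the two array runs); the new assertions use the ~== ... absTol matcher from Spark's test-only TestingUtils. Semantically this is an absolute-tolerance check; a plain-Scala paraphrase (mine, not the TestingUtils source):

```scala
// Approximate equality with an absolute tolerance, in the spirit of
// `trueCost ~== doubleArrayCost absTol 1e-6` above.
def approxEqual(a: Double, b: Double, absTol: Double): Boolean =
  math.abs(a - b) <= absTol

assert(approxEqual(12.345678, 12.3456785, absTol = 1e-6))   // within tolerance
assert(!approxEqual(12.345678, 12.345688, absTol = 1e-6))   // off by 1e-5, fails
```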
mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala (43 changes: 10 additions & 33 deletions)
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.clustering
 
+import scala.language.existentials
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFunSuite
@@ -25,8 +27,6 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql._
-import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType}
 
 object LDASuite {
   def generateLDAData(
@@ -326,41 +326,18 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
   }
 
   test("LDA with Array input") {
-    val featuresColNameD = "array_double_features"
-    val featuresColNameF = "array_float_features"
-    val doubleUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Double](features.size)(0.0)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
-    }
-    val floatUDF = udf { (features: Vector) =>
-      val featureArray = Array.fill[Float](features.size)(0.0f)
-      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
-      featureArray
+    def trainAndLogLikelihoodAndPerplexity(dataset: Dataset[_]): (Double, Double) = {
+      val model = new LDA().setK(k).setOptimizer("online").setMaxIter(1).setSeed(1).fit(dataset)
+      (model.logLikelihood(dataset), model.logPerplexity(dataset))
     }
-    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
-      .drop("features")
-    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
-      .drop("features")
-    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
-    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
 
-    val ldaD = new LDA().setK(k).setOptimizer("online")
-      .setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)
-    val ldaF = new LDA().setK(k).setOptimizer("online").
-      setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1)
-    val modelD = ldaD.fit(newdatasetD)
-    val modelF = ldaF.fit(newdatasetF)
-
-    // logLikelihood, logPerplexity
-    val llD = modelD.logLikelihood(newdatasetD)
-    val llF = modelF.logLikelihood(newdatasetF)
-    // assert(llD == llF)
+    val (newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (ll, lp) = trainAndLogLikelihoodAndPerplexity(dataset)
+    val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
+    val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
+    // TODO: need to compare the result once we fix the seed issue for LDA (SPARK-22210)
     assert(llD <= 0.0 && llD != Double.NegativeInfinity)
     assert(llF <= 0.0 && llF != Double.NegativeInfinity)
-    val lpD = modelD.logPerplexity(newdatasetD)
-    val lpF = modelF.logPerplexity(newdatasetF)
-    // assert(lpD == lpF)
     assert(lpD >= 0.0 && lpD != Double.NegativeInfinity)
     assert(lpF >= 0.0 && lpF != Double.NegativeInfinity)
   }

Contributor (review comment on the assertions above): ditto
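Until the LDA seeding issue tracked as SPARK-22210 is resolved, this test can only sanity-check signs rather than compare runs. The bounds follow from the definitions: a corpus log-likelihood is the log of a probability, so it is at most 0, and Spark's logPerplexity is (roughly) the negated log-likelihood normalized by token count, so it is at least 0. A small sketch of the invariant (plain Scala, illustrative values):

```scala
// Sign invariants mirrored from the assertions above: a finite, non-positive
// log-likelihood and a finite, non-negative log-perplexity.
def checkLdaOutputs(logLikelihood: Double, logPerplexity: Double): Unit = {
  require(logLikelihood <= 0.0 && !logLikelihood.isInfinity)
  require(logPerplexity >= 0.0 && !logPerplexity.isInfinity)
}

checkLdaOutputs(-1234.5, 6.7)   // made-up values that satisfy the bounds
```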
mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala (10 changes: 10 additions & 0 deletions)
@@ -247,4 +247,14 @@ object MLTestingUtils extends SparkFunSuite {
     }
     models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
   }
+
+  def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_]) = {
+    val doubleUDF = udf { (features: Vector) => features.toArray.map(_.toFloat.toDouble)}
+    val floatUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)}
+    val newDatasetD = dataset.withColumn("features", doubleUDF(col("features")))
+    val newDatasetF = dataset.withColumn("features", floatUDF(col("features")))
+    assert(newDatasetD.schema("features").dataType.equals(new ArrayType(DoubleType, false)))
+    assert(newDatasetF.schema("features").dataType.equals(new ArrayType(FloatType, false)))
+    (newDatasetD, newDatasetF)
+  }
 }
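One detail of generateArrayFeatureDataset worth calling out: the Double-array column is built with .toFloat.toDouble rather than a straight copy, deliberately truncating every value to float precision so the Array[Double] and Array[Float] datasets encode exactly the same numbers; the suites above then only need a loose tolerance against the original full-precision Vector run. A quick illustration of the round-trip:

```scala
// Double -> Float is lossy, but the truncated value survives further
// round-trips unchanged, which is what makes the two encodings comparable.
val x = 0.12345678901234567
assert(x.toFloat.toDouble != x)                  // precision lost once
assert(x.toFloat.toDouble.toFloat == x.toFloat)  // stable thereafter
```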