30 commits
e4492a6
add pic framework (model, class etc)
wangmiao1981 Jun 13, 2016
7086249
change a comment
wangmiao1981 Jun 13, 2016
b73d8a7
add missing functions fit predict load save etc.
wangmiao1981 Jun 17, 2016
022fe52
add unit test file
wangmiao1981 Jun 18, 2016
552cf54
add test cases part 1
wangmiao1981 Jun 20, 2016
0b4954d
add unit test part 2: test fit, parameters etc.
wangmiao1981 Jun 20, 2016
f22b01e
fix a type issue
wangmiao1981 Jun 20, 2016
305b194
add more unit tests
wangmiao1981 Jun 21, 2016
4b32cbf
delete unused import and add comments
wangmiao1981 Jun 21, 2016
f6eda88
change version to 2.1.0
wangmiao1981 Oct 25, 2016
45c4b1c
change PIC to a Transformer
wangmiao1981 Nov 3, 2016
e8d7ed3
add LabelCol
wangmiao1981 Nov 4, 2016
e4e1e05
change col implementation
wangmiao1981 Nov 4, 2016
8384422
address some of the comments
wangmiao1981 Feb 17, 2017
d6a199c
add additional test with dataset having more data
wangmiao1981 Feb 21, 2017
b0c3aff
change input data format
wangmiao1981 Mar 14, 2017
091225d
resolve warnings
wangmiao1981 Mar 15, 2017
8bb9956
add neighbor and weight cols
wangmiao1981 Mar 16, 2017
8ba82e8
address review comments 1
wangmiao1981 Aug 15, 2017
468a947
fix style
wangmiao1981 Aug 15, 2017
ec10f24
remove unused comments
wangmiao1981 Aug 15, 2017
5710cfc
add Since
wangmiao1981 Aug 15, 2017
88654b3
fix missing >
wangmiao1981 Aug 17, 2017
804adc6
fix doc
wangmiao1981 Aug 17, 2017
4a6dd79
address review comments
wangmiao1981 Oct 25, 2017
5cb8ed6
fix unit test
wangmiao1981 Oct 30, 2017
6abf602
cleanups to docs
jkbradley Apr 3, 2018
d927087
typo
jkbradley Apr 3, 2018
d215748
final updates for PIC PR
jkbradley Apr 17, 2018
375e150
fixed scala style
jkbradley Apr 19, 2018
add more unit tests
wangmiao1981 authored and jkbradley committed Apr 16, 2018
commit 305b194dae40eaff990c18837c3f2bc8d469e60c
mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala

@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.clustering
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
@@ -30,9 +32,9 @@ import org.apache.spark.mllib.clustering.PowerIterationClustering.Assignment
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 
-/*
+/**
  * Common params for PowerIterationClustering and PowerIterationClusteringModel
  */
 private[clustering] trait PowerIterationClusteringParams extends Params with HasMaxIter
@@ -78,7 +80,6 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
   }
 }
 
-
 @Since("2.0.0")
 @Experimental
 class PowerIterationClusteringModel private[ml] (
@@ -126,11 +127,11 @@ class PowerIterationClusteringModel private[ml] (
       .saveInitMode($(initMode))
       .saveMaxIter($(maxIter))
     val rows: RDD[Row] = model.assignments.map {
-      case assignment: Assignment => Row(assignment.cluster)
+      case assignment: Assignment => Row(assignment.id, assignment.cluster)
     }
-    val schema = new StructType(Array(StructField("cluster", IntegerType)))
-    val predict = sparkSession.createDataFrame(rows, schema)
-    features.withColumn($(predictionCol), predict.col("cluster"))
+    val schema = new StructType(Array(StructField($(featuresCol), LongType),
+      StructField($(predictionCol), IntegerType)))
+    sparkSession.createDataFrame(rows, schema)
   }
 
   @Since("2.0.0")
@@ -178,7 +179,8 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering
     override protected def saveImpl(path: String): Unit = {
       // Save metadata and Params
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      MLlibPowerIterationClusteringModel.SaveLoadV1_0.save(sc, instance.parentModel, path)
+      val dataPath = new Path(path, "data").toString
+      instance.parentModel.save(sc, dataPath)
     }
   }
 
@@ -189,10 +191,9 @@ object PowerIterationClusteringModel extends MLReadable[PowerIterationClustering
     private val className = classOf[PowerIterationClusteringModel].getName
 
     override def load(path: String): PowerIterationClusteringModel = {
-
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
-      val parentModel = MLlibPowerIterationClusteringModel.SaveLoadV1_0.load(sc, path)
-
+      val dataPath = new Path(path, "data").toString
+      val parentModel = MLlibPowerIterationClusteringModel.load(sc, dataPath)
       val model = new PowerIterationClusteringModel(metadata.uid, parentModel)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
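The transform hunk above changes the method's contract: instead of appending a prediction column to the input dataset via withColumn, the model now materializes a fresh DataFrame directly from model.assignments, with the vertex id stored under $(featuresCol) as LongType and the cluster under $(predictionCol) as IntegerType. A minimal driver sketch of this intermediate API follows; it is hypothetical, not code from the PR: the object name and the tiny edges graph are illustrative, and it assumes a Spark 2.x build with this commit applied plus the Vectors.dense(i, j, sim) input encoding used by the test suite below.

import org.apache.spark.ml.clustering.PowerIterationClustering
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object PICTransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("pic-transform-sketch")
      .getOrCreate()
    import spark.implicits._

    // A tiny affinity graph: each row packs (src id, dst id, similarity)
    // into one dense vector, mirroring the suite's input format.
    val edges = Seq(
      Vectors.dense(1.0, 0.0, 0.9),
      Vectors.dense(2.0, 0.0, 0.9),
      Vectors.dense(2.0, 1.0, 0.9),
      Vectors.dense(4.0, 3.0, 0.9),
      Vectors.dense(5.0, 4.0, 0.9)
    ).map(Tuple1.apply).toDF("features")

    val model = new PowerIterationClustering()
      .setK(2)
      .setMaxIter(20)
      .fit(edges)

    // After this commit, transform returns a new DataFrame with schema
    // (featuresCol: Long, predictionCol: Int) built from model.assignments,
    // rather than the input with an extra column.
    model.transform(edges).show()

    spark.stop()
  }
}

Callers that relied on the old behavior, where the input columns survived transform, would now have to join the returned (id, cluster) frame back onto their data themselves.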
mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala

@@ -60,7 +60,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
     PowerIterationClusteringModel.SaveLoadV1_0.load(sc, path)
   }
 
-  private[spark]
+  private[clustering]
   object SaveLoadV1_0 {
 
     private val thisFormatVersion = "1.0"
mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala

@@ -23,24 +23,21 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.util.DefaultReadWriteTest
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 
 class PowerIterationClusteringSuite extends SparkFunSuite
   with MLlibTestSparkContext with DefaultReadWriteTest {
 
   import org.apache.spark.ml.clustering.PowerIterationClustering._
+  @transient var data: Dataset[_] = _
+  final val r1 = 1.0
+  final val n1 = 10
+  final val r2 = 4.0
+  final val n2 = 40
 
-  /** Generates a circle of points. */
-  private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
-    Array.tabulate(n) { i =>
-      val theta = 2.0 * math.Pi * i / n
-      (r * math.cos(theta), r * math.sin(theta))
-    }
-  }
+  override def beforeAll(): Unit = {
+    super.beforeAll()
 
-  /** Computes Gaussian similarity. */
-  private def sim(x: (Double, Double), y: (Double, Double)): Double = {
-    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
-    math.exp(-dist2 / 2.0)
+    data = PowerIterationClusteringSuite.generatePICData(spark, r1, r2, n1, n2)
   }
 
   test("default parameters") {
@@ -78,23 +75,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
   }
 
   test("power iteration clustering") {
-    // Generate two circles following the example in the PIC paper.
-    val r1 = 1.0
-    val n1 = 10
-    val r2 = 4.0
-    val n2 = 40
-    val n = n1 + n2
-    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
-    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
-      (i.toLong, j.toLong, sim(points(i), points(j)))
-    }
-
-    val sc = spark.sparkContext
-    val rdd = sc.parallelize(similarities)
-      .map{case (i: Long, j: Long, sim: Double) => Vectors.dense(Array(i, j, sim))}
-      .map(v => TestRow(v))
-    val data = spark.createDataFrame(rdd)
 
     val model = new PowerIterationClustering()
       .setK(2)
       .setMaxIter(40)
@@ -116,4 +97,80 @@
     }
     assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
   }

test("transform") {
val predictionColName = "pic_prediction"
val model = new PowerIterationClustering()
.setK(2)
.setMaxIter(10)
.setPredictionCol(predictionColName)
.fit(data)

Review comment (Contributor): remove blank line or add blank line after line 139 for consistency?
+    val transformed = model.transform(data)
+    val expectedColumns = Array("features", predictionColName)
+    expectedColumns.foreach { column =>
+      assert(transformed.columns.contains(column))
+    }
+  }
+
+  test("read/write") {
+    def checkModelData(model: PowerIterationClusteringModel,
+        model2: PowerIterationClusteringModel): Unit = {
+      assert(model.getK === model2.getK)
+      val modelAssignments =
+        model.assignments.map(x => (x.id, x.cluster))
+      val model2Assignments =
+        model2.assignments.map(x => (x.id, x.cluster))
+      val unequalElements = modelAssignments.join(model2Assignments).filter {
+        case (id, (c1, c2)) => c1 != c2 }.count()
+      assert(unequalElements === 0L)
+    }
+    val pic = new PowerIterationClustering()
+    testEstimatorAndModelReadWrite(pic, data, PowerIterationClusteringSuite.allParamSettings,
+      checkModelData)
+  }
+}
+
+object PowerIterationClusteringSuite {
+
+  /** Generates a circle of points. */
+  private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+    Array.tabulate(n) { i =>
+      val theta = 2.0 * math.Pi * i / n
+      (r * math.cos(theta), r * math.sin(theta))
+    }
+  }
+
+  /** Computes Gaussian similarity. */
+  private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+    math.exp(-dist2 / 2.0)
+  }
+
+  def generatePICData(spark: SparkSession, r1: Double, r2: Double,
+      n1: Int, n2: Int): DataFrame = {
+    // Generate two circles following the example in the PIC paper.
+    val n = n1 + n2
+    val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+    val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+      (i.toLong, j.toLong, sim(points(i), points(j)))
+    }
+    val sc = spark.sparkContext
+    val rdd = sc.parallelize(similarities)
+      .map{case (i: Long, j: Long, sim: Double) => Vectors.dense(Array(i, j, sim))}
+      .map(v => TestRow(v))
+    spark.createDataFrame(rdd)
+  }
+
+  /**
+   * Mapping from all Params to valid settings which differ from the defaults.
+   * This is useful for tests which need to exercise all Params, such as save/load.
+   * This excludes input columns to simplify some tests.
+   */
+  val allParamSettings: Map[String, Any] = Map(
+    "predictionCol" -> "myPrediction",
+    "k" -> 2,
+    "maxIter" -> 10,
+    "initMode" -> "random"
+  )
+}
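As context for the fixture above: generatePICData places n1 points on a circle of radius r1 and n2 points on a circle of radius r2, then weights every pair with the Gaussian kernel exp(-dist^2 / 2). Adjacent points on the same circle end up with affinities above 0.8, while the closest cross-circle pair (distance 3.0 with the suite's radii of 1.0 and 4.0) gets about 0.011; that sharp falloff is what lets PIC recover the two circles as the two clusters. Below is a standalone sanity check of the kernel; the GaussianSimCheck object is a hypothetical helper for illustration, not part of the PR.

object GaussianSimCheck extends App {
  // Same kernel as the suite's sim(): exp(-squaredDistance / 2)
  def sim(x: (Double, Double), y: (Double, Double)): Double = {
    val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
    math.exp(-dist2 / 2.0)
  }

  // Adjacent points on the inner circle (r = 1, n = 10) are ~0.62 apart.
  val theta = 2.0 * math.Pi / 10
  val a = (1.0, 0.0)
  val b = (math.cos(theta), math.sin(theta))
  println(f"same circle:  ${sim(a, b)}%.3f")               // ~0.826, strong affinity

  // The closest cross-circle pair is 3.0 apart (r = 1 vs r = 4).
  println(f"cross circle: ${sim((1.0, 0.0), (4.0, 0.0))}%.3f") // ~0.011, weak affinity
}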