Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,42 +50,47 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has

/**
* Param for the initialization algorithm. This can be either "random" to use a random vector
* as vertex properties, or "degree" to use normalized sum similarities. Default: random.
* as vertex properties, or "degree" to use a normalized sum of similarities with other vertices.
* Default: random.
* @group expertParam
*/
@Since("2.3.0")
final val initMode = {
val allowedParams = ParamValidators.inArray(Array("random", "degree"))
new Param[String](this, "initMode", "The initialization algorithm. " +
"Supported options: 'random' and 'degree'.", allowedParams)
new Param[String](this, "initMode", "The initialization algorithm. This can be either " +
"'random' to use a random vector as vertex properties, or 'degree' to use a normalized sum " +
"of similarities with other vertices. Supported options: 'random' and 'degree'.",
allowedParams)
}

/**
 * Gets the current value of param [[initMode]].
 * Default: "random" (set in the estimator's setDefault).
 * @group expertGetParam
 */
@Since("2.3.0")
def getInitMode: String = $(initMode)

/**
* Param for the column name for ids returned by PowerIterationClustering.transform().
* Param for the name of the input column for vertex IDs.
* Default: "id"
* @group param
*/
@Since("2.3.0")
val idCol = new Param[String](this, "id", "column name for ids.")
val idCol = new Param[String](this, "idCol", "Name of the input column for vertex IDs.")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 2nd arg here should match the Param name exactly


/**
 * Gets the current value of param [[idCol]], the name of the input column for vertex IDs.
 * Default: "id" (set in the estimator's setDefault).
 * @group getParam
 */
@Since("2.3.0")
def getIdCol: String = $(idCol)

/**
* Param for the column name for neighbors required by PowerIterationClustering.transform().
* Default: "neighbor"
* Param for the name of the input column for neighbors in the adjacency list representation.
* Default: "neighbors"
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this plural since each row holds multiple neighbors (not 1 neighbor)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the Param, getter & setter accordingly

* @group param
*/
@Since("2.3.0")
val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.")
val neighborsCol = new Param[String](this, "neighborsCol",
"Name of the input column for neighbors in the adjacency list representation.")

/** @group getParam */
@Since("2.3.0")
def getNeighborCol: String = $(neighborCol)
def getNeighborsCol: String = $(neighborsCol)

/**
* Validates the input schema
Expand All @@ -104,8 +109,21 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
* PIC finds a very low-dimensional embedding of a dataset using truncated power
* iteration on a normalized pair-wise similarity matrix of the data.
*
* Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an
* expensive operation, because it uses PIC algorithm to cluster the whole input dataset.
* PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix
* is a symmetric matrix whose entries are non-negative similarities between items.
* PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row includes:
* - `idCol`: vertex ID
* - `neighborsCol`: neighbors of vertex in `idCol`
* - `neighborWeightsCol`: non-negative weights of edges between the vertex in `idCol` and
* each neighbor in `neighborsCol`
* PIC returns a cluster assignment for each input vertex. It appends a new column `predictionCol`
* containing the cluster assignment in `[0,k)` for each row (vertex).
*
* Notes:
* - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation.
* Transform runs the iterative PIC algorithm to cluster the whole input dataset.
* - Input validation: This validates that weights are non-negative but does NOT validate
* that the input matrix is symmetric.
*
* @see <a href=http://en.wikipedia.org/wiki/Spectral_clustering>
* Spectral clustering (Wikipedia)</a>
Expand All @@ -122,7 +140,7 @@ class PowerIterationClustering private[clustering] (
initMode -> "random",
idCol -> "id",
weightCol -> "weight",
neighborCol -> "neighbor")
neighborsCol -> "neighbors")

@Since("2.3.0")
override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)
Expand Down Expand Up @@ -164,24 +182,25 @@ class PowerIterationClustering private[clustering] (
def setWeightCol(value: String): this.type = set(weightCol, value)

/**
* Sets the value of param [[neighborCol]].
* Default is "neighbor"
* Sets the value of param [[neighborsCol]].
* Default is "neighbors"
*
* @group setParam
*/
@Since("2.3.0")
def setNeighborCol(value: String): this.type = set(neighborCol, value)
def setNeighborsCol(value: String): this.type = set(neighborsCol, value)

@Since("2.3.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val sparkSession = dataset.sparkSession
val rdd: RDD[(Long, Long, Double)] =
dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
dataset.select(col($(idCol)), col($(neighborsCol)), col($(weightCol))).rdd.flatMap {
case Row(id: Long, nbr: Vector, weight: Vector) =>
require(nbr.size == weight.size,
"The length of neighbor list must be equal to the length of the weight list.")
nbr.toArray.toIterator.zip(weight.toArray.toIterator)
.map(x => (id, x._1.toLong, x._2))}
require(nbr.size == weight.size,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed indentation fix to match scala style guide

"The length of neighbor list must be equal to the length of the weight list.")
nbr.toArray.toIterator.zip(weight.toArray.toIterator)
.map(x => (id, x._1.toLong, x._2))
}
val algorithm = new MLlibPowerIterationClustering()
.setK($(k))
.setInitializationMode($(initMode))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
assert(pic.getPredictionCol === "prediction")
assert(pic.getIdCol === "id")
assert(pic.getWeightCol === "weight")
assert(pic.getNeighborCol === "neighbor")
assert(pic.getNeighborsCol === "neighbors")
}

test("set parameters") {
Expand All @@ -63,7 +63,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
.setPredictionCol("test_prediction")
.setIdCol("test_id")
.setWeightCol("test_weight")
.setNeighborCol("test_neighbor")
.setNeighborsCol("test_neighbor")

assert(pic.getK === 9)
assert(pic.getMaxIter === 33)
Expand All @@ -72,7 +72,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
assert(pic.getPredictionCol === "test_prediction")
assert(pic.getIdCol === "test_id")
assert(pic.getWeightCol === "test_weight")
assert(pic.getNeighborCol === "test_neighbor")
assert(pic.getNeighborsCol === "test_neighbor")
}

test("parameters validation") {
Expand Down Expand Up @@ -129,7 +129,8 @@ class PowerIterationClusteringSuite extends SparkFunSuite

object PowerIterationClusteringSuite {

case class TestRow2(id: Long, neighbor: Vector, weight: Vector)
// Test fixture row: a vertex id, a dense vector of its neighbors' ids (stored as doubles),
// and a parallel dense vector of the corresponding edge weights.
case class TestRow2(id: Long, neighbors: Vector, weight: Vector)

/** Generates a circle of points. */
private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
Array.tabulate(n) { i =>
Expand All @@ -144,27 +145,32 @@ object PowerIterationClusteringSuite {
math.exp(-dist2 / 2.0)
}

def generatePICData(spark: SparkSession, r1: Double, r2: Double,
n1: Int, n2: Int): DataFrame = {
def generatePICData(
spark: SparkSession,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

matching scala style guide

r1: Double,
r2: Double,
n1: Int,
n2: Int): DataFrame = {
// Generate two circles following the example in the PIC paper.
val n = n1 + n2
val points = genCircle(r1, n1) ++ genCircle(r2, n2)

val similarities = for (i <- 1 until n) yield {
val neighbor = for (j <- 0 until i) yield {
val neighbors = for (j <- 0 until i) yield {
j.toLong
}
val weight = for (j <- 0 until i) yield {
val weights = for (j <- 0 until i) yield {
sim(points(i), points(j))
}
(i.toLong, neighbor.toArray, weight.toArray)
(i.toLong, neighbors.toArray, weights.toArray)
}

val sc = spark.sparkContext

val rdd = sc.parallelize(similarities).map{
val rdd = sc.parallelize(similarities).map {
case (id: Long, nbr: Array[Long], weight: Array[Double]) =>
TestRow2(id, Vectors.dense(nbr.map(i => i.toDouble)), Vectors.dense(weight))}
TestRow2(id, Vectors.dense(nbr.map(i => i.toDouble)), Vectors.dense(weight))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: scala style

}
spark.createDataFrame(rdd)
}

Expand Down