From b027d0e93de638784bf8fe7736a8cc03d2bf6e47 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Tue, 3 Apr 2018 16:46:40 -0700
Subject: [PATCH 1/2] cleanups to docs

---
 .../clustering/PowerIterationClustering.scala | 59 ++++++++++++-------
 .../PowerIterationClusteringSuite.scala       | 28 +++++----
 2 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
index a255ac4adc30..24a8ca7dc4d4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -50,13 +50,17 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
 
   /**
    * Param for the initialization algorithm. This can be either "random" to use a random vector
-   * as vertex properties, or "degree" to use normalized sum similarities. Default: random.
+   * as vertex properties, or "degree" to use normalized sum of similarities with other vertices.
+   * Default: random.
+   * @group expertParam
    */
   @Since("2.3.0")
   final val initMode = {
     val allowedParams = ParamValidators.inArray(Array("random", "degree"))
-    new Param[String](this, "initMode", "The initialization algorithm. " +
-      "Supported options: 'random' and 'degree'.", allowedParams)
+    new Param[String](this, "initMode", "The initialization algorithm. This can be either " +
+      "'random' to use a random vector as vertex properties, or 'degree' to use normalized sum " +
+      "of similarities with other vertices. Supported options: 'random' and 'degree'.",
+      allowedParams)
   }
 
   /** @group expertGetParam */
@@ -64,28 +68,29 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
   def getInitMode: String = $(initMode)
 
   /**
-   * Param for the column name for ids returned by PowerIterationClustering.transform().
+   * Param for the name of the input column for vertex IDs.
    * Default: "id"
    * @group param
    */
   @Since("2.3.0")
-  val idCol = new Param[String](this, "id", "column name for ids.")
+  val idCol = new Param[String](this, "idCol", "Name of the input column for vertex IDs.")
 
   /** @group getParam */
   @Since("2.3.0")
   def getIdCol: String = $(idCol)
 
   /**
-   * Param for the column name for neighbors required by PowerIterationClustering.transform().
-   * Default: "neighbor"
+   * Param for the name of the input column for neighbors in the adjacency list representation.
+   * Default: "neighbors"
    * @group param
    */
   @Since("2.3.0")
-  val neighborCol = new Param[String](this, "neighbor", "column name for neighbors.")
+  val neighborsCol = new Param[String](this, "neighborsCol",
+    "Name of the input column for neighbors in the adjacency list representation.")
 
   /** @group getParam */
   @Since("2.3.0")
-  def getNeighborCol: String = $(neighborCol)
+  def getNeighborsCol: String = $(neighborsCol)
 
   /**
    * Validates the input schema
@@ -104,8 +109,21 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
  * PIC finds a very low-dimensional embedding of a dataset using truncated power
  * iteration on a normalized pair-wise similarity matrix of the data.
  *
- * Note that we implement [[PowerIterationClustering]] as a transformer. The [[transform]] is an
- * expensive operation, because it uses PIC algorithm to cluster the whole input dataset.
+ * PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix
+ * is a symmetric matrix whose entries are non-negative similarities between items.
+ * PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row includes:
+ *  - `idCol`: vertex ID
+ *  - `neighborsCol`: neighbors of vertex in `idCol`
+ *  - `weightCol`: non-negative weights of edges between the vertex in `idCol` and
+ *    each neighbor in `neighborsCol`
+ * PIC returns a cluster assignment for each input vertex. It appends a new column `predictionCol`
+ * containing the cluster assignment in `[0,k)` for each row (vertex).
+ *
+ * Notes:
+ *  - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation.
+ *    Transform runs the iterative PIC algorithm to cluster the whole input dataset.
+ *  - Input validation: This validates that weights are non-negative but does NOT validate
+ *    that the input matrix is symmetric.
  *
  * @see <a href=http://en.wikipedia.org/wiki/Spectral_clustering>
  * Spectral clustering (Wikipedia)</a>
@@ -122,7 +140,7 @@ class PowerIterationClustering private[clustering] (
     initMode -> "random",
     idCol -> "id",
     weightCol -> "weight",
-    neighborCol -> "neighbor")
+    neighborsCol -> "neighbors")
 
   @Since("2.3.0")
   override def copy(extra: ParamMap): PowerIterationClustering = defaultCopy(extra)
@@ -164,24 +182,25 @@ class PowerIterationClustering private[clustering] (
   def setWeightCol(value: String): this.type = set(weightCol, value)
 
   /**
-   * Sets the value of param [[neighborCol]].
-   * Default is "neighbor"
+   * Sets the value of param [[neighborsCol]].
+   * Default is "neighbors"
    *
    * @group setParam
    */
   @Since("2.3.0")
-  def setNeighborCol(value: String): this.type = set(neighborCol, value)
+  def setNeighborsCol(value: String): this.type = set(neighborsCol, value)
 
   @Since("2.3.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     val sparkSession = dataset.sparkSession
     val rdd: RDD[(Long, Long, Double)] =
-      dataset.select(col($(idCol)), col($(neighborCol)), col($(weightCol))).rdd.flatMap {
+      dataset.select(col($(idCol)), col($(neighborsCol)), col($(weightCol))).rdd.flatMap {
         case Row(id: Long, nbr: Vector, weight: Vector) =>
-        require(nbr.size == weight.size,
-          "The length of neighbor list must be equal to the the length of the weight list.")
-        nbr.toArray.toIterator.zip(weight.toArray.toIterator)
-          .map(x => (id, x._1.toLong, x._2))}
+          require(nbr.size == weight.size,
+            "The length of the neighbor list must be equal to the length of the weight list.")
+          nbr.toArray.toIterator.zip(weight.toArray.toIterator)
+            .map(x => (id, x._1.toLong, x._2))
+      }
     val algorithm = new MLlibPowerIterationClustering()
       .setK($(k))
      .setInitializationMode($(initMode))
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
index 1c7f01b0dfdf..2d368837ab50 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
@@ -51,7 +51,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
     assert(pic.getPredictionCol === "prediction")
     assert(pic.getIdCol === "id")
     assert(pic.getWeightCol === "weight")
-    assert(pic.getNeighborCol === "neighbor")
+    assert(pic.getNeighborsCol === "neighbors")
   }
 
   test("set parameters") {
@@ -63,7 +63,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
       .setPredictionCol("test_prediction")
       .setIdCol("test_id")
       .setWeightCol("test_weight")
-      .setNeighborCol("test_neighbor")
+      .setNeighborsCol("test_neighbor")
 
     assert(pic.getK === 9)
     assert(pic.getMaxIter === 33)
@@ -72,7 +72,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
     assert(pic.getPredictionCol === "test_prediction")
     assert(pic.getIdCol === "test_id")
     assert(pic.getWeightCol === "test_weight")
-    assert(pic.getNeighborCol === "test_neighbor")
+    assert(pic.getNeighborsCol === "test_neighbor")
   }
 
   test("parameters validation") {
@@ -129,7 +129,8 @@ class PowerIterationClusteringSuite extends SparkFunSuite
 
 object PowerIterationClusteringSuite {
 
-  case class TestRow2(id: Long, neighbor: Vector, weight: Vector)
+  case class TestRow2(id: Long, neighbors: Vector, weight: Vector)
+
   /** Generates a circle of points. */
   private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
     Array.tabulate(n) { i =>
@@ -144,27 +145,32 @@ object PowerIterationClusteringSuite {
     math.exp(-dist2 / 2.0)
   }
 
-  def generatePICData(spark: SparkSession, r1: Double, r2: Double,
-      n1: Int, n2: Int): DataFrame = {
+  def generatePICData(
+      spark: SparkSession,
+      r1: Double,
+      r2: Double,
+      n1: Int,
+      n2: Int): DataFrame = {
     // Generate two circles following the example in the PIC paper.
     val n = n1 + n2
     val points = genCircle(r1, n1) ++ genCircle(r2, n2)
     val similarities = for (i <- 1 until n) yield {
-      val neighbor = for (j <- 0 until i) yield {
+      val neighbors = for (j <- 0 until i) yield {
         j.toLong
       }
-      val weight = for (j <- 0 until i) yield {
+      val weights = for (j <- 0 until i) yield {
         sim(points(i), points(j))
       }
-      (i.toLong, neighbor.toArray, weight.toArray)
+      (i.toLong, neighbors.toArray, weights.toArray)
     }
     val sc = spark.sparkContext
-    val rdd = sc.parallelize(similarities).map{
+    val rdd = sc.parallelize(similarities).map {
       case (id: Long, nbr: Array[Long], weight: Array[Double]) =>
-        TestRow2(id, Vectors.dense(nbr.map(i => i.toDouble)), Vectors.dense(weight))}
+        TestRow2(id, Vectors.dense(nbr.map(i => i.toDouble)), Vectors.dense(weight))
+    }
     spark.createDataFrame(rdd)
   }

From f4d115c8a2c53bbdf41630a841522369fcd3e4b2 Mon Sep 17 00:00:00 2001
From: "Joseph K. Bradley"
Date: Tue, 3 Apr 2018 16:52:36 -0700
Subject: [PATCH 2/2] typo

---
 .../apache/spark/ml/clustering/PowerIterationClustering.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
index 24a8ca7dc4d4..4ce56c0a67fe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/PowerIterationClustering.scala
@@ -50,7 +50,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
 
   /**
    * Param for the initialization algorithm. This can be either "random" to use a random vector
-   * as vertex properties, or "degree" to use normalized sum of similarities with other vertices.
+   * as vertex properties, or "degree" to use a normalized sum of similarities with other vertices.
    * Default: random.
    * @group expertParam
    */
@@ -58,7 +58,7 @@ private[clustering] trait PowerIterationClusteringParams extends Params with Has
   final val initMode = {
     val allowedParams = ParamValidators.inArray(Array("random", "degree"))
     new Param[String](this, "initMode", "The initialization algorithm. This can be either " +
-      "'random' to use a random vector as vertex properties, or 'degree' to use normalized sum " +
+      "'random' to use a random vector as vertex properties, or 'degree' to use a normalized sum " +
       "of similarities with other vertices. Supported options: 'random' and 'degree'.",
       allowedParams)
   }
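
A minimal usage sketch of the API as patched above, for anyone trying out this branch. Everything in it is illustrative: the tiny graph and column values are made up, an active SparkSession named `spark` is assumed, and (per the new scaladoc) weights must be non-negative while symmetry of the affinity matrix is NOT validated. Like the suite's generatePICData, it lists each undirected edge once, from the higher vertex ID to the lower.

    import org.apache.spark.ml.clustering.PowerIterationClustering
    import org.apache.spark.ml.linalg.Vectors

    // Hypothetical 4-vertex affinity graph in the adjacency list representation:
    // one row per vertex, holding the vertex ID, its neighbors (as a Vector of
    // IDs), and the matching non-negative edge weights. transform() requires
    // the two vectors to have equal length.
    val df = spark.createDataFrame(Seq(
      (1L, Vectors.dense(0.0), Vectors.dense(0.9)),
      (2L, Vectors.dense(0.0, 1.0), Vectors.dense(0.9, 0.9)),
      (3L, Vectors.dense(0.0, 1.0, 2.0), Vectors.dense(0.1, 0.1, 0.1))
    )).toDF("id", "neighbors", "weight")

    val pic = new PowerIterationClustering()
      .setK(2)
      .setMaxIter(20)
      .setIdCol("id")                // defaults shown explicitly for clarity
      .setNeighborsCol("neighbors")
      .setWeightCol("weight")

    // The expensive call: runs the iterative PIC algorithm over the whole
    // input dataset and appends a "prediction" column with cluster
    // assignments in [0, k).
    val assignments = pic.transform(df)
    assignments.select("id", "prediction").show()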