From 9830342ee6087908300eee665d65c6d38b2a3f88 Mon Sep 17 00:00:00 2001 From: Harold Sultan Date: Wed, 28 May 2014 12:11:30 -0400 Subject: [PATCH 1/4] initial version of LPA --- .../org/apache/spark/graphx/lib/LPA.scala | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala new file mode 100644 index 000000000000..5f89a72e10a4 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag +import org.apache.spark.graphx._ + +/** LPA algorithm. */ +object LPA { + /** + * Run LPA (label propogation algorithm) for detecting communities in networks using the pregel framework. + * + * Each node in the network is initially assigned to its own community. At every super step + * nodes send their community affiliation to all neighbors and update their state to the mode + * community affiliation of incomming messages. + * + * LPA is a standard community detection algorithm for graphs. It is very inexpensive + * computationally, although (1) convergence is not guaranteed and (2) one can end up with + * trivial solutions (all nodes are identified into a single community). + * + * @tparam VD the vertex attribute type (discarded in the computation) + * @tparam ED the edge attribute type (not used in the computation) + * + * @param graph the graph for which to compute the community affiliation + * @param maxSteps the number of supersteps of LPA to be performed + * + * @return a graph with vertex attributes containing the label of community affiliation + */ + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, Long]{ + val lpaGraph = graph.mapVertices { case (vid, _) => vid } + def sendMessage(edge: EdgeTriplet[VertexId, ED]) = { + Iterator((e.srcId, Map(e.dstAttr -> 1L)),(e.dstId, Map(e.srcAttr -> 1L))) + } + def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long]): Map[VertexId, Long] = { + (count1.keySet ++ count2.keySet).map { i => + val count1Val = count1.getOrElse(i,0L) + val count2Val = count2.getOrElse(i,0L) + i -> (count1Val +count2Val) + }.toMap + } + def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long])={ + if (message.isEmpty) attr else message.maxBy{_._2}._1), + } + val initialMessage = Map[VertexId,Long]() + Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)( + vprog = vertexProgram, + sendMsg = sendMessage, + mergeMsg = mergeMessage) + } +} From 84aa061514d46a7d334ffcaf134181711ee4d409 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 28 May 2014 11:43:50 -0700 Subject: [PATCH 2/4] LabelPropagation: Fix compile errors and style; rename from LPA --- .../lib/{LPA.scala => LabelPropagation.scala} | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) rename graphx/src/main/scala/org/apache/spark/graphx/lib/{LPA.scala => LabelPropagation.scala} (66%) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala similarity index 66% rename from graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala rename to graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala index 5f89a72e10a4..4745d4ef87c9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LPA.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala @@ -20,43 +20,44 @@ package org.apache.spark.graphx.lib import scala.reflect.ClassTag import org.apache.spark.graphx._ -/** LPA algorithm. */ -object LPA { +/** Label Propagation algorithm. */ +object LabelPropagation { /** - * Run LPA (label propogation algorithm) for detecting communities in networks using the pregel framework. - * - * Each node in the network is initially assigned to its own community. At every super step - * nodes send their community affiliation to all neighbors and update their state to the mode - * community affiliation of incomming messages. + * Run static Label Propagation for detecting communities in networks. * - * LPA is a standard community detection algorithm for graphs. It is very inexpensive + * Each node in the network is initially assigned to its own community. At every superstep, nodes + * send their community affiliation to all neighbors and update their state to the mode community + * affiliation of incoming messages. + * + * LPA is a standard community detection algorithm for graphs. It is very inexpensive * computationally, although (1) convergence is not guaranteed and (2) one can end up with * trivial solutions (all nodes are identified into a single community). * - * @tparam VD the vertex attribute type (discarded in the computation) * @tparam ED the edge attribute type (not used in the computation) * * @param graph the graph for which to compute the community affiliation - * @param maxSteps the number of supersteps of LPA to be performed + * @param maxSteps the number of supersteps of LPA to be performed. Because this is a static + * implementation, the algorithm will run for exactly this many supersteps. * * @return a graph with vertex attributes containing the label of community affiliation */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, Long]{ + def run[ED: ClassTag](graph: Graph[_, ED], maxSteps: Int): Graph[VertexId, ED] = { val lpaGraph = graph.mapVertices { case (vid, _) => vid } - def sendMessage(edge: EdgeTriplet[VertexId, ED]) = { - Iterator((e.srcId, Map(e.dstAttr -> 1L)),(e.dstId, Map(e.srcAttr -> 1L))) + def sendMessage(e: EdgeTriplet[VertexId, ED]) = { + Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L))) } - def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long]): Map[VertexId, Long] = { + def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long]) + : Map[VertexId, Long] = { (count1.keySet ++ count2.keySet).map { i => - val count1Val = count1.getOrElse(i,0L) - val count2Val = count2.getOrElse(i,0L) - i -> (count1Val +count2Val) - }.toMap + val count1Val = count1.getOrElse(i, 0L) + val count2Val = count2.getOrElse(i, 0L) + i -> (count1Val + count2Val) + }.toMap } - def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long])={ - if (message.isEmpty) attr else message.maxBy{_._2}._1), + def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]) = { + if (message.isEmpty) attr else message.maxBy(_._2)._1 } - val initialMessage = Map[VertexId,Long]() + val initialMessage = Map[VertexId, Long]() Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)( vprog = vertexProgram, sendMsg = sendMessage, From 0e24303190a38d4657fd468ca5c091a8e7f4c905 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 28 May 2014 12:18:17 -0700 Subject: [PATCH 3/4] Add LabelPropagationSuite --- .../graphx/lib/LabelPropagationSuite.scala | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala new file mode 100644 index 000000000000..61fd0c460556 --- /dev/null +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import org.scalatest.FunSuite + +import org.apache.spark.graphx._ + +class LabelPropagationSuite extends FunSuite with LocalSparkContext { + test("Label Propagation") { + withSpark { sc => + // Construct a graph with two cliques connected by a single edge + val n = 5 + val clique1 = for (u <- 0L until n; v <- 0L until n) yield Edge(u, v, 1) + val clique2 = for (u <- 0L to n; v <- 0L to n) yield Edge(u + n, v + n, 1) + val twoCliques = sc.parallelize(clique1 ++ clique2 :+ Edge(0L, n, 1)) + val graph = Graph.fromEdges(twoCliques, 1) + // Run label propagation + val labels = LabelPropagation.run(graph, n * 4).cache() + + // All vertices within a clique should have the same label + val clique1Labels = labels.vertices.filter(_._1 < n).map(_._2).collect.toArray + assert(clique1Labels.forall(_ == clique1Labels(0))) + val clique2Labels = labels.vertices.filter(_._1 >= n).map(_._2).collect.toArray + assert(clique2Labels.forall(_ == clique2Labels(0))) + // The two cliques should have different labels + assert(clique1Labels(0) != clique2Labels(0)) + } + } +} From 227a4d0c80da6408134329774017837b012982e4 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 28 May 2014 17:28:35 -0700 Subject: [PATCH 4/4] Untabify --- .../scala/org/apache/spark/graphx/lib/LabelPropagation.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala index 4745d4ef87c9..776bfb8dd6bf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala @@ -51,8 +51,8 @@ object LabelPropagation { (count1.keySet ++ count2.keySet).map { i => val count1Val = count1.getOrElse(i, 0L) val count2Val = count2.getOrElse(i, 0L) - i -> (count1Val + count2Val) - }.toMap + i -> (count1Val + count2Val) + }.toMap } def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]) = { if (message.isEmpty) attr else message.maxBy(_._2)._1