[SPARK-11496][GRAPHX] Parallel implementation of personalized pagerank #9457
@@ -26,6 +26,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.lib._

import breeze.linalg.SparseVector

Member: import breeze.linalg.{SparseVector => BSV}
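The rename the reviewer suggests keeps Breeze's type visually distinct from Spark's own SparseVector class; that motivation is a plausible reading only, since the thread shows just the import line. A minimal sketch of what it would look like in this file:

```scala
// Rename Breeze's SparseVector on import, as the reviewer suggests, so it cannot be
// confused with Spark's own SparseVector elsewhere in the codebase.
import breeze.linalg.{SparseVector => BSV}

// The per-vertex attribute used in this PR would then be written as BSV[Double],
// e.g. a 3-slot vector with resetProb in the slot of source index 1:
val v: BSV[Double] = new BSV[Double](Array(1), Array(0.15), 3)
```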
/**
 * Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
 * efficient GraphX API. This class is implicitly constructed for each Graph object.
@@ -384,6 +386,16 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
    PageRank.runUntilConvergenceWithOptions(graph, tol, resetProb, Some(src))
  }
  /**
   * Run parallel personalized PageRank for a given array of source vertices, such
   * that all random walks are started relative to the source vertices.
   */
  def staticParallelPersonalizedPageRank(sources : Array[VertexId], numIter: Int,
    resetProb: Double = 0.15) : Graph[SparseVector[Double], Double] = {
Member: No space right before `:`.

Member: Also
    PageRank.runParallelPersonalizedPageRank(graph, numIter, resetProb, sources)
  }
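A hedged usage sketch of the new GraphOps method this hunk adds. The graph, source ids, and parameter values are invented for illustration, and `sc` is assumed to be an existing SparkContext:

```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import breeze.linalg.SparseVector

// Build a small random graph to run against (any Graph[VD, ED] works).
val graph = GraphGenerators.logNormalGraph(sc, numVertices = 100)

// Personalize PageRank to three source vertices at once.
val sources: Array[VertexId] = Array(1L, 7L, 42L)
val ranks: Graph[SparseVector[Double], Double] =
  graph.staticParallelPersonalizedPageRank(sources, numIter = 10, resetProb = 0.15)

// Each vertex attribute is a sparse vector of length sources.length;
// entry i is that vertex's rank personalized to sources(i).
ranks.vertices.take(5).foreach { case (id, v) => println(s"$id -> $v") }
```

The point of the new method is that all sources share one set of iterations, instead of running the existing single-source personalized PageRank once per source.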
Member: remove extra line.
  /**
   * Run Personalized PageRank for a fixed number of iterations,
   * with all iterations originating at the source node
@@ -23,6 +23,8 @@ import scala.language.postfixOps
import org.apache.spark.Logging
import org.apache.spark.graphx._

import breeze.linalg.SparseVector

Member: ditto.
/**
 * PageRank algorithm implementation. There are two implementations of PageRank implemented.
 *

@@ -158,6 +160,83 @@ object PageRank extends Logging {
    rankGraph
  }
  /**
   * Run Personalized PageRank for a fixed number of iterations, for a
   * set of starting nodes in parallel. Returns a graph with vertex attributes
   * containing the pagerank relative to all starting nodes (as a sparse vector) and
   * edge attributes the normalized edge weight
   *
   * @tparam VD The original vertex attribute (not used)
   * @tparam ED The original edge attribute (not used)
   *
   * @param graph The graph on which to compute personalized pagerank
   * @param numIter The number of iterations to run
   * @param resetProb The random reset probability
   * @param sources The list of sources to compute personalized pagerank from
   * @return the graph with vertex attributes
   *         containing the pagerank relative to all starting nodes (as a sparse vector) and
   *         edge attributes the normalized edge weight
   */
  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
    numIter: Int, resetProb: Double = 0.15,
    sources : Array[VertexId]): Graph[SparseVector[Double], Double] =
  {
Member: Move `{` to the end of the previous line.
    // TODO if one sources vertex id is outside of the int range
    // we won't be able to store its activations in a sparse vector
    val zero = new SparseVector[Double](Array(), Array(), sources.size)
    val sourcesInitMap = sources.zipWithIndex.map{case (vid, i) => {

Member: should be

    val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
      val v = new SparseVector[Double](Array(i), Array(resetProb), sources.size)
      (vid, v)
    }.toMap

      val v = new SparseVector[Double](Array(i), Array(resetProb), sources.size)
      (vid, v)
    }}.toMap
    val sc = graph.vertices.sparkContext
    val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
    // Initialize the PageRank graph with each edge attribute having
    // weight 1/outDegree and each source vertex with attribute 1.0.
    var rankGraph = graph
      // Associate the degree with each vertex
      .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
      // Set the weight on the edges based on the degree
      .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
      .mapVertices( (vid, attr) => {

Member: ditto.

        if (sourcesInitMapBC.value contains vid) {
          sourcesInitMapBC.value(vid)
        } else {
          zero
        }
      })
    var i = 0
    while (i < numIter) {
      val prevRankGraph = rankGraph
      // Propagates the message along outbound edges
      // and adding start nodes back in with activation resetProb
      val rankUpdates = rankGraph.aggregateMessages[SparseVector[Double]](
        ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
        (a : SparseVector[Double], b : SparseVector[Double]) => a :+ b, TripletFields.Src)

      rankGraph = rankGraph.joinVertices(rankUpdates) {
        (vid, oldRank, msgSum) => {

          val popActivations : SparseVector[Double] = msgSum :* (1.0 - resetProb)

Member: extra space before `:`.

          val resetActivations = if (sourcesInitMapBC.value contains vid) {
            sourcesInitMapBC.value(vid)
          } else {
            zero
          }
          popActivations :+ resetActivations
      }}.cache()

      rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
      prevRankGraph.vertices.unpersist(false)
      prevRankGraph.edges.unpersist(false)

      logInfo(s"Parallel Personalized PageRank finished iteration $i.")

      i += 1
    }

    rankGraph
  }
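The loop above is easiest to read as a per-vertex recurrence: new rank = (1 - resetProb) times the sum of weighted incoming ranks, plus resetProb in a source vertex's own slot. A small self-contained sketch of one such update using the same Breeze operators (values invented, not part of the PR):

```scala
import breeze.linalg.SparseVector

val resetProb = 0.15
val numSources = 3

// Sum of messages received from in-neighbours, i.e. srcAttr :* edgeWeight,
// aggregated with :+ by the aggregateMessages call above.
val msgSum = new SparseVector[Double](Array(0, 2), Array(0.4, 0.1), numSources)

// This vertex happens to be sources(1), so its reset vector has resetProb in slot 1.
val resetActivations = new SparseVector[Double](Array(1), Array(resetProb), numSources)

// The joinVertices update: damp the propagated mass, then add the reset mass.
val newRank = (msgSum :* (1.0 - resetProb)) :+ resetActivations
// newRank is now (0.34, 0.15, 0.085) across the three source slots.
```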
  /**
   * Run a dynamic version of PageRank returning a graph with vertex attributes containing the
   * PageRank and edge attributes containing the normalized edge weight.
Since mllib depends on graphx, please remove the breeze dependencies in mllib.
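The PR as shown returns Breeze vectors from a public GraphX method. Purely as an illustration of keeping Breeze types at a module boundary (an assumption about how this comment could be addressed, not what the PR actually does), calling code on the mllib side could convert the result into Spark's own sparse vectors:

```scala
import breeze.linalg.{SparseVector => BSV}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Hypothetical helper: copy the active (index, value) pairs of a Breeze vector
// into an org.apache.spark.mllib.linalg sparse vector.
def toMLlibVector(v: BSV[Double]): Vector =
  Vectors.sparse(v.length, v.activeIterator.toSeq)

// e.g. ranks.vertices.mapValues(toMLlibVector _) would expose no Breeze types downstream.
```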