1- // Databricks notebook source exported at Fri, 11 Nov 2016 03:20:18 UTC
1+ // Databricks notebook source exported at Sat, 3 Dec 2016 14:00:12 UTC
22// MAGIC %md
33// MAGIC # Extending spark.graphx.lib.ShortestPaths to GraphXShortestWeightedPaths
44// MAGIC
@@ -118,7 +118,8 @@ val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices
118118
119119// COMMAND ----------
120120
// landmark vertices from which shortest weighted path distances are computed
val landMarkVertexIds: Seq[Long] = Seq(4L, 0L, 9L)
val result = GraphXShortestWeightedPaths.run(graph, landMarkVertexIds)
122123
123124// COMMAND ----------
124125
@@ -128,4 +129,138 @@ println(result.vertices.collect.mkString("\n"))
128129// COMMAND ----------
129130
// edges with weights; make sure to check a couple of the shortest paths from above
display(result.edges.toDF)
133+
134+ // COMMAND ----------
135+
// inspect the directed, weighted edges of the input graph
display(graph.edges.toDF)
137+
138+ // COMMAND ----------
139+
// now let us collect the shortest distance between every vertex and every landmark vertex.
// on manipulating the scala maps held at the result's vertices, see:
//   http://docs.scala-lang.org/overviews/collections/maps.html
//   http://stackoverflow.com/questions/28769367/scala-map-a-map-to-list-of-tuples
// flatten each (vertexId -> Map(landmarkId -> dist)) entry into (vertex, landmarkVertex, shortest_distance) triples
val shortestDistsVertex2Landmark = result.vertices.flatMap { case (vertexId, spMap) =>
  spMap.toSeq.map { case (landmarkId, dist) => (vertexId, landmarkId, dist) }
}
146+
147+ // COMMAND ----------
148+
// peek at the triples, one (vertex, landmark, distance) per line
shortestDistsVertex2Landmark.collect.mkString("\n")
150+
151+ // COMMAND ----------
152+
153+ // MAGIC %md
154+ // MAGIC #### Let's make a DataFrame for visualizing pairwise matrix plots
155+ // MAGIC
156+ // MAGIC We want to make 4 columns in this example as follows (note actual values change for each realisation of graph!):
157+ // MAGIC
158+ // MAGIC ```
159+ // MAGIC landmarkId1 ("0"), landmarkId2 ("4"), landmarkId3 ("9"), srcVertexId
160+ // MAGIC ------------------------------------------------------------------------
161+ // MAGIC 0.0, 0.7425.., 0.8718, 0
162+ // MAGIC 0.924..., 1.2464.., 1.0472, 1
163+ // MAGIC ...
164+ // MAGIC ```
165+
166+ // COMMAND ----------
167+
168+ // http://alvinalexander.com/scala/how-to-sort-map-in-scala-key-value-sortby-sortwith
169+ // we need this to make sure that the maps are ordered by the keys for ensuring unique column values
170+ import scala .collection .immutable .ListMap
171+ import sqlContext .implicits ._
172+
173+ // COMMAND ----------
174+
// recall our landmark vertices in landMarkVertexIds; use their string forms as names
val unorderedNamedLandmarkVertices = landMarkVertexIds.map(id => (id, id.toString))
// ListMap built from key-sorted pairs keeps a deterministic, ordered iteration,
// ensuring unique and stable column values
val orderedNamedLandmarkVertices = ListMap(unorderedNamedLandmarkVertices.sortBy(_._1): _*)
val orderedLandmarkVertexNames = orderedNamedLandmarkVertices.toSeq.map(_._2)
orderedLandmarkVertexNames.mkString(", ")
180+
181+ // COMMAND ----------
182+
// these will be our data-frame column names: one per landmark, plus the source vertex id
val columnNames: Seq[String] = orderedLandmarkVertexNames :+ "srcVertexId"
185+
186+ // COMMAND ----------
187+
// a case class to build a data frame quickly from the result:
// one row per source vertex, carrying its ordered landmark distances
case class SeqOfDoublesAndsrcVertexId(shortestDistances: Seq[Double], srcVertexId: VertexId)
190+
191+ // COMMAND ----------
192+
// build one row per vertex: (ordered landmark distances, source vertex id)
val shortestDistsSeqFromVertex2Landmark2DF = result.vertices
  .map { case (srcVertexId, spMap) =>
    // sort the landmark -> distance map by landmark id so every row's distance
    // sequence lines up with the same (ordered) landmark columns
    val orderedDistances = ListMap(spMap.toSeq.sortBy(_._1): _*).toSeq.map(_._2)
    SeqOfDoublesAndsrcVertexId(orderedDistances, srcVertexId)
  }
  .toDF()
200+
201+ // COMMAND ----------
202+
// the first column still holds a sequence; it needs exploding into 3 columns
display(shortestDistsSeqFromVertex2Landmark2DF)
204+
205+ // COMMAND ----------
206+
207+ // MAGIC %md
208+ // MAGIC Now we want to make separate columns for each distance in the Sequence in column 'shortestDistances'.
209+ // MAGIC
210+ // MAGIC Let us use the following ideas for this:
211+ // MAGIC * https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/2662535171379268/4413065072037724/latest.html
212+
213+ // COMMAND ----------
214+
// this is from https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/3741049972324885/2662535171379268/4413065072037724/latest.html
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{lit, udf}

// UDF to extract the i-th element from an array column
// val elem = udf((x: Seq[Int], y: Int) => x(y))
val elem = udf((xs: Seq[Double], i: Int) => xs(i)) // modified for a sequence of doubles

// Apply the 'elem' UDF once per index; requires knowing the sequence length in advance
def split(col: Column, len: Int): Seq[Column] =
  (0 until len).map(i => elem(col, lit(i)).as(s"$col($i)"))
227+
228+ // Implicit conversion to make things nicer to use, e.g.
229+ // select(Column, Seq[Column], Column) is converted into select(Column*) flattening sequences
// Implicit conversion to make things nicer to use, e.g.
// select(Column, Seq[Column], Column) is converted into select(Column*), flattening sequences
implicit class DataFrameSupport(df: DataFrame) {
  def select(cols: Any*): DataFrame = {
    // Pattern match + flatMap replaces the original mutable `var buffer` and the
    // isInstanceOf/asInstanceOf chain; behavior (including a ClassCastException
    // on a non-Column, non-Seq argument) is unchanged.
    val flattened: Seq[Column] = cols.flatMap {
      case seq: Seq[_] => seq.asInstanceOf[Seq[Column]] // element type is erased; cast as before
      case col         => Seq(col.asInstanceOf[Column])
    }
    df.select(flattened: _*)
  }
}
243+
244+ // COMMAND ----------
245+
// explode the distances sequence into one column per landmark;
// the landmark count comes from landMarkVertexIds instead of a hard-coded 3,
// so changing the landmark set above does not silently break this cell
val shortestDistsFromVertex2Landmark2DF =
  shortestDistsSeqFromVertex2Landmark2DF.select(split($"shortestDistances", landMarkVertexIds.length), $"srcVertexId")

// COMMAND ----------

display(shortestDistsFromVertex2Landmark2DF)
251+
252+ // COMMAND ----------
253+
// now let's give it our names based on the landmark vertex Ids;
// the landmark count comes from landMarkVertexIds instead of a hard-coded 3
val shortestDistsFromVertex2Landmark2DF = shortestDistsSeqFromVertex2Landmark2DF
  .select(split($"shortestDistances", landMarkVertexIds.length), $"srcVertexId")
  .toDF(columnNames: _*)

// COMMAND ----------

display(shortestDistsFromVertex2Landmark2DF)
260+
261+ // COMMAND ----------
262+
// pairwise matrix plot over just the landmark-distance columns
display(shortestDistsFromVertex2Landmark2DF.select($"0", $"4", $"9"))
264+
265+ // COMMAND ----------
266+
0 commit comments