Skip to content

Commit e4fbd32

Browse files
committed
Unify GraphImpl RDDs + other graph load optimizations
This commit makes the following changes:

1. *Unify RDDs to avoid zipPartitions.* A graph used to be four RDDs: vertices, edges, routing table, and triplet view. This commit merges them down to two: vertices (with routing table), and edges (with replicated vertices).
2. *Avoid duplicate shuffle in graph building.* We used to do two shuffles when building a graph: one to extract routing information from the edges and move it to the vertices, and another to find nonexistent vertices referred to by edges. With this commit, the latter is done as a side effect of the former.
3. *Avoid no-op shuffle when joins are fully eliminated.* This is a side effect of unifying the edges and the triplet view.
4. *Join elimination for mapTriplets.*
5. *Ship only the needed vertex attributes when upgrading the triplet view.* If the triplet view already contains source attributes, and we now need both attributes, only ship destination attributes rather than re-shipping both. This is done in `ReplicatedVertexView#upgrade`.
1 parent d6d60e2 commit e4fbd32

24 files changed

+1299
-841
lines changed

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@ package org.apache.spark.graphx
2020
import scala.reflect.{classTag, ClassTag}
2121

2222
import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
23-
import org.apache.spark.graphx.impl.EdgePartition
2423
import org.apache.spark.rdd.RDD
2524
import org.apache.spark.storage.StorageLevel
2625

26+
import org.apache.spark.graphx.impl.EdgePartition
27+
2728
/**
28-
* `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
29-
* for performance.
29+
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
30+
* partition for performance. It may additionally store the vertex attributes associated with each
31+
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
32+
* [[org.apache.spark.graphx.impl.ReplicatedVertexView]].
3033
*/
31-
class EdgeRDD[@specialized ED: ClassTag](
32-
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
34+
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
35+
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
3336
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
3437

3538
partitionsRDD.setName("EdgeRDD")
@@ -45,33 +48,41 @@ class EdgeRDD[@specialized ED: ClassTag](
4548
partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
4649

4750
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
48-
val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
49-
p.next._2.iterator.map(_.copy())
51+
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
52+
if (p.hasNext) {
53+
p.next._2.iterator.map(_.copy())
54+
} else {
55+
Iterator.empty
56+
}
5057
}
5158

5259
override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
5360

54-
override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
61+
override def persist(newLevel: StorageLevel): EdgeRDD[ED, VD] = {
5562
partitionsRDD.persist(newLevel)
5663
this
5764
}
5865

5966
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
60-
override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
67+
override def persist(): EdgeRDD[ED, VD] = persist(StorageLevel.MEMORY_ONLY)
6168

6269
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
63-
override def cache(): EdgeRDD[ED] = persist()
70+
override def cache(): EdgeRDD[ED, VD] = persist()
6471

65-
override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
72+
override def unpersist(blocking: Boolean = true): EdgeRDD[ED, VD] = {
6673
partitionsRDD.unpersist(blocking)
6774
this
6875
}
6976

70-
private[graphx] def mapEdgePartitions[ED2: ClassTag](
71-
f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
72-
new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
73-
val (pid, ep) = iter.next()
74-
Iterator(Tuple2(pid, f(pid, ep)))
77+
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
78+
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
79+
new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
80+
if (iter.hasNext) {
81+
val (pid, ep) = iter.next()
82+
Iterator(Tuple2(pid, f(pid, ep)))
83+
} else {
84+
Iterator.empty
85+
}
7586
}, preservesPartitioning = true))
7687
}
7788

@@ -82,15 +93,21 @@ class EdgeRDD[@specialized ED: ClassTag](
8293
* @param f the function from an edge to a new edge value
8394
* @return a new EdgeRDD containing the new edge values
8495
*/
85-
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
96+
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
8697
mapEdgePartitions((pid, part) => part.map(f))
8798

8899
/**
89100
* Reverse all the edges in this RDD.
90101
*
91102
* @return a new EdgeRDD containing all the edges reversed
92103
*/
93-
def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
104+
def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
105+
106+
def filter(
107+
epred: EdgeTriplet[VD, ED] => Boolean,
108+
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
109+
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
110+
}
94111

95112
/**
96113
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -102,19 +119,15 @@ class EdgeRDD[@specialized ED: ClassTag](
102119
* with values supplied by `f`
103120
*/
104121
def innerJoin[ED2: ClassTag, ED3: ClassTag]
105-
(other: EdgeRDD[ED2])
106-
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
122+
(other: EdgeRDD[ED2, _])
123+
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
107124
val ed2Tag = classTag[ED2]
108125
val ed3Tag = classTag[ED3]
109-
new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
126+
new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
110127
(thisIter, otherIter) =>
111128
val (pid, thisEPart) = thisIter.next()
112129
val (_, otherEPart) = otherIter.next()
113130
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
114131
})
115132
}
116-
117-
private[graphx] def collectVertexIds(): RDD[VertexId] = {
118-
partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
119-
}
120133
}

graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
6363
if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
6464

6565
override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
66+
67+
def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
6668
}

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
5959
* along with their vertex data.
6060
*
6161
*/
62-
val edges: EdgeRDD[ED]
62+
val edges: EdgeRDD[ED, VD]
6363

6464
/**
6565
* An RDD containing the edge triplets, which are edges along with the vertex data associated with

graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ package org.apache.spark.graphx
1919

2020
import com.esotericsoftware.kryo.Kryo
2121

22-
import org.apache.spark.graphx.impl._
2322
import org.apache.spark.serializer.KryoRegistrator
24-
import org.apache.spark.util.collection.BitSet
2523
import org.apache.spark.util.BoundedPriorityQueue
24+
import org.apache.spark.util.collection.BitSet
25+
26+
import org.apache.spark.graphx.impl._
2627

2728
/**
2829
* Registers GraphX classes with Kryo for improved performance.
@@ -33,8 +34,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
3334
kryo.register(classOf[Edge[Object]])
3435
kryo.register(classOf[MessageToPartition[Object]])
3536
kryo.register(classOf[VertexBroadcastMsg[Object]])
37+
kryo.register(classOf[RoutingTableMessage])
3638
kryo.register(classOf[(VertexId, Object)])
37-
kryo.register(classOf[EdgePartition[Object]])
39+
kryo.register(classOf[EdgePartition[Object, Object]])
3840
kryo.register(classOf[BitSet])
3941
kryo.register(classOf[VertexIdToIndexMap])
4042
kryo.register(classOf[VertexAttributeBlock[Object]])

graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ object GraphLoader extends Logging {
4747
* @param path the path to the file (e.g., /home/data/file or hdfs://file)
4848
* @param canonicalOrientation whether to orient edges in the positive
4949
* direction
50-
* @param minEdgePartitions the number of partitions for the
51-
* the edge RDD
50+
* @param minEdgePartitions the number of partitions for the edge RDD
5251
*/
5352
def edgeListFile(
5453
sc: SparkContext,
@@ -60,8 +59,9 @@ object GraphLoader extends Logging {
6059
val startTime = System.currentTimeMillis
6160

6261
// Parse the edge data table directly into edge partitions
63-
val edges = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
64-
val builder = new EdgePartitionBuilder[Int]
62+
val lines = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions)
63+
val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
64+
val builder = new EdgePartitionBuilder[Int, Int]
6565
iter.foreach { line =>
6666
if (!line.isEmpty && line(0) != '#') {
6767
val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
7878
}
7979
}
8080
Iterator((pid, builder.toEdgePartition))
81-
}.cache()
81+
}.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
8282
edges.count()
8383

8484
logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package org.apache.spark.graphx
1919

2020
import scala.reflect.ClassTag
21-
import org.apache.spark.SparkContext._
21+
import scala.util.Random
22+
2223
import org.apache.spark.SparkException
23-
import org.apache.spark.graphx.lib._
24+
import org.apache.spark.SparkContext._
2425
import org.apache.spark.rdd.RDD
25-
import scala.util.Random
26+
27+
import org.apache.spark.graphx.lib._
2628

2729
/**
2830
* Contains additional functionality for [[Graph]]. All operations are expressed in terms of the
@@ -43,19 +45,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
4345
* The in-degree of each vertex in the graph.
4446
* @note Vertices with no in-edges are not returned in the resulting RDD.
4547
*/
46-
lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
48+
lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
4749

4850
/**
4951
* The out-degree of each vertex in the graph.
5052
* @note Vertices with no out-edges are not returned in the resulting RDD.
5153
*/
52-
lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
54+
lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
5355

5456
/**
5557
* The degree of each vertex in the graph.
5658
* @note Vertices with no edges are not returned in the resulting RDD.
5759
*/
58-
lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
60+
lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
5961

6062
/**
6163
* Computes the neighboring vertex degrees.

0 commit comments

Comments
 (0)