Skip to content

Commit 9ac2bb1

Browse files
ankurdaverxin
authored and committed
[SPARK-4444] Drop VD type parameter from EdgeRDD
Due to vertex attribute caching, EdgeRDD previously took two type parameters: ED and VD. However, this is an implementation detail that should not be exposed in the interface, so this PR drops the VD type parameter. This requires removing the `filter` method from the EdgeRDD interface, because it depends on vertex attribute caching. Author: Ankur Dave <ankurdave@gmail.com> Closes apache#3303 from ankurdave/edgerdd-drop-tparam and squashes the following commits: 38dca9b [Ankur Dave] Leave EdgeRDD.fromEdges public fafeb51 [Ankur Dave] Drop VD type parameter from EdgeRDD
1 parent e7690ed commit 9ac2bb1

File tree

7 files changed

+40
-50
lines changed

7 files changed

+40
-50
lines changed

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.graphx
1919

20+
import scala.language.existentials
2021
import scala.reflect.ClassTag
2122

2223
import org.apache.spark.Dependency
@@ -36,16 +37,16 @@ import org.apache.spark.graphx.impl.EdgeRDDImpl
3637
* edge to provide the triplet view. Shipping of the vertex attributes is managed by
3738
* `impl.ReplicatedVertexView`.
3839
*/
39-
abstract class EdgeRDD[ED, VD](
40+
abstract class EdgeRDD[ED](
4041
@transient sc: SparkContext,
4142
@transient deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {
4243

43-
private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]
44+
private[graphx] def partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])] forSome { type VD }
4445

4546
override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
4647

4748
override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
48-
val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
49+
val p = firstParent[(PartitionID, EdgePartition[ED, _])].iterator(part, context)
4950
if (p.hasNext) {
5051
p.next._2.iterator.map(_.copy())
5152
} else {
@@ -60,19 +61,14 @@ abstract class EdgeRDD[ED, VD](
6061
* @param f the function from an edge to a new edge value
6162
* @return a new EdgeRDD containing the new edge values
6263
*/
63-
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD]
64+
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
6465

6566
/**
6667
* Reverse all the edges in this RDD.
6768
*
6869
* @return a new EdgeRDD containing all the edges reversed
6970
*/
70-
def reverse: EdgeRDD[ED, VD]
71-
72-
/** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
73-
def filter(
74-
epred: EdgeTriplet[VD, ED] => Boolean,
75-
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD]
71+
def reverse: EdgeRDD[ED]
7672

7773
/**
7874
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -84,15 +80,8 @@ abstract class EdgeRDD[ED, VD](
8480
* with values supplied by `f`
8581
*/
8682
def innerJoin[ED2: ClassTag, ED3: ClassTag]
87-
(other: EdgeRDD[ED2, _])
88-
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD]
89-
90-
private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
91-
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2]
92-
93-
/** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
94-
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
95-
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2]
83+
(other: EdgeRDD[ED2])
84+
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
9685

9786
/**
9887
* Changes the target storage level while preserving all other properties of the
@@ -101,7 +90,7 @@ abstract class EdgeRDD[ED, VD](
10190
* This does not actually trigger a cache; to do this, call
10291
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
10392
*/
104-
private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED, VD]
93+
private[graphx] def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED]
10594
}
10695

10796
object EdgeRDD {
@@ -111,7 +100,7 @@ object EdgeRDD {
111100
* @tparam ED the edge attribute type
112101
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
113102
*/
114-
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
103+
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
115104
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
116105
val builder = new EdgePartitionBuilder[ED, VD]
117106
iter.foreach { e =>
@@ -128,8 +117,8 @@ object EdgeRDD {
128117
* @tparam ED the edge attribute type
129118
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
130119
*/
131-
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
132-
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
120+
private[graphx] def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
121+
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDDImpl[ED, VD] = {
133122
new EdgeRDDImpl(edgePartitions)
134123
}
135124
}

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
5959
* along with their vertex data.
6060
*
6161
*/
62-
@transient val edges: EdgeRDD[ED, VD]
62+
@transient val edges: EdgeRDD[ED]
6363

6464
/**
6565
* An RDD containing the edge triplets, which are edges along with the vertex data associated with

graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ abstract class VertexRDD[VD](
207207
def reverseRoutingTables(): VertexRDD[VD]
208208

209209
/** Prepares this VertexRDD for efficient joins with the given EdgeRDD. */
210-
def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD]
210+
def withEdges(edges: EdgeRDD[_]): VertexRDD[VD]
211211

212212
/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
213213
private[graphx] def withPartitionsRDD[VD2: ClassTag](
@@ -269,7 +269,7 @@ object VertexRDD {
269269
* @param defaultVal the vertex attribute to use when creating missing vertices
270270
*/
271271
def apply[VD: ClassTag](
272-
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = {
272+
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD): VertexRDD[VD] = {
273273
VertexRDD(vertices, edges, defaultVal, (a, b) => a)
274274
}
275275

@@ -286,7 +286,7 @@ object VertexRDD {
286286
* @param mergeFunc the commutative, associative duplicate vertex attribute merge function
287287
*/
288288
def apply[VD: ClassTag](
289-
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD
289+
vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_], defaultVal: VD, mergeFunc: (VD, VD) => VD
290290
): VertexRDD[VD] = {
291291
val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match {
292292
case Some(p) => vertices
@@ -314,7 +314,7 @@ object VertexRDD {
314314
* @param defaultVal the vertex attribute to use when creating missing vertices
315315
*/
316316
def fromEdges[VD: ClassTag](
317-
edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
317+
edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
318318
val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
319319
val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
320320
val routingTable =
@@ -325,7 +325,7 @@ object VertexRDD {
325325
}
326326

327327
private[graphx] def createRoutingTables(
328-
edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
328+
edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
329329
// Determine which vertices each edge partition needs by creating a mapping from vid to pid.
330330
val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
331331
Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.graphx._
2828
class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
2929
override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
3030
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
31-
extends EdgeRDD[ED, VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
31+
extends EdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
3232

3333
override def setName(_name: String): this.type = {
3434
if (partitionsRDD.name != null) {
@@ -75,20 +75,20 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
7575
partitionsRDD.map(_._2.size.toLong).reduce(_ + _)
7676
}
7777

78-
override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
78+
override def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDDImpl[ED2, VD] =
7979
mapEdgePartitions((pid, part) => part.map(f))
8080

81-
override def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
81+
override def reverse: EdgeRDDImpl[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
8282

83-
override def filter(
83+
def filter(
8484
epred: EdgeTriplet[VD, ED] => Boolean,
85-
vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
85+
vpred: (VertexId, VD) => Boolean): EdgeRDDImpl[ED, VD] = {
8686
mapEdgePartitions((pid, part) => part.filter(epred, vpred))
8787
}
8888

8989
override def innerJoin[ED2: ClassTag, ED3: ClassTag]
90-
(other: EdgeRDD[ED2, _])
91-
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
90+
(other: EdgeRDD[ED2])
91+
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDDImpl[ED3, VD] = {
9292
val ed2Tag = classTag[ED2]
9393
val ed3Tag = classTag[ED3]
9494
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
@@ -99,8 +99,8 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
9999
})
100100
}
101101

102-
override private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
103-
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
102+
def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
103+
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDDImpl[ED2, VD2] = {
104104
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
105105
if (iter.hasNext) {
106106
val (pid, ep) = iter.next()
@@ -111,13 +111,13 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
111111
}, preservesPartitioning = true))
112112
}
113113

114-
override private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
115-
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
114+
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
115+
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDDImpl[ED2, VD2] = {
116116
new EdgeRDDImpl(partitionsRDD, this.targetStorageLevel)
117117
}
118118

119119
override private[graphx] def withTargetStorageLevel(
120-
targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
120+
targetStorageLevel: StorageLevel): EdgeRDDImpl[ED, VD] = {
121121
new EdgeRDDImpl(this.partitionsRDD, targetStorageLevel)
122122
}
123123

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
4343
/** Default constructor is provided to support serialization */
4444
protected def this() = this(null, null)
4545

46-
@transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges
46+
@transient override val edges: EdgeRDDImpl[ED, VD] = replicatedVertexView.edges
4747

4848
/** Return a RDD that brings edges together with their source and destination vertices. */
4949
@transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = {
@@ -323,9 +323,10 @@ object GraphImpl {
323323
*/
324324
def apply[VD: ClassTag, ED: ClassTag](
325325
vertices: VertexRDD[VD],
326-
edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = {
326+
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
327327
// Convert the vertex partitions in edges to the correct type
328-
val newEdges = edges.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
328+
val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
329+
.mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
329330
GraphImpl.fromExistingRDDs(vertices, newEdges)
330331
}
331332

@@ -336,16 +337,16 @@ object GraphImpl {
336337
*/
337338
def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
338339
vertices: VertexRDD[VD],
339-
edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = {
340-
new GraphImpl(vertices, new ReplicatedVertexView(edges))
340+
edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
341+
new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))
341342
}
342343

343344
/**
344345
* Create a graph from an EdgeRDD with the correct vertex type, setting missing vertices to
345346
* `defaultVertexAttr`. The vertices will have the same number of partitions as the EdgeRDD.
346347
*/
347348
private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
348-
edges: EdgeRDD[ED, VD],
349+
edges: EdgeRDDImpl[ED, VD],
349350
defaultVertexAttr: VD,
350351
edgeStorageLevel: StorageLevel,
351352
vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {

graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.graphx._
3333
*/
3434
private[impl]
3535
class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
36-
var edges: EdgeRDD[ED, VD],
36+
var edges: EdgeRDDImpl[ED, VD],
3737
var hasSrcId: Boolean = false,
3838
var hasDstId: Boolean = false) {
3939

@@ -42,7 +42,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
4242
* shipping level.
4343
*/
4444
def withEdges[VD2: ClassTag, ED2: ClassTag](
45-
edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
45+
edges_ : EdgeRDDImpl[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = {
4646
new ReplicatedVertexView(edges_, hasSrcId, hasDstId)
4747
}
4848

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class VertexRDDImpl[VD] private[graphx] (
172172
override def reverseRoutingTables(): VertexRDD[VD] =
173173
this.mapVertexPartitions(vPart => vPart.withRoutingTable(vPart.routingTable.reverse))
174174

175-
override def withEdges(edges: EdgeRDD[_, _]): VertexRDD[VD] = {
175+
override def withEdges(edges: EdgeRDD[_]): VertexRDD[VD] = {
176176
val routingTables = VertexRDD.createRoutingTables(edges, this.partitioner.get)
177177
val vertexPartitions = partitionsRDD.zipPartitions(routingTables, true) {
178178
(partIter, routingTableIter) =>

0 commit comments

Comments (0)