Skip to content

Commit c9e2e03

Browse files
committed
[SPARK-14804][Graphx] Graph vertexRDD/EdgeRDD checkpoint results ClassCastException
1 parent b512f04 commit c9e2e03

File tree

4 files changed

+53
-2
lines changed

4 files changed

+53
-2
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
7676
}
7777

7878
override def isCheckpointed: Boolean = {
79-
firstParent[(PartitionID, EdgePartition[ED, VD])].isCheckpointed
79+
partitionsRDD != null && partitionsRDD.isCheckpointed
8080
}
8181

8282
override def getCheckpointFile: Option[String] = {

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class VertexRDDImpl[VD] private[graphx] (
7676
}
7777

7878
override def isCheckpointed: Boolean = {
79-
firstParent[ShippableVertexPartition[VD]].isCheckpointed
79+
partitionsRDD != null && partitionsRDD.isCheckpointed
8080
}
8181

8282
override def getCheckpointFile: Option[String] = {

graphx/src/test/scala/org/apache/spark/graphx/EdgeRDDSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.graphx
1919

2020
import org.apache.spark.SparkFunSuite
2121
import org.apache.spark.storage.StorageLevel
22+
import org.apache.spark.util.Utils
2223

2324
class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
2425

@@ -33,4 +34,29 @@ class EdgeRDDSuite extends SparkFunSuite with LocalSparkContext {
3334
}
3435
}
3536

37+
test("checkpointing") {
38+
withSpark { sc =>
39+
val verts = sc.parallelize(List((0L, 0), (1L, 1), (1L, 2), (2L, 3), (2L, 3), (2L, 3)))
40+
val edges = EdgeRDD.fromEdges(sc.parallelize(List.empty[Edge[Int]]))
41+
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
42+
edges.checkpoint()
43+
44+
// EdgeRDD and partitionsRDD are not checkpointed yet
45+
assert(!edges.isCheckpointed)
46+
assert(!edges.isCheckpointedAndMaterialized)
47+
assert(!edges.partitionsRDD.isCheckpointed)
48+
assert(!edges.partitionsRDD.isCheckpointedAndMaterialized)
49+
50+
val data = edges.collect().toSeq // force checkpointing
51+
52+
// EdgeRDD and partitionsRDD are checkpointed now
53+
assert(edges.isCheckpointed)
54+
assert(edges.isCheckpointedAndMaterialized)
55+
assert(edges.partitionsRDD.isCheckpointed)
56+
assert(edges.partitionsRDD.isCheckpointedAndMaterialized)
57+
58+
assert(edges.collect().toSeq === data) // test checkpointed RDD
59+
}
60+
}
61+
3662
}

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
2020
import org.apache.spark.{HashPartitioner, SparkContext, SparkFunSuite}
2121
import org.apache.spark.rdd.RDD
2222
import org.apache.spark.storage.StorageLevel
23+
import org.apache.spark.util.Utils
2324

2425
class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
2526

@@ -197,4 +198,28 @@ class VertexRDDSuite extends SparkFunSuite with LocalSparkContext {
197198
}
198199
}
199200

201+
test("checkpoint") {
202+
withSpark { sc =>
203+
val n = 100
204+
val verts = vertices(sc, n)
205+
sc.setCheckpointDir(Utils.createTempDir().getCanonicalPath)
206+
verts.checkpoint()
207+
208+
// VertexRDD and partitionsRDD are not checkpointed yet
209+
assert(!verts.isCheckpointed)
210+
assert(!verts.isCheckpointedAndMaterialized)
211+
assert(!verts.partitionsRDD.isCheckpointed)
212+
assert(!verts.partitionsRDD.isCheckpointedAndMaterialized)
213+
214+
val data = verts.collect().toSeq // force checkpointing
215+
216+
// VertexRDD and partitionsRDD are not checkpointed now
217+
assert(verts.isCheckpointed)
218+
assert(verts.isCheckpointedAndMaterialized)
219+
assert(verts.partitionsRDD.isCheckpointed)
220+
assert(verts.partitionsRDD.isCheckpointedAndMaterialized)
221+
222+
assert(verts.collect().toSeq === data) // test checkpointed RDD
223+
}
224+
}
200225
}

0 commit comments

Comments
 (0)