Skip to content

Commit b6c8366

Browse files
sarutak authored and jiangxb1987 committed
[SPARK-31764][CORE][3.0] JsonProtocol doesn't write RDDInfo#isBarrier
### What changes were proposed in this pull request? This PR backports the change of apache#28583 (SPARK-31764) to branch-3.0, which changes JsonProtocol to write RDDInfo#isBarrier. ### Why are the changes needed? JsonProtocol reads RDDInfo#isBarrier but does not write it, so it's a bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I added a test case. Closes apache#28660 from sarutak/SPARK-31764-branch-3.0. Authored-by: Kousuke Saruta <[email protected]> Signed-off-by: Xingbo Jiang <[email protected]>
1 parent 8a7fb26 commit b6c8366

File tree

3 files changed

+56
-0
lines changed

3 files changed

+56
-0
lines changed

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -475,6 +475,7 @@ private[spark] object JsonProtocol {
475475
("Callsite" -> rddInfo.callSite) ~
476476
("Parent IDs" -> parentIds) ~
477477
("Storage Level" -> storageLevel) ~
478+
("Barrier" -> rddInfo.isBarrier) ~
478479
("Number of Partitions" -> rddInfo.numPartitions) ~
479480
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
480481
("Memory Size" -> rddInfo.memSize) ~

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
3636
import org.apache.spark.deploy.history.EventLogTestHelper._
3737
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
3838
import org.apache.spark.internal.Logging
39+
import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
3940
import org.apache.spark.io._
4041
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
4142
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -99,6 +100,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
99100
testStageExecutorMetricsEventLogging()
100101
}
101102

103+
test("SPARK-31764: isBarrier should be logged in event log") {
104+
val conf = new SparkConf()
105+
conf.set(EVENT_LOG_ENABLED, true)
106+
conf.set(EVENT_LOG_DIR, testDirPath.toString)
107+
val sc = new SparkContext("local", "test-SPARK-31764", conf)
108+
val appId = sc.applicationId
109+
110+
sc.parallelize(1 to 10)
111+
.barrier()
112+
.mapPartitions(_.map(elem => (elem, elem)))
113+
.filter(elem => elem._1 % 2 == 0)
114+
.reduceByKey(_ + _)
115+
.collect
116+
sc.stop()
117+
118+
val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
119+
val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
120+
val jobStartEvents = events
121+
.filter(event => event.isInstanceOf[SparkListenerJobStart])
122+
.map(_.asInstanceOf[SparkListenerJobStart])
123+
124+
assert(jobStartEvents.size === 1)
125+
val stageInfos = jobStartEvents.head.stageInfos
126+
assert(stageInfos.size === 2)
127+
128+
val stage0 = stageInfos(0)
129+
val rddInfosInStage0 = stage0.rddInfos
130+
assert(rddInfosInStage0.size === 3)
131+
val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
132+
assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
133+
assert(sortedRddInfosInStage0(0).isBarrier === true)
134+
assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
135+
assert(sortedRddInfosInStage0(1).isBarrier === true)
136+
assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
137+
assert(sortedRddInfosInStage0(2).isBarrier === false)
138+
139+
val stage1 = stageInfos(1)
140+
val rddInfosInStage1 = stage1.rddInfos
141+
assert(rddInfosInStage1.size === 1)
142+
assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
143+
assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
144+
}
145+
102146
/* ----------------- *
103147
* Actual test logic *
104148
* ----------------- */

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1063,6 +1063,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
10631063
| "Deserialized": true,
10641064
| "Replication": 1
10651065
| },
1066+
| "Barrier" : false,
10661067
| "Number of Partitions": 201,
10671068
| "Number of Cached Partitions": 301,
10681069
| "Memory Size": 401,
@@ -1585,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
15851586
| "Deserialized": true,
15861587
| "Replication": 1
15871588
| },
1589+
| "Barrier" : false,
15881590
| "Number of Partitions": 200,
15891591
| "Number of Cached Partitions": 300,
15901592
| "Memory Size": 400,
@@ -1629,6 +1631,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
16291631
| "Deserialized": true,
16301632
| "Replication": 1
16311633
| },
1634+
| "Barrier" : false,
16321635
| "Number of Partitions": 400,
16331636
| "Number of Cached Partitions": 600,
16341637
| "Memory Size": 800,
@@ -1645,6 +1648,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
16451648
| "Deserialized": true,
16461649
| "Replication": 1
16471650
| },
1651+
| "Barrier" : false,
16481652
| "Number of Partitions": 401,
16491653
| "Number of Cached Partitions": 601,
16501654
| "Memory Size": 801,
@@ -1689,6 +1693,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
16891693
| "Deserialized": true,
16901694
| "Replication": 1
16911695
| },
1696+
| "Barrier" : false,
16921697
| "Number of Partitions": 600,
16931698
| "Number of Cached Partitions": 900,
16941699
| "Memory Size": 1200,
@@ -1705,6 +1710,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
17051710
| "Deserialized": true,
17061711
| "Replication": 1
17071712
| },
1713+
| "Barrier" : false,
17081714
| "Number of Partitions": 601,
17091715
| "Number of Cached Partitions": 901,
17101716
| "Memory Size": 1201,
@@ -1721,6 +1727,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
17211727
| "Deserialized": true,
17221728
| "Replication": 1
17231729
| },
1730+
| "Barrier" : false,
17241731
| "Number of Partitions": 602,
17251732
| "Number of Cached Partitions": 902,
17261733
| "Memory Size": 1202,
@@ -1765,6 +1772,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
17651772
| "Deserialized": true,
17661773
| "Replication": 1
17671774
| },
1775+
| "Barrier" : false,
17681776
| "Number of Partitions": 800,
17691777
| "Number of Cached Partitions": 1200,
17701778
| "Memory Size": 1600,
@@ -1781,6 +1789,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
17811789
| "Deserialized": true,
17821790
| "Replication": 1
17831791
| },
1792+
| "Barrier" : false,
17841793
| "Number of Partitions": 801,
17851794
| "Number of Cached Partitions": 1201,
17861795
| "Memory Size": 1601,
@@ -1797,6 +1806,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
17971806
| "Deserialized": true,
17981807
| "Replication": 1
17991808
| },
1809+
| "Barrier" : false,
18001810
| "Number of Partitions": 802,
18011811
| "Number of Cached Partitions": 1202,
18021812
| "Memory Size": 1602,
@@ -1813,6 +1823,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
18131823
| "Deserialized": true,
18141824
| "Replication": 1
18151825
| },
1826+
| "Barrier" : false,
18161827
| "Number of Partitions": 803,
18171828
| "Number of Cached Partitions": 1203,
18181829
| "Memory Size": 1603,

0 commit comments

Comments (0)