[SPARK-31764][CORE] JsonProtocol doesn't write RDDInfo#isBarrier
### What changes were proposed in this pull request?

This PR changes JsonProtocol so that it writes RDDInfo#isBarrier.

### Why are the changes needed?

JsonProtocol reads RDDInfo#isBarrier but does not write it, so this is a bug.

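For illustration, here is a minimal json4s sketch of the read-side behavior this description implies (the `isBarrier` helper and the fall-back-to-`false` default are stand-ins, not the actual `JsonProtocol.rddInfoFromJson` code): because a missing `"Barrier"` field is tolerated when reading, event logs written without the field still parse, but a barrier RDD silently comes back as non-barrier.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object BarrierReadSketch {
  implicit val formats: Formats = DefaultFormats

  // Assumed shape of the read side: an absent "Barrier" field defaults to
  // false instead of failing, so pre-fix event logs parse but lose the flag.
  def isBarrier(rddInfoJson: JValue): Boolean =
    (rddInfoJson \ "Barrier").extractOpt[Boolean].getOrElse(false)

  def main(args: Array[String]): Unit = {
    val afterFix  = parse("""{"RDD ID": 1, "Barrier": true}""") // field now written
    val beforeFix = parse("""{"RDD ID": 1}""")                  // field was missing

    assert(isBarrier(afterFix))
    assert(!isBarrier(beforeFix)) // barrier RDD read back as non-barrier
  }
}
```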
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added a test case.

Closes #28583 from sarutak/SPARK-31764.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Xingbo Jiang <[email protected]>
sarutak committed May 28, 2020
commit bc5fc2d0318ad5540f06ccabaf394376a5a48db8
@@ -475,6 +475,7 @@ private[spark] object JsonProtocol {
("Callsite" -> rddInfo.callSite) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Barrier" -> rddInfo.isBarrier) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
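For context on the one-line change above: in the json4s DSL used by JsonProtocol, `~` chains key/value pairs into a single `JObject`, so adding the `("Barrier" -> rddInfo.isBarrier)` pair is enough for the flag to appear in the logged JSON. A small self-contained sketch (field names abbreviated; not the full `rddInfoToJson` output):

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// "~" merges each ("key" -> value) pair into one JSON object.
val rddJson =
  ("RDD ID" -> 1) ~
  ("Name" -> "example") ~
  ("Barrier" -> false) ~
  ("Number of Partitions" -> 4)

// {"RDD ID":1,"Name":"example","Barrier":false,"Number of Partitions":4}
println(compact(render(rddJson)))
```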
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
import org.apache.spark.deploy.history.EventLogTestHelper._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
import org.apache.spark.io._
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -99,6 +100,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
testStageExecutorMetricsEventLogging()
}

test("SPARK-31764: isBarrier should be logged in event log") {
val conf = new SparkConf()
conf.set(EVENT_LOG_ENABLED, true)
conf.set(EVENT_LOG_DIR, testDirPath.toString)
val sc = new SparkContext("local", "test-SPARK-31764", conf)
val appId = sc.applicationId

sc.parallelize(1 to 10)
.barrier()
.mapPartitions(_.map(elem => (elem, elem)))
.filter(elem => elem._1 % 2 == 0)
.reduceByKey(_ + _)
.collect
sc.stop()

val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
val jobStartEvents = events
.filter(event => event.isInstanceOf[SparkListenerJobStart])
.map(_.asInstanceOf[SparkListenerJobStart])

assert(jobStartEvents.size === 1)
val stageInfos = jobStartEvents.head.stageInfos
assert(stageInfos.size === 2)

val stage0 = stageInfos(0)
val rddInfosInStage0 = stage0.rddInfos
assert(rddInfosInStage0.size === 3)
val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
assert(sortedRddInfosInStage0(0).isBarrier === true)
assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
assert(sortedRddInfosInStage0(1).isBarrier === true)
assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
assert(sortedRddInfosInStage0(2).isBarrier === false)

val stage1 = stageInfos(1)
val rddInfosInStage1 = stage1.rddInfos
assert(rddInfosInStage1.size === 1)
assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
}

/* ----------------- *
* Actual test logic *
* ----------------- */
11 changes: 11 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1063,6 +1063,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 201,
| "Number of Cached Partitions": 301,
| "Memory Size": 401,
@@ -1585,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 200,
| "Number of Cached Partitions": 300,
| "Memory Size": 400,
@@ -1629,6 +1631,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 400,
| "Number of Cached Partitions": 600,
| "Memory Size": 800,
@@ -1645,6 +1648,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 401,
| "Number of Cached Partitions": 601,
| "Memory Size": 801,
@@ -1689,6 +1693,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 600,
| "Number of Cached Partitions": 900,
| "Memory Size": 1200,
@@ -1705,6 +1710,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 601,
| "Number of Cached Partitions": 901,
| "Memory Size": 1201,
@@ -1721,6 +1727,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 602,
| "Number of Cached Partitions": 902,
| "Memory Size": 1202,
@@ -1765,6 +1772,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 800,
| "Number of Cached Partitions": 1200,
| "Memory Size": 1600,
@@ -1781,6 +1789,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 801,
| "Number of Cached Partitions": 1201,
| "Memory Size": 1601,
@@ -1797,6 +1806,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 802,
| "Number of Cached Partitions": 1202,
| "Memory Size": 1602,
@@ -1813,6 +1823,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Deserialized": true,
| "Replication": 1
| },
| "Barrier" : false,
| "Number of Partitions": 803,
| "Number of Cached Partitions": 1203,
| "Memory Size": 1603,