Commit 300d596

zsxwing authored and rxin committed

[SPARK-18143][SQL] Ignore Structured Streaming event logs to avoid breaking history server (branch 2.0)

## What changes were proposed in this pull request?

Backport #15663 to branch-2.0 and fix conflicts in `ReplayListenerBus`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #15695 from zsxwing/fix-event-log-2.0.
1 parent 9f92474 commit 300d596
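
For context on the failure this patch addresses: `JsonProtocol.sparkEventFromJson` resolves the JSON `Event` field to a class reflectively, so replaying a 2.0.0/2.0.1 event log on a newer build, where the Structured Streaming listener event classes no longer exist, surfaces as a `ClassNotFoundException` that previously aborted the whole replay. Below is a minimal sketch of that failure mode, not part of the commit; the sample line is abbreviated and the package name is hypothetical (`JsonProtocol` is `private[spark]`, so the sketch must live under `org.apache.spark`):

```scala
// Hypothetical package so we can access the private[spark] JsonProtocol.
package org.apache.spark.sketch

import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.util.JsonProtocol

object ReplayFailureSketch {
  def main(args: Array[String]): Unit = {
    // Abbreviated version of the first line of the 2.0.0 event log added below.
    val line =
      """{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"}"""
    try {
      // Unknown event types fall through to reflective class loading, which
      // fails once the event class has been removed from the classpath.
      JsonProtocol.sparkEventFromJson(parse(line))
    } catch {
      case e: ClassNotFoundException =>
        // e.getMessage is the missing class name, which is exactly what the
        // patch matches against KNOWN_REMOVED_CLASSES.
        println(s"Replay would previously abort on: ${e.getMessage}")
    }
  }
}
```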

File tree

4 files changed: +67 lines, -0 lines

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 17 additions & 0 deletions
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.util.JsonProtocol
 
 /**
@@ -57,6 +58,10 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
         try {
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
+          case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
+            // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
+            // It's safe since no place uses them.
+            logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that might be truncated
             if (!maybeTruncated || lines.hasNext) {
@@ -78,3 +83,15 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
   }
 
 }
+
+private[spark] object ReplayListenerBus {
+
+  /**
+   * Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
+   * old json may fail and we can just ignore these failures.
+   */
+  val KNOWN_REMOVED_CLASSES = Set(
+    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
+    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
+  )
+}
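
The history server drives this class through `ReplayListenerBus.replay`. Here is a minimal usage sketch, not part of the commit; the log path and package name are hypothetical, and the sketch sits under `org.apache.spark` because the bus is `private[spark]`:

```scala
// Hypothetical package so we can access the private[spark] ReplayListenerBus.
package org.apache.spark.sketch

import java.io.FileInputStream

import org.apache.spark.scheduler.{ReplayListenerBus, SparkListener}

object ReplaySketch {
  def main(args: Array[String]): Unit = {
    val path = "/tmp/event-log.txt" // hypothetical uncompressed event log file
    val in = new FileInputStream(path)
    try {
      val bus = new ReplayListenerBus()
      bus.addListener(new SparkListener {}) // receives each successfully parsed event
      // With this patch, lines whose event class is in KNOWN_REMOVED_CLASSES are
      // logged with a warning and skipped instead of aborting the whole replay.
      bus.replay(in, path, maybeTruncated = false)
    } finally {
      in.close()
    }
  }
}
```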
sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.0.txt

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}},"exception":null,"stackTrace":[]}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@514502dc","offsetDesc":"[-]"}},"exception":"Query hello terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:85)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:","stackTrace":[{"methodName":"org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches","fileName":"StreamExecution.scala","lineNumber":208,"className":"org.apache.spark.sql.execution.streaming.StreamExecution","nativeMethod":false},{"methodName":"run","fileName":"StreamExecution.scala","lineNumber":120,"className":"org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1","nativeMethod":false}]}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477593059313}
sql/core/src/test/resources/structured-streaming/query-event-logs-version-2.0.1.txt

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}}}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@10e5ec94","offsetDesc":"[#0]"}},"exception":null}
+{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@70c61dc8","offsetDesc":"[-]"}},"exception":"org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:912)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:358)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:911)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:290)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)\n\tat org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2559)\n\tat org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2197)\n\tat org.apache.spark.sql.Dataset.collect(Dataset.scala:2173)\n\tat org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:154)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:366)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197)\n\tat org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187)\n\tat org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)\nCaused by: java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n"}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1477701734609}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 42 additions & 0 deletions
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.collection.mutable
+
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
@@ -206,6 +209,45 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
+  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+    // query-event-logs-version-2.0.0.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.0.
+    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+    // to verify that we can skip broken jsons generated by Structured Streaming.
+    testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt")
+  }
+
+  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+    // query-event-logs-version-2.0.1.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.1.
+    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+    // to verify that we can skip broken jsons generated by Structured Streaming.
+    testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt")
+  }
+
+  private def testReplayListenerBusWithBrokenEventJsons(fileName: String): Unit = {
+    val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
+    val events = mutable.ArrayBuffer[SparkListenerEvent]()
+    try {
+      val replayer = new ReplayListenerBus() {
+        // Redirect all parsed events to `events`
+        override def doPostEvent(
+            listener: SparkListenerInterface,
+            event: SparkListenerEvent): Unit = {
+          events += event
+        }
+      }
+      // Add a dummy listener so that "doPostEvent" will be called.
+      replayer.addListener(new SparkListener {})
+      replayer.replay(input, fileName)
+      // SparkListenerApplicationEnd is the only valid event
+      assert(events.size === 1)
+      assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+    } finally {
+      input.close()
+    }
+  }
+
   private def assertStreamingQueryInfoEquals(
       expected: StreamingQueryStatus,
       actual: StreamingQueryStatus): Unit = {
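
To exercise just the new cases, the suite can also be run programmatically through ScalaTest's `Suite.execute`, which takes a test name and prints results to stdout. A sketch under the assumption that the Spark test classpath is available; the object name is hypothetical:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListenerSuite

object RunReplayTests {
  def main(args: Array[String]): Unit = {
    // execute(testName) runs a single named test from the suite.
    val suite = new StreamingQueryListenerSuite
    suite.execute("ReplayListenerBus should ignore broken event jsons generated in 2.0.0")
    suite.execute("ReplayListenerBus should ignore broken event jsons generated in 2.0.1")
  }
}
```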
