Commit 5feb1bd

Ignore Structured Streaming event logs to avoid breaking history server
1 parent 1042325 commit 5feb1bd

4 files changed, 56 additions and 0 deletions

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 6 additions & 0 deletions
@@ -72,6 +72,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
 
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
+          case e: java.lang.ClassNotFoundException if
+            e.getMessage == "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress" ||
+            e.getMessage == "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
+            =>
+            // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
+            // It's safe since no place uses them.
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that might be truncated
             // the last entry may not be the very last line in the event log, but we treat it
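Why catching ClassNotFoundException works here (an editor's note based on a reading of the replay path, not text from the patch): JsonProtocol.sparkEventFromJson matches the "Event" field of each log line against the short names of Spark's built-in events and, for anything else, falls back to loading the class named by that field via reflection before deserializing into it. The Structured Streaming events written by 2.0.0/2.0.1 record their fully qualified class names, and those classes were later removed, so replay fails with a ClassNotFoundException whose message is exactly the missing class name, which is what the two string comparisons above check. A minimal standalone sketch of that reflective fallback (illustrative only; eventClassOf is not a Spark API and ignores the built-in short-name path):

    import org.json4s.JString
    import org.json4s.jackson.JsonMethods.parse

    // Resolve the class named by an event-log line's "Event" field, the way the
    // replay fallback does for event types it does not special-case.
    def eventClassOf(jsonLine: String): Class[_] = {
      val JString(eventName) = parse(jsonLine) \ "Event"
      // For the 2.0.0/2.0.1 streaming events this throws ClassNotFoundException
      // with the fully qualified class name as the exception message.
      Class.forName(eventName)
    }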

sql/core/src/test/resources/history-server/structured-streaming-query-event-logs-2.0.0.txt

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}}}
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@2b85b3a5","offsetDesc":"[#0]"}},"exception":null,"stackTrace":[]}
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@514502dc","offsetDesc":"[-]"}},"exception":"Query hello terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:85)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:","stackTrace":[{"methodName":"org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches","fileName":"StreamExecution.scala","lineNumber":208,"className":"org.apache.spark.sql.execution.streaming.StreamExecution","nativeMethod":false},{"methodName":"run","fileName":"StreamExecution.scala","lineNumber":120,"className":"org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1","nativeMethod":false}]}
{"Event":"SparkListenerApplicationEnd","Timestamp":1477593059313}

sql/core/src/test/resources/history-server/structured-streaming-query-event-logs-2.0.1.txt

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@611c739e","offsetDesc":"[#0]"}}}
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@611c739e","offsetDesc":"[#0]"}},"exception":null,"stackTrace":[]}
{"Event":"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated","queryInfo":{"name":"hello","id":0,"sourceStatuses":[{"description":"FileStreamSource[file:/Users/zsx/stream]","offsetDesc":"#0"}],"sinkStatus":{"description":"org.apache.spark.sql.execution.streaming.MemorySink@76959af6","offsetDesc":"[-]"}},"exception":"Query hello terminated with exception: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ArithmeticException: / by zero\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat $line15.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)\n\tat org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:283)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:85)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:","stackTrace":[{"methodName":"org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches","fileName":"StreamExecution.scala","lineNumber":208,"className":"org.apache.spark.sql.execution.streaming.StreamExecution","nativeMethod":false},{"methodName":"run","fileName":"StreamExecution.scala","lineNumber":120,"className":"org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1","nativeMethod":false}]}
{"Event":"SparkListenerApplicationEnd","Timestamp":1477594073785}

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala

Lines changed: 42 additions & 0 deletions

@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.streaming
 
+import scala.collection.mutable
+
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
@@ -206,6 +209,45 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
+  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+    // structured-streaming-query-event-logs-2.0.0.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.0.
+    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+    // to verify that we can skip broken jsons generated by Structured Streaming.
+    testReplayListenerBusWithBrokenEventJsons("structured-streaming-query-event-logs-2.0.0.txt")
+  }
+
+  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+    // structured-streaming-query-event-logs-2.0.1.txt has all types of events generated by
+    // Structured Streaming in Spark 2.0.1.
+    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
+    // to verify that we can skip broken jsons generated by Structured Streaming.
+    testReplayListenerBusWithBrokenEventJsons("structured-streaming-query-event-logs-2.0.1.txt")
+  }
+
+  private def testReplayListenerBusWithBrokenEventJsons(fileName: String): Unit = {
+    val input = getClass.getResourceAsStream(s"/history-server/$fileName")
+    val events = mutable.ArrayBuffer[SparkListenerEvent]()
+    try {
+      val replayer = new ReplayListenerBus() {
+        // Redirect all parsed events to `events`
+        override def doPostEvent(
+            listener: SparkListenerInterface,
+            event: SparkListenerEvent): Unit = {
+          events += event
+        }
+      }
+      // Add a dummy listener so that "doPostEvent" will be called.
+      replayer.addListener(new SparkListener {})
+      replayer.replay(input, fileName)
+      // SparkListenerApplicationEnd is the only valid event.
+      assert(events.size === 1)
+      assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+    } finally {
+      input.close()
+    }
+  }
+
   private def assertStreamingQueryInfoEquals(
       expected: StreamingQueryStatus,
       actual: StreamingQueryStatus): Unit = {
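A note on the test design (an editor's reading of the listener plumbing, not text from the patch): ReplayListenerBus inherits postToAll from Spark's ListenerBus, which invokes doPostEvent(listener, event) once per registered listener for every event it posts, so overriding doPostEvent and registering a single dummy SparkListener captures exactly the events that survived parsing. The suite can be run in the usual way, for example with build/sbt "sql/testOnly org.apache.spark.sql.streaming.StreamingQueryListenerSuite" (assuming "sql" is the sbt project name for the sql/core module).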

0 commit comments