Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address
  • Loading branch information
zsxwing committed Oct 27, 2016
commit 0f51b4ee160f4a0c0e822da945df545ce189c501
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {

postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
} catch {
case e: java.lang.ClassNotFoundException if
e.getMessage == "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress" ||
e.getMessage == "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
=>
case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
// Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
// It's safe since no place uses them.
logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
Expand Down Expand Up @@ -109,4 +106,13 @@ private[spark] object ReplayListenerBus {

// utility filter that selects all event logs during replay
val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }

/**
* Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
* old json may fail and we can just ignore these failures.
*/
val KNOWN_REMOVED_CLASSES = Set(
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
"org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
)
}