diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 26a6a3effc9a..c9cd662f5709 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -69,6 +69,8 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       eventsFilter: ReplayEventsFilter): Unit = {
     var currentLine: String = null
     var lineNumber: Int = 0
+    val unrecognizedEvents = new scala.collection.mutable.HashSet[String]
+    val unrecognizedProperties = new scala.collection.mutable.HashSet[String]
 
     try {
       val lineEntries = lines
@@ -84,16 +86,22 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
 
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
-          case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
-            // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
-            // It's safe since no place uses them.
-            logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
-          case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
-            "Unrecognized field \"queryStatus\" " +
-              "(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
-            // Ignore events generated by Structured Streaming in Spark 2.0.2
-            // It's safe since no place uses them.
-            logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
+          case e: ClassNotFoundException =>
+            // Ignore unknown events, parse through the event log file.
+            // To avoid spamming, warnings are only displayed once for each unknown event.
+            if (!unrecognizedEvents.contains(e.getMessage)) {
+              logWarning(s"Drop unrecognized event: ${e.getMessage}")
+              unrecognizedEvents.add(e.getMessage)
+            }
+            logDebug(s"Drop incompatible event log: $currentLine")
+          case e: UnrecognizedPropertyException =>
+            // Ignore unrecognized properties, parse through the event log file.
+            // To avoid spamming, warnings are only displayed once for each unrecognized property.
+            if (!unrecognizedProperties.contains(e.getMessage)) {
+              logWarning(s"Drop unrecognized property: ${e.getMessage}")
+              unrecognizedProperties.add(e.getMessage)
+            }
+            logDebug(s"Drop incompatible event log: $currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that might be truncated
             // the last entry may not be the very last line in the event log, but we treat it
@@ -125,13 +133,4 @@ private[spark] object ReplayListenerBus {
 
   // utility filter that selects all event logs during replay
   val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
-
-  /**
-   * Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
-   * old json may fail and we can just ignore these failures.
-   */
-  val KNOWN_REMOVED_CLASSES = Set(
-    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
-    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
-  )
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d17e3864854a..73e7b3fe8c1d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -128,6 +128,35 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     }
   }
 
+  test("Replay incompatible event log") {
+    val logFilePath = Utils.getFilePath(testDir, "incompatible.txt")
+    val fstream = fileSystem.create(logFilePath)
+    val writer = new PrintWriter(fstream)
+    val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
+      125L, "UserUsingIncompatibleVersion", None)
+    val applicationEnd = SparkListenerApplicationEnd(1000L)
+    // scalastyle:off println
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+    writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
+    writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+    // scalastyle:on println
+    writer.close()
+
+    val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
+    val logData = fileSystem.open(logFilePath)
+    val eventMonster = new EventMonster(conf)
+    try {
+      val replayer = new ReplayListenerBus()
+      replayer.addListener(eventMonster)
+      replayer.replay(logData, logFilePath.toString)
+    } finally {
+      logData.close()
+    }
+    assert(eventMonster.loggedEvents.size === 2)
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+  }
+
   // This assumes the correctness of EventLoggingListener
   test("End-to-end replay") {
     testApplicationReplay()