@@ -84,16 +84,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {

          postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
        } catch {
-          case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
-            // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
-            // It's safe since no place uses them.
-            logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
-          case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
-            "Unrecognized field \"queryStatus\" " +
-              "(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
-            // Ignore events generated by Structured Streaming in Spark 2.0.2.
-            // It's safe since no place uses them.
-            logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
+          case _: ClassNotFoundException | _: UnrecognizedPropertyException =>
+            // Ignore unknown events and unrecognized properties, and keep parsing
+            // through the rest of the event log file.
+            logWarning(s"Drop incompatible event log: $currentLine")
Contributor:
This was the case before, but isn't this going to spam the SHS (Spark History Server) log? Perhaps only log this once per log file being replayed, or once per class, or something?
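(For illustration, a minimal sketch of the "once per class" idea; `warnedClasses` and `warnOncePerClass` are hypothetical names that do not exist in Spark, and `logWarning` comes from the `Logging` trait that `ReplayListenerBus` already mixes in.)

```scala
import scala.collection.mutable

// Hypothetical per-class deduplication: remember which missing classes we
// have already warned about and stay quiet on repeats.
private val warnedClasses = mutable.Set[String]()

private def warnOncePerClass(e: ClassNotFoundException, currentLine: String): Unit = {
  val missing = Option(e.getMessage).getOrElse("<unknown class>")
  if (warnedClasses.add(missing)) { // add() returns true only on first insertion
    logWarning(s"Dropped event(s) referencing removed class $missing, e.g.: $currentLine")
  }
}
```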

Member:
Make it configurable?

I think both behaviors make sense in different use cases.

Contributor:
Maybe log subsequent errors at debug level, instead of adding a new config?
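(A sketch of that suggestion, assuming it lives inside `ReplayListenerBus` so `logWarning`/`logDebug` from the `Logging` trait are in scope; the flag name is hypothetical and would be reset at the start of each `replay()` call.)

```scala
// Hypothetical flag, reset per replay() call: the first drop is logged at
// WARN so the problem is visible, all later drops at DEBUG to avoid spam.
private var loggedIncompatibleWarning = false

private def logDroppedLine(currentLine: String): Unit = {
  if (!loggedIncompatibleWarning) {
    loggedIncompatibleWarning = true
    logWarning(s"Dropped incompatible event log line (further drops logged at DEBUG): $currentLine")
  } else {
    logDebug(s"Dropped incompatible event log line: $currentLine")
  }
}
```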

Member:
I'm just afraid that some events may not be parsed correctly and would then be silently ignored.

What I'm proposing here isn't about the log level. I think we could support several tolerance levels: zero tolerance, event level, and file level, with a whitelist of events we can safely ignore.
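(A sketch of the three tolerance levels being proposed; all of these names are hypothetical and none of them exist in Spark. `logWarning` is assumed from the `Logging` trait.)

```scala
sealed trait ReplayTolerance
case object ZeroTolerance extends ReplayTolerance // any unparsable line fails the replay
case object EventLevel extends ReplayTolerance    // only whitelisted events may be dropped
case object FileLevel extends ReplayTolerance     // any unparsable line is dropped

private def handleParseFailure(
    tolerance: ReplayTolerance,
    whitelist: Set[String],
    e: Exception,
    currentLine: String): Unit = tolerance match {
  case ZeroTolerance =>
    throw e // zero tolerance: fail the whole replay
  case EventLevel if whitelist.exists(c => Option(e.getMessage).exists(_.contains(c))) =>
    logWarning(s"Dropped whitelisted incompatible event: $currentLine")
  case EventLevel =>
    throw e // not on the whitelist, so still an error
  case FileLevel =>
    logWarning(s"Dropped incompatible event: $currentLine")
}
```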

Contributor:
So you're suggesting we restore the previous version that had a whitelist, and extend it so that the behavior is configurable?

Member:
Yeah, we can extend the existing whitelist mechanism and make it configurable, and make the tolerance level configurable at the same time. The point is to give users more control, while keeping the strict behavior during tests.
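(For illustration, a sketch of what that configuration surface could look like; both keys are hypothetical and do not exist in Spark, `conf` is a `SparkConf`, and `KNOWN_REMOVED_CLASSES` is the whitelist this PR removes.)

```scala
// Hypothetical key; accepted values could be "zero" | "event" | "file".
val tolerance = conf.get("spark.history.replay.tolerance", "event")

// Hypothetical key: extra event classes the user wants ignored, merged with
// the built-in whitelist (KNOWN_REMOVED_CLASSES) that this PR removes.
val whitelist = KNOWN_REMOVED_CLASSES ++
  conf.get("spark.history.replay.eventWhitelist", "")
    .split(",").map(_.trim).filter(_.nonEmpty)
```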

Contributor:
Or just log it once, saying that an unrecognized event was found in file xxx.
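(A sketch of the once-per-file variant, assuming it runs inside `ReplayListenerBus` so that `postToAll`, `logWarning`, `logDebug`, `JsonProtocol`, and json4s' `parse` are all in scope; the method name is hypothetical.)

```scala
private def replayWithSummary(lines: Iterator[String], sourceName: String): Unit = {
  var dropped = 0
  lines.foreach { currentLine =>
    try {
      postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
    } catch {
      case _: ClassNotFoundException | _: UnrecognizedPropertyException =>
        dropped += 1 // defer the noise; per-line details still available at DEBUG
        logDebug(s"Dropped incompatible event log line: $currentLine")
    }
  }
  // One WARN per file instead of one per bad line.
  if (dropped > 0) {
    logWarning(s"Dropped $dropped unrecognized event(s) while replaying $sourceName")
  }
}
```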

          case jpe: JsonParseException =>
            // We can only ignore exception from last line of the file that might be truncated
            // the last entry may not be the very last line in the event log, but we treat it
@@ -125,13 +118,4 @@ private[spark] object ReplayListenerBus {

  // utility filter that selects all event logs during replay
  val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }

-  /**
-   * Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
-   * old json may fail and we can just ignore these failures.
-   */
-  val KNOWN_REMOVED_CLASSES = Set(
-    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress",
-    "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"
-  )
}
@@ -128,6 +128,35 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
    }
  }

test("Replay incompatible event log") {
val logFilePath = Utils.getFilePath(testDir, "incompatible.txt")
val fstream = fileSystem.create(logFilePath)
val writer = new PrintWriter(fstream)
val applicationStart = SparkListenerApplicationStart("Worst App (N)ever", None,
125L, "Donald", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)
// scalastyle:off println
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
// scalastyle:on println
writer.close()

val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath)
val logData = fileSystem.open(logFilePath)
val eventMonster = new EventMonster(conf)
try {
val replayer = new ReplayListenerBus()
replayer.addListener(eventMonster)
replayer.replay(logData, logFilePath.toString)
} finally {
logData.close()
}
assert(eventMonster.loggedEvents.size === 2)
assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
}

  // This assumes the correctness of EventLoggingListener
  test("End-to-end replay") {
    testApplicationReplay()