From 0b0adb0b7a508f3e1902b838382e478a1435cfde Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Tue, 3 Sep 2019 19:50:41 -0700 Subject: [PATCH 1/4] [SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometimes fail. testApplicationReplay fails if the application runs long enough for the driver to send an executor metrics update. This causes stage executor metrics to be written for the driver. However, executor metrics updates are not logged, and thus not replayed. Therefore no stage executor metrics for the driver is logged on replay. --- .../spark/scheduler/ReplayListenerSuite.scala | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index e79613749f0c..0aa2db136dd8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -219,11 +219,25 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) val originalEvents = sc.eventLogger.get.loggedEvents + .map(JsonProtocol.sparkEventFromJson(_)) val replayedEvents = eventMonster.loggedEvents - originalEvents.zip(replayedEvents).foreach { case (e1, e2) => - // Don't compare the JSON here because accumulators in StageInfo may be out of order - JsonProtocolSuite.assertEquals( - JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2)) + .map(JsonProtocol.sparkEventFromJson(_)) + // Executor metrics updates are not logged, so do not get replayed. + // Stage executor metrics are logged at stage completion, for any of the executors and + // the driver for which we have metrics. We always have metrics for executors, because + // they are sent at task end as well as in executor metrics updates. 
We do not always + // have metrics for the driver, because they are only sent in executor metrics updates + // (at heartbeat intervals), and when we do, it is only in the original events, never + // in the replay, since executor metrics updates are not replayed. + // For this reason, exclude stage executor metrics for the driver. + val filteredEvents = originalEvents.filter { e => + if (e.isInstanceOf[SparkListenerStageExecutorMetrics] && + e.asInstanceOf[SparkListenerStageExecutorMetrics].execId == "driver") { + false + } else true + } + filteredEvents.zip(replayedEvents).foreach { case (e1, e2) => + JsonProtocolSuite.assertEquals(e1, e2) } } From 7ba6b969bc08e5a1d929c25d72f809647222af73 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Wed, 4 Sep 2019 09:37:02 -0700 Subject: [PATCH 2/4] [SPARK-28770][CORE][TEST] Simplify the filter. Suggested by Jungtaek Lim. --- .../org/apache/spark/scheduler/ReplayListenerSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 0aa2db136dd8..1a61eecf723d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -230,11 +230,9 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // (at heartbeat intervals), and when we do, it is only in the original events, never // in the replay, since executor metrics updates are not replayed. // For this reason, exclude stage executor metrics for the driver.
- val filteredEvents = originalEvents.filter { e => - if (e.isInstanceOf[SparkListenerStageExecutorMetrics] && - e.asInstanceOf[SparkListenerStageExecutorMetrics].execId == "driver") { - false - } else true + val filteredEvents = originalEvents.filter { + case e: SparkListenerStageExecutorMetrics if e.execId == "driver" => false + case _ => true } filteredEvents.zip(replayedEvents).foreach { case (e1, e2) => JsonProtocolSuite.assertEquals(e1, e2) From 6fab21f447ee58cdd5b16c8e9cecc57061209747 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Wed, 4 Sep 2019 15:08:33 -0700 Subject: [PATCH 3/4] [SPARK-28770][CORE][TEST] Change implementation of EventMonster. Instead of having EventMonster extend EventLoggingListener, have it extend SparkFirehoseListener and simply buffer all events it receives. This makes it much more suitable for verifying that the ReplayListenerBus replays all the events from event logs. With this new EventMonster, there is no need to have special exception handling for certain types of events.
--- .../spark/scheduler/ReplayListenerSuite.scala | 43 ++++++------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 1a61eecf723d..e7faa70c812b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -21,12 +21,14 @@ import java.io._ import java.net.URI import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.fs.Path import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec} import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils} @@ -62,7 +64,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -108,7 +110,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val replayer = new ReplayListenerBus() - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster replayer.addListener(eventMonster) // Verify the replay returns the events given the input maybe truncated. 
@@ -145,7 +147,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -207,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Replay events val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) - val eventMonster = new EventMonster(conf) + val eventMonster = new EventMonster try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -222,19 +224,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp .map(JsonProtocol.sparkEventFromJson(_)) val replayedEvents = eventMonster.loggedEvents .map(JsonProtocol.sparkEventFromJson(_)) - // Executor metrics updates are not logged, so do not get replayed. - // Stage executor metrics are logged at stage completion, for any of the executors and - // the driver for which we have metrics. We always have metrics for executors, because - // they are sent at task end as well as in executor metrics updates. We do not always - // have metrics for the driver, because they are only sent in executor metrics updates - // (at heartbeat intervals), and when we do, it is only in the original events, never - // in the replay, since executor metrics updates are not replayed. - // For this reason, exclude stage executor metrics for the driver. 
- val filteredEvents = originalEvents.filter { - case e: SparkListenerStageExecutorMetrics if e.execId == "driver" => false - case _ => true - } - filteredEvents.zip(replayedEvents).foreach { case (e1, e2) => + originalEvents.zip(replayedEvents).foreach { case (e1, e2) => JsonProtocolSuite.assertEquals(e1, e2) } } @@ -248,20 +238,15 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp /** * A simple listener that buffers all the events it receives. * - * The event buffering functionality must be implemented within EventLoggingListener itself. - * This is because of the following race condition: the event may be mutated between being - * processed by one listener and being processed by another. Thus, in order to establish - * a fair comparison between the original events and the replayed events, both functionalities - * must be implemented within one listener (i.e. the EventLoggingListener). - * - * This child listener inherits only the event buffering functionality, but does not actually - * log the events. */ - private class EventMonster(conf: SparkConf) - extends EventLoggingListener("test", None, new URI("testdir"), conf) { + private class EventMonster extends SparkFirehoseListener { - override def start() { } + private[scheduler] val loggedEvents = new ArrayBuffer[JValue] + override def onEvent(event: SparkListenerEvent) { + val eventJson = JsonProtocol.sparkEventToJson(event) + loggedEvents += eventJson + } } /* From a0287a80e6c457efd14118a790d0d2be35a05ba4 Mon Sep 17 00:00:00 2001 From: Wing Yew Poon Date: Thu, 5 Sep 2019 09:33:32 -0700 Subject: [PATCH 4/4] [SPARK-28770][CORE][TEST] Rename EventMonster to EventBufferingListener.
--- .../apache/spark/scheduler/ReplayListenerSuite.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index e7faa70c812b..d65b5cbfc094 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -64,7 +64,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster + val eventMonster = new EventBufferingListener try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -110,7 +110,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val replayer = new ReplayListenerBus() - val eventMonster = new EventMonster + val eventMonster = new EventBufferingListener replayer.addListener(eventMonster) // Verify the replay returns the events given the input maybe truncated. 
@@ -147,7 +147,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) val logData = fileSystem.open(logFilePath) - val eventMonster = new EventMonster + val eventMonster = new EventBufferingListener try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -209,7 +209,7 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Replay events val logData = EventLoggingListener.openEventLog(eventLog.getPath(), fileSystem) - val eventMonster = new EventMonster + val eventMonster = new EventBufferingListener try { val replayer = new ReplayListenerBus() replayer.addListener(eventMonster) @@ -237,9 +237,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp /** * A simple listener that buffers all the events it receives. - * */ - private class EventMonster extends SparkFirehoseListener { + private class EventBufferingListener extends SparkFirehoseListener { private[scheduler] val loggedEvents = new ArrayBuffer[JValue]