From 45d3cd879cffc78465621f1d2dc853d661d78cc3 Mon Sep 17 00:00:00 2001 From: Dhruve Ashar Date: Tue, 26 Jul 2016 13:23:33 -0500 Subject: [PATCH] [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable This change adds a new configuration entry to specify the size of the spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") is set to a default to 10000. Note: I haven't currently documented the configuration entry. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer JIRA for more details. Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab. Author: Dhruve Ashar Closes #14269 from dhruve/bug/SPARK-15703. --- .../scala/org/apache/spark/SparkContext.scala | 4 +-- .../spark/internal/config/package.scala | 5 ++++ .../spark/scheduler/LiveListenerBus.scala | 23 ++++++++------ .../scheduler/EventLoggingListenerSuite.scala | 4 +-- .../spark/scheduler/SparkListenerSuite.scala | 30 ++++++++++--------- .../BlockManagerReplicationSuite.scala | 9 ++++-- .../spark/storage/BlockManagerSuite.scala | 6 ++-- .../spark/ui/storage/StorageTabSuite.scala | 11 +++---- .../streaming/ReceivedBlockHandlerSuite.scala | 5 +++- 9 files changed, 60 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 214758f03fdf..f3c2bb2680b9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events - private[spark] val listenerBus = new LiveListenerBus + private[spark] val listenerBus = new LiveListenerBus(this) // This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( @@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - listenerBus.start(this) + listenerBus.start() _listenerBusStarted = true } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f28a9a5cf81d..29f812a2ce11 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -112,4 +112,9 @@ package object config { // To limit how many applications are shown in the History Server summary ui private[spark] val HISTORY_UI_MAX_APPS = ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE) + + private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = + ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") + .intConf + .createWithDefault(10000) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 1c21313d1cb1..bfa3c408f228 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.DynamicVariable -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils /** @@ -32,18 +33,24 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus extends SparkListenerBus { +private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { self => import LiveListenerBus._ - private var sparkContext: SparkContext = null - // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() + private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + + private def validateAndGetQueueSize(): Int = { + val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) + if (queueSize <= 0) { + throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") + } + queueSize + } // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus { * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. * - * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start(sc: SparkContext): Unit = { + def start(): Unit = { if (started.compareAndSet(false, true)) { - sparkContext = sc listenerThread.start() } else { throw new IllegalStateException(s"$name already started!") diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index c4c80b5b57da..7f4859206e25 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus + val listenerBus = new LiveListenerBus(sc) val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start(sc) + listenerBus.start() listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 5ba67afc0cd6..e8a88d4909a8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val jobCompletionTime = 1421191296660L test("don't call sc.stop in listener") { - sc = new SparkContext("local", "SparkListenerSuite") + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.addListener(listener) // Starting listener bus should flush all buffered events - bus.start(sc) + bus.start() bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) @@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("basic creation and shutdown of LiveListenerBus") { + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val counter = new BasicJobCounter - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.addListener(counter) // Listener bus hasn't started yet, so posting events should not increment counter @@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(counter.count === 0) // Starting listener bus should flush all buffered events - bus.start(sc) + bus.start() bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) @@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Listener bus must not be started twice intercept[IllegalStateException] { - val bus = new LiveListenerBus - bus.start(sc) - bus.start(sc) + val bus = new LiveListenerBus(sc) + bus.start() + bus.start() } // ... or stopped before starting intercept[IllegalStateException] { - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.stop() } } @@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match drained = true } } - - val bus = new LiveListenerBus + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) + val bus = new LiveListenerBus(sc) val blockingListener = new BlockingListener bus.addListener(blockingListener) - bus.start(sc) + bus.start() bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() @@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val badListener = new BadListener val jobCounter1 = new BasicJobCounter val jobCounter2 = new BasicJobCounter - val bus = new LiveListenerBus + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) + val bus = new LiveListenerBus(sc) // Propagate events to bad listener first bus.addListener(badListener) bus.addListener(jobCounter1) bus.addListener(jobCounter2) - bus.start(sc) + bus.start() // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 31687e614731..b9e3a364ee22 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.StorageLevel._ /** Testsuite that tests block replication in BlockManager */ -class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter { +class BlockManagerReplicationSuite extends SparkFunSuite + with Matchers + with BeforeAndAfter + with LocalSparkContext { private val conf = new SparkConf(false).set("spark.app.id", "test") private var rpcEnv: RpcEnv = null @@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo // to make cached peers refresh frequently conf.set("spark.storage.cachedPeersTtl", "10") + sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(sc))), conf, true) allStores.clear() } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6194d23f4d8d..e93eee273f16 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,7 +49,7 @@ import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach - with PrivateMethodTester with ResetSystemProperties { + with PrivateMethodTester with LocalSparkContext with ResetSystemProperties { import BlockManagerSuite._ @@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) + sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(sc))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 411a0ddebeb7..f6c8418ba3ac 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -19,15 +19,14 @@ package org.apache.spark.ui.storage import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkFunSuite, Success} -import org.apache.spark.executor.TaskMetrics +import org.apache.spark._ import org.apache.spark.scheduler._ import org.apache.spark.storage._ /** * Test various functionality in the StorageListener that supports the StorageTab. */ -class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { +class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter { private var bus: LiveListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ @@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { private val bm1 = BlockManagerId("big", "dog", 1) before { - bus = new LiveListenerBus - storageStatusListener = new StorageStatusListener(new SparkConf()) + val conf = new SparkConf() + sc = new SparkContext("local", "test", conf) + bus = new LiveListenerBus(sc) + storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) bus.addListener(storageListener) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 5d362925a874..7e665454a540 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -48,6 +48,7 @@ class ReceivedBlockHandlerSuite extends SparkFunSuite with BeforeAndAfter with Matchers + with LocalSparkContext with Logging { import WriteAheadLogBasedBlockHandler._ @@ -78,8 +79,10 @@ class ReceivedBlockHandlerSuite rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) + sc = new SparkContext("local", "test", conf) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(sc))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf)