Skip to content

Commit 9c0cb44

Browse files
committed
[SPARK-15703] Make ListenerBus event queue configurable
1 parent 162d04a commit 9c0cb44

File tree

8 files changed

+31
-30
lines changed

8 files changed

+31
-30
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
249249
def isStopped: Boolean = stopped.get()
250250

251251
// An asynchronous listener bus for Spark events
252-
private[spark] val listenerBus = new LiveListenerBus
252+
private[spark] val listenerBus = new LiveListenerBus(this)
253253

254254
// This function allows components created by SparkEnv to be mocked in unit tests:
255255
private[spark] def createSparkEnv(
@@ -2147,7 +2147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
21472147
}
21482148
}
21492149

2150-
listenerBus.start(this)
2150+
listenerBus.start()
21512151
_listenerBusStarted = true
21522152
}
21532153

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,17 @@ import org.apache.spark.util.Utils
3232
* has started will events be actually propagated to all attached listeners. This listener bus
3333
* is stopped when `stop()` is called, and it will drop further events after stopping.
3434
*/
35-
private[spark] class LiveListenerBus extends SparkListenerBus {
35+
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
3636

3737
self =>
3838

3939
import LiveListenerBus._
4040

41-
private var sparkContext: SparkContext = null
42-
4341
// Cap the capacity of the event queue so we get an explicit error (rather than
4442
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
45-
private val EVENT_QUEUE_CAPACITY = 10000
46-
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
43+
private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.
44+
getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)
45+
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
4746

4847
// Indicate if `start()` is called
4948
private val started = new AtomicBoolean(false)
@@ -96,11 +95,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
9695
* listens for any additional events asynchronously while the listener bus is still running.
9796
* This should only be called once.
9897
*
99-
* @param sc Used to stop the SparkContext in case the listener thread dies.
10098
*/
101-
def start(sc: SparkContext): Unit = {
99+
def start(): Unit = {
102100
if (started.compareAndSet(false, true)) {
103-
sparkContext = sc
104101
listenerThread.start()
105102
} else {
106103
throw new IllegalStateException(s"$name already started!")

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
142142
extraConf.foreach { case (k, v) => conf.set(k, v) }
143143
val logName = compressionCodec.map("test-" + _).getOrElse("test")
144144
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
145-
val listenerBus = new LiveListenerBus
145+
val listenerBus = new LiveListenerBus(sc)
146146
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
147147
125L, "Mickey", None)
148148
val applicationEnd = SparkListenerApplicationEnd(1000L)
149149

150150
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
151151
eventLogger.start()
152-
listenerBus.start(sc)
152+
listenerBus.start()
153153
listenerBus.addListener(eventLogger)
154154
listenerBus.postToAll(applicationStart)
155155
listenerBus.postToAll(applicationEnd)

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
3939
test("don't call sc.stop in listener") {
4040
sc = new SparkContext("local", "SparkListenerSuite")
4141
val listener = new SparkContextStoppingListener(sc)
42-
val bus = new LiveListenerBus
42+
val bus = new LiveListenerBus(sc)
4343
bus.addListener(listener)
4444

4545
// Starting listener bus should flush all buffered events
46-
bus.start(sc)
46+
bus.start()
4747
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
4848
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
4949

@@ -53,15 +53,15 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
5353

5454
test("basic creation and shutdown of LiveListenerBus") {
5555
val counter = new BasicJobCounter
56-
val bus = new LiveListenerBus
56+
val bus = new LiveListenerBus(sc)
5757
bus.addListener(counter)
5858

5959
// Listener bus hasn't started yet, so posting events should not increment counter
6060
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
6161
assert(counter.count === 0)
6262

6363
// Starting listener bus should flush all buffered events
64-
bus.start(sc)
64+
bus.start()
6565
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
6666
assert(counter.count === 5)
6767

@@ -72,14 +72,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
7272

7373
// Listener bus must not be started twice
7474
intercept[IllegalStateException] {
75-
val bus = new LiveListenerBus
76-
bus.start(sc)
77-
bus.start(sc)
75+
val bus = new LiveListenerBus(sc)
76+
bus.start()
77+
bus.start()
7878
}
7979

8080
// ... or stopped before starting
8181
intercept[IllegalStateException] {
82-
val bus = new LiveListenerBus
82+
val bus = new LiveListenerBus(sc)
8383
bus.stop()
8484
}
8585
}
@@ -107,11 +107,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
107107
}
108108
}
109109

110-
val bus = new LiveListenerBus
110+
val bus = new LiveListenerBus(sc)
111111
val blockingListener = new BlockingListener
112112

113113
bus.addListener(blockingListener)
114-
bus.start(sc)
114+
bus.start()
115115
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
116116

117117
listenerStarted.acquire()
@@ -353,13 +353,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
353353
val badListener = new BadListener
354354
val jobCounter1 = new BasicJobCounter
355355
val jobCounter2 = new BasicJobCounter
356-
val bus = new LiveListenerBus
356+
val bus = new LiveListenerBus(sc)
357357

358358
// Propagate events to bad listener first
359359
bus.addListener(badListener)
360360
bus.addListener(jobCounter1)
361361
bus.addListener(jobCounter2)
362-
bus.start(sc)
362+
bus.start()
363363

364364
// Post events to all listeners, and wait until the queue is drained
365365
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
9292
conf.set("spark.storage.cachedPeersTtl", "10")
9393

9494
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
95-
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
95+
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
96+
new LiveListenerBus(new SparkContext(conf)))), conf, true)
9697
allStores.clear()
9798
}
9899

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
108108
conf.set("spark.driver.port", rpcEnv.address.port.toString)
109109

110110
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
111-
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
111+
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
112+
new LiveListenerBus(new SparkContext(conf)))), conf, true)
112113

113114
val initialize = PrivateMethod[Unit]('initialize)
114115
SizeEstimator invokePrivate initialize()

core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage
1919

2020
import org.scalatest.BeforeAndAfter
2121

22-
import org.apache.spark.{SparkConf, SparkFunSuite, Success}
22+
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, Success}
2323
import org.apache.spark.executor.TaskMetrics
2424
import org.apache.spark.scheduler._
2525
import org.apache.spark.storage._
@@ -43,8 +43,9 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
4343
private val bm1 = BlockManagerId("big", "dog", 1)
4444

4545
before {
46-
bus = new LiveListenerBus
47-
storageStatusListener = new StorageStatusListener(new SparkConf())
46+
val conf = new SparkConf()
47+
bus = new LiveListenerBus(new SparkContext(conf))
48+
storageStatusListener = new StorageStatusListener(conf)
4849
storageListener = new StorageListener(storageStatusListener)
4950
bus.addListener(storageStatusListener)
5051
bus.addListener(storageListener)

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ class ReceivedBlockHandlerSuite
7878
conf.set("spark.driver.port", rpcEnv.address.port.toString)
7979

8080
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
81-
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
81+
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
82+
new LiveListenerBus(new SparkContext(conf)))), conf, true)
8283

8384
storageLevel = StorageLevel.MEMORY_ONLY_SER
8485
blockManager = createBlockManager(blockManagerSize, conf)

0 commit comments

Comments
 (0)