[SPARK-15703] Make ListenerBus event queue configurable
dhruve committed Jul 19, 2016
commit 9c0cb445856cfd4384287c1ac00c4f6a87cdb52a
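
For orientation, here is a minimal sketch of how the setting introduced by this commit would be used; the value 20000 is an arbitrary example, not a recommendation:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Raise the listener bus queue capacity above the 10000 default so that
// bursty event loads are less likely to fill the queue and drop events.
val conf = new SparkConf()
  .setAppName("listener-bus-demo")
  .setMaster("local[2]")
  .set("spark.scheduler.listenerbus.eventqueue.size", "20000")

val sc = new SparkContext(conf)
```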
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def isStopped: Boolean = stopped.get()

// An asynchronous listener bus for Spark events
- private[spark] val listenerBus = new LiveListenerBus
+ private[spark] val listenerBus = new LiveListenerBus(this)

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
@@ -2147,7 +2147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

- listenerBus.start(this)
+ listenerBus.start()
_listenerBusStarted = true
}

core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -32,18 +32,17 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
- private[spark] class LiveListenerBus extends SparkListenerBus {
+ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
Contributor:
I'm modifying LiveListenerBus now and noticed that we're passing in sparkContext even though we only use it to access conf. I think it would have been better to just pass in conf here. It would make the initialization order constraints a lot clearer, too: right now it's not immediately clear why eventQueue needs to be a lazy val.
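
Roughly, the alternative being suggested would look like the following sketch (hypothetical, not part of this patch); taking the conf directly removes the need for a lazy val, since a SparkConf is fully usable before any SparkContext exists:

```scala
import java.util.concurrent.LinkedBlockingQueue

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerBus, SparkListenerEvent}

// Hypothetical variant: the capacity can be a plain val because the conf
// passed in has no dependency on SparkContext construction order.
private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {

  private val eventQueueCapacity: Int =
    conf.getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)

  private val eventQueue =
    new LinkedBlockingQueue[SparkListenerEvent](eventQueueCapacity)
}
```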

Contributor:
I guess we also use this to tear down sparkContext when the listener dies. I still have a weak preference for the old code, however, since I think the lifecycle was clearer.

Contributor (author):
From what I recall and as you mentioned, a ref to sparkContext was already being used to tear down the SparkContext when the listener thread died because of an uncaught exception. The idea was to refactor the code to make sparkContext available at instantiation time and access the conf from it rather than passing it separately. This definition is cyclic, and lazy val was used to ensure that conf is accessed only after sc is initialized.
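
To make the cycle concrete, a simplified model (hypothetical names, not the real classes): SparkContext constructs the bus inside its own constructor, before its conf field is assigned, so an eager read of the conf from inside the bus would observe a partially constructed context.

```scala
// Simplified model of the cyclic initialization (illustrative names only).
class Context {
  // The bus is constructed first, while `this` is still being initialized...
  val bus = new Bus(this)
  // ...and `conf` is only assigned afterwards.
  val conf: Map[String, String] = Map("queue.size" -> "20000")
}

class Bus(ctx: Context) {
  // An eager `val` here would read ctx.conf while it is still null and
  // throw a NullPointerException. `lazy val` defers the read until first
  // use (e.g. in start()), by which point Context is fully constructed.
  lazy val capacity: Int = ctx.conf.getOrElse("queue.size", "10000").toInt
}

object Demo extends App {
  val ctx = new Context
  println(ctx.bus.capacity) // 20000 -- safe: conf is initialized by now
}
```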


self =>

import LiveListenerBus._

- private var sparkContext: SparkContext = null

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+ private lazy val EVENT_QUEUE_CAPACITY = sparkContext.conf.
+   getInt("spark.scheduler.listenerbus.eventqueue.size", 10000)
Contributor:
should use a ConfigEntry for this
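
Presumably something along these lines, using Spark's internal ConfigBuilder API; the entry name and placement below are assumptions, not the committed code:

```scala
// In org.apache.spark.internal.config (sketch; name and placement assumed):
private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
    .doc("Capacity of the event queue in the Spark listener bus.")
    .intConf
    .createWithDefault(10000)
```

The lazy val above would then become `sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)`, which keeps the key and its default in one place.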

+ private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
@@ -96,11 +95,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
* listens for any additional events asynchronously while the listener bus is still running.
* This should only be called once.
*
- * @param sc Used to stop the SparkContext in case the listener thread dies.
*/
- def start(sc: SparkContext): Unit = {
+ def start(): Unit = {
if (started.compareAndSet(false, true)) {
- sparkContext = sc
listenerThread.start()
} else {
throw new IllegalStateException(s"$name already started!")
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
extraConf.foreach { case (k, v) => conf.set(k, v) }
val logName = compressionCodec.map("test-" + _).getOrElse("test")
val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
- val listenerBus = new LiveListenerBus
+ val listenerBus = new LiveListenerBus(sc)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey", None)
val applicationEnd = SparkListenerApplicationEnd(1000L)

// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
eventLogger.start()
- listenerBus.start(sc)
+ listenerBus.start()
listenerBus.addListener(eventLogger)
listenerBus.postToAll(applicationStart)
listenerBus.postToAll(applicationEnd)
core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -39,11 +39,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
test("don't call sc.stop in listener") {
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SparkContextStoppingListener(sc)
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(listener)

// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

@@ -53,15 +53,15 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

test("basic creation and shutdown of LiveListenerBus") {
val counter = new BasicJobCounter
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.addListener(counter)

// Listener bus hasn't started yet, so posting events should not increment counter
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
- bus.start(sc)
+ bus.start()
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)

@@ -72,14 +72,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

// Listener bus must not be started twice
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
- bus.start(sc)
- bus.start(sc)
+ val bus = new LiveListenerBus(sc)
+ bus.start()
+ bus.start()
}

// ... or stopped before starting
intercept[IllegalStateException] {
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
bus.stop()
}
}
@@ -107,11 +107,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
}

- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
- bus.start(sc)
+ bus.start()
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

listenerStarted.acquire()
@@ -353,13 +353,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val badListener = new BadListener
val jobCounter1 = new BasicJobCounter
val jobCounter2 = new BasicJobCounter
- val bus = new LiveListenerBus
+ val bus = new LiveListenerBus(sc)

// Propagate events to bad listener first
bus.addListener(badListener)
bus.addListener(jobCounter1)
bus.addListener(jobCounter2)
- bus.start(sc)
+ bus.start()

// Post events to all listeners, and wait until the queue is drained
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -92,7 +92,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
conf.set("spark.storage.cachedPeersTtl", "10")

master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+   new LiveListenerBus(new SparkContext(conf)))), conf, true)
allStores.clear()
}

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -108,7 +108,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.driver.port", rpcEnv.address.port.toString)

master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+   new LiveListenerBus(new SparkContext(conf)))), conf, true)

val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.storage

import org.scalatest.BeforeAndAfter

- import org.apache.spark.{SparkConf, SparkFunSuite, Success}
+ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, Success}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.storage._
@@ -43,8 +43,9 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
private val bm1 = BlockManagerId("big", "dog", 1)

before {
- bus = new LiveListenerBus
- storageStatusListener = new StorageStatusListener(new SparkConf())
+ val conf = new SparkConf()
+ bus = new LiveListenerBus(new SparkContext(conf))
+ storageStatusListener = new StorageStatusListener(conf)
storageListener = new StorageListener(storageStatusListener)
bus.addListener(storageStatusListener)
bus.addListener(storageListener)
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -78,7 +78,8 @@ class ReceivedBlockHandlerSuite
conf.set("spark.driver.port", rpcEnv.address.port.toString)

blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
- new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+   new LiveListenerBus(new SparkContext(conf)))), conf, true)

storageLevel = StorageLevel.MEMORY_ONLY_SER
blockManager = createBlockManager(blockManagerSize, conf)