Skip to content
Closed
Prev Previous commit
Next Next commit
Minor cleanups.
  • Loading branch information
JoshRosen committed Jun 5, 2017
commit 4a083decb7e817fab49f25f4f0fe119352525aa7
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private val eventQueue = {
val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_CAPACITY.key} must be > 0!")
new LinkedBlockingQueue[SparkListenerEvent](capacity)
}
private val eventQueue =
new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

private[spark] val metrics = new LiveListenerBusMetrics(eventQueue)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
var maybeTimerContext = if (maybeTimer != null) {
val maybeTimerContext = if (maybeTimer != null) {
maybeTimer.time()
} else {
null
Expand Down