Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
rename a config
  • Loading branch information
cloud-fan committed May 26, 2017
commit 3f9d2020be7ce9fa46564667a398ccb6f8bfcbd4
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ package object config {
.createOptional
// End blacklist confs

private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
.withAlternative("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000)

// This property sets the root namespace for metrics reporting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.util.DynamicVariable

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

Expand All @@ -34,23 +34,14 @@ import org.apache.spark.util.Utils
* is stopped when `stop()` is called, and it will drop further events after stopping.
*/
private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {

self =>

import LiveListenerBus._

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

private def validateAndGetQueueSize(): Int = {
val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
if (queueSize <= 0) {
throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
}
queueSize
}
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
Expand Down