Skip to content
Closed
Prev Previous commit
Add configuration.
  • Loading branch information
JoshRosen committed Jun 8, 2017
commit 76b669ca6eb35a0cce4291702baa5d1f60adb467
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ package object config {
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000)

private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED =
ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed")
.internal()
.intConf
.createWithDefault(128)

// This property sets the root namespace for metrics reporting
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
private val eventQueue =
new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

private[spark] val metrics = new LiveListenerBusMetrics(eventQueue)
private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue)

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
Expand Down Expand Up @@ -243,8 +243,11 @@ private[spark] object LiveListenerBus {
val name = "SparkListenerBus"
}

private[spark]
class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
private[spark] class LiveListenerBusMetrics(
conf: SparkConf,
queue: LinkedBlockingQueue[_])
extends Source with Logging {

override val sourceName: String = "LiveListenerBus"
override val metricRegistry: MetricRegistry = new MetricRegistry

Expand Down Expand Up @@ -285,7 +288,7 @@ class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with
def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
synchronized {
val className = cls.getName
val maxTimed = 128
val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)
perListenerClassTimers.get(className).orElse {
if (perListenerClassTimers.size == maxTimed) {
logError(s"Not measuring processing time for listener class $className because a " +
Expand Down