Skip to content
Closed
Prev Previous commit
Next Next commit
Protect against registering thousands of listener classes.
  • Loading branch information
JoshRosen committed May 26, 2017
commit 60c7448d2cff7dd809f9d75ff48b31e21b88a915
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ package org.apache.spark.scheduler
import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

import scala.collection.mutable
import scala.util.DynamicVariable

import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
Expand Down Expand Up @@ -112,13 +113,8 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
}
}

override protected def createTimer(listener: SparkListenerInterface): Option[Timer] = {
if (listener.getClass.getName.startsWith("org.apache.spark")) {
metrics.perListenerTimers.size()
Some(metrics.perListenerTimers(listener.getClass.getSimpleName))
} else {
None
}
override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
metrics.getTimerForListener(listener)
}

/**
Expand Down Expand Up @@ -250,7 +246,8 @@ private[spark] object LiveListenerBus {
val name = "SparkListenerBus"
}

private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
private[spark]
class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with Logging {
override val sourceName: String = "LiveListenerBus"
override val metricRegistry: MetricRegistry = new MetricRegistry

Expand Down Expand Up @@ -281,15 +278,29 @@ private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) exten
})
}

// Guarded by synchronization.
private val perListenerClassTimers = mutable.Map[String, Timer]()

/**
* Mapping from fully-qualified listener class name to a timer tracking the processing time of
* events processed by that listener.
* Returns a timer tracking the processing time of events processed by the given listener's
* class. This method is thread-safe.
*/
val perListenerTimers: LoadingCache[String, Timer] =
CacheBuilder.newBuilder().build[String, Timer](new CacheLoader[String, Timer] {
override def load(listenerName: String): Timer = {
metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", listenerName))
def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
synchronized {
val className = listener.getClass.getName
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that users register the same listener twice? If so, the class name may not be a good identifier for listeners. I think this is the main problem with having per-listener metrics: how do we identify each listener?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but my goal with these metrics is to be able to identify which listeners are causing performance problems, and for that purpose it's more useful to group listeners by class rather than to instrument individual listeners. Most (all?) of Spark's internal listeners have one instance per driver / SparkContext, so in practice keeping track of stats on a per-instance basis wouldn't actually make a meaningful difference in typical cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll update the PR description to discuss this per-listener metric.

val maxTimed = 128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shrug. Maybe, but note that this would be 128 separate listener classes. Let me put in an undocumented configuration.

perListenerClassTimers.get(className).orElse {
if (perListenerClassTimers.size == maxTimed) {
logError(s"Not measuring processing time for listener class $className because a " +
s"maximum of $maxTimed listener classes are already timed.")
None
} else {
perListenerClassTimers(className) =
metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
perListenerClassTimers.get(className)
}
}
})
}
}
}

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
* Returns a CodaHale metrics Timer for measuring the listener's event processing time.
* This method is intended to be overridden by subclasses.
*/
protected def createTimer(listener: L): Option[Timer] = None
protected def getTimer(listener: L): Option[Timer] = None

/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listenersPlusTimers.add((listener, createTimer(listener).orNull))
listenersPlusTimers.add((listener, getTimer(listener).orNull))
}

/**
Expand Down