Skip to content
Closed
Prev Previous commit
Next Next commit
Add test for per-listener-class timer; rename method.
  • Loading branch information
JoshRosen committed Jun 5, 2017
commit d1a5e991fb7fc3e7f93090c23d8088be8b650f61
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {
}

override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = {
metrics.getTimerForListener(listener)
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
}

/**
Expand Down Expand Up @@ -282,9 +282,9 @@ class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source with
* Returns a timer tracking the processing time of the given listener class.
* events processed by that listener. This method is thread-safe.
*/
def getTimerForListener(listener: SparkListenerInterface): Option[Timer] = {
def getTimerForListenerClass(cls: Class[_ <: SparkListenerInterface]): Option[Timer] = {
synchronized {
val className = listener.getClass.getName
val className = cls.getName
val maxTimed = 128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be configurable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shrug. Maybe, but note that this would be 128 separate listener classes. Let me put in an undocumented configuration.

perListenerClassTimers.get(className).orElse {
if (perListenerClassTimers.size == maxTimed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(counter.count === 5)
assert(bus.metrics.numEventsPosted.getCount === 5)

// Make sure per-listener-class timers were created:
assert(bus.metrics.getTimerForListenerClass(
classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5)

// Listener bus must not be started twice
intercept[IllegalStateException] {
val bus = new LiveListenerBus(conf)
Expand Down