Commit a1fb5a8 ("WIP")
1 parent 4816c2e
8 files changed: +143, -40 lines

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 2 deletions
@@ -195,6 +195,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _conf: SparkConf = _
   private var _eventLogDir: Option[URI] = None
   private var _eventLogCodec: Option[String] = None
+  private var _listenerBus: LiveListenerBus = _
   private var _env: SparkEnv = _
   private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
@@ -247,7 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {
   def isStopped: Boolean = stopped.get()

   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus(this)
+  private[spark] def listenerBus: LiveListenerBus = _listenerBus

   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
@@ -423,6 +424,8 @@ class SparkContext(config: SparkConf) extends Logging {

     if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")

+    _listenerBus = new LiveListenerBus(_conf)
+
     // "_jobProgressListener" should be set up before creating SparkEnv because when creating
     // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
     _jobProgressListener = new JobProgressListener(_conf)
@@ -2389,7 +2392,7 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }

-    listenerBus.start()
+    listenerBus.start(this, _env.metricsSystem)
     _listenerBusStarted = true
   }

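The hunks above swap an eagerly constructed `val listenerBus` for a private var behind a `def` accessor, so the bus can be built inside the initialization block once `_conf` is available while call sites keep using the same name. A minimal, self-contained sketch of that deferred-construction pattern (hypothetical names, not Spark source):

final class Bus(conf: Map[String, String]) {
  require(conf.nonEmpty, "configuration must be available before construction")
}

final class Context(conf: Map[String, String]) {
  private var _bus: Bus = _          // like _listenerBus: assigned during init()
  def bus: Bus = _bus                // callers still write `context.bus`

  private def init(): Unit = {
    // construction deferred until the configuration is ready, as in the diff
    _bus = new Bus(conf)
  }
  init()
}

object AccessorDemo extends App {
  val ctx = new Context(Map("spark.app.name" -> "demo"))
  assert(ctx.bus != null)            // the accessor returns the initialized instance
}
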
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala

Lines changed: 58 additions & 13 deletions
@@ -22,8 +22,12 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

 import scala.util.DynamicVariable

-import org.apache.spark.{SparkContext, SparkException}
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.config._
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
 import org.apache.spark.util.Utils

 /**
@@ -33,25 +37,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
+private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus {

   self =>

   import LiveListenerBus._

+  private var sparkContext: SparkContext = _
+
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
-  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
-
-  private def validateAndGetQueueSize(): Int = {
-    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
-    if (queueSize <= 0) {
-      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
-    }
-    queueSize
+  private val eventQueue = {
+    val capacity = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+    require(capacity > 0, s"${LISTENER_BUS_EVENT_QUEUE_SIZE.key} must be > 0!")
+    new LinkedBlockingQueue[SparkListenerEvent](capacity)
   }

+  private[spark] val metrics = new LiveListenerBusMetrics(eventQueue)
+
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
   // Indicate if `stop()` is called
@@ -76,6 +79,7 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
     setDaemon(true)
     override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       LiveListenerBus.withinListenerThread.withValue(true) {
+        val timer = metrics.eventProcessingTime
         while (true) {
           eventLock.acquire()
           self.synchronized {
@@ -91,7 +95,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
             }
             return
           }
-          postToAll(event)
+          val timerContext = timer.time()
+          try {
+            postToAll(event)
+          } finally {
+            timerContext.stop()
+          }
         } finally {
           self.synchronized {
             processingEvent = false
@@ -109,9 +118,12 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
   * listens for any additional events asynchronously while the listener bus is still running.
   * This should only be called once.
   *
+  * @param sc Used to stop the SparkContext in case the listener thread dies.
   */
-  def start(): Unit = {
+  def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
+      metricsSystem.registerSource(metrics)
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")
@@ -124,11 +136,13 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
       logError(s"$name has already stopped! Dropping event $event")
       return
     }
+    metrics.numEventsReceived.inc()
     val eventAdded = eventQueue.offer(event)
     if (eventAdded) {
       eventLock.release()
     } else {
       onDropEvent(event)
+      metrics.numDroppedEvents.inc()
       droppedEventsCounter.incrementAndGet()
     }

@@ -226,3 +240,34 @@ private[spark] object LiveListenerBus {
   val name = "SparkListenerBus"
 }

+private[spark] class LiveListenerBusMetrics(queue: LinkedBlockingQueue[_]) extends Source {
+  override val sourceName: String = "LiveListenerBus"
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  /**
+   * The total number of events posted to the LiveListenerBus. This counts the number of times
+   * that `post()` is called, which might be less than the total number of events processed in
+   * case events are dropped.
+   */
+  val numEventsReceived: Counter = metricRegistry.counter(MetricRegistry.name("numEventsReceived"))
+
+  /**
+   * The total number of events that were dropped without being delivered to listeners.
+   */
+  val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped"))
+
+  /**
+   * The amount of time taken to post a single event to all listeners.
+   */
+  val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime"))
+
+  /**
+   * The number of messages waiting in the queue.
+   */
+  val queueSize: Gauge[Int] = {
+    metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int] {
+      override def getValue: Int = queue.size()
+    })
+  }
+}
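The new `LiveListenerBusMetrics` is a standard Dropwizard (`com.codahale.metrics`) `Source`: counters are bumped at post time, a timer brackets `postToAll`, and queue depth is exposed as a read-on-demand gauge. A self-contained sketch of those three idioms outside Spark (illustrative names, assumes only the `com.codahale.metrics` library on the classpath):

import java.util.concurrent.LinkedBlockingQueue
import com.codahale.metrics.{Gauge, MetricRegistry}

object MetricsSketch extends App {
  val registry = new MetricRegistry
  val queue = new LinkedBlockingQueue[String](10)

  // A Counter is a simple monotonic count; inc() it on every post(), as the patch does.
  val received = registry.counter(MetricRegistry.name("numEventsReceived"))

  // A Timer records both a throughput rate and a latency distribution; wrap the
  // timed section in time()/stop(), as the patched run() loop does around postToAll.
  val processing = registry.timer(MetricRegistry.name("eventProcessingTime"))

  // A Gauge is evaluated lazily: getValue runs at report time, so it always
  // reflects the queue's current size with no extra bookkeeping on the hot path.
  registry.register(MetricRegistry.name("queueSize"), new Gauge[Int] {
    override def getValue: Int = queue.size()
  })

  queue.offer("event")
  received.inc()
  val ctx = processing.time()
  try queue.poll() finally ctx.stop()

  println(received.getCount)    // 1
  println(processing.getCount)  // 1
}
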
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -155,14 +155,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
     val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
-    val listenerBus = new LiveListenerBus(sc)
+    val listenerBus = new LiveListenerBus(conf)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)

     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(sc, sc.env.metricsSystem)
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)
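
This and the remaining suites apply the same mechanical call-site migration, which can be summarized as (illustrative, assuming `sc` and `conf` are in scope):

// Before: the bus captured a live SparkContext at construction time.
//   val bus = new LiveListenerBus(sc)
//   bus.start()
// After: construction needs only a SparkConf; the SparkContext and the
// MetricsSystem are supplied when the bus is started.
//   val bus = new LiveListenerBus(conf)
//   bus.start(sc, sc.env.metricsSystem)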

core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala

Lines changed: 73 additions & 16 deletions
@@ -22,10 +22,13 @@ import java.util.concurrent.Semaphore
 import scala.collection.mutable
 import scala.collection.JavaConverters._

+import org.mockito.Mockito
 import org.scalatest.Matchers

 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.config.LISTENER_BUS_EVENT_QUEUE_SIZE
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
@@ -36,14 +39,17 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match

   val jobCompletionTime = 1421191296660L

+  private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
+  private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem])
+
   test("don't call sc.stop in listener") {
     sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(sc.conf)
     bus.addListener(listener)

     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc, sc.env.metricsSystem)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
@@ -52,35 +58,50 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }

   test("basic creation and shutdown of LiveListenerBus") {
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val conf = new SparkConf()
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(conf)
     bus.addListener(counter)

-    // Listener bus hasn't started yet, so posting events should not increment counter
+    // Metrics are initially empty.
+    assert(bus.metrics.numEventsReceived.getCount === 0)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.eventProcessingTime.getCount === 0)
+
+    // Post five events:
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
+
+    // Five messages should be marked as received and queued, but no messages should be posted to
+    // listeners yet because the listener bus hasn't been started.
+    assert(bus.metrics.numEventsReceived.getCount === 5)
+    assert(bus.metrics.queueSize.getValue === 5)
     assert(counter.count === 0)

     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
+    Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.eventProcessingTime.getCount === 5)

     // After listener bus has stopped, posting events should not increment counter
     bus.stop()
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     assert(counter.count === 5)
+    assert(bus.metrics.numEventsReceived.getCount === 5)

     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus(sc)
-      bus.start()
-      bus.start()
+      val bus = new LiveListenerBus(conf)
+      bus.start(mockSparkContext, mockMetricsSystem)
+      bus.start(mockSparkContext, mockMetricsSystem)
     }

     // ... or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus(sc)
+      val bus = new LiveListenerBus(conf)
       bus.stop()
     }
   }
@@ -107,12 +128,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         drained = true
       }
     }
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(new SparkConf())
     val blockingListener = new BlockingListener

     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

     listenerStarted.acquire()
@@ -138,6 +158,44 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(drained)
   }

+  test("metrics for dropped listener events") {
+    val bus = new LiveListenerBus(new SparkConf().set(LISTENER_BUS_EVENT_QUEUE_SIZE, 1))
+
+    val listenerStarted = new Semaphore(0)
+    val listenerWait = new Semaphore(0)
+
+    bus.addListener(new SparkListener {
+      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+        listenerStarted.release()
+        listenerWait.acquire()
+      }
+    })
+
+    bus.start(mockSparkContext, mockMetricsSystem)
+
+    // Post a message to the listener bus and wait for processing to begin:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    listenerStarted.acquire()
+    assert(bus.metrics.queueSize.getValue === 0)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+
+    // If we post an additional message then it should remain in the queue because the listener is
+    // busy processing the first event:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    assert(bus.metrics.queueSize.getValue === 1)
+    assert(bus.metrics.numDroppedEvents.getCount === 0)
+
+    // The queue is now full, so any additional events posted to the listener will be dropped:
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    assert(bus.metrics.queueSize.getValue === 1)
+    assert(bus.metrics.numDroppedEvents.getCount === 1)
+
+    // Allow the remaining events to be processed so we can stop the listener bus:
+    listenerWait.release(2)
+    bus.stop()
+  }
+
   test("basic creation of StageInfo") {
     sc = new SparkContext("local", "SparkListenerSuite")
     val listener = new SaveStageAndTaskInfo
@@ -354,14 +412,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
-    val bus = new LiveListenerBus(sc)
+    val bus = new LiveListenerBus(new SparkConf())

     // Propagate events to bad listener first
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(mockSparkContext, mockMetricsSystem)

     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
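
The new dropped-events test relies on a two-semaphore handshake to make queue overflow deterministic: the listener signals when it has begun processing, then blocks until the test releases it, so the test controls exactly how full the bounded queue gets. A standalone sketch of that coordination pattern (plain JDK concurrency, no Spark, illustrative names):

import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

object HandshakeSketch extends App {
  val queue = new LinkedBlockingQueue[Int](1)  // capacity 1, like the test's bus queue
  val started = new Semaphore(0)
  val gate = new Semaphore(0)

  val worker = new Thread(() => {
    queue.take()         // begin "processing" the first event
    started.release()    // signal: processing has begun
    gate.acquire()       // block until the main thread opens the gate
    queue.take()         // drain the second, queued event
  })
  worker.setDaemon(true)
  worker.start()

  queue.put(1)
  started.acquire()            // wait until the worker is mid-processing
  assert(queue.offer(2))       // accepted: the worker already drained the only slot
  assert(!queue.offer(3))      // rejected: the capacity-1 queue is full again
  gate.release()
  worker.join()
  println("overflow observed deterministically")
}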

core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -100,7 +100,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(sc))), conf, true)
+        new LiveListenerBus(conf))), conf, true)
     allStores.clear()
   }

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -124,7 +124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     when(sc.conf).thenReturn(conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(sc))), conf, true)
+        new LiveListenerBus(conf))), conf, true)

     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()

core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala

Lines changed: 2 additions & 3 deletions
@@ -26,7 +26,7 @@ import org.apache.spark.storage._
 /**
  * Test various functionality in the StorageListener that supports the StorageTab.
  */
-class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
   private var bus: LiveListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
@@ -43,8 +43,7 @@ class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAn

   before {
     val conf = new SparkConf()
-    sc = new SparkContext("local", "test", conf)
-    bus = new LiveListenerBus(sc)
+    bus = new LiveListenerBus(conf)
     storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
