Synchronize accesses to the LiveListenerBus' event queue
This guards against a race condition in which we (1) dequeue an event and (2) check for queue emptiness before (3) the event has actually been processed by all attached listeners.

The solution is to make steps (1) and (3) atomic relative to (2).
andrewor14 committed Apr 25, 2014
commit eb486ae4c1d95682c0ebeb62edb242109210312e
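
As a sketch of the pattern this commit adopts (the class and names below are invented for illustration and are not Spark's API): the producer releases one semaphore permit per posted event, and the consumer removes and handles each event inside the same monitor that guards the emptiness check, so observing an empty queue implies that every dequeued event has already been handled.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}

// Hypothetical, stripped-down bus illustrating the fix: post() releases a permit per
// event; the consumer acquires a permit and then removes *and* handles the event
// inside the same monitor that guards isEmpty, so "queue looks empty" can never be
// observed while a dequeued event is still being processed.
class ToyBus[A](handle: A => Unit) {
  private val queue = new LinkedBlockingQueue[A]()
  private val eventLock = new Semaphore(0)

  private val consumer = new Thread("toy-bus-consumer") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        eventLock.acquire()          // block until at least one event has been posted
        ToyBus.this.synchronized {   // atomic with respect to isEmpty below
          Option(queue.poll()).foreach(handle)
        }
      }
    }
  }
  consumer.start()

  def post(event: A): Unit = {
    queue.offer(event)
    eventLock.release()
  }

  // True only if every event removed from the queue has also been handled.
  def isEmpty: Boolean = synchronized { queue.isEmpty }
}
```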
LiveListenerBus.scala

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
 import org.apache.spark.Logging
 
```
```diff
@@ -36,16 +36,24 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+
+  // A counter that represents the number of events produced and consumed in the queue
+  private val eventLock = new Semaphore(0)
+
   private val listenerThread = new Thread("SparkListenerBus") {
     setDaemon(true)
     override def run() {
       while (true) {
-        val event = eventQueue.take
-        if (event == SparkListenerShutdown) {
-          // Get out of the while loop and shutdown the daemon thread
-          return
+        eventLock.acquire()
+        // Atomically remove and process this event
+        LiveListenerBus.this.synchronized {
+          val event = eventQueue.poll
+          if (event == SparkListenerShutdown) {
+            // Get out of the while loop and shutdown the daemon thread
+            return
+          }
+          Option(event).foreach(postToAll)
         }
-        postToAll(event)
       }
     }
   }
```
```diff
@@ -73,16 +81,17 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
         "rate at which tasks are being started by the scheduler.")
       queueFullErrorMessageLogged = true
     }
+    eventLock.release()
```
Contributor:

Maybe we can only send release() if eventAdded. It's a minor detail, but it would also mean that event should never be null, which should (slightly) help the bus to catch up with the workload.

Contributor (author):

Yes, I didn't realize offer also returns a boolean.

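A minimal sketch of that suggestion (only the tail of post() is visible in this hunk, so the method body and the log message below are assumptions): release a permit only when offer succeeds, so the consumer's poll can never return null.

```scala
def post(event: SparkListenerEvent) {
  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    // Signal the consumer only for events that actually made it into the queue,
    // so eventQueue.poll on the other side never returns null.
    eventLock.release()
  } else if (!queueFullErrorMessageLogged) {
    // The existing "queue is full" error message (partially visible above) stays in this branch.
    logError("Dropping SparkListenerEvent because the event queue is full")
    queueFullErrorMessageLogged = true
  }
}
```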
```diff
   }
 
   /**
-   * Waits until there are no more events in the queue, or until the specified time has elapsed.
-   * Used for testing only. Returns true if the queue has emptied and false is the specified time
+   * For testing only. Wait until there are no more events in the queue, or until the specified
+   * time has elapsed. Return true if the queue has emptied and false if the specified time
    * elapsed before the queue emptied.
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty) {
+    while (!queueIsEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
```
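
For context, a hedged sketch of how a test would drain the bus through this method (the `sc.listenerBus` accessor and the assertion style are assumptions; `WAIT_TIMEOUT_MILLIS` is the constant added to SparkListenerSuite below):

```scala
// Wait for all posted events to reach the listeners before asserting on their state.
// `sc.listenerBus` is assumed here for illustration.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS),
  "The SparkListenerBus did not process all posted events within the timeout")
```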
```diff
@@ -93,6 +102,14 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
     true
   }
 
+  /**
+   * Return whether the event queue is empty.
+   *
+   * The use of synchronized here guarantees that all events that once belonged to this queue
+   * have already been processed by all attached listeners, if this returns true.
+   */
+  def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
+
   def stop() {
     if (!started) {
       throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
```
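
To see concretely why an unguarded eventQueue.isEmpty was not a reliable signal, here is a small hypothetical reproduction (the names, timings, and Int "events" are invented): the consumer has already taken the element, so the queue reports empty, yet nothing has been processed.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical repro of the race: take() removes the event before it is "processed"
// (simulated by the sleep), so an unguarded isEmpty check can report true while the
// processing step has not happened yet.
object QueueRaceRepro extends App {
  val queue = new LinkedBlockingQueue[Int]()
  @volatile var processed = 0

  val consumer = new Thread {
    override def run(): Unit = {
      queue.take()       // (1) dequeue the event
      Thread.sleep(100)  // simulate slow listener processing
      processed += 1     // (3) the event is only now "processed"
    }
  }
  consumer.start()

  queue.put(1)
  Thread.sleep(50)       // give the consumer time to take() but not to finish
  // (2) the emptiness check: prints "empty = true, processed = 0" -- the window this commit closes
  println(s"empty = ${queue.isEmpty}, processed = $processed")
}
```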
SparkListenerSuite.scala

```diff
@@ -29,15 +29,16 @@ import org.apache.spark.SparkContext._
 import org.apache.spark.executor.TaskMetrics
 
 class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
-  with BeforeAndAfter with BeforeAndAfterAll {
+  with BeforeAndAfter with BeforeAndAfterAll {
 
+  /** Length of time to wait while draining listener events. */
+  val WAIT_TIMEOUT_MILLIS = 10000
+
   before {
     sc = new SparkContext("local", "SparkListenerSuite")
   }
 
-  override def afterAll {
+  override def afterAll() {
     System.clearProperty("spark.akka.frameSize")
   }
```