Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-18838][CORE] Introduce blocking strategy for LiveListener
## What changes were proposed in this pull request?
When the queue of the LiveListener is full, events are dropped in an
 arbitrary fashion. But some of them can be crucial for the good
 execution of the job. the job doesn t in general fail immediately, but
 presents incoherent state before dying.
 In this changelist:
 the queue implementation to increase  a bit the performances
 (reduce object allocation & decrease lock usage)
 A "waiting for space" strategy is introduced and can be used instead of
 the default and current "dropping" strategy.
 The new strategy even if it does not resolve the "real" problem
 (the small dequeing rate) bring a lot more stability to the jobs with a
  huge event production rate (i.e jobs with many partitions).

  ## How was this patch tested?
  Existing unit tests have been enriched to run on both strategy.
  + manual tests have been run on the cluster
  • Loading branch information
bOOm-X authored and Antoine PRANG committed Jun 1, 2017
commit bbcc72de7cd0807f546723f5be27cba9e983450c
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ package object config {
.checkValue(_ > 0, "The capacity of listener bus event queue must not be negative")
.createWithDefault(10000)

private[spark] val LISTENER_BUS_EVENT_QUEUE_DROP =
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.drop")
.booleanConf
.createWithDefault(true)

// This property sets the root namespace for metrics reporting
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
Expand Down
216 changes: 129 additions & 87 deletions core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package org.apache.spark.scheduler

import java.util.concurrent._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.locks.ReentrantLock

import scala.util.DynamicVariable

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils

Expand All @@ -38,55 +40,45 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa

import LiveListenerBus._

// Cap the capacity of the event queue so we get an explicit error (rather than
// an OOM exception) if it's perpetually being added to more quickly than it's being drained.
private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
private lazy val BUFFER_SIZE = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
private lazy val circularBuffer = new Array[SparkListenerEvent](BUFFER_SIZE)

// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)
private lazy val queueStrategy = getQueueStrategy

/** A counter for dropped events. It will be reset every time we log it. */
private val droppedEventsCounter = new AtomicLong(0L)

/** When `droppedEventsCounter` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
private def getQueueStrategy: QueuingStrategy = {
val queueDrop = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_DROP)
if (queueDrop) {
new DropQueuingStrategy(BUFFER_SIZE)
} else {
new WaitQueuingStrategy(BUFFER_SIZE)
}
}

private val numberOfEvents = new AtomicInteger(0)

// Indicate if we are processing some event
// Guarded by `self`
private var processingEvent = false
@volatile private var writeIndex = 0
@volatile private var readIndex = 0

private val logDroppedEvent = new AtomicBoolean(false)
// Indicate if `start()` is called
private val started = new AtomicBoolean(false)
// Indicate if `stop()` is called
private val stopped = new AtomicBoolean(false)

// A counter that represents the number of events produced and consumed in the queue
private val eventLock = new Semaphore(0)
// only post is done from multiple threads so need a lock
private val postLock = new ReentrantLock()

private val listenerThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
LiveListenerBus.withinListenerThread.withValue(true) {
while (true) {
eventLock.acquire()
self.synchronized {
processingEvent = true
}
try {
val event = eventQueue.poll
if (event == null) {
// Get out of the while loop and shutdown the daemon thread
if (!stopped.get) {
throw new IllegalStateException("Polling `null` from eventQueue means" +
" the listener bus has been stopped. So `stopped` must be true")
}
return
}
postToAll(event)
} finally {
self.synchronized {
processingEvent = false
}
while (!stopped.get() || numberOfEvents.get() > 0) {
if (numberOfEvents.get() > 0) {
postToAll(circularBuffer(readIndex))
numberOfEvents.decrementAndGet()
readIndex = (readIndex + 1) % BUFFER_SIZE
} else {
Thread.sleep(20) // give more chance for producer thread to be scheduled
}
}
}
Expand Down Expand Up @@ -115,30 +107,14 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
logError(s"$name has already stopped! Dropping event $event")
return
}
val eventAdded = eventQueue.offer(event)
if (eventAdded) {
eventLock.release()
} else {
onDropEvent(event)
droppedEventsCounter.incrementAndGet()
}

val droppedEvents = droppedEventsCounter.get
if (droppedEvents > 0) {
// Don't log too frequently
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
// There may be multiple threads trying to decrease droppedEventsCounter.
// Use "compareAndSet" to make sure only one thread can win.
// And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
// then that thread will update it.
if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
}
}
postLock.lock()
val queueOrNot = queueStrategy.queue(numberOfEvents)
if(queueOrNot) {
circularBuffer(writeIndex) = event
numberOfEvents.incrementAndGet()
writeIndex = (writeIndex + 1) % BUFFER_SIZE
}
postLock.unlock()
}

/**
Expand Down Expand Up @@ -170,10 +146,15 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
/**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once belonged to this queue
* The use of the post lock here guarantees that all events that once belonged to this queue
* have already been processed by all attached listeners, if this returns true.
*/
private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
private def queueIsEmpty: Boolean = {
postLock.lock()
val isEmpty = numberOfEvents.get() == 0
postLock.unlock()
isEmpty
}

/**
* Stop the listener bus. It will wait until the queued events have been processed, but drop the
Expand All @@ -183,30 +164,10 @@ private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends Spa
if (!started.get()) {
throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
}
if (stopped.compareAndSet(false, true)) {
// Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
// `stop` is called.
eventLock.release()
listenerThread.join()
} else {
// Keep quiet
}
stopped.set(true)
listenerThread.join()
}

/**
* If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
* notified with the dropped events.
*
* Note: `onDropEvent` can be called in any thread.
*/
def onDropEvent(event: SparkListenerEvent): Unit = {
if (logDroppedEvent.compareAndSet(false, true)) {
// Only log the following message once to avoid duplicated annoying logs.
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
}
}
}

private[spark] object LiveListenerBus {
Expand All @@ -215,5 +176,86 @@ private[spark] object LiveListenerBus {

/** The thread name of Spark listener bus */
val name = "SparkListenerBus"

private trait FirstAndRecurrentLogging extends Logging {

@volatile private var numberOfTime = 0
/** When `numberOfTime` was logged last time in milliseconds. */
@volatile private var lastReportTimestamp = 0L
@volatile private var logFirstTime = false


def inc(): Unit = {
numberOfTime = numberOfTime + 1
}

def waringIfNotToClose(message: Int => String): Unit = {
if (numberOfTime > 0 &&
(System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000)) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(s"${message(numberOfTime)} SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
numberOfTime = 0
}
}

def errorIfFirstTime(firstTimeAction: String): Unit = {
if (!logFirstTime) {
// Only log the following message once to avoid duplicated annoying logs.
logError(s"$firstTimeAction SparkListenerEvent because no remaining room in event" +
" queue. " +
"This likely means one of the SparkListeners is too slow and cannot keep up with " +
"the rate at which tasks are being started by the scheduler.")
logFirstTime = true
lastReportTimestamp = System.currentTimeMillis()
}
}

}

private trait QueuingStrategy {
/**
* this method indicate if an element should be queued or discarded
* @param numberOfEvents atomic integer: the queue size
* @return true if an element should be queued, false if it should be dropped
*/
def queue(numberOfEvents: AtomicInteger): Boolean

}

private class DropQueuingStrategy(val bufferSize: Int)
extends QueuingStrategy with FirstAndRecurrentLogging {

override def queue(numberOfEvents: AtomicInteger): Boolean = {
if (numberOfEvents.get() == bufferSize) {
errorIfFirstTime("Dropping")
inc()
waringIfNotToClose(count => s"Dropped $count")
false
} else {
true
}
}

}

private class WaitQueuingStrategy(val bufferSize: Int)
extends QueuingStrategy with FirstAndRecurrentLogging {

override def queue(numberOfEvents: AtomicInteger): Boolean = {
if (numberOfEvents.get() == bufferSize) {
errorIfFirstTime("Waiting for posting")
waringIfNotToClose(count => s"Waiting $count period posting")
while (numberOfEvents.get() == bufferSize) {
inc()
Thread.sleep(20) // give more chance for consumer thread to be scheduled
}
}
true
}

}

}

36 changes: 22 additions & 14 deletions core/src/main/scala/org/apache/spark/util/ListenerBus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@

package org.apache.spark.util

import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicReference

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

Expand All @@ -31,40 +30,49 @@ import org.apache.spark.internal.Logging
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

// Marked `private[spark]` for access in tests.
private[spark] val listeners = new CopyOnWriteArrayList[L]
private[spark] def listeners = internalHolder.get()
private val internalHolder = new AtomicReference[Array[L]](Array.empty.asInstanceOf[Array[L]])

/**
* Add a listener to listen events. This method is thread-safe and can be called in any thread.
*/
final def addListener(listener: L): Unit = {
listeners.add(listener)

var oldVal, candidate: Array[L] = null
do {
oldVal = listeners
candidate = oldVal.:+(listener)(oldVal.elemTag)
// This creates a new array so we can compare reference
} while (!internalHolder.compareAndSet(oldVal, candidate))
}

/**
* Remove a listener and it won't receive any events. This method is thread-safe and can be called
* in any thread.
*/
final def removeListener(listener: L): Unit = {
listeners.remove(listener)
var oldVal, candidate: Array[L] = null
do {
oldVal = listeners
candidate = oldVal.filter(l => !l.equals(listener))
} while (!internalHolder.compareAndSet(oldVal, candidate))
}

/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listeners.iterator
while (iter.hasNext) {
val listener = iter.next()
val currentVal = listeners
var i = 0
while(i < currentVal.length) {
try {
doPostEvent(listener, event)
doPostEvent(currentVal(i), event)
} catch {
case NonFatal(e) =>
logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
logError(s"Listener ${Utils.getFormattedClassName(currentVal(i))} threw an exception", e)
}
i = i + 1
}
}

Expand All @@ -76,7 +84,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
val c = implicitly[ClassTag[T]].runtimeClass
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
listeners.filter(_.getClass == c).map(_.asInstanceOf[T])
}

}
Loading