[SPARK-18838][CORE] Introduce asynchronous EventLoggingListener
## What changes were proposed in this pull request?
An asynchronous version of the EventLoggingListener has been introduced. This listener is the slowest of all the standard listeners and is clearly the bottleneck of the liveListenerBus dequeuing process.

## How was this patch tested?
Existing unit tests have been enriched to run on both versions, and manual tests have been run on the cluster.
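
As a quick reference for reviewers, a minimal sketch of how the new mode would be switched on (the `spark.eventLog.async` flag comes from this diff; the application name, master, and log directory are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: enable event logging and opt in to the asynchronous listener.
// "spark.eventLog.async" is the flag introduced by this change; the other
// values are just example settings.
val conf = new SparkConf()
  .setAppName("async-event-log-demo")
  .setMaster("local[2]")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events")
  .set("spark.eventLog.async", "true")

val sc = new SparkContext(conf)
```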
bOOm-X authored and Antoine PRANG committed Jun 5, 2017
commit c789da4e21c01e4803f206eaf8fcd463d733b743
22 changes: 19 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -236,6 +236,8 @@ class SparkContext(config: SparkConf) extends Logging {
def appName: String = _conf.get("spark.app.name")

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def isEventLogAsync: Boolean = _conf.getBoolean("spark.eventLog.async", false)

private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

@@ -525,9 +527,7 @@ class SparkContext(config: SparkConf) extends Logging {

_eventLogger =
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
val logger = getEventLogger(isEventLogAsync)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -593,6 +593,22 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

private def getEventLogger(async: Boolean): EventLoggingListener = {
if (async) {
val queueSize = _conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)
new AsynchronousEventLoggingListener(_applicationId,
_applicationAttemptId,
_eventLogDir.get,
_conf,
_hadoopConfiguration,
queueSize)
}
else {
new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
_conf, _hadoopConfiguration)
}
}
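
Note that the asynchronous variant sizes its buffer from the listener-bus queue capacity (`LISTENER_BUS_EVENT_QUEUE_CAPACITY`). A hedged sketch of raising that together with the new flag — the config key below is an assumption about what the constant resolves to and should be checked against `org.apache.spark.internal.config`:

```scala
import org.apache.spark.SparkConf

// Sketch: the async event logger reuses the listener-bus queue capacity as its
// own buffer size, so both can be raised together. The capacity key name is an
// assumption based on LISTENER_BUS_EVENT_QUEUE_CAPACITY.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.async", "true")
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
```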

/**
* Called by the web UI to obtain executor thread dumps. This method may be expensive.
* Logs an error and returns None if we failed to obtain a thread dump, which could occur due
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -21,6 +21,7 @@ import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -131,20 +132,24 @@ private[spark] class EventLoggingListener(
}

/** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
protected def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
val eventJson = JsonProtocol.sparkEventToJson(event)
// scalastyle:off println
writer.foreach(_.println(compact(render(eventJson))))
// scalastyle:on println
if (flushLogger) {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.hflush())
flush()
}
if (testing) {
loggedEvents += eventJson
}
}

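/** Flush the buffered print writer and hflush the underlying Hadoop stream so logged events reach the event log file. */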
private def flush(): Unit = {
writer.foreach(_.flush())
hadoopDataStream.foreach(_.hflush())
}

// Events that do not trigger a flush
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)

@@ -227,6 +232,7 @@ private[spark] class EventLoggingListener(
* ".inprogress" suffix.
*/
def stop(): Unit = {
flush()
writer.foreach(_.close())

val target = new Path(logPath)
@@ -250,6 +256,8 @@
}
}



private[spark] def redactEvent(
event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
// environmentDetails maps a string descriptor to a set of properties
@@ -267,11 +275,89 @@

}

private[spark] sealed class AsynchronousEventLoggingListener(
appId: String,
appAttemptId : Option[String],
logBaseDir: URI,
sparkConf: SparkConf,
hadoopConf: Configuration,
val bufferSize: Int)
extends EventLoggingListener(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf) {
import EventLoggingListener._

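// Fixed-size ring buffer shared between the single producer (the listener-bus thread
// calling logEvent) and the single consumer (listenerThread below): writeIndex and
// readIndex wrap modulo bufferSize, and numberOfEvents counts the occupied slots.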
private lazy val eventBuffer = new Array[SparkListenerEvent](bufferSize)

private val numberOfEvents = new AtomicInteger(0)

@volatile private var writeIndex = 0
@volatile private var readIndex = 0
@volatile private var stopThread = false
@volatile private var lastReportTimestamp = 0L
@volatile private var numberOfDrop = 0
@volatile private var lastFlushEvent = 0

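// Daemon thread that drains buffered events in arrival order and asks the parent
// listener to flush the writer once every FLUSH_FREQUENCY events.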
private val listenerThread = new Thread(THREAD_NAME) {
setDaemon(true)
override def run(): Unit = {
while (!stopThread || numberOfEvents.get() > 0) {
if (numberOfEvents.get() > 0) {
executelogEvent(eventBuffer(readIndex), lastFlushEvent == FLUSH_FREQUENCY)
numberOfEvents.decrementAndGet()
readIndex = (readIndex + 1) % bufferSize
if (lastFlushEvent == FLUSH_FREQUENCY) {
lastFlushEvent = 0
} else {
lastFlushEvent = lastFlushEvent + 1
}
} else {
Thread.sleep(20) // give the producer thread a better chance to be scheduled
}
}
}
}

private def executelogEvent(event: SparkListenerEvent, flushLogger: Boolean) =
super.logEvent(event, flushLogger)

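// Producer side: buffer the event if there is room, otherwise drop it and log a
// warning at most once per minute with the number of events dropped since the last report.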
override protected def logEvent(event: SparkListenerEvent, flushLogger: Boolean): Unit = {
if (numberOfEvents.get() < bufferSize) {
eventBuffer(writeIndex) = event
numberOfEvents.incrementAndGet()
writeIndex = (writeIndex + 1) % bufferSize
} else {
numberOfDrop = numberOfDrop + 1
if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
val prevLastReportTimestamp = lastReportTimestamp
lastReportTimestamp = System.currentTimeMillis()
logWarning(
s"dropped $numberOfDrop SparkListenerEvents since " +
new java.util.Date(prevLastReportTimestamp))
numberOfDrop = 0
}
}
}

override def start(): Unit = {
super.start()
listenerThread.start()
}

override def stop(): Unit = {
stopThread = true
listenerThread.join()
super.stop()
}

}
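
For readers less familiar with the approach, the class above is essentially a single-producer / single-consumer ring buffer that drops new events when full. A self-contained sketch of that pattern (illustrative only; `DroppingRingBuffer` and its methods are not part of this PR):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Illustrative single-producer / single-consumer ring buffer that drops new
// items when full, mirroring the buffering used by AsynchronousEventLoggingListener.
class DroppingRingBuffer[T](capacity: Int) {
  private val buffer = new Array[Any](capacity)
  private val count = new AtomicInteger(0)   // occupied slots, shared by both threads
  @volatile private var writeIndex = 0       // only touched by the producer thread
  @volatile private var readIndex = 0        // only touched by the consumer thread

  /** Producer side: returns false when the buffer is full and the item is dropped. */
  def offer(item: T): Boolean = {
    if (count.get() < capacity) {
      buffer(writeIndex) = item
      count.incrementAndGet()                // publishes the written slot to the consumer
      writeIndex = (writeIndex + 1) % capacity
      true
    } else {
      false
    }
  }

  /** Consumer side: returns None when the buffer is empty. */
  def poll(): Option[T] = {
    if (count.get() > 0) {
      val item = buffer(readIndex).asInstanceOf[T]
      count.decrementAndGet()                // frees the slot for the producer
      readIndex = (readIndex + 1) % capacity
      Some(item)
    } else {
      None
    }
  }
}

object RingBufferDemo {
  def main(args: Array[String]): Unit = {
    // One thread would call offer(...) while another polls; shown sequentially here.
    val queue = new DroppingRingBuffer[String](1024)
    queue.offer("SparkListenerStageSubmitted")
    queue.poll().foreach(println)
  }
}
```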

private[spark] object EventLoggingListener extends Logging {
// Suffix applied to the names of files still being written by applications.
val IN_PROGRESS = ".inprogress"
val DEFAULT_LOG_DIR = "/tmp/spark-events"

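// Name of the draining thread used by AsynchronousEventLoggingListener, and how many
// buffered events are written between two flushes of the underlying writer.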
val THREAD_NAME = "EventLoggingListener"
val FLUSH_FREQUENCY = 200

private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)

// A cache for compression codecs to avoid creating the same codec many times
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -85,13 +85,15 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}
}

test("End-to-end event logging") {
testApplicationEventLogging()
}
Seq(false, true).foreach{ async =>
test((if (async) "Async " else "") + "End-to-end event logging") {
testApplicationEventLogging(None, async)
}

test("End-to-end event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
testApplicationEventLogging(compressionCodec = Some(CompressionCodec.getShortName(codec)))
test((if (async) "Async " else "") + "End-to-end event logging with compression") {
CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codec =>
testApplicationEventLogging(Some(CompressionCodec.getShortName(codec)), async)
}
}
}
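
The loop above generates a synchronous and an asynchronous variant of each test from a single definition. A standalone sketch of that pattern using plain ScalaTest (the real suite extends Spark's SparkFunSuite; the class name and scenario here are illustrative):

```scala
import org.scalatest.FunSuite

// Sketch: one loop produces both the sync and async flavour of each test case,
// the same way the suite above threads `async` into testApplicationEventLogging.
class AsyncFlagExampleSuite extends FunSuite {
  private def runScenario(async: Boolean): Boolean = {
    // Placeholder for the real end-to-end check.
    true
  }

  Seq(false, true).foreach { async =>
    test((if (async) "Async " else "") + "end-to-end scenario") {
      assert(runScenario(async))
    }
  }
}
```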

@@ -189,11 +191,12 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
* Test end-to-end event logging functionality in an application.
* This runs a simple Spark job and asserts that the expected events are logged when expected.
*/
private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
private def testApplicationEventLogging(compressionCodec: Option[String], asynchronous: Boolean) {
// Set defaultFS to something that would cause an exception, to make sure we don't run
// into SPARK-6688.
val conf = getLoggingConf(testDirPath, compressionCodec)
.set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
.set("spark.eventLog.async", asynchronous.toString)
sc = new SparkContext("local-cluster[2,2,1024]", "test", conf)
assert(sc.eventLogger.isDefined)
val eventLogger = sc.eventLogger.get