22 changes: 12 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,11 +53,12 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
+import org.apache.spark.status.{AppStatusListener, AppStatusStore}
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
 import org.apache.spark.util._
+import org.apache.spark.util.kvstore.InMemoryStore
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -416,7 +417,13 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
+    _statusStore = {
+      // Create an in-memory store for a live application.
+      val store = new InMemoryStore()
+      val listener = new AppStatusListener(store, conf, true)
+      listenerBus.addToStatusQueue(listener)
+      new AppStatusStore(store, listener = Some(listener))
+    }
 
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
@@ -445,14 +452,9 @@ class SparkContext(config: SparkConf) extends Logging {
       // For tests, do not enable the UI
       None
     }
-    _ui.foreach { ui =>
-      // Load any plugins that might want to modify the UI.
-      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
-
-      // Bind the UI before starting the task scheduler to communicate
-      // the bound port to the cluster manager properly
-      ui.bind()
-    }
+    // Bind the UI before starting the task scheduler to communicate
+    // the bound port to the cluster manager properly
+    _ui.foreach(_.bind())
 
     _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
 
@@ -318,14 +318,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         (new InMemoryStore(), true)
       }
 
+    val plugins = ServiceLoader.load(
+      classOf[SparkHistoryUIPlugin], Utils.getContextOrSparkClassLoader).asScala
+
     if (needReplay) {
       val replayBus = new ReplayListenerBus()
       val listener = new AppStatusListener(kvstore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
       replayBus.addListener(listener)
-      AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
-      }
+      for {
+        plugin <- plugins
+        listener <- plugin.createListeners(conf, kvstore)
+      } replayBus.addListener(listener)
       try {
         val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
         replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
@@ -350,9 +354,7 @@
       HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(),
       attempt.info.appSparkVersion)
-    AppStatusPlugin.loadPlugins().foreach { plugin =>
-      plugin.setupUI(ui)
-    }
+    plugins.foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
 
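For context on the mechanism this file now uses: java.util.ServiceLoader discovers implementations by reading META-INF/services/<interface name> files from the classpath, which is how FsHistoryProvider finds SparkHistoryUIPlugin implementations above (via Utils.getContextOrSparkClassLoader). A minimal, self-contained sketch of that discovery pattern follows; the HistoryPlugin trait and object names here are illustrative, not part of the PR:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._

    // Illustrative stand-in for a plugin contract such as SparkHistoryUIPlugin.
    trait HistoryPlugin {
      def name: String
    }

    object PluginDiscovery {
      def main(args: Array[String]): Unit = {
        // ServiceLoader reads META-INF/services/<fully.qualified.InterfaceName>
        // from the classpath; each line of that file names an implementing
        // class with a no-arg constructor.
        val plugins = ServiceLoader.load(
          classOf[HistoryPlugin],
          Thread.currentThread().getContextClassLoader).asScala
        plugins.foreach(p => println(s"Discovered plugin: ${p.name}"))
      }
    }

With no registration file on the classpath the loop simply finds nothing, which is also the behavior the provider relies on when a module contributes no plugin.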
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -30,6 +30,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
 
 @DeveloperApi
 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
@@ -167,6 +168,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+/**
+ * An interface for creating history listeners (to replay event logs) defined in other modules
+ * such as SQL, and for setting up the plugin's UI to rebuild the history UI.
+ */
+private[spark] trait SparkHistoryUIPlugin {
+
+  /**
+   * Creates listeners to replay the event logs.
+   */
+  def createListeners(conf: SparkConf, store: KVStore): Seq[SparkListener]
+
+  /**
+   * Sets up the UI of this plugin to rebuild the history UI.
+   */
+  def setupUI(ui: SparkUI): Unit
+}
+
+
 /**
  * Interface for listening to events from the Spark scheduler. Most applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.

Review comment (Contributor) on the new trait:

This is not a UI plugin. It's also only marginally related to this source file.

It should remain in the .status package. If you really feel strongly about the existing name, you can use a different name (e.g. "AppHistoryServerPlugin" or something that doesn't explicitly say "UI" or "Listener").
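To make the contract concrete, here is a minimal sketch of an implementation. The plugin and listener names are hypothetical; only the two overridden signatures come from the trait above, and since the trait is private[spark], a real implementation has to live under the org.apache.spark package, as the SQL module's does:

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{SparkHistoryUIPlugin, SparkListener, SparkListenerJobStart}
    import org.apache.spark.ui.SparkUI
    import org.apache.spark.util.kvstore.KVStore

    class ExampleHistoryUIPlugin extends SparkHistoryUIPlugin {

      // A trivial replay listener; a real one would persist derived state
      // into `store` so that setupUI can read it back later.
      private class JobCountListener(store: KVStore) extends SparkListener {
        private var jobs = 0
        override def onJobStart(event: SparkListenerJobStart): Unit = jobs += 1
      }

      override def createListeners(conf: SparkConf, store: KVStore): Seq[SparkListener] =
        Seq(new JobCountListener(store))

      override def setupUI(ui: SparkUI): Unit = {
        // A real plugin would attach its tab(s) to `ui` here, the way the
        // SQL module attaches its SQLTab to the rebuilt history UI.
      }
    }

For FsHistoryProvider to discover it, the implementation class also has to be listed in a META-INF/services file named after the trait, as the SQL module's one-line registration below shows.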
71 changes: 0 additions & 71 deletions core/src/main/scala/org/apache/spark/status/AppStatusPlugin.scala

This file was deleted.

23 changes: 2 additions & 21 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.status
 
 import java.io.File
-import java.util.{Arrays, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
-import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Distribution
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 
 /**
@@ -337,25 +336,7 @@ private[spark] class AppStatusStore(
 }
 
 private[spark] object AppStatusStore {
-
-  val CURRENT_VERSION = 1L
-
-  /**
-   * Create an in-memory store for a live application.
-   *
-   * @param conf Configuration.
-   * @param addListenerFn Function to register a listener with a bus.
-   */
-  def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
-    val store = new InMemoryStore()
-    val listener = new AppStatusListener(store, conf, true)
-    addListenerFn(listener)
-    AppStatusPlugin.loadPlugins().foreach { p =>
-      p.setupListeners(conf, store, addListenerFn, true)
-    }
-    new AppStatusStore(store, listener = Some(listener))
-  }
 
 }
 
 /**
24 changes: 12 additions & 12 deletions core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -22,15 +22,14 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.mockito.Matchers.anyString
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusListener, AppStatusStore}
 import org.apache.spark.ui.jobs.{StagePage, StagesTab}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore.InMemoryStore
 
 class StagePageSuite extends SparkFunSuite with LocalSparkContext {
 
@@ -55,20 +54,21 @@
    * This also runs a dummy stage to populate the page with useful content.
    */
   private def renderStagePage(conf: SparkConf): Seq[Node] = {
-    val bus = new ReplayListenerBus()
-    val store = AppStatusStore.createLiveStore(conf, l => bus.addListener(l))
+    val kvStore = new InMemoryStore
+    val listener = new AppStatusListener(kvStore, conf, true)
+    val statusStore = new AppStatusStore(kvStore, listener = Some(listener))
 
     try {
       val tab = mock(classOf[StagesTab], RETURNS_SMART_NULLS)
-      when(tab.store).thenReturn(store)
+      when(tab.store).thenReturn(statusStore)
 
       val request = mock(classOf[HttpServletRequest])
       when(tab.conf).thenReturn(conf)
       when(tab.appName).thenReturn("testing")
       when(tab.headerTabs).thenReturn(Seq.empty)
       when(request.getParameter("id")).thenReturn("0")
       when(request.getParameter("attempt")).thenReturn("0")
-      val page = new StagePage(tab, store)
+      val page = new StagePage(tab, statusStore)
 
       // Simulate a stage in job progress listener
       val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details")
@@ -77,17 +77,17 @@
       taskId =>
         val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY,
           false)
-        bus.postToAll(SparkListenerStageSubmitted(stageInfo))
-        bus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+        listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+        listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
         taskInfo.markFinished(TaskState.FINISHED, System.currentTimeMillis())
         val taskMetrics = TaskMetrics.empty
         taskMetrics.incPeakExecutionMemory(peakExecutionMemory)
-        bus.postToAll(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
+        listener.onTaskEnd(SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics))
       }
-      bus.postToAll(SparkListenerStageCompleted(stageInfo))
+      listener.onStageCompleted(SparkListenerStageCompleted(stageInfo))
       page.render(request)
     } finally {
-      store.close()
+      statusStore.close()
     }
   }
 
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryUIPlugin

This file was deleted.

@@ -29,14 +29,12 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.status.LiveEntity
 import org.apache.spark.status.config._
-import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.kvstore.KVStore
 
-private[sql] class SQLAppStatusListener(
+class SQLAppStatusListener(
     conf: SparkConf,
     kvstore: KVStore,
-    live: Boolean,
-    ui: Option[SparkUI] = None)
+    live: Boolean)
   extends SparkListener with Logging {
 
   // How often to flush intermediate state of a live execution to the store. When replaying logs,
@@ -46,10 +44,11 @@ private[sql] class SQLAppStatusListener(
   // Live tracked data is needed by the SQL status store to calculate metrics for in-flight
   // executions; that means arbitrary threads may be querying these maps, so they need to be
   // thread-safe.
-  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
-  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
+  // Exposed for testing only.
+  private[sql] val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
 
-  private var uiInitialized = false
+  // Exposed for testing only.
+  private[sql] val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
 
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
@@ -212,14 +211,6 @@
   }
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    // Install the SQL tab in a live app if it hasn't been initialized yet.
-    if (!uiInitialized) {
-      ui.foreach { _ui =>
-        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
-      }
-      uiInitialized = true
-    }
-
     val SparkListenerSQLExecutionStart(executionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time) = event
 
@@ -319,7 +310,7 @@
 
 }
 
-private class LiveExecutionData(val executionId: Long) extends LiveEntity {
+class LiveExecutionData(val executionId: Long) extends LiveEntity {
 
   var description: String = null
   var details: String = null
@@ -354,13 +345,13 @@
 
 }
 
-private class LiveStageMetrics(
+class LiveStageMetrics(
     val stageId: Int,
    var attemptId: Int,
     val accumulatorIds: Array[Long],
     val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
 
-private[sql] class LiveTaskMetrics(
+class LiveTaskMetrics(
     val ids: Array[Long],
     val values: Array[Long],
     val succeeded: Boolean)
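The widened private[sql] visibility of liveExecutions and stageMetrics above exists so SQL tests can assert on the listener's internal state directly, now that the listener no longer owns any UI wiring. A minimal sketch of such a check follows; the object name and file placement are illustrative, and the file must sit under org.apache.spark.sql for the private[sql] members to be visible:

    package org.apache.spark.sql

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.execution.ui.SQLAppStatusListener
    import org.apache.spark.util.kvstore.InMemoryStore

    object ListenerStateCheck {
      def main(args: Array[String]): Unit = {
        val listener = new SQLAppStatusListener(new SparkConf(), new InMemoryStore(), true)
        // No events have been posted yet, so both internal maps are empty.
        assert(listener.liveExecutions.isEmpty)
        assert(listener.stageMetrics.isEmpty)
      }
    }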