41 changes: 14 additions & 27 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -17,14 +17,12 @@

package org.apache.spark.ui.exec

import java.net.URLEncoder
import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
import org.apache.spark.ui.{UIUtils, WebUIPage}

// This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
private[ui] case class ExecutorSummaryInfo(
@@ -83,18 +81,7 @@ private[spark] object ExecutorsPage {
val memUsed = status.memUsed
val maxMem = status.maxMem
val diskUsed = status.diskUsed
val totalCores = listener.executorToTotalCores.getOrElse(execId, 0)
val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))

new ExecutorSummary(
execId,
Expand All @@ -103,19 +90,19 @@ private[spark] object ExecutorsPage {
rddBlocks,
memUsed,
diskUsed,
totalCores,
maxTasks,
activeTasks,
failedTasks,
completedTasks,
totalTasks,
totalDuration,
totalGCTime,
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
taskSummary.totalCores,
taskSummary.tasksMax,
taskSummary.tasksActive,
taskSummary.tasksFailed,
taskSummary.tasksComplete,
taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete,
taskSummary.duration,
taskSummary.jvmGCTime,
taskSummary.inputBytes,
taskSummary.shuffleRead,
taskSummary.shuffleWrite,
maxMem,
executorLogs
taskSummary.executorLogs
)
}
}
112 changes: 66 additions & 46 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -17,14 +17,13 @@

package org.apache.spark.ui.exec

import scala.collection.mutable.HashMap
import scala.collection.mutable.{LinkedHashMap, ListBuffer}

import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
import org.apache.spark.ui.{SparkUI, SparkUITab}
import org.apache.spark.ui.jobs.UIData.ExecutorUIData

private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = parent.executorsListener
@@ -38,47 +37,67 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
}
}

private[ui] case class ExecutorTaskSummary(
var executorId: String,
var totalCores: Int = 0,
var tasksMax: Int = 0,
var tasksActive: Int = 0,
var tasksFailed: Int = 0,
var tasksComplete: Int = 0,
var duration: Long = 0L,
var jvmGCTime: Long = 0L,
var inputBytes: Long = 0L,
var inputRecords: Long = 0L,
var outputBytes: Long = 0L,
var outputRecords: Long = 0L,
var shuffleRead: Long = 0L,
var shuffleWrite: Long = 0L,
var executorLogs: Map[String, String] = Map.empty,
var isAlive: Boolean = true
)

/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the ExecutorsTab
*/
@DeveloperApi
class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
extends SparkListener {
val executorToTotalCores = HashMap[String, Int]()
val executorToTasksMax = HashMap[String, Int]()
val executorToTasksActive = HashMap[String, Int]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToDuration = HashMap[String, Long]()
val executorToJvmGCTime = HashMap[String, Long]()
val executorToInputBytes = HashMap[String, Long]()
val executorToInputRecords = HashMap[String, Long]()
val executorToOutputBytes = HashMap[String, Long]()
val executorToOutputRecords = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
val executorToLogUrls = HashMap[String, Map[String, String]]()
val executorIdToData = HashMap[String, ExecutorUIData]()
var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
var executorEvents = new ListBuffer[SparkListenerEvent]()
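A note on the data-structure choice: LinkedHashMap iterates in insertion order, so when dead executors are trimmed (in onExecutorAdded below), the oldest-registered one is encountered first. A minimal illustration of that property:

    import scala.collection.mutable.LinkedHashMap
    val m = LinkedHashMap("exec-1" -> 1, "exec-2" -> 2)
    m += ("exec-3" -> 3)
    assert(m.head._1 == "exec-1") // first-inserted entry iterates first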

private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
Member: Getting close now, but what about spark.ui.timeline.retainedExecutors? That would be more consistent. Then what about spark.ui.timeline.retainedDeadExecutors?

Contributor Author: spark.ui.timeline.executors.maximum is similar to spark.ui.timeline.tasks.maximum. It is a configuration for the ExecutorAdded and ExecutorRemoved events, so spark.ui.timeline.retainedDeadExecutors is not suitable.

Member: OK on spark.ui.timeline.executors.maximum. The dead-executor config isn't relevant to the timeline?

Contributor Author: executorToTaskSummary is used by ExecutorsPage, and dead executors are still retained in ExecutorsPage. So I can't remove an executor's information immediately after it is removed.

private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
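For reference, a hypothetical way to set the two knobs together; the values are illustrative, only the config keys come from this change:

    import org.apache.spark.SparkConf
    val conf = new SparkConf()
      .set("spark.ui.timeline.executors.maximum", "250") // cap retained timeline events
      .set("spark.ui.retainedDeadExecutors", "50")       // cap retained dead-executor summaries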

def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList

def deadStorageStatusList: Seq[StorageStatus] = storageStatusListener.deadStorageStatusList

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
taskSummary.totalCores = executorAdded.executorInfo.totalCores
taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
executorEvents += executorAdded
if (executorEvents.size > maxTimelineExecutors) {
executorEvents.remove(0)
}

val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive)
if (deadExecutors.size > retainedDeadExecutors) {
val head = deadExecutors.head
executorToTaskSummary.remove(head._1)
Contributor: Here we remove only one element each time, so one element is removed whenever a new executor is added. Could we remove more elements at once?

(A hedged sketch of such a batch removal follows the method below.)

}
}
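A minimal sketch of the batch removal the reviewer suggests — an assumption, not part of this change — reusing the fields defined above. Since LinkedHashMap preserves insertion order, the oldest dead executors are dropped first:

    // Trim all excess dead executors in one pass instead of one per ExecutorAdded event.
    val dead = executorToTaskSummary.filter { case (_, s) => !s.isAlive }
    if (dead.size > retainedDeadExecutors) {
      dead.keys.take(dead.size - retainedDeadExecutors)
        .foreach(executorToTaskSummary.remove)
    }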

override def onExecutorRemoved(
executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
val eid = executorRemoved.executorId
val uiData = executorIdToData(eid)
uiData.finishTime = Some(executorRemoved.time)
uiData.finishReason = Some(executorRemoved.reason)
executorEvents += executorRemoved
if (executorEvents.size > maxTimelineExecutors) {
executorEvents.remove(0)
}
executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
}

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
@@ -87,19 +106,25 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
storageStatus.foreach { s =>
val eid = s.blockManagerId.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.executorLogs = logs.toMap
}
}
}

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskSummary.tasksActive += 1
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
taskEnd.reason match {
case Resubmitted =>
// Note: For resubmitted tasks, we continue to use the metrics that belong to the
Expand All @@ -108,31 +133,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
// metrics added by each attempt, but this is much more complicated.
return
case e: ExceptionFailure =>
executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
taskSummary.tasksFailed += 1
case _ =>
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
taskSummary.tasksComplete += 1
}

executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
if (taskSummary.tasksActive >= 1) {
taskSummary.tasksActive -= 1
}
taskSummary.duration += info.duration

// Update shuffle read/write
val metrics = taskEnd.taskMetrics
if (metrics != null) {
executorToInputBytes(eid) =
executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead
executorToInputRecords(eid) =
executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead
executorToOutputBytes(eid) =
executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten
executorToOutputRecords(eid) =
executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten

executorToShuffleRead(eid) =
executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead
executorToShuffleWrite(eid) =
executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten
executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
taskSummary.inputBytes += metrics.inputMetrics.bytesRead
taskSummary.inputRecords += metrics.inputMetrics.recordsRead
taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
taskSummary.outputRecords += metrics.outputMetrics.recordsWritten

taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
taskSummary.jvmGCTime += metrics.jvmGCTime
}
}
}
66 changes: 33 additions & 33 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,9 +28,9 @@ import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.JobExecutionStatus
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.scheduler._
import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished jobs */
@@ -123,55 +123,55 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
}

private def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]):
Seq[String] = {
val events = ListBuffer[String]()
executorUIDatas.foreach {
case (executorId, event) =>
case a: SparkListenerExecutorAdded =>
val addedEvent =
s"""
|{
| 'className': 'executor added',
| 'group': 'executors',
| 'start': new Date(${event.startTime}),
| 'start': new Date(${a.time}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
| 'data-title="Executor ${executorId}<br>' +
| 'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
| 'data-html="true">Executor ${executorId} added</div>'
| 'data-title="Executor ${a.executorId}<br>' +
| 'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
| 'data-html="true">Executor ${a.executorId} added</div>'
|}
""".stripMargin
events += addedEvent
case e: SparkListenerExecutorRemoved =>
Member: There's no real content change here, right? GitHub just makes it look like a new block?

Contributor Author: Yes.
val removedEvent =
s"""
|{
| 'className': 'executor removed',
| 'group': 'executors',
| 'start': new Date(${e.time}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
| 'data-title="Executor ${e.executorId}<br>' +
| 'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
| '${
if (e.reason != null) {
s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
} else {
""
}
}"' +
| 'data-html="true">Executor ${e.executorId} removed</div>'
|}
""".stripMargin
events += removedEvent

if (event.finishTime.isDefined) {
val removedEvent =
s"""
|{
| 'className': 'executor removed',
| 'group': 'executors',
| 'start': new Date(${event.finishTime.get}),
| 'content': '<div class="executor-event-content"' +
| 'data-toggle="tooltip" data-placement="bottom"' +
| 'data-title="Executor ${executorId}<br>' +
| 'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
| '${
if (event.finishReason.isDefined) {
s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
} else {
""
}
}"' +
| 'data-html="true">Executor ${executorId} removed</div>'
|}
""".stripMargin
events += removedEvent
}
}
events.toSeq
}
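The rewritten method consumes raw SparkListenerEvents and dispatches on their subtype, rather than reading pre-digested ExecutorUIData. A sketch of that dispatch shape, where render is a hypothetical stand-in for the HTML/JSON emission above:

    def render(id: String, time: Long, what: String): Unit =
      println(s"executor $id $what at $time") // hypothetical stand-in
    executorEvents.foreach {
      case a: SparkListenerExecutorAdded   => render(a.executorId, a.time, "added")
      case r: SparkListenerExecutorRemoved => render(r.executorId, r.time, "removed")
      case _ => // other timeline events carry nothing for this page
    }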

private def makeTimeline(
jobs: Seq[JobUIData],
executors: HashMap[String, ExecutorUIData],
executors: Seq[SparkListenerEvent],
startTime: Long): Seq[Node] = {

val jobEventJsonAsStrSeq = makeJobEvent(jobs)
@@ -353,7 +353,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
var content = summary
val executorListener = parent.executorListener
content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
executorListener.executorIdToData, startTime)
executorListener.executorEvents, startTime)

if (shouldShowActiveJobs) {
content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -118,7 +118,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
<div style="float: left">{k}</div>
<div style="float: right">
{
val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty)
val logs = parent.executorsListener.executorToTaskSummary.get(k)
.map(_.executorLogs).getOrElse(Map.empty)
logs.map {
case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
}
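A minimal sketch of the lookup idiom above (the executor id "3" is hypothetical): Option.map keeps the happy path tight and falls back to an empty map when the executor was never registered or has been trimmed.

    val logs: Map[String, String] =
      executorsListener.executorToTaskSummary.get("3")
        .map(_.executorLogs)
        .getOrElse(Map.empty)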