Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 22 additions & 23 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +: getFormattedTimeQuantiles(serializationTimes)

val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
getGettingResultTime(info).toDouble
getGettingResultTime(info, currentTime).toDouble
}
val gettingResultQuantiles =
<td>
Expand All @@ -346,7 +346,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// machine and to send back the result (but not the time to fetch the task result,
// if it needed to be fetched from the block manager on the worker).
val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) =>
getSchedulerDelay(info, metrics.get).toDouble
getSchedulerDelay(info, metrics.get, currentTime).toDouble
}
val schedulerDelayTitle = <td><span data-toggle="tooltip"
title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
Expand Down Expand Up @@ -544,7 +544,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val serializationTimeProportion = toProportion(serializationTime)
val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
val deserializationTimeProportion = toProportion(deserializationTime)
val gettingResultTime = getGettingResultTime(taskUIData.taskInfo)
val gettingResultTime = getGettingResultTime(taskUIData.taskInfo, currentTime)
val gettingResultTimeProportion = toProportion(gettingResultTime)
val schedulerDelay = totalExecutionTime -
(executorComputingTime + shuffleReadTime + shuffleWriteTime +
Expand Down Expand Up @@ -685,11 +685,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
val schedulerDelay = metrics.map(getSchedulerDelay(info, _)).getOrElse(0L)
val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = getGettingResultTime(info)
val gettingResultTime = getGettingResultTime(info, currentTime)

val maybeAccumulators = info.accumulables
val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"}
Expand Down Expand Up @@ -852,32 +852,31 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<td>{errorSummary}{details}</td>
}

private def getGettingResultTime(info: TaskInfo): Long = {
if (info.gettingResultTime > 0) {
if (info.finishTime > 0) {
private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
if (info.gettingResult) {
if (info.finished) {
info.finishTime - info.gettingResultTime
} else {
// The task is still fetching the result.
System.currentTimeMillis - info.gettingResultTime
currentTime - info.gettingResultTime
}
} else {
0L
}
}

private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
val totalExecutionTime =
if (info.gettingResult) {
info.gettingResultTime - info.launchTime
} else if (info.finished) {
info.finishTime - info.launchTime
} else {
0
}
val executorOverhead = (metrics.executorDeserializeTime +
metrics.resultSerializationTime)
math.max(
0,
totalExecutionTime - metrics.executorRunTime - executorOverhead - getGettingResultTime(info))
private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
if (info.finished) {
val totalExecutionTime = info.finishTime - info.launchTime
val executorOverhead = (metrics.executorDeserializeTime +
metrics.resultSerializationTime)
math.max(
0,
totalExecutionTime - metrics.executorRunTime - executorOverhead -
getGettingResultTime(info, currentTime))
} else {
// The task is still running and the metrics like executorRunTime are not available.
0L
}
}
}