diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 60e3c6343122..ff0a339a39c6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -332,7 +332,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { +: getFormattedTimeQuantiles(serializationTimes) val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) => - getGettingResultTime(info).toDouble + getGettingResultTime(info, currentTime).toDouble } val gettingResultQuantiles = @@ -346,7 +346,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { // machine and to send back the result (but not the time to fetch the task result, // if it needed to be fetched from the block manager on the worker). val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) => - getSchedulerDelay(info, metrics.get).toDouble + getSchedulerDelay(info, metrics.get, currentTime).toDouble } val schedulerDelayTitle = Scheduler Delay @@ -544,7 +544,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val serializationTimeProportion = toProportion(serializationTime) val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L) val deserializationTimeProportion = toProportion(deserializationTime) - val gettingResultTime = getGettingResultTime(taskUIData.taskInfo) + val gettingResultTime = getGettingResultTime(taskUIData.taskInfo, currentTime) val gettingResultTimeProportion = toProportion(gettingResultTime) val schedulerDelay = totalExecutionTime - (executorComputingTime + shuffleReadTime + shuffleWriteTime + @@ -685,11 +685,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { else metrics.map(_.executorRunTime).getOrElse(1L) val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") - val schedulerDelay = metrics.map(getSchedulerDelay(info, _)).getOrElse(0L) + val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) - val gettingResultTime = getGettingResultTime(info) + val gettingResultTime = getGettingResultTime(info, currentTime) val maybeAccumulators = info.accumulables val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"} @@ -852,32 +852,31 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {errorSummary}{details} } - private def getGettingResultTime(info: TaskInfo): Long = { - if (info.gettingResultTime > 0) { - if (info.finishTime > 0) { + private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = { + if (info.gettingResult) { + if (info.finished) { info.finishTime - info.gettingResultTime } else { // The task is still fetching the result. - System.currentTimeMillis - info.gettingResultTime + currentTime - info.gettingResultTime } } else { 0L } } - private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = { - val totalExecutionTime = - if (info.gettingResult) { - info.gettingResultTime - info.launchTime - } else if (info.finished) { - info.finishTime - info.launchTime - } else { - 0 - } - val executorOverhead = (metrics.executorDeserializeTime + - metrics.resultSerializationTime) - math.max( - 0, - totalExecutionTime - metrics.executorRunTime - executorOverhead - getGettingResultTime(info)) + private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = { + if (info.finished) { + val totalExecutionTime = info.finishTime - info.launchTime + val executorOverhead = (metrics.executorDeserializeTime + + metrics.resultSerializationTime) + math.max( + 0, + totalExecutionTime - metrics.executorRunTime - executorOverhead - + getGettingResultTime(info, currentTime)) + } else { + // The task is still running and the metrics like executorRunTime are not available. + 0L + } } }