From 131164c2104a119468e782fb1d484f2d15274e33 Mon Sep 17 00:00:00 2001 From: Shahid Date: Mon, 19 Nov 2018 04:08:21 +0530 Subject: [PATCH 1/5] taskMetrics duration --- .../main/scala/org/apache/spark/status/AppStatusStore.scala | 5 +++++ core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 1 + core/src/main/scala/org/apache/spark/status/storeTypes.scala | 1 + core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 63b9d8988499..cbcf02be5a06 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -181,6 +181,7 @@ private[spark] class AppStatusStore( val distributions = new v1.TaskMetricDistributions( quantiles = quantiles, + duration = toValues(_.duration), executorDeserializeTime = toValues(_.executorDeserializeTime), executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime), executorRunTime = toValues(_.executorRunTime), @@ -250,6 +251,9 @@ private[spark] class AppStatusStore( val computedQuantiles = new v1.TaskMetricDistributions( quantiles = quantiles, + duration = scanTasks(TaskIndexNames.DURATION) { t => + t.duration + }, executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t => t.executorDeserializeTime }, @@ -301,6 +305,7 @@ private[spark] class AppStatusStore( .filter { case (q, _) => quantiles.contains(q) && shouldCacheQuantile(q) } .foreach { case (q, idx) => val cached = new CachedQuantile(stageId, stageAttemptId, quantileToString(q), count, + duration = computedQuantiles.duration(idx), executorDeserializeTime = computedQuantiles.executorDeserializeTime(idx), executorDeserializeCpuTime = computedQuantiles.executorDeserializeCpuTime(idx), executorRunTime = computedQuantiles.executorRunTime(idx), diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 30afd8b76972..f660f1ba8a5c 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -296,6 +296,7 @@ class ShuffleWriteMetrics private[spark]( class TaskMetricDistributions private[spark]( val quantiles: IndexedSeq[Double], + val duration: IndexedSeq[Double], val executorDeserializeTime: IndexedSeq[Double], val executorDeserializeCpuTime: IndexedSeq[Double], val executorRunTime: IndexedSeq[Double], diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 646cf25880e3..5653afc1b391 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -448,6 +448,7 @@ private[spark] class CachedQuantile( val taskCount: Long, // The following fields are an exploded view of a single entry for TaskMetricDistributions. + val duration: Double, val executorDeserializeTime: Double, val executorDeserializeCpuTime: Double, val executorRunTime: Double, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 477b9ce7f784..9d0e129c6f39 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -352,7 +352,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We val deserializationQuantiles = titleCell("Task Deserialization Time", ToolTips.TASK_DESERIALIZATION_TIME) ++ timeQuantiles(metrics.executorDeserializeTime) - val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.executorRunTime) + val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.duration) val gcQuantiles = titleCell("GC Time", ToolTips.GC_TIME) ++ timeQuantiles(metrics.jvmGcTime) From f834190d7e2980071b9f5b44d71b905ab5115121 Mon Sep 17 00:00:00 2001 From: Shahid Date: Mon, 19 Nov 2018 15:38:20 +0530 Subject: [PATCH 2/5] test failure correction --- .../stage_task_summary_w__custom_quantiles_expectation.json | 1 + .../stage_task_summary_w_shuffle_read_expectation.json | 1 + .../stage_task_summary_w_shuffle_write_expectation.json | 1 + 3 files changed, 3 insertions(+) diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json index 5c42ac1d87f4..51af1f3b82c7 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json @@ -1,5 +1,6 @@ { "quantiles" : [ 0.01, 0.5, 0.99 ], + "duration" : [ 21.0, 40.0, 435.0 ], "executorDeserializeTime" : [ 1.0, 3.0, 36.0 ], "executorDeserializeCpuTime" : [ 0.0, 0.0, 0.0 ], "executorRunTime" : [ 16.0, 28.0, 351.0 ], diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json index e6b705989cc9..1a498dbdcc6c 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json @@ -1,5 +1,6 @@ { "quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ], + "duration" : [ 37.0, 81.0, 83.0, 86.0 ], "executorDeserializeTime" : [ 1.0, 2.0, 2.0, 2.0, 3.0 ], "executorDeserializeCpuTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ], "executorRunTime" : [ 30.0, 74.0, 75.0, 76.0, 79.0 ], diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json index 788f28cf7b36..78aa1b1fa809 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json @@ -1,5 +1,6 @@ { "quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ], + "duration" : [ 23.0, 33.0, 40.0, 74.0, 419.0 ], "executorDeserializeTime" : [ 2.0, 2.0, 3.0, 7.0, 31.0 ], "executorDeserializeCpuTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ], "executorRunTime" : [ 16.0, 18.0, 28.0, 49.0, 349.0 ], From b9a06a924840ab1bc042d91d0adf5c1928a654d8 Mon Sep 17 00:00:00 2001 From: Shahid Date: Tue, 20 Nov 2018 03:27:19 +0530 Subject: [PATCH 3/5] update PR --- .../main/scala/org/apache/spark/status/AppStatusStore.scala | 5 ----- .../src/main/scala/org/apache/spark/status/api/v1/api.scala | 1 - .../src/main/scala/org/apache/spark/status/storeTypes.scala | 1 - .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 6 +++--- .../stage_task_summary_w__custom_quantiles_expectation.json | 1 - .../stage_task_summary_w_shuffle_read_expectation.json | 1 - .../stage_task_summary_w_shuffle_write_expectation.json | 1 - 7 files changed, 3 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index cbcf02be5a06..63b9d8988499 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -181,7 +181,6 @@ private[spark] class AppStatusStore( val distributions = new v1.TaskMetricDistributions( quantiles = quantiles, - duration = toValues(_.duration), executorDeserializeTime = toValues(_.executorDeserializeTime), executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime), executorRunTime = toValues(_.executorRunTime), @@ -251,9 +250,6 @@ private[spark] class AppStatusStore( val computedQuantiles = new v1.TaskMetricDistributions( quantiles = quantiles, - duration = scanTasks(TaskIndexNames.DURATION) { t => - t.duration - }, executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t => t.executorDeserializeTime }, @@ -305,7 +301,6 @@ private[spark] class AppStatusStore( .filter { case (q, _) => quantiles.contains(q) && shouldCacheQuantile(q) } .foreach { case (q, idx) => val cached = new CachedQuantile(stageId, stageAttemptId, quantileToString(q), count, - duration = computedQuantiles.duration(idx), executorDeserializeTime = computedQuantiles.executorDeserializeTime(idx), executorDeserializeCpuTime = computedQuantiles.executorDeserializeCpuTime(idx), executorRunTime = computedQuantiles.executorRunTime(idx), diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index f660f1ba8a5c..30afd8b76972 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -296,7 +296,6 @@ class ShuffleWriteMetrics private[spark]( class TaskMetricDistributions private[spark]( val quantiles: IndexedSeq[Double], - val duration: IndexedSeq[Double], val executorDeserializeTime: IndexedSeq[Double], val executorDeserializeCpuTime: IndexedSeq[Double], val executorRunTime: IndexedSeq[Double], diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 5653afc1b391..646cf25880e3 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -448,7 +448,6 @@ private[spark] class CachedQuantile( val taskCount: Long, // The following fields are an exploded view of a single entry for TaskMetricDistributions. - val duration: Double, val executorDeserializeTime: Double, val executorDeserializeCpuTime: Double, val executorRunTime: Double, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 9d0e129c6f39..e5abd38f2e9b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -352,7 +352,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We val deserializationQuantiles = titleCell("Task Deserialization Time", ToolTips.TASK_DESERIALIZATION_TIME) ++ timeQuantiles(metrics.executorDeserializeTime) - val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.duration) + val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.executorRunTime) val gcQuantiles = titleCell("GC Time", ToolTips.GC_TIME) ++ timeQuantiles(metrics.jvmGcTime) @@ -843,7 +843,7 @@ private[ui] class TaskPagedTable( {UIUtils.formatDate(task.launchTime)} - {formatDuration(task.duration)} + {formatDuration(task.taskMetrics.map(_.executorRunTime))} {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))} @@ -996,7 +996,7 @@ private[ui] object ApiHelper { HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR, HEADER_HOST -> TaskIndexNames.HOST, HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME, - HEADER_DURATION -> TaskIndexNames.DURATION, + HEADER_DURATION -> TaskIndexNames.EXEC_RUN_TIME, HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY, HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME, HEADER_GC_TIME -> TaskIndexNames.GC_TIME, diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json index 51af1f3b82c7..5c42ac1d87f4 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json @@ -1,6 +1,5 @@ { "quantiles" : [ 0.01, 0.5, 0.99 ], - "duration" : [ 21.0, 40.0, 435.0 ], "executorDeserializeTime" : [ 1.0, 3.0, 36.0 ], "executorDeserializeCpuTime" : [ 0.0, 0.0, 0.0 ], "executorRunTime" : [ 16.0, 28.0, 351.0 ], diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json index 1a498dbdcc6c..e6b705989cc9 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json @@ -1,6 +1,5 @@ { "quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ], - "duration" : [ 37.0, 81.0, 83.0, 86.0 ], "executorDeserializeTime" : [ 1.0, 2.0, 2.0, 2.0, 3.0 ], "executorDeserializeCpuTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ], "executorRunTime" : [ 30.0, 74.0, 75.0, 76.0, 79.0 ], diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json index 78aa1b1fa809..788f28cf7b36 100644 --- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json @@ -1,6 +1,5 @@ { "quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ], - "duration" : [ 23.0, 33.0, 40.0, 74.0, 419.0 ], "executorDeserializeTime" : [ 2.0, 2.0, 3.0, 7.0, 31.0 ], "executorDeserializeCpuTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ], "executorRunTime" : [ 16.0, 18.0, 28.0, 49.0, 349.0 ], From 99b9135e67ad8f049f915294b082f8fafc5e5ba4 Mon Sep 17 00:00:00 2001 From: Shahid Date: Tue, 20 Nov 2018 21:44:20 +0530 Subject: [PATCH 4/5] address comment --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index e5abd38f2e9b..b032a2bfa4be 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -996,6 +996,8 @@ private[ui] object ApiHelper { HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR, HEADER_HOST -> TaskIndexNames.HOST, HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME, + // SPARK-26109: Duration of task as executorRunTime to make it consistent with the + // aggregated tasks summary metrics table and the previous versions of Spark. HEADER_DURATION -> TaskIndexNames.EXEC_RUN_TIME, HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY, HEADER_DESER_TIME -> TaskIndexNames.DESER_TIME, From fb94fa78044cd6465c97f4d7f72ef35244c52748 Mon Sep 17 00:00:00 2001 From: Shahid Date: Tue, 20 Nov 2018 21:57:40 +0530 Subject: [PATCH 5/5] nit: remove space --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index b032a2bfa4be..7e6cc4297d6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -996,7 +996,7 @@ private[ui] object ApiHelper { HEADER_EXECUTOR -> TaskIndexNames.EXECUTOR, HEADER_HOST -> TaskIndexNames.HOST, HEADER_LAUNCH_TIME -> TaskIndexNames.LAUNCH_TIME, - // SPARK-26109: Duration of task as executorRunTime to make it consistent with the + // SPARK-26109: Duration of task as executorRunTime to make it consistent with the // aggregated tasks summary metrics table and the previous versions of Spark. HEADER_DURATION -> TaskIndexNames.EXEC_RUN_TIME, HEADER_SCHEDULER_DELAY -> TaskIndexNames.SCHEDULER_DELAY,