
Commit 8fb852b

Stavros Kontopoulos authored and Jackey Lee committed
[SPARK-25394][CORE] Add an application status metrics source
- Exposes several metrics about application status as a source, useful for scraping them via JMX instead of mining the metrics REST API. Example use case: Prometheus + JMX exporter.
- Metrics are gathered when a job ends, on the AppStatusListener side. This could be more fine-grained, but most metrics (such as completed tasks) are also counted by executors. More metrics could be exposed in the future to avoid scraping executors in some scenarios.
- A config option, `spark.app.status.metrics.enabled`, is added to enable/disable these metrics; by default they are disabled.

This was manually tested with the JMX source enabled and a Prometheus server on k8s:

![metrics](https://user-images.githubusercontent.com/7945591/45300945-63064d00-b518-11e8-812a-d9b4155ba0c0.png)

The next picture shows the job delay for a repeated pi calculation (Spark action):

![pi](https://user-images.githubusercontent.com/7945591/45329927-89a1a380-b56b-11e8-9cc1-5e76cb83969f.png)

Closes apache#22381 from skonto/add_app_status_metrics.

Authored-by: Stavros Kontopoulos <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
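To make the use case concrete, here is a minimal sketch of how an application might opt in to these metrics and expose them over JMX. The conf key comes from this patch; the JmxSink wiring uses the standard Spark metrics configuration, shown here via `spark.metrics.conf.*` properties (which assumes a Spark version that accepts metrics config through SparkConf):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Opt in to the new appStatus source (disabled by default) and attach
// the built-in JMX sink so a scraper such as the Prometheus jmx_exporter
// can read the counters and the jobDuration gauge.
val conf = new SparkConf()
  .setAppName("app-status-metrics-demo")
  .set("spark.app.status.metrics.enabled", "true")
  .set("spark.metrics.conf.*.sink.jmx.class",
    "org.apache.spark.metrics.sink.JmxSink")

val sc = new SparkContext(conf)
sc.parallelize(1 to 1000).count() // runs a job, bumping the job/task counters
```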
1 parent 785d0b7 · commit 8fb852b

File tree

5 files changed: +125 -9 lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
core/src/main/scala/org/apache/spark/status/AppStatusSource.scala
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
core/src/main/scala/org/apache/spark/status/LiveEntity.scala


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -53,7 +53,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.{AppStatusSource, AppStatusStore}
 import org.apache.spark.status.api.v1.ThreadStackTrace
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
@@ -418,7 +418,8 @@ class SparkContext(config: SparkConf) extends Logging {

     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
-    _statusStore = AppStatusStore.createLiveStore(conf)
+    val appStatusSource = AppStatusSource.createSource(conf)
+    _statusStore = AppStatusStore.createLiveStore(conf, appStatusSource)
     listenerBus.addToStatusQueue(_statusStore.listener.get)

     // Create the Spark execution environment (cache, map output tracker, etc)
@@ -569,7 +570,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
-
+    appStatusSource.foreach(_env.metricsSystem.registerSource(_))
     // Make sure the context is stopped if the user forgets about it. This avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
     // is killed, though.
```
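The no-op-when-disabled behavior falls out of `Option.foreach`: `AppStatusSource.createSource(conf)` returns `None` when the flag is off, so the `registerSource` call above simply never runs. A standalone sketch of the pattern with stand-in types (not the real `MetricsSystem` API):

```scala
trait Source { def sourceName: String }

// Mirrors AppStatusSource.createSource: Some(source) only when enabled.
def createSource(enabled: Boolean): Option[Source] =
  if (enabled) Some(new Source { val sourceName = "appStatus" }) else None

// Stand-in for MetricsSystem.registerSource.
def registerSource(s: Source): Unit = println(s"registered ${s.sourceName}")

createSource(enabled = false).foreach(registerSource) // nothing happens
createSource(enabled = true).foreach(registerSource)  // prints: registered appStatus
```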

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 31 additions & 2 deletions
```diff
@@ -44,6 +44,7 @@ private[spark] class AppStatusListener(
     kvstore: ElementTrackingStore,
     conf: SparkConf,
     live: Boolean,
+    appStatusSource: Option[AppStatusSource] = None,
     lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {

   import config._
@@ -280,6 +281,11 @@ private[spark] class AppStatusListener(
   private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
     liveExecutors.get(execId).foreach { exec =>
       exec.isBlacklisted = blacklisted
+      if (blacklisted) {
+        appStatusSource.foreach(_.BLACKLISTED_EXECUTORS.inc())
+      } else {
+        appStatusSource.foreach(_.UNBLACKLISTED_EXECUTORS.inc())
+      }
       liveUpdate(exec, System.nanoTime())
     }
   }
@@ -382,11 +388,34 @@ private[spark] class AppStatusListener(
       }

       job.status = event.jobResult match {
-        case JobSucceeded => JobExecutionStatus.SUCCEEDED
-        case JobFailed(_) => JobExecutionStatus.FAILED
+        case JobSucceeded =>
+          appStatusSource.foreach { _.SUCCEEDED_JOBS.inc() }
+          JobExecutionStatus.SUCCEEDED
+        case JobFailed(_) =>
+          appStatusSource.foreach { _.FAILED_JOBS.inc() }
+          JobExecutionStatus.FAILED
       }

       job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
+
+      for {
+        source <- appStatusSource
+        submissionTime <- job.submissionTime
+        completionTime <- job.completionTime
+      } {
+        source.JOB_DURATION.value.set(completionTime.getTime() - submissionTime.getTime())
+      }
+
+      // update global app status counters
+      appStatusSource.foreach { source =>
+        source.COMPLETED_STAGES.inc(job.completedStages.size)
+        source.FAILED_STAGES.inc(job.failedStages)
+        source.COMPLETED_TASKS.inc(job.completedTasks)
+        source.FAILED_TASKS.inc(job.failedTasks)
+        source.KILLED_TASKS.inc(job.killedTasks)
+        source.SKIPPED_TASKS.inc(job.skippedTasks)
+        source.SKIPPED_STAGES.inc(job.skippedStages.size)
+      }
       update(job, now, last = true)
       if (job.status == JobExecutionStatus.SUCCEEDED) {
         appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
```
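The `for` comprehension over the three `Option`s sets the duration gauge only when the source exists and the job has both a submission and a completion time; if any of the three is `None`, the body is skipped without error. A small illustration with hypothetical timestamps:

```scala
val submissionTime: Option[Long] = Some(1000L) // hypothetical millis
val completionTime: Option[Long] = Some(3500L)

for {
  sub <- submissionTime
  comp <- completionTime
} {
  println(s"job duration: ${comp - sub} ms") // prints: job duration: 2500 ms
}

// A missing timestamp short-circuits the whole comprehension:
for (sub <- submissionTime; comp <- Option.empty[Long]) {
  println("never reached")
}
```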

core/src/main/scala/org/apache/spark/status/AppStatusSource.scala

Lines changed: 85 additions & 0 deletions

```diff
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.concurrent.atomic.AtomicLong
+
+import AppStatusSource.getCounter
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.metrics.source.Source
+
+private[spark] class JobDuration(val value: AtomicLong) extends Gauge[Long] {
+  override def getValue: Long = value.get()
+}
+
+private[spark] class AppStatusSource extends Source {
+
+  override implicit val metricRegistry = new MetricRegistry()
+
+  override val sourceName = "appStatus"
+
+  val jobDuration = new JobDuration(new AtomicLong(0L))
+
+  // Duration of each job in milliseconds
+  val JOB_DURATION = metricRegistry
+    .register(MetricRegistry.name("jobDuration"), jobDuration)
+
+  val FAILED_STAGES = getCounter("stages", "failedStages")
+
+  val SKIPPED_STAGES = getCounter("stages", "skippedStages")
+
+  val COMPLETED_STAGES = getCounter("stages", "completedStages")
+
+  val SUCCEEDED_JOBS = getCounter("jobs", "succeededJobs")
+
+  val FAILED_JOBS = getCounter("jobs", "failedJobs")
+
+  val COMPLETED_TASKS = getCounter("tasks", "completedTasks")
+
+  val FAILED_TASKS = getCounter("tasks", "failedTasks")
+
+  val KILLED_TASKS = getCounter("tasks", "killedTasks")
+
+  val SKIPPED_TASKS = getCounter("tasks", "skippedTasks")
+
+  val BLACKLISTED_EXECUTORS = getCounter("tasks", "blackListedExecutors")
+
+  val UNBLACKLISTED_EXECUTORS = getCounter("tasks", "unblackListedExecutors")
+}
+
+private[spark] object AppStatusSource {
+
+  def getCounter(prefix: String, name: String)(implicit metricRegistry: MetricRegistry): Counter = {
+    metricRegistry.counter(MetricRegistry.name(prefix, name))
+  }
+
+  def createSource(conf: SparkConf): Option[AppStatusSource] = {
+    Option(conf.get(AppStatusSource.APP_STATUS_METRICS_ENABLED))
+      .filter(identity)
+      .map { _ => new AppStatusSource() }
+  }
+
+  val APP_STATUS_METRICS_ENABLED =
+    ConfigBuilder("spark.app.status.metrics.enabled")
+      .doc("Whether Dropwizard/Codahale metrics " +
+        "will be reported for the status of the running spark app.")
+      .booleanConf
+      .createWithDefault(false)
+}
```
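On the reporting side, Dropwizard joins the `getCounter` prefixes with dots, so the counters surface under names like `stages.failedStages` and `jobs.succeededJobs` (Spark's metrics system additionally prefixes the app id and the `appStatus` source name when publishing). A rough sketch of the same registry mechanics, outside of Spark:

```scala
import com.codahale.metrics.MetricRegistry

val registry = new MetricRegistry()

// Same naming scheme as AppStatusSource.getCounter: prefix + name.
val succeededJobs = registry.counter(MetricRegistry.name("jobs", "succeededJobs"))
val failedStages = registry.counter(MetricRegistry.name("stages", "failedStages"))

succeededJobs.inc()
failedStages.inc(2)

// Any attached reporter (JmxSink, ConsoleSink, ...) sees these names:
println(registry.getCounters().keySet()) // [jobs.succeededJobs, stages.failedStages]
```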

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 4 additions & 3 deletions
```diff
@@ -505,10 +505,11 @@ private[spark] object AppStatusStore {
   /**
    * Create an in-memory store for a live application.
    */
-  def createLiveStore(conf: SparkConf): AppStatusStore = {
+  def createLiveStore(
+      conf: SparkConf,
+      appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val store = new ElementTrackingStore(new InMemoryStore(), conf)
-    val listener = new AppStatusListener(store, conf, true)
+    val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))
   }
-
 }
```

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -61,7 +61,7 @@ private[spark] abstract class LiveEntity {
 private class LiveJob(
     val jobId: Int,
     name: String,
-    submissionTime: Option[Date],
+    val submissionTime: Option[Date],
     val stageIds: Seq[Int],
     jobGroup: Option[String],
     numTasks: Int) extends LiveEntity {
```
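The one-word change here matters: adding `val` to a Scala constructor parameter promotes it to a readable field, which is what lets `AppStatusListener` read `job.submissionTime` when computing the duration gauge. A tiny illustration (hypothetical class name):

```scala
class LiveJobSketch(val submissionTime: Option[Long], name: String)

val j = new LiveJobSketch(Some(42L), "pi")
j.submissionTime // Some(42): readable thanks to `val`
// j.name        // would not compile: a plain parameter is not a member
```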
