
Commit b7c6eb2

Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58)
1 parent 957ad17 commit b7c6eb2
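These are Dropwizard (Codahale) metrics: MetricRegistry.name joins its arguments into a dot-separated key, and Spark prefixes that key with the source's sourceName once the source is registered. The commit renames both levels: the per-metric key prefixes (e.g. "driver" becomes "drivers", "executor.mesos" becomes "offers") and the source names ("mesos_cluster" becomes "mesos", with the dispatcher's metrics-system instance renamed to "dispatcher"). A minimal sketch of how the keys compose; this snippet is illustrative and not part of the commit:

import com.codahale.metrics.{Gauge, MetricRegistry}

object NamingSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    // Before this commit the key was built as MetricRegistry.name("driver", "waiting");
    // after it, the same gauge is registered under "drivers.waiting".
    val key = MetricRegistry.name("drivers", "waiting")
    registry.register(key, new Gauge[Int] { override def getValue: Int = 0 })
    println(registry.getGauges.keySet) // prints [drivers.waiting]
  }
}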

3 files changed: 64 additions, 65 deletions

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 11 additions & 12 deletions
@@ -23,16 +23,15 @@ import java.util.{Collections, Date, List => JList}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-
 import org.apache.mesos.{Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
-
-import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
-import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
+import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.mesos.{MesosDriverDescription, config}
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.JvmSource
 import org.apache.spark.util.Utils
 
 /**
@@ -122,9 +121,9 @@ private[spark] class MesosClusterScheduler(
     conf: SparkConf)
   extends Scheduler with MesosSchedulerUtils {
   var frameworkUrl: String = _
-  private val metricsSource = new MesosClusterSchedulerSource(this)
+  private val mesosClusterSchedulerMetricsSource = new MesosClusterSchedulerSource(this)
   private val metricsSystem =
-    MetricsSystem.createMetricsSystem(metricsSource.sourceName, conf, new SecurityManager(conf))
+    MetricsSystem.createMetricsSystem("dispatcher", conf, new SecurityManager(conf))
   private val master = conf.get("spark.master")
   private val appName = conf.get("spark.app.name")
   private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
@@ -306,7 +305,7 @@ private[spark] class MesosClusterScheduler(
       frameworkId = id
     }
     recoverState()
-    metricsSystem.registerSource(metricsSource)
+    metricsSystem.registerSource(mesosClusterSchedulerMetricsSource)
     metricsSystem.start()
     val driver = createSchedulerDriver(
       masterUrl = master,
@@ -667,14 +666,14 @@ private[spark] class MesosClusterScheduler(
         new Date(),
         None,
         getDriverFrameworkID(submission))
-      metricsSource.recordLaunchedDriver(submission)
+      mesosClusterSchedulerMetricsSource.recordLaunchedDriver(submission)
       launchedDrivers(submission.submissionId) = newState
       launchedDriversState.persist(submission.submissionId, newState)
       afterLaunchCallback(submission.submissionId)
     } catch {
       case e: SparkException =>
         afterLaunchCallback(submission.submissionId)
-        metricsSource.recordExceptionDriver(submission)
+        mesosClusterSchedulerMetricsSource.recordExceptionDriver(submission)
         finishedDrivers += new MesosClusterSubmissionState(
           submission,
           TaskID.newBuilder().setValue(submission.submissionId).build(),
@@ -807,10 +806,10 @@ private[spark] class MesosClusterScheduler(
         val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
         val newDriverDescription = state.driverDescription.copy(
           retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-        metricsSource.recordRetryingDriver(state)
+        mesosClusterSchedulerMetricsSource.recordRetryingDriver(state)
         addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
       } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-        metricsSource.recordFinishedDriver(state, status.getState)
+        mesosClusterSchedulerMetricsSource.recordFinishedDriver(state, status.getState)
         retireDriver(subId, state)
       }
       state.mesosTaskStatus = Option(status)
@@ -891,7 +890,7 @@ private[spark] class MesosClusterScheduler(
 
   private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
     queuedDriversState.persist(desc.submissionId, desc)
-    metricsSource.recordQueuedDriver()
+    mesosClusterSchedulerMetricsSource.recordQueuedDriver()
     queuedDrivers += desc
     revive()
   }
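Taken together, this file's changes rename the source field and decouple the metrics-system instance name ("dispatcher") from the source's own name ("mesos"). A condensed sketch of the resulting wiring, assuming Spark's internal MetricsSystem API exactly as shown in the hunks above; illustrative only, with `scheduler` assumed in scope:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.metrics.MetricsSystem

// Illustrative wiring mirroring the diff: one source object, one metrics
// system whose instance name ("dispatcher") no longer depends on sourceName.
val conf = new SparkConf()
val source = new MesosClusterSchedulerSource(scheduler) // `scheduler` assumed in scope
val metricsSystem =
  MetricsSystem.createMetricsSystem("dispatcher", conf, new SecurityManager(conf))
metricsSystem.registerSource(source)
metricsSystem.start()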

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSource.scala

Lines changed: 25 additions & 25 deletions
@@ -17,12 +17,12 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import java.util.concurrent.TimeUnit
 import java.util.Date
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.HashMap
 
-import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
+import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState}
 
 import org.apache.spark.TaskState
@@ -47,75 +47,75 @@ private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterSchedule
   // - pruning/retireDriver():
   //     From: finishedDrivers:
   //     To: NULL
-
-  override val sourceName: String = "mesos_cluster"
+  override val sourceName: String = "mesos"
   override val metricRegistry: MetricRegistry = new MetricRegistry
 
   // PULL METRICS:
   // These gauge metrics are periodically polled/pulled by the metrics system
 
-  metricRegistry.register(MetricRegistry.name("driver", "waiting"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("drivers", "waiting"), new Gauge[Int] {
     override def getValue: Int = scheduler.getQueuedDriversSize
   })
 
-  metricRegistry.register(MetricRegistry.name("driver", "launched"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("drivers", "launched"), new Gauge[Int] {
     override def getValue: Int = scheduler.getLaunchedDriversSize
   })
 
-  metricRegistry.register(MetricRegistry.name("driver", "retry"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("drivers", "retry"), new Gauge[Int] {
    override def getValue: Int = scheduler.getPendingRetryDriversSize
   })
 
-  metricRegistry.register(MetricRegistry.name("driver", "finished"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("drivers", "finished"), new Gauge[Int] {
    override def getValue: Int = scheduler.getFinishedDriversSize
   })
 
   // PUSH METRICS:
   // These metrics are updated directly as events occur
 
-  private val queuedCounter = metricRegistry.counter(MetricRegistry.name("driver", "waiting_count"))
+  private val queuedCounter =
+    metricRegistry.counter(MetricRegistry.name("drivers", "waiting_count"))
   private val launchedCounter =
-    metricRegistry.counter(MetricRegistry.name("driver", "launched_count"))
-  private val retryCounter = metricRegistry.counter(MetricRegistry.name("driver", "retry_count"))
+    metricRegistry.counter(MetricRegistry.name("drivers", "launched_count"))
+  private val retryCounter = metricRegistry.counter(MetricRegistry.name("drivers", "retry_count"))
   private val exceptionCounter =
-    metricRegistry.counter(MetricRegistry.name("driver", "exception_count"))
+    metricRegistry.counter(MetricRegistry.name("drivers", "exception_count"))
   private val finishedCounter =
-    metricRegistry.counter(MetricRegistry.name("driver", "finished_count"))
+    metricRegistry.counter(MetricRegistry.name("drivers", "finished_count"))
 
   // Same as finishedCounter above, except grouped by MesosTaskState.
   private val finishedMesosStateCounters = MesosTaskState.values
     // Avoid registering 'finished' metrics for states that aren't considered finished:
     .filter(state => TaskState.isFinished(mesosToTaskState(state)))
     .map(state => (state, metricRegistry.counter(
-      MetricRegistry.name("driver", "finished_count_mesos_state", state.name.toLowerCase))))
+      MetricRegistry.name("drivers", "finished_count_mesos_state", state.name.toLowerCase))))
     .toMap
   private val finishedMesosUnknownStateCounter =
-    metricRegistry.counter(MetricRegistry.name("driver", "finished_count_mesos_state", "UNKNOWN"))
+    metricRegistry.counter(MetricRegistry.name("drivers", "finished_count_mesos_state", "UNKNOWN"))
 
   // Duration from submission to FIRST launch.
   // This omits retries since those would exaggerate the time since original submission.
   private val submitToFirstLaunch =
-    metricRegistry.timer(MetricRegistry.name("driver", "submit_to_first_launch"))
+    metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_first_launch"))
   // Duration from initial submission to an exception.
   private val submitToException =
-    metricRegistry.timer(MetricRegistry.name("driver", "submit_to_exception"))
+    metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_exception"))
 
   // Duration from (most recent) launch to a retry.
-  private val launchToRetry = metricRegistry.timer(MetricRegistry.name("driver", "launch_to_retry"))
+  private val launchToRetry = metricRegistry.timer(MetricRegistry.name("drivers", "launch_to_retry"))
 
   // Duration from initial submission to finished.
   private val submitToFinish =
-    metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish"))
+    metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_finish"))
   // Duration from (most recent) launch to finished.
   private val launchToFinish =
-    metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish"))
+    metricRegistry.timer(MetricRegistry.name("drivers", "launch_to_finish"))
 
   // Same as submitToFinish and launchToFinish above, except grouped by Spark TaskState.
   class FinishStateTimers(state: String) {
     val submitToFinish =
-      metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish_state", state))
+      metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_finish_state", state))
     val launchToFinish =
-      metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish_state", state))
+      metricRegistry.timer(MetricRegistry.name("drivers", "launch_to_finish_state", state))
   }
   private val finishSparkStateTimers = HashMap.empty[TaskState.TaskState, FinishStateTimers]
   for (state <- TaskState.values) {
@@ -125,12 +125,12 @@ private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterSchedule
     }
   }
   private val submitToFinishUnknownState = metricRegistry.timer(
-    MetricRegistry.name("driver", "submit_to_finish_state", "UNKNOWN"))
+    MetricRegistry.name("drivers", "submit_to_finish_state", "UNKNOWN"))
   private val launchToFinishUnknownState = metricRegistry.timer(
-    MetricRegistry.name("driver", "launch_to_finish_state", "UNKNOWN"))
+    MetricRegistry.name("drivers", "launch_to_finish_state", "UNKNOWN"))
 
   // Histogram of retry counts at retry scheduling
-  private val retryCount = metricRegistry.histogram(MetricRegistry.name("driver", "retry_counts"))
+  private val retryCount = metricRegistry.histogram(MetricRegistry.name("drivers", "retry_counts"))
 
   // Records when a submission initially enters the launch queue.
   def recordQueuedDriver(): Unit = queuedCounter.inc
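The PULL/PUSH split in this file maps directly onto Dropwizard metric types: gauges are read by the reporting side on its own schedule, while counters and timers are bumped at event time. A self-contained sketch of both patterns; the metric names here are illustrative stand-ins, not the registrations above:

import java.util.concurrent.TimeUnit
import com.codahale.metrics.{Gauge, MetricRegistry}

object PullPushSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    var queueSize = 0 // stands in for scheduler state such as getQueuedDriversSize

    // PULL: the metrics system calls getValue whenever it polls the gauge.
    registry.register(MetricRegistry.name("drivers", "waiting"), new Gauge[Int] {
      override def getValue: Int = queueSize
    })

    // PUSH: event handlers update counters/timers as events occur.
    val queuedCounter = registry.counter(MetricRegistry.name("drivers", "waiting_count"))
    val submitToFirstLaunch =
      registry.timer(MetricRegistry.name("drivers", "submit_to_first_launch"))

    queueSize += 1
    queuedCounter.inc()
    submitToFirstLaunch.update(1500, TimeUnit.MILLISECONDS)

    println(registry.getCounters.get("drivers.waiting_count").getCount) // prints 1
  }
}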

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerSource.scala

Lines changed: 28 additions & 28 deletions
@@ -34,45 +34,45 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
     scheduler: MesosCoarseGrainedSchedulerBackend)
   extends Source with MesosSchedulerUtils {
 
-  override val sourceName: String = "mesos_cluster"
+  override val sourceName: String = "mesos"
   override val metricRegistry: MetricRegistry = new MetricRegistry
 
   // EXECUTOR STATE POLLING METRICS:
   // These metrics periodically poll the scheduler for its state, including resource allocation and
   // task states.
 
   // Number of CPUs used
-  metricRegistry.register(MetricRegistry.name("executor", "resource", "cores"), new Gauge[Double] {
+  metricRegistry.register(MetricRegistry.name("resource", "cores"), new Gauge[Double] {
     override def getValue: Double = scheduler.getCoresUsed
   })
   // Number of CPUs vs max
   if (scheduler.getMaxCores != 0) {
-    metricRegistry.register(MetricRegistry.name("executor", "resource", "cores_of_max"),
+    metricRegistry.register(MetricRegistry.name("resource", "cores_of_max"),
       new Gauge[Double] {
         // Note: See above div0 check before calling register()
         override def getValue: Double = scheduler.getCoresUsed / scheduler.getMaxCores
       })
   }
   // Number of CPUs per task
-  metricRegistry.register(MetricRegistry.name("executor", "resource", "mean_cores_per_task"),
+  metricRegistry.register(MetricRegistry.name("resource", "mean_cores_per_task"),
     new Gauge[Double] {
       override def getValue: Double = scheduler.getMeanCoresPerTask
     })
 
   // Number of GPUs used
-  metricRegistry.register(MetricRegistry.name("executor", "resource", "gpus"), new Gauge[Double] {
+  metricRegistry.register(MetricRegistry.name("resource", "gpus"), new Gauge[Double] {
    override def getValue: Double = scheduler.getGpusUsed
   })
   // Number of GPUs vs max
   if (scheduler.getMaxGpus != 0) {
-    metricRegistry.register(MetricRegistry.name("executor", "resource", "gpus_of_max"),
+    metricRegistry.register(MetricRegistry.name("resource", "gpus_of_max"),
      new Gauge[Double] {
        // Note: See above div0 check before calling register()
        override def getValue: Double = scheduler.getGpusUsed / scheduler.getMaxGpus
      })
   }
   // Number of GPUs per task
-  metricRegistry.register(MetricRegistry.name("executor", "resource", "mean_gpus_per_task"),
+  metricRegistry.register(MetricRegistry.name("resource", "mean_gpus_per_task"),
    new Gauge[Double] {
      override def getValue: Double = scheduler.getMeanGpusPerTask
    })
@@ -84,7 +84,7 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
   // Number of tasks vs max
   if (scheduler.isExecutorLimitEnabled) {
     // executorLimit is assigned asynchronously, so it may start off with a zero value.
-    metricRegistry.register(MetricRegistry.name("executor", "count_of_max"), new Gauge[Int] {
+    metricRegistry.register(MetricRegistry.name("count_of_max"), new Gauge[Int] {
       override def getValue: Int = {
         if (scheduler.getExecutorLimit == 0) {
           0
@@ -95,19 +95,19 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
     })
   }
   // Number of task failures
-  metricRegistry.register(MetricRegistry.name("executor", "failures"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("failures"), new Gauge[Int] {
     override def getValue: Int = scheduler.getTaskFailureCount
   })
   // Number of tracked agents regardless of whether we're currently present on them
-  metricRegistry.register(MetricRegistry.name("executor", "known_agents"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("known_agents"), new Gauge[Int] {
     override def getValue: Int = scheduler.getKnownAgentsCount
   })
   // Number of tracked agents with tasks on them
-  metricRegistry.register(MetricRegistry.name("executor", "occupied_agents"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("occupied_agents"), new Gauge[Int] {
     override def getValue: Int = scheduler.getOccupiedAgentsCount
   })
   // Number of blacklisted agents (too many failures)
-  metricRegistry.register(MetricRegistry.name("executor", "blacklisted_agents"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("blacklisted_agents"), new Gauge[Int] {
    override def getValue: Int = scheduler.getBlacklistedAgentCount
   })
 
@@ -116,63 +116,63 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
 
   // Rate of offers received (total number of offers, not offer RPCs)
   private val offerCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "offer"))
+    metricRegistry.counter(MetricRegistry.name("offers", "received"))
   // Rate of all offers declined, sum of the following reasons for declines
   private val declineCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline"))
+    metricRegistry.counter(MetricRegistry.name("offers", "declined"))
   // Offers declined for unmet requirements (with RejectOfferDurationForUnmetConstraints)
   private val declineUnmetCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline_unmet"))
+    metricRegistry.counter(MetricRegistry.name("offers", "declined_unmet"))
   // Offers declined when the deployment is finished (with RejectOfferDurationForReachedMaxCores)
   private val declineFinishedCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline_finished"))
+    metricRegistry.counter(MetricRegistry.name("offers", "declined_finished"))
   // Offers declined when offers are being unused (no duration in the decline filter)
   private val declineUnusedCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline_unused"))
+    metricRegistry.counter(MetricRegistry.name("offers", "declined_unused"))
   // Rate of revive operations
   private val reviveCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "revive"))
+    metricRegistry.counter(MetricRegistry.name("offers", "revived"))
   // Rate of launch operations
   private val launchCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos", "launch"))
+    metricRegistry.counter(MetricRegistry.name("offers", "launched"))
 
   // Counters for Spark states on launched executors (LAUNCHING, RUNNING, ...)
   private val sparkStateCounters = TaskState.values
     .map(state => (state, metricRegistry.counter(
-      MetricRegistry.name("executor", "spark_state", state.toString.toLowerCase))))
+      MetricRegistry.name("spark_state", state.toString.toLowerCase))))
     .toMap
   private val sparkUnknownStateCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "spark_state", "UNKNOWN"))
+    metricRegistry.counter(MetricRegistry.name("spark_state", "UNKNOWN"))
   // Counters for Mesos states on launched executors (TASK_RUNNING, TASK_LOST, ...),
   // more granular than sparkStateCounters
   private val mesosStateCounters = MesosTaskState.values
     .map(state => (state, metricRegistry.counter(
-      MetricRegistry.name("executor", "mesos_state", state.name.toLowerCase))))
+      MetricRegistry.name("mesos_state", state.name.toLowerCase))))
     .toMap
   private val mesosUnknownStateCounter =
-    metricRegistry.counter(MetricRegistry.name("executor", "mesos_state", "UNKNOWN"))
+    metricRegistry.counter(MetricRegistry.name("mesos_state", "UNKNOWN"))
 
   // TASK TIMER METRICS:
   // These metrics measure the duration to launch and run executors
 
   // Duration from driver start to the first task launching.
   private val startToFirstLaunched =
-    metricRegistry.timer(MetricRegistry.name("executor", "start_to_first_launched"))
+    metricRegistry.timer(MetricRegistry.name("start_to_first_launched"))
   // Duration from driver start to the first task running.
   private val startToFirstRunning =
-    metricRegistry.timer(MetricRegistry.name("executor", "start_to_first_running"))
+    metricRegistry.timer(MetricRegistry.name("start_to_first_running"))
 
   // Duration from driver start to maxCores footprint being filled
   private val startToAllLaunched =
-    metricRegistry.timer(MetricRegistry.name("executor", "start_to_all_launched"))
+    metricRegistry.timer(MetricRegistry.name("start_to_all_launched"))
 
   // Duration between an executor launch and the executor entering a given spark state, e.g. RUNNING
   private val launchToSparkStateTimers = TaskState.values
     .map(state => (state, metricRegistry.timer(
-      MetricRegistry.name("executor", "launch_to_spark_state", state.toString.toLowerCase))))
+      MetricRegistry.name("launch_to_spark_state", state.toString.toLowerCase))))
    .toMap
   private val launchToUnknownSparkStateTimer = metricRegistry.timer(
-    MetricRegistry.name("executor", "launch_to_spark_state", "UNKNOWN"))
+    MetricRegistry.name("launch_to_spark_state", "UNKNOWN"))
 
   // Time that the scheduler was initialized. This is the 'start time'.
   private val schedulerInitTime = new Date
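The timer metrics in this file measure elapsed time from fixed reference points (scheduler init, task launch). A common way to record such durations with Dropwizard, and plausibly what the surrounding code does, is to compute the elapsed interval and feed it to Timer.update; this sketch is written under that assumption and is not the file's actual recording code:

import java.util.Date
import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry

object TimerSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry
    val startToFirstLaunched = registry.timer(MetricRegistry.name("start_to_first_launched"))
    val schedulerInitTime = new Date // fixed reference point, as in the diff

    // ... later, when the first task launches, record the elapsed duration:
    val elapsedMs = new Date().getTime - schedulerInitTime.getTime
    startToFirstLaunched.update(elapsedMs, TimeUnit.MILLISECONDS)

    println(startToFirstLaunched.getCount) // prints 1 (one recorded duration)
  }
}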
