MesosClusterScheduler.scala
@@ -23,16 +23,15 @@ import java.util.{Collections, Date, List => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason

import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.mesos.{MesosDriverDescription, config}
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.util.Utils

/**
@@ -122,9 +121,9 @@ private[spark] class MesosClusterScheduler(
conf: SparkConf)
extends Scheduler with MesosSchedulerUtils {
var frameworkUrl: String = _
private val metricsSource = new MesosClusterSchedulerSource(this)
private val mesosClusterSchedulerMetricsSource = new MesosClusterSchedulerSource(this)
private val metricsSystem =
MetricsSystem.createMetricsSystem(metricsSource.sourceName, conf, new SecurityManager(conf))
MetricsSystem.createMetricsSystem("dispatcher", conf, new SecurityManager(conf))
private val master = conf.get("spark.master")
private val appName = conf.get("spark.app.name")
private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
@@ -306,7 +305,7 @@ private[spark] class MesosClusterScheduler(
frameworkId = id
}
recoverState()
metricsSystem.registerSource(metricsSource)
metricsSystem.registerSource(mesosClusterSchedulerMetricsSource)
metricsSystem.start()
val driver = createSchedulerDriver(
masterUrl = master,
@@ -667,14 +666,14 @@ private[spark] class MesosClusterScheduler(
new Date(),
None,
getDriverFrameworkID(submission))
metricsSource.recordLaunchedDriver(submission)
mesosClusterSchedulerMetricsSource.recordLaunchedDriver(submission)
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
} catch {
case e: SparkException =>
afterLaunchCallback(submission.submissionId)
metricsSource.recordExceptionDriver(submission)
mesosClusterSchedulerMetricsSource.recordExceptionDriver(submission)
finishedDrivers += new MesosClusterSubmissionState(
submission,
TaskID.newBuilder().setValue(submission.submissionId).build(),
@@ -807,10 +806,10 @@ private[spark] class MesosClusterScheduler(
val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
metricsSource.recordRetryingDriver(state)
mesosClusterSchedulerMetricsSource.recordRetryingDriver(state)
addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
} else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
metricsSource.recordFinishedDriver(state, status.getState)
mesosClusterSchedulerMetricsSource.recordFinishedDriver(state, status.getState)
retireDriver(subId, state)
}
state.mesosTaskStatus = Option(status)
@@ -891,7 +890,7 @@ private[spark] class MesosClusterScheduler(

private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
queuedDriversState.persist(desc.submissionId, desc)
metricsSource.recordQueuedDriver()
mesosClusterSchedulerMetricsSource.recordQueuedDriver()
queuedDrivers += desc
revive()
}
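For readers unfamiliar with the wiring being renamed here: a metrics Source pairs a sourceName with a Dropwizard MetricRegistry, and the scheduler registers it on a MetricsSystem instance (the dispatcher). Below is a minimal sketch of that pattern, reusing only the calls that appear in the diff (createMetricsSystem, registerSource, start); QueueDepthSource and DispatcherMetricsWiring are made-up names for illustration.

package org.apache.spark.scheduler.cluster.mesos

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source

// Hypothetical source for illustration: exposes a single pull-style gauge.
class QueueDepthSource(queueDepth: () => Int) extends Source {
  override val sourceName: String = "mesos"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  metricRegistry.register(MetricRegistry.name("drivers", "waiting"), new Gauge[Int] {
    override def getValue: Int = queueDepth()
  })
}

object DispatcherMetricsWiring {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Same calls as MesosClusterScheduler.start() above; createMetricsSystem and
    // Source are private[spark], hence the org.apache.spark.* package declaration.
    val metricsSystem =
      MetricsSystem.createMetricsSystem("dispatcher", conf, new SecurityManager(conf))
    metricsSystem.registerSource(new QueueDepthSource(() => 0))
    metricsSystem.start()
  }
}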
MesosClusterSchedulerSource.scala
@@ -17,12 +17,12 @@

package org.apache.spark.scheduler.cluster.mesos

import java.util.concurrent.TimeUnit
import java.util.Date
import java.util.concurrent.TimeUnit

import scala.collection.mutable.HashMap

import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer}
import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
import org.apache.mesos.Protos.{TaskState => MesosTaskState}

import org.apache.spark.TaskState
@@ -47,75 +47,75 @@ private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterSchedule
// - pruning/retireDriver():
// From: finishedDrivers:
// To: NULL

override val sourceName: String = "mesos_cluster"
override val sourceName: String = "mesos"
override val metricRegistry: MetricRegistry = new MetricRegistry

// PULL METRICS:
// These gauge metrics are periodically polled/pulled by the metrics system

metricRegistry.register(MetricRegistry.name("driver", "waiting"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("drivers", "waiting"), new Gauge[Int] {
override def getValue: Int = scheduler.getQueuedDriversSize
})

metricRegistry.register(MetricRegistry.name("driver", "launched"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("drivers", "launched"), new Gauge[Int] {
override def getValue: Int = scheduler.getLaunchedDriversSize
})

metricRegistry.register(MetricRegistry.name("driver", "retry"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("drivers", "retry"), new Gauge[Int] {
override def getValue: Int = scheduler.getPendingRetryDriversSize
})

metricRegistry.register(MetricRegistry.name("driver", "finished"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("drivers", "finished"), new Gauge[Int] {
override def getValue: Int = scheduler.getFinishedDriversSize
})

// PUSH METRICS:
// These metrics are updated directly as events occur

private val queuedCounter = metricRegistry.counter(MetricRegistry.name("driver", "waiting_count"))
private val queuedCounter =
metricRegistry.counter(MetricRegistry.name("drivers", "waiting_count"))
private val launchedCounter =
metricRegistry.counter(MetricRegistry.name("driver", "launched_count"))
private val retryCounter = metricRegistry.counter(MetricRegistry.name("driver", "retry_count"))
metricRegistry.counter(MetricRegistry.name("drivers", "launched_count"))
private val retryCounter = metricRegistry.counter(MetricRegistry.name("drivers", "retry_count"))
private val exceptionCounter =
metricRegistry.counter(MetricRegistry.name("driver", "exception_count"))
metricRegistry.counter(MetricRegistry.name("drivers", "exception_count"))
private val finishedCounter =
metricRegistry.counter(MetricRegistry.name("driver", "finished_count"))
metricRegistry.counter(MetricRegistry.name("drivers", "finished_count"))

// Same as finishedCounter above, except grouped by MesosTaskState.
private val finishedMesosStateCounters = MesosTaskState.values
// Avoid registering 'finished' metrics for states that aren't considered finished:
.filter(state => TaskState.isFinished(mesosToTaskState(state)))
.map(state => (state, metricRegistry.counter(
MetricRegistry.name("driver", "finished_count_mesos_state", state.name.toLowerCase))))
MetricRegistry.name("drivers", "finished_count_mesos_state", state.name.toLowerCase))))
.toMap
private val finishedMesosUnknownStateCounter =
metricRegistry.counter(MetricRegistry.name("driver", "finished_count_mesos_state", "UNKNOWN"))
metricRegistry.counter(MetricRegistry.name("drivers", "finished_count_mesos_state", "UNKNOWN"))

// Duration from submission to FIRST launch.
// This omits retries since those would exaggerate the time since original submission.
private val submitToFirstLaunch =
metricRegistry.timer(MetricRegistry.name("driver", "submit_to_first_launch"))
metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_first_launch"))
// Duration from initial submission to an exception.
private val submitToException =
metricRegistry.timer(MetricRegistry.name("driver", "submit_to_exception"))
metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_exception"))

// Duration from (most recent) launch to a retry.
private val launchToRetry = metricRegistry.timer(MetricRegistry.name("driver", "launch_to_retry"))
private val launchToRetry = metricRegistry.timer(MetricRegistry.name("drivers", "launch_to_retry"))

// Duration from initial submission to finished.
private val submitToFinish =
metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish"))
metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_finish"))
// Duration from (most recent) launch to finished.
private val launchToFinish =
metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish"))
metricRegistry.timer(MetricRegistry.name("drivers", "launch_to_finish"))

// Same as submitToFinish and launchToFinish above, except grouped by Spark TaskState.
class FinishStateTimers(state: String) {
val submitToFinish =
metricRegistry.timer(MetricRegistry.name("driver", "submit_to_finish_state", state))
metricRegistry.timer(MetricRegistry.name("drivers", "submit_to_finish_state", state))
val launchToFinish =
metricRegistry.timer(MetricRegistry.name("driver", "launch_to_finish_state", state))
metricRegistry.timer(MetricRegistry.name("drivers", "launch_to_finish_state", state))
}
private val finishSparkStateTimers = HashMap.empty[TaskState.TaskState, FinishStateTimers]
for (state <- TaskState.values) {
@@ -125,12 +125,12 @@ private[mesos] class MesosClusterSchedulerSource(scheduler: MesosClusterSchedule
}
}
private val submitToFinishUnknownState = metricRegistry.timer(
MetricRegistry.name("driver", "submit_to_finish_state", "UNKNOWN"))
MetricRegistry.name("drivers", "submit_to_finish_state", "UNKNOWN"))
private val launchToFinishUnknownState = metricRegistry.timer(
MetricRegistry.name("driver", "launch_to_finish_state", "UNKNOWN"))
MetricRegistry.name("drivers", "launch_to_finish_state", "UNKNOWN"))

// Histogram of retry counts at retry scheduling
private val retryCount = metricRegistry.histogram(MetricRegistry.name("driver", "retry_counts"))
private val retryCount = metricRegistry.histogram(MetricRegistry.name("drivers", "retry_counts"))

// Records when a submission initially enters the launch queue.
def recordQueuedDriver(): Unit = queuedCounter.inc
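The key change in this file is the metric prefix moving from "driver" to "drivers"; Dropwizard's MetricRegistry.name simply dot-joins its arguments, so only the reported key names change. A standalone illustration (not from the PR; the 1500 ms duration is invented) of how the renamed counters and timers compose and update:

import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry

object DriverMetricNamesExample {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry

    // MetricRegistry.name("drivers", "waiting_count") == "drivers.waiting_count"
    val queuedCounter = registry.counter(MetricRegistry.name("drivers", "waiting_count"))
    queuedCounter.inc()

    // Timers track both a rate and a latency distribution; the scheduler source
    // updates them with elapsed wall-clock time. 1500 ms is a made-up value.
    val submitToFirstLaunch =
      registry.timer(MetricRegistry.name("drivers", "submit_to_first_launch"))
    submitToFirstLaunch.update(1500, TimeUnit.MILLISECONDS)

    println(registry.getNames) // [drivers.submit_to_first_launch, drivers.waiting_count]
  }
}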
MesosCoarseGrainedSchedulerSource.scala
@@ -34,45 +34,45 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
scheduler: MesosCoarseGrainedSchedulerBackend)
extends Source with MesosSchedulerUtils {

override val sourceName: String = "mesos_cluster"
override val sourceName: String = "mesos"
override val metricRegistry: MetricRegistry = new MetricRegistry

// EXECUTOR STATE POLLING METRICS:
// These metrics periodically poll the scheduler for its state, including resource allocation and
// task states.

// Number of CPUs used
metricRegistry.register(MetricRegistry.name("executor", "resource", "cores"), new Gauge[Double] {
metricRegistry.register(MetricRegistry.name("resource", "cores"), new Gauge[Double] {
override def getValue: Double = scheduler.getCoresUsed
})
// Number of CPUs vs max
if (scheduler.getMaxCores != 0) {
metricRegistry.register(MetricRegistry.name("executor", "resource", "cores_of_max"),
metricRegistry.register(MetricRegistry.name("resource", "cores_of_max"),
new Gauge[Double] {
// Note: See above div0 check before calling register()
override def getValue: Double = scheduler.getCoresUsed / scheduler.getMaxCores
})
}
// Number of CPUs per task
metricRegistry.register(MetricRegistry.name("executor", "resource", "mean_cores_per_task"),
metricRegistry.register(MetricRegistry.name("resource", "mean_cores_per_task"),
new Gauge[Double] {
override def getValue: Double = scheduler.getMeanCoresPerTask
})

// Number of GPUs used
metricRegistry.register(MetricRegistry.name("executor", "resource", "gpus"), new Gauge[Double] {
metricRegistry.register(MetricRegistry.name("resource", "gpus"), new Gauge[Double] {
override def getValue: Double = scheduler.getGpusUsed
})
// Number of GPUs vs max
if (scheduler.getMaxGpus != 0) {
metricRegistry.register(MetricRegistry.name("executor", "resource", "gpus_of_max"),
metricRegistry.register(MetricRegistry.name("resource", "gpus_of_max"),
new Gauge[Double] {
// Note: See above div0 check before calling register()
override def getValue: Double = scheduler.getGpusUsed / scheduler.getMaxGpus
})
}
// Number of GPUs per task
metricRegistry.register(MetricRegistry.name("executor", "resource", "mean_gpus_per_task"),
metricRegistry.register(MetricRegistry.name("resource", "mean_gpus_per_task"),
new Gauge[Double] {
override def getValue: Double = scheduler.getMeanGpusPerTask
})
@@ -84,7 +84,7 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
// Number of tasks vs max
if (scheduler.isExecutorLimitEnabled) {
// executorLimit is assigned asynchronously, so it may start off with a zero value.
metricRegistry.register(MetricRegistry.name("executor", "count_of_max"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("count_of_max"), new Gauge[Int] {
override def getValue: Int = {
if (scheduler.getExecutorLimit == 0) {
0
@@ -95,19 +95,19 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(
})
}
// Number of task failures
metricRegistry.register(MetricRegistry.name("executor", "failures"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("failures"), new Gauge[Int] {
override def getValue: Int = scheduler.getTaskFailureCount
})
// Number of tracked agents regardless of whether we're currently present on them
metricRegistry.register(MetricRegistry.name("executor", "known_agents"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("known_agents"), new Gauge[Int] {
override def getValue: Int = scheduler.getKnownAgentsCount
})
// Number of tracked agents with tasks on them
metricRegistry.register(MetricRegistry.name("executor", "occupied_agents"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("occupied_agents"), new Gauge[Int] {
override def getValue: Int = scheduler.getOccupiedAgentsCount
})
// Number of blacklisted agents (too many failures)
metricRegistry.register(MetricRegistry.name("executor", "blacklisted_agents"), new Gauge[Int] {
metricRegistry.register(MetricRegistry.name("blacklisted_agents"), new Gauge[Int] {
override def getValue: Int = scheduler.getBlacklistedAgentCount
})

@@ -116,63 +116,63 @@ private[mesos] class MesosCoarseGrainedSchedulerSource(

// Rate of offers received (total number of offers, not offer RPCs)
private val offerCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "offer"))
metricRegistry.counter(MetricRegistry.name("offers", "received"))
// Rate of all offers declined, sum of the following reasons for declines
private val declineCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline"))
metricRegistry.counter(MetricRegistry.name("offers", "declined"))
// Offers declined for unmet requirements (with RejectOfferDurationForUnmetConstraints)
private val declineUnmetCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline_unmet"))
metricRegistry.counter(MetricRegistry.name("offers", "declined_unmet"))
// Offers declined when the deployment is finished (with RejectOfferDurationForReachedMaxCores)
private val declineFinishedCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline_finished"))
metricRegistry.counter(MetricRegistry.name("offers", "declined_finished"))
// Offers declined when offers are being unused (no duration in the decline filter)
private val declineUnusedCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "decline_unused"))
metricRegistry.counter(MetricRegistry.name("offers", "declined_unused"))
// Rate of revive operations
private val reviveCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "revive"))
metricRegistry.counter(MetricRegistry.name("offers", "revived"))
// Rate of launch operations
private val launchCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos", "launch"))
metricRegistry.counter(MetricRegistry.name("offers", "launched"))

// Counters for Spark states on launched executors (LAUNCHING, RUNNING, ...)
private val sparkStateCounters = TaskState.values
.map(state => (state, metricRegistry.counter(
MetricRegistry.name("executor", "spark_state", state.toString.toLowerCase))))
MetricRegistry.name("spark_state", state.toString.toLowerCase))))
.toMap
private val sparkUnknownStateCounter =
metricRegistry.counter(MetricRegistry.name("executor", "spark_state", "UNKNOWN"))
metricRegistry.counter(MetricRegistry.name("spark_state", "UNKNOWN"))
// Counters for Mesos states on launched executors (TASK_RUNNING, TASK_LOST, ...),
// more granular than sparkStateCounters
private val mesosStateCounters = MesosTaskState.values
.map(state => (state, metricRegistry.counter(
MetricRegistry.name("executor", "mesos_state", state.name.toLowerCase))))
MetricRegistry.name("mesos_state", state.name.toLowerCase))))
.toMap
private val mesosUnknownStateCounter =
metricRegistry.counter(MetricRegistry.name("executor", "mesos_state", "UNKNOWN"))
metricRegistry.counter(MetricRegistry.name("mesos_state", "UNKNOWN"))

// TASK TIMER METRICS:
// These metrics measure the duration to launch and run executors

// Duration from driver start to the first task launching.
private val startToFirstLaunched =
metricRegistry.timer(MetricRegistry.name("executor", "start_to_first_launched"))
metricRegistry.timer(MetricRegistry.name("start_to_first_launched"))
// Duration from driver start to the first task running.
private val startToFirstRunning =
metricRegistry.timer(MetricRegistry.name("executor", "start_to_first_running"))
metricRegistry.timer(MetricRegistry.name("start_to_first_running"))

// Duration from driver start to maxCores footprint being filled
private val startToAllLaunched =
metricRegistry.timer(MetricRegistry.name("executor", "start_to_all_launched"))
metricRegistry.timer(MetricRegistry.name("start_to_all_launched"))

// Duration between an executor launch and the executor entering a given spark state, e.g. RUNNING
private val launchToSparkStateTimers = TaskState.values
.map(state => (state, metricRegistry.timer(
MetricRegistry.name("executor", "launch_to_spark_state", state.toString.toLowerCase))))
MetricRegistry.name("launch_to_spark_state", state.toString.toLowerCase))))
.toMap
private val launchToUnknownSparkStateTimer = metricRegistry.timer(
MetricRegistry.name("executor", "launch_to_spark_state", "UNKNOWN"))
MetricRegistry.name("launch_to_spark_state", "UNKNOWN"))

// Time that the scheduler was initialized. This is the 'start time'.
private val schedulerInitTime = new Date
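Both source classes rely on the same per-state pattern: pre-register one counter per enum value and keep an "UNKNOWN" fallback for anything unexpected. A self-contained sketch of that pattern follows; the PerStateCounterExample object and its record helper are illustrative, not part of the PR.

import com.codahale.metrics.MetricRegistry
import org.apache.mesos.Protos.{TaskState => MesosTaskState}

object PerStateCounterExample {
  private val registry = new MetricRegistry

  // One counter per Mesos task state, e.g. "mesos_state.task_running".
  private val mesosStateCounters = MesosTaskState.values
    .map(state => (state, registry.counter(
      MetricRegistry.name("mesos_state", state.name.toLowerCase))))
    .toMap
  private val mesosUnknownStateCounter =
    registry.counter(MetricRegistry.name("mesos_state", "UNKNOWN"))

  // Illustrative helper (not in the PR): bump the matching counter, or the
  // UNKNOWN fallback, for each status update.
  def record(state: MesosTaskState): Unit =
    mesosStateCounters.getOrElse(state, mesosUnknownStateCounter).inc()

  def main(args: Array[String]): Unit = {
    record(MesosTaskState.TASK_RUNNING)
    record(MesosTaskState.TASK_FINISHED)
    println(registry.getCounters.keySet) // all "mesos_state.*" keys
  }
}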