Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Guard against misuse of MetricsSystem methods.
  • Loading branch information
JoshRosen committed Dec 16, 2014
commit 87a229240aa18a38d7ee77533cb6d07beb292133
26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()

private var running: Boolean = false

// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None

/** Get any UI handlers used by this metrics system. */
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}

metricsConfig.initialize()

def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
}

def stop() {
sinks.foreach(_.stop)
if (running) {
sinks.foreach(_.stop)
} else {
logWarning("Stopping a MetricsSystem that is not running")
}
running = false
}

def report() {
Expand All @@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
* @return An unique metric name for each combination of
* application, executor/driver and metric source.
*/
def buildRegistryName(source: Source): String = {
private[spark] def buildRegistryName(source: Source): String = {
val appId = conf.getOption("spark.app.id")
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
Expand Down Expand Up @@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
})
}

def registerSources() {
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

Expand All @@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
}
}

def registerSinks() {
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

Expand Down