Skip to content
Prev Previous commit
Next Next commit
make configurable
  • Loading branch information
LantaoJin committed Feb 19, 2019
commit 94fd9625e32354b527729498815666ca3cd29012
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,27 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val EVENT_LOG_GARBAGE_COLLECTION_METRICS =
ConfigBuilder("spark.eventLog.logStageExecutorGCMetrics.enabled")
.booleanConf
.createWithDefault(true)

private[spark] val ADDITIONAL_YOUNG_GENERATION_GARBAGE_COLLECTORS =
ConfigBuilder("spark.additionalYoungGenerationGarbageCollectors")
.doc("Names of additional young generation garbage collector, " +
"usually is the return of GarbageCollectorMXBean.getName, e.g. ParNew.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val ADDITIONAL_OLD_GENERATION_GARBAGE_COLLECTORS =
ConfigBuilder("spark.additionalOldGenerationGarbageCollectors")
.doc("Names of additional old generation garbage collector, " +
"usually is the return of GarbageCollectorMXBean.getName, e.g. ConcurrentMarkSweep.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EVENT_LOG_OVERWRITE =
ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import javax.management.ObjectName
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.SparkEnv
import org.apache.spark.executor.ProcfsMetricsGetter
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.memory.MemoryManager

/**
Expand Down Expand Up @@ -100,25 +102,46 @@ case object ProcessTreeMetrics extends ExecutorMetricType {
}
}

case object GarbageCollectionMetrics extends ExecutorMetricType {
case object GarbageCollectionMetrics extends ExecutorMetricType with Logging {
import GC_TYPE._
override val names = Seq(
"MinorGCCount",
"MinorGCTime",
"MajorGCCount",
"MajorGCTime"
)

private lazy val supported: Boolean = {
val shouldLogStageExecutorGCMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
val supportedVendor = System.getProperty("java.vendor").contains("Oracle") ||
System.getProperty("java.vendor").contains("OpenJDK")
shouldLogStageExecutorGCMetrics && supportedVendor
}

private lazy val youngGenGarbageCollector: Seq[String] = {
Seq(`copy`, `psScavenge`, `parNew`, `g1Young`) ++ /* additional young gc we added */
SparkEnv.get.conf.get(config.ADDITIONAL_YOUNG_GENERATION_GARBAGE_COLLECTORS)
}

private lazy val oldGenGarbageCollector: Seq[String] = {
Seq(`markSweepCompact`, `psMarkSweep`, `cms`, `g1Old`) ++ /* additional old gc we added */
SparkEnv.get.conf.get(config.ADDITIONAL_OLD_GENERATION_GARBAGE_COLLECTORS)
}

override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
val gcMetrics = Array[Long](names.length) // minorCount, minorTime, majorCount, majorTime
ManagementFactory.getGarbageCollectorMXBeans.asScala.foreach { mxBean =>
mxBean.getName match {
case `copy` | `psScavenge` | `parNew` | `g1Young` =>
if (supported) {
ManagementFactory.getGarbageCollectorMXBeans.asScala.foreach { mxBean =>
if (youngGenGarbageCollector.contains(mxBean.getName)) {
gcMetrics(0) = mxBean.getCollectionCount
gcMetrics(1) = mxBean.getCollectionTime
case `markSweepCompact` | `psMarkSweep` | `cms` | `g1Old` =>
} else if (oldGenGarbageCollector.contains(mxBean.getName)) {
gcMetrics(2) = mxBean.getCollectionCount
gcMetrics(3) = mxBean.getCollectionTime
case _ =>
} else {
logDebug(s"${mxBean.getName} is an unsupported garbage collector.")
}
}
}
gcMetrics
Expand Down