27 commits (the diff below shows the changes from 1 commit)
3f8321a
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
cd16a75
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
94c2b04
Separate metrics for jvm, python and others and update the tests
Aug 8, 2018
062f5d7
Update JsonProtocolSuite
Sep 25, 2018
245221d
[SPARK-24958] Add executors' process tree total memory information to…
Oct 2, 2018
c72be03
Addressing most of Imran's comments
Oct 3, 2018
8f3c938
Fixing the scala style and some minor comments
Oct 3, 2018
f2dca27
Removing types from the definitions wherever possible
Oct 4, 2018
a9f924c
Using Utils methods when possible or use ProcessBuilder
Oct 5, 2018
a11e3a2
make use of Utils.trywithresources
Oct 5, 2018
34ad625
Changing ExecutorMetricType and ExecutorMetrics to use a map instead o…
Oct 9, 2018
415f976
Changing ExecutorMetric to use array instead of a map
Oct 10, 2018
067b81d
A small cosmetic change
Oct 10, 2018
18ee4ad
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Oct 17, 2018
7f7ed2b
Applying latest review comments. Using Arrays instead of Map for ret…
Oct 23, 2018
f3867ff
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Nov 5, 2018
0f8f3e2
Fix an issue with JsonProtocolSuite
Nov 5, 2018
ea08c61
Fix scalastyle issue
Nov 5, 2018
8f20857
Applying latest review comments
Nov 14, 2018
6e65360
Using the companion object and other stuff
Nov 27, 2018
4659f4a
Update the use of process builder and applying other review comments
Nov 28, 2018
ef4be38
Small style fixes based on reviews
Nov 30, 2018
805741c
Applying review comments, mostly style related
Nov 30, 2018
4c1f073
Remove the unnecessary tryWithResources
Nov 30, 2018
0a7402e
Applying the comment about error handling and some more style fixes
Dec 4, 2018
3d65b35
Removing a return
Dec 6, 2018
6eab315
Reordering of info in a test resource file to avoid confusion
Dec 6, 2018
Applying latest review comments
Reza Safi committed Nov 14, 2018
commit 8f208574a293a94d9029cc4adb45b03e7a67ed47
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -63,10 +63,10 @@ private[spark] class Heartbeater(
* Get the current executor level metrics. These are returned as an array
*/
def getCurrentMetrics(): ExecutorMetrics = {
val metrics = new Array[Long](ExecutorMetricType.numberOfMetrics)
val metrics = new Array[Long](ExecutorMetricType.numMetrics)
var offset = 0
ExecutorMetricType.metricGetters.foreach { metric =>
val newSetOfMetrics = metric.getMetricSet(memoryManager)
val newSetOfMetrics = metric.getMetricValues(memoryManager)
Array.copy(newSetOfMetrics, 0, metrics, offset, newSetOfMetrics.size)
offset += newSetOfMetrics.length
}
@@ -28,14 +28,14 @@ import org.apache.spark.metrics.ExecutorMetricType
@DeveloperApi
class ExecutorMetrics private[spark] extends Serializable {

private val metrics = new Array[Long](ExecutorMetricType.numberOfMetrics)
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
// the first element is initialized to -1, indicating that the values for the array
// haven't been set yet.
metrics(0) = -1

/** Returns the value for the specified metric. */
def getMetricValue(metricName: String): Long = {
metrics(ExecutorMetricType.definedMetricsAndOffset.get(metricName).get)
metrics(ExecutorMetricType.metricToOffset.get(metricName).get)
}

/** Returns true if the values for the metrics have been set, false otherwise. */
@@ -53,7 +53,7 @@ class ExecutorMetrics private[spark] extends Serializable {
*/
private[spark] def this(executorMetrics: Map[String, Long]) {
this()
ExecutorMetricType.definedMetricsAndOffset.map { m =>
ExecutorMetricType.metricToOffset.map { m =>
metrics(m._2) = executorMetrics.getOrElse(m._1, 0L)
}
}
@@ -67,10 +67,10 @@ class ExecutorMetrics private[spark] extends Serializable {
*/
private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
var updated = false
ExecutorMetricType.definedMetricsAndOffset.map {m =>
if (executorMetrics.metrics(m._2) > metrics(m._2)) {
ExecutorMetricType.metricToOffset.map { case (_, idx) =>
if (executorMetrics.metrics(idx) > metrics(idx)) {
updated = true
metrics(m._2) = executorMetrics.metrics(m._2)
metrics(idx) = executorMetrics.metrics(idx)
}
}
updated
@@ -45,29 +45,33 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend
var pageSize = computePageSize()
var isAvailable: Boolean = isProcfsAvailable
private val pid = computePid()
Contributor (reviewer): pageSize is only a var for testing -- instead just optionally pass it in to the constructor. Also, I think all of these can be private.

Contributor Author: I think I can't call computePageSize() in the constructor signature to compute the default value. Another solution is to check for testing inside computePageSize and, if we are testing, assign it a value provided in the constructor (defaulting to 4096).

Contributor (reviewer): You can't put it as a default value, but if you make it a static method, then you can provide an overloaded method which uses it; see squito@cf00835. But I think your other proposal is even better: if it's testing, just give it a fixed value (no need to even make it an argument to the constructor at all).
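
A minimal sketch of the option the discussion converges on (the class name and structure below are illustrative assumptions, not the PR's final code): the page size is computed by a private helper that short-circuits to a fixed 4096 bytes when running under tests, so no extra constructor argument is needed.

import scala.sys.process.Process

// Hypothetical sketch of the "fixed value under testing" approach discussed above.
class ProcfsBasedSystemsSketch(testing: Boolean) {
  val pageSize: Long = computePageSize()

  private def computePageSize(): Long = {
    if (testing) {
      4096L // fixed page size under tests; avoids shelling out to getconf
    } else {
      // "getconf PAGESIZE" prints the page size in bytes on its first line
      val out = Process(Seq("getconf", "PAGESIZE")).!!
      out.trim.toLong
    }
  }
}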

private var ptree = mutable.Map[ Int, Set[Int]]()

var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
// var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)

computeProcessTree()

private def isProcfsAvailable: Boolean = {
private lazy val isProcfsAvailable: Boolean = {
if (testing) {
return true
true
}
try {
if (!Files.exists(Paths.get(procfsDir))) {
return false
else {
var procDirExists = true
try {
if (!Files.exists(Paths.get(procfsDir))) {
procDirExists = false
}
}
catch {
case f: IOException =>
logWarning("It seems that procfs isn't supported", f)
procDirExists = false
}
val shouldLogStageExecutorMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
val shouldLogStageExecutorProcessTreeMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
procDirExists && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
}
catch {
case f: FileNotFoundException => return false
}
val shouldLogStageExecutorMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
val shouldLogStageExecutorProcessTreeMetrics =
SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
}

private def computePid(): Int = {
@@ -78,13 +82,13 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend
// This can be simplified in java9:
// https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
val cmd = Array("bash", "-c", "echo $PPID")
val length = 10
val out2 = Utils.executeAndGetOutput(cmd)
Contributor (reviewer): can be out instead of out2

val pid = Integer.parseInt(out2.split("\n")(0))
return pid;
}
catch {
case e: SparkException => logWarning("Exception when trying to compute process tree." +
case e: SparkException =>
logWarning("Exception when trying to compute process tree." +
" As a result reporting of ProcessTree metrics is stopped", e)
isAvailable = false
return -1
@@ -97,8 +101,8 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend
}
try {
val cmd = Array("getconf", "PAGESIZE")
val out2 = Utils.executeAndGetOutput(cmd)
return Integer.parseInt(out2.split("\n")(0))
val out = Utils.executeAndGetOutput(cmd)
return Integer.parseInt(out.split("\n")(0))
} catch {
case e: Exception => logWarning("Exception when trying to compute pagesize, as a" +
" result reporting of ProcessTree metrics is stopped")
@@ -107,24 +111,23 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend
}
}

private def computeProcessTree(): Unit = {
private def computeProcessTree(): Set[Int] = {
if (!isAvailable || testing) {
return
return Set()
}
ptree = mutable.Map[ Int, Set[Int]]()
var ptree: Set[Int] = Set()
ptree += pid
val queue = mutable.Queue.empty[Int]
queue += pid
while( !queue.isEmpty ) {
val p = queue.dequeue()
val c = getChildPids(p)
if(!c.isEmpty) {
queue ++= c
ptree += (p -> c.toSet)
}
else {
ptree += (p -> Set[Int]())
ptree ++= c.toSet
}
}
ptree
}

private def getChildPids(pid: Int): ArrayBuffer[Int] = {
@@ -162,15 +165,17 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend
}
}

def computeProcessInfo(pid: Int): Unit = {
/*
def computeProcessInfo(allMetrics: ProcfsBasedSystemsMetrics, pid: Int):
ProcfsBasedSystemsMetrics = {
/*
* Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory
* info. I tried that but found it not correct during tests, so I used normal string analysis
* instead. The computation of RSS and Vmem are based on proc(5):
* http://man7.org/linux/man-pages/man5/proc.5.html
*/
try {
val pidDir = new File(procfsDir, pid.toString)
var allMetricsUpdated = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
Utils.tryWithResource( new InputStreamReader(
new FileInputStream(
new File(pidDir, procfsStatFile)), Charset.forName("UTF-8"))) { fReader =>
@@ -181,41 +186,42 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend
val vmem = procInfoSplit(22).toLong
val rssPages = procInfoSplit(23).toLong
if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
Contributor (reviewer): Could this just be vmem and rssPages, rather than splitting into JVM, Python, and other? Can you explain more about how the separate values would be used?

Contributor Author: This is separated because knowing the main actors, like the JVM, individually turns out to have some value for the user. We just consider the JVM (the pure Scala case) and Python (the PySpark case). Other things can be added per interest in the future, but for now we put everything else under the "Other" category.

Contributor (reviewer): @edwinalu It would be nice to have a breakdown of the total memory being consumed. It's easier to tune the parameters knowing what is consuming all the memory. For example, if your container died OOMing, it helps to know whether it was because of Python or the JVM. Also, R fits in the "other" category, so it makes sense to have all three of them as of now.

Contributor (reviewer): We don't have much PySpark ourselves, but yes, it seems useful to have the breakdown, and it's easy to sum the values for the total.
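
On the last point, a small sketch of how the per-category values sum to an overall total (the field names mirror ProcfsBasedSystemsMetrics from this PR, but the case class and helpers below are illustrative assumptions, not the PR's code):

// Sketch: per-category memory totals plus derived overall totals.
case class ProcfsMemoryBreakdownSketch(
    jvmVmemTotal: Long, jvmRSSTotal: Long,
    pythonVmemTotal: Long, pythonRSSTotal: Long,
    otherVmemTotal: Long, otherRSSTotal: Long) {
  // The overall totals are just the sums of the JVM, Python, and "other" categories.
  def totalVmem: Long = jvmVmemTotal + pythonVmemTotal + otherVmemTotal
  def totalRSS: Long = jvmRSSTotal + pythonRSSTotal + otherRSSTotal
}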

allMetrics = allMetrics.copy(
allMetricsUpdated = allMetrics.copy(
jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
jvmRSSTotal = allMetrics.jvmRSSTotal + (rssPages*pageSize)
)
}
else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
allMetrics = allMetrics.copy(
allMetricsUpdated = allMetrics.copy(
pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
pythonRSSTotal = allMetrics.pythonRSSTotal + (rssPages*pageSize)
)
}
else {
allMetrics = allMetrics.copy(
allMetricsUpdated = allMetrics.copy(
otherVmemTotal = allMetrics.otherVmemTotal + vmem,
otherRSSTotal = allMetrics.otherRSSTotal + (rssPages*pageSize)
)
}
}
}
}
allMetricsUpdated
} catch {
case f: FileNotFoundException => logWarning("There was a problem with reading" +
" the stat file of the process. ", f)
ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
}
}

private[spark] def computeAllMetrics(): ProcfsBasedSystemsMetrics = {
if (!isAvailable) {
return ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
}
computeProcessTree
val pids = ptree.keySet
allMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
val pids = computeProcessTree
var allMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
for (p <- pids) {
computeProcessInfo(p)
allMetrics = computeProcessInfo(allMetrics, p)
}
return allMetrics
}
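
The proc(5) computation referenced in the comment inside computeProcessInfo can be illustrated with a standalone sketch (an assumption-based illustration, not the PR's code): in /proc/[pid]/stat, vsize is already in bytes while rss is reported in pages, so rss must be multiplied by the page size.

import scala.io.Source

// Sketch: read vmem and rss (both in bytes) for one pid from procfs, mirroring the
// simple whitespace split used in the PR (indices 22 and 23 are zero-based).
object ProcStatSketch {
  def vmemAndRssBytes(pid: Int, pageSize: Long, procfsDir: String = "/proc"): (Long, Long) = {
    val src = Source.fromFile(s"$procfsDir/$pid/stat")
    try {
      val fields = src.mkString.trim.split(" ")
      val vmem = fields(22).toLong           // vsize: virtual memory size in bytes
      val rss = fields(23).toLong * pageSize // rss: resident set size, reported in pages
      (vmem, rss)
    } finally {
      src.close()
    }
  }
}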
@@ -28,62 +28,50 @@ import org.apache.spark.memory.MemoryManager
* Executor metric types for executor-level metrics stored in ExecutorMetrics.
*/
sealed trait ExecutorMetricType {
private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0
private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = {
private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
new Array[Long](0)
}
private[spark] def names = Seq(getClass().getName().stripSuffix("$").split("""\.""").last)
private[spark] def names: Seq[String] = Seq()
}

private[spark] abstract class MemoryManagerExecutorMetricType(
f: MemoryManager => Long) extends ExecutorMetricType {
override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = {
val metricAsSet = new Array[Long](names.length)
(0 until names.length ).foreach { idx =>
metricAsSet(idx) = (f(memoryManager))
}
metricAsSet
sealed trait SingleValueExecutorMetricType extends ExecutorMetricType {
override private[spark] def names = Seq(getClass().getName().
stripSuffix("$").split("""\.""").last)

override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
val metrics = new Array[Long](1)
metrics(0) = getMetricValue(memoryManager)
metrics
}

private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0
}

private[spark] abstract class MemoryManagerExecutorMetricType(
f: MemoryManager => Long) extends SingleValueExecutorMetricType {
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
f(memoryManager)
}
}

private[spark] abstract class MBeanExecutorMetricType(mBeanName: String)
extends ExecutorMetricType {
extends SingleValueExecutorMetricType {
private val bean = ManagementFactory.newPlatformMXBeanProxy(
ManagementFactory.getPlatformMBeanServer,
new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean])

override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = {
val metricAsSet = new Array[Long](1)
metricAsSet(0) = bean.getMemoryUsed
metricAsSet
}

override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
bean.getMemoryUsed
}
}

case object JVMHeapMemory extends ExecutorMetricType {

override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = {
val metricAsSet = new Array[Long](1)
metricAsSet(0) = ( ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed())
metricAsSet
}
case object JVMHeapMemory extends SingleValueExecutorMetricType {
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
}
}

case object JVMOffHeapMemory extends ExecutorMetricType {
override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = {
val metricAsSet = new Array[ Long](1)
metricAsSet(0) = ( ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed())
metricAsSet
}
case object JVMOffHeapMemory extends SingleValueExecutorMetricType {
override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
}
@@ -97,7 +85,7 @@ case object ProcessTreeMetrics extends ExecutorMetricType {
"ProcessTreePythonRSSMemory",
"ProcessTreeOtherVMemory",
"ProcessTreeOtherRSSMemory")
override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = {
override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
val allMetrics = ExecutorMetricType.pTreeInfo.computeAllMetrics()
val processTreeMetrics = new Array[Long](names.length)
processTreeMetrics(0) = allMetrics.jvmVmemTotal
@@ -152,14 +140,18 @@ private[spark] object ExecutorMetricType {
ProcessTreeMetrics
)

var definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
var numberOfMetrics = 0
metricGetters.foreach { m =>
var metricInSet = 0
while (metricInSet < m.names.length) {
definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + numberOfMetrics) )
metricInSet += 1

val (metricToOffset, numMetrics) = {
var numberOfMetrics = 0
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
var metricInSet = 0
while (metricInSet < m.names.length) {
definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + numberOfMetrics))
metricInSet += 1
}
numberOfMetrics += m.names.length
}
numberOfMetrics += m.names.length
(definedMetricsAndOffset, numberOfMetrics)
}
}
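
A tiny standalone sketch of the name-to-offset indexing scheme that metricToOffset and numMetrics implement (the metric names and object below are simplified assumptions, not the PR's code): each metric name maps to a fixed offset into one flat Array[Long] of values.

import scala.collection.mutable

// Sketch: a flat Array[Long] of metric values indexed through a name -> offset map,
// in the spirit of ExecutorMetricType.metricToOffset and ExecutorMetrics.getMetricValue.
object MetricOffsetSketch {
  private val names = Seq("JVMHeapMemory", "JVMOffHeapMemory", "ProcessTreeJVMVMemory")
  val metricToOffset: mutable.LinkedHashMap[String, Int] =
    mutable.LinkedHashMap(names.zipWithIndex: _*)
  val numMetrics: Int = names.length

  def getMetricValue(metrics: Array[Long], metricName: String): Long =
    metrics(metricToOffset(metricName))

  def main(args: Array[String]): Unit = {
    val metrics = new Array[Long](numMetrics)
    metrics(metricToOffset("JVMHeapMemory")) = 42L
    println(getMetricValue(metrics, "JVMHeapMemory")) // prints 42
  }
}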
@@ -133,7 +133,7 @@ private[spark] class ExecutorMetricsJsonSerializer
jsonGenerator: JsonGenerator,
serializerProvider: SerializerProvider): Unit = {
metrics.foreach { m: ExecutorMetrics =>
val metricsMap = ExecutorMetricType.definedMetricsAndOffset.map { case (metric, _) =>
val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
metric -> m.getMetricValue(metric)
}
jsonGenerator.writeObject(metricsMap)
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -394,12 +394,9 @@ private[spark] object JsonProtocol {

/** Convert executor metrics to JSON. */
def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
val metrics = for {
(m, _) <- ExecutorMetricType.definedMetricsAndOffset
} yield {
val metrics = ExecutorMetricType.metricToOffset.map { case (m, _) =>
JField(m, executorMetrics.getMetricValue(m))
}

JObject(metrics.toSeq: _*)
}

Expand Down Expand Up @@ -614,7 +611,7 @@ private[spark] object JsonProtocol {
/** Extract the executor metrics from JSON. */
def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
val metrics =
ExecutorMetricType.definedMetricsAndOffset.map { case (metric, _) =>
ExecutorMetricType.metricToOffset.map { case (metric, _) =>
metric -> jsonOption(json \ metric).map(_.extract[Long]).getOrElse(0L)
}
new ExecutorMetrics(metrics.toMap)