-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24958][CORE] Add memory from procfs to executor metrics. #22612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3f8321a
cd16a75
94c2b04
062f5d7
245221d
c72be03
8f3c938
f2dca27
a9f924c
a11e3a2
34ad625
415f976
067b81d
18ee4ad
7f7ed2b
f3867ff
0f8f3e2
ea08c61
8f20857
6e65360
4659f4a
ef4be38
805741c
4c1f073
0a7402e
3d65b35
6eab315
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
…urning set of metrics.
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,7 +45,7 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| var pageSize = computePageSize() | ||
| var isAvailable: Boolean = isProcfsAvailable | ||
| private val pid = computePid() | ||
|
||
| private val ptree = mutable.Map[ Int, Set[Int]]() | ||
| private var ptree = mutable.Map[ Int, Set[Int]]() | ||
|
|
||
| var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) | ||
|
|
||
|
|
@@ -84,7 +84,7 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| return pid; | ||
| } | ||
| catch { | ||
| case e: SparkException => logDebug("IO Exception when trying to compute process tree." + | ||
| case e: SparkException => logWarning("Exception when trying to compute process tree." + | ||
| " As a result reporting of ProcessTree metrics is stopped", e) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| isAvailable = false | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return -1 | ||
|
|
@@ -95,15 +95,23 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| if (testing) { | ||
| return 0; | ||
| } | ||
| val cmd = Array("getconf", "PAGESIZE") | ||
| val out2 = Utils.executeAndGetOutput(cmd) | ||
| return Integer.parseInt(out2.split("\n")(0)) | ||
| try { | ||
| val cmd = Array("getconf", "PAGESIZE") | ||
| val out2 = Utils.executeAndGetOutput(cmd) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return Integer.parseInt(out2.split("\n")(0)) | ||
| } catch { | ||
| case e: Exception => logWarning("Exception when trying to compute pagesize, as a" + | ||
| " result reporting of ProcessTree metrics is stopped") | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| isAvailable = false | ||
| return 0 | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| private def computeProcessTree(): Unit = { | ||
| if (!isAvailable || testing) { | ||
| return | ||
| } | ||
| ptree = mutable.Map[ Int, Set[Int]]() | ||
| val queue = mutable.Queue.empty[Int] | ||
| queue += pid | ||
| while( !queue.isEmpty ) { | ||
|
|
@@ -121,34 +129,34 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
|
|
||
| private def getChildPids(pid: Int): ArrayBuffer[Int] = { | ||
| try { | ||
| val cmd = Array("pgrep", "-P", pid.toString) | ||
| // val cmd = Array("pgrep", "-P", pid.toString) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val builder = new ProcessBuilder("pgrep", "-P", pid.toString) | ||
| val process = builder.start() | ||
| val output = new StringBuilder() | ||
| // val output = new StringBuilder() | ||
| val threadName = "read stdout for " + "pgrep" | ||
| def appendToOutput(s: String): Unit = output.append(s).append("\n") | ||
| val childPidsInInt = mutable.ArrayBuffer.empty[Int] | ||
| def appendChildPid(s: String): Unit = { | ||
| if (s != "") { | ||
| logDebug("Found a child pid:" + s) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| childPidsInInt += Integer.parseInt(s) | ||
| } | ||
| } | ||
| val stdoutThread = Utils.processStreamByLine(threadName, | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| process.getInputStream, appendToOutput) | ||
| process.getInputStream, appendChildPid) | ||
| val exitCode = process.waitFor() | ||
| stdoutThread.join() | ||
| // pgrep will have exit code of 1 if there are more than one child process | ||
| // and it will have a exit code of 2 if there is no child process | ||
| if (exitCode != 0 && exitCode > 2) { | ||
| logError(s"Process $cmd exited with code $exitCode: $output") | ||
| val cmd = builder.command().toArray.mkString(" ") | ||
| logWarning(s"Process $cmd" + | ||
| s" exited with code $exitCode, with stderr:" + s"${process.getErrorStream} ") | ||
|
||
| throw new SparkException(s"Process $cmd exited with code $exitCode") | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| val childPids = output.toString.split("\n") | ||
| val childPidsInInt = mutable.ArrayBuffer.empty[Int] | ||
| for (p <- childPids) { | ||
| if (p != "") { | ||
| logDebug("Found a child pid: " + p) | ||
| childPidsInInt += Integer.parseInt(p) | ||
| } | ||
| } | ||
| childPidsInInt | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } catch { | ||
| case e: IOException => logDebug("IO Exception when trying to compute process tree." + | ||
| " As a result reporting of ProcessTree metrics is stopped", e) | ||
| case e: Exception => logWarning("Exception when trying to compute process tree." + | ||
| " As a result reporting of ProcessTree metrics is stopped.", e) | ||
| isAvailable = false | ||
| return mutable.ArrayBuffer.empty[Int] | ||
| } | ||
|
|
@@ -173,54 +181,42 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| val vmem = procInfoSplit(22).toLong | ||
| val rssPages = procInfoSplit(23).toLong | ||
| if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) { | ||
|
||
| allMetrics = ProcfsBasedSystemsMetrics( | ||
| allMetrics.jvmVmemTotal + vmem, | ||
| allMetrics.jvmRSSTotal + (rssPages*pageSize), | ||
| allMetrics.pythonVmemTotal, | ||
| allMetrics.pythonRSSTotal, | ||
| allMetrics.otherVmemTotal, | ||
| allMetrics.otherRSSTotal | ||
| allMetrics = allMetrics.copy( | ||
| jvmVmemTotal = allMetrics.jvmVmemTotal + vmem, | ||
| jvmRSSTotal = allMetrics.jvmRSSTotal + (rssPages*pageSize) | ||
| ) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) { | ||
| allMetrics = ProcfsBasedSystemsMetrics( | ||
| allMetrics.jvmVmemTotal, | ||
| allMetrics.jvmRSSTotal, | ||
| allMetrics.pythonVmemTotal + vmem, | ||
| allMetrics.pythonRSSTotal + (rssPages*pageSize), | ||
| allMetrics.otherVmemTotal, | ||
| allMetrics.otherRSSTotal | ||
| allMetrics = allMetrics.copy( | ||
| pythonVmemTotal = allMetrics.pythonVmemTotal + vmem, | ||
| pythonRSSTotal = allMetrics.pythonRSSTotal + (rssPages*pageSize) | ||
| ) | ||
| } | ||
| else { | ||
| allMetrics = ProcfsBasedSystemsMetrics( | ||
| allMetrics.jvmVmemTotal, | ||
| allMetrics.jvmRSSTotal, | ||
| allMetrics.pythonVmemTotal, | ||
| allMetrics.pythonRSSTotal, | ||
| allMetrics.otherVmemTotal + vmem, | ||
| allMetrics.otherRSSTotal + (rssPages*pageSize) | ||
| allMetrics = allMetrics.copy( | ||
| otherVmemTotal = allMetrics.otherVmemTotal + vmem, | ||
| otherRSSTotal = allMetrics.otherRSSTotal + (rssPages*pageSize) | ||
| ) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } catch { | ||
| case f: FileNotFoundException => logDebug("There was a problem with reading" + | ||
| " the stat file of the process", f) | ||
| case f: FileNotFoundException => logWarning("There was a problem with reading" + | ||
| " the stat file of the process. ", f) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| private[spark] def computeAllMetrics(): Unit = { | ||
| private[spark] def computeAllMetrics(): ProcfsBasedSystemsMetrics = { | ||
| if (!isAvailable) { | ||
| allMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) | ||
| return | ||
| return ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) | ||
| } | ||
| computeProcessTree | ||
| val pids = ptree.keySet | ||
| allMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) | ||
| for (p <- pids) { | ||
| computeProcessInfo(p) | ||
|
||
| } | ||
| return allMetrics | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,16 +27,19 @@ import org.apache.spark.memory.MemoryManager | |
| */ | ||
| sealed trait ExecutorMetricType { | ||
| private[spark] def getMetricValue(memoryManager: MemoryManager): Long = 0 | ||
|
||
| private[spark] def getMetricSet(memoryManager: MemoryManager): Map[String, Long] = | ||
| Map.empty[ String, Long] | ||
| private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last | ||
| private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { | ||
| new Array[Long](0) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| private[spark] def names = Seq(getClass().getName().stripSuffix("$").split("""\.""").last) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| private[spark] abstract class MemoryManagerExecutorMetricType( | ||
| f: MemoryManager => Long) extends ExecutorMetricType { | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Map[String, Long] = { | ||
| var metricAsSet = Map.empty[String, Long] | ||
| metricAsSet += (name -> f(memoryManager)) | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { | ||
| val metricAsSet = new Array[Long](names.length) | ||
| (0 until names.length ).foreach { idx => | ||
| metricAsSet(idx) = (f(memoryManager)) | ||
| } | ||
| metricAsSet | ||
| } | ||
| override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { | ||
|
|
@@ -50,15 +53,22 @@ private[spark] abstract class MBeanExecutorMetricType(mBeanName: String) | |
| ManagementFactory.getPlatformMBeanServer, | ||
| new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean]) | ||
|
|
||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { | ||
| val metricAsSet = new Array[Long](1) | ||
| metricAsSet(0) = bean.getMemoryUsed | ||
| metricAsSet | ||
| } | ||
|
|
||
| override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { | ||
| bean.getMemoryUsed | ||
| } | ||
| } | ||
|
|
||
| case object JVMHeapMemory extends ExecutorMetricType { | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Map[String, Long] = { | ||
| var metricAsSet = Map.empty[String, Long] | ||
| metricAsSet += (name -> ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()) | ||
|
|
||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { | ||
| val metricAsSet = new Array[Long](1) | ||
| metricAsSet(0) = ( ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()) | ||
| metricAsSet | ||
| } | ||
| override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { | ||
|
|
@@ -67,9 +77,9 @@ case object JVMHeapMemory extends ExecutorMetricType { | |
| } | ||
|
|
||
| case object JVMOffHeapMemory extends ExecutorMetricType { | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Map[String, Long] = { | ||
| var metricAsSet = Map.empty[String, Long] | ||
| metricAsSet += (name -> ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()) | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { | ||
| val metricAsSet = new Array[ Long](1) | ||
| metricAsSet(0) = ( ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()) | ||
| metricAsSet | ||
| } | ||
| override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { | ||
|
|
@@ -78,21 +88,22 @@ case object JVMOffHeapMemory extends ExecutorMetricType { | |
| } | ||
|
|
||
| case object ProcessTreeMetrics extends ExecutorMetricType { | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Map[String, Long] = { | ||
| ExecutorMetricType.pTreeInfo.computeAllMetrics() | ||
| var processTreeMetrics = Map.empty[String, Long] | ||
| processTreeMetrics += ("ProcessTreeJVMVMemory" -> | ||
| ExecutorMetricType.pTreeInfo.allMetrics.jvmVmemTotal ) | ||
| processTreeMetrics += ("ProcessTreeJVMRSSMemory" -> | ||
| ExecutorMetricType.pTreeInfo.allMetrics.jvmRSSTotal ) | ||
| processTreeMetrics += ("ProcessTreePythonVMemory" -> | ||
| ExecutorMetricType.pTreeInfo.allMetrics.pythonVmemTotal ) | ||
| processTreeMetrics += ("ProcessTreePythonRSSMemory" -> | ||
| ExecutorMetricType.pTreeInfo.allMetrics.pythonRSSTotal ) | ||
| processTreeMetrics += ("ProcessTreeOtherVMemory" -> | ||
| ExecutorMetricType.pTreeInfo.allMetrics.otherVmemTotal ) | ||
| processTreeMetrics += ("ProcessTreeOtherRSSMemory" -> | ||
| ExecutorMetricType.pTreeInfo.allMetrics.otherRSSTotal ) | ||
| override val names = Seq( | ||
| "ProcessTreeJVMVMemory", | ||
| "ProcessTreeJVMRSSMemory", | ||
| "ProcessTreePythonVMemory", | ||
| "ProcessTreePythonRSSMemory", | ||
| "ProcessTreeOtherVMemory", | ||
| "ProcessTreeOtherRSSMemory") | ||
| override private[spark] def getMetricSet(memoryManager: MemoryManager): Array[Long] = { | ||
| val allMetrics = ExecutorMetricType.pTreeInfo.computeAllMetrics() | ||
| val processTreeMetrics = new Array[Long](names.length) | ||
| processTreeMetrics(0) = allMetrics.jvmVmemTotal | ||
| processTreeMetrics(1) = allMetrics.jvmRSSTotal | ||
| processTreeMetrics(2) = allMetrics.pythonVmemTotal | ||
| processTreeMetrics(3) = allMetrics.pythonRSSTotal | ||
| processTreeMetrics(4) = allMetrics.otherVmemTotal | ||
| processTreeMetrics(5) = allMetrics.otherRSSTotal | ||
| processTreeMetrics | ||
rezasafi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
@@ -138,26 +149,15 @@ private[spark] object ExecutorMetricType { | |
| MappedPoolMemory, | ||
| ProcessTreeMetrics | ||
| ) | ||
| // List of defined metrics | ||
| val definedMetrics = IndexedSeq( | ||
| "JVMHeapMemory", | ||
| "JVMOffHeapMemory", | ||
| "OnHeapExecutionMemory", | ||
| "OffHeapExecutionMemory", | ||
| "OnHeapStorageMemory", | ||
| "OffHeapStorageMemory", | ||
| "OnHeapUnifiedMemory", | ||
| "OffHeapUnifiedMemory", | ||
| "DirectPoolMemory", | ||
| "MappedPoolMemory", | ||
| "ProcessTreeJVMVMemory", | ||
| "ProcessTreeJVMRSSMemory", | ||
| "ProcessTreePythonVMemory", | ||
| "ProcessTreePythonRSSMemory", | ||
| "ProcessTreeOtherVMemory", | ||
| "ProcessTreeOtherRSSMemory" | ||
| ) | ||
|
|
||
| val metricIdxMap = | ||
| Map[String, Int](ExecutorMetricType.definedMetrics.zipWithIndex: _*) | ||
| var definedMetricsAndOffset = Map.empty[String, Int] | ||
| var numberOfMetrics = 0 | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| metricGetters.foreach { m => | ||
| var metricInSet = 0 | ||
| while (metricInSet < m.names.length) { | ||
| definedMetricsAndOffset += (m.names(metricInSet) -> (metricInSet + numberOfMetrics) ) | ||
| metricInSet += 1 | ||
| } | ||
| numberOfMetrics += m.names.length | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.