-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24958][CORE] Add memory from procfs to executor metrics. #22612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
3f8321a
cd16a75
94c2b04
062f5d7
245221d
c72be03
8f3c938
f2dca27
a9f924c
a11e3a2
34ad625
415f976
067b81d
18ee4ad
7f7ed2b
f3867ff
0f8f3e2
ea08c61
8f20857
6e65360
4659f4a
ef4be38
805741c
4c1f073
0a7402e
3d65b35
6eab315
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,272 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.executor | ||
|
|
||
| import java.io._ | ||
| import java.nio.charset.Charset | ||
| import java.nio.file.{Files, Paths} | ||
| import java.util.Locale | ||
|
|
||
| import scala.collection.mutable | ||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.{SparkEnv, SparkException} | ||
| import org.apache.spark.internal.{config, Logging} | ||
| import org.apache.spark.util.Utils | ||
|
|
||
/**
 * Immutable snapshot of process-tree memory metrics gathered from procfs.
 *
 * Virtual-memory totals are in bytes; RSS totals are pre-converted to bytes
 * (rss pages multiplied by the system page size) by the producer.
 *
 * @param jvmVmemTotal    virtual memory of all "java" processes in the tree
 * @param jvmRSSTotal     resident set size of all "java" processes in the tree
 * @param pythonVmemTotal virtual memory of all "python" processes in the tree
 * @param pythonRSSTotal  resident set size of all "python" processes in the tree
 * @param otherVmemTotal  virtual memory of all remaining processes in the tree
 * @param otherRSSTotal   resident set size of all remaining processes in the tree
 */
private[spark] case class ProcfsBasedSystemsMetrics(
    jvmVmemTotal: Long,
    jvmRSSTotal: Long,
    pythonVmemTotal: Long,
    pythonRSSTotal: Long,
    otherVmemTotal: Long,
    otherRSSTotal: Long)
|
|
||
// Some of the ideas here are taken from the ProcfsBasedProcessTree class in the Hadoop
// project.
/**
 * Collects memory metrics for the whole process tree rooted at this JVM by reading
 * each process's `stat` file under procfs. Memory is attributed to one of three
 * buckets based on the process command name: "java", "python", or everything else.
 *
 * Collection is disabled permanently (`isAvailable = false`) if procfs is missing,
 * the relevant event-log settings are off, or any external command fails.
 *
 * @param procfsDir root of the proc filesystem; overridable for testing
 */
private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging {
  // Name of the per-process stat file under /proc/<pid>/.
  val procfsStatFile = "stat"
  // True when running under Spark's test harness; short-circuits procfs and command use.
  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
  // Page size in bytes; RSS in /proc/<pid>/stat is reported in pages, not bytes.
  var pageSize = computePageSize()
  var isAvailable: Boolean = isProcfsAvailable
  private val pid = computePid()

  // Process tree: each pid mapped to the set of its direct child pids.
  private val ptree = mutable.Map[Int, Set[Int]]()

  // Last published snapshot; starts at all zeros until the first update.
  var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0)
  private var latestJVMVmemTotal = 0L
  private var latestJVMRSSTotal = 0L
  private var latestPythonVmemTotal = 0L
  private var latestPythonRSSTotal = 0L
  private var latestOtherVmemTotal = 0L
  private var latestOtherRSSTotal = 0L

  computeProcessTree()

  /**
   * Whether procfs-based collection can run: always true under testing; otherwise the
   * procfs directory must exist and both the stage-executor-metrics and
   * process-tree-metrics event-log settings must be enabled.
   */
  private def isProcfsAvailable: Boolean = {
    if (testing) {
      true
    } else {
      val procDirExists =
        try {
          Files.exists(Paths.get(procfsDir))
        } catch {
          case _: FileNotFoundException => false
        }
      if (!procDirExists) {
        false
      } else {
        val shouldLogStageExecutorMetrics =
          SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
        val shouldLogStageExecutorProcessTreeMetrics =
          SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
        shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
      }
    }
  }

  /**
   * Pid of this JVM, obtained by asking a child bash shell for its parent pid.
   * Returns -1 (and disables collection) if the command fails.
   */
  private def computePid(): Int = {
    if (!isAvailable || testing) {
      -1
    } else {
      try {
        // This can be simplified with ProcessHandle.current().pid() on Java 9+:
        // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
        val cmd = Array("bash", "-c", "echo $PPID")
        val out = Utils.executeAndGetOutput(cmd)
        Integer.parseInt(out.split("\n")(0))
      } catch {
        case e: SparkException =>
          logDebug("IO Exception when trying to compute process tree." +
            " As a result reporting of ProcessTree metrics is stopped", e)
          isAvailable = false
          -1
      }
    }
  }

  /** Page size in bytes from `getconf PAGESIZE`; 0 under testing. */
  private def computePageSize(): Long = {
    if (testing) {
      0
    } else {
      val cmd = Array("getconf", "PAGESIZE")
      val out = Utils.executeAndGetOutput(cmd)
      Integer.parseInt(out.split("\n")(0))
    }
  }

  /**
   * Rebuilds `ptree` with a breadth-first walk starting from this JVM's pid.
   * The map is cleared first so pids of processes that have exited since the
   * previous snapshot do not linger forever. No-op if procfs is unavailable.
   */
  private def computeProcessTree(): Unit = {
    if (!isAvailable || testing) {
      return
    }
    // Rebuild from scratch: without this, entries for dead processes accumulate
    // across snapshots and every later snapshot pays for stat lookups that fail.
    ptree.clear()
    val queue = mutable.Queue.empty[Int]
    queue += pid
    while (queue.nonEmpty) {
      val p = queue.dequeue()
      val c = getChildPids(p)
      if (c.nonEmpty) {
        queue ++= c
        ptree += (p -> c.toSet)
      } else {
        ptree += (p -> Set[Int]())
      }
    }
  }

  /**
   * Direct children of `pid`, found with `pgrep -P <pid>`. Returns an empty buffer
   * (and disables collection) on IO failure.
   */
  private def getChildPids(pid: Int): ArrayBuffer[Int] = {
    try {
      val cmd = Array("pgrep", "-P", pid.toString)
      val process = new ProcessBuilder(cmd: _*).start()
      val output = new StringBuilder()
      val threadName = "read stdout for pgrep"
      def appendToOutput(s: String): Unit = output.append(s).append("\n")
      val stdoutThread = Utils.processStreamByLine(threadName,
        process.getInputStream, appendToOutput)
      val exitCode = process.waitFor()
      stdoutThread.join()
      // Per the procps pgrep man page: exit 0 means one or more processes matched and
      // exit 1 means none matched (a leaf process) -- both are normal here. Exit 2 is
      // a pgrep usage error and 3+ is a fatal error, so only those are treated as
      // failures.
      if (exitCode > 2) {
        val cmdStr = cmd.mkString(" ")
        logError(s"Process $cmdStr exited with code $exitCode: $output")
        throw new SparkException(s"Process $cmdStr exited with code $exitCode")
      }
      val childPidsInInt = mutable.ArrayBuffer.empty[Int]
      for (p <- output.toString.split("\n") if p != "") {
        logDebug("Found a child pid: " + p)
        childPidsInInt += Integer.parseInt(p)
      }
      childPidsInInt
    } catch {
      case e: IOException =>
        logDebug("IO Exception when trying to compute process tree." +
          " As a result reporting of ProcessTree metrics is stopped", e)
        isAvailable = false
        mutable.ArrayBuffer.empty[Int]
    }
  }

  /**
   * Reads /proc/<pid>/stat for one process and accumulates its virtual memory and RSS
   * into the java/python/other running totals, keyed off the process command name.
   *
   * Hadoop's ProcfsBasedProcessTree class used regex and pattern matching to retrieve
   * the memory info, but that proved unreliable during tests, so plain string
   * splitting is used instead. Field meanings follow proc(5):
   * http://man7.org/linux/man-pages/man5/proc.5.html
   */
  def computeProcessInfo(pid: Int): Unit = {
    try {
      val pidDir = new File(procfsDir, pid.toString)
      Utils.tryWithResource(new InputStreamReader(
        new FileInputStream(
          new File(pidDir, procfsStatFile)), Charset.forName("UTF-8"))) { fReader =>
        Utils.tryWithResource(new BufferedReader(fReader)) { in =>
          val procInfo = in.readLine
          // Guard against an empty stat file (e.g. the process died mid-read);
          // String.split itself never returns null, so checking the line is the
          // meaningful check.
          if (procInfo != null) {
            val procInfoSplit = procInfo.split(" ")
            // Per proc(5), 0-indexed after splitting on spaces:
            // index 1 = comm (process name), index 22 = vsize (bytes),
            // index 23 = rss (pages).
            val vmem = procInfoSplit(22).toLong
            val rssPages = procInfoSplit(23).toLong
            val procName = procInfoSplit(1).toLowerCase(Locale.US)
            if (procName.contains("java")) {
              latestJVMVmemTotal += vmem
              latestJVMRSSTotal += rssPages
            } else if (procName.contains("python")) {
              latestPythonVmemTotal += vmem
              latestPythonRSSTotal += rssPages
            } else {
              latestOtherVmemTotal += vmem
              latestOtherRSSTotal += rssPages
            }
          }
        }
      }
    } catch {
      case f: FileNotFoundException => logDebug("There was a problem with reading" +
        " the stat file of the process", f)
    }
  }

  /** Recomputes and publishes a fresh snapshot into `allMetrics`. */
  def updateAllMetrics(): Unit = {
    allMetrics = computeAllMetrics
  }

  /**
   * Rebuilds the process tree, resets the running totals, sums per-bucket memory for
   * every pid in the tree, and returns the resulting snapshot. Returns a snapshot of
   * -1s when procfs is unavailable.
   */
  private def computeAllMetrics(): ProcfsBasedSystemsMetrics = {
    if (!isAvailable) {
      ProcfsBasedSystemsMetrics(-1, -1, -1, -1, -1, -1)
    } else {
      computeProcessTree()
      val pids = ptree.keySet
      latestJVMRSSTotal = 0
      latestJVMVmemTotal = 0
      latestPythonRSSTotal = 0
      latestPythonVmemTotal = 0
      latestOtherRSSTotal = 0
      latestOtherVmemTotal = 0
      for (p <- pids) {
        computeProcessInfo(p)
      }
      ProcfsBasedSystemsMetrics(
        getJVMVirtualMemInfo,
        getJVMRSSInfo,
        getPythonVirtualMemInfo,
        getPythonRSSInfo,
        getOtherVirtualMemInfo,
        getOtherRSSInfo)
    }
  }

  /** RSS of non-java, non-python processes, converted from pages to bytes; -1 if unavailable. */
  def getOtherRSSInfo(): Long = {
    if (!isAvailable) -1 else latestOtherRSSTotal * pageSize
  }

  /** Virtual memory of non-java, non-python processes, in bytes; -1 if unavailable. */
  def getOtherVirtualMemInfo(): Long = {
    if (!isAvailable) -1 else latestOtherVmemTotal
  }

  /** RSS of "java" processes, converted from pages to bytes; -1 if unavailable. */
  def getJVMRSSInfo(): Long = {
    if (!isAvailable) -1 else latestJVMRSSTotal * pageSize
  }

  /** Virtual memory of "java" processes, in bytes; -1 if unavailable. */
  def getJVMVirtualMemInfo(): Long = {
    if (!isAvailable) -1 else latestJVMVmemTotal
  }

  /** RSS of "python" processes, converted from pages to bytes; -1 if unavailable. */
  def getPythonRSSInfo(): Long = {
    if (!isAvailable) -1 else latestPythonRSSTotal * pageSize
  }

  /** Virtual memory of "python" processes, in bytes; -1 if unavailable. */
  def getPythonVirtualMemInfo(): Long = {
    if (!isAvailable) -1 else latestPythonVmemTotal
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package org.apache.spark.metrics | |
| import java.lang.management.{BufferPoolMXBean, ManagementFactory} | ||
| import javax.management.ObjectName | ||
|
|
||
| import org.apache.spark.executor.ProcfsBasedSystems | ||
| import org.apache.spark.memory.MemoryManager | ||
|
|
||
| /** | ||
|
|
@@ -59,6 +60,43 @@ case object JVMOffHeapMemory extends ExecutorMetricType { | |
| } | ||
| } | ||
|
|
||
/**
 * RSS of all "java" processes in this executor's process tree.
 *
 * NOTE(review): this is the only process-tree metric that refreshes the shared
 * procfs snapshot; the other ProcessTree* metrics merely read `allMetrics` and so
 * depend on this metric being polled before them in each collection cycle — confirm
 * the polling order matches the `values` list before reordering metrics.
 */
case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    val procfs = ExecutorMetricType.pTreeInfo
    procfs.updateAllMetrics()
    procfs.allMetrics.jvmRSSTotal
  }
}
|
|
||
/** Virtual memory size of all "java" processes in this executor's process tree. */
case object ProcessTreeJVMVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    // Reads the snapshot last refreshed by ProcessTreeJVMRSSMemory's update.
    val snapshot = ExecutorMetricType.pTreeInfo.allMetrics
    snapshot.jvmVmemTotal
  }
}
|
|
||
/** RSS of all "python" processes in this executor's process tree. */
case object ProcessTreePythonRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    // Reads the snapshot last refreshed by ProcessTreeJVMRSSMemory's update.
    val snapshot = ExecutorMetricType.pTreeInfo.allMetrics
    snapshot.pythonRSSTotal
  }
}
|
|
||
/** Virtual memory size of all "python" processes in this executor's process tree. */
case object ProcessTreePythonVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    // Reads the snapshot last refreshed by ProcessTreeJVMRSSMemory's update.
    val snapshot = ExecutorMetricType.pTreeInfo.allMetrics
    snapshot.pythonVmemTotal
  }
}
|
|
||
/** RSS of the remaining (non-java, non-python) processes in the process tree. */
case object ProcessTreeOtherRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    // Reads the snapshot last refreshed by ProcessTreeJVMRSSMemory's update.
    val snapshot = ExecutorMetricType.pTreeInfo.allMetrics
    snapshot.otherRSSTotal
  }
}
|
|
||
/** Virtual memory size of the remaining (non-java, non-python) processes in the tree. */
case object ProcessTreeOtherVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
    // Reads the snapshot last refreshed by ProcessTreeJVMRSSMemory's update.
    val snapshot = ExecutorMetricType.pTreeInfo.allMetrics
    snapshot.otherVmemTotal
  }
}
|
|
||
| case object OnHeapExecutionMemory extends MemoryManagerExecutorMetricType( | ||
| _.onHeapExecutionMemoryUsed) | ||
|
|
||
|
|
@@ -84,6 +122,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( | |
| "java.nio:type=BufferPool,name=mapped") | ||
|
|
||
| private[spark] object ExecutorMetricType { | ||
| final val pTreeInfo = new ProcfsBasedSystems | ||
|
||
|
|
||
| // List of all executor metric types | ||
| val values = IndexedSeq( | ||
| JVMHeapMemory, | ||
|
|
@@ -95,7 +135,13 @@ private[spark] object ExecutorMetricType { | |
| OnHeapUnifiedMemory, | ||
| OffHeapUnifiedMemory, | ||
| DirectPoolMemory, | ||
| MappedPoolMemory | ||
| MappedPoolMemory, | ||
| ProcessTreeJVMVMemory, | ||
| ProcessTreeJVMRSSMemory, | ||
| ProcessTreePythonVMemory, | ||
| ProcessTreePythonRSSMemory, | ||
| ProcessTreeOtherVMemory, | ||
| ProcessTreeOtherRSSMemory | ||
| ) | ||
|
|
||
| // Map of executor metric type to its index in values. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.