-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24958][CORE] Add memory from procfs to executor metrics. #22612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3f8321a
cd16a75
94c2b04
062f5d7
245221d
c72be03
8f3c938
f2dca27
a9f924c
a11e3a2
34ad625
415f976
067b81d
18ee4ad
7f7ed2b
f3867ff
0f8f3e2
ea08c61
8f20857
6e65360
4659f4a
ef4be38
805741c
4c1f073
0a7402e
3d65b35
6eab315
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,10 +24,10 @@ import java.util.Locale | |
|
|
||
| import scala.collection.mutable | ||
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.collection.mutable.Queue | ||
|
|
||
| import org.apache.spark.SparkEnv | ||
| import org.apache.spark.{SparkEnv, SparkException} | ||
| import org.apache.spark.internal.{config, Logging} | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| private[spark] case class ProcfsBasedSystemsMetrics( | ||
| jvmVmemTotal: Long, | ||
|
|
@@ -41,6 +41,7 @@ private[spark] case class ProcfsBasedSystemsMetrics( | |
| // project. | ||
| private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { | ||
| val procfsStatFile = "stat" | ||
| val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") | ||
| var pageSize = computePageSize() | ||
| var isAvailable: Boolean = isProcfsAvailable | ||
| private val pid = computePid() | ||
|
||
|
|
@@ -57,7 +58,6 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| computeProcessTree() | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| private def isProcfsAvailable: Boolean = { | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") | ||
| if (testing) { | ||
| return true | ||
| } | ||
|
|
@@ -77,40 +77,37 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| } | ||
|
|
||
| private def computePid(): Int = { | ||
| if (!isAvailable) { | ||
| if (!isAvailable || testing) { | ||
| return -1; | ||
| } | ||
| try { | ||
| // This can be simplified in java9: | ||
| // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html | ||
| val cmd = Array("bash", "-c", "echo $PPID") | ||
| val length = 10 | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val out = Array.fill[Byte](length)(0) | ||
| Runtime.getRuntime.exec(cmd).getInputStream.read(out) | ||
| val pid = Integer.parseInt(new String(out, "UTF-8").trim) | ||
| val out2 = Utils.executeAndGetOutput(cmd) | ||
|
||
| val pid = Integer.parseInt(out2.split("\n")(0)) | ||
| return pid; | ||
| } | ||
| catch { | ||
| case e: IOException => logDebug("IO Exception when trying to compute process tree." + | ||
| case e: SparkException => logDebug("IO Exception when trying to compute process tree." + | ||
|
||
| " As a result reporting of ProcessTree metrics is stopped", e) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| isAvailable = false | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return -1 | ||
| } | ||
| } | ||
|
|
||
| private def computePageSize(): Long = { | ||
| val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") | ||
| if (testing) { | ||
| return 0; | ||
| } | ||
| val cmd = Array("getconf", "PAGESIZE") | ||
| val out = Array.fill[Byte](10)(0) | ||
| Runtime.getRuntime.exec(cmd).getInputStream.read(out) | ||
| return Integer.parseInt(new String(out, "UTF-8").trim) | ||
| val out2 = Utils.executeAndGetOutput(cmd) | ||
| return Integer.parseInt(out2.split("\n")(0)) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| private def computeProcessTree(): Unit = { | ||
| if (!isAvailable) { | ||
| if (!isAvailable || testing) { | ||
| return | ||
| } | ||
| val queue = mutable.Queue.empty[Int] | ||
|
|
@@ -131,18 +128,26 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| private def getChildPids(pid: Int): ArrayBuffer[Int] = { | ||
| try { | ||
| val cmd = Array("pgrep", "-P", pid.toString) | ||
| val input = Runtime.getRuntime.exec(cmd).getInputStream | ||
| val childPidsInByte = mutable.ArrayBuffer.empty[Byte] | ||
| var d = input.read() | ||
| while (d != -1) { | ||
| childPidsInByte.append(d.asInstanceOf[Byte]) | ||
| d = input.read() | ||
| val builder = new ProcessBuilder("pgrep", "-P", pid.toString) | ||
| val process = builder.start() | ||
| val output = new StringBuilder() | ||
| val threadName = "read stdout for " + "pgrep" | ||
| def appendToOutput(s: String): Unit = output.append(s).append("\n") | ||
| val stdoutThread = Utils.processStreamByLine(threadName, | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| process.getInputStream, appendToOutput) | ||
| val exitCode = process.waitFor() | ||
| stdoutThread.join() | ||
| // pgrep will have an exit code of 1 if there is more than one child process | ||
| // and it will have an exit code of 2 if there is no child process | ||
| if (exitCode != 0 && exitCode > 2) { | ||
| logError(s"Process $cmd exited with code $exitCode: $output") | ||
| throw new SparkException(s"Process $cmd exited with code $exitCode") | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| input.close() | ||
| val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n") | ||
| val childPids = output.toString.split("\n") | ||
| val childPidsInInt = mutable.ArrayBuffer.empty[Int] | ||
| for (p <- childPids) { | ||
| if (p != "") { | ||
| logInfo("Found a child pid: " + p) | ||
| childPidsInInt += Integer.parseInt(p) | ||
| } | ||
| } | ||
|
|
@@ -155,7 +160,7 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| } | ||
| } | ||
|
|
||
| def getProcessInfo(pid: Int): Unit = { | ||
| def computeProcessInfo(pid: Int): Unit = { | ||
| /* | ||
| * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| * info. I tried that but found it not correct during tests, so I used normal string analysis | ||
|
|
@@ -188,7 +193,7 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| latestOtherRSSTotal += rssPages } | ||
| } | ||
| } catch { | ||
| case f: FileNotFoundException => log.debug("There was a problem with reading" + | ||
| case f: FileNotFoundException => logDebug("There was a problem with reading" + | ||
| " the stat file of the process", f) | ||
| } | ||
| } | ||
|
|
@@ -210,7 +215,7 @@ private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extend | |
| latestOtherRSSTotal = 0 | ||
| latestOtherVmemTotal = 0 | ||
| for (p <- pids) { | ||
| getProcessInfo(p) | ||
| computeProcessInfo(p) | ||
|
||
| } | ||
| ProcfsBasedSystemsMetrics( | ||
| getJVMVirtualMemInfo, | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.