-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-24958][CORE] Add memory from procfs to executor metrics. #22612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
3f8321a
cd16a75
94c2b04
062f5d7
245221d
c72be03
8f3c938
f2dca27
a9f924c
a11e3a2
34ad625
415f976
067b81d
18ee4ad
7f7ed2b
f3867ff
0f8f3e2
ea08c61
8f20857
6e65360
4659f4a
ef4be38
805741c
4c1f073
0a7402e
3d65b35
6eab315
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,21 +39,20 @@ private[spark] case class ProcfsBasedSystemsMetrics( | |
|
|
||
| // Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop | ||
| // project. | ||
| private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Logging { | ||
| private[spark] class ProcfsBasedSystems(val procfsDir: String = "/proc/") extends Logging { | ||
| val procfsStatFile = "stat" | ||
| var pageSize = computePageSize() | ||
| var isAvailable: Boolean = isProcfsAvailable | ||
| private val pid: Int = computePid() | ||
| private val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = | ||
| scala.collection.mutable.Map[ Int, Set[Int]]() | ||
| private val pid = computePid() | ||
|
||
| private val ptree = mutable.Map[ Int, Set[Int]]() | ||
|
|
||
| var allMetrics: ProcfsBasedSystemsMetrics = ProcfsBasedSystemsMetrics(0, 0, 0, 0, 0, 0) | ||
| private var latestJVMVmemTotal: Long = 0 | ||
| private var latestJVMRSSTotal: Long = 0 | ||
| private var latestPythonVmemTotal: Long = 0 | ||
| private var latestPythonRSSTotal: Long = 0 | ||
| private var latestOtherVmemTotal: Long = 0 | ||
| private var latestOtherRSSTotal: Long = 0 | ||
| private var latestJVMVmemTotal = 0L | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| private var latestJVMRSSTotal = 0L | ||
| private var latestPythonVmemTotal = 0L | ||
| private var latestPythonRSSTotal = 0L | ||
| private var latestOtherVmemTotal = 0L | ||
| private var latestOtherRSSTotal = 0L | ||
|
|
||
| computeProcessTree() | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
|
|
@@ -86,7 +85,7 @@ private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Lo | |
| // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html | ||
| val cmd = Array("bash", "-c", "echo $PPID") | ||
| val length = 10 | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val out: Array[Byte] = Array.fill[Byte](length)(0) | ||
| val out = Array.fill[Byte](length)(0) | ||
| Runtime.getRuntime.exec(cmd).getInputStream.read(out) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val pid = Integer.parseInt(new String(out, "UTF-8").trim) | ||
| return pid; | ||
|
|
@@ -105,7 +104,7 @@ private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Lo | |
| return 0; | ||
| } | ||
| val cmd = Array("getconf", "PAGESIZE") | ||
| val out: Array[Byte] = Array.fill[Byte](10)(0) | ||
| val out = Array.fill[Byte](10)(0) | ||
| Runtime.getRuntime.exec(cmd).getInputStream.read(out) | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return Integer.parseInt(new String(out, "UTF-8").trim) | ||
| } | ||
|
|
@@ -114,7 +113,7 @@ private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Lo | |
| if (!isAvailable) { | ||
| return | ||
| } | ||
| val queue: Queue[Int] = new Queue[Int]() | ||
| val queue = mutable.Queue.empty[Int] | ||
| queue += pid | ||
| while( !queue.isEmpty ) { | ||
| val p = queue.dequeue() | ||
|
|
@@ -133,15 +132,15 @@ private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Lo | |
| try { | ||
| val cmd = Array("pgrep", "-P", pid.toString) | ||
| val input = Runtime.getRuntime.exec(cmd).getInputStream | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer() | ||
| val childPidsInByte = mutable.ArrayBuffer.empty[Byte] | ||
| var d = input.read() | ||
| while (d != -1) { | ||
| childPidsInByte.append(d.asInstanceOf[Byte]) | ||
| d = input.read() | ||
| } | ||
| input.close() | ||
| val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n") | ||
| val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]() | ||
| val childPidsInInt = mutable.ArrayBuffer.empty[Int] | ||
| for (p <- childPids) { | ||
| if (p != "") { | ||
| childPidsInInt += Integer.parseInt(p) | ||
|
|
@@ -152,7 +151,7 @@ private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Lo | |
| case e: IOException => logDebug("IO Exception when trying to compute process tree." + | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| " As a result reporting of ProcessTree metrics is stopped", e) | ||
| isAvailable = false | ||
| return new mutable.ArrayBuffer() | ||
| return mutable.ArrayBuffer.empty[Int] | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -164,11 +163,11 @@ private[spark] class ProcfsBasedSystems(procfsDir: String = "/proc/") extends Lo | |
| * http://man7.org/linux/man-pages/man5/proc.5.html | ||
| */ | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| try { | ||
| val pidDir: File = new File(procfsDir, pid.toString) | ||
| val pidDir = new File(procfsDir, pid.toString) | ||
| val fReader = new InputStreamReader( | ||
rezasafi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| new FileInputStream( | ||
| new File(pidDir, procfsStatFile)), Charset.forName("UTF-8")) | ||
| val in: BufferedReader = new BufferedReader(fReader) | ||
| val in = new BufferedReader(fReader) | ||
| val procInfo = in.readLine | ||
| in.close | ||
| fReader.close | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.