Closed
27 commits
3f8321a
Integration of ProcessTreeMetrics with PR 21221
Jul 26, 2018
cd16a75
Changing the position of ptree and also make the computation configur…
Aug 7, 2018
94c2b04
Seperate metrics for jvm, python and others and update the tests
Aug 8, 2018
062f5d7
Update JsonProtocolSuite
Sep 25, 2018
245221d
[SPARK-24958] Add executors' process tree total memory information to…
Oct 2, 2018
c72be03
Adressing most of Imran's comments
Oct 3, 2018
8f3c938
Fixing the scala style and some minor comments
Oct 3, 2018
f2dca27
Removing types from the definitions where ever possible
Oct 4, 2018
a9f924c
Using Utils methods when possible or use ProcessBuilder
Oct 5, 2018
a11e3a2
make use of Utils.trywithresources
Oct 5, 2018
34ad625
Changing ExecutorMericType and ExecutorMetrics to use a map instead o…
Oct 9, 2018
415f976
Changing ExecutorMetric to use array instead of a map
Oct 10, 2018
067b81d
A small cosmetic change
Oct 10, 2018
18ee4ad
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Oct 17, 2018
7f7ed2b
Applying latest review commments. Using Arrays instead of Map for ret…
Oct 23, 2018
f3867ff
Merge branch 'master' of https://github.com/apache/spark into ptreeme…
Nov 5, 2018
0f8f3e2
Fix an issue with jsonProtoclSuite
Nov 5, 2018
ea08c61
Fix scalastyle issue
Nov 5, 2018
8f20857
Applying latest review comments
Nov 14, 2018
6e65360
Using the companion object and other stuff
Nov 27, 2018
4659f4a
Update the use of process builder and applying other review comments
Nov 28, 2018
ef4be38
Small style fixes based on reviews
Nov 30, 2018
805741c
Applying review comments, mostly style related
Nov 30, 2018
4c1f073
emove the unnecessary trywithresources
Nov 30, 2018
0a7402e
Applying the comment about error handling and some more style fixes
Dec 4, 2018
3d65b35
Removing a return
Dec 6, 2018
6eab315
Reordering of info in a test resource file to avoid confusion
Dec 6, 2018
Seperate metrics for jvm, python and others and update the tests
(cherry picked from commit 3671427)
Reza Safi committed Sep 25, 2018
commit 94c2b048603bd4e153473c591fa91fb72a7557d3
@@ -23,6 +23,10 @@ private[spark] trait ProcessTreeMetrics {
  def computePid(): Int
  def createProcessTree()
  def updateProcessTree()
- def getRSSInfo(): Long
- def getVirtualMemInfo(): Long
+ def getJVMRSSInfo(): Long
+ def getJVMVirtualMemInfo(): Long
+ def getPythonRSSInfo(): Long
+ def getPythonVirtualMemInfo(): Long
+ def getOtherRSSInfo(): Long
+ def getOtherVirtualMemInfo(): Long
  }
@@ -37,8 +37,12 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
  val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
  scala.collection.mutable.Map[ Int, Set[Int]]()
  val PROCFS_STAT_FILE = "stat"
- var latestVmemTotal: Long = 0
- var latestRSSTotal: Long = 0
+ var latestJVMVmemTotal: Long = 0
+ var latestJVMRSSTotal: Long = 0
+ var latestPythonVmemTotal: Long = 0
+ var latestPythonRSSTotal: Long = 0
+ var latestOtherVmemTotal: Long = 0
+ var latestOtherRSSTotal: Long = 0

  createProcessTree

@@ -155,37 +159,82 @@ class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
  fReader.close
  val procInfoSplit = procInfo.split(" ")
  if ( procInfoSplit != null ) {
- latestVmemTotal += procInfoSplit(22).toLong
- latestRSSTotal += procInfoSplit(23).toLong
+ if (procInfoSplit(1).toLowerCase.contains("java")) {
Contributor

the checks for java & python seem error prone (maybe you have some external program called "pass-data-from-java-to-fizzbuzz-cpp") ... but I'm also not sure what else you could do here.

Contributor Author

So I'm checking the child processes of a Spark process, and there you will see either java or pythonX.Y. I don't think Spark will create a process with a custom name.

Contributor

It might not be as uncommon to have arbitrary forked processes from Spark executors as we think. Recent discussions from Spark Summit 2018 to my knowledge propose sending off work to be done by specialized hardware or specialized software. I'm not an expert in these use cases so someone can correct this observation if they like. Mostly just flagging that for completeness.
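
A minimal sketch of a stricter name check, for illustration only. Instead of a substring test, it compares the whole command name against `java` or a `pythonX.Y` pattern, so a name that merely contains "java" lands in the "other" bucket. `ProcessKind`, `Kind`, and `classify` are hypothetical names that are not part of this PR, and the sketch assumes the name arrives in the parenthesized form used by the second field of /proc/<pid>/stat.

```scala
// Hypothetical helper, not part of the PR: classify a process by its
// command name, given in the "(name)" form found in /proc/<pid>/stat.
object ProcessKind {
  sealed trait Kind
  case object Jvm extends Kind
  case object Python extends Kind
  case object Other extends Kind

  // Matches "python", "python3", "python2.7", "python3.11", and so on.
  private val PythonName = """python\d*(\.\d+)?""".r

  def classify(comm: String): Kind = {
    // Strip the surrounding parentheses before comparing.
    val name = comm.stripPrefix("(").stripSuffix(")").toLowerCase
    name match {
      case "java" => Jvm
      case PythonName(_*) => Python // the regex must match the whole name
      case _ => Other
    }
  }
}
```

With this check, the reviewer's example name would be counted as "other" rather than as a JVM process. It still cannot tell a Spark-launched `java` from an unrelated one, so it narrows the concern raised above without removing it.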

+ latestJVMVmemTotal += procInfoSplit(22).toLong
+ latestJVMRSSTotal += procInfoSplit(23).toLong
Contributor

you could avoid a bit of the duplicated code if you first did

val vmem = procInfoSplit(22).toLong
val rssPages = procInfoSplit(23).toLong
if (procInfoSplit(1) ...
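
The suggestion above could be fleshed out roughly as follows. This is a sketch under stated assumptions, not the code that was merged: `Totals` and `StatAccumulator` are hypothetical stand-ins for the PR's mutable `latest*Total` fields, and the array indices and substring checks are copied from the diff as-is.

```scala
// Hypothetical container replacing the six mutable totals in the PR.
final case class Totals(
    jvmVmem: Long = 0L, jvmRss: Long = 0L,
    pythonVmem: Long = 0L, pythonRss: Long = 0L,
    otherVmem: Long = 0L, otherRss: Long = 0L)

object StatAccumulator {
  // Adds the stat fields of one process to the running totals.
  def add(totals: Totals, procInfoSplit: Array[String]): Totals = {
    // Parse the two values once, then decide which bucket receives them.
    val vmem = procInfoSplit(22).toLong
    val rssPages = procInfoSplit(23).toLong
    val name = procInfoSplit(1).toLowerCase
    if (name.contains("java")) {
      totals.copy(jvmVmem = totals.jvmVmem + vmem, jvmRss = totals.jvmRss + rssPages)
    } else if (name.contains("python")) {
      totals.copy(pythonVmem = totals.pythonVmem + vmem, pythonRss = totals.pythonRss + rssPages)
    } else {
      totals.copy(otherVmem = totals.otherVmem + vmem, otherRss = totals.otherRss + rssPages)
    }
  }
}
```

Folding `add` over the stat lines of every pid in the process tree would produce all six totals in one pass, which also matches the diff's approach of computing everything during a single getter call.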

+ }
+ else if (procInfoSplit(1).toLowerCase.contains("python")) {
+ latestPythonVmemTotal += procInfoSplit(22).toLong
+ latestPythonRSSTotal += procInfoSplit(23).toLong
+ }
+ else {
+ latestOtherVmemTotal += procInfoSplit(22).toLong
+ latestOtherRSSTotal += procInfoSplit(23).toLong }
  }
  } catch {
  case f: FileNotFoundException => return null
  }
  }


- def getRSSInfo(): Long = {
+ def getOtherRSSInfo(): Long = {
  if (!isAvailable) {
  return -1
  }
  updateProcessTree
  val pids = ptree.keySet
- latestRSSTotal = 0
- latestVmemTotal = 0
+ latestJVMRSSTotal = 0
+ latestJVMVmemTotal = 0
+ latestPythonRSSTotal = 0
+ latestPythonVmemTotal = 0
+ latestOtherRSSTotal = 0
+ latestOtherVmemTotal = 0
  for (p <- pids) {
  getProcessInfo(p)
  }
- latestRSSTotal
+ latestOtherRSSTotal
  }


- def getVirtualMemInfo(): Long = {
+ def getOtherVirtualMemInfo(): Long = {
  if (!isAvailable) {
  return -1
  }
  // We won't call updateProcessTree and also compute total virtual memory here
  // since we already did all of this when we computed RSS info
- latestVmemTotal
+ latestOtherVmemTotal
  }


+ def getJVMRSSInfo(): Long = {
+ if (!isAvailable) {
+ return -1
+ }
+ latestJVMRSSTotal
+ }
+
+
+ def getJVMVirtualMemInfo(): Long = {
+ if (!isAvailable) {
+ return -1
+ }
+ latestJVMVmemTotal
+ }
+
+
+ def getPythonRSSInfo(): Long = {
+ if (!isAvailable) {
+ return -1
+ }
+ latestPythonRSSTotal
+ }
+
+
+ def getPythonVirtualMemInfo(): Long = {
+ if (!isAvailable) {
+ return -1
+ }
+ latestPythonVmemTotal
+ }
+
+
@@ -60,15 +60,39 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
  }
  }

- case object ProcessTreeRSSMemory extends ExecutorMetricType {
+ case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
- ExecutorMetricType.pTreeInfo.getRSSInfo()
+ ExecutorMetricType.pTreeInfo.getJVMRSSInfo()
  }
  }

- case object ProcessTreeVMemory extends ExecutorMetricType {
+ case object ProcessTreeJVMVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
- ExecutorMetricType.pTreeInfo.getVirtualMemInfo()
+ ExecutorMetricType.pTreeInfo.getJVMVirtualMemInfo()
  }
  }

+ case object ProcessTreePythonRSSMemory extends ExecutorMetricType {
+ override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+ ExecutorMetricType.pTreeInfo.getPythonRSSInfo()
+ }
+ }
+
+ case object ProcessTreePythonVMemory extends ExecutorMetricType {
+ override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+ ExecutorMetricType.pTreeInfo.getPythonVirtualMemInfo()
+ }
+ }
+
+ case object ProcessTreeOtherRSSMemory extends ExecutorMetricType {
+ override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+ ExecutorMetricType.pTreeInfo.getOtherRSSInfo()
+ }
+ }
+
+ case object ProcessTreeOtherVMemory extends ExecutorMetricType {
+ override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+ ExecutorMetricType.pTreeInfo.getOtherVirtualMemInfo()
+ }
+ }
+
@@ -103,16 +127,20 @@ private[spark] object ExecutorMetricType {
  val values = IndexedSeq(
  JVMHeapMemory,
  JVMOffHeapMemory,
- ProcessTreeRSSMemory,
- ProcessTreeVMemory,
  OnHeapExecutionMemory,
  OffHeapExecutionMemory,
  OnHeapStorageMemory,
  OffHeapStorageMemory,
  OnHeapUnifiedMemory,
  OffHeapUnifiedMemory,
  DirectPoolMemory,
- MappedPoolMemory
+ MappedPoolMemory,
+ ProcessTreeJVMVMemory,
+ ProcessTreeJVMRSSMemory,
+ ProcessTreePythonVMemory,
+ ProcessTreePythonRSSMemory,
+ ProcessTreeOtherVMemory,
+ ProcessTreeOtherRSSMemory
  )

  // Map of executor metric type to its index in values.
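
The index map mentioned in the comment above is cut off by the diff view. One plausible way to build such a map is sketched here, purely as an illustration: `MetricIndexSketch` and `metricToOffset` are hypothetical names, and plain strings stand in for the PR's case objects.

```scala
object MetricIndexSketch {
  // Illustrative subset of the metric names; the PR's `values` holds case objects.
  val values: IndexedSeq[String] = IndexedSeq(
    "JVMHeapMemory",
    "JVMOffHeapMemory",
    "ProcessTreeJVMVMemory",
    "ProcessTreeJVMRSSMemory")

  // Position of each metric in `values`, so an Array[Long] of readings
  // can be indexed by metric without a per-lookup search.
  val metricToOffset: Map[String, Int] = values.zipWithIndex.toMap
}
```

Under this scheme a metric's index is simply its position in `values`, so appending the process tree metrics at the end, as this hunk does, leaves the indices of the earlier metrics unchanged.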
@@ -30,16 +30,20 @@
  "peakMemoryMetrics" : {
  "OnHeapStorageMemory" : 905801,
  "JVMOffHeapMemory" : 205304696,
- "ProcessTreeRSSMemory": 200000000,
- "ProcessTreeVMemory": 500000000,
  "OffHeapExecutionMemory" : 0,
  "OnHeapUnifiedMemory" : 905801,
  "OnHeapExecutionMemory" : 0,
  "OffHeapUnifiedMemory" : 0,
  "DirectPoolMemory" : 397602,
  "MappedPoolMemory" : 0,
  "JVMHeapMemory" : 629553808,
- "OffHeapStorageMemory" : 0
+ "OffHeapStorageMemory" : 0,
+ "ProcessTreeJVMVMemory": 500000000,
+ "ProcessTreeJVMRSSMemory": 200000000,
+ "ProcessTreePythonVMemory": 80000000,
+ "ProcessTreePythonRSSMemory": 20000000,
+ "ProcessTreeOtherVMemory": 80000000,
+ "ProcessTreeOtherRSSMemory": 20000000
  }
  }, {
  "id" : "7",
@@ -172,16 +176,20 @@
  "peakMemoryMetrics" : {
  "OnHeapStorageMemory" : 63104457,
  "JVMOffHeapMemory" : 95657456,
- "ProcessTreeRSSMemory": 100000000,
- "ProcessTreeVMemory": 500000000,
  "OffHeapExecutionMemory" : 0,
  "OnHeapUnifiedMemory" : 100853193,
  "OnHeapExecutionMemory" : 37748736,
  "OffHeapUnifiedMemory" : 0,
  "DirectPoolMemory" : 126261,
  "MappedPoolMemory" : 0,
  "JVMHeapMemory" : 518613056,
- "OffHeapStorageMemory" : 0
+ "OffHeapStorageMemory" : 0,
+ "ProcessTreeJVMVMemory": 500000000,
+ "ProcessTreeJVMRSSMemory": 100000000,
+ "ProcessTreePythonVMemory": 80000000,
+ "ProcessTreePythonRSSMemory": 20000000,
+ "ProcessTreeOtherVMemory": 80000000,
+ "ProcessTreeOtherRSSMemory": 20000000
  }
  }, {
  "id" : "3",
@@ -218,16 +226,20 @@
  "peakMemoryMetrics" : {
  "OnHeapStorageMemory" : 69535048,
  "JVMOffHeapMemory" : 90709624,
- "ProcessTreeRSSMemory": 200000000,
- "ProcessTreeVMemory": 600000000,
  "OffHeapExecutionMemory" : 0,
  "OnHeapUnifiedMemory" : 69535048,
  "OnHeapExecutionMemory" : 0,
  "OffHeapUnifiedMemory" : 0,
  "DirectPoolMemory" : 87796,
  "MappedPoolMemory" : 0,
  "JVMHeapMemory" : 726805712,
- "OffHeapStorageMemory" : 0
+ "OffHeapStorageMemory" : 0,
+ "ProcessTreeJVMVMemory": 600000000,
+ "ProcessTreeJVMRSSMemory": 200000000,
+ "ProcessTreePythonVMemory": 200000000,
+ "ProcessTreePythonRSSMemory": 70000000,
+ "ProcessTreeOtherVMemory": 200000000,
+ "ProcessTreeOtherRSSMemory": 70000000
  }
  }, {
  "id" : "2",
@@ -264,16 +276,20 @@
  "peakMemoryMetrics" : {
  "OnHeapStorageMemory" : 58468944,
  "JVMOffHeapMemory" : 91208368,
- "ProcessTreeRSSMemory": 90000000,
- "ProcessTreeVMemory": 400000000,
  "OffHeapExecutionMemory" : 0,
  "OnHeapUnifiedMemory" : 58468944,
  "OnHeapExecutionMemory" : 0,
  "OffHeapUnifiedMemory" : 0,
  "DirectPoolMemory" : 87796,
  "MappedPoolMemory" : 0,
  "JVMHeapMemory" : 595946552,
- "OffHeapStorageMemory" : 0
+ "OffHeapStorageMemory" : 0,
+ "ProcessTreeJVMVMemory": 400000000,
+ "ProcessTreeJVMRSSMemory": 90000000,
+ "ProcessTreePythonVMemory": 100000000,
+ "ProcessTreePythonRSSMemory": 8000000,
+ "ProcessTreeOtherVMemory": 100000000,
+ "ProcessTreeOtherRSSMemory": 8000000
  }
  }, {
  "id" : "1",
@@ -310,15 +326,19 @@
  "peakMemoryMetrics" : {
  "OnHeapStorageMemory" : 47962185,
  "JVMOffHeapMemory" : 100519936,
- "ProcessTreeRSSMemory": 100000000,
- "ProcessTreeVMemory": 500000000,
  "OffHeapExecutionMemory" : 0,
  "OnHeapUnifiedMemory" : 47962185,
  "OnHeapExecutionMemory" : 0,
  "OffHeapUnifiedMemory" : 0,
  "DirectPoolMemory" : 98230,
  "MappedPoolMemory" : 0,
  "JVMHeapMemory" : 755008624,
- "OffHeapStorageMemory" : 0
+ "OffHeapStorageMemory" : 0,
+ "ProcessTreeJVMVMemory": 500000000,
+ "ProcessTreeJVMRSSMemory": 100000000,
+ "ProcessTreePythonVMemory": 400000000,
+ "ProcessTreePythonRSSMemory": 40000000,
+ "ProcessTreeOtherVMemory": 400000000,
+ "ProcessTreeOtherRSSMemory": 40000000
  }
  } ]