Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
39ba441
spark-9104 first draft version
liyezhang556520 Aug 17, 2015
ecc1044
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Aug 17, 2015
2101538
show N/A for nio
liyezhang556520 Aug 17, 2015
9ccaf88
handle executor add and remove event for memotyTab
liyezhang556520 Aug 18, 2015
13c17fb
show removed executors info on page
liyezhang556520 Aug 19, 2015
c9b44b1
add stage memory trace
liyezhang556520 Aug 19, 2015
984feaf
add history support for heartbeat event
liyezhang556520 Aug 20, 2015
2501c82
limit history event log frequency
liyezhang556520 Aug 20, 2015
e0ae855
add some comments for EventLoggingListener
liyezhang556520 Aug 20, 2015
7491279
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Aug 20, 2015
424c172
scala style fix
liyezhang556520 Aug 20, 2015
f21a804
remove executor port and fix test failure
liyezhang556520 Aug 21, 2015
2f3d30b
merge spache/master after master updated
liyezhang556520 Sep 25, 2015
7b846a2
work with JavaConverters
liyezhang556520 Oct 9, 2015
41874aa
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Oct 9, 2015
0531d0f
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Oct 29, 2015
27b7da1
refine the code according to Imran's comments and the design doc
liyezhang556520 Nov 2, 2015
a8fcf74
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 2, 2015
f2f0e64
fix scala style test
liyezhang556520 Nov 3, 2015
5f7a999
capitalize class name
liyezhang556520 Nov 3, 2015
5ad7a6a
change task metrics json format back to origin
liyezhang556520 Nov 3, 2015
c836fb9
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 3, 2015
b5aa4da
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 5, 2015
e8e2bdd
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 6, 2015
1dffa29
accroding to Imran's comment, refine the code
liyezhang556520 Nov 17, 2015
75e63c3
add first test case
liyezhang556520 Nov 17, 2015
0c1241c
fix scala style
liyezhang556520 Nov 17, 2015
c78628e
add more test cases, with eventloging test left
liyezhang556520 Nov 19, 2015
a93bd96
scala style fix
liyezhang556520 Nov 19, 2015
89214f3
fix test fail and add event logging unit test
liyezhang556520 Nov 23, 2015
1ed48c1
scala syle
liyezhang556520 Nov 23, 2015
cb307aa
merge to apache/master branch, fix merge conflict
liyezhang556520 Nov 24, 2015
b438077
roll back useless change
liyezhang556520 Nov 24, 2015
4123ac7
modify the code according to Imran's comments, mainly with unit test
liyezhang556520 Dec 8, 2015
2ce9fd9
fix scala style
liyezhang556520 Dec 8, 2015
17d094e
merge to master branch with tests update
liyezhang556520 Dec 8, 2015
4b3dbe4
change port to option and some bug fixes
liyezhang556520 Dec 9, 2015
0ea7cab
address comments of code refinement
liyezhang556520 Jan 12, 2016
5e031ce
merge to latest master branch from spark-9104-draft
liyezhang556520 Jan 12, 2016
87f8172
fix import ordering error
liyezhang556520 Jan 12, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
show removed executors info on page
  • Loading branch information
liyezhang556520 committed Aug 19, 2015
commit 13c17fb9a110fcecac7411281b592a01a7b5d1cd
106 changes: 77 additions & 29 deletions core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.ui.memory

import javax.servlet.http.HttpServletRequest

import scala.xml.Node
import scala.xml.{Node, NodeSeq}

import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.util.Utils
Expand All @@ -29,41 +29,89 @@ private[ui] class MemoryPage(parent: MemoryTab) extends WebUIPage("") {

def render(request: HttpServletRequest): Seq[Node] = {

val executorIdToMem = listener.executorIdToMem
val memInfoSorted = executorIdToMem.toSeq.sortBy(_._1)

val memTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
<thead>
<th>Executor ID</th>
<th>Address</th>
<th>Net Memory (on-heap)</th>
<th>Net Memory (direct-heap)</th>
<th>Peak Net Memory (on-heap) / Happen Time</th>
<th>Peak Net Read (direct-heap) / Happen Time</th>
</thead>
<tbody>
{memInfoSorted.map(showRow(_))}
</tbody>
</table>

val content =
<div class = "row">
<div class="span12">
{memTable}
</div>
</div>;
val activeExecutorIdToMem = listener.activeExecutorIdToMem
val removedExecutorIdToMem = listener.removedExecutorIdToMem
val activeMemInfoSorted = activeExecutorIdToMem.toSeq.sortBy(_._1)
val removedMemInfoSorted = removedExecutorIdToMem.toSeq.sortBy(_._1)
val shouldShowActiveExecutors = activeExecutorIdToMem.nonEmpty
val shouldShowRemovedExecutors = removedExecutorIdToMem.nonEmpty

val activeExecMemTable = new MemTableBase(activeMemInfoSorted, listener)
val removedExecMemTable = new MemTableBase(removedMemInfoSorted, listener)

val summary: NodeSeq =
<div>
<ul class="unstyled">
{
if (shouldShowActiveExecutors) {
<li>
<a href="#activeExec"><strong>Active Executors:</strong></a>
{activeExecutorIdToMem.size}
</li>
}
}
{
if (shouldShowRemovedExecutors) {
<li>
<a href="#removedExec"><strong>Active Executors:</strong></a>
{removedExecutorIdToMem.size}
</li>
}
}
</ul>
</div>

var content = summary
if (shouldShowActiveExecutors) {
content ++= <h4 id="activeExec">Active Executors ({activeExecutorIdToMem.size})</h4> ++
activeExecMemTable.toNodeSeq
}
if (shouldShowRemovedExecutors) {
content ++= <h4 id="activeExec">Active Executors ({removedMemInfoSorted.size})</h4> ++
removedExecMemTable.toNodeSeq
}

UIUtils.headerSparkPage("Memory Usage", content, parent)
}


}

private[ui] class MemTableBase(
memInfos: Seq[(String, MemoryUIInfo)],
listener: MemoryListener) {

protected def columns: Seq[Node] = {
<th>Executor ID</th>
<th>Address</th>
<th>Net Memory (on-heap)</th>
<th>Net Memory (direct-heap)</th>
<th>Peak Net Memory (on-heap) / Happen Time</th>
<th>Peak Net Read (direct-heap) / Happen Time</th>
}

def toNodeSeq: Seq[Node] = {
listener.synchronized {
memTable(showRow, memInfos)
}
}

protected def memTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
<table class={UIUtils.TABLE_CLASS_STRIPED}>
<thead>{columns}</thead>
<tbody>
{rows.map(r => makeRow(r))}
</tbody>
</table>
}

/** Render an HTML row representing an executor */
private def showRow(info: (String, MemoryUIInfo)): Seq[Node] = {
<tr>
<td>{info._1}</td>
<td>{info._2.executorAddress}</td>
{if (info._2.transportInfo.isDefined) {
<td>{Utils.bytesToString(info._2.transportInfo.get.onheapSize)}</td>
<td>{Utils.bytesToString(info._2.transportInfo.get.onheapSize)}</td>
<td>{Utils.bytesToString(info._2.transportInfo.get.directheapSize)}</td>
<td>
{Utils.bytesToString(info._2.transportInfo.get.peakOnheapSizeTime.memorySize)} /
Expand All @@ -73,12 +121,12 @@ private[ui] class MemoryPage(parent: MemoryTab) extends WebUIPage("") {
{Utils.bytesToString(info._2.transportInfo.get.peakDirectheapSizeTime.memorySize)} /
{UIUtils.formatDate(info._2.transportInfo.get.peakDirectheapSizeTime.timeStamp)}
</td>
} else {
<td>N/A</td>
} else {
<td>N/A</td>
<td>N/A</td>
<td>N/A</td>
<td>N/A</td>
}}
}}
</tr>
}
}
13 changes: 8 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,28 +37,31 @@ private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory"
@DeveloperApi
class MemoryListener extends SparkListener {
type ExecutorId = String
val executorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]

override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
val executorId = event.execId
val executorMetrics = event.executorMetrics
val memoryInfo = executorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
memoryInfo.updateExecutorMetrics(executorMetrics)
}

override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
val executorId = event.executorId
executorIdToMem.put(executorId, new MemoryUIInfo(event.executorInfo))
activeExecutorIdToMem.put(executorId, new MemoryUIInfo(event.executorInfo))
}

override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
val executorId = event.executorId
executorIdToMem.remove(executorId)
val info = activeExecutorIdToMem.remove(executorId)
removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo))
}

override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
val executorId = event.blockManagerId.executorId
executorIdToMem.remove(executorId)
val info = activeExecutorIdToMem.remove(executorId)
removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo))
}
}

Expand Down