Closed

40 commits
39ba441
spark-9104 first draft version
liyezhang556520 Aug 17, 2015
ecc1044
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Aug 17, 2015
2101538
show N/A for nio
liyezhang556520 Aug 17, 2015
9ccaf88
handle executor add and remove event for MemoryTab
liyezhang556520 Aug 18, 2015
13c17fb
show removed executors info on page
liyezhang556520 Aug 19, 2015
c9b44b1
add stage memory trace
liyezhang556520 Aug 19, 2015
984feaf
add history support for heartbeat event
liyezhang556520 Aug 20, 2015
2501c82
limit history event log frequency
liyezhang556520 Aug 20, 2015
e0ae855
add some comments for EventLoggingListener
liyezhang556520 Aug 20, 2015
7491279
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Aug 20, 2015
424c172
scala style fix
liyezhang556520 Aug 20, 2015
f21a804
remove executor port and fix test failure
liyezhang556520 Aug 21, 2015
2f3d30b
merge apache/master after master updated
liyezhang556520 Sep 25, 2015
7b846a2
work with JavaConverters
liyezhang556520 Oct 9, 2015
41874aa
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Oct 9, 2015
0531d0f
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Oct 29, 2015
27b7da1
refine the code according to Imran's comments and the design doc
liyezhang556520 Nov 2, 2015
a8fcf74
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 2, 2015
f2f0e64
fix scala style test
liyezhang556520 Nov 3, 2015
5f7a999
capitalize class name
liyezhang556520 Nov 3, 2015
5ad7a6a
change task metrics json format back to origin
liyezhang556520 Nov 3, 2015
c836fb9
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 3, 2015
b5aa4da
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 5, 2015
e8e2bdd
Merge remote-tracking branch 'apache/master' into netMem-9104
liyezhang556520 Nov 6, 2015
1dffa29
according to Imran's comment, refine the code
liyezhang556520 Nov 17, 2015
75e63c3
add first test case
liyezhang556520 Nov 17, 2015
0c1241c
fix scala style
liyezhang556520 Nov 17, 2015
c78628e
add more test cases, with eventloging test left
liyezhang556520 Nov 19, 2015
a93bd96
scala style fix
liyezhang556520 Nov 19, 2015
89214f3
fix test fail and add event logging unit test
liyezhang556520 Nov 23, 2015
1ed48c1
scala style
liyezhang556520 Nov 23, 2015
cb307aa
merge to apache/master branch, fix merge conflict
liyezhang556520 Nov 24, 2015
b438077
roll back useless change
liyezhang556520 Nov 24, 2015
4123ac7
modify the code according to Imran's comments, mainly with unit test
liyezhang556520 Dec 8, 2015
2ce9fd9
fix scala style
liyezhang556520 Dec 8, 2015
17d094e
merge to master branch with tests update
liyezhang556520 Dec 8, 2015
4b3dbe4
change port to option and some bug fixes
liyezhang556520 Dec 9, 2015
0ea7cab
address comments of code refinement
liyezhang556520 Jan 12, 2016
5e031ce
merge to latest master branch from spark-9104-draft
liyezhang556520 Jan 12, 2016
87f8172
fix import ordering error
liyezhang556520 Jan 12, 2016
change port to option and some bug fixes
liyezhang556520 committed Dec 9, 2015
commit 4b3dbe4d41cb06f7ee9b48e816e147ca46a0dea3
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -87,7 +87,9 @@ private[spark] class Executor(

private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
executorMetrics.setHostname(Utils.localHostName)
Contributor
Is the hostname really enough? What about multiple executors on the same host? The heartbeat already has the executorId; maybe that is all we need?

Contributor Author
It's better to use HOST:PORT, but the executor port cannot be obtained here; it would have to come from RpcEnv.address.port. We can get the executorId on the driver side when receiving the message, which might be enough to distinguish executors. However, since we will show removed executors on the page, the executorId alone cannot tell us where an executor was located, because the Executor tab only shows active executors. We could remove the hostname here if we supported showing removed executors on the Executor tab.

Contributor
Sorry, I forgot about this in my last round. I don't understand the first sentence of your reply -- why can't you get the hostname & port? It looks like you can do exactly what you suggested and get the host & port from the RPC env with executorMetrics.setHostPort(env.rpcEnv.address.hostPort)

Contributor Author
Sorry, I forgot why I didn't originally get the port from rpcEnv, but it seems we can get it from there directly. Let me try to add it back.

executorMetrics.setPort(env.rpcEnv.address.port)
if (env.rpcEnv.address != null) {
executorMetrics.setPort(Some(env.rpcEnv.address.port))
}
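The null guard above can also be expressed with `Option`, which lifts the "address may be null" case into the type. A minimal sketch under assumed names (`RpcAddress` and `portOf` are hypothetical, standing in for Spark's `RpcAddress` and the `setPort` call site):

```scala
// Hypothetical sketch: Option(x) yields None for null, so the port is only
// produced when the RPC address actually exists.
case class RpcAddress(host: String, port: Int)

def portOf(address: RpcAddress): Option[Int] = Option(address).map(_.port)

assert(portOf(RpcAddress("host-1", 80)) == Some(80))
assert(portOf(null) == None)
```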

// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)
@@ -39,11 +39,17 @@ class ExecutorMetrics extends Serializable {
/**
* Host's port the executor runs on
*/
private var _port: Int = _
def port: Int = _port
private[spark] def setPort(value: Int) = _port = value
private var _port: Option[Int] = None
def port: Option[Int] = _port
private[spark] def setPort(value: Option[Int]) = _port = value

private[spark] def hostPort: String = hostname + ":" + port
private[spark] def hostPort: String = {
  port match {
    case Some(p) => hostname + ":" + p
    case None => hostname
  }
}

private var _transportMetrics: TransportMetrics = new TransportMetrics
def transportMetrics: TransportMetrics = _transportMetrics
@@ -61,7 +67,7 @@
object ExecutorMetrics extends Serializable {
def apply(
hostName: String,
port: Int,
port: Option[Int],
transportMetrics: TransportMetrics): ExecutorMetrics = {
val execMetrics = new ExecutorMetrics
execMetrics.setHostname(hostName)
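The `Option[Int]` port change above can be sketched end to end in a few lines. This is a minimal, self-contained illustration (the class name `ExecutorMetricsSketch` is hypothetical, not Spark's class): when no port is known, `hostPort` degrades to the bare hostname rather than printing `host-1:None`.

```scala
// Sketch of the Option-based port handling: None means "port unknown".
class ExecutorMetricsSketch(val hostname: String) {
  private var _port: Option[Int] = None
  def port: Option[Int] = _port
  def setPort(value: Option[Int]): Unit = _port = value

  // Pattern match so an undefined port never leaks "None" into the string.
  def hostPort: String = port match {
    case Some(p) => s"$hostname:$p"
    case None => hostname
  }
}

val m = new ExecutorMetricsSketch("host-1")
assert(m.hostPort == "host-1")
m.setPort(Some(80))
assert(m.hostPort == "host-1:80")
```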
66 changes: 48 additions & 18 deletions core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala
@@ -40,27 +40,32 @@ private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory"
class MemoryListener extends SparkListener {
type ExecutorId = String
val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
// TODO There might be plenty of removed executors (e.g. in dynamic allocation mode);
// this may use too much memory.
val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
// latestExecIdToExecMetrics includes all executors, both active and removed.
// This may consume a lot of memory when executors change frequently, e.g. in dynamic
// allocation mode.
// A map that maintains the latest metrics of each active executor
val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
// activeStagesToMem maintains the memory information of all executors for each stage;
// the map type is [(stageId, attemptId), Map[executorId, MemoryUIInfo]]
val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
// TODO We should respect the conf for retained stages so that we don't need to handle
// all stages, since there might be too many completed stages.
val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]

override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
val executorId = event.execId
val executorMetrics = event.executorMetrics
val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
memoryInfo.updateExecutorMetrics(executorMetrics)
memoryInfo.updateMemUiInfo(executorMetrics)
activeStagesToMem.foreach { case (_, stageMemMetrics) =>
if (stageMemMetrics.contains(executorId)) {
stageMemMetrics.get(executorId).get.updateExecutorMetrics(executorMetrics)
// If an executor was added while the stage was running, we also update its metrics
// in {{activeStagesToMem}}
if (!stageMemMetrics.contains(executorId)) {
stageMemMetrics(executorId) = new MemoryUIInfo
}
stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
}
latestExecIdToExecMetrics.update(executorId, executorMetrics)
latestExecIdToExecMetrics(executorId) = executorMetrics
}

override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
@@ -71,28 +76,39 @@ class MemoryListener
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
val executorId = event.executorId
val info = activeExecutorIdToMem.remove(executorId)
latestExecIdToExecMetrics.remove(executorId)
removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo))
}

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo]
activeExecutorIdToMem.foreach(idToMem => memInfoMap.update(idToMem._1, new MemoryUIInfo))
activeStagesToMem.update(stage, memInfoMap)
activeExecutorIdToMem.foreach { case (id, _) =>
  memInfoMap(id) = new MemoryUIInfo
  latestExecIdToExecMetrics.get(id).foreach { metrics =>
    memInfoMap(id).updateMemUiInfo(metrics)
  }
}
activeStagesToMem(stage) = memInfoMap
}

override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
// Refresh {{activeStagesToMem}} with {{activeExecutorIdToMem}} in case an executor was
// added while the stage was running and no {{SparkListenerExecutorMetricsUpdate}} event
// arrived during this stage.
activeStagesToMem.get(stage).map { memInfoMap =>
activeExecutorIdToMem.foreach { case (executorId, _) =>
val memInfo = memInfoMap.getOrElse(executorId, new MemoryUIInfo)
latestExecIdToExecMetrics.get(executorId).foreach { prevExecutorMetrics =>
memInfo.updateExecutorMetrics(prevExecutorMetrics)
activeExecutorIdToMem.foreach { case (executorId, memUiInfo) =>
if (!memInfoMap.contains(executorId)) {
memInfoMap(executorId) = new MemoryUIInfo
memInfoMap(executorId).copyMemUiInfo(memUiInfo)
}
memInfoMap.update(executorId, memInfo)
}
completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get)
}
Contributor
you can use option handling to simplify this:

activeStagesToMem.get(stage).map { memInfoMap =>
  activeExecutorIdToMem.foreach { case (executorId, _) => 
    val memInfo = memInfoMap.getOrElseUpdate(executorId, new MemoryUIInfo)
    latestExecIdToExecMetrics.get(executorId).foreach { prevExecutorMetrics =>
      memInfo.updateExecutorMetrics(prevExecutorMetrics)
    }
  }
}

completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get)
}
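The reviewer's simplification hinges on `getOrElseUpdate`, which inserts the default only when the key is absent. A tiny sketch of that semantics on a mutable map (the key `"exec-1"` is just illustrative):

```scala
import scala.collection.mutable

// getOrElseUpdate inserts the default only on a miss, which is what lets
// the suggested version drop the explicit contains/put dance.
val memInfoMap = new mutable.HashMap[String, Int]
assert(memInfoMap.getOrElseUpdate("exec-1", 1) == 1) // absent: inserts 1
assert(memInfoMap.getOrElseUpdate("exec-1", 2) == 1) // present: keeps 1
```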
}

@@ -106,13 +122,18 @@ class MemoryUIInfo {
executorAddress = execInfo.executorHost
}

def updateExecutorMetrics(execMetrics: ExecutorMetrics): Unit = {
def updateMemUiInfo(execMetrics: ExecutorMetrics): Unit = {
if (transportInfo.isEmpty) {
  transportInfo = Some(new TransportMemSize)
}
executorAddress = execMetrics.hostPort
transportInfo.get.updateTransport(execMetrics.transportMetrics)
transportInfo.get.updateTransMemSize(execMetrics.transportMetrics)
}

def copyMemUiInfo(memUiInfo: MemoryUIInfo): Unit = {
  executorAddress = memUiInfo.executorAddress
  // Copy only when both sides have transport info; avoids .get on None
  for (target <- transportInfo; source <- memUiInfo.transportInfo) {
    target.copyTransMemSize(source)
  }
}
}

@@ -123,7 +144,7 @@ class TransportMemSize {
var peakOnHeapSizeTime: MemTime = new MemTime()
var peakOffHeapSizeTime: MemTime = new MemTime()

def updateTransport(transportMetrics: TransportMetrics): Unit = {
def updateTransMemSize(transportMetrics: TransportMetrics): Unit = {
val updatedOnHeapSize = transportMetrics.onHeapSize
val updatedOffHeapSize = transportMetrics.offHeapSize
val updateTime: Long = transportMetrics.timeStamp
Expand All @@ -136,6 +157,15 @@ class TransportMemSize {
peakOffHeapSizeTime = MemTime(updatedOffHeapSize, updateTime)
}
}

def copyTransMemSize(transMemSize: TransportMemSize): Unit = {
onHeapSize = transMemSize.onHeapSize
offHeapSize = transMemSize.offHeapSize
peakOnHeapSizeTime = MemTime(transMemSize.peakOnHeapSizeTime.memorySize,
transMemSize.peakOnHeapSizeTime.timeStamp)
peakOffHeapSizeTime = MemTime(transMemSize.peakOffHeapSizeTime.memorySize,
transMemSize.peakOffHeapSizeTime.timeStamp)
}
}
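The peak-tracking rule in `updateTransMemSize` can be sketched in isolation: the stored `(size, timestamp)` pair is replaced only when a new sample exceeds the current peak, while the running size always follows the latest sample. The sample values below mirror the test suite's `(20, 30, 15)` sequence; `sample` is a hypothetical stand-in for the update method.

```scala
// Minimal sketch of peak tracking for one memory series (on-heap shown).
case class MemTime(memorySize: Long = 0L, timeStamp: Long = 0L)

var onHeapSize = 0L
var peakOnHeapSizeTime = MemTime()

def sample(size: Long, time: Long): Unit = {
  onHeapSize = size
  // Replace the peak only when strictly exceeded, keeping its timestamp.
  if (size > peakOnHeapSizeTime.memorySize) {
    peakOnHeapSizeTime = MemTime(size, time)
  }
}

sample(20, 2L); sample(30, 3L); sample(15, 4L)
assert(onHeapSize == 15)
assert(peakOnHeapSizeTime == MemTime(30, 3L))
```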

@DeveloperApi
@@ -35,9 +35,9 @@ private[ui] class MemTableBase(
<th>Executor ID</th>
<th>Address</th>
<th>Network Memory (on-heap)</th>
<th>Network Memory (direct-heap)</th>
<th>Network Memory (off-heap)</th>
<th>Peak Network Memory (on-heap) / Happen Time</th>
<th>Peak Network Read (direct-heap) / Happen Time</th>
<th>Peak Network Read (off-heap) / Happen Time</th>
}

def toNodeSeq: Seq[Node] = {
@@ -39,7 +39,7 @@ private[ui] class StageMemoryPage(parent: MemoryTab) extends WebUIPage("stage")

val finishedStageToMem = memoryListener.completedStagesToMem
val content = if (finishedStageToMem.get(stage).isDefined) {
val executorIdToMem = finishedStageToMem.get(stage).get.toSeq.sortBy(_._1)
val executorIdToMem = finishedStageToMem(stage).toSeq.sortBy(_._1)
val execMemTable = new MemTableBase(executorIdToMem, memoryListener)
<h4 id="activeExec">Executors ({executorIdToMem.size})</h4> ++
execMemTable.toNodeSeq
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -24,7 +24,7 @@ import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.{DefaultFormats, JsonAST}
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
@@ -296,7 +296,7 @@ private[spark] object JsonProtocol {
def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
val transportMetrics = transportMetricsToJson(executorMetrics.transportMetrics)
("Executor Hostname" -> executorMetrics.hostname) ~
("Executor Port" -> executorMetrics.port) ~
("Executor Port" -> executorMetrics.port.map(new JInt(_)).getOrElse(JNothing)) ~
("TransportMetrics" -> transportMetrics)
}

@@ -732,7 +732,7 @@ private[spark] object JsonProtocol {
return metrics
}
metrics.setHostname((json \ "Executor Hostname").extract[String])
metrics.setPort((json \ "Executor Port").extract[Int])
metrics.setPort(Utils.jsonOption(json \ "Executor Port").map(_.extract[Int]))
metrics.setTransportMetrics(transportMetricsFromJson(json \ "TransportMetrics"))
metrics
}
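The JSON change above makes "Executor Port" an optional field: a defined port serializes to a number, `None` emits `JNothing` so the field is omitted, and reading an old log without the field yields `None`. That contract can be sketched with plain Scala maps instead of json4s (field and function names here are illustrative, not Spark's):

```scala
// Sketch of the optional-field round trip: present when defined, absent
// otherwise, and absent parses back to None (backward compatibility).
def portField(port: Option[Int]): Map[String, Int] =
  port.map(p => Map("Executor Port" -> p)).getOrElse(Map.empty)

def parsePort(json: Map[String, Int]): Option[Int] =
  json.get("Executor Port")

assert(parsePort(portField(Some(80))) == Some(80))
assert(parsePort(portField(None)).isEmpty) // old logs: field missing
```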
@@ -129,7 +129,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val eventLogger = new EventLoggingListener("test-memListener", None, testDirPath.toUri(), conf)
val execId = "exec-1"
val hostName = "host-1"
val port = 80
val port: Option[Int] = Some(80)

eventLogger.start()
eventLogger.onExecutorAdded(SparkListenerExecutorAdded(
@@ -28,6 +28,7 @@ class MemoryListenerSuite extends SparkFunSuite {
val listener = new MemoryListener
val execId1 = "exec-1"
val host1 = "host-1"
val port: Option[Int] = Some(80)

listener.onExecutorAdded(
SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty)))
@@ -38,13 +39,13 @@ class MemoryListenerSuite extends SparkFunSuite {

// multiple metrics updated in stage 2
listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2))
val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, 80, 2L, 20, 10)
val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, port, 2L, 20, 10)
listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId1, execMetrics1))
val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, 80, 3L, 30, 5)
val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, port, 3L, 30, 5)
listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId1, execMetrics2))
val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, 80, 4L, 15, 15)
val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, port, 4L, 15, 15)
listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
execId1, execMetrics3))
listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2))
@@ -87,7 +88,8 @@ class MemoryListenerSuite extends SparkFunSuite {
val listener = new MemoryListener
val (execId1, execId2, execId3) = ("exec-1", "exec-2", "exec-3")
val (host1, host2, host3) = ("host-1", "host-2", "host-3")
val (port1, port2, port3) = (80, 80, 80)
val (port1, port2, port3): (Option[Int], Option[Int], Option[Int]) =
(Some(80), Some(80), Some(80))

// two executors added first
listener.onExecutorAdded(
@@ -135,7 +137,6 @@

assert(listener.activeStagesToMem.isEmpty)
assert(listener.completedStagesToMem.size === 4)
assert(listener.activeExecutorIdToMem.size === listener.latestExecIdToExecMetrics.size)
assert(listener.removedExecutorIdToMem.size === 1)

listener.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId1, ""))
@@ -180,7 +181,7 @@ object MemoryListenerSuite extends SparkFunSuite {

def createExecutorMetrics(
hostname: String,
port: Int,
port: Option[Int],
timeStamp: Long,
onHeapSize: Long,
offHeapSize: Long): ExecutorMetrics = {
@@ -86,7 +86,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val executorMetrics = {
val execMetrics = new ExecutorMetrics
execMetrics.setHostname("host-1")
execMetrics.setPort(80)
execMetrics.setPort(Some(80))
execMetrics.setTransportMetrics(TransportMetrics(0L, 10, 10))
execMetrics
}
@@ -398,7 +398,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val oldJson = newJson.removeField { case (field, _) => field == "Executor Metrics Updated"}
val newMetrics = JsonProtocol.executorMetricsUpdateFromJson(oldJson)
assert(newMetrics.executorMetrics.hostname === "")
assert(newMetrics.executorMetrics.port === 0)
assert(newMetrics.executorMetrics.port === None)
assert(newMetrics.executorMetrics.transportMetrics.onHeapSize === 0L)
assert(newMetrics.executorMetrics.transportMetrics.offHeapSize === 0L)
assert(newMetrics.executorMetrics.transportMetrics.timeStamp != 0L)