[SPARK-9104][SPARK-9105][SPARK-9106][SPARK-9107][CORE] Netty network layer memory usage on webUI #7753
Changes from 1 commit
```diff
@@ -50,7 +50,5 @@ class ExecutorMetrics extends Serializable {
 @DeveloperApi
 case class TransportMetrics(
     timeStamp: Long,
-    clientOnheapSize: Long,
-    clientDirectheapSize: Long,
-    serverOnheapSize: Long,
-    serverDirectheapSize: Long)
+    onHeapSize: Long,
+    directSize: Long)
```
**Contributor:** I know "direct" was my suggestion earlier, but now I see that we already use "offheap" extensively in the codebase, so let's use that instead.

**Contributor:** Also, I think we should avoid using a case class. The problem is binary compatibility of the `apply` / `unapply` methods when you add a field.

**Contributor (Author):** Done.
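To make the binary-compatibility concern concrete: the compiler-generated `apply`/`unapply` of a case class encode the full field list in their signatures, so adding a field later breaks already-compiled callers. A plain class with an explicit companion, sketched below (a hypothetical shape, not necessarily what the PR settled on), keeps that surface under manual control:

```scala
import org.apache.spark.annotation.DeveloperApi

// Sketch: a plain class instead of a case class. No synthetic
// apply/unapply is generated, so a field added later can ship as a
// new factory overload while the old signatures stay intact.
@DeveloperApi
class TransportMetrics(
    val timeStamp: Long,
    val onHeapSize: Long,
    val directSize: Long)

object TransportMetrics {
  def apply(timeStamp: Long, onHeapSize: Long, directSize: Long): TransportMetrics =
    new TransportMetrics(timeStamp, onHeapSize, directSize)
}
```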
```diff
@@ -22,7 +22,7 @@ import scala.concurrent.{Future, Promise}
 import io.netty.buffer._
 
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.executor.{TransportMetrics, ExecutorMetrics}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
```

```diff
@@ -32,7 +32,6 @@ import org.apache.spark.network.server._
 import org.apache.spark.network.shuffle.{RetryingBlockFetcher, BlockFetchingListener, OneForOneBlockFetcher}
 import org.apache.spark.network.shuffle.protocol.UploadBlock
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.SparkEnv
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.{Clock, Utils, SystemClock}
```

```diff
@@ -64,17 +63,17 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
     val currentTime = clock.getTimeMillis()
     val clientPooledAllocator = clientFactory.getPooledAllocator()
     val serverAllocator = server.getAllocator()
-    val clientDirectHeapSize: Long = sumOfMetrics(
+    val clientDirectSize: Long = sumOfMetrics(
       clientPooledAllocator.directArenas().asScala.toList)
     val clientOnHeapSize: Long = sumOfMetrics(clientPooledAllocator.heapArenas().asScala.toList)
-    val serverDirectHeapSize: Long = sumOfMetrics(serverAllocator.directArenas().asScala.toList)
+    val serverDirectSize: Long = sumOfMetrics(serverAllocator.directArenas().asScala.toList)
     val serverOnHeapSize: Long = sumOfMetrics(serverAllocator.heapArenas().asScala.toList)
```
**Contributor:** push the …
```diff
-    executorMetrics.setTransportMetrics(Some(TransportMetrics(currentTime,
-      clientOnHeapSize, clientDirectHeapSize, serverOnHeapSize, serverDirectHeapSize)))
-    logDebug(s"current Netty client directHeapSize is $clientDirectHeapSize, " +
-      s"client heapSize is $clientOnHeapSize, server directHeapsize is $serverDirectHeapSize, " +
+    logDebug(s"Current Netty Client directSize is $clientDirectSize, " +
+      s"Client HeapSize is $clientOnHeapSize, server directHeapsize is $serverDirectSize, " +
       s"server heapsize is $serverOnHeapSize, executer id is " +
       s"${SparkEnv.get.blockManager.blockManagerId.executorId}")
+    executorMetrics.setTransportMetrics(Some(TransportMetrics(currentTime,
+      clientOnHeapSize + serverOnHeapSize, clientDirectSize + serverDirectSize)))
   }
 
   private def sumOfMetrics(arenaMetricList: List[PoolArenaMetric]): Long = {
```
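The body of `sumOfMetrics` is not shown in this hunk. One plausible implementation (a sketch only, not necessarily the PR's actual code) walks each arena's chunk lists and counts the bytes currently in use:

```scala
import scala.collection.JavaConverters._
import io.netty.buffer.PoolArenaMetric

// Sketch: approximate the bytes held by a set of Netty pool arenas.
// Each chunk reports its total size and its remaining free bytes, so
// (chunkSize - freeBytes) is the portion currently handed out.
private def sumOfMetrics(arenaMetricList: List[PoolArenaMetric]): Long = {
  arenaMetricList.map { arena =>
    arena.chunkLists().asScala
      .flatMap(_.iterator().asScala)
      .map(chunk => (chunk.chunkSize() - chunk.freeBytes()).toLong)
      .sum
  }.sum
}
```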
```diff
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import java.io._
 import java.net.URI
 
-import akka.remote.transport.Transport
 import org.apache.spark.executor.TransportMetrics
```
**Contributor:** nit: ordering

**Contributor (Author):** fixed
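For reference, Spark's style orders imports as java, scala, third-party, then org.apache.spark, with a blank line between groups; the nit is presumably that the new spark import sits above the scala group. A conforming layout would look roughly like:

```scala
import java.io._
import java.net.URI

import scala.collection.mutable

import org.apache.spark.executor.TransportMetrics
```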
```diff
 
 import scala.collection.mutable
```

```diff
@@ -97,8 +96,9 @@ private[spark] class EventLoggingListener(
   private[scheduler] val logPath = getLogPath(
     logBaseDir, appId, appAttemptId, compressionCodecName)
```
**Contributor:** this shouldn't need to change, right?

**Contributor (Author):** Yes, my mistake; the width is 100, so it's correct. I'll change it back later, thanks.
```diff
-  private val latestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
-  private val modifiedMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate]
+  private val executorIdToModifiedMaxMetrics = new
+    HashMap[String, SparkListenerExecutorMetricsUpdate]
 
   /**
    * Creates the log file in the configured log directory.
```
```diff
@@ -161,17 +161,23 @@ private[spark] class EventLoggingListener(
     }
   }
 
-  // We log the event both when stage submitted and stage completed, and after each logEvent call,
-  // replace the modifiedMetrics with the latestMetrics. In case the stages submit and complete
-  // time might be interleaved. So as to make the result the same with the running time.
-  private def logMetricsUpdateEvent() : Unit = {
-    modifiedMetrics.map(metrics => logEvent(metrics._2))
-    latestMetrics.map(metrics => modifiedMetrics.update(metrics._1, metrics._2))
+  // When a stage is submitted and completed, we updated our executor memory metrics for that stage,
+  // and then log the metrics. Anytime we receive more executor metrics, we update our running set of
+  // {{executorIdToLatestMetrics}} and {{executorIdToModifiedMaxMetrics}}. Since stages submit and
+  // complete time might be interleaved, we maintain the latest and max metrics for each time segment.
+  // So, for each stage start and stage complete, we replace each item in
+  // {{executorIdToModifiedMaxMetrics}} with that in {{executorIdToLatestMetrics}}.
+  private def updateAndLogExecutorMemoryMetrics() : Unit = {
+    executorIdToModifiedMaxMetrics.foreach { case(_, metrics) => logEvent(metrics) }
+    executorIdToLatestMetrics.foreach {case(_, metrics) => logEvent(metrics) }
+    executorIdToLatestMetrics.foreach { case (executorId, metrics) =>
+      executorIdToModifiedMaxMetrics.update(executorId, metrics)
+    }
   }
```
**Contributor:** I'd rename this to …

I don't understand the last two sentences of the comment -- can you expand on that? Finally, you should use:

```scala
modifiedMetrics.foreach { case (_, metrics) => logEvent(metrics) }
latestMetrics.foreach { case (executorId, metrics) => modifiedMetrics.update(executorId, metrics) }
```

**Contributor (Author):** I'll update the code according to the design doc. I think the code is not quite correct; please refer to the design doc.
```diff
   // Events that do not trigger a flush
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
-    logMetricsUpdateEvent()
+    updateAndLogExecutorMemoryMetrics()
     logEvent(event)
   }
```

```diff
@@ -185,7 +191,7 @@
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
-    logMetricsUpdateEvent()
+    updateAndLogExecutorMemoryMetrics()
     logEvent(event, flushLogger = true)
   }
```

```diff
@@ -218,8 +224,8 @@
   override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
-    latestMetrics.remove(event.executorId)
-    modifiedMetrics.remove(event.executorId)
+    executorIdToLatestMetrics.remove(event.executorId)
+    executorIdToModifiedMaxMetrics.remove(event.executorId)
     logEvent(event, flushLogger = true)
   }
```

```diff
@@ -228,7 +234,7 @@
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
-    latestMetrics.update(event.execId, event)
+    executorIdToLatestMetrics.update(event.execId, event)
     updateModifiedMetrics(event.execId)
   }
```

```diff
@@ -258,10 +264,10 @@
    * @param executorId the executor whose metrics will be modified
    */
   private def updateModifiedMetrics(executorId: String): Unit = {
-    val toBeModifiedEvent = modifiedMetrics.get(executorId)
-    val latestEvent = latestMetrics.get(executorId)
+    val toBeModifiedEvent = executorIdToModifiedMaxMetrics.get(executorId)
+    val latestEvent = executorIdToLatestMetrics.get(executorId)
     if (toBeModifiedEvent.isEmpty) {
-      if (latestEvent.isDefined) modifiedMetrics.update(executorId, latestEvent.get)
+      if (latestEvent.isDefined) executorIdToModifiedMaxMetrics.update(executorId, latestEvent.get)
     } else {
       val toBeModifiedMetrics = toBeModifiedEvent.get.executorMetrics.transportMetrics
       if (toBeModifiedMetrics.isDefined) {
```
**Contributor:** won't …

```scala
private def updateModifiedMetrics(executorId: String, latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
  executorIdToModifiedMaxMetrics.get(executorId) match {
    case None => executorIdToModifiedMaxMetrics.update(executorId, latestEvent)
    case Some(toBeModEvent) =>
      val toBeModMetrics = toBeModEvent.executorMetrics.transportMetrics
      ...
  }
}
```

and depending on whether or not we need to keep …

**Contributor (Author):** Thank you for your cleaner code-style example; I've updated my code.
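Filling in the reviewer's `...` with the max-tracking logic from the hunk below, the match-based version might look like this sketch (written against the new two-field `TransportMetrics`; the committed code may differ):

```scala
private def updateModifiedMetrics(
    executorId: String,
    latestEvent: SparkListenerExecutorMetricsUpdate): Unit = {
  executorIdToModifiedMaxMetrics.get(executorId) match {
    case None => executorIdToModifiedMaxMetrics.update(executorId, latestEvent)
    case Some(toBeModEvent) =>
      for {
        toBeMod <- toBeModEvent.executorMetrics.transportMetrics
        latest <- latestEvent.executorMetrics.transportMetrics
      } {
        // Keep the larger of the recorded maximum and the latest sample;
        // the timestamp follows whichever side supplied a new maximum.
        var timeStamp = toBeMod.timeStamp
        val onHeapSize = if (latest.onHeapSize > toBeMod.onHeapSize) {
          timeStamp = latest.timeStamp
          latest.onHeapSize
        } else toBeMod.onHeapSize
        val directSize = if (latest.directSize > toBeMod.directSize) {
          timeStamp = latest.timeStamp
          latest.directSize
        } else toBeMod.directSize
        toBeModEvent.executorMetrics.setTransportMetrics(
          Some(TransportMetrics(timeStamp, onHeapSize, directSize)))
      }
  }
}
```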
```diff
@@ -270,29 +276,23 @@
         val toBeModTransMetrics = toBeModifiedMetrics.get
         var timeStamp: Long = toBeModTransMetrics.timeStamp
         // the logic here should be the same with that for memoryListener
-        val (clientOnheapSize, serverOnheapSize) =
-          if (latestTransMetrics.clientOnheapSize + latestTransMetrics.serverOnheapSize >
-            toBeModTransMetrics.clientOnheapSize + toBeModTransMetrics.serverOnheapSize) {
+        val onHeapSize = if (latestTransMetrics.onHeapSize > toBeModTransMetrics.onHeapSize) {
           timeStamp = latestTransMetrics.timeStamp
-          (latestTransMetrics.clientOnheapSize, latestTransMetrics.serverOnheapSize)
+          latestTransMetrics.onHeapSize
         } else {
-          (toBeModTransMetrics.clientOnheapSize, toBeModTransMetrics.serverOnheapSize)
+          toBeModTransMetrics.onHeapSize
         }
-        val (clientDirectheapSize, serverDirectheapSize) =
-          if (latestTransMetrics.clientDirectheapSize + latestTransMetrics.serverDirectheapSize >
-            toBeModTransMetrics.clientDirectheapSize + toBeModTransMetrics.serverDirectheapSize) {
+        val directSize = if (latestTransMetrics.directSize > toBeModTransMetrics.directSize) {
           timeStamp = latestTransMetrics.timeStamp
-          (latestTransMetrics.clientDirectheapSize, latestTransMetrics.serverDirectheapSize)
+          latestTransMetrics.directSize
         } else {
-          (toBeModTransMetrics.clientDirectheapSize, toBeModTransMetrics.serverDirectheapSize)
+          toBeModTransMetrics.directSize
         }
         toBeModifiedEvent.get.executorMetrics.setTransportMetrics(
-          Some(TransportMetrics(timeStamp, clientOnheapSize, clientDirectheapSize,
-            serverOnheapSize, serverDirectheapSize)))
+          Some(TransportMetrics(timeStamp, onHeapSize, directSize)))
       }
     }
   }
 
 }
 
 private[spark] object EventLoggingListener extends Logging {
```
```diff
@@ -41,11 +41,11 @@ class MemoryListener extends SparkListener {
   type ExecutorId = String
   val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
   val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
-  // latestExecIdToExecMetrics include all executors that is active and removed.
+  // latestExecIdToExecMetrics including all executors that is active and removed.
   // this may consume a lot of memory when executors are changing frequently, e.g. in dynamical
   // allocation mode.
   val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
-  // stagesIdToMem a map maintains all executors memory information of each stage,
+  // activeStagesToMem a map maintains all executors memory information of each stage,
   // the Map type is [(stageId, attemptId), Seq[(executorId, MemoryUIInfo)]
   val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
   val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
```

```diff
@@ -55,10 +55,9 @@ class MemoryListener extends SparkListener {
     val executorMetrics = event.executorMetrics
     val memoryInfo = activeExecutorIdToMem.getOrElseUpdate(executorId, new MemoryUIInfo)
     memoryInfo.updateExecutorMetrics(executorMetrics)
-    activeStagesToMem.map {stageToMem =>
-      if (stageToMem._2.contains(executorId)) {
-        val memInfo = stageToMem._2.get(executorId).get
-        memInfo.updateExecutorMetrics(executorMetrics)
+    activeStagesToMem.foreach { case (_, stageMemMetrics) =>
+      if(stageMemMetrics.contains(executorId)) {
+        stageMemMetrics.get(executorId).get.updateExecutorMetrics(executorMetrics)
       }
     }
     latestExecIdToExecMetrics.update(executorId, executorMetrics)
```

```diff
@@ -84,21 +83,19 @@ class MemoryListener extends SparkListener {
   override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
     val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
     val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo]
-    activeExecutorIdToMem.map(idToMem => memInfoMap.update(idToMem._1, new MemoryUIInfo))
+    activeExecutorIdToMem.foreach(idToMem => memInfoMap.update(idToMem._1, new MemoryUIInfo))
     activeStagesToMem.update(stage, memInfoMap)
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
-    val memInfoMap = activeStagesToMem.get(stage)
-    if (memInfoMap.isDefined) {
-      activeExecutorIdToMem.map { idToMem =>
-        val executorId = idToMem._1
-        val memInfo = memInfoMap.get.getOrElse(executorId, new MemoryUIInfo)
-        if (latestExecIdToExecMetrics.contains(executorId)) {
-          memInfo.updateExecutorMetrics(latestExecIdToExecMetrics.get(executorId).get)
+    activeStagesToMem.get(stage).map { memInfoMap =>
+      activeExecutorIdToMem.foreach { case (executorId, _) =>
+        val memInfo = memInfoMap.getOrElse(executorId, new MemoryUIInfo)
+        latestExecIdToExecMetrics.get(executorId).foreach { prevExecutorMetrics =>
+          memInfo.updateExecutorMetrics(prevExecutorMetrics)
         }
-        memInfoMap.get.update(executorId, memInfo)
+        memInfoMap.update(executorId, memInfo)
       }
       completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get)
     }
```
**Contributor:** you can use option handling to simplify this:

```scala
activeStagesToMem.get(stage).map { memInfoMap =>
  activeExecutorIdToMem.foreach { case (executorId, _) =>
    val memInfo = memInfoMap.getOrElseUpdate(executorId, new MemoryUIInfo)
    latestExecIdToExecMetrics.get(executorId).foreach { prevExecutorMetrics =>
      memInfo.updateExecutorMetrics(prevExecutorMetrics)
    }
  }
}
```
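One detail in the suggestion worth noting: `getOrElseUpdate` both returns the existing value and inserts the default when the key is absent, which is what lets the explicit `memInfoMap.update` call disappear. A tiny illustration:

```scala
import scala.collection.mutable

val m = new mutable.HashMap[String, Int]
val a = m.getOrElse("x", 0)        // returns 0; "x" is still absent from m
val b = m.getOrElseUpdate("x", 0)  // returns 0 and inserts "x" -> 0
assert(m.contains("x"))
```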
```diff
@@ -107,46 +104,42 @@
 class MemoryUIInfo {
   var executorAddress: String = _
-  var transportInfo: Option[transportMemSize] = None
+  var transportInfo: Option[TransportMemSize] = None
 
   def this(execInfo: ExecutorInfo) = {
     this()
     executorAddress = execInfo.executorHost
   }
 
   def updateExecutorMetrics(execMetrics: ExecutorMetrics): Unit = {
-    if (execMetrics.transportMetrics.isDefined) {
+    execMetrics.transportMetrics.map { transPortMetrics =>
       transportInfo = transportInfo match {
         case Some(transportMemSize) => transportInfo
-        case _ => Some(new transportMemSize)
+        case _ => Some(new TransportMemSize)
       }
       executorAddress = execMetrics.hostname
-      if (execMetrics.transportMetrics.isDefined) {
-        transportInfo.get.updateTransport(execMetrics.transportMetrics.get)
-      }
+      transportInfo.get.updateTransport(transPortMetrics)
     }
   }
 }
 
-class transportMemSize {
-  var onheapSize: Long = _
-  var directheapSize: Long = _
-  var peakOnheapSizeTime: MemTime = new MemTime()
-  var peakDirectheapSizeTime: MemTime = new MemTime()
+class TransportMemSize {
+  var onHeapSize: Long = _
+  var directSize: Long = _
+  var peakOnHeapSizeTime: MemTime = new MemTime()
+  var peakDirectSizeTime: MemTime = new MemTime()
 
   def updateTransport(transportMetrics: TransportMetrics): Unit = {
-    val updatedOnheapSize = transportMetrics.clientOnheapSize +
-      transportMetrics.serverOnheapSize
-    val updatedDirectheapSize = transportMetrics.clientDirectheapSize +
-      transportMetrics.serverDirectheapSize
+    val updatedOnHeapSize = transportMetrics.onHeapSize
+    val updatedDirectSize = transportMetrics.directSize
     val updateTime: Long = transportMetrics.timeStamp
-    onheapSize = updatedOnheapSize
-    directheapSize = updatedDirectheapSize
-    if (updatedOnheapSize >= peakOnheapSizeTime.memorySize) {
-      peakOnheapSizeTime = MemTime(updatedOnheapSize, updateTime)
+    onHeapSize = updatedOnHeapSize
+    directSize = updatedDirectSize
+    if (updatedOnHeapSize >= peakOnHeapSizeTime.memorySize) {
+      peakOnHeapSizeTime = MemTime(updatedOnHeapSize, updateTime)
     }
-    if (updatedDirectheapSize >= peakDirectheapSizeTime.memorySize) {
-      peakDirectheapSizeTime = MemTime(updatedDirectheapSize, updateTime)
+    if (updatedDirectSize >= peakDirectSizeTime.memorySize) {
+      peakDirectSizeTime = MemTime(updatedDirectSize, updateTime)
     }
   }
 }
```
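The peak bookkeeping in `TransportMemSize` can be exercised on its own; a rough usage sketch (assuming `MemTime` is the small case class of `memorySize` and `timeStamp` that the field accesses above imply):

```scala
val mem = new TransportMemSize

// First sample becomes both the current value and the running peak.
mem.updateTransport(TransportMetrics(timeStamp = 1L, onHeapSize = 100L, directSize = 50L))

// A smaller on-heap second sample moves the current values, but the
// on-heap peak of (100 @ t=1) is retained; the direct peak advances.
mem.updateTransport(TransportMetrics(timeStamp = 2L, onHeapSize = 40L, directSize = 80L))

assert(mem.onHeapSize == 40L && mem.directSize == 80L)
assert(mem.peakOnHeapSizeTime.memorySize == 100L)
assert(mem.peakDirectSizeTime.memorySize == 80L)
```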
**Contributor:** What's the point of this? Sorry if we discussed it earlier... if it's just to test that `ExecutorMetrics` really is serializable, that would be better in a test case.

**Contributor (Author):** This is due to SPARK-3465. Currently we do not have any aggregation operations for `ExecutorMetrics`, so we can remove this. We can add it back when we do some aggregation.

**Contributor:** Ah, this is a great point. In that case, I think you can leave it in for now, but add a comment that the serialization & deserialization is just to make a copy, for SPARK-3465.
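A comment plus helper along these lines would make that intent explicit (a sketch only; it relies on nothing beyond `ExecutorMetrics extends Serializable` from the first hunk):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// The serialize/deserialize round-trip is only here to deep-copy the
// metrics object, so later in-place aggregation (SPARK-3465) cannot
// mutate a snapshot that has already been handed to the listener bus.
private def copyExecutorMetrics(metrics: ExecutorMetrics): ExecutorMetrics = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(metrics)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
  try in.readObject().asInstanceOf[ExecutorMetrics] finally in.close()
}
```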