Netty network layer memory usage on webUI
Manual pull of the PR from:
https://github.com/apache/spark/pull/7753/commits

Created with:
git checkout upstream/pr/7753 -- .

A bunch of manual merging happened after this.
jsoltren committed Apr 4, 2017
commit 1e51c73d340b331f6fcd033635e4468201ba1477
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -36,6 +37,7 @@ import org.apache.spark.util._
*/
private[spark] case class Heartbeat(
executorId: String,
executorMetrics: ExecutorMetrics,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)

@@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
case heartbeat @ Heartbeat(executorId, executorMetrics, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, accumUpdates, blockManagerId)
executorId, executorMetrics, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -20,18 +20,18 @@ package org.apache.spark
import java.io.File
import java.net.Socket

import com.google.common.collect.MapMaker
import scala.collection.mutable
import scala.util.Properties

import com.google.common.collect.MapMaker

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -62,6 +62,7 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
@@ -381,6 +382,7 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
securityManager,
metricsSystem,
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -102,6 +102,12 @@ private[spark] class Executor(
env.blockManager.initialize(conf.getAppId)
}

private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
executorMetrics.setHostname(Utils.localHostName)
if (env.rpcEnv.address != null) {
executorMetrics.setPort(Some(env.rpcEnv.address.port))
}

// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)

@@ -704,7 +710,19 @@
}
}

val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
env.blockTransferService.getMemMetrics(this.executorMetrics)
val executorMetrics = if (isLocal) {
// In local mode, JobProgressListener may keep a reference to this object from
// onExecutorMetricsUpdate(). To keep that snapshot stable while the live metrics
// continue to change, hand the listener a deep copy instead.
Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
} else {
this.executorMetrics
}

val message = Heartbeat(
executorId, executorMetrics, accumUpdates.toArray, env.blockManager.blockManagerId)

try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala (new file)
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Metrics tracked for a running executor.
*
* When adding new fields, keep in mind that the whole object can be serialized at any time and
* shipped to consumers of the SparkListener interface.
*/
@DeveloperApi
class ExecutorMetrics extends Serializable {

/**
* Name of the host the executor runs on.
*/
private var _hostname: String = ""
def hostname: String = _hostname
private[spark] def setHostname(value: String) = _hostname = value

/**
* Port the executor's RPC endpoint is bound to, if known.
*/
private var _port: Option[Int] = None
def port: Option[Int] = _port
private[spark] def setPort(value: Option[Int]) = _port = value

private[spark] def hostPort: String = {
val hp = port match {
case None => hostname
case value => hostname + ":" + value.get
}
hp
}

private var _transportMetrics: TransportMetrics = new TransportMetrics
def transportMetrics: TransportMetrics = _transportMetrics
private[spark] def setTransportMetrics(value: TransportMetrics) = {
_transportMetrics = value
}
}

object ExecutorMetrics extends Serializable {
def apply(
hostName: String,
port: Option[Int],
transportMetrics: TransportMetrics): ExecutorMetrics = {
val execMetrics = new ExecutorMetrics
execMetrics.setHostname(hostName)
execMetrics.setPort(port)
execMetrics.setTransportMetrics(transportMetrics)
execMetrics
}
}

/**
* :: DeveloperApi ::
* Memory metrics for the network transport layer.
*/
@DeveloperApi
class TransportMetrics (
val timeStamp: Long = System.currentTimeMillis,
val onHeapSize: Long = 0L,
val offHeapSize: Long = 0L) extends Serializable

object TransportMetrics extends Serializable {
def apply(
timeStamp: Long,
onHeapSize: Long,
offHeapSize: Long): TransportMetrics = {
new TransportMetrics(timeStamp, onHeapSize, offHeapSize)
}
}
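For reference, a brief usage sketch (not part of the patch) of how these two classes compose; the host, port, and sizes below are made-up values:

import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}

// Hypothetical values, for illustration only.
val transport = TransportMetrics(
  timeStamp = System.currentTimeMillis,
  onHeapSize = 16L * 1024 * 1024,
  offHeapSize = 64L * 1024 * 1024)
val metrics = ExecutorMetrics("worker-1.example.com", Some(7337), transport)

assert(metrics.hostname == "worker-1.example.com")
assert(metrics.port.contains(7337))
assert(metrics.transportMetrics.offHeapSize == 64L * 1024 * 1024)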
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -24,6 +24,7 @@ import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
@@ -39,6 +40,11 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {
*/
def init(blockDataManager: BlockDataManager): Unit

/**
* Collect the transfer service's current memory metrics into the given ExecutorMetrics.
*/
private[spark] def getMemMetrics(executorMetrics: ExecutorMetrics): Unit

/**
* Tear down the transfer service.
*/
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -23,7 +23,10 @@ import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag

import org.apache.spark.{SecurityManager, SparkConf}
import io.netty.buffer._

import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory}
@@ -34,7 +37,7 @@ import org.apache.spark.network.shuffle.protocol.UploadBlock
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* A BlockTransferService that uses Netty to fetch a set of blocks at a time.
@@ -57,6 +60,40 @@ private[spark] class NettyBlockTransferService(
private[this] var server: TransportServer = _
private[this] var clientFactory: TransportClientFactory = _
private[this] var appId: String = _
private[this] var clock: Clock = new SystemClock()

/**
* Use a different clock for this transfer service. This is mainly used for testing.
*/
def setClock(newClock: Clock): Unit = {
clock = newClock
}

private[spark] override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {
val currentTime = clock.getTimeMillis()
val clientPooledAllocator = clientFactory.getPooledAllocator()
val serverAllocator = server.getAllocator()
val clientOffHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.directArenas()))
val clientOnHeapSize: Long = sumOfMetrics(convToScala(clientPooledAllocator.heapArenas()))
val serverOffHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.directArenas()))
val serverOnHeapSize: Long = sumOfMetrics(convToScala(serverAllocator.heapArenas()))
logDebug(s"Current Netty Client offheap size is $clientOffHeapSize, " +
s"Client heap size is $clientOnHeapSize, Server offheap size is $serverOffHeapSize, " +
s"server heap size is $serverOnHeapSize, executor id is " +
s"${SparkEnv.get.blockManager.blockManagerId.executorId}")
executorMetrics.setTransportMetrics(TransportMetrics(currentTime,
clientOnHeapSize + serverOnHeapSize, clientOffHeapSize + serverOffHeapSize))
}

private def convToScala = (x: java.util.List[PoolArenaMetric]) => x.asScala

private def sumOfMetrics(arenaMetricList: Seq[PoolArenaMetric]): Long = {
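// Sum the sizes of all chunks currently allocated by the given pool arenas.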
arenaMetricList.map { arena =>
arena.chunkLists().asScala.map { chunkList =>
chunkList.iterator().asScala.map(_.chunkSize().toLong).sum
}.sum
}.sum
}

override def init(blockDataManager: BlockDataManager): Unit = {
val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
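For a sense of what getMemMetrics is aggregating, here is a standalone sketch (not part of the patch) that applies the same chunk-size summation to Netty's default pooled allocator. It assumes a Netty version that exposes heapArenas()/directArenas() directly on the allocator, as the code above does; newer Netty releases expose them via allocator.metric() instead.

import scala.collection.JavaConverters._
import io.netty.buffer.{PoolArenaMetric, PooledByteBufAllocator}

// Total bytes held by the chunks currently allocated in the given arenas.
def allocatedBytes(arenas: java.util.List[PoolArenaMetric]): Long =
  arenas.asScala.map { arena =>
    arena.chunkLists().asScala.map(_.iterator().asScala.map(_.chunkSize().toLong).sum).sum
  }.sum

val alloc = PooledByteBufAllocator.DEFAULT
val onHeap = allocatedBytes(alloc.heapArenas())
val offHeap = allocatedBytes(alloc.directArenas())
println(s"pooled on-heap: $onHeap bytes, pooled off-heap: $offHeap bytes")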
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
@@ -229,16 +229,17 @@ class DAGScheduler(
}

/**
* Update metrics for in-progress tasks and let the master know that the BlockManager is still
* alive. Return true if the driver knows about the given block manager. Otherwise, return false,
* indicating that the block manager should re-register.
* Update metrics for live executor and in-progress tasks and let the master know that the
* BlockManager is still alive. Return true if the driver knows about the given block manager.
* Otherwise, return false, indicating that the block manager should re-register.
*/
def executorHeartbeatReceived(
execId: String,
executorMetrics: ExecutorMetrics,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, executorMetrics, accumUpdates))
blockManagerMaster.driverEndpoint.askSync[Boolean](
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
}
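To show how these updates might be consumed, here is a hedged sketch (not part of the patch) of a SparkListener that logs the transport-layer memory carried by each heartbeat; it assumes SparkListenerExecutorMetricsUpdate gains an executorMetrics field with this change.

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

class TransportMemoryListener extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate): Unit = {
    // executorMetrics is the field assumed to be added by this patch.
    val tm = update.executorMetrics.transportMetrics
    println(s"executor ${update.execId}: netty on-heap=${tm.onHeapSize} bytes, " +
      s"off-heap=${tm.offHeapSize} bytes")
  }
}

Such a listener could be registered with sc.addSparkListener(new TransportMemoryListener).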