diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 6c4c5c94cfa2..390f8246f4fc 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -609,13 +609,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED), s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.") - val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s") - val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s") + val executorTimeoutThresholdMs = + getTimeAsSeconds("spark.network.timeout", "120s") * 1000 + val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL) // If spark.executor.heartbeatInterval bigger than spark.network.timeout, // it will almost always cause ExecutorLostFailure. See SPARK-22754. - require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " + - s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " + - s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.") + require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " + + s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " + + s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.") } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d943087ab6b8..0a66dae94dbd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -499,7 +499,7 @@ class SparkContext(config: SparkConf) extends Logging { // create and start the heartbeater for collecting memory metrics _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater", - conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) _heartbeater.start() // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 072277cb78dc..66d5a8d86b5c 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, Map} +import scala.concurrent.duration._ import scala.util.control.NonFatal import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -120,7 +121,7 @@ private[spark] class Executor( } // Whether to load classes in user jars before those in Spark jars - private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) + private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST) // Whether to monitor killed / interrupted tasks private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false) @@ -147,21 +148,32 @@ private[spark] class Executor( // Maintains the list of running tasks. private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] + /** + * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES` + * times, it should kill itself. The default value is 60. It means we will retry to send + * heartbeats about 10 minutes because the heartbeat interval is 10s. + */ + private val HEARTBEAT_MAX_FAILURES = conf.get(EXECUTOR_HEARTBEAT_MAX_FAILURES) + + /** + * Whether to drop empty accumulators from heartbeats sent to the driver. Including the empty + * accumulators (that satisfy isZero) can make the size of the heartbeat message very large. + */ + private val HEARTBEAT_DROP_ZEROES = conf.get(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES) + + /** + * Interval to send heartbeats, in milliseconds + */ + private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL) + // Executor for the heartbeat task. private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, - "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + "executor-heartbeater", HEARTBEAT_INTERVAL_MS) // must be initialized before running startDriverHeartbeat() private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv) - /** - * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES` - * times, it should kill itself. The default value is 60. It means we will retry to send - * heartbeats about 10 minutes because the heartbeat interval is 10s. - */ - private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60) - /** * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each * successful heartbeat will reset it to 0. @@ -799,7 +811,13 @@ private[spark] class Executor( if (taskRunner.task != null) { taskRunner.task.metrics.mergeShuffleReadMetrics() taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime) - accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators())) + val accumulatorsToReport = + if (HEARTBEAT_DROP_ZEROES) { + taskRunner.task.metrics.accumulators().filterNot(_.isZero) + } else { + taskRunner.task.metrics.accumulators() + } + accumUpdates += ((taskRunner.taskId, accumulatorsToReport)) } } @@ -807,7 +825,7 @@ private[spark] class Executor( executorUpdates) try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) + message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key)) if (response.reregisterBlockManager) { logInfo("Told to re-register on heartbeat") env.blockManager.reregister() diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8d827189ebb5..069fb58fcdb3 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -83,6 +83,20 @@ package object config { private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional + private[spark] val EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES = + ConfigBuilder("spark.executor.heartbeat.dropZeroAccumulatorUpdates") + .internal() + .booleanConf + .createWithDefault(true) + + private[spark] val EXECUTOR_HEARTBEAT_INTERVAL = + ConfigBuilder("spark.executor.heartbeatInterval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + private[spark] val EXECUTOR_HEARTBEAT_MAX_FAILURES = + ConfigBuilder("spark.executor.heartbeat.maxFailures").internal().intConf.createWithDefault(60) + private[spark] val EXECUTOR_JAVA_OPTIONS = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 77a7668d3a1d..1f8a65707b2f 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -21,9 +21,10 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.lang.Thread.UncaughtExceptionHandler import java.nio.ByteBuffer import java.util.Properties -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Map import scala.concurrent.duration._ import scala.language.postfixOps @@ -33,22 +34,25 @@ import org.mockito.Matchers.{any, eq => meq} import org.mockito.Mockito.{inOrder, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.PrivateMethodTester import org.scalatest.concurrent.Eventually import org.scalatest.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.TaskState.TaskState -import org.apache.spark.memory.MemoryManager +import org.apache.spark.internal.config._ +import org.apache.spark.memory.TestMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rdd.RDD -import org.apache.spark.rpc.RpcEnv -import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription} +import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout} +import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.shuffle.FetchFailedException -import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.storage.{BlockManager, BlockManagerId} +import org.apache.spark.util.{LongAccumulator, UninterruptibleThread} -class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar with Eventually { +class ExecutorSuite extends SparkFunSuite + with LocalSparkContext with MockitoSugar with Eventually with PrivateMethodTester { test("SPARK-15963: Catch `TaskKilledException` correctly in Executor.TaskRunner") { // mock some objects to make Executor.launchTask() happy @@ -252,18 +256,107 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug } } + test("Heartbeat should drop zero accumulator updates") { + heartbeatZeroAccumulatorUpdateTest(true) + } + + test("Heartbeat should not drop zero accumulator updates when the conf is disabled") { + heartbeatZeroAccumulatorUpdateTest(false) + } + + private def withHeartbeatExecutor(confs: (String, String)*) + (f: (Executor, ArrayBuffer[Heartbeat]) => Unit): Unit = { + val conf = new SparkConf + confs.foreach { case (k, v) => conf.set(k, v) } + val serializer = new JavaSerializer(conf) + val env = createMockEnv(conf, serializer) + val executor = + new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, isLocal = true) + val executorClass = classOf[Executor] + + // Save all heartbeats sent into an ArrayBuffer for verification + val heartbeats = ArrayBuffer[Heartbeat]() + val mockReceiver = mock[RpcEndpointRef] + when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any)) + .thenAnswer(new Answer[HeartbeatResponse] { + override def answer(invocation: InvocationOnMock): HeartbeatResponse = { + val args = invocation.getArguments() + val mock = invocation.getMock + heartbeats += args(0).asInstanceOf[Heartbeat] + HeartbeatResponse(false) + } + }) + val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef") + receiverRef.setAccessible(true) + receiverRef.set(executor, mockReceiver) + + f(executor, heartbeats) + } + + private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): Unit = { + val c = EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key -> dropZeroMetrics.toString + withHeartbeatExecutor(c) { (executor, heartbeats) => + val reportHeartbeat = PrivateMethod[Unit]('reportHeartBeat) + + // When no tasks are running, there should be no accumulators sent in heartbeat + executor.invokePrivate(reportHeartbeat()) + // invokeReportHeartbeat(executor) + assert(heartbeats.length == 1) + assert(heartbeats(0).accumUpdates.length == 0, + "No updates should be sent when no tasks are running") + + // When we start a task with a nonzero accumulator, that should end up in the heartbeat + val metrics = new TaskMetrics() + val nonZeroAccumulator = new LongAccumulator() + nonZeroAccumulator.add(1) + metrics.registerAccumulator(nonZeroAccumulator) + + val executorClass = classOf[Executor] + val tasksMap = { + val field = + executorClass.getDeclaredField("org$apache$spark$executor$Executor$$runningTasks") + field.setAccessible(true) + field.get(executor).asInstanceOf[ConcurrentHashMap[Long, executor.TaskRunner]] + } + val mockTaskRunner = mock[executor.TaskRunner] + val mockTask = mock[Task[Any]] + when(mockTask.metrics).thenReturn(metrics) + when(mockTaskRunner.taskId).thenReturn(6) + when(mockTaskRunner.task).thenReturn(mockTask) + when(mockTaskRunner.startGCTime).thenReturn(1) + tasksMap.put(6, mockTaskRunner) + + executor.invokePrivate(reportHeartbeat()) + assert(heartbeats.length == 2) + val updates = heartbeats(1).accumUpdates + assert(updates.length == 1 && updates(0)._1 == 6, + "Heartbeat should only send update for the one task running") + val accumsSent = updates(0)._2.length + assert(accumsSent > 0, "The nonzero accumulator we added should be sent") + if (dropZeroMetrics) { + assert(accumsSent == metrics.accumulators().count(!_.isZero), + "The number of accumulators sent should match the number of nonzero accumulators") + } else { + assert(accumsSent == metrics.accumulators().length, + "The number of accumulators sent should match the number of total accumulators") + } + } + } + private def createMockEnv(conf: SparkConf, serializer: JavaSerializer): SparkEnv = { val mockEnv = mock[SparkEnv] val mockRpcEnv = mock[RpcEnv] val mockMetricsSystem = mock[MetricsSystem] - val mockMemoryManager = mock[MemoryManager] + val mockBlockManager = mock[BlockManager] when(mockEnv.conf).thenReturn(conf) when(mockEnv.serializer).thenReturn(serializer) when(mockEnv.serializerManager).thenReturn(mock[SerializerManager]) when(mockEnv.rpcEnv).thenReturn(mockRpcEnv) when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem) - when(mockEnv.memoryManager).thenReturn(mockMemoryManager) + when(mockEnv.memoryManager).thenReturn(new TestMemoryManager(conf)) when(mockEnv.closureSerializer).thenReturn(serializer) + when(mockBlockManager.blockManagerId).thenReturn(BlockManagerId("1", "hostA", 1234)) + when(mockEnv.blockManager).thenReturn(mockBlockManager) SparkEnv.set(mockEnv) mockEnv } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 178de30f0f38..bac0246b7ddc 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState} import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config +import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( externalShufflePort, sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"), - sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")) + sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL)) slave.shuffleRegistered = true }