Skip to content

Commit 80ffc25

Browse files
committed
backport fix from master
1 parent baadfc8 commit 80ffc25

File tree

4 files changed

+20
-11
lines changed

4 files changed

+20
-11
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -610,14 +610,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
610610
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
611611
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
612612

613-
val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
614-
val executorHeartbeatInterval =
615-
getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
613+
val executorTimeoutThresholdMs =
614+
getTimeAsSeconds("spark.network.timeout", "120s") * 1000
615+
val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
616616
// If spark.executor.heartbeatInterval bigger than spark.network.timeout,
617617
// it will almost always cause ExecutorLostFailure. See SPARK-22754.
618-
require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
619-
s"spark.network.timeout=${executorTimeoutThreshold}s must be no less than the value of " +
620-
s"spark.executor.heartbeatInterval=${executorHeartbeatInterval}s.")
618+
require(executorHeartbeatIntervalMs > executorTimeoutThresholdMs, "The value of " +
619+
s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be no less than the value of " +
620+
s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
621621
}
622622

623623
/**

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,11 @@ private[spark] class Executor(
171171
// Maintains the list of running tasks.
172172
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
173173

174+
/**
175+
* Interval to send heartbeats, in milliseconds
176+
*/
177+
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
178+
174179
// Executor for the heartbeat task.
175180
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
176181

@@ -832,11 +837,9 @@ private[spark] class Executor(
832837
}
833838

834839
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
835-
val heartbeatIntervalInSec =
836-
conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
837840
try {
838841
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
839-
message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
842+
message, new RpcTimeout(HEARTBEAT_INTERVAL_MS.millis, EXECUTOR_HEARTBEAT_INTERVAL.key))
840843
if (response.reregisterBlockManager) {
841844
logInfo("Told to re-register on heartbeat")
842845
env.blockManager.reregister()
@@ -858,7 +861,7 @@ private[spark] class Executor(
858861
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
859862
*/
860863
private def startDriverHeartbeater(): Unit = {
861-
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
864+
val intervalMs = HEARTBEAT_INTERVAL_MS
862865

863866
// Wait a random interval so the heartbeats don't end up in sync
864867
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ package object config {
7979
private[spark] val EXECUTOR_CLASS_PATH =
8080
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
8181

82+
private[spark] val EXECUTOR_HEARTBEAT_INTERVAL =
83+
ConfigBuilder("spark.executor.heartbeatInterval")
84+
.timeConf(TimeUnit.MILLISECONDS)
85+
.createWithDefaultString("10s")
86+
8287
private[spark] val EXECUTOR_JAVA_OPTIONS =
8388
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
8489

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.mesos.SchedulerDriver
3333
import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
3434
import org.apache.spark.deploy.mesos.config._
3535
import org.apache.spark.internal.config
36+
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
3637
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
3738
import org.apache.spark.network.netty.SparkTransportConf
3839
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -635,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
635636
externalShufflePort,
636637
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
637638
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
638-
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
639+
sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
639640
slave.shuffleRegistered = true
640641
}
641642

0 commit comments

Comments
 (0)