
Commit 56eeac8

ajithmekai-chi authored and committed
[SPARK-27198][CORE] Heartbeat interval mismatch in driver and executor
## What changes were proposed in this pull request?

When the heartbeat interval is configured via spark.executor.heartbeatInterval without specifying units, the value is interpreted inconsistently: the driver treats it as seconds while the executor treats it as milliseconds.

## How was this patch tested?

Will add UTs.

Closes apache#24140 from ajithme/intervalissue.

Authored-by: Ajith <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
1 parent 395ed1e commit 56eeac8
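
The mismatch described above can be reproduced directly from SparkConf: a unitless value for spark.executor.heartbeatInterval is read as seconds by the driver-side validation but as milliseconds by the executor-side heartbeat code. A minimal sketch, not part of the patch, with a hypothetical SparkConf instance:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.executor.heartbeatInterval", "30")
    // Driver-side validation (before this patch) parsed the value as seconds:
    conf.getTimeAsSeconds("spark.executor.heartbeatInterval")  // 30 -> 30 seconds
    // Executor-side heartbeat code parsed the same value as milliseconds:
    conf.getTimeAsMs("spark.executor.heartbeatInterval")       // 30 -> 30 milliseconds
    // The same setting yields intervals that differ by a factor of 1000.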

File tree

2 files changed: +7, -2 lines changed


core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 3 additions & 1 deletion
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
+import scala.concurrent.duration._
 
 import org.apache.avro.{Schema, SchemaNormalization}
 
@@ -610,7 +611,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
     val executorTimeoutThreshold = getTimeAsSeconds("spark.network.timeout", "120s")
-    val executorHeartbeatInterval = getTimeAsSeconds("spark.executor.heartbeatInterval", "10s")
+    val executorHeartbeatInterval =
+      getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThreshold > executorHeartbeatInterval, "The value of " +
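
With this change, the driver-side check against spark.network.timeout parses the interval the same way the executor does (milliseconds for a unitless value) and only then converts to seconds for the comparison. A rough illustration of the resulting values, assuming conf is the SparkConf being validated:

    import scala.concurrent.duration._

    conf.set("spark.executor.heartbeatInterval", "30")
    conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds  // 30 ms    -> 0 seconds
    conf.set("spark.executor.heartbeatInterval", "30s")
    conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds  // 30000 ms -> 30 seconds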

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 4 additions & 1 deletion
@@ -28,6 +28,7 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -831,9 +832,11 @@ private[spark] class Executor(
     }
 
     val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val heartbeatIntervalInSec =
+      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-        message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
+        message, new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval"))
       if (response.reregisterBlockManager) {
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
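
On the executor side, the RpcTimeout for the heartbeat ask is no longer built by re-parsing the raw config value (which could disagree with the millisecond-based interval used to schedule heartbeats); the interval is parsed once as milliseconds and passed in as an explicit FiniteDuration. A condensed sketch of the new construction, assuming it runs inside the Spark codebase (RpcTimeout is private[spark]) and that conf is the executor's SparkConf:

    import scala.concurrent.duration._
    import org.apache.spark.rpc.RpcTimeout

    // Parse once as milliseconds, truncate to whole seconds, and wrap as a FiniteDuration.
    val heartbeatIntervalInSec: FiniteDuration =
      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s").millis.toSeconds.seconds
    // The ask timeout now carries exactly the interval the executor heartbeats on.
    val askTimeout = new RpcTimeout(heartbeatIntervalInSec, "spark.executor.heartbeatInterval")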
