Skip to content

Commit 53c06dd

Browse files
committed
[SPARK-24332][SS][MESOS] Fix places reading 'spark.network.timeout' as milliseconds
## What changes were proposed in this pull request? This PR replaces `getTimeAsMs` with `getTimeAsSeconds` to fix the issue that reading "spark.network.timeout" using a wrong time unit when the user doesn't specify a time out. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #21382 from zsxwing/fix-network-timeout-conf.
1 parent 0d89943 commit 53c06dd

File tree

5 files changed

+7
-5
lines changed

5 files changed

+7
-5
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ private[kafka010] class KafkaMicroBatchReader(
6868

6969
private val pollTimeoutMs = options.getLong(
7070
"kafkaConsumer.pollTimeoutMs",
71-
SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
71+
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
7272

7373
private val maxOffsetsPerTrigger =
7474
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ private[kafka010] class KafkaRelation(
4848

4949
private val pollTimeoutMs = sourceOptions.getOrElse(
5050
"kafkaConsumer.pollTimeoutMs",
51-
sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", "120s").toString
51+
(sqlContext.sparkContext.conf.getTimeAsSeconds(
52+
"spark.network.timeout",
53+
"120s") * 1000L).toString
5254
).toLong
5355

5456
override def schema: StructType = KafkaOffsetReader.kafkaSchema

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ private[kafka010] class KafkaSource(
8484

8585
private val pollTimeoutMs = sourceOptions.getOrElse(
8686
"kafkaConsumer.pollTimeoutMs",
87-
sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
87+
(sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
8888
).toLong
8989

9090
private val maxOffsetsPerTrigger =

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private[spark] class KafkaRDD[K, V](
6565

6666
// TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
6767
private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
68-
conf.getTimeAsMs("spark.network.timeout", "120s"))
68+
conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
6969
private val cacheInitialCapacity =
7070
conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
7171
private val cacheMaxCapacity =

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -634,7 +634,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
634634
slave.hostname,
635635
externalShufflePort,
636636
sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
637-
s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
637+
s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
638638
sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
639639
slave.shuffleRegistered = true
640640
}

0 commit comments

Comments
 (0)