@@ -68,7 +68,7 @@ private[kafka010] class KafkaMicroBatchReader(

   private val pollTimeoutMs = options.getLong(
     "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)

   private val maxOffsetsPerTrigger =
     Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
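Why this change (and the identical ones below) matters, as an illustrative sketch rather than part of the patch: `spark.network.timeout` is expressed in seconds, so when a user sets it to a bare number such as `120`, `getTimeAsMs` assumes milliseconds and yields 120 ms, while `getTimeAsSeconds(...) * 1000L` assumes seconds and yields 120000 ms. With the suffixed default `"120s"` both forms agree; they diverge only for unitless user-supplied values. A minimal, self-contained sketch of that difference (the object name is hypothetical):

import org.apache.spark.SparkConf

// Illustrative sketch only -- not part of this pull request.
object NetworkTimeoutUnits {
  def main(args: Array[String]): Unit = {
    // A user sets the seconds-denominated config as a bare number.
    val conf = new SparkConf(false).set("spark.network.timeout", "120")

    // Old expression: a unitless value is assumed to be milliseconds -> 120
    val asMs = conf.getTimeAsMs("spark.network.timeout", "120s")

    // New expression: a unitless value is assumed to be seconds, then converted -> 120000
    val asSecondsTimesThousand = conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L

    println(s"getTimeAsMs              = $asMs ms")                  // 120
    println(s"getTimeAsSeconds * 1000L = $asSecondsTimesThousand ms") // 120000
  }
}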
@@ -48,7 +48,9 @@ private[kafka010] class KafkaRelation(

   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+    (sqlContext.sparkContext.conf.getTimeAsSeconds(
+      "spark.network.timeout",
+      "120s") * 1000L).toString
   ).toLong

   override def schema: StructType = KafkaOffsetReader.kafkaSchema
@@ -84,7 +84,7 @@ private[kafka010] class KafkaSource(

   private val pollTimeoutMs = sourceOptions.getOrElse(
     "kafkaConsumer.pollTimeoutMs",
-    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+    (sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L).toString
   ).toLong

   private val maxOffsetsPerTrigger =
@@ -67,7 +67,7 @@ private[spark] class KafkaRDD[K, V](

   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
   private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
-    conf.getTimeAsMs("spark.network.timeout", "120s"))
+    conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =
@@ -634,7 +634,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       slave.hostname,
       externalShufflePort,
       sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-        s"${sc.conf.getTimeAsMs("spark.network.timeout", "120s")}ms"),
+        s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"),
       sc.conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
     slave.shuffleRegistered = true
   }
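The Mesos hunk differs slightly from the others: the converted value is interpolated into a default string with an "ms" suffix, which the outer `getTimeAsMs` lookup then parses. A hedged sketch of the effect when `spark.network.timeout` is set to a bare `120` (object and value names are hypothetical):

import org.apache.spark.SparkConf

// Illustrative sketch only -- not part of this pull request.
object MesosShuffleTimeoutDefault {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set("spark.network.timeout", "120")

    // Old default string: "120ms" (unitless value assumed to be milliseconds).
    val oldDefault = s"${conf.getTimeAsMs("spark.network.timeout", "120s")}ms"

    // New default string: "120000ms" (unitless value assumed to be seconds, then converted).
    val newDefault = s"${conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L}ms"

    // The outer lookup parses either string; only the resulting magnitude differs.
    val slaveTimeoutMs =
      conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", newDefault)

    println(s"old default = $oldDefault, new default = $newDefault, used = $slaveTimeoutMs ms")
  }
}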