-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-5931][CORE] Use consistent naming for time properties #5236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
59bf9e1
404f8c3
7db6d2a
4933fda
c9f5cad
21ef3dd
064ebd6
7320c87
272c215
3352d34
3f1cfc8
6d1518e
2fcc91c
5181597
c6a0095
cde9bff
42477aa
9a29d8d
34f87c2
8f741e1
9e2547c
499bdf0
5232a36
3a12dd8
68f4e93
70ac213
647b5ac
1c0c07c
8613631
bac9edf
1858197
3b126e1
39164f9
b2fc965
dd0a680
bf779b0
76cfa27
5193d5f
6387772
19c31af
ff40bfe
28187bf
1465390
cbf41db
d4efd26
4e48679
1a1122c
cbd2ca6
6f651a8
7d19cdd
dc7bd08
69fedcc
8927e66
642a06d
25d3f52
bc04e05
951ca2d
f5fafcd
de3bff9
4526c81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
…TimeAsSec and getTimeAsMs methods in SparkConf. Updated documentation
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,17 +62,17 @@ private[spark] class HeartbeatReceiver(sc: SparkContext) | |
|
|
||
| // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses | ||
| // "milliseconds" | ||
| private val slaveTimeoutMs = Utils.timeStringAsMs( | ||
| sc.conf.get("spark.storage.blockManagerSlaveTimeoutMs", "120s")) | ||
| private val executorTimeoutMs = Utils.timeStringAsSec(sc.conf.get("spark.network.timeout", | ||
| s"${slaveTimeoutMs}ms")) * 1000 | ||
| private val slaveTimeoutMs = | ||
| sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s") | ||
| private val executorTimeoutMs = | ||
| sc.conf.getTimeAsSec("spark.network.timeout", s"${slaveTimeoutMs}ms") * 1000 | ||
|
|
||
| // "spark.network.timeoutInterval" uses "seconds", while | ||
| // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" | ||
| private val timeoutIntervalMs = Utils.timeStringAsMs( | ||
| sc.conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60s")) | ||
| private val checkTimeoutIntervalMs = Utils.timeStringAsSec( | ||
| sc.conf.get("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms")) * 1000 | ||
| private val timeoutIntervalMs = | ||
| sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") | ||
| private val checkTimeoutIntervalMs = | ||
| sc.conf.getTimeAsSec("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, can go straight to ms?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same - it can't since default unit is s.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sean. It can't because then the default unit is assumed to be ms when it's really seconds. Sent with Good (www.good.com) -----Original Message----- In core/src/main/scala/org/apache/spark/HeartbeatReceiver.scalahttps://github.com//pull/5236#discussion_r28196499:
Same, can go straight to ms? — The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer. |
||
|
|
||
| private var timeoutCheckingTask: ScheduledFuture[_] = null | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -174,6 +174,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { | |
| getOption(key).getOrElse(defaultValue) | ||
| } | ||
|
|
||
| /** Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no | ||
| * suffix is provided then seconds are assumed. | ||
| */ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. style: in Spark we use java docs not scala docs. See other methods in this file for reference
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the difference in this context? this works as javadoc too. The formatting could be more like: but should still render OK either way? |
||
| def getTimeAsSec(key: String): Long = { | ||
| Utils.timeStringAsSec(getOption(key).getOrElse(throw new NoSuchElementException(key))) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can just call |
||
| } | ||
|
|
||
| /** Get a time parameter as seconds, falling back to a default if not set. If no | ||
| * suffix is provided then seconds are assumed. | ||
| */ | ||
| def getTimeAsSec(key: String, defaultValue: String): Long = { | ||
| Utils.timeStringAsSec(getOption(key).getOrElse(defaultValue)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question here, can this be |
||
| } | ||
|
|
||
| /** Get a time parameter as milliseconds; throws a NoSuchElementException if it's not set. If no | ||
| * suffix is provided then milliseconds are assumed. | ||
| */ | ||
| def getTimeAsMs(key: String): Long = { | ||
| Utils.timeStringAsMs(getOption(key).getOrElse(throw new NoSuchElementException(key))) | ||
| } | ||
|
|
||
| /** Get a time parameter as milliseconds, falling back to a default if not set. If no | ||
| * suffix is provided then milliseconds are assumed. | ||
| */ | ||
| def getTimeAsMs(key: String, defaultValue: String): Long = { | ||
| Utils.timeStringAsMs(getOption(key).getOrElse(defaultValue)) | ||
| } | ||
|
|
||
|
|
||
| /** Get a parameter as an Option */ | ||
| def getOption(key: String): Option[String] = { | ||
| Option(settings.get(key)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -855,7 +855,7 @@ private[spark] class TaskSetManager( | |
| case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack" | ||
| case _ => "" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should be |
||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just an idea, maybe we can rewrite this as: Looks nicer IMO, less duplicate code |
||
| Utils.timeStringAsMs(conf.get(localityWaitKey, defaultWait)) | ||
| conf.getTimeAsMs(localityWaitKey, defaultWait) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,6 @@ package org.apache.spark.util | |
| import scala.collection.JavaConversions.mapAsJavaMap | ||
| import scala.concurrent.Await | ||
| import scala.concurrent.duration.{Duration, FiniteDuration} | ||
| import scala.util.Try | ||
|
|
||
| import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} | ||
| import akka.pattern.ask | ||
|
|
@@ -66,8 +65,8 @@ private[spark] object AkkaUtils extends Logging { | |
|
|
||
| val akkaThreads = conf.getInt("spark.akka.threads", 4) | ||
| val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) | ||
| val akkaTimeoutS = Utils.timeStringAsSec(conf.get("spark.akka.timeout", | ||
| conf.get("spark.network.timeout", "120s"))) | ||
| val akkaTimeoutS = conf.getTimeAsSec("spark.akka.timeout", | ||
| conf.get("spark.network.timeout", "120s")) | ||
| val akkaFrameSize = maxFrameSizeBytes(conf) | ||
| val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) | ||
| val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" | ||
|
|
@@ -79,10 +78,10 @@ private[spark] object AkkaUtils extends Logging { | |
|
|
||
| val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" | ||
|
|
||
| val akkaHeartBeatPausesS = Utils.timeStringAsSec(conf.get("spark.akka.heartbeat.pauses", | ||
| "6000s")) | ||
| val akkaHeartBeatPausesS = conf.getTimeAsSec("spark.akka.heartbeat.pauses", | ||
| "6000s") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you move this to the line above? I think it fits comfortably |
||
| val akkaHeartBeatIntervalS = | ||
| Utils.timeStringAsSec(conf.get("spark.akka.heartbeat.interval", "1000s")) | ||
| conf.getTimeAsSec("spark.akka.heartbeat.interval", "1000s") | ||
|
|
||
| val secretKey = securityManager.getSecretKey() | ||
| val isAuthOn = securityManager.isAuthenticationEnabled() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -613,8 +613,8 @@ private[spark] object Utils extends Logging { | |
| } | ||
| Utils.setupSecureURLConnection(uc, securityMgr) | ||
|
|
||
| val timeoutMs = Utils.timeStringAsSec( | ||
| conf.get("spark.files.fetchTimeout", "60s")).toInt * 1000 | ||
| val timeoutMs = | ||
| conf.getTimeAsSec("spark.files.fetchTimeout", "60s").toInt * 1000 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same, can go straight to MS?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't to maintain backwards compatibility for no units. |
||
| uc.setConnectTimeout(timeoutMs) | ||
| uc.setReadTimeout(timeoutMs) | ||
| uc.connect() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
|
|
||
| package org.apache.spark.util | ||
|
|
||
| import java.lang.NumberFormatException | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trivial: you don't need to import anything from |
||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import scala.util.Random | ||
|
|
@@ -62,6 +63,30 @@ class UtilsSuite extends FunSuite with ResetSystemProperties { | |
| assert(Utils.timeStringAsUs("1min") === TimeUnit.MINUTES.toMicros(1)) | ||
| assert(Utils.timeStringAsUs("1h") === TimeUnit.HOURS.toMicros(1)) | ||
| assert(Utils.timeStringAsUs("1d") === TimeUnit.DAYS.toMicros(1)) | ||
|
|
||
| // Test invalid strings | ||
| try { | ||
| Utils.timeStringAsMs("This breaks 600s") | ||
| assert(false) // We should never reach this | ||
| } catch { | ||
| case e: NumberFormatException => assert(true) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's this really cool thing called
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is actually super cool. Thanks for pointing that out. |
||
|
|
||
| // Test invalid strings | ||
| try { | ||
| Utils.timeStringAsMs("600s This breaks") | ||
| assert(false) // We should never reach this | ||
| } catch { | ||
| case e: NumberFormatException => assert(true) | ||
| } | ||
|
|
||
| // Test invalid strings | ||
| try { | ||
| Utils.timeStringAsMs("This 123s breaks") | ||
| assert(false) // We should never reach this | ||
| } catch { | ||
| case e: NumberFormatException => assert(true) | ||
| } | ||
| } | ||
|
|
||
| test("bytesToString") { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just
getTimeAsMshere without the* 1000?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sean - it can't because since the unit is typically specified in seconds, if no unit is provided it will be interpreted as seconds in this code. Changing to the Ms method will break backwards compatibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, that's why. Ignore all the similar comments below then.