-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-5931][CORE] Use consistent naming for time properties #5236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
59bf9e1
404f8c3
7db6d2a
4933fda
c9f5cad
21ef3dd
064ebd6
7320c87
272c215
3352d34
3f1cfc8
6d1518e
2fcc91c
5181597
c6a0095
cde9bff
42477aa
9a29d8d
34f87c2
8f741e1
9e2547c
499bdf0
5232a36
3a12dd8
68f4e93
70ac213
647b5ac
1c0c07c
8613631
bac9edf
1858197
3b126e1
39164f9
b2fc965
dd0a680
bf779b0
76cfa27
5193d5f
6387772
19c31af
ff40bfe
28187bf
1465390
cbf41db
d4efd26
4e48679
1a1122c
cbd2ca6
6f651a8
7d19cdd
dc7bd08
69fedcc
8927e66
642a06d
25d3f52
bc04e05
951ca2d
f5fafcd
de3bff9
4526c81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -442,7 +442,7 @@ private[spark] class Executor( | |
| * This thread stops running when the executor is stopped. | ||
| */ | ||
| private def startDriverHeartbeater(): Unit = { | ||
| val intervalMs = Utils.timeStringToMs(conf.get("spark.executor.heartbeatInterval", "10s")) | ||
| val intervalMs = Utils.timeStringAsMs(conf.get("spark.executor.heartbeatInterval", "10s")) | ||
| val thread = new Thread() { | ||
| override def run() { | ||
| // Sleep a random intervalMs so the heartbeats don't end up in sync | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think the "interval" here was referring to the variable, so could you please revert this :) |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -851,11 +851,11 @@ private[spark] class TaskSetManager( | |
| val defaultWait = conf.get("spark.locality.wait", "3000ms") | ||
| level match { | ||
| case TaskLocality.PROCESS_LOCAL => | ||
| Utils.timeStringToMs(conf.get("spark.locality.wait.process", defaultWait)) | ||
| Utils.timeStringAsMs(conf.get("spark.locality.wait.process", defaultWait)) | ||
| case TaskLocality.NODE_LOCAL => | ||
| Utils.timeStringToMs(conf.get("spark.locality.wait.node", defaultWait)) | ||
| Utils.timeStringAsMs(conf.get("spark.locality.wait.node", defaultWait)) | ||
| case TaskLocality.RACK_LOCAL => | ||
| Utils.timeStringToMs(conf.get("spark.locality.wait.rack", defaultWait)) | ||
| Utils.timeStringAsMs(conf.get("spark.locality.wait.rack", defaultWait)) | ||
| case _ => 0L | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. just an idea, maybe we can rewrite this as: Looks nicer IMO, less duplicate code |
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,7 +58,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste | |
| // Submit tasks after maxRegisteredWaitingTime milliseconds | ||
| // if minRegisteredRatio has not yet been reached | ||
| val maxRegisteredWaitingTimeMs = | ||
| Utils.timeStringToMs(conf.get("spark.scheduler.maxRegisteredResourcesWaitingTime", "30000ms")) | ||
| Utils.timeStringAsMs(conf.get("spark.scheduler.maxRegisteredResourcesWaitingTime", "30000ms")) | ||
| val createTime = System.currentTimeMillis() | ||
|
|
||
| private val executorDataMap = new HashMap[String, ExecutorData] | ||
|
|
@@ -80,7 +80,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste | |
| context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) | ||
|
|
||
| // Periodically revive offers to allow delay scheduling to work | ||
| val reviveIntervalMs = Utils.timeStringToMs( | ||
| val reviveIntervalMs = Utils.timeStringAsMs( | ||
| conf.get("spark.scheduler.revive.interval", "1000ms")) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suppose you can say "1s" but it's not that important |
||
| import context.dispatcher | ||
| context.system.scheduler.schedule(0.millis, reviveIntervalMs.millis, self, ReviveOffers) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,7 +66,7 @@ private[spark] object AkkaUtils extends Logging { | |
|
|
||
| val akkaThreads = conf.getInt("spark.akka.threads", 4) | ||
| val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) | ||
| val akkaTimeoutS = Utils.timeStringToS(conf.get("spark.akka.timeout", | ||
| val akkaTimeoutS = Utils.timeStringAsS(conf.get("spark.akka.timeout", | ||
| conf.get("spark.network.timeout", "120s"))) | ||
| val akkaFrameSize = maxFrameSizeBytes(conf) | ||
| val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) | ||
|
|
@@ -81,7 +81,7 @@ private[spark] object AkkaUtils extends Logging { | |
|
|
||
| val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is also a time property in seconds. |
||
| val akkaHeartBeatIntervalS = | ||
| Utils.timeStringToS(conf.get("spark.akka.heartbeat.interval", "1000s")) | ||
| Utils.timeStringAsS(conf.get("spark.akka.heartbeat.interval", "1000s")) | ||
|
|
||
| val secretKey = securityManager.getSecretKey() | ||
| val isAuthOn = securityManager.isAuthenticationEnabled() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,7 +76,7 @@ private[spark] object MetadataCleanerType extends Enumeration { | |
| // initialization of StreamingContext. It's okay for users trying to configure stuff themselves. | ||
| private[spark] object MetadataCleaner { | ||
| def getDelaySeconds(conf: SparkConf): Int = { | ||
| Utils.timeStringToS(conf.get("spark.cleaner.ttl", "-1s")).toInt | ||
| Utils.timeStringAsS(conf.get("spark.cleaner.ttl", "-1s")).toInt | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I can just make it -1 as it was before. The function will assume it's in seconds. |
||
| } | ||
|
|
||
| def getDelaySeconds( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -611,7 +611,7 @@ private[spark] object Utils extends Logging { | |
| } | ||
| Utils.setupSecureURLConnection(uc, securityMgr) | ||
|
|
||
| val timeoutMs = Utils.timeStringToMs(conf.get("spark.files.fetchTimeout", "60s")).toInt | ||
| val timeoutMs = Utils.timeStringAsMs(conf.get("spark.files.fetchTimeout", "60s")).toInt | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a question -- do you support "1m"? I can't decide whether that's better than "60s". Kind of reads like "1 megabyte" or something unless it's "1min". Either way seems OK.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to leave it as 1min for clarity for that reason. |
||
| uc.setConnectTimeout(timeoutMs) | ||
| uc.setReadTimeout(timeoutMs) | ||
| uc.connect() | ||
|
|
@@ -1010,36 +1010,58 @@ private[spark] object Utils extends Logging { | |
| ) | ||
| } | ||
|
|
||
| /** Check whether a time-suffix was provided for the time string. */ | ||
| private def hasTimeSuffix(str: String) : Boolean = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not used anywhere. |
||
| val lower = str.toLowerCase.trim() | ||
| lower.endsWith("ms") || lower.endsWith("us") || lower.endsWith("s") | ||
| } | ||
|
|
||
| val timeError = "Time must be specified as seconds (s), " + | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I'd declare this where it's used. |
||
| "milliseconds (ms), or microseconds (us) e.g. 50s, 100ms, or 250us." | ||
|
|
||
| /** | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: add empty line before comment start |
||
| * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use | ||
| * Convert a passed time string (e.g. 50s, 100ms, or 250us) to a microsecond count for | ||
| * internal use. If no suffix is provided a direct conversion is attempted. | ||
| */ | ||
| def timeStringToUs(str: String): Long = { | ||
| val lower = str.toLowerCase.trim() | ||
| if (lower.endsWith("ms")) { | ||
| lower.substring(0, lower.length-2).toLong * 1000 | ||
| } else if (lower.endsWith("us")) { | ||
| lower.substring(0, lower.length-2).toLong | ||
| } else if (lower.endsWith("s")) { | ||
| lower.substring(0, lower.length-1).toLong * 1000 * 1000 | ||
| } else { // Invalid suffix, force correct formatting | ||
| throw new IllegalArgumentException("Time must be specified as seconds (s), " + | ||
| "milliseconds (ms), or microseconds (us) e.g. 50s, 100ms, or 250us.") | ||
| private def timeStringToUs(str: String) : Long = { | ||
| try { | ||
| val lower = str.toLowerCase.trim() | ||
| if (lower.endsWith("ms")) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd add at least Then you can iterate over the map instead of having cascading if / else checks.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, you could also use You could then use the same map in other conversions if you really want to: |
||
| lower.substring(0, lower.length - 2).toLong * 1000 | ||
| } else if (lower.endsWith("us")) { | ||
| lower.substring(0, lower.length - 2).toLong | ||
| } else if (lower.endsWith("s")) { | ||
| lower.substring(0, lower.length - 1).toLong * 1000 * 1000 | ||
| } else { | ||
| // Invalid suffix, force correct formatting | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really "invalid suffix", right? This is just handling the case where a suffix is not expected. If there is an invalid suffix, then an exception will be thrown. |
||
| lower.toLong | ||
| } | ||
| } catch { | ||
| case e: NumberFormatException => throw new NumberFormatException(timeError) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should also propagate the original error message. It might be the case that the error has nothing to do with the lack of a suffix (e.g. |
||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. | ||
| * Note: may round in some cases | ||
| * Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If | ||
| * no suffix is provided, the passed number is assumed to be in us. | ||
| */ | ||
| def timeStringAsUs(str: String): Long = { | ||
|
||
| timeStringToUs(str) | ||
|
||
| } | ||
|
|
||
| /** | ||
| * Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If | ||
| * no suffix is provided, the passed number is assumed to be in ms. | ||
| */ | ||
| def timeStringToMs(str : String) : Long = { | ||
| timeStringToUs(str)/1000 | ||
| def timeStringAsMs(str : String) : Long = { | ||
|
||
| timeStringToUs(str)/1000 | ||
|
||
| } | ||
|
|
||
| /** | ||
| * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. | ||
| * Note: may round in some cases | ||
| * Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use. If | ||
| * no suffix is provided, the passed number is assumed to be in seconds. | ||
| */ | ||
| def timeStringToS(str : String) : Long = { | ||
| def timeStringAsS(str : String) : Long = { | ||
| timeStringToUs(str)/1000/1000 | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -104,7 +104,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { | |
| if (processReceivedData) { | ||
| logInfo("Stopping JobGenerator gracefully") | ||
| val timeWhenStopStarted = System.currentTimeMillis() | ||
| val stopTimeout = Utils.timeStringToMs(conf.get( | ||
| val stopTimeout = Utils.timeStringAsMs(conf.get( | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could have "MS" on the end? |
||
| "spark.streaming.gracefulStopTimeout", s"${10 * ssc.graph.batchDuration.milliseconds}ms")) | ||
| val pollTime = 100 | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -75,7 +75,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) | ||
| myConf.set("spark.cleaner.ttl", "10s") | ||
| ssc = new StreamingContext(myConf, batchDuration) | ||
| assert(Utils.timeStringToS(ssc.conf.get("spark.cleaner.ttl", "-1s")) === 10) | ||
| assert(Utils.timeStringAsS(ssc.conf.get("spark.cleaner.ttl", "-1s")) === 10) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same comment about |
||
| } | ||
|
|
||
| test("from existing SparkContext") { | ||
|
|
@@ -87,7 +87,7 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName) | ||
| myConf.set("spark.cleaner.ttl", "10s") | ||
| ssc = new StreamingContext(myConf, batchDuration) | ||
| assert(Utils.timeStringToS(ssc.conf.get("spark.cleaner.ttl", "-1s")) === 10) | ||
| assert(Utils.timeStringAsS(ssc.conf.get("spark.cleaner.ttl", "-1s")) === 10) | ||
| } | ||
|
|
||
| test("from checkpoint") { | ||
|
|
@@ -98,12 +98,12 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w | |
| ssc1.start() | ||
| val cp = new Checkpoint(ssc1, Time(1000)) | ||
| assert( | ||
| Utils.timeStringToS(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1s")) === 10) | ||
| Utils.timeStringAsS(cp.sparkConfPairs.toMap.getOrElse("spark.cleaner.ttl", "-1s")) === 10) | ||
| ssc1.stop() | ||
| val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp)) | ||
| assert(Utils.timeStringToS(newCp.createSparkConf().get("spark.cleaner.ttl", "-1s")) === 10) | ||
| assert(Utils.timeStringAsS(newCp.createSparkConf().get("spark.cleaner.ttl", "-1s")) === 10) | ||
| ssc = new StreamingContext(null, newCp, null) | ||
| assert(Utils.timeStringToS(ssc.conf.get("spark.cleaner.ttl", "-1s")) === 10) | ||
| assert(Utils.timeStringAsS(ssc.conf.get("spark.cleaner.ttl", "-1s")) === 10) | ||
| } | ||
|
|
||
| test("start and stop state check") { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very minor, but in general I find it weird for variables to have a capital letter at the end. I would call this either
schedulerBacklogTimeoutSeconds or just leave it as before. From the comments it's pretty clear that all these configs are in seconds. Not a big deal if you don't change this.