Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
59bf9e1
[SPARK-5931] Updated Utils and JavaUtils classes to add helper method…
Mar 27, 2015
404f8c3
Updated usage of spark.core.connection.ack.wait.timeout
Mar 27, 2015
7db6d2a
Updated usage of spark.akka.timeout
Mar 27, 2015
4933fda
Updated usage of spark.storage.blockManagerSlaveTimeout
Mar 27, 2015
c9f5cad
Updated spark.shuffle.io.retryWait
Mar 27, 2015
21ef3dd
updated spark.shuffle.sasl.timeout
Mar 27, 2015
064ebd6
Updated usage of spark.cleaner.ttl
Mar 27, 2015
7320c87
updated spark.akka.heartbeat.interval
Mar 27, 2015
272c215
Updated spark.locality.wait
Mar 27, 2015
3352d34
Updated spark.scheduler.maxRegisteredResourcesWaitingTime
Mar 27, 2015
3f1cfc8
Updated spark.scheduler.revive.interval
Mar 27, 2015
6d1518e
Upated spark.speculation.interval
Mar 27, 2015
2fcc91c
Updated spark.dynamicAllocation.executorIdleTimeout
Mar 27, 2015
5181597
Updated spark.dynamicAllocation.schedulerBacklogTimeout
Mar 27, 2015
c6a0095
Updated spark.core.connection.auth.wait.timeout
Mar 27, 2015
cde9bff
Updated spark.streaming.blockInterval
Mar 27, 2015
42477aa
Updated configuration doc with note on specifying time properties
Mar 27, 2015
9a29d8d
Fixed misuse of time in streaming context test
Mar 27, 2015
34f87c2
Update Utils.scala
ilganeli Mar 28, 2015
8f741e1
Update JavaUtils.java
ilganeli Mar 28, 2015
9e2547c
Reverting doc changes
Mar 30, 2015
499bdf0
Merge branch 'SPARK-5931' of github.com:ilganeli/spark into SPARK-5931
Mar 30, 2015
5232a36
[SPARK-5931] Changed default behavior of time string conversion.
Mar 30, 2015
3a12dd8
Updated host revceiver
Mar 30, 2015
68f4e93
Updated more files to clean up usage of default time strings
Mar 30, 2015
70ac213
Fixed remaining usages to be consistent. Updated Java-side time conve…
Mar 30, 2015
647b5ac
Udpated time conversion to use map iterator instead of if fall through
Mar 31, 2015
1c0c07c
Updated Java code to add day, minutes, and hours
Mar 31, 2015
8613631
Whitespace
Mar 31, 2015
bac9edf
More whitespace
Mar 31, 2015
1858197
Fixed bug where all time was being converted to us instead of the app…
Mar 31, 2015
3b126e1
Fixed conversion to US from seconds
Mar 31, 2015
39164f9
[SPARK-5931] Updated Java conversion to be similar to scala conversio…
Mar 31, 2015
b2fc965
replaced get or default since it's not present in this version of java
Mar 31, 2015
dd0a680
Updated scala code to call into java
Mar 31, 2015
bf779b0
Special handling of overlapping usffixes for java
Apr 1, 2015
76cfa27
[SPARK-5931] Minor nit fixes'
Apr 1, 2015
5193d5f
Resolved merge conflicts
Apr 6, 2015
6387772
Updated suffix handling to handle overlap of units more gracefully
Apr 6, 2015
19c31af
Added cleaner computation of time conversions in tests
Apr 6, 2015
ff40bfe
Updated tests to fix small bugs
Apr 6, 2015
28187bf
Convert straight to seconds
Apr 6, 2015
1465390
Nit
Apr 6, 2015
cbf41db
Got rid of thrown exceptions
Apr 7, 2015
d4efd26
Added time conversion for yarn.scheduler.heartbeat.interval-ms
Apr 8, 2015
4e48679
Fixed priority order and mixed up conversions in a couple spots
Apr 8, 2015
1a1122c
Formatting fixes and added m for use as minute formatter
Apr 8, 2015
cbd2ca6
Formatting error
Apr 8, 2015
6f651a8
Now using regexes to simplify code in parseTimeString. Introduces get…
Apr 8, 2015
7d19cdd
Added fix for possible NPE
Apr 8, 2015
dc7bd08
Fixed error in exception handling
Apr 8, 2015
69fedcc
Added test for zero
Apr 8, 2015
8927e66
Fixed handling of -1
Apr 9, 2015
642a06d
Fixed logic for invalid suffixes and addid matching test
Apr 9, 2015
25d3f52
Minor nit fixes
Apr 11, 2015
bc04e05
Minor fixes and doc updates
Apr 13, 2015
951ca2d
Made the most recent round of changes
Apr 13, 2015
f5fafcd
Doc updates
Apr 13, 2015
de3bff9
Fixing style errors
Apr 13, 2015
4526c81
Update configuration.md
Apr 13, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed remaining usages to be consistent. Updated Java-side time conve…
…rsion
  • Loading branch information
Ilya Ganelin committed Mar 30, 2015
commit 70ac2138d8c1c23bc3bbd4fa91607686b2c329cc
9 changes: 5 additions & 4 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule

// "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
// "milliseconds"
private val networkTimeoutS = sc.conf.get("spark.network.timeout","120s")
private val networkTimeoutS = Utils.timeStringAsS(sc.conf.get("spark.network.timeout","120s"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after ,

private val executorTimeoutMs = Utils.timeStringAsMs(
sc.conf.get("spark.storage.blockManagerSlaveTimeout", networkTimeoutS))
sc.conf.get("spark.storage.blockManagerSlaveTimeout", s"${networkTimeoutS}s"))

// "spark.network.timeoutInterval" uses "seconds", while
// "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
private val networkTimeoutIntervalS = sc.conf.get("spark.network.timeoutInterval","60s")
private val networkTimeoutIntervalS =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor, but if this field is not used anywhere else I'd inline the call below.

Utils.timeStringAsS(sc.conf.get("spark.network.timeoutInterval","60s"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after ,

private val checkTimeoutIntervalMs = Utils.timeStringAsMs(
sc.conf.get("spark.storage.blockManagerTimeoutIntervalMs", networkTimeoutIntervalS))
sc.conf.get("spark.storage.blockManagerTimeoutIntervalMs", s"${networkTimeoutIntervalS}s"))

private var timeoutCheckingTask: Cancellable = null
override def preStart(): Unit = {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ private[spark] object Utils extends Logging {
* Convert a passed time string (e.g. 50s, 100ms, or 250us) to a microsecond count for
* internal use. If no suffix is provided a direct conversion is attempted.
*/
@throws(classOf[NumberFormatException])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a RuntimeException so in general there's no need to explicitly declare it.

private def timeStringToUs(str: String) : Long = {
try {
val lower = str.toLowerCase.trim()
Expand All @@ -1045,6 +1046,7 @@ private[spark] object Utils extends Logging {
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in us.
*/
@throws(classOf[NumberFormatException])
def timeStringAsUs(str: String): Long = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the microseconds methods used anywhere? I'm not aware of anything that uses this resolution, but might have missed it. There aren't us methods in SparkConf.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No - they can be removed.

timeStringToUs(str)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: too much indentation (here and in other places too)

}
Expand All @@ -1053,6 +1055,7 @@ private[spark] object Utils extends Logging {
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
@throws(classOf[NumberFormatException])
def timeStringAsMs(str : String) : Long = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no space before :

timeStringToUs(str)/1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: spaces between / (here and elsewhere)

}
Expand All @@ -1061,6 +1064,7 @@ private[spark] object Utils extends Logging {
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in seconds.
*/
@throws(classOf[NumberFormatException])
def timeStringAsS(str : String) : Long = {
timeStringToUs(str)/1000/1000
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,35 +123,49 @@ private static boolean isSymlink(File file) throws IOException {
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use
* Convert a passed time string (e.g. 50s, 100ms, or 250us) to a microsecond count for
* internal use. If no suffix is provided a direct conversion is attempted.
*/
public static long timeStringToUs(String str) throws IllegalArgumentException {
private static long timeStringToUs(String str) throws NumberFormatException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method can be very similar to the Scala one (using an com.google.common.collect.ImmutableMap to hold the valid suffixes). Maybe that's what you meant by updating the Java code later.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks - didn't know if there was a similar construct.

String lower = str.toLowerCase().trim();
if (lower.endsWith("ms")) {
return Long.parseLong(lower.substring(0, lower.length()-2)) * 1000;
} else if (lower.endsWith("us")) {
return Long.parseLong(lower.substring(0, lower.length()-2));
} else if (lower.endsWith("s")) {
return Long.parseLong(lower.substring(0, lower.length()-1)) * 1000 * 1000;
} else {// Invalid suffix, force correct formatting
throw new IllegalArgumentException("Time must be specified as seconds (s), " +
try {
if (lower.endsWith("ms")) {
return Long.parseLong(lower.substring(0, lower.length() - 2)) * 1000;
} else if (lower.endsWith("us")) {
return Long.parseLong(lower.substring(0, lower.length() - 2));
} else if (lower.endsWith("s")) {
return Long.parseLong(lower.substring(0, lower.length() - 1)) * 1000 * 1000;
} else {// Invalid suffix, force correct formatting
return Long.parseLong(lower);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole method is really dense. Should we do something like the following instead so it's easier to follow:

private static long parseTimeString(String str, TimeUnit unit) {
  String lower = str.toLowerCase().trim();
  long parsedTime = -1;
  TimeUnit parsedSuffix = null;

  // If the given string does not include units, just assume the provided one
  if (lower.matches("\\d+")) {
    return Long.parseLong(lower);
  }

  // Otherwise, parse the number and the units
  for (String suffix : timeSuffixes.keySet()) {
    if (lower.endsWith(suffix)) {
      try {
        // e.g. "50ms" = 50, "3 min" = 3, "3a50s" = error
        parsedTime = Long.parseLong(str.substring(0, str.length() - suffix.length()).trim());
        if (parsedTime >= 0) {
          parsedSuffix = timeSuffixes.get(suffix);
          break;
        }
      } catch(NumberFormatException e) {
        // continue
      }
    }
  }

  // No valid time suffix found
  if (parsedSuffix == null) {
    throw new SomeException(...);
  }

  return unit.convert(parsedTime, parsedSuffix);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, that was much longer than expected. It would have been a lot more concise in Scala without sacrificing any readability

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using regexes this would be pretty short.

Matcher m = Pattern.compile("([0-9]+)([a-z]+)?");
if (m.matches()) {
   long val = Long.parseLong(m.group(1));
   String suffix = m.group(2);
}

Or something along those lines.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, do that!

} catch(NumberFormatException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space before (

throw new NumberFormatException("Time must be specified as seconds (s), " +
"milliseconds (ms), or microseconds (us) e.g. 50s, 100ms, or 250us.");
}

}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in us.
*/
public static long timeStringAsUs(String str) throws NumberFormatException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NumberFormatException is unchecked. You can put it in the javadoc but I think (?) common practice is not to declare unchecked exceptions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to add the additional error message clarifying appropriate suffixes which is why I handle the error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fine but I mean the method need not be declared as throws NumberFormatException

return timeStringToUs(str);
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use.
* Note: may round in some cases
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
public static long timeStringToMs(String str) throws IllegalArgumentException {
public static long timeStringAsMs(String str) throws NumberFormatException {
return timeStringToUs(str)/1000;
}

/**
* Convert a time parameter such as (50s, 100ms, or 250us) to seconds for internal use.
* Note: may round in some cases
* Convert a time parameter such as (50s, 100ms, or 250us) to microseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in seconds.
*/
public static long timeStringToS(String str) throws IllegalArgumentException {
public static long timeStringAsS(String str) throws NumberFormatException {
return timeStringToUs(str)/1000/1000;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ public boolean preferDirectBufs() {

/** Connect timeout in milliseconds. Default 120 secs. */
public int connectionTimeoutMs() {
long defaultTimeout = JavaUtils.timeStringToMs(
conf.get("spark.shuffle.io.connectionTimeout",
conf.get("spark.network.timeout", "120s")));
return (int) defaultTimeout;
long defaultNetworkTimeoutS = JavaUtils.timeStringAsS(
conf.get("spark.network.timeout","120s"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space after ,

long defaultTimeoutS = JavaUtils.timeStringAsS(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be parsed straight to milliseconds then?

conf.get("spark.shuffle.io.connectionTimeout", defaultNetworkTimeoutS + "s"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be 2 indents only on this line

return (int) defaultTimeoutS * 1000;
}

/** Number of concurrent connections between two nodes for fetching data. */
Expand Down Expand Up @@ -71,7 +72,7 @@ public int numConnectionsPerPeer() {

/** Timeout for a single round trip of SASL token exchange, in milliseconds. */
public int saslRTTimeoutMs() {
return (int) JavaUtils.timeStringToMs(conf.get("spark.shuffle.sasl.timeout", "30s"));
return (int) JavaUtils.timeStringAsS(conf.get("spark.shuffle.sasl.timeout", "30s")) * 1000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here and in the next change.

}

/**
Expand All @@ -85,7 +86,7 @@ public int saslRTTimeoutMs() {
* Only relevant if maxIORetries > 0.
*/
public int ioRetryWaitTimeMs() {
return (int) JavaUtils.timeStringToMs(conf.get("spark.shuffle.io.retryWait", "5s"));
return (int) JavaUtils.timeStringAsS(conf.get("spark.shuffle.io.retryWait", "5s")) * 1000;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Utils, SystemClock}
import org.apache.spark.util.{SystemClock, Utils}

/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
Expand Down Expand Up @@ -80,7 +80,7 @@ private[streaming] class BlockGenerator(

private val clock = new SystemClock()
private val blockIntervalMs =
Utils.timeStringAsMs(conf.get("spark.streaming.blockIntervalMs", "200ms"))
Utils.timeStringAsMs(conf.get("spark.streaming.blockInterval", "200ms"))
private val blockIntervalTimer =
new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.actor.{ActorRef, Props, Actor}
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.RecurringTimer
import org.apache.spark.util.{Utils, Clock, ManualClock}
import org.apache.spark.util.{Clock, ManualClock, Utils}

/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
Expand Down