From 3e1cc2979347ba287b6bdbc4dcd811ec0329c1f6 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:03:31 +0000 Subject: [PATCH 01/24] rename: executorMemory -> executorMemoryMB --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++--- .../org/apache/spark/deploy/yarn/ClientArguments.scala | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 46d9df93488c..7f5756527684 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -157,9 +157,9 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = args.executorMemory + executorMemoryOverhead + val executorMem = args.executorMemoryMB + executorMemoryOverhead if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + + throw new IllegalArgumentException(s"Required executor memory (${args.executorMemoryMB}" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } val amMem = args.amMemory + amMemoryOverhead @@ -503,7 +503,7 @@ private[spark] class Client( val amArgs = Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++ Seq( - "--executor-memory", args.executorMemory.toString + "m", + "--executor-memory", args.executorMemoryMB.toString + "m", "--executor-cores", args.executorCores.toString, "--num-executors ", args.numExecutors.toString) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 3bc7eb1abf34..f83bb9e13d5b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -33,7 +33,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var pyFiles: String = null var primaryPyFile: String = null var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]() - var executorMemory = 1024 // MB + var executorMemoryMB = 1024 var executorCores = 1 var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get("spark.yarn.queue", "default") @@ -63,7 +63,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { @@ -188,7 +188,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) if (args(0) == "--worker-memory") { println("--worker-memory is deprecated. 
Use --executor-memory instead.") } - executorMemory = value + executorMemoryMB = value args = tail case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => From cba802c8b2715113b47ddc0d9c4dc06dfb2b2ac6 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:04:15 +0000 Subject: [PATCH 02/24] rename: amMemory -> amMemoryMB --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 8 ++++---- .../org/apache/spark/deploy/yarn/ClientArguments.scala | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 7f5756527684..2da2ebe98bce 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -126,7 +126,7 @@ private[spark] class Client( "Cluster's default value will be used.") } val capability = Records.newRecord(classOf[Resource]) - capability.setMemory(args.amMemory + amMemoryOverhead) + capability.setMemory(args.amMemoryMB + amMemoryOverhead) capability.setVirtualCores(args.amCores) appContext.setResource(capability) appContext @@ -162,9 +162,9 @@ private[spark] class Client( throw new IllegalArgumentException(s"Required executor memory (${args.executorMemoryMB}" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } - val amMem = args.amMemory + amMemoryOverhead + val amMem = args.amMemoryMB + amMemoryOverhead if (amMem > maxMem) { - throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + + throw new IllegalArgumentException(s"Required AM memory (${args.amMemoryMB}" + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( @@ -394,7 +394,7 @@ private[spark] class Client( var prefixEnv: Option[String] = None // Add Xmx for AM memory - javaOpts += "-Xmx" + args.amMemory + "m" + javaOpts += "-Xmx" + args.amMemoryMB + "m" val tmpDir = new Path( YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index f83bb9e13d5b..289b11d4075d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -37,7 +37,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var executorCores = 1 var numExecutors = DEFAULT_NUMBER_EXECUTORS var amQueue = sparkConf.get("spark.yarn.queue", "default") - var amMemory: Int = 512 // MB + var amMemoryMB: Int = 512 var amCores: Int = 1 var appName: String = "Spark" var priority = 0 @@ -60,7 +60,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) // Additional memory to allocate to containers val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf, - math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) @@ -116,7 +116,7 @@ private[spark] class 
ClientArguments(args: Array[String], sparkConf: SparkConf) println(s"$key is set but does not apply in cluster mode.") } } - amMemory = driverMemory + amMemoryMB = driverMemory amCores = driverCores } else { for (key <- Seq(driverMemOverheadKey, driverCoresKey)) { @@ -126,7 +126,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) } sparkConf.getOption(amMemKey) .map(Utils.memoryStringToMb) - .foreach { mem => amMemory = mem } + .foreach { mem => amMemoryMB = mem } sparkConf.getOption(amCoresKey) .map(_.toInt) .foreach { cores => amCores = cores } From 0a717d7d20d7339fe20dae0f85019b140597ece9 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:04:40 +0000 Subject: [PATCH 03/24] rename: driverMemory -> driverMemoryMB --- .../org/apache/spark/deploy/yarn/ClientArguments.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 289b11d4075d..17e4ef86c670 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -43,7 +43,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var priority = 0 def isClusterMode: Boolean = userClass != null - private var driverMemory: Int = 512 // MB + private var driverMemoryMB: Int = 512 private var driverCores: Int = 1 private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead" private val amMemKey = "spark.yarn.am.memory" @@ -116,7 +116,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) println(s"$key is set but does not apply in cluster mode.") } } - amMemoryMB = driverMemory + amMemoryMB = driverMemoryMB amCores = driverCores } else { for (key <- Seq(driverMemOverheadKey, driverCoresKey)) { @@ -165,7 +165,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) if (args(0) == "--master-memory") { println("--master-memory is deprecated. 
Use --driver-memory instead.") } - driverMemory = value + driverMemoryMB = value args = tail case ("--driver-cores") :: IntParam(value) :: tail => From 663594529ad8de3df89b734866b67d3497aeb048 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:05:31 +0000 Subject: [PATCH 04/24] rename: DEFAULT_MEMORY -> DEFAULT_MEMORY_MB --- .../scala/org/apache/spark/deploy/ClientArguments.scala | 6 +++--- .../org/apache/spark/deploy/rest/StandaloneRestServer.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index 415bd5059169..d959b7290922 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -39,7 +39,7 @@ private[spark] class ClientArguments(args: Array[String]) { var jarUrl: String = "" var mainClass: String = "" var supervise: Boolean = DEFAULT_SUPERVISE - var memory: Int = DEFAULT_MEMORY + var memory: Int = DEFAULT_MEMORY_MB var cores: Int = DEFAULT_CORES private var _driverOptions = ListBuffer[String]() def driverOptions = _driverOptions.toSeq @@ -106,7 +106,7 @@ private[spark] class ClientArguments(args: Array[String]) { | |Options: | -c CORES, --cores CORES Number of cores to request (default: $DEFAULT_CORES) - | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY) + | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY_MB) | -s, --supervise Whether to restart the driver on failure | (default: $DEFAULT_SUPERVISE) | -v, --verbose Print more debugging output @@ -118,7 +118,7 @@ private[spark] class ClientArguments(args: Array[String]) { object ClientArguments { private[spark] val DEFAULT_CORES = 1 - private[spark] val DEFAULT_MEMORY = 512 // MB + private[spark] val DEFAULT_MEMORY_MB = 512 private[spark] val DEFAULT_SUPERVISE = false def isValidJarUrl(s: String): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala index 6e4486e20fcb..793d1f114a64 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala @@ -394,7 +394,7 @@ private class SubmitRequestServlet( "org.apache.spark.deploy.worker.DriverWrapper", Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper environmentVariables, extraClassPath, extraLibraryPath, javaOpts) - val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY) + val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY_MB) val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES) val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE) new DriverDescription( From 96dfec088e3151126dcefeedb376fa02391bcf3c Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:06:15 +0000 Subject: [PATCH 05/24] rename: amMemoryOverhead -> amMemoryOverheadMB --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 2da2ebe98bce..63508d8f4a9e 100644 --- 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -62,7 +62,7 @@ private[spark] class Client( private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) private val credentials = UserGroupInformation.getCurrentUser.getCredentials - private val amMemoryOverhead = args.amMemoryOverhead // MB + private val amMemoryOverheadMB = args.amMemoryOverhead private val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode @@ -126,7 +126,7 @@ private[spark] class Client( "Cluster's default value will be used.") } val capability = Records.newRecord(classOf[Resource]) - capability.setMemory(args.amMemoryMB + amMemoryOverhead) + capability.setMemory(args.amMemoryMB + amMemoryOverheadMB) capability.setVirtualCores(args.amCores) appContext.setResource(capability) appContext @@ -162,14 +162,14 @@ private[spark] class Client( throw new IllegalArgumentException(s"Required executor memory (${args.executorMemoryMB}" + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } - val amMem = args.amMemoryMB + amMemoryOverhead + val amMem = args.amMemoryMB + amMemoryOverheadMB if (amMem > maxMem) { throw new IllegalArgumentException(s"Required AM memory (${args.amMemoryMB}" + - s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") + s"+$amMemoryOverheadMB MB) is above the max threshold ($maxMem MB) of this cluster!") } logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( amMem, - amMemoryOverhead)) + amMemoryOverheadMB)) // We could add checks to make sure the entire cluster has enough resources but that involves // getting all the node reports and computing ourselves. 
From 5a3e4b80eb0d64f8e48f00e791dbe6539b14d604 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:06:47 +0000 Subject: [PATCH 06/24] rename: executorMemoryOverhead -> executorMemoryOverheadMB --- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 63508d8f4a9e..2d7c806a10f9 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -63,7 +63,7 @@ private[spark] class Client( private val yarnConf = new YarnConfiguration(hadoopConf) private val credentials = UserGroupInformation.getCurrentUser.getCredentials private val amMemoryOverheadMB = args.amMemoryOverhead - private val executorMemoryOverhead = args.executorMemoryOverhead // MB + private val executorMemoryOverheadMB = args.executorMemoryOverhead private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode @@ -157,10 +157,10 @@ private[spark] class Client( val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() logInfo("Verifying our application has not requested more than the maximum " + s"memory capability of the cluster ($maxMem MB per container)") - val executorMem = args.executorMemoryMB + executorMemoryOverhead + val executorMem = args.executorMemoryMB + executorMemoryOverheadMB if (executorMem > maxMem) { throw new IllegalArgumentException(s"Required executor memory (${args.executorMemoryMB}" + - s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") + s"+$executorMemoryOverheadMB MB) is above the max threshold ($maxMem MB) of this cluster!") } val amMem = args.amMemoryMB + amMemoryOverheadMB if (amMem > maxMem) { From 63086eb5b72287f00d01dee8a44d71b06688d3c9 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:07:18 +0000 Subject: [PATCH 07/24] rename: amMemoryOverhead -> amMemoryOverheadMB --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 2d7c806a10f9..f2daf86d0c58 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -62,7 +62,7 @@ private[spark] class Client( private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) private val credentials = UserGroupInformation.getCurrentUser.getCredentials - private val amMemoryOverheadMB = args.amMemoryOverhead + private val amMemoryOverheadMB = args.amMemoryOverheadMB private val executorMemoryOverheadMB = args.executorMemoryOverhead private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 17e4ef86c670..c167cf1dd346 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -59,7 +59,7 @@ private[spark] class 
ClientArguments(args: Array[String], sparkConf: SparkConf) // Additional memory to allocate to containers val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey - val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf, + val amMemoryOverheadMB = sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", From f54b0ce9db111edaf4b1c7ec4410f6c51259eb03 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:07:37 +0000 Subject: [PATCH 08/24] rename: executorMemoryOverhead -> executorMemoryOverheadMB --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f2daf86d0c58..93ea06c22d77 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -63,7 +63,7 @@ private[spark] class Client( private val yarnConf = new YarnConfiguration(hadoopConf) private val credentials = UserGroupInformation.getCurrentUser.getCredentials private val amMemoryOverheadMB = args.amMemoryOverheadMB - private val executorMemoryOverheadMB = args.executorMemoryOverhead + private val executorMemoryOverheadMB = args.executorMemoryOverheadMB private val distCacheMgr = new ClientDistributedCacheManager() private val isClusterMode = args.isClusterMode diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index c167cf1dd346..4f9ad2abfba2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -62,7 +62,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) val amMemoryOverheadMB = sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) - val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + val executorMemoryOverheadMB = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) /** Load any default arguments provided through environment variables and Spark properties. 
*/ From fa3d69f430a4bc905aaaf89976b728785ae82c70 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:08:00 +0000 Subject: [PATCH 09/24] rename: MEMORY_OVERHEAD_MIN -> MEMORY_OVERHEAD_MIN_MB --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 4 ++-- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 4f9ad2abfba2..7d0c66cb04d6 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -60,10 +60,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) // Additional memory to allocate to containers val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey val amMemoryOverheadMB = sparkConf.getInt(amMemoryOverheadConf, - math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) val executorMemoryOverheadMB = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) /** Load any default arguments provided through environment variables and Spark properties. */ private def loadEnvironmentArgs(): Unit = { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 12c62a659d79..a8c162fff495 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -92,7 +92,7 @@ private[yarn] class YarnAllocator( protected val executorMemory = args.executorMemory // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN_MB)) // Number of cores per executor. protected val executorCores = args.executorCores // Resource capability requested for each executors diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 146b2c0f1a30..617c5ee79258 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -90,7 +90,7 @@ object YarnSparkHadoopUtil { // the common cases. Memory overhead tends to grow with container size. 
val MEMORY_OVERHEAD_FACTOR = 0.07 - val MEMORY_OVERHEAD_MIN = 384 + val MEMORY_OVERHEAD_MIN_MB = 384 val ANY_HOST = "*" From f265d15ed0a11e8249eaed9034bc8530d4437a82 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Mon, 9 Feb 2015 22:42:12 +0000 Subject: [PATCH 10/24] fix deprecation warning --- .../scala/org/apache/spark/scheduler/local/LocalBackend.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 4676b828d3d8..d95426d918e1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer import scala.concurrent.duration._ +import scala.language.postfixOps import akka.actor.{Actor, ActorRef, Props} From c29da1d0e2602a374867f418bd953da488a2f1f0 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:14:38 +0000 Subject: [PATCH 11/24] rename: executorMemory -> executorMemoryMB --- .../apache/spark/deploy/yarn/ApplicationMasterArguments.scala | 4 ++-- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index e1a992af3aae..03d04e5adb38 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -27,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var primaryPyFile: String = null var pyFiles: String = null var userArgs: Seq[String] = Seq[String]() - var executorMemory = 1024 + var executorMemoryMB = 1024 var executorCores = 1 var numExecutors = DEFAULT_NUMBER_EXECUTORS @@ -67,7 +67,7 @@ class ApplicationMasterArguments(val args: Array[String]) { args = tail case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail => - executorMemory = value + executorMemoryMB = value args = tail case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail => diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index a8c162fff495..015ed51f4ee8 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -89,7 +89,7 @@ private[yarn] class YarnAllocator( private val executorIdToContainer = new HashMap[String, Container] // Executor memory in MB. - protected val executorMemory = args.executorMemory + protected val executorMemory = args.executorMemoryMB // Additional memory overhead. 
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN_MB)) From 23a77be4a496df07600917cca42a69dc283a7b86 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:16:51 +0000 Subject: [PATCH 12/24] rename: executorMemory -> executorMemoryMB --- .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 015ed51f4ee8..9fbfd89483e0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -89,14 +89,14 @@ private[yarn] class YarnAllocator( private val executorIdToContainer = new HashMap[String, Container] // Executor memory in MB. - protected val executorMemory = args.executorMemoryMB + protected val executorMemoryMB = args.executorMemoryMB // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN_MB)) + math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) // Number of cores per executor. protected val executorCores = args.executorCores // Resource capability requested for each executors - private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) + private val resource = Resource.newInstance(executorMemoryMB + memoryOverhead, executorCores) private val launcherPool = new ThreadPoolExecutor( // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue @@ -333,7 +333,7 @@ private[yarn] class YarnAllocator( driverUrl, executorId, executorHostname, - executorMemory, + executorMemoryMB, executorCores, appAttemptId.getApplicationId.toString, securityMgr) From 5057bd30b8ca88206199c15a25331144eccb93d3 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:17:12 +0000 Subject: [PATCH 13/24] rename: memoryOverhead -> memoryOverheadMB --- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 9fbfd89483e0..7dd4c330468a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -91,12 +91,12 @@ private[yarn] class YarnAllocator( // Executor memory in MB. protected val executorMemoryMB = args.executorMemoryMB // Additional memory overhead. - protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + protected val memoryOverheadMB: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) // Number of cores per executor. 
protected val executorCores = args.executorCores // Resource capability requested for each executors - private val resource = Resource.newInstance(executorMemoryMB + memoryOverhead, executorCores) + private val resource = Resource.newInstance(executorMemoryMB + memoryOverheadMB, executorCores) private val launcherPool = new ThreadPoolExecutor( // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue @@ -206,7 +206,7 @@ private[yarn] class YarnAllocator( if (missing > 0) { logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " + - s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") + s"cores and ${resource.getMemory} MB memory including $memoryOverheadMB MB overhead") for (i <- 0 until missing) { val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY) From 14bd3d5c25c6356831f922263ae40baaef669c43 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:20:58 +0000 Subject: [PATCH 14/24] rename: memory -> memoryMB --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- .../main/scala/org/apache/spark/deploy/ClientArguments.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 237d26fc6bd0..11d05d9dc2d8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -74,7 +74,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) val driverDescription = new DriverDescription( driverArgs.jarUrl, - driverArgs.memory, + driverArgs.memoryMB, driverArgs.cores, driverArgs.supervise, command) diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala index d959b7290922..56f80f8c0002 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -39,7 +39,7 @@ private[spark] class ClientArguments(args: Array[String]) { var jarUrl: String = "" var mainClass: String = "" var supervise: Boolean = DEFAULT_SUPERVISE - var memory: Int = DEFAULT_MEMORY_MB + var memoryMB: Int = DEFAULT_MEMORY_MB var cores: Int = DEFAULT_CORES private var _driverOptions = ListBuffer[String]() def driverOptions = _driverOptions.toSeq @@ -55,7 +55,7 @@ private[spark] class ClientArguments(args: Array[String]) { parse(tail) case ("--memory" | "-m") :: MemoryParam(value) :: tail => - memory = value + memoryMB = value parse(tail) case ("--supervise" | "-s") :: tail => From 6e69b08444d6309bf65a8f554641a0800929e128 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:23:23 +0000 Subject: [PATCH 15/24] rename: mem -> memMB --- .../scala/org/apache/spark/deploy/DriverDescription.scala | 6 +++--- .../main/scala/org/apache/spark/deploy/JsonProtocol.scala | 2 +- .../main/scala/org/apache/spark/deploy/master/Master.scala | 2 +- .../scala/org/apache/spark/deploy/master/WorkerInfo.scala | 4 ++-- .../org/apache/spark/deploy/master/ui/MasterPage.scala | 4 ++-- .../scala/org/apache/spark/deploy/worker/DriverRunner.scala | 2 +- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++-- .../org/apache/spark/deploy/worker/ui/WorkerPage.scala | 4 ++-- 8 files changed, 14 insertions(+), 14 deletions(-) diff --git 
a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala index b056a19ce659..86a415e416a1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy private[spark] class DriverDescription( val jarUrl: String, - val mem: Int, + val memMB: Int, val cores: Int, val supervise: Boolean, val command: Command) @@ -27,11 +27,11 @@ private[spark] class DriverDescription( def copy( jarUrl: String = jarUrl, - mem: Int = mem, + memMB: Int = memMB, cores: Int = cores, supervise: Boolean = supervise, command: Command = command): DriverDescription = - new DriverDescription(jarUrl, mem, cores, supervise, command) + new DriverDescription(jarUrl, memMB, cores, supervise, command) override def toString: String = s"DriverDescription (${command.mainClass})" } diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index 696f32a6f573..edb828c92920 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -71,7 +71,7 @@ private[spark] object JsonProtocol { ("starttime" -> obj.startTime.toString) ~ ("state" -> obj.state.toString) ~ ("cores" -> obj.desc.cores) ~ - ("memory" -> obj.desc.mem) + ("memory" -> obj.desc.memMB) } def writeMasterState(obj: MasterStateResponse) = { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 53e453990f8c..868e386232e5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -543,7 +543,7 @@ private[spark] class Master( while (numWorkersVisited < numWorkersAlive && !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + if (worker.memoryFree >= driver.desc.memMB && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index e94aae93e449..3947e63bc47e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -90,13 +90,13 @@ private[spark] class WorkerInfo( def addDriver(driver: DriverInfo) { drivers(driver.id) = driver - memoryUsed += driver.desc.mem + memoryUsed += driver.desc.memMB coresUsed += driver.desc.cores } def removeDriver(driver: DriverInfo) { drivers -= driver.id - memoryUsed -= driver.desc.mem + memoryUsed -= driver.desc.memMB coresUsed -= driver.desc.cores } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index fd514f07664a..b023e0449757 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -193,8 +193,8 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { {driver.desc.cores} - - {Utils.megabytesToString(driver.desc.mem.toLong)} + + 
{Utils.megabytesToString(driver.desc.memMB.toLong)} {driver.desc.command.arguments(2)} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index b964a09bdb21..e6e699ebf0dc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -81,7 +81,7 @@ private[spark] class DriverRunner( } // TODO: If we add ability to submit multiple jars they should also be added here - val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem, + val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.memMB, sparkHome.getAbsolutePath, substituteVariables) launchDriver(builder, driverDir, driverDesc.supervise) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 10929eb51604..04e479e9f360 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -435,7 +435,7 @@ private[spark] class Worker( driver.start() coresUsed += driverDesc.cores - memoryUsed += driverDesc.mem + memoryUsed += driverDesc.memMB } case KillDriver(driverId) => { @@ -464,7 +464,7 @@ private[spark] class Worker( master ! DriverStateChanged(driverId, state, exception) val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver - memoryUsed -= driver.driverDesc.mem + memoryUsed -= driver.driverDesc.memMB coresUsed -= driver.driverDesc.cores } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index 327b90503280..6a288cac89e6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -139,8 +139,8 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { {driver.driverDesc.cores.toString} - - {Utils.megabytesToString(driver.driverDesc.mem)} + + {Utils.megabytesToString(driver.driverDesc.memMB)} stdout From 48c5115cfc2732482044a128b3d748de97fa1c17 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 12 Nov 2014 21:52:50 -0500 Subject: [PATCH 16/24] memoryStringToMb can have default scale specified Previously it assumed a unitless number represented raw bytes, but I want to use it for a config variable that previously defaulted to # of megabytes and not break backwards-compatibility. --- .../scala/org/apache/spark/util/Utils.scala | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 61d287ca9c3a..31a20ffeb949 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -983,18 +983,28 @@ private[spark] object Utils extends Logging { /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. 
*/ - def memoryStringToMb(str: String): Int = { + def memoryStringToMb(str: String): Int = memoryStringToMb(str, 'b') + def memoryStringToMb(str: String, defaultScale: Char): Int = { val lower = str.toLowerCase - if (lower.endsWith("k")) { + val lastChar = lower(lower.length - 1) + val scale = + if (lastChar.isDigit) + defaultScale + else + lastChar + + if (scale == 'k') { (lower.substring(0, lower.length-1).toLong / 1024).toInt - } else if (lower.endsWith("m")) { + } else if (scale == 'm') { lower.substring(0, lower.length-1).toInt - } else if (lower.endsWith("g")) { + } else if (scale == 'g') { lower.substring(0, lower.length-1).toInt * 1024 - } else if (lower.endsWith("t")) { + } else if (scale == 't') { lower.substring(0, lower.length-1).toInt * 1024 * 1024 - } else {// no suffix, so it's just a number in bytes + } else if (scale == 'b') {// no suffix, so it's just a number in bytes (lower.toLong / 1024 / 1024).toInt + } else { + throw new IllegalArgumentException("Invalid memory string: %s".format(str)) } } From dc03bf2ffcc94b21252c367c59cbf2126e283ba3 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:42:37 +0000 Subject: [PATCH 17/24] move getMaxResultSize from Utils to SparkConf --- core/src/main/scala/org/apache/spark/SparkConf.scala | 5 +++++ core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- .../scala/org/apache/spark/scheduler/TaskSetManager.scala | 2 +- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 ----- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 0dbd26146cb1..14300dfa260e 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -204,6 +204,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getOption(key).map(_.toBoolean).getOrElse(defaultValue) } + // Limit of bytes for total size of results (default is 1GB) + def getMaxResultSize: Long = { + Utils.memoryStringToMb(get("spark.driver.maxResultSize", "1g")).toLong << 20 + } + /** Get all executor environment variables set on this SparkConf */ def getExecutorEnv: Seq[(String, String)] = { val prefix = "spark.executorEnv." diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 6b22dcd6f5cb..a8a08e8fd586 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -105,7 +105,7 @@ private[spark] class Executor( private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) // Limit of bytes for total size of results (default is 1GB) - private val maxResultSize = Utils.getMaxResultSize(conf) + private val maxResultSize = conf.getMaxResultSize // Maintains the list of running tasks. 
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 55024ecd55e6..6bb1fff6ec4a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -70,7 +70,7 @@ private[spark] class TaskSetManager( val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5) // Limit of bytes for total size of results (default is 1GB) - val maxResultSize = Utils.getMaxResultSize(conf) + val maxResultSize = conf.getMaxResultSize // Serializer for closures and tasks. val env = SparkEnv.get diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 31a20ffeb949..5a86aab5a046 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1915,11 +1915,6 @@ private[spark] object Utils extends Logging { method.invoke(obj, values.toSeq: _*) } - // Limit of bytes for total size of results (default is 1GB) - def getMaxResultSize(conf: SparkConf): Long = { - memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20 - } - /** * Return the current system LD_LIBRARY_PATH name */ From 40ac6ce748414ddc3e4929fc97fdde53738f4f35 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:44:51 +0000 Subject: [PATCH 18/24] privatize amMemoryOverheadConf --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 7d0c66cb04d6..1e4cf602e9c2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -58,7 +58,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) validateArgs() // Additional memory to allocate to containers - val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey + private val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey val amMemoryOverheadMB = sparkConf.getInt(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) From dd9be85a0b94f27d90fe73225489e8fd0317e528 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:52:26 +0000 Subject: [PATCH 19/24] refactor memory-size order-of-magnitude constants --- .../main/scala/org/apache/spark/util/Utils.scala | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 5a86aab5a046..7881c51c97a4 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -980,6 +980,12 @@ private[spark] object Utils extends Logging { ) } + + private val TB = 1L << 40 + private val GB = 1L << 30 + private val MB = 1L << 20 + private val KB = 1L << 10 + /** * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. 
*/ @@ -1012,11 +1018,6 @@ private[spark] object Utils extends Logging { * Convert a quantity in bytes to a human-readable string such as "4.0 MB". */ def bytesToString(size: Long): String = { - val TB = 1L << 40 - val GB = 1L << 30 - val MB = 1L << 20 - val KB = 1L << 10 - val (value, unit) = { if (size >= 2*TB) { (size.asInstanceOf[Double] / TB, "TB") @@ -1057,7 +1058,7 @@ private[spark] object Utils extends Logging { * Convert a quantity in megabytes to a human-readable string such as "4.0 MB". */ def megabytesToString(megabytes: Long): String = { - bytesToString(megabytes * 1024L * 1024L) + bytesToString(megabytes * MB) } /** From bb66b222745a85477f32ce03bb72f2e452e5a670 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:53:38 +0000 Subject: [PATCH 20/24] add memory-string-parsing helpers to Utils --- .../scala/org/apache/spark/util/Utils.scala | 79 +++++++++++++------ 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7881c51c97a4..df14666d2d7a 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -986,34 +986,67 @@ private[spark] object Utils extends Logging { private val MB = 1L << 20 private val KB = 1L << 10 + private val scaleCharToFactor: Map[Char, Long] = Map( + 'b' -> 1L, + 'k' -> KB, + 'm' -> MB, + 'g' -> GB, + 't' -> TB + ) + /** - * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes. - */ - def memoryStringToMb(str: String): Int = memoryStringToMb(str, 'b') - def memoryStringToMb(str: String, defaultScale: Char): Int = { + * Convert a Java memory parameter passed to -Xmx (such as "300m" or "1g") to a number of + * megabytes (or other byte-scale denominations as specified by @outputScaleChar). + * + * For @defaultInputScaleChar and @outputScaleChar, valid values are: 'b' (bytes), 'k' + * (kilobytes), 'm' (megabytes), 'g' (gigabytes), and 't' (terabytes). + * + * @param str String to parse an amount of memory out of + * @param defaultInputScaleChar if no "scale" is provided on the end of @str (i.e. @str is a + * plain numeric value), assume this scale (default: 'b' for + * 'bytes') + * @param outputScaleChar express the output in this scale, i.e. number of bytes, kilobytes, + * megabytes, or gigabytes. 
+ */ + def parseMemoryString( + str: String, + defaultInputScaleChar: Char = 'b', + outputScaleChar: Char = 'm'): Long = { + val lower = str.toLowerCase val lastChar = lower(lower.length - 1) - val scale = - if (lastChar.isDigit) - defaultScale - else - lastChar - - if (scale == 'k') { - (lower.substring(0, lower.length-1).toLong / 1024).toInt - } else if (scale == 'm') { - lower.substring(0, lower.length-1).toInt - } else if (scale == 'g') { - lower.substring(0, lower.length-1).toInt * 1024 - } else if (scale == 't') { - lower.substring(0, lower.length-1).toInt * 1024 * 1024 - } else if (scale == 'b') {// no suffix, so it's just a number in bytes - (lower.toLong / 1024 / 1024).toInt - } else { - throw new IllegalArgumentException("Invalid memory string: %s".format(str)) - } + val (num, inputScaleChar) = + if (lastChar.isDigit) { + (lower.toLong, defaultInputScaleChar) + } else { + (lower.substring(0, lower.length - 1).toLong, lastChar) + } + + (for { + inputScale <- scaleCharToFactor.get(inputScaleChar) + outputScale <- scaleCharToFactor.get(outputScaleChar) + scale = inputScale * num / outputScale + } yield { + scale + }).getOrElse( + throw new IllegalArgumentException( + "Invalid memory string or scale: %s, %s, %s".format( + str, + defaultInputScaleChar, + outputScaleChar + ) + ) + ) } + /** + * Wrapper for @parseMemoryString taking default arguments and returning an int, which is safe + * since we are converting to a number of megabytes. + */ + def memoryStringToMb(str: String): Int = memoryStringToMb(str, defaultInputScale = 'b') + def memoryStringToMb(str: String, defaultInputScale: Char = 'b'): Int = + parseMemoryString(str, defaultInputScale, 'm').toInt + /** * Convert a quantity in bytes to a human-readable string such as "4.0 MB". */ From 960b525bddbf98ac7b12f41239ff6cd8ea8f2f6f Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:54:43 +0000 Subject: [PATCH 21/24] add `getMemory`, `getMB` helpers to SparkConf --- .../scala/org/apache/spark/SparkConf.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 14300dfa260e..b7ae65509d39 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -206,7 +206,24 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { // Limit of bytes for total size of results (default is 1GB) def getMaxResultSize: Long = { - Utils.memoryStringToMb(get("spark.driver.maxResultSize", "1g")).toLong << 20 + getMemory("spark.driver.maxResultSize", "1g", outputScale = 'b') + } + + private def getMemory( + key: String, + defaultValue: String, + defaultInputScale: Char = 'b', + outputScale: Char = 'm'): Long = { + Utils.parseMemoryString(getOption(key).getOrElse(defaultValue), defaultInputScale, outputScale) + } + + def getMB( + key: String, + defaultValue: Int): Int = { + getOption(key) + .map(Utils.memoryStringToMb(_, defaultInputScale = 'm')) + .map(_.toInt) + .getOrElse(defaultValue) } /** Get all executor environment variables set on this SparkConf */ From e038867f2a9c698aaae0b5be48f014118162e9de Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:55:43 +0000 Subject: [PATCH 22/24] use SparkConf.getMB helper in Yarn memory parsing --- .../scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 4 ++-- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 3 ++- 2 files changed, 4 
insertions(+), 3 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 1e4cf602e9c2..c623c440a5ea 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -59,10 +59,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) // Additional memory to allocate to containers private val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey - val amMemoryOverheadMB = sparkConf.getInt(amMemoryOverheadConf, + val amMemoryOverheadMB = sparkConf.getMB(amMemoryOverheadConf, math.max((MEMORY_OVERHEAD_FACTOR * amMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) - val executorMemoryOverheadMB = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + val executorMemoryOverheadMB = sparkConf.getMB("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) /** Load any default arguments provided through environment variables and Spark properties. */ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 7dd4c330468a..50a90fa2b894 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -91,8 +91,9 @@ private[yarn] class YarnAllocator( // Executor memory in MB. protected val executorMemoryMB = args.executorMemoryMB // Additional memory overhead. - protected val memoryOverheadMB: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + protected val memoryOverheadMB: Int = sparkConf.getMB("spark.yarn.executor.memoryOverhead", math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMB).toInt, MEMORY_OVERHEAD_MIN_MB)) + // Number of cores per executor. protected val executorCores = args.executorCores // Resource capability requested for each executors From 2ebb55aeab2541061bb1c65b06b9f9e2c7ba8a56 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 00:56:09 +0000 Subject: [PATCH 23/24] update docs about YARN memory overhead parameters --- docs/running-on-yarn.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 2b93eef6c26e..f75ba4637715 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -113,9 +113,9 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.executor.memoryOverhead - executorMemory * 0.07, with minimum of 384 + executorMemory * 0.07, with minimum of 384 megabytes - The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). + The amount of off heap memory to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%), but is specified here as an absolute amount of memory, e.g. "1g" or "384m". 
From 50f0f52766d95378540250a1a3afafca3e4d9635 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Tue, 10 Feb 2015 21:04:13 +0000 Subject: [PATCH 24/24] code review feedback --- core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +-- docs/running-on-yarn.md | 10 +++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index df14666d2d7a..18aad8516671 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1025,9 +1025,8 @@ private[spark] object Utils extends Logging { (for { inputScale <- scaleCharToFactor.get(inputScaleChar) outputScale <- scaleCharToFactor.get(outputScaleChar) - scale = inputScale * num / outputScale } yield { - scale + inputScale * num / outputScale }).getOrElse( throw new IllegalArgumentException( "Invalid memory string or scale: %s, %s, %s".format( diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index f75ba4637715..9630138924aa 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -113,23 +113,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes spark.yarn.executor.memoryOverhead - executorMemory * 0.07, with minimum of 384 megabytes + executorMemory * 0.07, with a minimum of 384 megabytes The amount of off heap memory to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%), but is specified here as an absolute amount of memory, e.g. "1g" or "384m". spark.yarn.driver.memoryOverhead - driverMemory * 0.07, with minimum of 384 + driverMemory * 0.07, with a minimum of 384 megabytes - The amount of off heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). + Same as spark.yarn.executor.memoryOverhead, but for the driver in cluster mode. spark.yarn.am.memoryOverhead - AM memory * 0.07, with minimum of 384 + AM memory * 0.07, with a minimum of 384 megabytes - Same as spark.yarn.driver.memoryOverhead, but for the Application Master in client mode. + Same as spark.yarn.executor.memoryOverhead, but for the Application Master in client mode.
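
A minimal usage sketch of the helpers introduced in patches 20 and 21 above. This is not part of the patch series: the object name is made up for illustration, and the code is placed in the org.apache.spark package only because Utils is private[spark]. It assumes the patched SparkConf.getMB and Utils.parseMemoryString definitions are on the classpath.

package org.apache.spark  // Utils is private[spark], so this sketch lives in the spark package

import org.apache.spark.util.Utils

// Hypothetical example object, for illustration only.
object MemoryHelpersExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "1g")
      .set("spark.yarn.driver.memoryOverhead", "512")

    // getMB treats a bare number as megabytes (backwards-compatible with the old
    // integer-valued spark.yarn.*.memoryOverhead settings) and converts suffixed
    // values such as "1g" or "384m" to a number of megabytes.
    println(conf.getMB("spark.yarn.executor.memoryOverhead", 384)) // 1024
    println(conf.getMB("spark.yarn.driver.memoryOverhead", 384))   // 512
    println(conf.getMB("spark.yarn.am.memoryOverhead", 384))       // 384 (key unset, default used)

    // parseMemoryString converts between explicit byte scales.
    println(Utils.parseMemoryString("300m", outputScaleChar = 'b'))       // 314572800 (bytes)
    println(Utils.parseMemoryString("2048", defaultInputScaleChar = 'k')) // 2 (megabytes)
  }
}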