Closed

Changes from all commits
24 commits
3e1cc29
rename: executorMemory -> executorMemoryMB
ryan-williams Feb 10, 2015
cba802c
rename: amMemory -> amMemoryMB
ryan-williams Feb 10, 2015
0a717d7
rename: driverMemory -> driverMemoryMB
ryan-williams Feb 10, 2015
6635945
rename: DEFAULT_MEMORY -> DEFAULT_MEMORY_MB
ryan-williams Feb 10, 2015
96dfec0
rename: amMemoryOverhead -> amMemoryOverheadMB
ryan-williams Feb 10, 2015
5a3e4b8
rename: executorMemoryOverhead -> executorMemoryOverheadMB
ryan-williams Feb 10, 2015
63086eb
rename: amMemoryOverhead -> amMemoryOverheadMB
ryan-williams Feb 10, 2015
f54b0ce
rename: executorMemoryOverhead -> executorMemoryOverheadMB
ryan-williams Feb 10, 2015
fa3d69f
rename: MEMORY_OVERHEAD_MIN -> MEMORY_OVERHEAD_MIN_MB
ryan-williams Feb 10, 2015
f265d15
fix deprecation warning
ryan-williams Feb 9, 2015
c29da1d
rename: executorMemory -> executorMemoryMB
ryan-williams Feb 10, 2015
23a77be
rename: executorMemory -> executorMemoryMB
ryan-williams Feb 10, 2015
5057bd3
rename: memoryOverhead -> memoryOverheadMB
ryan-williams Feb 10, 2015
14bd3d5
rename: memory -> memoryMB
ryan-williams Feb 10, 2015
6e69b08
rename: mem -> memMB
ryan-williams Feb 10, 2015
48c5115
memoryStringToMb can have default scale specified
ryan-williams Nov 13, 2014
dc03bf2
move getMaxResultSize from Utils to SparkConf
ryan-williams Feb 10, 2015
40ac6ce
privatize amMemoryOverheadConf
ryan-williams Feb 10, 2015
dd9be85
refactor memory-size order-of-magnitude constants
ryan-williams Feb 10, 2015
bb66b22
add memory-string-parsing helpers to Utils
ryan-williams Feb 10, 2015
960b525
add `getMemory`, `getMB` helpers to SparkConf
ryan-williams Feb 10, 2015
e038867
use SparkConf.getMB helper in Yarn memory parsing
ryan-williams Feb 10, 2015
2ebb55a
update docs about YARN memory overhead parameters
ryan-williams Feb 10, 2015
50f0f52
code review feedback
ryan-williams Feb 10, 2015
22 changes: 22 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -204,6 +204,28 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).map(_.toBoolean).getOrElse(defaultValue)
}

// Limit of bytes for total size of results (default is 1GB)
Member

Hm, why is this property special-cased here? The methods in this class are generally generic.

Contributor Author

I weighed two options:

  1. have a method in Utils that takes a SparkConf as a parameter and thinly wraps a method call on said SparkConf
  2. make the aforementioned wrapper a method on SparkConf that delegates to another method on SparkConf

...and felt like the latter was better/cleaner. My feeling was that a kitchen-sink, generically-named Utils class that wraps methods for SparkConf (and possibly other classes?) just to maintain an illusion of simplicity in the SparkConf API does not help code clarity.

Of course, this is subjective and I'm open to putting it back in Utils; let me know.

Contributor Author

NB: that was an answer to why this property is special-cased here, as opposed to over in Utils. You may be more interested in why it's special-cased at all, but that seems reasonable to me given the couple of magic strings involved and the fact that it has more than one call site (namely: 2).

Member

I would have picked Utils I suppose. Or is there somewhere less generic to put this that covers the 2 call sites? Other opinions?

Contributor Author

Option 3 could be: put such methods in a SparkConfUtils object, which would be less prone to kitchen-sink-iness.

I'm impartial; I'll let you make the call between these three.
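
(For concreteness, a minimal sketch of what that third option might look like; the object name and placement are hypothetical, and the body just mirrors the existing Utils.getMaxResultSize that this PR removes:)

    // Hypothetical "option 3": a small, purpose-built home instead of the
    // general-purpose Utils object. Name and location are illustrative only.
    private[spark] object SparkConfUtils {
      // Limit of bytes for total size of results (default is 1GB)
      def getMaxResultSize(conf: SparkConf): Long =
        Utils.memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
    }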

Member

I'd like another set of eyes on the change from @pwendell or @JoshRosen. The reason I hesitate before merging is the large number of small changes and the merge-conflict potential, although I do definitely think the variable names are improved by the suffix.

For this particular issue, maybe expose just getMemory from this class, and inline the two simple calls to it that currently use getMaxResultSize? Writing getMemory("spark.driver.maxResultSize", "1g", outputScale = 'b') in two places isn't bad versus constructing a new home for it.
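
(As a rough sketch of that suggestion, assuming getMemory were made non-private, the two call sites in Executor and TaskSetManager further down this diff would read roughly:)

    // Inlined form at each call site; replaces conf.getMaxResultSize.
    private val maxResultSize =
      conf.getMemory("spark.driver.maxResultSize", "1g", outputScale = 'b')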

Contributor Author

Hm, I'd vote we put it back in Utils over un-factoring those two calls, which are doing the same thing as one another.

def getMaxResultSize: Long = {
getMemory("spark.driver.maxResultSize", "1g", outputScale = 'b')
}

private def getMemory(
key: String,
defaultValue: String,
defaultInputScale: Char = 'b',
outputScale: Char = 'm'): Long = {
Utils.parseMemoryString(getOption(key).getOrElse(defaultValue), defaultInputScale, outputScale)
}

def getMB(
key: String,
defaultValue: Int): Int = {
getOption(key)
.map(Utils.memoryStringToMb(_, defaultInputScale = 'm'))
.map(_.toInt)
.getOrElse(defaultValue)
}

/** Get all executor environment variables set on this SparkConf */
def getExecutorEnv: Seq[(String, String)] = {
val prefix = "spark.executorEnv."
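As a quick illustration of how the three helpers added above behave together (an editorial sketch, not part of the diff; the config keys are real Spark properties):

    val conf = new SparkConf()
      .set("spark.driver.maxResultSize", "2g")
      .set("spark.yarn.executor.memoryOverhead", "512")

    // getMaxResultSize delegates to getMemory(..., outputScale = 'b'),
    // so the result is a byte count: "2g" -> 2147483648.
    val maxResultBytes: Long = conf.getMaxResultSize

    // getMB parses a bare number as megabytes (defaultInputScale = 'm'),
    // so "512" -> 512; a missing key falls back to the supplied default.
    val overheadMB: Int = conf.getMB("spark.yarn.executor.memoryOverhead", 384)
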
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -74,7 +74,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)

val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.memoryMB,
driverArgs.cores,
driverArgs.supervise,
command)
@@ -39,7 +39,7 @@ private[spark] class ClientArguments(args: Array[String]) {
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = DEFAULT_SUPERVISE
var memory: Int = DEFAULT_MEMORY
var memoryMB: Int = DEFAULT_MEMORY_MB
var cores: Int = DEFAULT_CORES
private var _driverOptions = ListBuffer[String]()
def driverOptions = _driverOptions.toSeq
@@ -55,7 +55,7 @@ private[spark] class ClientArguments(args: Array[String]) {
parse(tail)

case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
memory = value
memoryMB = value
parse(tail)

case ("--supervise" | "-s") :: tail =>
@@ -106,7 +106,7 @@ private[spark] class ClientArguments(args: Array[String]) {
|
|Options:
| -c CORES, --cores CORES Number of cores to request (default: $DEFAULT_CORES)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY)
| -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $DEFAULT_MEMORY_MB)
| -s, --supervise Whether to restart the driver on failure
| (default: $DEFAULT_SUPERVISE)
| -v, --verbose Print more debugging output
@@ -118,7 +118,7 @@ private[spark] class ClientArguments(args: Array[String]) {

object ClientArguments {
private[spark] val DEFAULT_CORES = 1
private[spark] val DEFAULT_MEMORY = 512 // MB
private[spark] val DEFAULT_MEMORY_MB = 512
private[spark] val DEFAULT_SUPERVISE = false

def isValidJarUrl(s: String): Boolean = {
@@ -19,19 +19,19 @@ package org.apache.spark.deploy

private[spark] class DriverDescription(
val jarUrl: String,
val mem: Int,
val memMB: Int,
val cores: Int,
val supervise: Boolean,
val command: Command)
extends Serializable {

def copy(
jarUrl: String = jarUrl,
mem: Int = mem,
memMB: Int = memMB,
cores: Int = cores,
supervise: Boolean = supervise,
command: Command = command): DriverDescription =
new DriverDescription(jarUrl, mem, cores, supervise, command)
new DriverDescription(jarUrl, memMB, cores, supervise, command)

override def toString: String = s"DriverDescription (${command.mainClass})"
}
@@ -71,7 +71,7 @@ private[spark] object JsonProtocol {
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
("memory" -> obj.desc.mem)
("memory" -> obj.desc.memMB)
}

def writeMasterState(obj: MasterStateResponse) = {
@@ -543,7 +543,7 @@ private[spark] class Master(
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
if (worker.memoryFree >= driver.desc.memMB && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
@@ -90,13 +90,13 @@ private[spark] class WorkerInfo(

def addDriver(driver: DriverInfo) {
drivers(driver.id) = driver
memoryUsed += driver.desc.mem
memoryUsed += driver.desc.memMB
coresUsed += driver.desc.cores
}

def removeDriver(driver: DriverInfo) {
drivers -= driver.id
memoryUsed -= driver.desc.mem
memoryUsed -= driver.desc.memMB
coresUsed -= driver.desc.cores
}

@@ -193,8 +193,8 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td sorttable_customkey={driver.desc.cores.toString}>
{driver.desc.cores}
</td>
<td sorttable_customkey={driver.desc.mem.toString}>
{Utils.megabytesToString(driver.desc.mem.toLong)}
<td sorttable_customkey={driver.desc.memMB.toString}>
{Utils.megabytesToString(driver.desc.memMB.toLong)}
</td>
<td>{driver.desc.command.arguments(2)}</td>
</tr>
@@ -394,7 +394,7 @@ private class SubmitRequestServlet(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY_MB)
val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
new DriverDescription(
@@ -81,7 +81,7 @@ private[spark] class DriverRunner(
}

// TODO: If we add ability to submit multiple jars they should also be added here
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.memMB,
sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
@@ -435,7 +435,7 @@ private[spark] class Worker(
driver.start()

coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
memoryUsed += driverDesc.memMB
}

case KillDriver(driverId) => {
@@ -464,7 +464,7 @@ private[spark] class Worker(
master ! DriverStateChanged(driverId, state, exception)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem
memoryUsed -= driver.driverDesc.memMB
coresUsed -= driver.driverDesc.cores
}

@@ -139,8 +139,8 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
<td sorttable_customkey={driver.driverDesc.cores.toString}>
{driver.driverDesc.cores.toString}
</td>
<td sorttable_customkey={driver.driverDesc.mem.toString}>
{Utils.megabytesToString(driver.driverDesc.mem)}
<td sorttable_customkey={driver.driverDesc.memMB.toString}>
{Utils.megabytesToString(driver.driverDesc.memMB)}
</td>
<td>
<a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a>
@@ -105,7 +105,7 @@ private[spark] class Executor(
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
private val maxResultSize = conf.getMaxResultSize

// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
@@ -70,7 +70,7 @@ private[spark] class TaskSetManager(
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)

// Limit of bytes for total size of results (default is 1GB)
val maxResultSize = Utils.getMaxResultSize(conf)
val maxResultSize = conf.getMaxResultSize

// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer

import scala.concurrent.duration._
import scala.language.postfixOps
Contributor Author

Saw this while compiling:

    [WARNING] /Users/ryan/c/spark/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala:156: postfix operator second should be enabled
    by making the implicit value scala.language.postfixOps visible.
    This can be achieved by adding the import clause 'import scala.language.postfixOps'
    or by setting the compiler option -language:postfixOps.
    See the Scala docs for value scala.language.postfixOps for a discussion
    why the feature should be explicitly enabled.

Member

Yes, that's the right thing, if you think the use of postfix ops is justified.
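
(For readers unfamiliar with the warning, a minimal illustration, not code from this PR: the import is required whenever a method is applied postfix, i.e. with no dot and no parentheses.)

    import scala.concurrent.duration._
    import scala.language.postfixOps

    // "seconds" used as a postfix operator compiles without this warning only
    // with scala.language.postfixOps in scope (or -language:postfixOps set).
    val timeout = 10 seconds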


import akka.actor.{Actor, ActorRef, Props}

88 changes: 63 additions & 25 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -980,33 +980,76 @@ private[spark] object Utils extends Logging {
)
}


private val TB = 1L << 40
private val GB = 1L << 30
private val MB = 1L << 20
private val KB = 1L << 10

private val scaleCharToFactor: Map[Char, Long] = Map(
'b' -> 1L,
'k' -> KB,
'm' -> MB,
'g' -> GB,
't' -> TB
)

/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
*/
def memoryStringToMb(str: String): Int = {
* Convert a Java memory parameter passed to -Xmx (such as "300m" or "1g") to a number of
* megabytes (or other byte-scale denominations as specified by @outputScaleChar).
*
* For @defaultInputScaleChar and @outputScaleChar, valid values are: 'b' (bytes), 'k'
* (kilobytes), 'm' (megabytes), 'g' (gigabytes), and 't' (terabytes).
*
* @param str String to parse an amount of memory out of
* @param defaultInputScaleChar if no "scale" is provided on the end of @str (i.e. @str is a
* plain numeric value), assume this scale (default: 'b' for
* 'bytes')
* @param outputScaleChar express the output in this scale, i.e. number of bytes, kilobytes,
* megabytes, or gigabytes.
*/
def parseMemoryString(
str: String,
defaultInputScaleChar: Char = 'b',
outputScaleChar: Char = 'm'): Long = {

val lower = str.toLowerCase
if (lower.endsWith("k")) {
(lower.substring(0, lower.length-1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
lower.substring(0, lower.length-1).toInt
} else if (lower.endsWith("g")) {
lower.substring(0, lower.length-1).toInt * 1024
} else if (lower.endsWith("t")) {
lower.substring(0, lower.length-1).toInt * 1024 * 1024
} else {// no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt
}
val lastChar = lower(lower.length - 1)
val (num, inputScaleChar) =
if (lastChar.isDigit) {
(lower.toLong, defaultInputScaleChar)
} else {
(lower.substring(0, lower.length - 1).toLong, lastChar)
}

(for {
Member

Why is a for construction used here? Just to handle the invalid in/out scale param? My personal taste would be to just check that the Option[Long] exists directly and do away with it. I can't tell how much that's just me versus how the kids talk in Scala these days. A looping construct surprised me, as there is no loop.

Contributor Author

Why is a for construction used here? Just to handle the invalid in/out scale param?

for comprehensions are commonly used when mapping over 2 or more monads to avoid arguably-ugly .flatMap-chaining; the syntax just removes some boilerplate, e.g.:

    scaleCharToFactor.get(inputScaleChar).flatMap(inputScale => 
      scaleCharToFactor.get(outputScaleChar).map(outputScale =>
        inputScale * num / outputScale
      )
    ).getOrElse(
        // throw
      )

vs.

    (for {
      inputScale <- scaleCharToFactor.get(inputScaleChar)
      outputScale <- scaleCharToFactor.get(outputScaleChar)
    } yield {
      inputScale * num / outputScale
    }).getOrElse(
        // throw
      )

(I collapsed the scale wrapper line in the latter for apples-to-apples brevity, and can do that in the PR as well).

So, it's not a "looping construct" so much as a "mapping" one, commonly used on Options, Lists, and even things like Twitter Futures (search for "for {").

However, it does get better as the number of chained maps increases, e.g. especially when there are 3 or more, so I'm not that tied to it here.

Of course, handling such situations using a match is also possible:

    (scaleCharToFactor.get(inputScaleChar), scaleCharToFactor.get(outputScaleChar)) match {
      case (Some(inputScale), Some(outputScale)) =>
        inputScale * num / outputScale
      case _ =>
        // throw
    }

I prefer all of these to, say, the following straw-man that doesn't take advantage of any of the nice things that come from using Options:

    if (scaleCharToFactor.get(inputScaleChar).isDefined && 
        scaleCharToFactor.get(outputScaleChar).isDefined)
      scaleCharToFactor.get(inputScaleChar).get * num / scaleCharToFactor.get(outputScaleChar).get
    else
      // throw

but I'll defer to you on the approach you like best.

Member

Yeah I get what it does. I was comparing in my mind to...

    val inputScale = scaleCharToFactor.get(inputScaleChar)
    val outputScale = scaleCharToFactor.get(outputScaleChar)
    require(inputScale.isDefined)
    require(outputScale.isDefined)
    inputScale.get * num / outputScale.get

Here's another instance where I wouldn't mind hearing another opinion, as it's a good, more general style question.

inputScale <- scaleCharToFactor.get(inputScaleChar)
outputScale <- scaleCharToFactor.get(outputScaleChar)
} yield {
inputScale * num / outputScale
}).getOrElse(
throw new IllegalArgumentException(
"Invalid memory string or scale: %s, %s, %s".format(
str,
defaultInputScaleChar,
outputScaleChar
)
)
)
}

/**
* Wrapper for @parseMemoryString taking default arguments and returning an int, which is safe
* since we are converting to a number of megabytes.
*/
def memoryStringToMb(str: String): Int = memoryStringToMb(str, defaultInputScale = 'b')
def memoryStringToMb(str: String, defaultInputScale: Char = 'b'): Int =
parseMemoryString(str, defaultInputScale, 'm').toInt

/**
* Convert a quantity in bytes to a human-readable string such as "4.0 MB".
*/
def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10

val (value, unit) = {
if (size >= 2*TB) {
(size.asInstanceOf[Double] / TB, "TB")
@@ -1047,7 +1090,7 @@ private[spark] object Utils extends Logging {
* Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
*/
def megabytesToString(megabytes: Long): String = {
bytesToString(megabytes * 1024L * 1024L)
bytesToString(megabytes * MB)
}

/**
@@ -1905,11 +1948,6 @@ private[spark] object Utils extends Logging {
method.invoke(obj, values.toSeq: _*)
}

// Limit of bytes for total size of results (default is 1GB)
def getMaxResultSize(conf: SparkConf): Long = {
memoryStringToMb(conf.get("spark.driver.maxResultSize", "1g")).toLong << 20
}

/**
* Return the current system LD_LIBRARY_PATH name
*/
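To make the semantics of the new parsing helpers concrete, here is how the conversions above play out on a few inputs (an illustrative sketch derived from the scale table and arithmetic in this diff, not a test from the PR):

    Utils.parseMemoryString("300m")                       // 300L (MB in, MB out by default)
    Utils.parseMemoryString("1g", outputScaleChar = 'b')  // 1073741824L (expressed in bytes)
    Utils.parseMemoryString("2097152")                    // 2L (bare number read as bytes, output in MB)
    Utils.memoryStringToMb("1t")                          // 1048576 (Int wrapper, MB output)
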
12 changes: 6 additions & 6 deletions docs/running-on-yarn.md
@@ -113,23 +113,23 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
</tr>
<tr>
<td><code>spark.yarn.executor.memoryOverhead</code></td>
<td>executorMemory * 0.07, with minimum of 384 </td>
<td>executorMemory * 0.07, with a minimum of 384 megabytes </td>
<td>
The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
The amount of off heap memory to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%), but is specified here as an absolute amount of memory, e.g. "1g" or "384m".
</td>
</tr>
<tr>
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td>driverMemory * 0.07, with minimum of 384 </td>
<td>driverMemory * 0.07, with a minimum of 384 megabytes </td>
<td>
The amount of off heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
Same as <code>spark.yarn.executor.memoryOverhead</code>, but for the driver in cluster mode.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.memoryOverhead</code></td>
<td>AM memory * 0.07, with minimum of 384 </td>
<td>AM memory * 0.07, with a minimum of 384 megabytes </td>
<td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the Application Master in client mode.
Same as <code>spark.yarn.executor.memoryOverhead</code>, but for the Application Master in client mode.
</td>
</tr>
<tr>
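Under the behaviour described above, the overhead settings accept memory strings rather than bare megabyte counts; for example (an illustrative SparkConf snippet assuming this PR's parsing is in place):

    // A bare number is still read as megabytes; an explicit scale is now allowed.
    val conf = new SparkConf()
      .set("spark.yarn.executor.memoryOverhead", "1g")    // 1024 MB
      .set("spark.yarn.driver.memoryOverhead", "384m")    // 384 MB
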
@@ -27,7 +27,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var primaryPyFile: String = null
var pyFiles: String = null
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorMemoryMB = 1024
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS

@@ -67,7 +67,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
args = tail

case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
executorMemory = value
executorMemoryMB = value
args = tail

case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>