Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b34ec0c
make master support multiple executors per worker
CodingCat May 4, 2014
a5d629a
java doc
CodingCat Jan 27, 2015
a26096d
stylistic fix
CodingCat Jan 27, 2015
e5efabb
more java docs and consolidate canUse function
CodingCat Jan 27, 2015
ec7d421
test commit
CodingCat Jan 27, 2015
5b81466
remove outdated comments
CodingCat Jan 27, 2015
19d3da7
address the comments
CodingCat Feb 22, 2015
0b64fea
fix compilation issue
CodingCat Feb 22, 2015
35c462c
address Andrew's comments
CodingCat Feb 22, 2015
387f4ec
bug fix
CodingCat Feb 23, 2015
f64a28d
typo fix
CodingCat Feb 23, 2015
878402c
change the launching executor code
CodingCat Feb 23, 2015
497ec2c
address andrew's comments
CodingCat Mar 27, 2015
2c2bcc5
fix wrong usage info
CodingCat Mar 27, 2015
ff011e2
start multiple executors on the worker by rewriting startExeuctor logic
CodingCat Apr 5, 2015
4cf61f1
improve the code and docs
CodingCat Apr 5, 2015
63b3df9
change the description of the parameter in the submit script
CodingCat Apr 5, 2015
f595bd6
recover some unintentional changes
CodingCat Apr 5, 2015
d9c1685
remove unused var
CodingCat Apr 5, 2015
f035423
stylistic fix
CodingCat Apr 5, 2015
12a1b32
change the semantic of coresPerExecutor to exact core number
CodingCat Apr 9, 2015
2eeff77
stylistic fixes
CodingCat Apr 10, 2015
45967b4
remove unused method
CodingCat Apr 10, 2015
b8ca561
revert a change
CodingCat Apr 10, 2015
940cb42
avoid unnecessary allocation
CodingCat Apr 10, 2015
fbeb7e5
address the comments
CodingCat Apr 14, 2015
6dee808
change filter predicate
CodingCat Apr 14, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
change the semantic of coresPerExecutor to exact core number
  • Loading branch information
CodingCat committed Apr 9, 2015
commit 12a1b320ea337cb5d93e54fc0368051b22be1333
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ private[spark] class ApplicationDescription(
val eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None,
val maxCorePerExecutor: Option[Int] = None)
val coresPerExecutor: Option[Int] = None)
extends Serializable {

val user = System.getProperty("user.name", "<unknown>")

def copy(
name: String = name,
maxCores: Option[Int] = maxCores,
memoryPerSlave: Int = memoryPerExecutorMB,
memoryPerExecutorMB: Int = memoryPerExecutorMB,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[URI] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec)

override def toString: String = "ApplicationDescription(" + name + ")"
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ object SparkSubmit {

// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.deploy.maxCoresPerExecutor"),
sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --total-executor-cores NUM Total cores for all executors.
|
| Spark standalone and YARN only:
| --executor-cores NUM Number of cores to use on each executor. Default:
| 1 in YARN mode; in standalone mode, all cores in a worker
| allocated to an application will be assigned to a single
| executor.
| --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
| or all available cores on the worker in standalone mode)
|
| YARN-only:
| --driver-cores NUM Number of cores used by the driver, only in cluster mode
Expand Down
31 changes: 15 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ private[master] class Master(
*/
private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
val enoughResources = worker.memoryFree >= app.desc.memoryPerExecutorMB && worker.coresFree > 0
val allowToExecute = app.desc.maxCorePerExecutor.isDefined || !worker.hasExecutor(app)
val allowToExecute = app.desc.coresPerExecutor.isDefined || !worker.hasExecutor(app)
allowToExecute && enoughResources
}

Expand All @@ -539,7 +539,7 @@ private[master] class Master(
* limit the maximum number of cores to allocate to each executor on each worker; if the parameter
* is not defined, then only one executor will be launched on a worker.
*/
private def startExecutorsOnWorkers() {
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
Expand Down Expand Up @@ -576,25 +576,24 @@ private[master] class Master(
/**
* allocate resources in a certain worker to one or more executors
* @param app the info of the application which the executors belong to
* @param coresDemand the total number of cores to be allocated to this application
* @param coresToAllocate cores on this worker to be allocated to this application
* @param worker the worker info
*/
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
coresDemand: Int,
coresToAllocate: Int,
worker: WorkerInfo): Unit = {
if (canUse(app, worker)) {
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val maxCoresPerExecutor = app.desc.maxCorePerExecutor.getOrElse(Int.MaxValue)
var coresToAssign = coresDemand
while (coresToAssign > 0 && worker.memoryFree >= memoryPerExecutor) {
val coresForThisExecutor = math.min(maxCoresPerExecutor, coresToAssign)
val exec = app.addExecutor(worker, coresForThisExecutor)
coresToAssign -= coresForThisExecutor
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
if (canUse(app, worker)) {
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
var coresLeft = coresToAllocate
while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
val exec = app.addExecutor(worker, coresPerExecutor)
coresLeft -= coresPerExecutor
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
}

private def startDriversOnWorkers(): Unit = {
Expand All @@ -621,7 +620,7 @@ private[master] class Master(
startExecutorsOnWorkers()
}

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec,
conf.getOption("spark.deploy.maxCoresPerExecutor").map(_.toInt))
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec,
conf.getOption("spark.executor.cores").map(_.toInt))
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
waitForRegistration()
Expand Down
12 changes: 8 additions & 4 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -714,12 +714,16 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
<td><code>spark.deploy.maxCoresPerExecutor</code></td>
<td><code>spark.executor.cores</code></td>
<td>(infinite)</td>
<td>
The maximum number of cores given to the executor. When this parameter is set, Spark will try to
run more than 1 executors on each worker in standalone mode; otherwise, only one executor is
launched on each worker.
Default: 1 in YARN mode, all the available cores on the worker in standalone mode.

The number of cores to use on each executor. For YARN and standalone mode only.

In standalone mode, setting this parameter allows an application to run multiple executors on
the same worker, provided that there are enough cores on that worker. Otherwise, only one
executor per application will run on each worker.
</td>
</tr>
<tr>
Expand Down