27 commits
b34ec0c
make master support multiple executors per worker
CodingCat May 4, 2014
a5d629a
java doc
CodingCat Jan 27, 2015
a26096d
stylistic fix
CodingCat Jan 27, 2015
e5efabb
more java docs and consolidate canUse function
CodingCat Jan 27, 2015
ec7d421
test commit
CodingCat Jan 27, 2015
5b81466
remove outdated comments
CodingCat Jan 27, 2015
19d3da7
address the comments
CodingCat Feb 22, 2015
0b64fea
fix compilation issue
CodingCat Feb 22, 2015
35c462c
address Andrew's comments
CodingCat Feb 22, 2015
387f4ec
bug fix
CodingCat Feb 23, 2015
f64a28d
typo fix
CodingCat Feb 23, 2015
878402c
change the launching executor code
CodingCat Feb 23, 2015
497ec2c
address andrew's comments
CodingCat Mar 27, 2015
2c2bcc5
fix wrong usage info
CodingCat Mar 27, 2015
ff011e2
start multiple executors on the worker by rewriting startExecutor logic
CodingCat Apr 5, 2015
4cf61f1
improve the code and docs
CodingCat Apr 5, 2015
63b3df9
change the description of the parameter in the submit script
CodingCat Apr 5, 2015
f595bd6
recover some unintentional changes
CodingCat Apr 5, 2015
d9c1685
remove unused var
CodingCat Apr 5, 2015
f035423
stylistic fix
CodingCat Apr 5, 2015
12a1b32
change the semantic of coresPerExecutor to exact core number
CodingCat Apr 9, 2015
2eeff77
stylistic fixes
CodingCat Apr 10, 2015
45967b4
remove unused method
CodingCat Apr 10, 2015
b8ca561
revert a change
CodingCat Apr 10, 2015
940cb42
avoid unnecessary allocation
CodingCat Apr 10, 2015
fbeb7e5
address the comments
CodingCat Apr 14, 2015
6dee808
change filter predicate
CodingCat Apr 14, 2015
stylistic fixes
CodingCat committed Apr 10, 2015
commit 2eeff77c066bc14bd13dbf14f75c1b62fe55db07
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -75,8 +75,10 @@ private[deploy] class ApplicationInfo(
     }
   }
 
-  private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None):
-    ExecutorDesc = {
+  private[master] def addExecutor(
+      worker: WorkerInfo,
+      cores: Int,
+      useID: Option[Int] = None): ExecutorDesc = {
     val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
     executors(exec.id) = exec
     coresGranted += cores
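Note: the following is an editorial sketch, not part of the PR. It condenses the bookkeeping that addExecutor performs into a self-contained snippet; Exec and AppInfoSketch are simplified stand-ins for Spark's ExecutorDesc and ApplicationInfo, and the ID-reuse behavior of newExecutorId is assumed from context.

import scala.collection.mutable

// Simplified stand-in for ExecutorDesc.
case class Exec(id: Int, cores: Int, memoryMB: Int)

class AppInfoSketch(memoryPerExecutorMB: Int) {
  private var nextExecutorId = 0
  val executors = mutable.HashMap.empty[Int, Exec]
  var coresGranted = 0

  // useID lets the master reuse an executor ID (e.g. when re-registering after
  // recovery); otherwise a fresh, monotonically increasing ID is handed out.
  private def newExecutorId(useID: Option[Int]): Int = useID match {
    case Some(id) =>
      nextExecutorId = math.max(nextExecutorId, id + 1)
      id
    case None =>
      val id = nextExecutorId
      nextExecutorId += 1
      id
  }

  def addExecutor(cores: Int, useID: Option[Int] = None): Exec = {
    val exec = Exec(newExecutorId(useID), cores, memoryPerExecutorMB)
    executors(exec.id) = exec // track the executor under its ID
    coresGranted += cores     // account for the cores this executor occupies
    exec
  }
}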
62 changes: 31 additions & 31 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -533,11 +533,14 @@ private[master] class Master(
     }
   }
 
   /**
-   * The resource allocator spread out each app among all the workers until it has all its cores in
-   * spreadOut mode otherwise packs each app into as few workers as possible until it has assigned
-   * all its cores. User can define spark.deploy.maxCoresPerExecutor per application to
-   * limit the maximum number of cores to allocate to each executor on each worker; if the parameter
-   * is not defined, then only one executor will be launched on a worker.
+   * Schedule executors to be launched on the workers. There are two modes of launching executors.

    [Review — Contributor] Can you break "There are two modes of..." into a new paragraph?

+   * The first attempts to spread out an application's executors on as many workers as possible,
+   * while the second does the opposite (i.e. launch them on as few workers as possible). The former
+   * is usually better for data locality purposes and is the default. The number of cores assigned

    [Review — Contributor] We should split on this sentence ("The number of cores...") to form a
    new paragraph. Right now it's one huge chunk of text.

+   * to each executor is configurable. When this is explicitly set, multiple executors from the same
+   * application may be launched on the same worker if the worker has enough cores and memory.
+   * Otherwise, each executor grabs all the cores available on the worker by default, in which case
+   * only one executor may be launched on each worker.
    */
   private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
@@ -546,7 +549,9 @@
       // Try to spread out each app among all the workers, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+            worker.coresFree > 0)

    [Review — Contributor] You technically don't need the cores check here, since we already check
    it in L551: if (usableWorkers(pos).coresFree - assigned(pos) > 0)

    [Review — Author] I replaced it with worker.coresFree >= app.desc.coresPerExecutor.getOrElse(0),
    so that we do not need to run the following allocation algorithm for the case I mentioned above.
    E.g. I have 8 cores, 2 cores per machine, and an application that would like to use all of them;
    in spread-out mode we will get an assigned array of Array(2, 2, 2, 2). If we set --executor-cores
    to 3, the application will get 0 cores, as no per-worker allocation is at least 3...

    [Review — Author] Should be worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1).

    [Review — Contributor] Again, this predicate is actually not needed because we handle it
    correctly in the line I pointed out earlier. But not a big deal, we can just leave it.

    [Review — Author] Hmmm... if we remove this, then in the case above (the user prefers 3 cores
    per executor and all workers have at most 2 cores), though we will not allocate anything to the
    worker, we still generate an assigned array...

+          .sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
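Note: an editorial sketch (not part of the PR) of the round-robin core assignment that the collapsed part of this hunk performs, reproducing the 8-core example from the review thread above. Names are illustrative, not the actual Master internals.

object SpreadOutExample {
  // Hand out cores one at a time, cycling over the workers, until the request
  // or the cluster's free cores are exhausted.
  def assignCores(coresNeeded: Int, coresFreePerWorker: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](coresFreePerWorker.length)
    var toAssign = math.min(coresNeeded, coresFreePerWorker.sum)
    var pos = 0
    while (toAssign > 0) {
      if (coresFreePerWorker(pos) - assigned(pos) > 0) { // worker still has a free core
        assigned(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % coresFreePerWorker.length // move to the next worker
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // The case from the review thread: 4 workers with 2 free cores each and an
    // app that wants 8 cores.
    val assigned = assignCores(8, Array(2, 2, 2, 2))
    println(assigned.mkString(", ")) // 2, 2, 2, 2
    // With coresPerExecutor = 3, each per-worker grant of 2 is below the executor
    // size, so no executor could be launched from this assignment.
  }
}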
@@ -566,15 +571,16 @@
     } else {
       // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-        for (app <- waitingApps if app.coresLeft > 0) {
-          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
+        for (app <- waitingApps if app.coresLeft > 0 &&
+            worker.memoryFree >= app.desc.memoryPerExecutorMB) {

    [Review — Contributor] No need to check this again here... we already check this in
    allocateWorkerResourceToExecutors L586. Can you revert this line?

+          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
         }
       }
     }
   }
 
   /**
-   * allocate resources in a certain worker to one or more executors
+   * Allocate a worker's resources to one or more executors.
    * @param app the info of the application which the executors belong to
    * @param coresToAllocate cores on this worker to be allocated to this application
    * @param worker the worker info
@@ -583,20 +589,24 @@
       app: ApplicationInfo,
       coresToAllocate: Int,
       worker: WorkerInfo): Unit = {
-    if (canUse(app, worker)) {
-      val memoryPerExecutor = app.desc.memoryPerExecutorMB
-      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
-      var coresLeft = coresToAllocate
-      while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
-        val exec = app.addExecutor(worker, coresPerExecutor)
-        coresLeft -= coresPerExecutor
-        launchExecutor(worker, exec)
-        app.state = ApplicationState.RUNNING
-      }
-    }
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
+    var coresLeft = coresToAllocate
+    while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
+      val exec = app.addExecutor(worker, coresPerExecutor)
+      coresLeft -= coresPerExecutor
+      launchExecutor(worker, exec)
+      app.state = ApplicationState.RUNNING
+    }
   }
 
-  private def startDriversOnWorkers(): Unit = {
+  /**
+   * Schedule the currently available resources among waiting apps. This method will be called
+   * every time a new app joins or resource availability changes.
+   */
+  private def schedule(): Unit = {
     if (state != RecoveryState.ALIVE) { return }
     // start in-cluster drivers, they take strict precedence over applications

    [Review — Contributor] This comment doesn't make much sense. Can you just replace it with:
    // Drivers take strict precedence over executors

     val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
     for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
       for (driver <- waitingDrivers) {
@@ -606,21 +616,11 @@
         }
       }
     }
-  }
-
-  /**
-   * Schedule the currently available resources among waiting apps. This method will be called
-   * every time a new app joins or resource availability changes.
-   */
-  private def schedule(): Unit = {
-    if (state != RecoveryState.ALIVE) { return }
-    // start in-cluster drivers, they take strict precedence over applications
-    startDriversOnWorkers()
     // start executors

    [Review — Contributor] Remove this comment, as it doesn't convey any information.

     startExecutorsOnWorkers()
   }
 
   def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
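Note: an editorial aside, not part of the PR. The while-loop in allocateWorkerResourceToExecutors launches executors until either the core grant or the worker's free memory runs out; assuming launchExecutor reduces the worker's free memory by one executor's worth per iteration (as WorkerInfo's bookkeeping does), the number launched has a simple closed form:

object AllocationMath {
  // Executors launched on one worker = whichever budget runs out first:
  // the cores handed to this worker, or the worker's free memory.
  def executorsLaunched(
      coresToAllocate: Int,
      coresPerExecutor: Int,
      memoryFreeMB: Int,
      memoryPerExecutorMB: Int): Int = {
    math.min(coresToAllocate / coresPerExecutor, memoryFreeMB / memoryPerExecutorMB)
  }

  def main(args: Array[String]): Unit = {
    // 8 cores granted, 2 cores per executor, 4096 MB free, 1024 MB per
    // executor => min(4, 4) = 4 executors on this worker.
    println(executorsLaunched(8, 2, 4096, 1024))
    // When spark.executor.cores is unset, coresPerExecutor defaults to the whole
    // grant (coresToAllocate), so at most one executor fits: min(1, 4) = 1.
    println(executorsLaunched(8, 8, 4096, 1024))
  }
}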
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -82,9 +82,9 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
+    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
-      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec,
-      conf.getOption("spark.executor.cores").map(_.toInt))
+      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
     waitForRegistration()
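Note: an editorial sketch, not part of the PR, of how the optional per-executor core count flows into the application description; AppDescSketch is a simplified stand-in for ApplicationDescription.

// Simplified stand-in for ApplicationDescription: coresPerExecutor is optional,
// and None means "one executor per worker, taking all of its free cores".
case class AppDescSketch(
    name: String,
    maxCores: Option[Int],
    memoryPerExecutorMB: Int,
    coresPerExecutor: Option[Int])

object AppDescExample {
  def main(args: Array[String]): Unit = {
    // Mirrors the wiring above: an absent "spark.executor.cores" yields None.
    val conf = Map("spark.app.name" -> "demo") // note: no executor-cores entry
    val coresPerExecutor = conf.get("spark.executor.cores").map(_.toInt)
    println(AppDescSketch("demo", Some(8), 1024, coresPerExecutor))
    // AppDescSketch(demo,Some(8),1024,None)
  }
}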
4 changes: 1 addition & 3 deletions docs/configuration.md
@@ -715,10 +715,8 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.executor.cores</code></td>
-  <td>(infinite)</td>
+  <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
   <td>
-    Default: 1 in YARN mode, all the available cores on the worker in standalone mode.
-
     The number of cores to use on each executor. For YARN and standalone mode only.
 
     In standalone mode, setting this parameter allows an application to run multiple executors on
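Note: an editorial usage sketch of the documented property from application code; the master URL and core counts are placeholders, not values from this PR.

import org.apache.spark.{SparkConf, SparkContext}

object MultiExecutorDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")  // placeholder standalone master URL
      .setAppName("multi-executor-demo")
      .set("spark.cores.max", "8")       // cap the app at 8 cores in total
      .set("spark.executor.cores", "2")  // 2 cores per executor => up to 4
                                         // executors, possibly several per worker
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 100).sum()) // trivial job to exercise the executors
    } finally {
      sc.stop()
    }
  }
}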