application specific corePerExecutor
CodingCat committed May 8, 2014
commit 874ec7a20502807d759fad4598068aa920887669
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -20,14 +20,15 @@ package org.apache.spark.deploy
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
-    val memoryPerExecutor: Int,
+    val memoryPerExecutor: Int, // in MB
     val command: Command,
     val sparkHome: Option[String],
     var appUiUrl: String,
     val eventLogDir: Option[String] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
 
+  // only valid when spark.executor.multiPerWorker is set to true
+  var maxCorePerExecutor = maxCores
Contributor:
I think the two variables in `var maxCorePerExecutor = maxCores` mean different things. maxCores is the total number of cores for an application, while maxCorePerExecutor is the number of cores per executor. In schedule(), the app's leftCoreToAssign comes from the maxCores value, so the two variables cannot be equal.

Contributor Author:
It's just an initial value.

Contributor:
Yes, I know. But in ApplicationInfo.scala the coresLeft value is the same as desc.maxCores, so in schedule() leftCoreToAssign is actually equal to maxCorePerExecutor. I don't think that is right, because leftCoreToAssign is the total cores across all executors while maxCorePerExecutor is the cores of a single executor. I'm not sure whether I'm making myself clear.

Contributor Author:
Why do you think that "in schedule leftCoreToAssign actually is equal to maxCorePerExecutor"? It's the minimum of app.coresLeft and the sum of all workers' free cores: `var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)`
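
As a concrete illustration of that `min` (the numbers here are hypothetical, not from the PR):

```scala
// Hypothetical values, for illustration only.
val appCoresLeft = 8               // app.coresLeft, derived from app.maxCores
val workerFreeCores = Seq(4, 3, 2) // usableWorkers.map(_.coresFree)
// leftCoreToAssign is capped by whichever is smaller:
val leftCoreToAssign = math.min(appCoresLeft, workerFreeCores.sum) // min(8, 9) = 8
```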

Contributor:
Yes, but in ApplicationInfo, app.coresLeft is equal to app.maxCores. So in schedule(), when the sum of all workers' free cores is greater than app.coresLeft, leftCoreToAssign ends up equal to maxCorePerExecutor (since maxCorePerExecutor is initialized to maxCores).

Contributor Author:

Contributor:
Yes, thanks, I see.

   override def toString: String = "ApplicationDescription(" + name + ")"
 }
30 changes: 20 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -458,7 +458,8 @@ private[spark] class Master(
    * two executors on the same worker).
    */
   private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app)
+    worker.memoryFree >= app.desc.memoryPerExecutor && !worker.hasExecutor(app) &&
+      worker.coresFree > 0
   }
 
   private def startSingleExecutorPerWorker() {
@@ -505,29 +506,37 @@ private[spark] class Master(
   }
 
   private def startMultiExecutorsPerWorker() {
-    val coreNumPerExecutor = conf.getInt("spark.executor.coreNumPerExecutor", 1)
     // allow user to run multiple executors in the same worker
     // (within the same worker JVM process)
     if (spreadOutApps) {
-      var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-        .filter(_.coresFree > 0).sortBy(_.coresFree).reverse
       for (app <- waitingApps if app.coresLeft > 0) {
+        var usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+          .filter(_.coresFree > 0).sortBy(_.coresFree).reverse
+        val maxCoreNumPerExecutor = app.desc.maxCorePerExecutor.get
+        var mostFreeCoreWorkerPos = 0
         var leftCoreToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
         val numUsable = usableWorkers.length
         // Number of cores of each executor assigned to each worker
         val assigned = Array.fill[ListBuffer[Int]](numUsable)(new ListBuffer[Int])
+        val assignedSum = Array.fill[Int](numUsable)(0)
         var pos = 0
-        val memoryNotEnoughFlags = new Array[Boolean](numUsable)
-        while (leftCoreToAssign > 0 && memoryNotEnoughFlags.contains(false)) {
-          val coreToAssign = math.min(coreNumPerExecutor, leftCoreToAssign)
-          if (usableWorkers(pos).coresFree - assigned(pos).sum >= coreToAssign &&
-            !memoryNotEnoughFlags(pos)) {
+        var noEnoughMemoryWorkerNum = 0
+        var maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree
+        while (leftCoreToAssign > 0 && noEnoughMemoryWorkerNum < numUsable) {
+          if (usableWorkers(mostFreeCoreWorkerPos).coresFree < usableWorkers(pos).coresFree) {
+            mostFreeCoreWorkerPos = pos
+            maxPossibleCore = usableWorkers(mostFreeCoreWorkerPos).coresFree
+          }
+          val coreToAssign = math.min(math.min(maxCoreNumPerExecutor, maxPossibleCore),
+            leftCoreToAssign)
+          if (usableWorkers(pos).coresFree - assignedSum(pos) >= coreToAssign) {
             if (usableWorkers(pos).memoryFree >=
               app.desc.memoryPerExecutor * (assigned(pos).length + 1)) {
               leftCoreToAssign -= coreToAssign
               assigned(pos) += coreToAssign
+              assignedSum(pos) += coreToAssign
             } else {
-              memoryNotEnoughFlags(pos) = true
+              noEnoughMemoryWorkerNum += 1
             }
           }
           pos = (pos + 1) % numUsable
@@ -548,6 +557,7 @@ private[spark] class Master(
     for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
       for (app <- waitingApps if app.coresLeft > 0 &&
           app.desc.memoryPerExecutor <= worker.memoryFree) {
+        val coreNumPerExecutor = app.desc.maxCorePerExecutor.get
         var coresLeft = math.min(worker.coresFree, app.coresLeft)
         while (coresLeft > 0 && app.desc.memoryPerExecutor <= worker.memoryFree) {
           val coreToAssign = math.min(coreNumPerExecutor, coresLeft)
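For readers skimming the hunk above, here is a self-contained sketch of the spread-out branch's round-robin idea, with the Master's state stripped away. The `Worker` case class, the `assignSpreadOut` name, and the simplified stopping rule are my own, not from the PR:

```scala
import scala.collection.mutable.ListBuffer

// Minimal stand-in for WorkerInfo; only the fields the loop reads.
case class Worker(coresFree: Int, memoryFree: Int)

// Walk the workers round-robin, handing out at most maxCorePerExecutor
// cores per executor, skipping workers that cannot fit another executor's
// cores or memory. Returns, per worker, the core count of each executor
// placed on it.
def assignSpreadOut(
    appCoresLeft: Int,
    memoryPerExecutor: Int,
    maxCorePerExecutor: Int,
    workers: Array[Worker]): Array[ListBuffer[Int]] = {
  val numUsable = workers.length
  val assigned = Array.fill(numUsable)(new ListBuffer[Int])
  var leftCoreToAssign = math.min(appCoresLeft, workers.map(_.coresFree).sum)
  var pos = 0
  var fruitlessVisits = 0 // stop once a full pass places nothing
  while (leftCoreToAssign > 0 && fruitlessVisits < numUsable) {
    val coreToAssign = math.min(maxCorePerExecutor, leftCoreToAssign)
    val coresUsed = assigned(pos).sum
    val memNeeded = memoryPerExecutor * (assigned(pos).length + 1)
    if (workers(pos).coresFree - coresUsed >= coreToAssign &&
        workers(pos).memoryFree >= memNeeded) {
      assigned(pos) += coreToAssign
      leftCoreToAssign -= coreToAssign
      fruitlessVisits = 0
    } else {
      fruitlessVisits += 1
    }
    pos = (pos + 1) % numUsable
  }
  assigned
}
```

For example, with two workers `Worker(4, 2048)` and `Worker(4, 1024)`, `assignSpreadOut(8, 512, 2, ...)` alternates between the workers and ends with two 2-core executors on each. Note one simplification: the PR's version appears to keep shrinking coreToAssign via its maxPossibleCore bookkeeping, whereas this sketch simply gives up after a full fruitless pass over the workers.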
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -59,7 +59,9 @@ private[spark] class SparkDeploySchedulerBackend(
     val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
 
+    if (conf.getBoolean("spark.executor.multiPerWorker", false)) {
+      appDesc.maxCorePerExecutor = Some(conf.getInt("spark.executor.maxCoreNumPerExecutor", 1))
+    }
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
   }
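For context, enabling this new code path from a driver program would look roughly like the following. This is a sketch against the Spark 1.0-era API; the master URL, app name, and values are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver-side setup; the two spark.executor.* properties
// are the ones introduced by this PR.
val conf = new SparkConf()
  .setMaster("spark://master:7077") // placeholder standalone master URL
  .setAppName("multi-executor-demo")
  .set("spark.cores.max", "8") // becomes app.maxCores
  .set("spark.executor.multiPerWorker", "true")
  .set("spark.executor.maxCoreNumPerExecutor", "2")
val sc = new SparkContext(conf)
```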
4 changes: 2 additions & 2 deletions docs/configuration.md
@@ -682,10 +682,10 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.executor.coreNumPerExecutor</td>
+  <td>spark.executor.maxCoreNumPerExecutor</td>
   <td>1</td>
   <td>
-    set the number of cores assigned to each executor; this property is only valid when
+    set the max number of cores assigned to each executor; this property is only valid when
     <code>spark.executor.multiPerWorker</code> is set to true.
   </td>
 </tr>
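As a rough, hypothetical illustration of how the two properties in this table interact (all numbers invented for the example):

```scala
// Assume spark.executor.multiPerWorker = true and
// spark.executor.maxCoreNumPerExecutor = 2.
val maxCoreNumPerExecutor = 2
val appCoresLeft = 8              // e.g. from spark.cores.max
val workerFreeCores = Array(4, 4) // two ALIVE workers
// Each executor is capped at 2 cores, so the 8 granted cores become
// four executors, two per worker (memory permitting):
val grantedCores = math.min(appCoresLeft, workerFreeCores.sum) // 8
val numExecutors = grantedCores / maxCoreNumPerExecutor        // 4
```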