@@ -551,36 +551,37 @@ private[master] class Master(
551551 * allocated at a time, 12 cores from each worker would be assigned to each executor.
552552 * Since 12 < 16, no executors would launch [SPARK-8881].
553553 */
554- private [master] def coresToAssign (
554+ private [master] def scheduleExecutorsOnWorkers (
555555 app : ApplicationInfo ,
556556 usableWorkers : Array [WorkerInfo ],
557557 spreadOutApps : Boolean ): Array [Int ] = {
558- // Default value for number of cores per executor is 1
558+ // If the number of cores per executor is not specified, then we can just schedule
559+ // 1 core at a time since we expect a single executor to be launched on each worker
559560 val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1 )
560561 val memoryPerExecutor = app.desc.memoryPerExecutorMB
561562 val numUsable = usableWorkers.length
562563 val assignedCores = new Array [Int ](numUsable) // Number of cores to give to each worker
563564 val assignedMemory = new Array [Int ](numUsable) // Amount of memory to give to each worker
564- var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
565+ var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
565566 var pos = 0
566567 if (spreadOutApps) {
567568 // Try to spread out executors among workers (sparse scheduling)
568- while (toAssign > 0 ) {
569+ while (coresToAssign > 0 ) {
569570 if (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
570571 usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor) {
571- toAssign -= coresPerExecutor
572+ coresToAssign -= coresPerExecutor
572573 assignedCores(pos) += coresPerExecutor
573574 assignedMemory(pos) += memoryPerExecutor
574575 }
575576 pos = (pos + 1 ) % numUsable
576577 }
577578 } else {
578579 // Pack executors into as few workers as possible (dense scheduling)
579- while (toAssign > 0 ) {
580+ while (coresToAssign > 0 ) {
580581 while (usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
581582 usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor &&
582- toAssign > 0 ) {
583- toAssign -= coresPerExecutor
583+ coresToAssign > 0 ) {
584+ coresToAssign -= coresPerExecutor
584585 assignedCores(pos) += coresPerExecutor
585586 assignedMemory(pos) += memoryPerExecutor
586587 }
@@ -597,13 +598,12 @@ private[master] class Master(
597598 // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
598599 // in the queue, then the second app, etc.
599600 for (app <- waitingApps if app.coresLeft > 0 ) {
600- // Default value for number of cores per executor is 1
601- val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1 )
601+ val coresPerExecutor : Option [Int ] = app.desc.coresPerExecutor
602602 val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE )
603603 .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
604- worker.coresFree >= coresPerExecutor)
604+ worker.coresFree >= coresPerExecutor.getOrElse( 1 ) )
605605 .sortBy(_.coresFree).reverse
606- val assignedCores = coresToAssign (app, usableWorkers, spreadOutApps)
606+ val assignedCores = scheduleExecutorsOnWorkers (app, usableWorkers, spreadOutApps)
607607
608608 // Now that we've decided how many cores to allocate on each worker, let's allocate them
609609 var pos = 0
@@ -624,12 +624,15 @@ private[master] class Master(
624624 private def allocateWorkerResourceToExecutors (
625625 app : ApplicationInfo ,
626626 assignedCores : Int ,
627- coresPerExecutor : Int ,
627+ coresPerExecutor : Option [ Int ] ,
628628 worker : WorkerInfo ): Unit = {
629- // If cores per executor is specified, then this division should have a remainder of zero
630- val numExecutors = assignedCores / coresPerExecutor
629+ // If the number of cores per executor is specified, we divide the cores assigned
630+ // to this worker evenly among the executors with no remainder.
631+ // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
632+ val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1 )
633+ val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
631634 for (i <- 1 to numExecutors) {
632- val exec = app.addExecutor(worker, coresPerExecutor )
635+ val exec = app.addExecutor(worker, coresToAssign )
633636 launchExecutor(worker, exec)
634637 app.state = ApplicationState .RUNNING
635638 }
0 commit comments