@@ -564,21 +564,28 @@ private[master] class Master(
564564 // each worker. The first index refers to the usable worker, and the second index
565565 // refers to the executor launched on that worker.
566566 val assigned = Array .fill[Array [Int ]](numUsable)(Array .fill[Int ](maxExecutorPerWorker)(0 ))
567- val workerPointer = Array .fill[Int ](numUsable)(0 )
567+ val executorNumberOnWorker = Array .fill[Int ](numUsable)(0 )
568568 val assignedSum = Array .fill[Int ](numUsable)(0 )
569569 var pos = 0
570570 while (maxCoresLeft > 0 ) {
571- if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0 ) &&
572- (usableWorkers(pos).memoryFree >= memoryPerExecutor * (workerPointer(pos) + 1 ))) {
571+ val memoryDemand = {
572+ if (app.desc.maxCorePerExecutor.isDefined) {
573+ executorNumberOnWorker(pos) + 1
574+ } else {
575+ memoryPerExecutor
576+ }
577+ }
578+ if ((usableWorkers(pos).coresFree - assignedSum(pos) > 0 ) &&
579+ (usableWorkers(pos).memoryFree >= memoryDemand)) {
573580 val coreToAssign = math.min(math.min(usableWorkers(pos).coresFree - assignedSum(pos),
574581 maxCoreAllocationPerRound), maxCoresLeft)
575582 val workerAllocationArray = assigned(pos)
576- workerAllocationArray(workerPointer (pos)) += coreToAssign
583+ workerAllocationArray(executorNumberOnWorker (pos)) += coreToAssign
577584 assignedSum(pos) += coreToAssign
578585 maxCoresLeft -= coreToAssign
579- if (app.desc.maxCorePerExecutor.isDefined) {
586+ if (app.desc.maxCorePerExecutor.isDefined || executorNumberOnWorker(pos) == 0 ) {
580587 // if starting multiple executors on the worker, we move to the next executor
581- workerPointer (pos) += 1
588+ executorNumberOnWorker (pos) += 1
582589 }
583590 }
584591 pos = (pos + 1 ) % numUsable
@@ -587,7 +594,7 @@ private[master] class Master(
587594 // Now that we've decided how many executors and the core number for each to
588595 // give on each node, let's actually give them
589596 for (pos <- 0 until numUsable) {
590- for (execIdx <- 0 until workerPointer (pos)) {
597+ for (execIdx <- 0 until executorNumberOnWorker (pos)) {
591598 val exec = app.addExecutor(usableWorkers(pos), assigned(pos)(execIdx))
592599 launchExecutor(usableWorkers(pos), exec)
593600 app.state = ApplicationState .RUNNING
0 commit comments