Skip to content
Closed
Prev Previous commit
Next Next commit
Handle corner cases
  • Loading branch information
venkata91 committed Jul 21, 2020
commit 060b37e72e8730fcaa695ba69ffddea8da105dce
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ private[spark] class ExecutorAllocationManager(
private def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
val pending = listener.totalPendingTasksPerResourceProfile(rpId)
val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
val numUnschedulables = listener.pendingUnschedulableTasksPerResourceProfile(rpId)
val running = listener.totalRunningTasksPerResourceProfile(rpId)
val numRunningOrPendingTasks = pending + running
val rp = resourceProfileManager.resourceProfileFromId(rpId)
Expand All @@ -289,13 +290,23 @@ private[spark] class ExecutorAllocationManager(
s" tasksperexecutor: $tasksPerExecutor")
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutor).toInt
if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
val totalNeed = if (tasksPerExecutor > 1 && maxNeeded == 1 && pendingSpeculative > 0) {
// If we have pending speculative tasks and only need a single executor, allocate one more
// to satisfy the locality requirements of speculation
maxNeeded + 1
} else {
maxNeeded
}

// Request additional executors to schedule the unschedulable tasks as well
if (numUnschedulables > 0) {
val maxNeededForUnschedulables = math.ceil(numUnschedulables * executorAllocationRatio /
tasksPerExecutor).toInt
math.max(totalNeed, executorMonitor.executorCountWithResourceProfile(rpId)) +
maxNeededForUnschedulables
} else {
totalNeed
}
}

private def totalRunningTasksPerResourceProfile(id: Int): Int = synchronized {
Expand Down Expand Up @@ -856,14 +867,7 @@ private[spark] class ExecutorAllocationManager(

def pendingUnschedulableTasksPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
val numUnschedulables = attempts
.filter(attempt => unschedulableTaskSets.contains(attempt)).size
val maxTasksPerExecutor =
resourceProfileManager.resourceProfileFromId(rp).maxTasksPerExecutor(conf)
// Need an additional executor since the unschedulableTasks cannot be currently
// scheduled on the available executors. This is to ensure that we'll always
// request for an additional executor.
numUnschedulables + maxTasksPerExecutor
attempts.filter(attempt => unschedulableTaskSets.contains(attempt)).size
}

def hasPendingUnschedulableTasks: Boolean = {
Expand All @@ -879,8 +883,7 @@ private[spark] class ExecutorAllocationManager(

def totalPendingTasksPerResourceProfile(rp: Int): Int = {
pendingTasksPerResourceProfile(rp) +
pendingSpeculativeTasksPerResourceProfile(rp) +
pendingUnschedulableTasksPerResourceProfile(rp)
pendingSpeculativeTasksPerResourceProfile(rp)
}

/**
Expand Down