Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,11 @@ private[spark] class ExecutorAllocationManager(
* under the current load to satisfy all running and pending tasks, rounded up.
*/
private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
val pending = listener.totalPendingTasksPerResourceProfile(rpId)
val pendingTask = listener.pendingTasksPerResourceProfile(rpId)
val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
val running = listener.totalRunningTasksPerResourceProfile(rpId)
val numRunningOrPendingTasks = pending + running
val numRunningOrPendingTasks = pendingTask + pendingSpeculative + running
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously pending is pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp), now we use pendingTasksPerResourceProfile() + pendingSpeculative, and pendingSpeculative simply calls pendingSpeculativeTasksPerResourceProfile(...)

seems like a straightforward code clean up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

previously the pendingSpeculativeTasksPerResourceProfile(rp) will be called not only by totalPendingTasksPerResourceProfile(rpId) but also by val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId), this PR we only call the pendingSpeculativeTasksPerResourceProfile(rp) once to get the pendingSpeculativeTasks , and use it for numRunningOrPendingTasks and pendingSpeculative.

val rp = resourceProfileManager.resourceProfileFromId(rpId)
val tasksPerExecutor = rp.maxTasksPerExecutor(conf)
logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
Expand Down Expand Up @@ -916,18 +916,6 @@ private[spark] class ExecutorAllocationManager(
hasPendingSpeculativeTasks || hasPendingRegularTasks
}

def totalPendingTasksPerResourceProfile(rp: Int): Int = {
pendingTasksPerResourceProfile(rp) + pendingSpeculativeTasksPerResourceProfile(rp)
}

/**
* The number of tasks currently running across all stages.
* Include running-but-zombie stage attempts
*/
def totalRunningTasks(): Int = {
stageAttemptToNumRunningTask.values.sum
}

def totalRunningTasksPerResourceProfile(rp: Int): Int = {
val attempts = resourceProfileIdToStageAttempt.getOrElse(rp, Set.empty).toSeq
// attempts is a Set, change to Seq so we keep all values
Expand Down