Config.scala
@@ -272,7 +272,8 @@ private[spark] object Config extends Logging {

   val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT =
     ConfigBuilder("spark.kubernetes.allocation.executor.timeout")
-      .doc("Time to wait before considering a pending executor timedout.")
+      .doc("Time to wait before a newly created executor POD request, which has not reached " +
+        "the POD pending state yet, is considered timed out and will be deleted.")
       .version("3.1.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .checkValue(value => value > 0, "Allocation executor timeout must be a positive time value.")
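
For illustration, a minimal sketch of how this timeout could be raised from application code; the 600s value is an arbitrary example, not a recommendation from this change:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Allow up to 10 minutes (example value) for a freshly requested executor
  // pod to reach the Pending state before the allocator deletes the request.
  .set("spark.kubernetes.allocation.executor.timeout", "600s")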
ExecutorPodsAllocator.scala
@@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

 import java.time.Instant
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -82,9 +82,14 @@ private[spark] class ExecutorPodsAllocator(
   // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created.
   private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, Long)]
 
+  // Executor IDs that have been requested from Kubernetes but have not been detected in any POD
+  // snapshot yet, but are already known by the scheduler backend. Mapped to the ResourceProfile id.
+  private val schedulerKnownNewlyCreatedExecs = mutable.LinkedHashMap.empty[Long, Int]
+
   private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
 
-  private val hasPendingPods = new AtomicBoolean()
+  // visible for tests
+  private[k8s] val numOutstandingPods = new AtomicInteger()
 
   private var lastSnapshot = ExecutorPodsSnapshot()

@@ -93,9 +98,9 @@ private[spark] class ExecutorPodsAllocator(
   // if they happen to come up before the deletion takes effect.
   @volatile private var deletedExecutorIds = Set.empty[Long]
 
-  def start(applicationId: String): Unit = {
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
-      onNewSnapshots(applicationId, _)
+      onNewSnapshots(applicationId, schedulerBackend, _)
     }
   }

@@ -105,7 +110,7 @@ private[spark] class ExecutorPodsAllocator(
       totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
     }
     logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId")
-    if (!hasPendingPods.get()) {
+    if (numOutstandingPods.get() == 0) {
       snapshotsStore.notifySubscribers()
     }
   }
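
To illustrate the gate above: the counter is published at the end of each snapshot round (see the last hunk of this file) and suppresses redundant allocation rounds when a new target arrives. A minimal standalone sketch, with hypothetical names and simplified types, not the actual class:

import java.util.concurrent.atomic.AtomicInteger

class AllocationGateSketch(notifySubscribers: () => Unit) {
  private val numOutstandingPods = new AtomicInteger()

  // Called at the end of a snapshot round with the pods still in flight.
  def publish(totalPendingCount: Int, newlyCreatedCount: Int): Unit =
    numOutstandingPods.set(totalPendingCount + newlyCreatedCount)

  // Mirrors setTotalExpectedExecutors(): only force a new allocation
  // round when no previous pod request is still outstanding.
  def onNewTarget(): Unit =
    if (numOutstandingPods.get() == 0) notifySubscribers()
}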
@@ -114,8 +119,19 @@ private[spark] class ExecutorPodsAllocator(

   private def onNewSnapshots(
       applicationId: String,
+      schedulerBackend: KubernetesClusterSchedulerBackend,
       snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
-    newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+    val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
+    newlyCreatedExecutors --= k8sKnownExecIds
+    schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds
+
+    // Transfer the executor requests already known by the scheduler backend from
+    // newlyCreatedExecutors to schedulerKnownNewlyCreatedExecs.
+    val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
+    schedulerKnownNewlyCreatedExecs ++=
+      newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
+    newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
+
     // For all executors we've created against the API but have not seen in a snapshot
     // yet - check the current time. If the current time has exceeded some threshold,
     // assume that the pod was either never created (the API server never properly
@@ -164,15 +180,16 @@ private[spark] class ExecutorPodsAllocator(
       _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
+    val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))
Review comment from @attilapiros (Feb 25, 2021):
This fixes an extra bug I found during testing. Without this change, point 5 of my new test would fail: pending pods whose deletion had been requested earlier by the executor PODs allocator were still counted as available pending PODs for this resource profile (in currentPendingExecutorsForRpId), so upscaling was not triggered at

if (knownPodCount > targetNum) {
}

because knownPodCount includes currentPendingExecutorsForRpId.size.
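
To make the arithmetic concrete, a small invented example (the numbers are illustrative, not taken from the test itself):

// Target is 5 executors, 3 are running, and 2 pending pods were already
// requested for deletion. Counting the doomed pods makes the allocator
// believe it is at target, so neither the downscale branch
// (knownPodCount > targetNum) nor the upscale branch fires.
val targetNum = 5
val currentRunningCount = 3
val pendingMarkedForDeletion = 2
val buggyKnownPodCount = currentRunningCount + pendingMarkedForDeletion  // 5
val fixedKnownPodCount = currentRunningCount                             // 3, after filtering
assert(buggyKnownPodCount == targetNum)  // no upscale is ever requested
assert(fixedKnownPodCount < targetNum)   // allocator requests 2 more pods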

     // Map the pods into per ResourceProfile id so we can check per ResourceProfile,
     // add a fast path if not using other ResourceProfiles.
     val rpIdToExecsAndPodState =
       mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
     if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
       rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
-        mutable.HashMap.empty ++= lastSnapshot.executorPods
+        mutable.HashMap.empty ++= notDeletedPods
     } else {
-      lastSnapshot.executorPods.foreach { case (execId, execPodState) =>
+      notDeletedPods.foreach { case (execId, execPodState) =>
         val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
         val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
           mutable.HashMap[Long, ExecutorPodState]())
@@ -190,24 +207,33 @@ private[spark] class ExecutorPodsAllocator(
         case _ => false
       }
 
-      val currentPendingExecutors = podsForRpId.filter {
+      val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter {
         case (_, PodPending(_)) => true
         case _ => false
-      }
+      }.partition { case (k, _) =>
+        schedulerKnownExecs.contains(k)
+      }
       // This variable is used later to print some debug logs. It's updated when cleaning up
-      // excess pod requests, since currentPendingExecutors is immutable.
-      var knownPendingCount = currentPendingExecutors.size
+      // excess pod requests, since currentPendingExecutorsForRpId is immutable.
+      var knownPendingCount = currentPendingExecutorsForRpId.size
 
       val newlyCreatedExecutorsForRpId =
         newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
           rpId == waitingRpId
         }
 
+      val schedulerKnownNewlyCreatedExecsForRpId =
+        schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) =>
+          rpId == waitingRpId
+        }
+
       if (podsForRpId.nonEmpty) {
         logDebug(s"ResourceProfile Id: $rpId " +
           s"pod allocation status: $currentRunningCount running, " +
-          s"${currentPendingExecutors.size} pending. " +
-          s"${newlyCreatedExecutorsForRpId.size} unacknowledged.")
+          s"${currentPendingExecutorsForRpId.size} unknown pending, " +
+          s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
+          s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
+          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.")
       }
 
       // It's possible that we have outstanding pods that are outdated when dynamic allocation
@@ -218,16 +244,17 @@ private[spark] class ExecutorPodsAllocator(
       //
       // TODO: with dynamic allocation off, handle edge cases if we end up with more running
       //       executors than expected.
-      val knownPodCount = currentRunningCount + currentPendingExecutors.size +
-        newlyCreatedExecutorsForRpId.size
+      val knownPodCount = currentRunningCount +
+        currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
+        newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
 
       if (knownPodCount > targetNum) {
         val excess = knownPodCount - targetNum
         val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
           .filter { case (_, (_, createTime)) =>
             currentTime - createTime > executorIdleTimeout
           }.keys.take(excess).toList
-        val knownPendingToDelete = currentPendingExecutors
+        val knownPendingToDelete = currentPendingExecutorsForRpId
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
           .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
@@ -245,7 +272,7 @@ private[spark] class ExecutorPodsAllocator(
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
             .delete()
-          newlyCreatedExecutors --= toDelete
+          newlyCreatedExecutors --= newlyCreatedToDelete
           knownPendingCount -= knownPendingToDelete.size
         }
       }
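
As a standalone illustration of the deletion ordering in this hunk (ids and counts invented): the excess is covered first by timed-out newly created requests, then topped up from idle pending pods.

// With a target of 4 and 7 known pods, the excess of 3 is taken from the
// 2 timed-out newly created requests first, then from 1 idle pending pod.
val targetNum = 4
val knownPodCount = 7
val excess = knownPodCount - targetNum                  // 3
val newlyCreatedToDelete = List(10L, 11L).take(excess)  // 2 timed-out requests
val knownPendingToDelete = List(5L, 6L, 7L)
  .take(excess - newlyCreatedToDelete.size)             // 1 idle pending pod
assert(newlyCreatedToDelete.size + knownPendingToDelete.size == excess)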
@@ -276,8 +303,9 @@ private[spark] class ExecutorPodsAllocator(
     deletedExecutorIds = _deletedExecutorIds
 
     // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
-    // update method when not needed.
-    hasPendingPods.set(totalPendingCount + newlyCreatedExecutors.size > 0)
+    // update method when not needed. PODs known by the scheduler backend are not counted here,
+    // as they are considered running PODs and should not block upscaling.
+    numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
   }
 
   private def requestNewExecutors(
KubernetesClusterSchedulerBackend.scala
@@ -93,7 +93,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val initExecs = Map(defaultProfile -> initialExecutors)
     podAllocator.setTotalExpectedExecutors(initExecs)
     lifecycleEventHandler.start(this)
-    podAllocator.start(applicationId())
+    podAllocator.start(applicationId(), this)
     watchEvents.start(applicationId())
     pollEvents.start(applicationId())
     if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) {