diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index d8fa9e40c7e4..8ac2ee4792e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -272,7 +272,8 @@ private[spark] object Config extends Logging { val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT = ConfigBuilder("spark.kubernetes.allocation.executor.timeout") - .doc("Time to wait before considering a pending executor timedout.") + .doc("Time to wait before a newly created executor POD request, which has not yet reached " + + "the POD pending state, is considered timed out and will be deleted.") .version("3.1.0") .timeConf(TimeUnit.MILLISECONDS) .checkValue(value => value > 0, "Allocation executor timeout must be a positive time value.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index eb35de875959..5fc81a6d8427 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -82,9 +82,14 @@ private[spark] class ExecutorPodsAllocator( // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created. private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, Long)] + // Executor IDs that have been requested from Kubernetes but have not been detected in any POD + // snapshot yet, but are already known by the scheduler backend. Mapped to the ResourceProfile id. + private val schedulerKnownNewlyCreatedExecs = mutable.LinkedHashMap.empty[Long, Int] + private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf) - private val hasPendingPods = new AtomicBoolean() + // visible for tests + private[k8s] val numOutstandingPods = new AtomicInteger() private var lastSnapshot = ExecutorPodsSnapshot() @@ -93,9 +98,9 @@ private[spark] class ExecutorPodsAllocator( // if they happen to come up before the deletion takes effect.
@volatile private var deletedExecutorIds = Set.empty[Long] - def start(applicationId: String): Unit = { + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { snapshotsStore.addSubscriber(podAllocationDelay) { - onNewSnapshots(applicationId, _) + onNewSnapshots(applicationId, schedulerBackend, _) } } @@ -105,7 +110,7 @@ private[spark] class ExecutorPodsAllocator( totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs) } logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId") - if (!hasPendingPods.get()) { + if (numOutstandingPods.get() == 0) { snapshotsStore.notifySubscribers() } } @@ -114,8 +119,19 @@ private[spark] class ExecutorPodsAllocator( private def onNewSnapshots( applicationId: String, + schedulerBackend: KubernetesClusterSchedulerBackend, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { - newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) + val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys) + newlyCreatedExecutors --= k8sKnownExecIds + schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds + + // transfer the scheduler backend known executor requests from the newlyCreatedExecutors + // to the schedulerKnownNewlyCreatedExecs + val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet + schedulerKnownNewlyCreatedExecs ++= + newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1) + newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet + // For all executors we've created against the API but have not seen in a snapshot // yet - check the current time. If the current time has exceeded some threshold, // assume that the pod was either never created (the API server never properly @@ -164,15 +180,16 @@ private[spark] class ExecutorPodsAllocator( _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains) } + val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_)) // Map the pods into per ResourceProfile id so we can check per ResourceProfile, // add a fast path if not using other ResourceProfiles. val rpIdToExecsAndPodState = mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]() if (totalExpectedExecutorsPerResourceProfileId.size <= 1) { rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) = - mutable.HashMap.empty ++= lastSnapshot.executorPods + mutable.HashMap.empty ++= notDeletedPods } else { - lastSnapshot.executorPods.foreach { case (execId, execPodState) => + notDeletedPods.foreach { case (execId, execPodState) => val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId, mutable.HashMap[Long, ExecutorPodState]()) @@ -190,24 +207,33 @@ private[spark] class ExecutorPodsAllocator( case _ => false } - val currentPendingExecutors = podsForRpId.filter { + val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter { case (_, PodPending(_)) => true case _ => false + }.partition { case (k, _) => + schedulerKnownExecs.contains(k) } // This variable is used later to print some debug logs. It's updated when cleaning up - // excess pod requests, since currentPendingExecutors is immutable. - var knownPendingCount = currentPendingExecutors.size + // excess pod requests, since currentPendingExecutorsForRpId is immutable. 
+ var knownPendingCount = currentPendingExecutorsForRpId.size val newlyCreatedExecutorsForRpId = newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) => rpId == waitingRpId } + val schedulerKnownNewlyCreatedExecsForRpId = + schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) => + rpId == waitingRpId + } + if (podsForRpId.nonEmpty) { logDebug(s"ResourceProfile Id: $rpId " + s"pod allocation status: $currentRunningCount running, " + - s"${currentPendingExecutors.size} pending. " + - s"${newlyCreatedExecutorsForRpId.size} unacknowledged.") + s"${currentPendingExecutorsForRpId.size} unknown pending, " + + s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " + + s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " + + s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.") } // It's possible that we have outstanding pods that are outdated when dynamic allocation @@ -218,8 +244,9 @@ private[spark] class ExecutorPodsAllocator( // // TODO: with dynamic allocation off, handle edge cases if we end up with more running // executors than expected. - val knownPodCount = currentRunningCount + currentPendingExecutors.size + - newlyCreatedExecutorsForRpId.size + val knownPodCount = currentRunningCount + + currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size + + newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size if (knownPodCount > targetNum) { val excess = knownPodCount - targetNum @@ -227,7 +254,7 @@ private[spark] class ExecutorPodsAllocator( .filter { case (_, (_, createTime)) => currentTime - createTime > executorIdleTimeout }.keys.take(excess).toList - val knownPendingToDelete = currentPendingExecutors + val knownPendingToDelete = currentPendingExecutorsForRpId .filter(x => isExecutorIdleTimedOut(x._2, currentTime)) .take(excess - newlyCreatedToDelete.size) .map { case (id, _) => id } @@ -245,7 +272,7 @@ private[spark] class ExecutorPodsAllocator( .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*) .delete() - newlyCreatedExecutors --= toDelete + newlyCreatedExecutors --= newlyCreatedToDelete knownPendingCount -= knownPendingToDelete.size } } @@ -276,8 +303,9 @@ private[spark] class ExecutorPodsAllocator( deletedExecutorIds = _deletedExecutorIds // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this - // update method when not needed. - hasPendingPods.set(totalPendingCount + newlyCreatedExecutors.size > 0) + // update method when not needed. PODs known by the scheduler backend are not counted here, as + // they are considered running PODs and should not block upscaling.
+ numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size) } private def requestNewExecutors( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index c933c49a49bc..2bbd3bae29d9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -93,7 +93,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val initExecs = Map(defaultProfile -> initialExecutors) podAllocator.setTotalExpectedExecutors(initExecs) lifecycleEventHandler.start(this) - podAllocator.start(applicationId()) + podAllocator.start(applicationId(), this) watchEvents.start(applicationId()) pollEvents.start(applicationId()) if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index ed7e7da24310..0765d835c3fe 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -81,6 +81,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var executorBuilder: KubernetesExecutorBuilder = _ + @Mock + private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ @@ -96,12 +99,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock = new ManualClock(0L) podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) - podsAllocatorUnderTest.start(TEST_SPARK_APP_ID) + when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) } test("Initially request executors in batches. Do not request another batch if the" + " first has not finished.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (nextId <- 1 to podAllocationSize) { verify(podOperations).create(podWithAttachedContainerForId(nextId)) } @@ -111,28 +116,34 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { test("Request executors in batches. 
Allow another batch to be requested if" + " all pending executors start running.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) } snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1)) snapshotsStore.updatePod(runningExecutor(podAllocationSize)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) snapshotsStore.updatePod(runningExecutor(podAllocationSize)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod])) } test("When a current batch reaches error states immediately, re-request" + " them on the next batch.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> podAllocationSize)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) } val failedPod = failedExecutorWithoutDeletion(podAllocationSize) snapshotsStore.updatePod(failedPod) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) } @@ -148,9 +159,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")) .thenReturn(labeledPods) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(1)) waitForExecutorPodsClock.setTime(podCreationTimeout + 1) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(labeledPods).delete() verify(podOperations).create(podWithAttachedContainerForId(2)) } @@ -174,17 +187,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Target 1 executor, make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(1)) // Mark executor as running, verify that subsequent allocation cycle is a no-op. snapshotsStore.updatePod(runningExecutor(1)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(1)).create(any()) verify(podOperations, never()).delete() // Request 3 more executors, make sure all are requested. 
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations).create(podWithAttachedContainerForId(2)) verify(podOperations).create(podWithAttachedContainerForId(3)) verify(podOperations).create(podWithAttachedContainerForId(4)) @@ -193,6 +209,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(runningExecutor(2)) snapshotsStore.updatePod(pendingExecutor(3)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) verify(podOperations, times(4)).create(any()) verify(podOperations, never()).delete() @@ -200,6 +217,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.advance(executorIdleTimeout * 2) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(4)).create(any()) verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4") verify(podOperations).delete() @@ -212,6 +230,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(deletedExecutor(4)) snapshotsStore.removeDeletedExecutors() snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) assert(!podsAllocatorUnderTest.isDeleted("3")) assert(!podsAllocatorUnderTest.isDeleted("4")) } @@ -279,6 +298,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.setTime(startTime) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) verify(podOperations).create(podWithAttachedContainerForId(1)) verify(podOperations).create(podWithAttachedContainerForId(2)) verify(podOperations).create(podWithAttachedContainerForId(3)) @@ -292,16 +312,139 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Newly created executors (both acknowledged and not) are protected by executorIdleTimeout podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 0)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") verify(podOperations, never()).delete() // Newly created executors (both acknowledged and not) are cleaned up. 
waitForExecutorPodsClock.advance(executorIdleTimeout * 2) + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "3", "4")) snapshotsStore.notifySubscribers() - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") + // SPARK-34361: executors 1, 3 and 4 are considered known PODs, so they are not counted as + // outstanding PODs and are not removed, even though executor 1 is still in pending state, + // executors 3 and 4 are new requests without any state reported by Kubernetes, and all three + // are already past the timeout + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5") verify(podOperations).delete() } + /** + * This test covers downscaling and upscaling of dynamic allocation on Kubernetes + * along with multiple resource profiles (default and rp) when some executors + * are already known by the scheduler backend. + * + * Legend: + * + * N-: newly created not known by the scheduler backend + * N+: newly created known by the scheduler backend + * P- / P+ : pending (not known / known) by the scheduler backend + * D: deleted + * | default || rp | expected + * | || | outstanding + * | 1 | 2 | 3 || 4 | 5 | 6 | 7 | PODs + * ========================================================================================== + * 0) setTotalExpectedExecs with | N- | N- | N- || N- | N- | N- | N- | + * default->3, rp->4 | | | || | | | | 7 + * ------------------------------------------------------------------------------------------ + * 1) make 1 from each rp | N+ | N- | N- || N+ | N- | N- | N- | + * known by backend | | | || | | | | 5 + * ------------------------------------------------------------------------------------------- + * 2) some more backend known + pending | N+ | P+ | P- || N+ | P+ | P- | N- | 3 + * ------------------------------------------------------------------------------------------- + * 3) advance time with idle timeout | | | || | | | | + * setTotalExpectedExecs with | N+ | P+ | D || N+ | P+ | D | D | 0 + * default->1, rp->1 | | | || | | | | + * ------------------------------------------------------------------------------------------- + * 4) setTotalExpectedExecs with | N+ | P+ | D || N+ | P+ | D | D | 0 and + * default->2, rp->2 | | | || | | | | no new POD req.
+ * =========================================================================================== + * + * 5) setTotalExpectedExecs with default -> 3, rp -> 3 which will lead to the creation of the new + * PODs: 8 and 9 + */ + test("SPARK-34361: scheduler backend known pods with multiple resource profiles at downscaling") { + when(podOperations + .withField("status.phase", "Pending")) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) + .thenReturn(podOperations) + when(podOperations + .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) + .thenReturn(podOperations) + + val startTime = Instant.now.toEpochMilli + waitForExecutorPodsClock.setTime(startTime) + + val rpb = new ResourceProfileBuilder() + val ereq = new ExecutorResourceRequests() + val treq = new TaskResourceRequests() + ereq.cores(4).memory("2g") + treq.cpus(2) + rpb.require(ereq).require(treq) + val rp = rpb.build() + + // 0) request 3 PODs for the default and 4 PODs for the other resource profile + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 4)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7) + verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(3, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(4, rp.id)) + verify(podOperations).create(podWithAttachedContainerForId(5, rp.id)) + verify(podOperations).create(podWithAttachedContainerForId(6, rp.id)) + verify(podOperations).create(podWithAttachedContainerForId(7, rp.id)) + + // 1) make 1 POD known by the scheduler backend for each resource profile + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "4")) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5, + "scheduler backend known PODs are not outstanding") + verify(podOperations, times(7)).create(any()) + + // 2) make 1 extra POD known by the scheduler backend for each resource profile + // and make some of them pending + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "2", "4", "5")) + snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id)) + snapshotsStore.updatePod(pendingExecutor(3, defaultProfile.id)) + snapshotsStore.updatePod(pendingExecutor(5, rp.id)) + snapshotsStore.updatePod(pendingExecutor(6, rp.id)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) + verify(podOperations, times(7)).create(any()) + + // 3) downscale to 1 POD for default and 1 POD for the other resource profile + waitForExecutorPodsClock.advance(executorIdleTimeout * 2) + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + verify(podOperations, times(7)).create(any()) + verify(podOperations, times(2)).delete() + assert(podsAllocatorUnderTest.isDeleted("3")) + assert(podsAllocatorUnderTest.isDeleted("6")) + assert(podsAllocatorUnderTest.isDeleted("7")) + + // 4) upscale to 2 PODs for default and 2 for the other resource profile; as there are still + // 2 PODs known by the scheduler backend, no new POD creation must be requested + 
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 2)) + snapshotsStore.notifySubscribers() + verify(podOperations, times(7)).create(any()) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + verify(podOperations, times(7)).create(any()) + + // 5) requesting 1 more executor for each resource + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 3)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) + verify(podOperations, times(9)).create(any()) + verify(podOperations).create(podWithAttachedContainerForId(8, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(9, rp.id)) + } + test("SPARK-33288: multiple resource profiles") { when(podOperations .withField("status.phase", "Pending")) @@ -330,6 +473,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Target 1 executor for default profile, 2 for other profile, // make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) verify(podOperations).create(podWithAttachedContainerForId(2, rp.id)) verify(podOperations).create(podWithAttachedContainerForId(3, rp.id)) @@ -339,6 +483,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(runningExecutor(2, rp.id)) snapshotsStore.updatePod(runningExecutor(3, rp.id)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(3)).create(any()) verify(podOperations, never()).delete() @@ -346,6 +491,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // make sure all are requested. 
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4) verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id)) verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id)) verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id)) @@ -356,6 +502,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(pendingExecutor(5, defaultProfile.id)) snapshotsStore.updatePod(pendingExecutor(7, rp.id)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations, times(7)).create(any()) verify(podOperations, never()).delete() @@ -364,6 +511,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.advance(executorIdleTimeout * 2) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(7)).create(any()) verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6") verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7") @@ -379,6 +527,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(deletedExecutor(7)) snapshotsStore.removeDeletedExecutors() snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) assert(!podsAllocatorUnderTest.isDeleted("5")) assert(!podsAllocatorUnderTest.isDeleted("6")) assert(!podsAllocatorUnderTest.isDeleted("7")) @@ -399,6 +548,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .thenReturn(podOperations) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) // Initial request of pods verify(podOperations).create(podWithAttachedContainerForId(1)) verify(podOperations).create(podWithAttachedContainerForId(2)) @@ -414,6 +564,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // We move forward one allocation cycle waitForExecutorPodsClock.setTime(podAllocationDelay + 1) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) // We request pod 6 verify(podOperations).create(podWithAttachedContainerForId(6)) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index b63a28bdcc12..8b2f5d6579e9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -127,7 +127,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn test("Start all components") { schedulerBackendUnderTest.start() verify(podAllocator).setTotalExpectedExecutors(Map(defaultProfile -> 3)) - verify(podAllocator).start(TEST_SPARK_APP_ID) + verify(podAllocator).start(TEST_SPARK_APP_ID, schedulerBackendUnderTest) 
verify(lifecycleEventHandler).start(schedulerBackendUnderTest) verify(watchEvents).start(TEST_SPARK_APP_ID) verify(pollEvents).start(TEST_SPARK_APP_ID)