Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))

private val driverPodReadinessTimeout = 5

private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000

private val namespace = conf.get(KUBERNETES_NAMESPACE)
Expand Down Expand Up @@ -99,6 +102,16 @@ private[spark] class ExecutorPodsAllocator(
@volatile private var deletedExecutorIds = Set.empty[Long]

def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
// wait until the driver pod is ready to ensure executors can connect to driver svc
try {
kubernetesClient.pods()
.withName(kubernetesDriverPodName.get)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
} catch {
case e: InterruptedException =>
logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " +
s"namespace $namespace")
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.ArgumentMatchers.{any, anyInt, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand Down Expand Up @@ -104,6 +105,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(driverPodOperations.waitUntilReady(anyInt(), any(classOf[TimeUnit])))
.thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
Expand Down