Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Ensure driver is ready before executors start
Before creating executor pods, wait until driver gets ready. The
driver's headless service can be resolved by DNS only after the driver
pod is ready. If the executor tries to connect to the headless service
before driver pod is ready, it will hit UnkownHostException and get into
error state but will not be restarted.
  • Loading branch information
cchriswu committed Jun 2, 2021
commit 1e4ad6756253914f2bf3c5b25dfb4c3efbf2deb9
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s

import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))

private val driverPodReadinessTimeout = 5

private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000

private val namespace = conf.get(KUBERNETES_NAMESPACE)
Expand Down Expand Up @@ -345,6 +348,16 @@ private[spark] class ExecutorPodsAllocator(
s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
// wait until the driver pod is ready to ensure executors can connect to driver svc
if (numExecutorsToAllocate > 0) {
try {
kubernetesClient.pods().inNamespace(namespace).withName(kubernetesDriverPodName.get).
waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
} catch {
case e: InterruptedException =>
logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready")
}
}
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
Expand Down