Skip to content

Commit 9fafec3

Browse files
author
Chris Wu
committed
Add config for the timeout
1 parent 02cf0f3 commit 9fafec3

File tree

2 files changed

+15
-8
lines changed

2 files changed

+15
-8
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,16 @@ private[spark] object Config extends Logging {
300300
.checkValue(value => value > 0, "Allocation batch delay must be a positive time value.")
301301
.createWithDefaultString("1s")
302302

303+
// How long executor allocation waits, at application start, for the driver pod to report
// ready. Parsed as a time value in seconds and required to be strictly positive. Per the
// doc text below, executor pods are still created once the wait times out.
val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
  ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
    .doc("Time to wait for driver pod to get ready before creating executor pods. This wait " +
      "only happens on application start. If timeout happens, executor pods will still be " +
      "created.")
    .version("3.2.0")
    .timeConf(TimeUnit.SECONDS)
    // Message literal is split only to stay within the 100-character line limit; the
    // resulting runtime string is unchanged.
    .checkValue(value => value > 0, "Allocation driver readiness timeout must be a positive " +
      "time value.")
    .createWithDefaultString("1s")
312+
303313
val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT =
304314
ConfigBuilder("spark.kubernetes.allocation.executor.timeout")
305315
.doc("Time to wait before a newly created executor POD request, which does not reached " +

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private[spark] class ExecutorPodsAllocator(
6262
podAllocationDelay * 5,
6363
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
6464

65-
private val driverPodReadinessTimeout = 5
65+
// Driver pod readiness wait in seconds: the backing config entry is declared with
// timeConf(TimeUnit.SECONDS) and the value is passed to waitUntilReady with TimeUnit.SECONDS.
private val driverPodReadinessTimeout: Long =
  conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
6666

6767
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
6868

@@ -104,14 +104,11 @@ private[spark] class ExecutorPodsAllocator(
104104
def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
105105
// Wait until the driver pod is ready before starting executors, as the headless service won't
106106
// be resolvable by DNS until the driver pod is ready.
107-
try {
108-
kubernetesClient.pods()
107+
Utils.tryLogNonFatalError {
108+
kubernetesClient
109+
.pods()
109110
.withName(kubernetesDriverPodName.get)
110-
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.MINUTES)
111-
} catch {
112-
case e: InterruptedException =>
113-
logWarning(s"Timeout waiting for driver pod ${kubernetesDriverPodName.get} get ready in " +
114-
s"namespace $namespace")
111+
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
115112
}
116113
snapshotsStore.addSubscriber(podAllocationDelay) {
117114
onNewSnapshots(applicationId, schedulerBackend, _)

0 commit comments

Comments
 (0)