From 5c7270e81630dd31dd6cedb845e9ac2ad63800fa Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Fri, 2 Apr 2021 20:12:39 -0700 Subject: [PATCH] [SPARK-34948][K8S] Add ownerReference to executor configmap to fix leakages Revert "[SPARK-XXX][K8S] Do not create an empty ConfigMap" This reverts commit 626b5f54ea43405c01511f425a4418028a1d1dbb. Add owner a a --- .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 2 +- .../cluster/k8s/KubernetesClusterSchedulerBackend.scala | 7 +++++-- .../k8s/KubernetesClusterSchedulerBackendSuite.scala | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 5fc81a6d8427..5ebd172f7dec 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -70,7 +70,7 @@ private[spark] class ExecutorPodsAllocator( private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS) - private val driverPod = kubernetesDriverPodName + val driverPod = kubernetesDriverPodName .map(name => Option(kubernetesClient.pods() .withName(name) .get()) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 2bbd3bae29d9..ebc0411f7e53 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -20,11 +20,13 @@ import java.util.concurrent.{ScheduledExecutorService, TimeUnit} import scala.concurrent.Future +import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO @@ -67,13 +69,14 @@ private[spark] class KubernetesClusterSchedulerBackend( } } - private def setUpExecutorConfigMap(): Unit = { + private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = { val configMapName = KubernetesClientUtils.configMapNameExecutor val confFilesMap = KubernetesClientUtils .buildSparkConfDirFilesMap(configMapName, conf, Map.empty) val labels = Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels) + KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap)) kubernetesClient.configMaps().create(configMap) } @@ -97,7 +100,7 @@ private[spark] class KubernetesClusterSchedulerBackend( watchEvents.start(applicationId()) pollEvents.start(applicationId()) if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) { - setUpExecutorConfigMap() + setUpExecutorConfigMap(podAllocator.driverPod) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 8b2f5d6579e9..c4b878df8be4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -112,6 +112,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn .thenReturn(driverEndpointRef) when(kubernetesClient.pods()).thenReturn(podOperations) when(kubernetesClient.configMaps()).thenReturn(configMapsOperations) + when(podAllocator.driverPod).thenReturn(None) schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend( taskScheduler, sc,