diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 5884348cb3e4..fdfc7e8efc43 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -159,6 +159,19 @@ private[spark] class Client( logInfo(s"Waiting for application $appName to finish...") watcher.awaitCompletion() logInfo(s"Application $appName finished.") + try { + // Upon completion of the application, the client deletes all auxiliary Kubernetes + // resources the application depended on instead of relying on Kubernetes garbage + // collection to kick in on the resources, which only happens when the driver pod + // gets explicitly deleted. Auxiliary resources include the headless service for + // the driver and the ConfigMap for the init-container, for example. Such resources + // are no longer needed once an application completes, but they are still persisted + // by the API server and there should be deleted upon completion. + val otherKubernetesResources = currentDriverSpec.otherKubernetesResources + kubernetesClient.resourceList(otherKubernetesResources: _*).delete() + } catch { + case e: Throwable => logWarning("Failed to clean up auxiliary Kubernetes resources", e) + } } else { logInfo(s"Deployed Spark application $appName into Kubernetes.") } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index bf4ec0489320..416e9f61a3ae 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -178,6 +178,17 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { verify(loggingPodStatusWatcher).awaitCompletion() } + test("Submission client should clean up Kubernetes resources upon application completion") { + val submissionClient = new Client( + submissionSteps, + new SparkConf(false), + kubernetesClient, + true, + "spark", + loggingPodStatusWatcher) + submissionClient.run() + verify(resourceList).delete() + } } private object FirstTestConfigurationStep extends DriverConfigurationStep {