pom.xml (1 addition, 1 deletion)
@@ -214,7 +214,7 @@
<arrow.version>9.0.0</arrow.version>
<!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
<leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
-    <kubernetes-client.version>5.12.3</kubernetes-client.version>
+    <kubernetes-client.version>6.1.1</kubernetes-client.version>

<test.java.home>${java.home}</test.java.home>

resource-managers/kubernetes/core/pom.xml (5 additions)
@@ -75,6 +75,11 @@
<scope>test</scope>
</dependency>

+    <dependency>
Comment from @attilapiros (Contributor, Author), Sep 25, 2022:

Per https://github.com/fabric8io/kubernetes-client/blob/master/doc/MIGRATION-v6.md#okhttp-httpclient:

"The -client dependencies still default to the OkHttp client. If you are doing any customization to OkHttp clients directly, you'll need to include the kubernetes-httpclient-okhttp dependency in the compile scope, instead of the default runtime scope."

(A short sketch of such a customization follows this file's diff.)

+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-httpclient-okhttp</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
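Spark does exactly this kind of direct OkHttp customization in SparkKubernetesClientFactory (next file), which is why the dependency moves to compile scope. A minimal sketch of such a customization, assuming the fabric8 6.x OkHttpClientFactory hook:

```scala
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
import okhttp3.{Dispatcher, OkHttpClient}

object OkHttpCustomizationSketch {
  // Overriding additionalConfig needs OkHttpClientFactory (and OkHttp's Builder)
  // on the compile classpath, hence the compile-scope dependency above.
  val factoryWithCustomDispatcher: OkHttpClientFactory = new OkHttpClientFactory() {
    override protected def additionalConfig(builder: OkHttpClient.Builder): Unit = {
      builder.dispatcher(new Dispatcher()) // e.g. a custom dispatcher, as Spark installs
    }
  }
}
```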
SparkKubernetesClientFactory.scala
@@ -21,7 +21,7 @@ import java.io.File
import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
}
logDebug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
-    new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
+    new KubernetesClientBuilder()
Comment from @attilapiros (Contributor, Author):

Because of the api/impl split (see MIGRATION-v6.md, apiimpl-split):

"When you rely solely on a compile dependency to the respective -api dependencies you will not be able to use DefaultKubernetesClient nor DefaultOpenShiftClient directly to create your client instances. You should instead use KubernetesClientBuilder."

(A side-by-side sketch of the two construction styles follows this file's diff.)

+      .withHttpClientFactory(factoryWithCustomDispatcher)
+      .withConfig(config)
+      .build()
}

private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
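For reference, a minimal side-by-side sketch of the v5 and v6 construction styles (hypothetical helper; the Config and the factory are assumed already built):

```scala
import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientBuilder}
import io.fabric8.kubernetes.client.http.HttpClient

object ClientConstructionSketch {
  // v5 (removed): new DefaultKubernetesClient(factory.createHttpClient(config), config)
  // v6: the builder resolves the HttpClient implementation from the classpath,
  // or uses an explicitly supplied factory such as the OkHttp one above.
  def build(config: Config, factory: HttpClient.Factory): KubernetesClient =
    new KubernetesClientBuilder()
      .withHttpClientFactory(factory)
      .withConfig(config)
      .build()
}
```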
K8SSparkSubmitOperation.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
import scala.collection.JavaConverters._

import K8SSparkSubmitOperation.getGracePeriod
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,25 +32,23 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
import org.apache.spark.util.{CommandLineLoggingUtils, Utils}

private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
-  type NON_NAMESPACED_PODS =
-    NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit
-  def listPodsInNameSpace(namespace: Option[String])
-    (implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
+  def getPod(namespace: Option[String], name: String)
+    (implicit client: KubernetesClient): PodResource = {
namespace match {
-      case Some(ns) => client.pods.inNamespace(ns)
-      case None => client.pods
+      case Some(ns) => client.pods.inNamespace(ns).withName(name)
+      case None => client.pods.withName(name)
}
}
}

private class KillApplication extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
-    val podToDelete = listPodsInNameSpace(namespace).withName(pName)
+    val podToDelete = getPod(namespace, pName)

if (Option(podToDelete).isDefined) {
getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp {
(implicit client: KubernetesClient): Unit = {
if (pods.nonEmpty) {
pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
-      val listedPods = listPodsInNameSpace(namespace)

getGracePeriod(sparkConf) match {
case Some(period) =>
-          // this is not using the batch api because no option is provided
-          // when using the grace period.
-          pods.foreach { pod =>
-            listedPods
-              .withName(pod.getMetadata.getName)
-              .withGracePeriod(period)
-              .delete()
-          }
-        case _ => listedPods.delete(pods.asJava)
+          client.resourceList(pods.asJava).withGracePeriod(period).delete()
+        case _ =>
+          client.resourceList(pods.asJava).delete()
}
} else {
printMessage("No applications found.")
@@ -89,7 +79,7 @@
private class ListStatus extends K8sSubmitOp {
override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
(implicit client: KubernetesClient): Unit = {
-    val pod = listPodsInNameSpace(namespace).withName(pName).get()
+    val pod = getPod(namespace, pName).get()
if (Option(pod).isDefined) {
printMessage("Application status (driver): " +
Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -144,14 +134,13 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
kubernetesClient
.pods
}
      val pods = ops
+        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
Comment from @attilapiros (Contributor, Author):

Do the label-based filtering as early as possible. (A sketch of server-side vs. client-side filtering follows this file's diff.)

.list()
.getItems
.asScala
.filter { pod =>
-          val meta = pod.getMetadata
-          meta.getName.startsWith(pName.stripSuffix("*")) &&
-            meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
+          pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
}.toList
op.executeOnGlob(pods, namespace, sparkConf)
} else {
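To make the comment's point concrete: withLabel becomes a labelSelector on the LIST request itself, so non-driver pods never leave the API server, while the old code listed everything and filtered in memory. A minimal sketch, with Spark's label constants written out as literals:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

object LabelFilterSketch {
  // Literal values of SPARK_ROLE_LABEL / SPARK_POD_DRIVER_ROLE in Spark.
  val SparkRoleLabel = "spark-role"
  val SparkPodDriverRole = "driver"

  // Server side: the selector travels with the LIST request, so only
  // driver pods are returned by the API server.
  def serverSide(client: KubernetesClient): List[Pod] =
    client.pods()
      .withLabel(SparkRoleLabel, SparkPodDriverRole)
      .list().getItems.asScala.toList

  // Client side (the old approach): every pod is fetched, then filtered locally.
  def clientSide(client: KubernetesClient): List[Pod] =
    client.pods()
      .list().getItems.asScala
      .filter(_.getMetadata.getLabels.get(SparkRoleLabel) == SparkPodDriverRole)
      .toList
}
```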
KubernetesClientApplication.scala
@@ -149,7 +149,8 @@ private[spark] class Client(
var watch: Watch = null
var createdDriverPod: Pod = null
try {
-      createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      createdDriverPod =
+        kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
@@ -163,7 +164,7 @@
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
+          kubernetesClient.pods().resource(createdDriverPod).delete()
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
throw e
}
@@ -175,7 +176,7 @@
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdDriverPod)
+          kubernetesClient.pods().resource(createdDriverPod).delete()
throw e
}

@@ -185,6 +186,7 @@
while (true) {
val podWithName = kubernetesClient
.pods()
+        .inNamespace(conf.namespace)
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
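The driver-pod change above shows the pattern this PR applies everywhere: v6 replaces item-passing verbs with a resource-centric DSL. A small sketch of the correspondence (hypothetical helper):

```scala
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient

object ResourceDslSketch {
  def createAndDelete(client: KubernetesClient, pod: Pod, ns: String): Unit = {
    // v5 style (removed): the item was an argument to the verb.
    //   client.pods().create(pod)
    //   client.pods().delete(pod)

    // v6 style: obtain a handle on the resource, then apply the verb.
    client.pods().inNamespace(ns).resource(pod).create()
    client.pods().inNamespace(ns).resource(pod).delete()
  }
}
```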
ExecutorPodsAllocator.scala
@@ -76,6 +76,7 @@ class ExecutorPodsAllocator(

val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
+      .inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -112,6 +113,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+        .inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -185,6 +187,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+        .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
@@ -299,6 +302,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+        .inNamespace(namespace)
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -406,7 +410,8 @@
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-    val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
+    val createdExecutorPod =
+      kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
resources
@@ -418,13 +423,16 @@
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
-          kubernetesClient.persistentVolumeClaims().create(pvc)
+          kubernetesClient.persistentVolumeClaims().resource(pvc).create()
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
-        kubernetesClient.pods().delete(createdExecutorPod)
+        kubernetesClient.pods()
+          .inNamespace(namespace)
+          .resource(createdExecutorPod)
+          .delete()
throw e
}
}
@@ -475,6 +483,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+        .inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.delete()
ExecutorPodsLifecycleManager.scala
@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s

import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator

import com.google.common.cache.CacheBuilder
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
@@ -57,6 +58,8 @@ private[spark] class ExecutorPodsLifecycleManager(
// This set is cleaned up when a snapshot containing the updated pod is processed.
private val inactivatedPods = mutable.HashSet.empty[Long]

+  private val namespace = conf.get(KUBERNETES_NAMESPACE)

def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
snapshotsStore.addSubscriber(eventProcessingInterval) {
@@ -168,23 +171,19 @@
// of getting rid of the pod is what matters.
kubernetesClient
.pods()
+      .inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
.delete()
} else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
// If the config is set to keep the executor around, mark the pod as "inactive" so it
// can be ignored in future updates from the API server.
logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " +
"deletion is disabled.")
-        val inactivatedPod = new PodBuilder(updatedPod)
-          .editMetadata()
-            .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
-          .endMetadata()
-          .build()

kubernetesClient
.pods()
+        .inNamespace(namespace)
.withName(updatedPod.getMetadata.getName)
-        .patch(inactivatedPod)
+        .edit(executorInactivationFn)
Comment from @attilapiros (Contributor, Author):

As I see it, edit is preferred over patch; see kubernetes-client-dsl-usage. (A usage sketch follows this file's diff.)

inactivatedPods += execId
}
@@ -274,4 +273,9 @@ private object ExecutorPodsLifecycleManager {
s"${code}${humanStr}"
}

+  def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
+    .editOrNewMetadata()
+      .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
+    .endMetadata()
+    .build()
}
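On the edit-versus-patch point: edit takes a UnaryOperator that is applied to the resource's current state, so callers no longer hand-build a full replacement pod the way the removed PodBuilder block did. A usage sketch of the same idea (the label literal is an assumption, standing in for SPARK_EXECUTOR_INACTIVE_LABEL):

```scala
import java.util.function.UnaryOperator

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient

object EditUsageSketch {
  // Same shape as the new executorInactivationFn above.
  val inactivate: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
    .editOrNewMetadata()
      .addToLabels("spark-exec-inactive", "true") // assumed label literal
    .endMetadata()
    .build()

  def markInactive(client: KubernetesClient, namespace: String, podName: String): Unit = {
    // edit() hands the current pod to the function and applies the result,
    // replacing the removed build-a-copy-then-patch sequence.
    client.pods()
      .inNamespace(namespace)
      .withName(podName)
      .edit(inactivate)
  }
}
```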
KubernetesClusterSchedulerBackend.scala
@@ -85,7 +85,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
-    kubernetesClient.configMaps().create(configMap)
+    kubernetesClient.configMaps().inAnyNamespace().resource(configMap).create()
}

/**
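On inAnyNamespace(): my reading of the fabric8 DSL is that it makes create() take the target namespace from the ConfigMap's own metadata rather than from the client's configured default, so treat this sketch as an assumption:

```scala
import io.fabric8.kubernetes.api.model.ConfigMap
import io.fabric8.kubernetes.client.KubernetesClient

object ConfigMapCreateSketch {
  def create(client: KubernetesClient, configMap: ConfigMap, ns: String): Unit = {
    // Namespace read from the item's metadata (buildConfigMap is expected
    // to have set configMap.getMetadata.getNamespace).
    client.configMaps().inAnyNamespace().resource(configMap).create()

    // Alternative: pin the namespace in the DSL chain instead.
    client.configMaps().inNamespace(ns).resource(configMap).create()
  }
}
```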
StatefulSetPodsAllocator.scala
@@ -53,6 +53,7 @@ class StatefulSetPodsAllocator(

val driverPod = kubernetesDriverPodName
.map(name => Option(kubernetesClient.pods()
+      .inNamespace(namespace)
.withName(name)
.get())
.getOrElse(throw new SparkException(
@@ -69,6 +70,7 @@
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
+        .inNamespace(namespace)
.withName(pod.getMetadata.getName)
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
@@ -99,7 +101,7 @@
applicationId: String,
resourceProfileId: Int): Unit = {
if (setsCreated.contains(resourceProfileId)) {
-      val statefulset = kubernetesClient.apps().statefulSets().withName(
+      val statefulset = kubernetesClient.apps().statefulSets().inNamespace(namespace).withName(
setName(applicationId, resourceProfileId: Int))
statefulset.scale(expected, false /* wait */)
} else {
@@ -169,7 +171,7 @@
val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder()
.withNewMetadata()
.withName(setName(applicationId, resourceProfileId))
-        .withNamespace(conf.get(KUBERNETES_NAMESPACE))
+        .withNamespace(namespace)
.endMetadata()
.withNewSpec()
.withPodManagementPolicy("Parallel")
Expand All @@ -185,7 +187,7 @@ class StatefulSetPodsAllocator(
.build()

addOwnerReference(driverPod.get, Seq(statefulSet))
-    kubernetesClient.apps().statefulSets().create(statefulSet)
+    kubernetesClient.apps().statefulSets().inNamespace(namespace).resource(statefulSet).create()
setsCreated += (resourceProfileId)
}
}
@@ -194,7 +196,12 @@
// Cleanup the statefulsets when we stop
setsCreated.foreach { rpid =>
Utils.tryLogNonFatalError {
-        kubernetesClient.apps().statefulSets().withName(setName(applicationId, rpid)).delete()
+        kubernetesClient
+          .apps()
+          .statefulSets()
+          .inNamespace(namespace)
+          .withName(setName(applicationId, rpid))
+          .delete()
}
}
}
Fabric8Aliases.scala (test utilities)
@@ -17,19 +17,28 @@
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}
+import io.fabric8.kubernetes.api.model.apps.StatefulSet
+import io.fabric8.kubernetes.api.model.apps.StatefulSetList
+import io.fabric8.kubernetes.client.dsl.{AnyNamespaceOperation, FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}

object Fabric8Aliases {
-  type PODS = MixedOperation[Pod, PodList, PodResource[Pod]]
+  type PODS = MixedOperation[Pod, PodList, PodResource]
+  type PODS_WITH_NAMESPACE = NonNamespaceOperation[Pod, PodList, PodResource]
type CONFIG_MAPS = MixedOperation[
ConfigMap, ConfigMapList, Resource[ConfigMap]]
-  type LABELED_PODS = FilterWatchListDeletable[Pod, PodList]
-  type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList]
-  type SINGLE_POD = PodResource[Pod]
+  type CONFIG_MAPS_OPERATION = AnyNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+  type CONFIG_MAPS_RESOURCE = Resource[ConfigMap]
+  type LABELED_PODS = FilterWatchListDeletable[Pod, PodList, PodResource]
+  type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+  type SINGLE_POD = PodResource
type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata]
+  type STATEFUL_SET_RES = RollableScalableResource[StatefulSet]
+  type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
+  type STATEFUL_SETS_NAMESPACED =
+    NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
Resource[PersistentVolumeClaim]]
-  type LABELED_PERSISTENT_VOLUME_CLAIMS =
-    FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList]
+  type LABELED_PERSISTENT_VOLUME_CLAIMS = FilterWatchListDeletable[PersistentVolumeClaim,
+    PersistentVolumeClaimList, Resource[PersistentVolumeClaim]]
}
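These aliases absorb the v6 generics reshuffle: PodResource lost its type parameter, FilterWatchListDeletable gained the resource type as a third one, and the StatefulSet aliases are new. A sketch of how a suite might use them, assuming Mockito in the style of Spark's existing k8s tests:

```scala
import io.fabric8.kubernetes.client.KubernetesClient
import org.mockito.Mock
import org.mockito.Mockito.when
import org.mockito.MockitoAnnotations

import org.apache.spark.deploy.k8s.Fabric8Aliases._

// Hypothetical suite skeleton, only to show the aliases in @Mock declarations.
class ExamplePodsSuite {
  @Mock private var kubernetesClient: KubernetesClient = _
  @Mock private var podOperations: PODS = _
  @Mock private var labeledPods: LABELED_PODS = _

  def setUp(): Unit = {
    MockitoAnnotations.openMocks(this).close()
    // Wire the mocks together along the same DSL chain the production code uses.
    when(kubernetesClient.pods()).thenReturn(podOperations)
    when(podOperations.withLabel("spark-role", "driver")).thenReturn(labeledPods)
  }
}
```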