@@ -54,6 +54,12 @@ private[spark] object Config extends Logging {
.checkValues(Set("Always", "Never", "IfNotPresent"))
.createWithDefault("IfNotPresent")

val IMAGE_PULL_SECRET =
ConfigBuilder("spark.kubernetes.imagePullSecret")
Contributor:
Given there's already an option spark.kubernetes.container.image.pullPolicy, we should make this consistent by naming it spark.kubernetes.container.image.pullSecret.

Contributor Author:
Done

.doc("Specifies the Kubernetes secret used to access private image registry.")
.stringConf
.createOptional

val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
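Following the naming thread above (the author replied "Done", though this revision still shows the old key), the renamed entry would presumably read something like the sketch below. The key spark.kubernetes.container.image.pullSecret comes from the reviewer's suggestion, and the ConfigBuilder calls mirror the existing pull-policy entry; the merged form may differ.

  val IMAGE_PULL_SECRET =
    ConfigBuilder("spark.kubernetes.container.image.pullSecret")
      // Same ConfigBuilder idiom as the CONTAINER_IMAGE_PULL_POLICY entry above;
      // the key name here is the reviewer's proposal, not necessarily the final one.
      .doc("Specifies the Kubernetes secret used to access a private image registry.")
      .stringConf
      .createOptional

In use, a submitter would point this key at a docker-registry secret that already exists in the target namespace (for example via --conf on spark-submit), and the configuration steps below attach it to the driver and executor pod specs as an imagePullSecrets reference.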
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.steps

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model._
Contributor:
There should be an empty line between third-party imports and org.apache.spark.* imports.
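A minimal sketch of the grouping being asked for, using the imports from this diff: Scala imports first, third-party (io.fabric8) imports next, then org.apache.spark.* imports, each group separated by a blank line.

  import scala.collection.JavaConverters._

  import io.fabric8.kubernetes.api.model._

  import org.apache.spark.{SparkConf, SparkException}
  import org.apache.spark.deploy.k8s.Config._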


import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.Config._
@@ -51,6 +51,8 @@ private[spark] class BasicDriverConfigurationStep(
.get(DRIVER_CONTAINER_IMAGE)
.getOrElse(throw new SparkException("Must specify the driver container image"))

private val imagePullSecret = sparkConf.get(IMAGE_PULL_SECRET)

// CPU settings
private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
@@ -132,6 +134,8 @@ private[spark] class BasicDriverConfigurationStep(
case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build()
}

val imagePullSecrets = imagePullSecret.map(new LocalObjectReference(_)).toList

val baseDriverPod = new PodBuilder(driverSpec.driverPod)
.editOrNewMetadata()
.withName(driverPodName)
@@ -141,6 +145,7 @@
.withNewSpec()
.withRestartPolicy("Never")
.withNodeSelector(nodeSelector.asJava)
.withImagePullSecrets(imagePullSecrets.asJava)
.endSpec()
.build()

@@ -68,6 +68,7 @@ private[spark] class ExecutorPodFactory(
.get(EXECUTOR_CONTAINER_IMAGE)
.getOrElse(throw new SparkException("Must specify the executor container image"))
private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
private val imagePullSecret = sparkConf.get(IMAGE_PULL_SECRET)
private val blockManagerPort = sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)

@@ -98,6 +99,8 @@
nodeToLocalTaskCount: Map[String, Int]): Pod = {
val name = s"$executorPodNamePrefix-exec-$executorId"

val imagePullSecrets = imagePullSecret.map(new LocalObjectReference(_)).toList
Contributor:
Given the same code is used to configure both the driver and executor pods, it can be extracted into a utility method in KubernetesUtil.

Contributor Author:
@liyinan926 I made the change, but IMO it looks a bit awkward, as what is done here is just a type conversion, Option[String] ~> List[LocalObjectReference], to make the config option palatable to the Kubernetes API.

Contributor:
Yeah, it seems a bit awkward. I guess the original form looks better. Sorry for that, but looks like it's better if you revert this change.

Contributor Author:
@liyinan926 reverted! Should be good to merge now. Rebased against master too.
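For reference, the extraction that was tried and then reverted would presumably have looked something like the sketch below; the object name KubernetesUtils and the method name are assumptions, since the final diff keeps the one-line conversion inline in both places.

  import io.fabric8.kubernetes.api.model.LocalObjectReference

  private[spark] object KubernetesUtils {

    // Hypothetical helper discussed in this thread (later reverted): converts the
    // optional secret name from the config into the list form that the fabric8
    // PodBuilder's withImagePullSecrets expects.
    def parseImagePullSecrets(imagePullSecret: Option[String]): List[LocalObjectReference] =
      imagePullSecret.map(new LocalObjectReference(_)).toList
  }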


// hostname must be no longer than 63 characters, so take the last 63 characters of the pod
// name as the hostname. This preserves uniqueness since the end of name contains
// executorId
@@ -193,6 +196,7 @@ private[spark] class ExecutorPodFactory(
.withHostname(hostname)
.withRestartPolicy("Never")
.withNodeSelector(nodeSelector.asJava)
.withImagePullSecrets(imagePullSecrets.asJava)
.endSpec()
.build()

@@ -17,9 +17,7 @@
package org.apache.spark.deploy.k8s.submit.steps

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, LocalObjectReference, PodBuilder}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
@@ -51,6 +49,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
.set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
.set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
.set(IMAGE_PULL_SECRET, "imagePullSecret")

val submissionStep = new BasicDriverConfigurationStep(
APP_ID,
@@ -103,7 +102,11 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
SPARK_APP_NAME_ANNOTATION -> APP_NAME)
assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)
assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never")

val driverPodSpec = preparedDriverSpec.driverPod.getSpec
assert(driverPodSpec.getRestartPolicy === "Never")
assert(driverPodSpec.getImagePullSecrets.size() === 1)
assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret")

val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
val expectedSparkConf = Map(
@@ -33,6 +33,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
private val driverPodUid: String = "driver-uid"
private val executorPrefix: String = "base"
private val executorImage: String = "executor-image"
private val imagePullSecret: String = "imagePullSecret"
private val driverPod = new PodBuilder()
.withNewMetadata()
.withName(driverPodName)
@@ -54,6 +55,7 @@
.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix)
.set(CONTAINER_IMAGE, executorImage)
.set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
.set(IMAGE_PULL_SECRET, imagePullSecret)
}

test("basic executor pod has reasonable defaults") {
@@ -74,6 +76,8 @@
assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1)
assert(executor.getSpec.getContainers.get(0).getResources
.getLimits.get("memory").getAmount === "1408Mi")
assert(executor.getSpec.getImagePullSecrets.size() === 1)
assert(executor.getSpec.getImagePullSecrets.get(0).getName === imagePullSecret)

// The pod has no node selector, volumes.
assert(executor.getSpec.getNodeSelector.isEmpty)