Use the new initContainers field in Kubernetes 1.8
liyinan926 committed Oct 18, 2017
commit 1b129496b64ad99faf5bfbca5d3c84a261e3135c
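
Background for the change: by Kubernetes 1.8, init containers are declared through the first-class initContainers field of the pod spec; the beta annotation pod.beta.kubernetes.io/init-containers, whose value was a JSON-serialized container array, is no longer needed. That is why this commit can drop the Jackson plumbing below and lean on the fabric8 builder API instead. A minimal sketch of the new shape, assuming fabric8 kubernetes-client 3.x (the container name and image are illustrative, not from the commit):

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

object InitContainerSketch {
  // Append an init container via the first-class PodSpec field
  // (mirrors the rewritten InitContainerUtil.appendInitContainer below).
  def withInitContainer(pod: Pod, init: Container): Pod =
    new PodBuilder(pod)
      .editOrNewSpec()
        .addToInitContainers(init)
      .endSpec()
      .build()

  // Illustrative init container; name and image are made up for this sketch.
  val exampleInit: Container = new ContainerBuilder()
    .withName("spark-init")
    .withImage("busybox:1.28")
    .withCommand("sh", "-c", "echo fetching dependencies")
    .build()
}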
resource-managers/kubernetes/core/pom.xml: 2 changes (1 addition, 1 deletion)
@@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
-<kubernetes.client.version>2.2.13</kubernetes.client.version>
+<kubernetes.client.version>3.0.0</kubernetes.client.version>
</properties>

<dependencies>
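
The client bump is what makes the migration possible: the init-container accessors used in the rest of this diff (addToInitContainers on the pod builder, getInitContainers on the pod spec) ship with the 3.x client model. For reference, a hedged sbt equivalent of the dependency this property controls:

// Illustrative sbt form only; Spark's build actually flows the version
// through the kubernetes.client.version Maven property edited above.
libraryDependencies += "io.fabric8" % "kubernetes-client" % "3.0.0"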
@@ -74,7 +74,6 @@ package object constants {
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

// Bootstrapping dependencies with the init-container
-private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH =
"/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY =
@@ -16,34 +16,15 @@
 */
package org.apache.spark.deploy.k8s.submit

-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.fabric8.kubernetes.api.model.{Container, Pod, PodBuilder}
-import scala.collection.JavaConverters._
-
-import org.apache.spark.deploy.k8s.constants._

private[spark] object InitContainerUtil {

-  private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
-
  def appendInitContainer(originalPodSpec: Pod, initContainer: Container): Pod = {
-    val resolvedInitContainers = originalPodSpec
-      .getMetadata
-      .getAnnotations
-      .asScala
-      .get(INIT_CONTAINER_ANNOTATION)
-      .map { existingInitContainerAnnotation =>
-        val existingInitContainers = OBJECT_MAPPER.readValue(
-          existingInitContainerAnnotation, classOf[List[Container]])
-        existingInitContainers ++ Seq(initContainer)
-      }.getOrElse(Seq(initContainer))
-    val resolvedSerializedInitContainers = OBJECT_MAPPER.writeValueAsString(resolvedInitContainers)
    new PodBuilder(originalPodSpec)
-      .editMetadata()
-        .removeFromAnnotations(INIT_CONTAINER_ANNOTATION)
-        .addToAnnotations(INIT_CONTAINER_ANNOTATION, resolvedSerializedInitContainers)
-        .endMetadata()
+      .editOrNewSpec()
+        .addToInitContainers(initContainer)
+        .endSpec()
      .build()
  }
}
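
A usage sketch for the simplified helper: addToInitContainers appends to the pod spec's list, so repeated calls accumulate containers, which is the behavior the deleted code emulated by merging with whatever JSON the annotation already held. Assuming a caller along these lines (pod and container names are made up):

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

val basePod = new PodBuilder()
  .withNewMetadata().withName("spark-driver").endMetadata()
  .withNewSpec().endSpec()
  .build()

// Hypothetical init container, for illustration only.
val init = new ContainerBuilder()
  .withName("spark-init")
  .withImage("busybox:1.28")
  .build()

val pod = InitContainerUtil.appendInitContainer(basePod, init)
assert(pod.getSpec.getInitContainers.size() == 1)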
@@ -109,15 +109,15 @@ private[k8s] class LoggingPodStatusWatcherImpl(
("namespace", pod.getMetadata.getNamespace()),
("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", pod.getMetadata.getCreationTimestamp()),
("creation time", pod.getMetadata.getCreationTimestamp().getTime),

// spec details
("service account name", pod.getSpec.getServiceAccountName()),
("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName()),

// status
("start time", pod.getStatus.getStartTime),
("start time", pod.getStatus.getStartTime.getTime),
("container images",
pod.getStatus.getContainerStatuses()
.asScala
@@ -162,7 +162,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", running.getStartedAt))
("Container started at", running.getStartedAt.getTime))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
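
Judging from the .getTime calls added above, the 3.0.0 client models these timestamp fields as java.util.Date-like values rather than plain strings (an inference from the diff, not something it states outright). A sketch of rendering one of them for log output under that assumption:

import java.text.SimpleDateFormat
import java.util.Date
import io.fabric8.kubernetes.api.model.Pod

// Assumes getCreationTimestamp returns a java.util.Date in the 3.0.0 model.
def formatCreationTime(pod: Pod): String = {
  val millis: Long = pod.getMetadata.getCreationTimestamp.getTime
  new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").format(new Date(millis))
}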
@@ -31,9 +31,8 @@ import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec}
import org.apache.spark.util.Utils

-private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
+private[spark] class InitContainerBootstrapStepSuite extends SparkFunSuite {

-private val OBJECT_MAPPER = new ObjectMapper().registerModule(DefaultScalaModule)
private val CONFIG_MAP_NAME = "spark-init-config-map"
private val CONFIG_MAP_KEY = "spark-init-config-map-key"

@@ -59,12 +58,9 @@ private[spark] class initContainerBootstrapStepSuite extends SparkFunSuite {
FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvKey)
assert(additionalDriverEnv.head.getValue ===
FirstTestInitContainerConfigurationStep$.additionalMainContainerEnvValue)
-val driverAnnotations = preparedDriverSpec.driverPod.getMetadata.getAnnotations.asScala
-assert(driverAnnotations.size === 1)
-val initContainers = OBJECT_MAPPER.readValue(
-  driverAnnotations(INIT_CONTAINER_ANNOTATION), classOf[Array[Container]])
-assert(initContainers.length === 1)
-val initContainerEnv = initContainers.head.getEnv.asScala
+val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers
+assert(initContainers.size() === 1)
+val initContainerEnv = initContainers.get(0).getEnv.asScala
assert(initContainerEnv.size === 1)
assert(initContainerEnv.head.getName ===
SecondTestInitContainerConfigurationStep$.additionalInitContainerEnvKey)
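
One detail worth noting in the rewritten assertions: getSpec.getInitContainers returns a java.util.List, hence the size() and get(0) calls. Inside the same test, a single asScala conversion would give the more idiomatic Scala form, roughly:

import scala.collection.JavaConverters._

// Sketch: equivalent assertions over the converted collection.
val initContainersScala = preparedDriverSpec.driverPod.getSpec.getInitContainers.asScala
assert(initContainersScala.size === 1)
assert(initContainersScala.head.getEnv.asScala.size === 1)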
@@ -179,8 +179,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
verify(nodeAffinityExecutorPodModifier, times(1))
.addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]]))

-assert(executor.getMetadata.getAnnotations.size() === 1)
-assert(executor.getMetadata.getAnnotations.containsKey(INIT_CONTAINER_ANNOTATION))
+assert(executor.getSpec.getInitContainers.size() === 1)
checkOwnerReferences(executor, driverPodUid)
}
