Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Support mounting hostPath volumes for executors
  • Loading branch information
madanadit authored and Andrew Korzhuev committed Jun 8, 2018
commit 30dd8127f352c9902ad7ca1bf8e4e1574bcebb7c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ private[spark] object Config extends Logging {
.stringConf
.createWithDefault("spark")

val KUBERNETES_EXECUTOR_VOLUMES =
ConfigBuilder("spark.kubernetes.executor.volumes")
.doc("List of volumes mounted into the executor container. The format of this property is " +
"a comma-separated list of mappings following the form hostPath:containerPath:name")
.stringConf
.createWithDefault("")

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.LocalObjectReference
import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
Expand All @@ -37,6 +37,35 @@ private[spark] object KubernetesUtils {
sparkConf.getAllWithPrefix(prefix).toMap
}

/**
* Parse a comma-delimited list of volume specs, each of which
* takes the form hostPath:containerPath:name and add to pod.
*
* @param pod original specification of the pod
* @param container original specification of the container
* @param volumes list of volume specs
* @return the pod with the init-container added to the list of InitContainers
*/
def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = {
val podBuilder = new PodBuilder(pod).editOrNewSpec()
val containerBuilder = new ContainerBuilder(container)
volumes.split(",").map(_.split(":")).map { spec =>
spec match {
case Array(hostPath, containerPath, name) =>
podBuilder
.withVolumes(new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build())
containerBuilder.addToVolumeMounts(new VolumeMountBuilder()
.withMountPath(containerPath)
.withName(name)
.build())
case spec =>
None
}
}
(podBuilder.endSpec().build(), containerBuilder.build())
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
package org.apache.spark.deploy.k8s.features

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder}
import io.fabric8.kubernetes.api.model._

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
Expand Down Expand Up @@ -172,7 +171,14 @@ private[spark] class BasicExecutorFeatureStep(
.addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*)
.endSpec()
.build()
SparkPod(executorPod, containerWithLimitCores)

val volumes = kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES)
val executorPodAndContainerWithVolumes =
KubernetesUtils.addVolumes(executorPod, containerWithLimitCores, volumes)
val executorPodWithVolumes = executorPodAndContainerWithVolumes._1
val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2

SparkPod(executorPodWithVolumes, executorContainerWithVolumes)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
Expand Down