From 30dd8127f352c9902ad7ca1bf8e4e1574bcebb7c Mon Sep 17 00:00:00 2001 From: madanadit Date: Wed, 4 Apr 2018 18:02:25 -0700 Subject: [PATCH 01/12] Support mounting hostPath volumes for executors --- .../org/apache/spark/deploy/k8s/Config.scala | 7 +++++ .../spark/deploy/k8s/KubernetesUtils.scala | 31 ++++++++++++++++++- .../features/BasicExecutorFeatureStep.scala | 14 ++++++--- 3 files changed, 47 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 560dedf431b0..de3d0392b0ce 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -117,6 +117,13 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("spark") + val KUBERNETES_EXECUTOR_VOLUMES = + ConfigBuilder("spark.kubernetes.executor.volumes") + .doc("List of volumes mounted into the executor container. The format of this property is " + + "a comma-separated list of mappings following the form hostPath:containerPath:name") + .stringConf + .createWithDefault("") + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index ee629068ad90..bebeea1edeca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.k8s -import io.fabric8.kubernetes.api.model.LocalObjectReference +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkConf import org.apache.spark.util.Utils @@ -37,6 +37,35 @@ private[spark] object KubernetesUtils { sparkConf.getAllWithPrefix(prefix).toMap } + /** + * Parse a comma-delimited list of volume specs, each of which + * takes the form hostPath:containerPath:name and add to pod. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param volumes list of volume specs + * @return the pod with the init-container added to the list of InitContainers + */ + def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { + val podBuilder = new PodBuilder(pod).editOrNewSpec() + val containerBuilder = new ContainerBuilder(container) + volumes.split(",").map(_.split(":")).map { spec => + spec match { + case Array(hostPath, containerPath, name) => + podBuilder + .withVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build()) + containerBuilder.addToVolumeMounts(new VolumeMountBuilder() + .withMountPath(containerPath) + .withName(name) + .build()) + case spec => + None + } + } + (podBuilder.endSpec().build(), containerBuilder.build()) + } + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 529069d3b8a0..5f1bbb0dcb2b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -17,11 +17,10 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} @@ -172,7 +171,14 @@ private[spark] class BasicExecutorFeatureStep( .addToImagePullSecrets(kubernetesConf.imagePullSecrets(): _*) .endSpec() .build() - SparkPod(executorPod, containerWithLimitCores) + + val volumes = kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES) + val executorPodAndContainerWithVolumes = + KubernetesUtils.addVolumes(executorPod, containerWithLimitCores, volumes) + val executorPodWithVolumes = executorPodAndContainerWithVolumes._1 + val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2 + + SparkPod(executorPodWithVolumes, executorContainerWithVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty From 36a10ac34ca3dd13f4d8ccf21fe8344b7c49321b Mon Sep 17 00:00:00 2001 From: madanadit Date: Thu, 5 Apr 2018 13:57:45 -0700 Subject: [PATCH 02/12] Read mode for mounted volumes --- .../org/apache/spark/deploy/k8s/Config.scala | 15 +- .../spark/deploy/k8s/KubernetesUtils.scala | 31 ---- .../deploy/k8s/KubernetesVolumeSpec.scala | 28 ++++ .../deploy/k8s/KubernetesVolumeUtils.scala | 142 ++++++++++++++++++ .../k8s/features/BasicDriverFeatureStep.scala | 14 +- .../features/BasicExecutorFeatureStep.scala | 13 +- .../BasicDriverFeatureStepSuite.scala | 59 ++++++++ .../BasicExecutorFeatureStepSuite.scala | 56 +++++++ 8 files changed, 311 insertions(+), 47 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index de3d0392b0ce..972fd6e3613d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -117,13 +117,6 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("spark") - val KUBERNETES_EXECUTOR_VOLUMES = - ConfigBuilder("spark.kubernetes.executor.volumes") - .doc("List of volumes mounted into the executor container. The format of this property is " + - "a comma-separated list of mappings following the form hostPath:containerPath:name") - .stringConf - .createWithDefault("") - val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") @@ -170,11 +163,19 @@ private[spark] object Config extends Logging { val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." val KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX = "spark.kubernetes.driver.secretKeyRef." + val KUBERNETES_DRIVER_VOLUMES_PREFIX = "spark.kubernetes.driver.volumes." val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label." val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation." val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets." val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef." + val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes." + + val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath" + val KUBERNETES_VOLUMES_MOUNT_KEY = "mount" + val KUBERNETES_VOLUMES_PATH_KEY = "path" + val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly" + val KUBERNETES_VOLUMES_OPTIONS_KEY = "options" val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index bebeea1edeca..5bc070147d3a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.deploy.k8s -import io.fabric8.kubernetes.api.model._ - import org.apache.spark.SparkConf import org.apache.spark.util.Utils @@ -37,35 +35,6 @@ private[spark] object KubernetesUtils { sparkConf.getAllWithPrefix(prefix).toMap } - /** - * Parse a comma-delimited list of volume specs, each of which - * takes the form hostPath:containerPath:name and add to pod. - * - * @param pod original specification of the pod - * @param container original specification of the container - * @param volumes list of volume specs - * @return the pod with the init-container added to the list of InitContainers - */ - def addVolumes(pod: Pod, container : Container, volumes: String): (Pod, Container) = { - val podBuilder = new PodBuilder(pod).editOrNewSpec() - val containerBuilder = new ContainerBuilder(container) - volumes.split(",").map(_.split(":")).map { spec => - spec match { - case Array(hostPath, containerPath, name) => - podBuilder - .withVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath)).withName(name).build()) - containerBuilder.addToVolumeMounts(new VolumeMountBuilder() - .withMountPath(containerPath) - .withName(name) - .build()) - case spec => - None - } - } - (podBuilder.endSpec().build(), containerBuilder.build()) - } - def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala new file mode 100644 index 000000000000..afd6524aed54 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import scala.collection.mutable.Map + +private[spark] case class KubernetesVolumeSpec( + var mountPath: Option[String], + var mountReadOnly: Option[Boolean], + var optionsSpec: Map[String, String]) + +private[spark] object KubernetesVolumeSpec { + def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, Map()) +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala new file mode 100644 index 000000000000..ed05acd23f5a --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import scala.collection.mutable.HashMap + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ + +private[spark] object KubernetesVolumeUtils { + + /** + * Given hostPath volume specs, add volume to pod and volume mount to container. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param sparkConf Spark configuration + * @param prefix the prefix for volume configuration + * @return a tuple of (pod with the volume(s) added, container with mount(s) added) + */ + def addVolumes( + pod: Pod, + container: Container, + sparkConf: SparkConf, + prefix : String): (Pod, Container) = { + val hostPathVolumeSpecs = parseHostPathVolumesWithPrefix(sparkConf, prefix) + addHostPathVolumes(pod, container, hostPathVolumeSpecs) + } + + /** + * Extract Spark volume configuration properties with a given name prefix. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @param volumeTypeKey the given property name prefix + * @return a Map storing with volume name as key and spec as value + */ + def parseVolumesWithPrefix( + sparkConf: SparkConf, + prefix: String, + volumeTypeKey: String): Map[String, KubernetesVolumeSpec] = { + val volumes = HashMap[String, KubernetesVolumeSpec]() + val properties = sparkConf.getAllWithPrefix(s"$prefix$volumeTypeKey.").toList + // Extract volume names + properties.foreach { + k => + val keys = k._1.split("\\.") + if (keys.nonEmpty && !volumes.contains(keys(0))) { + volumes.update(keys(0), KubernetesVolumeSpec.emptySpec()) + } + } + // Populate spec + volumes.foreach { + case (name, spec) => + properties.foreach { + k => + k._1.split("\\.") match { + case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) => + spec.mountPath = Some(k._2) + case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => + spec.mountReadOnly = Some(k._2.toBoolean) + case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => + spec.optionsSpec.update(option, k._2) + case _ => + None + } + } + } + volumes.toMap + } + + /** + * Extract Spark hostPath volume configuration properties with a given name prefix and + * return the result as a Map. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @return a Map storing with volume name as key and spec as value + */ + def parseHostPathVolumesWithPrefix( + sparkConf: SparkConf, + prefix: String): Map[String, KubernetesVolumeSpec] = { + parseVolumesWithPrefix(sparkConf, prefix, KUBERNETES_VOLUMES_HOSTPATH_KEY) + } + + /** + * Given hostPath volume specs, add volume to pod and volume mount to container. + * + * @param pod original specification of the pod + * @param container original specification of the container + * @param volumes list of named volume specs + * @return a tuple of (pod with the volume(s) added, container with mount(s) added) + */ + def addHostPathVolumes( + pod: Pod, + container: Container, + volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = { + val podBuilder = new PodBuilder(pod).editOrNewSpec() + val containerBuilder = new ContainerBuilder(container) + volumes foreach { + case (name, spec) => + var hostPath: Option[String] = None + if (spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { + hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)) + } + if (hostPath.isDefined && spec.mountPath.isDefined) { + podBuilder.addToVolumes(new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath.get)) + .withName(name) + .build()) + val volumeBuilder = new VolumeMountBuilder() + .withMountPath(spec.mountPath.get) + .withName(name) + if (spec.mountReadOnly.isDefined) { + containerBuilder + .addToVolumeMounts(volumeBuilder + .withReadOnly(spec.mountReadOnly.get) + .build()) + } else { + containerBuilder.addToVolumeMounts(volumeBuilder.build()) + } + } + } + (podBuilder.endSpec().build(), containerBuilder.build()) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 07bdccbe0479..8f94f1620ead 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -19,10 +19,10 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ import scala.collection.mutable -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, HasMetadata, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config._ @@ -109,7 +109,15 @@ private[spark] class BasicDriverFeatureStep( .addToImagePullSecrets(conf.imagePullSecrets(): _*) .endSpec() .build() - SparkPod(driverPod, driverContainer) + + val (driverPodWithVolumes, driverContainerVolumes) = + KubernetesVolumeUtils.addVolumes( + driverPod, + driverContainer, + conf.sparkConf, + KUBERNETES_DRIVER_VOLUMES_PREFIX) + + SparkPod(driverPodWithVolumes, driverContainerVolumes) } override def getAdditionalPodSystemProperties(): Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 5f1bbb0dcb2b..2e4b091daedd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -17,10 +17,11 @@ package org.apache.spark.deploy.k8s.features import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model._ import org.apache.spark.SparkException -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} @@ -172,11 +173,11 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - val volumes = kubernetesConf.get(KUBERNETES_EXECUTOR_VOLUMES) - val executorPodAndContainerWithVolumes = - KubernetesUtils.addVolumes(executorPod, containerWithLimitCores, volumes) - val executorPodWithVolumes = executorPodAndContainerWithVolumes._1 - val executorContainerWithVolumes = executorPodAndContainerWithVolumes._2 + val (executorPodWithVolumes, executorContainerWithVolumes) = + KubernetesVolumeUtils.addVolumes(executorPod, + containerWithLimitCores, + kubernetesConf.sparkConf, + KUBERNETES_EXECUTOR_VOLUMES_PREFIX) SparkPod(executorPodWithVolumes, executorContainerWithVolumes) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index b2813d8b3265..b4fe3ac8c467 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -152,4 +152,63 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") assert(additionalProperties === expectedSparkConf) } + + test("single driver hostPath volume gets mounted") { + hostPathVolumeTest(1, false) + } + + test("multiple driver hostPath volumes get mounted") { + hostPathVolumeTest(2, false) + } + + test("single driver hostPath volume gets mounted w/ readOnly option") { + hostPathVolumeTest(1, true) + } + + test("multiple driver hostPath volumes get mounted w/ readOnly option") { + hostPathVolumeTest(2, true) + } + + private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(CONTAINER_IMAGE, "spark-driver:latest") + for (i <- 0 until numVolumes) { + sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.path", + s"/opt/mount$i") + sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.options.path", + s"/tmp/mount$i") + if (readOnly) { + sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.readOnly", + "true") + } + } + val kubernetesConf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + None, + APP_NAME, + MAIN_CLASS, + APP_ARGS), + RESOURCE_NAME_PREFIX, + APP_ID, + DRIVER_LABELS, + DRIVER_ANNOTATIONS, + Map.empty, + Map.empty) + val step = new BasicDriverFeatureStep(kubernetesConf) + val driver = step.configurePod(SparkPod.initialPod()) + + assert(driver.container.getVolumeMounts.size() === numVolumes) + assert(driver.pod.getSpec.getVolumes.size() === numVolumes) + for (i <- 0 until numVolumes) { + assert(driver.container.getVolumeMounts.asScala + .exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))) + assert(driver.pod.getSpec.getVolumes.asScala + .exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))) + if (readOnly) { + assert(driver.container.getVolumeMounts.get(i).getReadOnly == true) + } + } + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 9182134b3337..3ccba76ed668 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -155,6 +155,62 @@ class BasicExecutorFeatureStepSuite checkOwnerReferences(executor.pod, DRIVER_POD_UID) } + test("single executor hostPath volume gets mounted") { + hostPathVolumeTest(1, false) + } + + test("multiple executor hostPath volumes get mounted") { + hostPathVolumeTest(2, false) + } + + test("single executor hostPath volume gets mounted w/ readOnly option") { + hostPathVolumeTest(1, true) + } + + test("multiple executor hostPath volumes get mounted w/ readOnly option") { + hostPathVolumeTest(2, true) + } + + private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = { + val conf = baseConf.clone() + for (i <- 0 until numVolumes) { + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.path", + s"/opt/mount$i") + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path", + s"/tmp/mount$i") + if (readOnly) { + conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.readOnly", + "true") + } + } + val step = new BasicExecutorFeatureStep( + KubernetesConf( + conf, + KubernetesExecutorSpecificConf("1", DRIVER_POD), + RESOURCE_NAME_PREFIX, + APP_ID, + LABELS, + ANNOTATIONS, + Map.empty, + Map.empty)) + val executor = step.configurePod(SparkPod.initialPod()) + + assert(executor.container.getImage === EXECUTOR_IMAGE) + assert(executor.container.getVolumeMounts.size() === numVolumes) + assert(executor.pod.getSpec.getVolumes.size() === numVolumes) + for (i <- 0 until numVolumes) { + assert(executor.container.getVolumeMounts.asScala + .exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))) + assert(executor.pod.getSpec.getVolumes.asScala + .exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))) + if (readOnly) { + assert(executor.container.getVolumeMounts.get(i).getReadOnly == true) + } + } + + checkOwnerReferences(executor.pod, DRIVER_POD_UID) + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) From 9d361ba4363bf717e9e6c1840711d58f69c6096a Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Mon, 7 May 2018 19:03:05 +0200 Subject: [PATCH 03/12] Address comments, rewrite volume mount to step --- .../spark/deploy/k8s/KubernetesConf.scala | 13 +- .../deploy/k8s/KubernetesVolumeSpec.scala | 12 +- .../deploy/k8s/KubernetesVolumeUtils.scala | 124 +++--------------- .../k8s/features/BasicDriverFeatureStep.scala | 9 +- .../features/BasicExecutorFeatureStep.scala | 8 +- .../features/MountVolumesFeatureStep.scala | 69 ++++++++++ .../k8s/submit/KubernetesDriverBuilder.scala | 23 ++-- .../k8s/KubernetesExecutorBuilder.scala | 30 +++-- .../BasicDriverFeatureStepSuite.scala | 84 ++---------- .../BasicExecutorFeatureStepSuite.scala | 65 +-------- ...ubernetesCredentialsFeatureStepSuite.scala | 9 +- .../DriverServiceFeatureStepSuite.scala | 18 ++- .../features/EnvSecretsFeatureStepSuite.scala | 3 +- .../features/LocalDirsFeatureStepSuite.scala | 3 +- .../MountSecretsFeatureStepSuite.scala | 3 +- .../spark/deploy/k8s/submit/ClientSuite.scala | 3 +- .../submit/KubernetesDriverBuilderSuite.scala | 6 +- .../k8s/KubernetesExecutorBuilderSuite.scala | 6 +- 18 files changed, 196 insertions(+), 292 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 5a944187a709..8832b65982a3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -55,7 +55,8 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleAnnotations: Map[String, String], roleSecretNamesToMountPaths: Map[String, String], roleSecretEnvNamesToKeyRefs: Map[String, String], - roleEnvs: Map[String, String]) { + roleEnvs: Map[String, String], + roleVolumes: Iterable[KubernetesVolumeSpec]) { def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) @@ -134,6 +135,8 @@ private[spark] object KubernetesConf { sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX) val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) + val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( + sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) KubernetesConf( sparkConfWithMainAppJar, @@ -144,7 +147,8 @@ private[spark] object KubernetesConf { driverAnnotations, driverSecretNamesToMountPaths, driverSecretEnvNamesToKeyRefs, - driverEnvs) + driverEnvs, + driverVolumes) } def createExecutorConf( @@ -176,6 +180,8 @@ private[spark] object KubernetesConf { val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) val executorEnv = sparkConf.getExecutorEnv.toMap + val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( + sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) KubernetesConf( sparkConf.clone(), @@ -186,6 +192,7 @@ private[spark] object KubernetesConf { executorAnnotations, executorMountSecrets, executorEnvSecrets, - executorEnv) + executorEnv, + executorVolumes) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index afd6524aed54..6482a973ffd5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -16,13 +16,15 @@ */ package org.apache.spark.deploy.k8s -import scala.collection.mutable.Map +import scala.collection.immutable.Map private[spark] case class KubernetesVolumeSpec( - var mountPath: Option[String], - var mountReadOnly: Option[Boolean], - var optionsSpec: Map[String, String]) + volumeName: String, + volumeType: String, // ADT + mountPath: String, + mountReadOnly: Boolean, + optionsSpec: Map[String, String]) private[spark] object KubernetesVolumeSpec { - def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec(None, None, Map()) + def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec("", "", "", false, Map()) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index ed05acd23f5a..feb196608d1b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -25,118 +25,36 @@ import org.apache.spark.deploy.k8s.Config._ private[spark] object KubernetesVolumeUtils { - /** - * Given hostPath volume specs, add volume to pod and volume mount to container. - * - * @param pod original specification of the pod - * @param container original specification of the container - * @param sparkConf Spark configuration - * @param prefix the prefix for volume configuration - * @return a tuple of (pod with the volume(s) added, container with mount(s) added) - */ - def addVolumes( - pod: Pod, - container: Container, - sparkConf: SparkConf, - prefix : String): (Pod, Container) = { - val hostPathVolumeSpecs = parseHostPathVolumesWithPrefix(sparkConf, prefix) - addHostPathVolumes(pod, container, hostPathVolumeSpecs) - } - /** * Extract Spark volume configuration properties with a given name prefix. * * @param sparkConf Spark configuration * @param prefix the given property name prefix - * @param volumeTypeKey the given property name prefix * @return a Map storing with volume name as key and spec as value */ def parseVolumesWithPrefix( - sparkConf: SparkConf, - prefix: String, - volumeTypeKey: String): Map[String, KubernetesVolumeSpec] = { - val volumes = HashMap[String, KubernetesVolumeSpec]() - val properties = sparkConf.getAllWithPrefix(s"$prefix$volumeTypeKey.").toList - // Extract volume names - properties.foreach { - k => - val keys = k._1.split("\\.") - if (keys.nonEmpty && !volumes.contains(keys(0))) { - volumes.update(keys(0), KubernetesVolumeSpec.emptySpec()) - } - } - // Populate spec - volumes.foreach { - case (name, spec) => - properties.foreach { - k => - k._1.split("\\.") match { - case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_PATH_KEY) => - spec.mountPath = Some(k._2) - case Array(`name`, KUBERNETES_VOLUMES_MOUNT_KEY, KUBERNETES_VOLUMES_READONLY_KEY) => - spec.mountReadOnly = Some(k._2.toBoolean) - case Array(`name`, KUBERNETES_VOLUMES_OPTIONS_KEY, option) => - spec.optionsSpec.update(option, k._2) - case _ => - None - } - } + sparkConf: SparkConf, + prefix: String): Iterable[KubernetesVolumeSpec] = { + val properties = sparkConf.getAllWithPrefix(prefix) + + val propsByTypeName = properties.map { case (k, v) => + k.split('.').toSeq match { + case tpe :: name :: rest => ((tpe, name), (rest.mkString("."), v)) + } + }.groupBy(_._1).mapValues(_.map(_._2)) + + propsByTypeName.map { case ((tpe, name), props) => + val mountProps = props.filter(_._1.startsWith(KUBERNETES_VOLUMES_MOUNT_KEY)).toMap + val options = props.filter(_._1.startsWith(KUBERNETES_VOLUMES_OPTIONS_KEY)).toMap + + KubernetesVolumeSpec( + volumeName = name, + volumeType = tpe, + mountPath = mountProps(KUBERNETES_VOLUMES_PATH_KEY), + mountReadOnly = mountProps(KUBERNETES_VOLUMES_READONLY_KEY).toBoolean, + optionsSpec = options + ) } - volumes.toMap - } - - /** - * Extract Spark hostPath volume configuration properties with a given name prefix and - * return the result as a Map. - * - * @param sparkConf Spark configuration - * @param prefix the given property name prefix - * @return a Map storing with volume name as key and spec as value - */ - def parseHostPathVolumesWithPrefix( - sparkConf: SparkConf, - prefix: String): Map[String, KubernetesVolumeSpec] = { - parseVolumesWithPrefix(sparkConf, prefix, KUBERNETES_VOLUMES_HOSTPATH_KEY) } - /** - * Given hostPath volume specs, add volume to pod and volume mount to container. - * - * @param pod original specification of the pod - * @param container original specification of the container - * @param volumes list of named volume specs - * @return a tuple of (pod with the volume(s) added, container with mount(s) added) - */ - def addHostPathVolumes( - pod: Pod, - container: Container, - volumes: Map[String, KubernetesVolumeSpec]): (Pod, Container) = { - val podBuilder = new PodBuilder(pod).editOrNewSpec() - val containerBuilder = new ContainerBuilder(container) - volumes foreach { - case (name, spec) => - var hostPath: Option[String] = None - if (spec.optionsSpec.contains(KUBERNETES_VOLUMES_PATH_KEY)) { - hostPath = Some(spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)) - } - if (hostPath.isDefined && spec.mountPath.isDefined) { - podBuilder.addToVolumes(new VolumeBuilder() - .withHostPath(new HostPathVolumeSource(hostPath.get)) - .withName(name) - .build()) - val volumeBuilder = new VolumeMountBuilder() - .withMountPath(spec.mountPath.get) - .withName(name) - if (spec.mountReadOnly.isDefined) { - containerBuilder - .addToVolumeMounts(volumeBuilder - .withReadOnly(spec.mountReadOnly.get) - .build()) - } else { - containerBuilder.addToVolumeMounts(volumeBuilder.build()) - } - } - } - (podBuilder.endSpec().build(), containerBuilder.build()) - } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 8f94f1620ead..46a7dc264898 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -110,14 +110,7 @@ private[spark] class BasicDriverFeatureStep( .endSpec() .build() - val (driverPodWithVolumes, driverContainerVolumes) = - KubernetesVolumeUtils.addVolumes( - driverPod, - driverContainer, - conf.sparkConf, - KUBERNETES_DRIVER_VOLUMES_PREFIX) - - SparkPod(driverPodWithVolumes, driverContainerVolumes) + SparkPod(driverPod, driverContainer) } override def getAdditionalPodSystemProperties(): Map[String, String] = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 2e4b091daedd..df0296ac495b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -173,13 +173,7 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - val (executorPodWithVolumes, executorContainerWithVolumes) = - KubernetesVolumeUtils.addVolumes(executorPod, - containerWithLimitCores, - kubernetesConf.sparkConf, - KUBERNETES_EXECUTOR_VOLUMES_PREFIX) - - SparkPod(executorPodWithVolumes, executorContainerWithVolumes) + SparkPod(executorPod, executorContainer) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala new file mode 100644 index 000000000000..9eb36bfebd9b --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesVolumeSpec, SparkPod} +import org.apache.spark.deploy.k8s.Config._ + +private[spark] class MountVolumesFeatureStep( + kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + extends KubernetesFeatureConfigStep { + + override def configurePod(pod: SparkPod): SparkPod = { + val (volumeMounts, volumes) = constructVolumes(kubernetesConf.roleVolumes).unzip + + val podWithVolumes = new PodBuilder(pod.pod) + .editSpec() + .addToVolumes(volumes.toSeq: _*) + .endSpec() + .build() + + val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) + .addToVolumeMounts(volumeMounts.toSeq: _*) + .build() + + SparkPod(podWithVolumes, containerWithLocalDirVolumeMounts) + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty + + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty + + private def constructVolumes( + volumeSpecs: Iterable[KubernetesVolumeSpec]): Iterable[(VolumeMount, Volume)] = { + volumeSpecs.map { spec => + val volumeMount = new VolumeMountBuilder() + .withMountPath(spec.mountPath) + .withReadOnly(spec.mountReadOnly) + .withName(spec.volumeName) + .build() + + val volume = spec.volumeType match { + case KUBERNETES_VOLUMES_HOSTPATH_KEY => + val hostPath = spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY) + new VolumeBuilder() + .withHostPath(new HostPathVolumeSource(hostPath)) + .withName(spec.volumeName) + .build() + } + + (volumeMount, volume) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index fdc5eb0d7583..ed32f9e33eda 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -32,10 +32,13 @@ private[spark] class KubernetesDriverBuilder( new MountSecretsFeatureStep(_), provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) = - new EnvSecretsFeatureStep(_), + new EnvSecretsFeatureStep(_), provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) => LocalDirsFeatureStep = - new LocalDirsFeatureStep(_)) { + new LocalDirsFeatureStep(_), + provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] + => MountVolumesFeatureStep) = + new MountVolumesFeatureStep(_)) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = { @@ -44,13 +47,17 @@ private[spark] class KubernetesDriverBuilder( provideCredentialsStep(kubernetesConf), provideServiceStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) - var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { - baseFeatures ++ Seq(provideSecretsStep(kubernetesConf)) - } else baseFeatures + val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { + Seq(provideSecretsStep(kubernetesConf)) + } else Nil + val envSecretFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { + Seq(provideEnvSecretsStep(kubernetesConf)) + } else Nil + val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { + Seq(provideVolumesStep(kubernetesConf)) + } else Nil - allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { - allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf)) - } else allFeatures + val allFeatures = baseFeatures ++ secretFeature ++ envSecretFeature ++ volumesFeature var spec = KubernetesDriverSpec.initialSpec(kubernetesConf.sparkConf.getAll.toMap) for (feature <- allFeatures) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index d5e1de36a58d..1acdb84b9b3b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -17,31 +17,39 @@ package org.apache.spark.scheduler.cluster.k8s import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep} +import org.apache.spark.deploy.k8s.features._ private[spark] class KubernetesExecutorBuilder( - provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep = + provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf]) + => BasicExecutorFeatureStep = new BasicExecutorFeatureStep(_), - provideSecretsStep: - (KubernetesConf[_ <: KubernetesRoleSpecificConf]) => MountSecretsFeatureStep = + provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) + => MountSecretsFeatureStep = new MountSecretsFeatureStep(_), provideEnvSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => EnvSecretsFeatureStep) = new EnvSecretsFeatureStep(_), provideLocalDirsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) => LocalDirsFeatureStep = - new LocalDirsFeatureStep(_)) { + new LocalDirsFeatureStep(_), + provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] + => MountVolumesFeatureStep) = + new MountVolumesFeatureStep(_)) { def buildFromFeatures( kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = { val baseFeatures = Seq(provideBasicStep(kubernetesConf), provideLocalDirsStep(kubernetesConf)) - var allFeatures = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { - baseFeatures ++ Seq(provideSecretsStep(kubernetesConf)) - } else baseFeatures + val secretFeature = if (kubernetesConf.roleSecretNamesToMountPaths.nonEmpty) { + Seq(provideSecretsStep(kubernetesConf)) + } else Nil + val secretEnvFeature = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { + Seq(provideEnvSecretsStep(kubernetesConf)) + } else Nil + val volumesFeature = if (kubernetesConf.roleVolumes.nonEmpty) { + Seq(provideVolumesStep(kubernetesConf)) + } else Nil - allFeatures = if (kubernetesConf.roleSecretEnvNamesToKeyRefs.nonEmpty) { - allFeatures ++ Seq(provideEnvSecretsStep(kubernetesConf)) - } else allFeatures + val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature var executorPod = SparkPod.initialPod() for (feature <- allFeatures) { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index b4fe3ac8c467..df66cad27404 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -47,6 +47,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { TEST_IMAGE_PULL_SECRETS.map { secret => new LocalObjectReferenceBuilder().withName(secret).build() } + private val emptyDriverSpecificConf = KubernetesDriverSpecificConf( + None, + APP_NAME, + MAIN_CLASS, + APP_ARGS) + test("Check the pod respects all configurations from the user.") { val sparkConf = new SparkConf() @@ -59,18 +65,15 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) val kubernetesConf = KubernetesConf( sparkConf, - KubernetesDriverSpecificConf( - None, - APP_NAME, - MAIN_CLASS, - APP_ARGS), + emptyDriverSpecificConf, RESOURCE_NAME_PREFIX, APP_ID, DRIVER_LABELS, DRIVER_ANNOTATIONS, Map.empty, Map.empty, - DRIVER_ENVS) + DRIVER_ENVS, + Nil) val featureStep = new BasicDriverFeatureStep(kubernetesConf) val basePod = SparkPod.initialPod() @@ -129,18 +132,16 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(CONTAINER_IMAGE, "spark-driver:latest") val kubernetesConf = KubernetesConf( sparkConf, - KubernetesDriverSpecificConf( - None, - APP_NAME, - MAIN_CLASS, - APP_ARGS), + emptyDriverSpecificConf, RESOURCE_NAME_PREFIX, APP_ID, DRIVER_LABELS, DRIVER_ANNOTATIONS, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) + val step = new BasicDriverFeatureStep(kubernetesConf) val additionalProperties = step.getAdditionalPodSystemProperties() val expectedSparkConf = Map( @@ -152,63 +153,4 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") assert(additionalProperties === expectedSparkConf) } - - test("single driver hostPath volume gets mounted") { - hostPathVolumeTest(1, false) - } - - test("multiple driver hostPath volumes get mounted") { - hostPathVolumeTest(2, false) - } - - test("single driver hostPath volume gets mounted w/ readOnly option") { - hostPathVolumeTest(1, true) - } - - test("multiple driver hostPath volumes get mounted w/ readOnly option") { - hostPathVolumeTest(2, true) - } - - private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = { - val sparkConf = new SparkConf() - .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") - .set(CONTAINER_IMAGE, "spark-driver:latest") - for (i <- 0 until numVolumes) { - sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.path", - s"/opt/mount$i") - sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.options.path", - s"/tmp/mount$i") - if (readOnly) { - sparkConf.set(s"spark.kubernetes.driver.volumes.hostPath.hostPath-$i.mount.readOnly", - "true") - } - } - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - None, - APP_NAME, - MAIN_CLASS, - APP_ARGS), - RESOURCE_NAME_PREFIX, - APP_ID, - DRIVER_LABELS, - DRIVER_ANNOTATIONS, - Map.empty, - Map.empty) - val step = new BasicDriverFeatureStep(kubernetesConf) - val driver = step.configurePod(SparkPod.initialPod()) - - assert(driver.container.getVolumeMounts.size() === numVolumes) - assert(driver.pod.getSpec.getVolumes.size() === numVolumes) - for (i <- 0 until numVolumes) { - assert(driver.container.getVolumeMounts.asScala - .exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))) - assert(driver.pod.getSpec.getVolumes.asScala - .exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))) - if (readOnly) { - assert(driver.container.getVolumeMounts.get(i).getReadOnly == true) - } - } - } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 3ccba76ed668..9e8b217132b9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -88,7 +88,8 @@ class BasicExecutorFeatureStepSuite ANNOTATIONS, Map.empty, Map.empty, - Map.empty)) + Map.empty, + Nil)) val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. @@ -126,7 +127,8 @@ class BasicExecutorFeatureStepSuite ANNOTATIONS, Map.empty, Map.empty, - Map.empty)) + Map.empty, + Nil)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -145,7 +147,8 @@ class BasicExecutorFeatureStepSuite ANNOTATIONS, Map.empty, Map.empty, - Map("qux" -> "quux"))) + Map("qux" -> "quux"), + Nil)) val executor = step.configurePod(SparkPod.initialPod()) checkEnv(executor, @@ -155,62 +158,6 @@ class BasicExecutorFeatureStepSuite checkOwnerReferences(executor.pod, DRIVER_POD_UID) } - test("single executor hostPath volume gets mounted") { - hostPathVolumeTest(1, false) - } - - test("multiple executor hostPath volumes get mounted") { - hostPathVolumeTest(2, false) - } - - test("single executor hostPath volume gets mounted w/ readOnly option") { - hostPathVolumeTest(1, true) - } - - test("multiple executor hostPath volumes get mounted w/ readOnly option") { - hostPathVolumeTest(2, true) - } - - private def hostPathVolumeTest(numVolumes: Int, readOnly: Boolean): Unit = { - val conf = baseConf.clone() - for (i <- 0 until numVolumes) { - conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.path", - s"/opt/mount$i") - conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.options.path", - s"/tmp/mount$i") - if (readOnly) { - conf.set(s"spark.kubernetes.executor.volumes.hostPath.hostPath-$i.mount.readOnly", - "true") - } - } - val step = new BasicExecutorFeatureStep( - KubernetesConf( - conf, - KubernetesExecutorSpecificConf("1", DRIVER_POD), - RESOURCE_NAME_PREFIX, - APP_ID, - LABELS, - ANNOTATIONS, - Map.empty, - Map.empty)) - val executor = step.configurePod(SparkPod.initialPod()) - - assert(executor.container.getImage === EXECUTOR_IMAGE) - assert(executor.container.getVolumeMounts.size() === numVolumes) - assert(executor.pod.getSpec.getVolumes.size() === numVolumes) - for (i <- 0 until numVolumes) { - assert(executor.container.getVolumeMounts.asScala - .exists(v => (v.getName == s"hostPath-$i" && v.getMountPath == s"/opt/mount$i"))) - assert(executor.pod.getSpec.getVolumes.asScala - .exists(v => (v.getName == s"hostPath-$i" && v.getHostPath.getPath == s"/tmp/mount$i"))) - if (readOnly) { - assert(executor.container.getVolumeMounts.get(i).getReadOnly == true) - } - } - - checkOwnerReferences(executor.pod, DRIVER_POD_UID) - } - // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index f81894f8055f..637993380a15 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -60,7 +60,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty) @@ -90,7 +91,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) @@ -127,7 +129,8 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() val expectedSparkConf = Map( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index f265522a8823..20118a78755a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -66,7 +66,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty)) + Map.empty, + Nil)) assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) assert(configurationStep.getAdditionalKubernetesResources().head.isInstanceOf[Service]) @@ -96,7 +97,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty)) + Map.empty, + Nil)) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX val expectedHostName = s"$expectedServiceName.my-namespace.svc" @@ -116,7 +118,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty)) + Map.empty, + Nil)) val resolvedService = configurationStep .getAdditionalKubernetesResources() .head @@ -145,7 +148,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty), + Map.empty, + Nil), clock) val driverService = configurationStep .getAdditionalKubernetesResources() @@ -171,7 +175,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty), + Map.empty, + Nil), clock) fail("The driver bind address should not be allowed.") } catch { @@ -195,7 +200,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty), + Map.empty, + Nil), clock) fail("The driver host address should not be allowed.") } catch { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index 8b0b2d0739c7..c105e8840889 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -44,7 +44,8 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ Map.empty, Map.empty, envVarsToKeys, - Map.empty) + Map.empty, + Nil) val step = new EnvSecretsFeatureStep(kubernetesConf) val driverContainerWithEnvSecrets = step.configurePod(baseDriverPod).container diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 2542a02d3776..8250c33a9d43 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -44,7 +44,8 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) } test("Resolve to default local dir if neither env nor configuration are set") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index 915579377412..f33daf57a077 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -42,7 +42,8 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { Map.empty, secretNamesToMountPaths, Map.empty, - Map.empty) + Map.empty, + Nil) val step = new MountSecretsFeatureStep(kubernetesConf) val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 0775338098a1..e8d8be2d54dd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -143,7 +143,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(POD_NAME)).thenReturn(namedPods) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index cb724068ea4f..1b1d94c9735e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -70,7 +70,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -93,7 +94,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map("secret" -> "secretMountPath"), Map("EnvName" -> "SecretName:secretKey"), - Map.empty) + Map.empty, + Nil) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 753cd30a237f..f2f3c3906f25 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -54,7 +54,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Map.empty, - Map.empty) + Map.empty, + Nil) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) } @@ -70,7 +71,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map("secret" -> "secretMountPath"), Map("secret-name" -> "secret-key"), - Map.empty) + Map.empty, + Nil) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, From 5edce1bde9e890cbb65dc0e2ca862a1d666fb226 Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Mon, 7 May 2018 19:08:12 +0200 Subject: [PATCH 04/12] Add persistant volume claim --- .../main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 ++ .../deploy/k8s/features/MountVolumesFeatureStep.scala | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 972fd6e3613d..0a55d7b4fd86 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -172,8 +172,10 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes." val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath" + val KUBERNETES_VOLUMES_PVC_KEY = "persistentVolumeClaim" val KUBERNETES_VOLUMES_MOUNT_KEY = "mount" val KUBERNETES_VOLUMES_PATH_KEY = "path" + val KUBERNETES_VOLUMES_CLAIM_NAME_KEY = "claimName" val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly" val KUBERNETES_VOLUMES_OPTIONS_KEY = "options" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 9eb36bfebd9b..5600d35ef0c8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -61,6 +61,14 @@ private[spark] class MountVolumesFeatureStep( .withHostPath(new HostPathVolumeSource(hostPath)) .withName(spec.volumeName) .build() + + case KUBERNETES_VOLUMES_PVC_KEY => + val claimName = spec.optionsSpec(KUBERNETES_VOLUMES_CLAIM_NAME_KEY) + new VolumeBuilder() + .withPersistentVolumeClaim( + new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly)) + .withName(spec.volumeName) + .build() } (volumeMount, volume) From 9bdbd73cc329845835e05e80663dfdbd8f1cd996 Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Tue, 22 May 2018 14:18:41 +0200 Subject: [PATCH 05/12] emptyDir support. Tests --- .../org/apache/spark/deploy/k8s/Config.scala | 3 + .../features/MountVolumesFeatureStep.scala | 19 ++-- .../features/LocalDirsFeatureStepSuite.scala | 2 +- .../MountVolumesFeatureStepSuite.scala | 107 ++++++++++++++++++ .../submit/KubernetesDriverBuilderSuite.scala | 44 ++++++- .../k8s/KubernetesExecutorBuilderSuite.scala | 37 +++++- 6 files changed, 198 insertions(+), 14 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 0a55d7b4fd86..c61ec8200bc7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -173,11 +173,14 @@ private[spark] object Config extends Logging { val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath" val KUBERNETES_VOLUMES_PVC_KEY = "persistentVolumeClaim" + val KUBERNETES_VOLUMES_EMPTYDIR_KEY = "emptyDir" val KUBERNETES_VOLUMES_MOUNT_KEY = "mount" val KUBERNETES_VOLUMES_PATH_KEY = "path" val KUBERNETES_VOLUMES_CLAIM_NAME_KEY = "claimName" val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly" val KUBERNETES_VOLUMES_OPTIONS_KEY = "options" + val KUBERNETES_VOLUMES_MEDIUM_KEY = "medium" + val KUBERNETES_VOLUMES_SIZE_LIMIT_KEY = "sizeLimit" val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 5600d35ef0c8..bb8e45338e59 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -34,11 +34,11 @@ private[spark] class MountVolumesFeatureStep( .endSpec() .build() - val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) + val containerWithVolumeMounts = new ContainerBuilder(pod.container) .addToVolumeMounts(volumeMounts.toSeq: _*) .build() - SparkPod(podWithVolumes, containerWithLocalDirVolumeMounts) + SparkPod(podWithVolumes, containerWithVolumeMounts) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty @@ -54,23 +54,28 @@ private[spark] class MountVolumesFeatureStep( .withName(spec.volumeName) .build() - val volume = spec.volumeType match { + val volumeBuilder = spec.volumeType match { case KUBERNETES_VOLUMES_HOSTPATH_KEY => val hostPath = spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY) new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath)) - .withName(spec.volumeName) - .build() case KUBERNETES_VOLUMES_PVC_KEY => val claimName = spec.optionsSpec(KUBERNETES_VOLUMES_CLAIM_NAME_KEY) new VolumeBuilder() .withPersistentVolumeClaim( new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly)) - .withName(spec.volumeName) - .build() + + case KUBERNETES_VOLUMES_EMPTYDIR_KEY => + val medium = spec.optionsSpec(KUBERNETES_VOLUMES_MEDIUM_KEY) + val sizeLimit = spec.optionsSpec(KUBERNETES_VOLUMES_SIZE_LIMIT_KEY) + new VolumeBuilder() + .withEmptyDir(new EmptyDirVolumeSource(medium, new Quantity(sizeLimit))) } + val volume = volumeBuilder.withName(spec.volumeName).build() + + (volumeMount, volume) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 8250c33a9d43..61459c3c7734 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -21,7 +21,7 @@ import org.mockito.Mockito import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesExecutorSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala new file mode 100644 index 000000000000..dcc88f4c21b4 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesVolumeSpec, SparkPod} +import org.apache.spark.deploy.k8s.Config.{KUBERNETES_VOLUMES_SIZE_LIMIT_KEY, _} + +class MountVolumesFeatureStepSuite extends SparkFunSuite { + private val sparkConf = new SparkConf(false) + private val emptyKubernetesConf = KubernetesConf( + sparkConf, + KubernetesDriverSpecificConf( + None, + "app-name", + "main", + Seq.empty), + "resource", + "app-id", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil) + + test("Mounts hostPath volumes") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + KUBERNETES_VOLUMES_HOSTPATH_KEY, + "/tmp", + false, + Map(KUBERNETES_VOLUMES_PATH_KEY -> "/hostPath/tmp") + ) + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + assert(configuredPod.pod.getSpec.getVolumes.get(0).getHostPath.getPath === "/hostPath/tmp") + assert(configuredPod.container.getVolumeMounts.size() === 1) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") + assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) + } + + test("Mounts pesistentVolumeClaims") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + KUBERNETES_VOLUMES_PVC_KEY, + "/tmp", + true, + Map(KUBERNETES_VOLUMES_CLAIM_NAME_KEY -> "pvcClaim") + ) + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val pvcClaim = configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim + assert(pvcClaim.getClaimName === "pvcClaim") + assert(configuredPod.container.getVolumeMounts.size() === 1) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") + assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === true) + + } + + test("Mounts emptyDir") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + KUBERNETES_VOLUMES_EMPTYDIR_KEY, + "/tmp", + false, + Map( + KUBERNETES_VOLUMES_MEDIUM_KEY -> "Memory", + KUBERNETES_VOLUMES_SIZE_LIMIT_KEY -> "6G") + ) + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir + assert(emptyDir.getMedium === "Memory") + assert(emptyDir.getSizeLimit.getAmount === "6G") + assert(configuredPod.container.getVolumeMounts.size() === 1) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") + assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) + + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 1b1d94c9735e..8c00db58b647 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -17,8 +17,9 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf} -import org.apache.spark.deploy.k8s.features.{BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, DriverServiceFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesVolumeSpec} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_VOLUMES_HOSTPATH_KEY +import org.apache.spark.deploy.k8s.features._ class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -28,6 +29,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val LOCAL_DIRS_STEP_TYPE = "local-dirs" private val SECRETS_STEP_TYPE = "mount-secrets" private val ENV_SECRETS_STEP_TYPE = "env-secrets" + private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, classOf[BasicDriverFeatureStep]) @@ -47,6 +49,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) + private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) + private val builderUnderTest: KubernetesDriverBuilder = new KubernetesDriverBuilder( _ => basicFeatureStep, @@ -54,7 +59,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => serviceStep, _ => secretsStep, _ => envSecretsStep, - _ => localDirsStep) + _ => localDirsStep, + _ => mountVolumesStep) test("Apply fundamental steps all the time.") { val conf = KubernetesConf( @@ -107,6 +113,38 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { ) } + test("Apply volumes step if mounts are present.") { + val volumeSpec = KubernetesVolumeSpec( + "volume", + KUBERNETES_VOLUMES_HOSTPATH_KEY, + "/tmp", + false, + Map.empty) + val conf = KubernetesConf( + new SparkConf(false), + KubernetesDriverSpecificConf( + None, + "test-app", + "main", + Seq.empty), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + volumeSpec :: Nil) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + CREDENTIALS_STEP_TYPE, + SERVICE_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + MOUNT_VOLUMES_STEP_TYPE) + } + + private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) : Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index f2f3c3906f25..b0f4dc7b8ad6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -19,14 +19,16 @@ package org.apache.spark.scheduler.cluster.k8s import io.fabric8.kubernetes.api.model.PodBuilder import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, KubernetesFeaturesTestUtils, LocalDirsFeatureStep, MountSecretsFeatureStep} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesVolumeSpec, SparkPod} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_VOLUMES_HOSTPATH_KEY +import org.apache.spark.deploy.k8s.features._ class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val BASIC_STEP_TYPE = "basic" private val SECRETS_STEP_TYPE = "mount-secrets" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val LOCAL_DIRS_STEP_TYPE = "local-dirs" + private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, classOf[BasicExecutorFeatureStep]) @@ -36,12 +38,15 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( LOCAL_DIRS_STEP_TYPE, classOf[LocalDirsFeatureStep]) + private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + MOUNT_VOLUMES_STEP_TYPE, classOf[MountVolumesFeatureStep]) private val builderUnderTest = new KubernetesExecutorBuilder( _ => basicFeatureStep, _ => mountSecretsStep, _ => envSecretsStep, - _ => localDirsStep) + _ => localDirsStep, + _ => mountVolumesStep) test("Basic steps are consistently applied.") { val conf = KubernetesConf( @@ -81,6 +86,32 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { ENV_SECRETS_STEP_TYPE) } + test("Apply volumes step if mounts are present.") { + val volumeSpec = KubernetesVolumeSpec( + "volume", + KUBERNETES_VOLUMES_HOSTPATH_KEY, + "/tmp", + false, + Map.empty) + val conf = KubernetesConf( + new SparkConf(false), + KubernetesExecutorSpecificConf( + "executor-id", new PodBuilder().build()), + "prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + volumeSpec :: Nil) + validateStepTypesApplied( + builderUnderTest.buildFromFeatures(conf), + BASIC_STEP_TYPE, + LOCAL_DIRS_STEP_TYPE, + MOUNT_VOLUMES_STEP_TYPE) + } + private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = { assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size) stepTypes.foreach { stepType => From e68d34c833be9f9e4be2c23a8a5a3478ffa72b2b Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Tue, 22 May 2018 14:53:42 +0200 Subject: [PATCH 06/12] Fix style. Add multiple mounts test --- .../deploy/k8s/KubernetesVolumeUtils.scala | 4 ---- .../MountVolumesFeatureStepSuite.scala | 23 +++++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index feb196608d1b..30ba4acba921 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -16,10 +16,6 @@ */ package org.apache.spark.deploy.k8s -import scala.collection.mutable.HashMap - -import io.fabric8.kubernetes.api.model._ - import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index dcc88f4c21b4..8d848447af77 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -102,6 +102,29 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) + } + + test("Mounts multiple volumes") { + val hpVolumeConf = KubernetesVolumeSpec( + "hpVolume", + KUBERNETES_VOLUMES_HOSTPATH_KEY, + "/tmp", + false, + Map(KUBERNETES_VOLUMES_PATH_KEY -> "/hostPath/tmp") + ) + val pvcVolumeConf = KubernetesVolumeSpec( + "checkpointVolume", + KUBERNETES_VOLUMES_PVC_KEY, + "/checkpoints", + true, + Map(KUBERNETES_VOLUMES_CLAIM_NAME_KEY -> "pvcClaim") + ) + val volumesConf = hpVolumeConf :: pvcVolumeConf :: Nil + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumesConf) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + assert(configuredPod.pod.getSpec.getVolumes.size() === 2) + assert(configuredPod.container.getVolumeMounts.size() === 2) } } From e3f8d9a1ae0ab72231277e2b700cfcfcfefee519 Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Tue, 22 May 2018 14:53:54 +0200 Subject: [PATCH 07/12] Document volume mounting usage --- docs/running-on-kubernetes.md | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 4eac9bd9032e..4cfaeeedc545 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -630,5 +630,53 @@ specific to Spark on Kubernetes. Add as an environment variable to the executor container with name EnvName (case sensitive), the value referenced by key key in the data of the referenced Kubernetes Secret. For example, spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key. + + + spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path + (none) + + Add the Kubernetes Volume named VolumeName of the VolumeType type to the driver pod on the path specified in the value. For example, + spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint. + + + + spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.readOnly + (none) + + Specify if the mounted volume is read only or not. For example, + spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. + + + + spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.options.[OptionName] + (none) + + Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value. For example, + spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. + + + + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.path + (none) + + Add the Kubernetes Volume named VolumeName of the VolumeType type to the executor pod on the path specified in the value. For example, + spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.path=/checkpoint. + + + + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly + (none) + + Specify if the mounted volume is read only or not. For example, + spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. + + + + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.options.[OptionName] + (none) + + Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value. For example, + spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. + From beff9a9f5e71f0e722ac918bcb9614a5f9ddac39 Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Thu, 24 May 2018 18:23:56 +0200 Subject: [PATCH 08/12] Add tests and refactor KubernetesVolumeUtils --- .../deploy/k8s/KubernetesVolumeUtils.scala | 29 +++++++++++---- .../k8s/KubernetesVolumeUtilsSuite.scala | 36 +++++++++++++++++++ 2 files changed, 58 insertions(+), 7 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index 30ba4acba921..cc65cb8d8b51 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -33,15 +33,18 @@ private[spark] object KubernetesVolumeUtils { prefix: String): Iterable[KubernetesVolumeSpec] = { val properties = sparkConf.getAllWithPrefix(prefix) - val propsByTypeName = properties.map { case (k, v) => - k.split('.').toSeq match { - case tpe :: name :: rest => ((tpe, name), (rest.mkString("."), v)) - } - }.groupBy(_._1).mapValues(_.map(_._2)) + val propsByTypeName: Map[(String, String), Array[(String, String)]] = + properties.flatMap { case (k, v) => + k.split('.').toList match { + case tpe :: name :: rest => Some(((tpe, name), (rest.mkString("."), v))) + case _ => None + } + }.groupBy(_._1).mapValues(_.map(_._2)) propsByTypeName.map { case ((tpe, name), props) => - val mountProps = props.filter(_._1.startsWith(KUBERNETES_VOLUMES_MOUNT_KEY)).toMap - val options = props.filter(_._1.startsWith(KUBERNETES_VOLUMES_OPTIONS_KEY)).toMap + val propMap = props.toMap + val mountProps = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_MOUNT_KEY.") + val options = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_OPTIONS_KEY.") KubernetesVolumeSpec( volumeName = name, @@ -53,4 +56,16 @@ private[spark] object KubernetesVolumeUtils { } } + /** + * Extract subset of elements with keys matching on prefix, which is then excluded + * @param props properties to extract data from + * @param prefix prefix to match against + * @return subset of original props + */ + private def getAllWithPrefix(props: Map[String, String], prefix: String): Map[String, String] = { + props + .filterKeys(_.startsWith(prefix)) + .map { case (k, v) => k.substring(prefix.length) -> v } + } + } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala new file mode 100644 index 000000000000..515fe6760b87 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class KubernetesVolumeUtilsSuite extends SparkFunSuite { + test("Parses volume options correctly") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.volumeType.volumeName.mount.path", "/path") + sparkConf.set("test.volumeType.volumeName.mount.readOnly", "true") + sparkConf.set("test.volumeType.volumeName.options.option1", "value1") + sparkConf.set("test.volumeType.volumeName.options.option2", "value2") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.volumeType === "volumeType") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly === true) + assert(volumeSpec.optionsSpec === Map("option1" -> "value1", "option2" -> "value2")) + } +} \ No newline at end of file From 517de7c4b61265aa6a1c4ff4a6a705176c56b4c2 Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Fri, 8 Jun 2018 16:00:40 +0200 Subject: [PATCH 09/12] Fix scalastyle. Improve documentation --- docs/running-on-kubernetes.md | 2 +- .../apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 4cfaeeedc545..fcc3214feecb 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -651,7 +651,7 @@ specific to Spark on Kubernetes. spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.options.[OptionName] (none) - Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value. For example, + Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value, must conform with Kubernetes option format. For example, spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpointpvc.options.claimName=spark-pvc-claim. diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index 515fe6760b87..debde915f30b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -33,4 +33,4 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { assert(volumeSpec.mountReadOnly === true) assert(volumeSpec.optionsSpec === Map("option1" -> "value1", "option2" -> "value2")) } -} \ No newline at end of file +} From d960e34e6309f1ef98d5437acdf9af097f5cf4f9 Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Mon, 11 Jun 2018 16:22:58 +0200 Subject: [PATCH 10/12] Rework volume mounting to gracefully handle misconfiguration --- docs/running-on-kubernetes.md | 4 +- .../org/apache/spark/deploy/k8s/Config.scala | 19 ++-- .../spark/deploy/k8s/KubernetesConf.scala | 10 +- .../deploy/k8s/KubernetesVolumeSpec.scala | 18 ++-- .../deploy/k8s/KubernetesVolumeUtils.scala | 101 +++++++++++++----- .../features/BasicExecutorFeatureStep.scala | 2 +- .../features/MountVolumesFeatureStep.scala | 19 ++-- .../k8s/KubernetesVolumeUtilsSuite.scala | 64 +++++++++-- .../MountVolumesFeatureStepSuite.scala | 20 ++-- .../submit/KubernetesDriverBuilderSuite.scala | 7 +- .../k8s/KubernetesExecutorBuilderSuite.scala | 6 +- 11 files changed, 175 insertions(+), 95 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index fcc3214feecb..3d44c3ffe74a 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -648,7 +648,7 @@ specific to Spark on Kubernetes. - spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.options.[OptionName] + spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName] (none) Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value, must conform with Kubernetes option format. For example, @@ -672,7 +672,7 @@ specific to Spark on Kubernetes. - spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.options.[OptionName] + spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName] (none) Configure Kubernetes Volume options passed to the Kubernetes with OptionName as key having specified value. For example, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c61ec8200bc7..89173078a6f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -171,16 +171,15 @@ private[spark] object Config extends Logging { val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef." val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes." - val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath" - val KUBERNETES_VOLUMES_PVC_KEY = "persistentVolumeClaim" - val KUBERNETES_VOLUMES_EMPTYDIR_KEY = "emptyDir" - val KUBERNETES_VOLUMES_MOUNT_KEY = "mount" - val KUBERNETES_VOLUMES_PATH_KEY = "path" - val KUBERNETES_VOLUMES_CLAIM_NAME_KEY = "claimName" - val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly" - val KUBERNETES_VOLUMES_OPTIONS_KEY = "options" - val KUBERNETES_VOLUMES_MEDIUM_KEY = "medium" - val KUBERNETES_VOLUMES_SIZE_LIMIT_KEY = "sizeLimit" + val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath" + val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim" + val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir" + val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path" + val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly" + val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path" + val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName" + val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium" + val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit" val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv." } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 8832b65982a3..c46b1e5c7a92 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -56,7 +56,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleSecretNamesToMountPaths: Map[String, String], roleSecretEnvNamesToKeyRefs: Map[String, String], roleEnvs: Map[String, String], - roleVolumes: Iterable[KubernetesVolumeSpec]) { + roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]) { def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) @@ -136,7 +136,11 @@ private[spark] object KubernetesConf { val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) + sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get) + // Also parse executor volumes in order to verify configuration + // before the driver pod is created + KubernetesVolumeUtils.parseVolumesWithPrefix( + sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) KubernetesConf( sparkConfWithMainAppJar, @@ -181,7 +185,7 @@ private[spark] object KubernetesConf { sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) val executorEnv = sparkConf.getExecutorEnv.toMap val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix( - sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) + sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) KubernetesConf( sparkConf.clone(), diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index 6482a973ffd5..b3d8c827169f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -16,15 +16,17 @@ */ package org.apache.spark.deploy.k8s -import scala.collection.immutable.Map +private[spark] sealed trait KubernetesVolumeSpecificConf +private[spark] case class KubernetesHostPathVolumeConf( + hostPath: String) extends KubernetesVolumeSpecificConf +private[spark] case class KubernetesPVCVolumeConf( + claimName: String) extends KubernetesVolumeSpecificConf +private[spark] case class KubernetesEmptyDirVolumeConf( + medium: String, + sizeLimit: String) extends KubernetesVolumeSpecificConf -private[spark] case class KubernetesVolumeSpec( +private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf]( volumeName: String, - volumeType: String, // ADT mountPath: String, mountReadOnly: Boolean, - optionsSpec: Map[String, String]) - -private[spark] object KubernetesVolumeSpec { - def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec("", "", "", false, Map()) -} + volumeConf: T) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index cc65cb8d8b51..c1dd2a6a9415 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -16,11 +16,14 @@ */ package org.apache.spark.deploy.k8s +import java.util.NoSuchElementException + +import scala.util.{Failure, Success, Try} + import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ private[spark] object KubernetesVolumeUtils { - /** * Extract Spark volume configuration properties with a given name prefix. * @@ -30,42 +33,82 @@ private[spark] object KubernetesVolumeUtils { */ def parseVolumesWithPrefix( sparkConf: SparkConf, - prefix: String): Iterable[KubernetesVolumeSpec] = { - val properties = sparkConf.getAllWithPrefix(prefix) - - val propsByTypeName: Map[(String, String), Array[(String, String)]] = - properties.flatMap { case (k, v) => - k.split('.').toList match { - case tpe :: name :: rest => Some(((tpe, name), (rest.mkString("."), v))) - case _ => None - } - }.groupBy(_._1).mapValues(_.map(_._2)) + prefix: String): Iterable[Try[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]] = { + val properties = sparkConf.getAllWithPrefix(prefix).toMap - propsByTypeName.map { case ((tpe, name), props) => - val propMap = props.toMap - val mountProps = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_MOUNT_KEY.") - val options = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_OPTIONS_KEY.") + getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) => + val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY" + val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY" - KubernetesVolumeSpec( - volumeName = name, - volumeType = tpe, - mountPath = mountProps(KUBERNETES_VOLUMES_PATH_KEY), - mountReadOnly = mountProps(KUBERNETES_VOLUMES_READONLY_KEY).toBoolean, - optionsSpec = options + for { + path <- properties.getTry(pathKey) + readOnly <- properties.getTry(readOnlyKey) + volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName) + } yield KubernetesVolumeSpec( + volumeName = volumeName, + mountPath = path, + mountReadOnly = readOnly.toBoolean, + volumeConf = volumeConf ) } } /** - * Extract subset of elements with keys matching on prefix, which is then excluded - * @param props properties to extract data from - * @param prefix prefix to match against - * @return subset of original props + * Get unique pairs of volumeType and volumeName, + * assuming options are formatted in this way: + * `volumeType`.`volumeName`.`property` = `value` + * @param properties flat mapping of property names to values + * @return Set[(volumeType, volumeName)] */ - private def getAllWithPrefix(props: Map[String, String], prefix: String): Map[String, String] = { - props - .filterKeys(_.startsWith(prefix)) - .map { case (k, v) => k.substring(prefix.length) -> v } + private def getVolumeTypesAndNames( + properties: Map[String, String] + ): Set[(String, String)] = { + properties.keys.flatMap { k => + k.split('.').toList match { + case tpe :: name :: _ => Some((tpe, name)) + case _ => None + } + }.toSet + } + + private def parseVolumeSpecificConf( + options: Map[String, String], + volumeType: String, + volumeName: String): Try[KubernetesVolumeSpecificConf] = { + volumeType match { + case KUBERNETES_VOLUMES_HOSTPATH_TYPE => + val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY" + for { + path <- options.getTry(pathKey) + } yield KubernetesHostPathVolumeConf(path) + + case KUBERNETES_VOLUMES_PVC_TYPE => + val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY" + for { + claimName <- options.getTry(claimNameKey) + } yield KubernetesPVCVolumeConf(claimName) + + case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => + val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" + val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" + for { + medium <- options.getTry(mediumKey) + sizeLimit <- options.getTry(sizeLimitKey) + } yield KubernetesEmptyDirVolumeConf(medium, sizeLimit) + + case _ => + Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported")) + } } + /** + * Convenience wrapper to accumulate key lookup errors + */ + implicit private class MapOps[A, B](m: Map[A, B]) { + def getTry(key: A): Try[B] = { + m + .get(key) + .fold[Try[B]](Failure(new NoSuchElementException(key.toString)))(Success(_)) + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index df0296ac495b..07df2be3209e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -173,7 +173,7 @@ private[spark] class BasicExecutorFeatureStep( .endSpec() .build() - SparkPod(executorPod, executorContainer) + SparkPod(executorPod, containerWithLimitCores) } override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index bb8e45338e59..2e56f39d8103 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -18,8 +18,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model._ -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesVolumeSpec, SparkPod} -import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s._ private[spark] class MountVolumesFeatureStep( kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) @@ -46,7 +45,8 @@ private[spark] class MountVolumesFeatureStep( override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty private def constructVolumes( - volumeSpecs: Iterable[KubernetesVolumeSpec]): Iterable[(VolumeMount, Volume)] = { + volumeSpecs: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]] + ): Iterable[(VolumeMount, Volume)] = { volumeSpecs.map { spec => val volumeMount = new VolumeMountBuilder() .withMountPath(spec.mountPath) @@ -54,28 +54,23 @@ private[spark] class MountVolumesFeatureStep( .withName(spec.volumeName) .build() - val volumeBuilder = spec.volumeType match { - case KUBERNETES_VOLUMES_HOSTPATH_KEY => - val hostPath = spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY) + val volumeBuilder = spec.volumeConf match { + case KubernetesHostPathVolumeConf(hostPath) => new VolumeBuilder() .withHostPath(new HostPathVolumeSource(hostPath)) - case KUBERNETES_VOLUMES_PVC_KEY => - val claimName = spec.optionsSpec(KUBERNETES_VOLUMES_CLAIM_NAME_KEY) + case KubernetesPVCVolumeConf(claimName) => new VolumeBuilder() .withPersistentVolumeClaim( new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly)) - case KUBERNETES_VOLUMES_EMPTYDIR_KEY => - val medium = spec.optionsSpec(KUBERNETES_VOLUMES_MEDIUM_KEY) - val sizeLimit = spec.optionsSpec(KUBERNETES_VOLUMES_SIZE_LIMIT_KEY) + case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => new VolumeBuilder() .withEmptyDir(new EmptyDirVolumeSource(medium, new Quantity(sizeLimit))) } val volume = volumeBuilder.withName(spec.volumeName).build() - (volumeMount, volume) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index debde915f30b..d4bf91b24bfe 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -19,18 +19,66 @@ package org.apache.spark.deploy.k8s import org.apache.spark.{SparkConf, SparkFunSuite} class KubernetesVolumeUtilsSuite extends SparkFunSuite { - test("Parses volume options correctly") { + test("Parses hostPath volumes correctly") { val sparkConf = new SparkConf(false) - sparkConf.set("test.volumeType.volumeName.mount.path", "/path") - sparkConf.set("test.volumeType.volumeName.mount.readOnly", "true") - sparkConf.set("test.volumeType.volumeName.options.option1", "value1") - sparkConf.set("test.volumeType.volumeName.options.option2", "value2") + sparkConf.set("test.hostPath.volumeName.mount.path", "/path") + sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true") + sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath") - val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get assert(volumeSpec.volumeName === "volumeName") - assert(volumeSpec.volumeType === "volumeType") assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly === true) - assert(volumeSpec.optionsSpec === Map("option1" -> "value1", "option2" -> "value2")) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] === + KubernetesHostPathVolumeConf("/hostPath")) + } + + test("Parses persistentVolumeClaim volumes correctly") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path") + sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true") + sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimeName") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly === true) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] === + KubernetesPVCVolumeConf("claimeName")) + } + + test("Parses emptyDir volumes correctly") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.emptyDir.volumeName.mount.path", "/path") + sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true") + sparkConf.set("test.emptyDir.volumeName.options.medium", "medium") + sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5G") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly === true) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] === + KubernetesEmptyDirVolumeConf("medium", "5G")) + } + + test("Gracefully fails on missing mount key") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.emptyDir.volumeName.mnt.path", "/path") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.isFailure === true) + assert(volumeSpec.failed.get.getMessage === "emptyDir.volumeName.mount.path") + } + + test("Gracefully fails on missing option key") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.hostPath.volumeName.mount.path", "/path") + sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true") + sparkConf.set("test.hostPath.volumeName.options.pth", "/hostPath") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head + assert(volumeSpec.isFailure === true) + assert(volumeSpec.failed.get.getMessage === "hostPath.volumeName.options.path") } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 8d848447af77..33c6e67dc606 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.deploy.k8s.features import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesVolumeSpec, SparkPod} -import org.apache.spark.deploy.k8s.Config.{KUBERNETES_VOLUMES_SIZE_LIMIT_KEY, _} +import org.apache.spark.deploy.k8s._ class MountVolumesFeatureStepSuite extends SparkFunSuite { private val sparkConf = new SparkConf(false) @@ -41,10 +40,9 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { test("Mounts hostPath volumes") { val volumeConf = KubernetesVolumeSpec( "testVolume", - KUBERNETES_VOLUMES_HOSTPATH_KEY, "/tmp", false, - Map(KUBERNETES_VOLUMES_PATH_KEY -> "/hostPath/tmp") + KubernetesHostPathVolumeConf("/hostPath/tmp") ) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -61,10 +59,9 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { test("Mounts pesistentVolumeClaims") { val volumeConf = KubernetesVolumeSpec( "testVolume", - KUBERNETES_VOLUMES_PVC_KEY, "/tmp", true, - Map(KUBERNETES_VOLUMES_CLAIM_NAME_KEY -> "pvcClaim") + KubernetesPVCVolumeConf("pvcClaim") ) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -83,12 +80,9 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { test("Mounts emptyDir") { val volumeConf = KubernetesVolumeSpec( "testVolume", - KUBERNETES_VOLUMES_EMPTYDIR_KEY, "/tmp", false, - Map( - KUBERNETES_VOLUMES_MEDIUM_KEY -> "Memory", - KUBERNETES_VOLUMES_SIZE_LIMIT_KEY -> "6G") + KubernetesEmptyDirVolumeConf("Memory", "6G") ) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -107,17 +101,15 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { test("Mounts multiple volumes") { val hpVolumeConf = KubernetesVolumeSpec( "hpVolume", - KUBERNETES_VOLUMES_HOSTPATH_KEY, "/tmp", false, - Map(KUBERNETES_VOLUMES_PATH_KEY -> "/hostPath/tmp") + KubernetesHostPathVolumeConf("/hostPath/tmp") ) val pvcVolumeConf = KubernetesVolumeSpec( "checkpointVolume", - KUBERNETES_VOLUMES_PVC_KEY, "/checkpoints", true, - Map(KUBERNETES_VOLUMES_CLAIM_NAME_KEY -> "pvcClaim") + KubernetesPVCVolumeConf("pvcClaim") ) val volumesConf = hpVolumeConf :: pvcVolumeConf :: Nil val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumesConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 8c00db58b647..1fcfdc12f5ed 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesVolumeSpec} -import org.apache.spark.deploy.k8s.Config.KUBERNETES_VOLUMES_HOSTPATH_KEY +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config.KUBERNETES_VOLUMES_HOSTPATH_TYPE import org.apache.spark.deploy.k8s.features._ class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -116,10 +116,9 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { test("Apply volumes step if mounts are present.") { val volumeSpec = KubernetesVolumeSpec( "volume", - KUBERNETES_VOLUMES_HOSTPATH_KEY, "/tmp", false, - Map.empty) + KubernetesHostPathVolumeConf("/path")) val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index b0f4dc7b8ad6..25473ef1e29a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -19,8 +19,7 @@ package org.apache.spark.scheduler.cluster.k8s import io.fabric8.kubernetes.api.model.PodBuilder import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, KubernetesVolumeSpec, SparkPod} -import org.apache.spark.deploy.k8s.Config.KUBERNETES_VOLUMES_HOSTPATH_KEY +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ class KubernetesExecutorBuilderSuite extends SparkFunSuite { @@ -89,10 +88,9 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { test("Apply volumes step if mounts are present.") { val volumeSpec = KubernetesVolumeSpec( "volume", - KUBERNETES_VOLUMES_HOSTPATH_KEY, "/tmp", false, - Map.empty) + KubernetesHostPathVolumeConf("/checkpoint")) val conf = KubernetesConf( new SparkConf(false), KubernetesExecutorSpecificConf( From f714b8e0343451c615740880a2f29783f64b4b8a Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Wed, 13 Jun 2018 16:34:38 +0200 Subject: [PATCH 11/12] Make mount.readOnly and emptyDir params optional --- docs/running-on-kubernetes.md | 2 +- .../deploy/k8s/KubernetesVolumeSpec.scala | 11 +++++---- .../deploy/k8s/KubernetesVolumeUtils.scala | 8 ++----- .../features/MountVolumesFeatureStep.scala | 4 +++- .../k8s/KubernetesVolumeUtilsSuite.scala | 24 ++++++++++++++++++- .../MountVolumesFeatureStepSuite.scala | 23 +++++++++++++++++- 6 files changed, 58 insertions(+), 14 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index b323f77d7d10..7149616e534a 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -664,7 +664,7 @@ specific to Spark on Kubernetes. spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly - (none) + false Specify if the mounted volume is read only or not. For example, spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index b3d8c827169f..a53ad562600a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -17,13 +17,16 @@ package org.apache.spark.deploy.k8s private[spark] sealed trait KubernetesVolumeSpecificConf + private[spark] case class KubernetesHostPathVolumeConf( - hostPath: String) extends KubernetesVolumeSpecificConf + hostPath: String) extends KubernetesVolumeSpecificConf + private[spark] case class KubernetesPVCVolumeConf( - claimName: String) extends KubernetesVolumeSpecificConf + claimName: String) extends KubernetesVolumeSpecificConf + private[spark] case class KubernetesEmptyDirVolumeConf( - medium: String, - sizeLimit: String) extends KubernetesVolumeSpecificConf + medium: Option[String], + sizeLimit: Option[String]) extends KubernetesVolumeSpecificConf private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf]( volumeName: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala index c1dd2a6a9415..713df5fffc3a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala @@ -42,12 +42,11 @@ private[spark] object KubernetesVolumeUtils { for { path <- properties.getTry(pathKey) - readOnly <- properties.getTry(readOnlyKey) volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName) } yield KubernetesVolumeSpec( volumeName = volumeName, mountPath = path, - mountReadOnly = readOnly.toBoolean, + mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean), volumeConf = volumeConf ) } @@ -91,10 +90,7 @@ private[spark] object KubernetesVolumeUtils { case KUBERNETES_VOLUMES_EMPTYDIR_TYPE => val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY" val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY" - for { - medium <- options.getTry(mediumKey) - sizeLimit <- options.getTry(sizeLimitKey) - } yield KubernetesEmptyDirVolumeConf(medium, sizeLimit) + Success(KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey))) case _ => Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported")) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index 2e56f39d8103..bf736205e8f2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -66,7 +66,9 @@ private[spark] class MountVolumesFeatureStep( case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => new VolumeBuilder() - .withEmptyDir(new EmptyDirVolumeSource(medium, new Quantity(sizeLimit))) + .withEmptyDir( + new EmptyDirVolumeSource(medium.getOrElse(""), + new Quantity(sizeLimit.orNull))) } val volume = volumeBuilder.withName(spec.volumeName).build() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala index d4bf91b24bfe..d795d159773a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala @@ -59,7 +59,29 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite { assert(volumeSpec.mountPath === "/path") assert(volumeSpec.mountReadOnly === true) assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] === - KubernetesEmptyDirVolumeConf("medium", "5G")) + KubernetesEmptyDirVolumeConf(Some("medium"), Some("5G"))) + } + + test("Parses emptyDir volume options can be optional") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.emptyDir.volumeName.mount.path", "/path") + sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + assert(volumeSpec.volumeName === "volumeName") + assert(volumeSpec.mountPath === "/path") + assert(volumeSpec.mountReadOnly === true) + assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] === + KubernetesEmptyDirVolumeConf(None, None)) + } + + test("Defaults optional readOnly to false") { + val sparkConf = new SparkConf(false) + sparkConf.set("test.hostPath.volumeName.mount.path", "/path") + sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath") + + val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get + assert(volumeSpec.mountReadOnly === false) } test("Gracefully fails on missing mount key") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 086ed1630b28..d309aa94ec11 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -83,7 +83,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { "testVolume", "/tmp", false, - KubernetesEmptyDirVolumeConf("Memory", "6G") + KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G")) ) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -99,6 +99,27 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) } + test("Mounts emptyDir with no options") { + val volumeConf = KubernetesVolumeSpec( + "testVolume", + "/tmp", + false, + KubernetesEmptyDirVolumeConf(None, None) + ) + val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) + val step = new MountVolumesFeatureStep(kubernetesConf) + val configuredPod = step.configurePod(SparkPod.initialPod()) + + assert(configuredPod.pod.getSpec.getVolumes.size() === 1) + val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir + assert(emptyDir.getMedium === "") + assert(emptyDir.getSizeLimit.getAmount === null) + assert(configuredPod.container.getVolumeMounts.size() === 1) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") + assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) + } + test("Mounts multiple volumes") { val hpVolumeConf = KubernetesVolumeSpec( "hpVolume", From 7433244b0e9e0f3f269e8d8dc135862d78c1105d Mon Sep 17 00:00:00 2001 From: Andrew Korzhuev Date: Fri, 15 Jun 2018 11:22:37 +0200 Subject: [PATCH 12/12] Fix indentation for class parameters --- .../apache/spark/deploy/k8s/KubernetesVolumeSpec.scala | 9 ++++++--- .../deploy/k8s/features/MountVolumesFeatureStep.scala | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala index a53ad562600a..b1762d1efe2e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala @@ -19,14 +19,17 @@ package org.apache.spark.deploy.k8s private[spark] sealed trait KubernetesVolumeSpecificConf private[spark] case class KubernetesHostPathVolumeConf( - hostPath: String) extends KubernetesVolumeSpecificConf + hostPath: String) + extends KubernetesVolumeSpecificConf private[spark] case class KubernetesPVCVolumeConf( - claimName: String) extends KubernetesVolumeSpecificConf + claimName: String) + extends KubernetesVolumeSpecificConf private[spark] case class KubernetesEmptyDirVolumeConf( medium: Option[String], - sizeLimit: Option[String]) extends KubernetesVolumeSpecificConf + sizeLimit: Option[String]) + extends KubernetesVolumeSpecificConf private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf]( volumeName: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala index bf736205e8f2..bb0e2b3128ef 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala @@ -21,7 +21,7 @@ import io.fabric8.kubernetes.api.model._ import org.apache.spark.deploy.k8s._ private[spark] class MountVolumesFeatureStep( - kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) + kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) extends KubernetesFeatureConfigStep { override def configurePod(pod: SparkPod): SparkPod = {