Rework volume mounting to gracefully handle misconfiguration
Andrew Korzhuev committed Jun 11, 2018
commit d960e34e6309f1ef98d5437acdf9af097f5cf4f9
4 changes: 2 additions & 2 deletions docs/running-on-kubernetes.md
@@ -648,7 +648,7 @@ specific to Spark on Kubernetes.
</td>
</tr>
<tr>
-  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.options.[OptionName]</code></td>
+  <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].options.[OptionName]</code></td>
<td>(none)</td>
<td>
    Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> options passed to Kubernetes, with <code>OptionName</code> as the key and the specified value; keys must conform to the Kubernetes option format. For example,
@@ -672,7 +672,7 @@ specific to Spark on Kubernetes.
</td>
</tr>
<tr>
-  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.options.[OptionName]</code></td>
+  <td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].options.[OptionName]</code></td>
<td>(none)</td>
<td>
    Configure <a href="https://kubernetes.io/docs/concepts/storage/volumes/">Kubernetes Volume</a> options passed to Kubernetes, with <code>OptionName</code> as the key and the specified value. For example,
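To make the renamed properties concrete, here is a minimal sketch of how a user might configure a hostPath volume under the new scheme; the volume name `checkpoints` and the paths are illustrative, not taken from this PR:

```scala
import org.apache.spark.SparkConf

// Hypothetical hostPath volume named "checkpoints"; all values illustrative.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.volumes.hostPath.checkpoints.mount.path", "/checkpoints")
  .set("spark.kubernetes.driver.volumes.hostPath.checkpoints.mount.readOnly", "false")
  .set("spark.kubernetes.driver.volumes.hostPath.checkpoints.options.path", "/mnt/ssd/checkpoints")
```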
org/apache/spark/deploy/k8s/Config.scala
@@ -171,16 +171,15 @@ private[spark] object Config extends Logging {
val KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX = "spark.kubernetes.executor.secretKeyRef."
val KUBERNETES_EXECUTOR_VOLUMES_PREFIX = "spark.kubernetes.executor.volumes."

-  val KUBERNETES_VOLUMES_HOSTPATH_KEY = "hostPath"
-  val KUBERNETES_VOLUMES_PVC_KEY = "persistentVolumeClaim"
-  val KUBERNETES_VOLUMES_EMPTYDIR_KEY = "emptyDir"
-  val KUBERNETES_VOLUMES_MOUNT_KEY = "mount"
-  val KUBERNETES_VOLUMES_PATH_KEY = "path"
-  val KUBERNETES_VOLUMES_CLAIM_NAME_KEY = "claimName"
-  val KUBERNETES_VOLUMES_READONLY_KEY = "readOnly"
-  val KUBERNETES_VOLUMES_OPTIONS_KEY = "options"
-  val KUBERNETES_VOLUMES_MEDIUM_KEY = "medium"
-  val KUBERNETES_VOLUMES_SIZE_LIMIT_KEY = "sizeLimit"
+  val KUBERNETES_VOLUMES_HOSTPATH_TYPE = "hostPath"
+  val KUBERNETES_VOLUMES_PVC_TYPE = "persistentVolumeClaim"
+  val KUBERNETES_VOLUMES_EMPTYDIR_TYPE = "emptyDir"
+  val KUBERNETES_VOLUMES_MOUNT_PATH_KEY = "mount.path"
+  val KUBERNETES_VOLUMES_MOUNT_READONLY_KEY = "mount.readOnly"
+  val KUBERNETES_VOLUMES_OPTIONS_PATH_KEY = "options.path"
+  val KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY = "options.claimName"
+  val KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY = "options.medium"
+  val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"

val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
}
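For orientation, a full property name is assembled from a role prefix, a user-supplied volume type and name, and one of the suffix constants above. A sketch, assuming `KUBERNETES_DRIVER_VOLUMES_PREFIX` mirrors the executor prefix shown above; the volume name is illustrative:

```scala
// Sketch: how the pieces combine into one property key.
// Assumes KUBERNETES_DRIVER_VOLUMES_PREFIX == "spark.kubernetes.driver.volumes."
val key = KUBERNETES_DRIVER_VOLUMES_PREFIX +
  s"$KUBERNETES_VOLUMES_HOSTPATH_TYPE.checkpoints.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
// key == "spark.kubernetes.driver.volumes.hostPath.checkpoints.mount.path"
```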
org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -56,7 +56,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
-    roleVolumes: Iterable[KubernetesVolumeSpec]) {
+    roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -136,7 +136,11 @@ private[spark] object KubernetesConf {
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
+      sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
+    // Also parse executor volumes in order to verify configuration
+    // before the driver pod is created
+    KubernetesVolumeUtils.parseVolumesWithPrefix(
+      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

KubernetesConf(
sparkConfWithMainAppJar,
@@ -181,7 +185,7 @@ private[spark] object KubernetesConf {
sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap
val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
-      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
+      sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

KubernetesConf(
sparkConf.clone(),
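The `.map(_.get)` is the fail-fast hook: each parsed volume is a `Try`, and `get` rethrows the first parse error at submission time, before any pod is created. A minimal sketch of the semantics, with illustrative values:

```scala
import scala.util.{Failure, Success, Try}

// One well-formed value and one failed parse; .map(_.get) surfaces the error.
val parsed: Seq[Try[String]] =
  Seq(Success("/path"), Failure(new NoSuchElementException("hostPath.vol.mount.path")))
val result = Try(parsed.map(_.get))
// result == Failure(java.util.NoSuchElementException: hostPath.vol.mount.path)
```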
org/apache/spark/deploy/k8s/KubernetesVolumeSpec.scala
@@ -16,15 +16,17 @@
*/
package org.apache.spark.deploy.k8s

-import scala.collection.immutable.Map
+private[spark] sealed trait KubernetesVolumeSpecificConf
+private[spark] case class KubernetesHostPathVolumeConf(
+    hostPath: String) extends KubernetesVolumeSpecificConf
+private[spark] case class KubernetesPVCVolumeConf(
+    claimName: String) extends KubernetesVolumeSpecificConf
+private[spark] case class KubernetesEmptyDirVolumeConf(
+    medium: String,
+    sizeLimit: String) extends KubernetesVolumeSpecificConf
 
-private[spark] case class KubernetesVolumeSpec(
+private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf](
     volumeName: String,
-    volumeType: String, // ADT
     mountPath: String,
     mountReadOnly: Boolean,
-    optionsSpec: Map[String, String])
-
-private[spark] object KubernetesVolumeSpec {
-  def emptySpec(): KubernetesVolumeSpec = new KubernetesVolumeSpec("", "", "", false, Map())
-}
+    volumeConf: T)
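The sealed trait replaces the old stringly-typed `volumeType` plus `optionsSpec` map: each spec now carries exactly one typed conf, checked at compile time. A construction sketch with illustrative values:

```scala
// Illustrative values; the type parameter is inferred from volumeConf.
val spec = KubernetesVolumeSpec(
  volumeName = "checkpoints",
  mountPath = "/checkpoints",
  mountReadOnly = false,
  volumeConf = KubernetesHostPathVolumeConf("/mnt/ssd/checkpoints"))
// spec: KubernetesVolumeSpec[KubernetesHostPathVolumeConf]
```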
org/apache/spark/deploy/k8s/KubernetesVolumeUtils.scala
@@ -16,11 +16,14 @@
*/
package org.apache.spark.deploy.k8s

+import java.util.NoSuchElementException
+
+import scala.util.{Failure, Success, Try}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

private[spark] object KubernetesVolumeUtils {

/**
* Extract Spark volume configuration properties with a given name prefix.
*
@@ -30,42 +33,82 @@ private[spark] object KubernetesVolumeUtils {
*/
def parseVolumesWithPrefix(
[Author note] Tests are missing
       sparkConf: SparkConf,
-      prefix: String): Iterable[KubernetesVolumeSpec] = {
-    val properties = sparkConf.getAllWithPrefix(prefix)
-
-    val propsByTypeName: Map[(String, String), Array[(String, String)]] =
-      properties.flatMap { case (k, v) =>
-        k.split('.').toList match {
-          case tpe :: name :: rest => Some(((tpe, name), (rest.mkString("."), v)))
-          case _ => None
-        }
-      }.groupBy(_._1).mapValues(_.map(_._2))
-
-    propsByTypeName.map { case ((tpe, name), props) =>
-      val propMap = props.toMap
-      val mountProps = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_MOUNT_KEY.")
-      val options = getAllWithPrefix(propMap, s"$KUBERNETES_VOLUMES_OPTIONS_KEY.")
-
-      KubernetesVolumeSpec(
-        volumeName = name,
-        volumeType = tpe,
-        mountPath = mountProps(KUBERNETES_VOLUMES_PATH_KEY),
-        mountReadOnly = mountProps(KUBERNETES_VOLUMES_READONLY_KEY).toBoolean,
-        optionsSpec = options
+      prefix: String): Iterable[Try[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]] = {
+    val properties = sparkConf.getAllWithPrefix(prefix).toMap
+
+    getVolumeTypesAndNames(properties).map { case (volumeType, volumeName) =>
+      val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_PATH_KEY"
+      val readOnlyKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_MOUNT_READONLY_KEY"
+
+      for {
+        path <- properties.getTry(pathKey)
+        readOnly <- properties.getTry(readOnlyKey)
[Reviewer] ReadOnly is optional and defaults to false, so users are not required to set it explicitly. Is this semantics captured by properties.getTry?

[Author] Made it optional with a default value of false. We basically ended up with two ways to parse Spark options now: a custom one for volumes, since there are so many combinations, and the one the rest of Spark uses. It should be possible to refactor this, maybe by improving the Spark options parser itself.
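The default-to-false semantics the author describes could look roughly like this sketch (an assumption about the follow-up, not code from this commit):

```scala
// Missing key or "false" both yield false; only "true" yields true.
// The map and key are illustrative stand-ins for `properties` and `readOnlyKey` above.
val properties = Map("hostPath.vol.mount.path" -> "/path") // readOnly not set
val readOnly = properties.get("hostPath.vol.mount.readOnly").exists(_.toBoolean)
// readOnly == false
```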

+        volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName)
+      } yield KubernetesVolumeSpec(
+        volumeName = volumeName,
+        mountPath = path,
+        mountReadOnly = readOnly.toBoolean,
+        volumeConf = volumeConf
       )
}
}

   /**
-   * Extract subset of elements with keys matching on prefix, which is then excluded
-   * @param props properties to extract data from
-   * @param prefix prefix to match against
-   * @return subset of original props
+   * Get unique pairs of volumeType and volumeName,
+   * assuming options are formatted in this way:
+   * `volumeType`.`volumeName`.`property` = `value`
+   * @param properties flat mapping of property names to values
+   * @return Set[(volumeType, volumeName)]
    */
-  private def getAllWithPrefix(props: Map[String, String], prefix: String): Map[String, String] = {
-    props
-      .filterKeys(_.startsWith(prefix))
-      .map { case (k, v) => k.substring(prefix.length) -> v }
+  private def getVolumeTypesAndNames(
+      properties: Map[String, String]
+  ): Set[(String, String)] = {
+    properties.keys.flatMap { k =>
+      k.split('.').toList match {
+        case tpe :: name :: _ => Some((tpe, name))
+        case _ => None
+      }
+    }.toSet
   }

+  private def parseVolumeSpecificConf(
+      options: Map[String, String],
+      volumeType: String,
+      volumeName: String): Try[KubernetesVolumeSpecificConf] = {
+    volumeType match {
+      case KUBERNETES_VOLUMES_HOSTPATH_TYPE =>
+        val pathKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_PATH_KEY"
+        for {
+          path <- options.getTry(pathKey)
+        } yield KubernetesHostPathVolumeConf(path)
+
+      case KUBERNETES_VOLUMES_PVC_TYPE =>
+        val claimNameKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_CLAIM_NAME_KEY"
+        for {
+          claimName <- options.getTry(claimNameKey)
+        } yield KubernetesPVCVolumeConf(claimName)
+
+      case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
+        val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
+        val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
+        for {
+          medium <- options.getTry(mediumKey)
+          sizeLimit <- options.getTry(sizeLimitKey)
[Reviewer] Both medium and sizeLimit are optional for EmptyDirVolumeSource, so users are not required to set them. What happens with options.getTry if users do not set the key?

[Author] Fixed. Per https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#emptydirvolumesource-v1-core, I don't quite like that you have to pass "" and null to Kubernetes, but it works.
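A sketch of what the described fix might look like: absent options fall back to the EmptyDirVolumeSource defaults, an empty-string medium and a null size limit (an assumption about the follow-up, not the exact code):

```scala
import io.fabric8.kubernetes.api.model.{EmptyDirVolumeSource, Quantity}

// Neither key set; fall back to "" medium and null sizeLimit. Keys illustrative.
val options = Map.empty[String, String]
val medium = options.getOrElse("emptyDir.vol.options.medium", "")
val sizeLimit = options.get("emptyDir.vol.options.sizeLimit").map(new Quantity(_)).orNull
val source = new EmptyDirVolumeSource(medium, sizeLimit)
```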

+        } yield KubernetesEmptyDirVolumeConf(medium, sizeLimit)
+
+      case _ =>
+        Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported"))
+    }
+  }

+  /**
+   * Convenience wrapper to accumulate key lookup errors
+   */
+  implicit private class MapOps[A, B](m: Map[A, B]) {
+    def getTry(key: A): Try[B] = {
+      m
+        .get(key)
+        .fold[Try[B]](Failure(new NoSuchElementException(key.toString)))(Success(_))
+    }
+  }
}
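Putting the pieces together, callers now receive one `Try` per volume and can decide whether to fail or report. A usage sketch with illustrative conf values:

```scala
import scala.util.{Failure, Success}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesVolumeUtils

val conf = new SparkConf(false)
  .set("spark.kubernetes.driver.volumes.hostPath.logs.mount.path", "/logs")
  .set("spark.kubernetes.driver.volumes.hostPath.logs.mount.readOnly", "false")
  .set("spark.kubernetes.driver.volumes.hostPath.logs.options.path", "/var/log/spark")

KubernetesVolumeUtils.parseVolumesWithPrefix(conf, "spark.kubernetes.driver.volumes.")
  .foreach {
    case Success(spec) => println(s"volume ${spec.volumeName} mounted at ${spec.mountPath}")
    case Failure(e) => println(s"misconfigured volume: ${e.getMessage}")
  }
```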
org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -173,7 +173,7 @@ private[spark] class BasicExecutorFeatureStep(
.endSpec()
.build()

-    SparkPod(executorPod, executorContainer)
+    SparkPod(executorPod, containerWithLimitCores)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -18,8 +18,7 @@ package org.apache.spark.deploy.k8s.features

import io.fabric8.kubernetes.api.model._

-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesVolumeSpec, SparkPod}
-import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s._

private[spark] class MountVolumesFeatureStep(
kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf])
[Reviewer] The class parameter line should be indented 4 spaces.

@@ -46,36 +45,32 @@ private[spark] class MountVolumesFeatureStep(
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty

   private def constructVolumes(
-      volumeSpecs: Iterable[KubernetesVolumeSpec]): Iterable[(VolumeMount, Volume)] = {
+      volumeSpecs: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]]
+  ): Iterable[(VolumeMount, Volume)] = {
volumeSpecs.map { spec =>
val volumeMount = new VolumeMountBuilder()
.withMountPath(spec.mountPath)
.withReadOnly(spec.mountReadOnly)
.withName(spec.volumeName)
.build()

-      val volumeBuilder = spec.volumeType match {
-        case KUBERNETES_VOLUMES_HOSTPATH_KEY =>
-          val hostPath = spec.optionsSpec(KUBERNETES_VOLUMES_PATH_KEY)
+      val volumeBuilder = spec.volumeConf match {
+        case KubernetesHostPathVolumeConf(hostPath) =>
new VolumeBuilder()
.withHostPath(new HostPathVolumeSource(hostPath))

-        case KUBERNETES_VOLUMES_PVC_KEY =>
-          val claimName = spec.optionsSpec(KUBERNETES_VOLUMES_CLAIM_NAME_KEY)
+        case KubernetesPVCVolumeConf(claimName) =>
new VolumeBuilder()
.withPersistentVolumeClaim(
new PersistentVolumeClaimVolumeSource(claimName, spec.mountReadOnly))

-        case KUBERNETES_VOLUMES_EMPTYDIR_KEY =>
-          val medium = spec.optionsSpec(KUBERNETES_VOLUMES_MEDIUM_KEY)
-          val sizeLimit = spec.optionsSpec(KUBERNETES_VOLUMES_SIZE_LIMIT_KEY)
+        case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
new VolumeBuilder()
.withEmptyDir(new EmptyDirVolumeSource(medium, new Quantity(sizeLimit)))
}

val volume = volumeBuilder.withName(spec.volumeName).build()

[Reviewer] Unnecessary extra empty line.
(volumeMount, volume)
}
}
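Matching on the sealed trait also buys compile-time exhaustiveness that the old string-keyed lookup could not: adding a new conf type makes the compiler flag every match that fails to handle it. A sketch; the `describe` helper is hypothetical:

```scala
// Hypothetical helper: dropping any case below draws a non-exhaustive-match
// warning from the compiler, because KubernetesVolumeSpecificConf is sealed.
def describe(conf: KubernetesVolumeSpecificConf): String = conf match {
  case KubernetesHostPathVolumeConf(path) => s"hostPath($path)"
  case KubernetesPVCVolumeConf(claimName) => s"persistentVolumeClaim($claimName)"
  case KubernetesEmptyDirVolumeConf(medium, sizeLimit) => s"emptyDir($medium, $sizeLimit)"
}
```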
org/apache/spark/deploy/k8s/KubernetesVolumeUtilsSuite.scala
@@ -19,18 +19,66 @@ package org.apache.spark.deploy.k8s
import org.apache.spark.{SparkConf, SparkFunSuite}

class KubernetesVolumeUtilsSuite extends SparkFunSuite {
test("Parses volume options correctly") {
test("Parses hostPath volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.volumeType.volumeName.mount.path", "/path")
sparkConf.set("test.volumeType.volumeName.mount.readOnly", "true")
sparkConf.set("test.volumeType.volumeName.options.option1", "value1")
sparkConf.set("test.volumeType.volumeName.options.option2", "value2")
sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true")
sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.volumeType === "volumeType")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly === true)
assert(volumeSpec.optionsSpec === Map("option1" -> "value1", "option2" -> "value2"))
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesHostPathVolumeConf] ===
KubernetesHostPathVolumeConf("/hostPath"))
}

test("Parses persistentVolumeClaim volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.path", "/path")
sparkConf.set("test.persistentVolumeClaim.volumeName.mount.readOnly", "true")
sparkConf.set("test.persistentVolumeClaim.volumeName.options.claimName", "claimeName")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly === true)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesPVCVolumeConf] ===
KubernetesPVCVolumeConf("claimeName"))
}

test("Parses emptyDir volumes correctly") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")
sparkConf.set("test.emptyDir.volumeName.options.medium", "medium")
sparkConf.set("test.emptyDir.volumeName.options.sizeLimit", "5G")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly === true)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] ===
KubernetesEmptyDirVolumeConf("medium", "5G"))
}

test("Gracefully fails on missing mount key") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mnt.path", "/path")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.isFailure === true)
assert(volumeSpec.failed.get.getMessage === "emptyDir.volumeName.mount.path")
}

test("Gracefully fails on missing option key") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
sparkConf.set("test.hostPath.volumeName.mount.readOnly", "true")
sparkConf.set("test.hostPath.volumeName.options.pth", "/hostPath")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head
assert(volumeSpec.isFailure === true)
assert(volumeSpec.failed.get.getMessage === "hostPath.volumeName.options.path")
}
}