Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make mount.readOnly and emptyDir params optional
  • Loading branch information
Andrew Korzhuev committed Jun 13, 2018
commit f714b8e0343451c615740880a2f29783f64b4b8a
2 changes: 1 addition & 1 deletion docs/running-on-kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ specific to Spark on Kubernetes.
</tr>
<tr>
<td><code>spark.kubernetes.executor.volumes.[VolumeType].[VolumeName].mount.readOnly</code></td>
<td>(none)</td>
<td>false</td>
<td>
Specify if the mounted volume is read only or not. For example,
<code>spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpointpvc.mount.readOnly=false</code>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package org.apache.spark.deploy.k8s

private[spark] sealed trait KubernetesVolumeSpecificConf

private[spark] case class KubernetesHostPathVolumeConf(
hostPath: String) extends KubernetesVolumeSpecificConf
hostPath: String) extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesPVCVolumeConf(
claimName: String) extends KubernetesVolumeSpecificConf
claimName: String) extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesEmptyDirVolumeConf(
medium: String,
sizeLimit: String) extends KubernetesVolumeSpecificConf
medium: Option[String],
sizeLimit: Option[String]) extends KubernetesVolumeSpecificConf

private[spark] case class KubernetesVolumeSpec[T <: KubernetesVolumeSpecificConf](
volumeName: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@ private[spark] object KubernetesVolumeUtils {

for {
path <- properties.getTry(pathKey)
readOnly <- properties.getTry(readOnlyKey)
volumeConf <- parseVolumeSpecificConf(properties, volumeType, volumeName)
} yield KubernetesVolumeSpec(
volumeName = volumeName,
mountPath = path,
mountReadOnly = readOnly.toBoolean,
mountReadOnly = properties.get(readOnlyKey).exists(_.toBoolean),
volumeConf = volumeConf
)
}
Expand Down Expand Up @@ -91,10 +90,7 @@ private[spark] object KubernetesVolumeUtils {
case KUBERNETES_VOLUMES_EMPTYDIR_TYPE =>
val mediumKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_MEDIUM_KEY"
val sizeLimitKey = s"$volumeType.$volumeName.$KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY"
for {
medium <- options.getTry(mediumKey)
sizeLimit <- options.getTry(sizeLimitKey)
} yield KubernetesEmptyDirVolumeConf(medium, sizeLimit)
Success(KubernetesEmptyDirVolumeConf(options.get(mediumKey), options.get(sizeLimitKey)))

case _ =>
Failure(new RuntimeException(s"Kubernetes Volume type `$volumeType` is not supported"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ private[spark] class MountVolumesFeatureStep(

case KubernetesEmptyDirVolumeConf(medium, sizeLimit) =>
new VolumeBuilder()
.withEmptyDir(new EmptyDirVolumeSource(medium, new Quantity(sizeLimit)))
.withEmptyDir(
new EmptyDirVolumeSource(medium.getOrElse(""),
new Quantity(sizeLimit.orNull)))
}

val volume = volumeBuilder.withName(spec.volumeName).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,29 @@ class KubernetesVolumeUtilsSuite extends SparkFunSuite {
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly === true)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] ===
KubernetesEmptyDirVolumeConf("medium", "5G"))
KubernetesEmptyDirVolumeConf(Some("medium"), Some("5G")))
}

test("Parses emptyDir volume options can be optional") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.emptyDir.volumeName.mount.path", "/path")
sparkConf.set("test.emptyDir.volumeName.mount.readOnly", "true")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
assert(volumeSpec.volumeName === "volumeName")
assert(volumeSpec.mountPath === "/path")
assert(volumeSpec.mountReadOnly === true)
assert(volumeSpec.volumeConf.asInstanceOf[KubernetesEmptyDirVolumeConf] ===
KubernetesEmptyDirVolumeConf(None, None))
}

test("Defaults optional readOnly to false") {
val sparkConf = new SparkConf(false)
sparkConf.set("test.hostPath.volumeName.mount.path", "/path")
sparkConf.set("test.hostPath.volumeName.options.path", "/hostPath")

val volumeSpec = KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, "test.").head.get
assert(volumeSpec.mountReadOnly === false)
}

test("Gracefully fails on missing mount key") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
"testVolume",
"/tmp",
false,
KubernetesEmptyDirVolumeConf("Memory", "6G")
KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))
)
val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
val step = new MountVolumesFeatureStep(kubernetesConf)
Expand All @@ -99,6 +99,27 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite {
assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
}

test("Mounts emptyDir with no options") {
val volumeConf = KubernetesVolumeSpec(
"testVolume",
"/tmp",
false,
KubernetesEmptyDirVolumeConf(None, None)
)
val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil)
val step = new MountVolumesFeatureStep(kubernetesConf)
val configuredPod = step.configurePod(SparkPod.initialPod())

assert(configuredPod.pod.getSpec.getVolumes.size() === 1)
val emptyDir = configuredPod.pod.getSpec.getVolumes.get(0).getEmptyDir
assert(emptyDir.getMedium === "")
assert(emptyDir.getSizeLimit.getAmount === null)
assert(configuredPod.container.getVolumeMounts.size() === 1)
assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp")
assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume")
assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false)
}

test("Mounts multiple volumes") {
val hpVolumeConf = KubernetesVolumeSpec(
"hpVolume",
Expand Down