K8s local file mounting
Palantir's mechanism to mount local files into pods using
secret-populated volumes. Used internally to provide pods with config
files.

Co-authored-by: mccheah <mcheah@palantir.com>
Co-authored-by: Josh Casale <jcasale@palantir.com>
Co-authored-by: Willi Raschkowski <wraschkowski@palantir.com>
3 people committed Mar 2, 2021
commit a40860c0717f4efe1dd7662ad3008a12040a72c0
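
For orientation before the diff: a minimal usage sketch, not part of this commit. Only the config key and the spark.files behaviour come from this diff; the master URL and file path are hypothetical.

import org.apache.spark.SparkConf

// Hypothetical submission conf. With the flag on, local files listed in
// spark.files are base64-encoded into a Kubernetes secret and mounted on the
// driver instead of being uploaded to an HCFS.
val conf = new SparkConf()
  .set("spark.master", "k8s://https://kubernetes.default.svc") // assumed cluster URL
  .set("spark.kubernetes.file.secretMount.enabled", "true") // introduced by this commit
  .set("spark.files", "/opt/app/config.yml") // hypothetical local file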
@@ -409,6 +409,20 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_SECRET_FILE_MOUNT_ENABLED =
ConfigBuilder("spark.kubernetes.file.secretMount.enabled")
.doc("Mount spark-submit files as base64-encoded k8s secret instead of uploading to " +
s"${KUBERNETES_FILE_UPLOAD_PATH.key}")
.booleanConf
.createWithDefault(false)

val KUBERNETES_SECRET_FILE_MOUNT_PATH =
ConfigBuilder("spark.kubernetes.file.secretMount.path")
.doc("When mounting files as secret, they're made available on drivers at this path.")
.internal()
.stringConf
.createWithDefault("/var/data/spark-submitted-files")
Comment on lines +412 to +424

I had to re-do local file mounting and the important resulting difference here is that where we use secret-based file mounting internally (i.e. everywhere) we have to set:

spark.kubernetes.file.secretMount.enabled: true

We can argue about the default. My preference was to stay in line with upstream on the default behaviour.

The reason I had to touch this is that Spark 3 introduced its own behaviour to mount local files into drivers by uploading them to an HCFS (apache#23546). You activate that just by adding a local file to "spark.files", and it breaks for us because we don't have an HCFS and use secret-based mounting instead. So the behaviour now is: if you enable secret-based volume mounting, we disable HCFS-based mounting.
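
To make the interaction concrete, a sketch of the resulting dispatch using names from this diff (a paraphrase, not the literal code):

// Sketch: how the two mechanisms interact after this change.
if (conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED)) {
  // MountLocalDriverFilesFeatureStep mounts spark.files via a secret-backed
  // volume, and BasicDriverFeatureStep returns early, skipping the HCFS upload.
} else {
  // Upstream behaviour: local files are uploaded to spark.kubernetes.file.upload.path.
}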


val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = "spark.kubernetes.driver.service.annotation."
@@ -68,6 +68,9 @@ private[spark] object Constants {
val SPARK_CONF_PATH = s"$SPARK_CONF_DIR_INTERNAL/$SPARK_CONF_FILE_NAME"
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"

// Local file mounting constant; this should match the value used in 'entrypoint.sh'
val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"

// BINDINGS
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"

@@ -156,6 +156,10 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
"spark.app.id" -> conf.appId,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)

// (Palantir) If file-mounting with secrets is enabled, don't upload files here
if (conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED)) return additionalProps.toMap
Comment on lines +160 to +161

That's new as explained above. We want to be able to mount files using secrets and not have to upload to an HCFS as well.


// try upload local, resolvable files to a hadoop compatible file system
Seq(JARS, FILES).foreach { key =>
val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.k8s.features

import java.io.File
import java.net.URI
import java.nio.file.Paths

import scala.collection.JavaConverters._

import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder}

import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, PythonMainAppResource, RMainAppResource}
import org.apache.spark.internal.config.FILES
import org.apache.spark.util.Utils

/**
* Mount local files listed in `spark.files` into a volume on the driver.
*
* The volume is populated using a secret which in turn is populated with the base64-encoded
* file contents. The volume is only mounted into drivers, not executors. That's because drivers
* can make `spark.files` available to executors using [[org.apache.spark.SparkContext.addFile]].
*
* This is a Palantir addition that works well for the small files we tend to add in `spark.files`.
* Spark's out-of-the-box solution is in [[BasicDriverFeatureStep]] and serves local files by
* uploading them to an HCFS and serving them from there.
*/
private[spark] class MountLocalDriverFilesFeatureStep(conf: KubernetesDriverConf)
The change here is that we no longer mount files into executors. It stopped working because the volumes in executors and drivers would refer to different secret names (previously they referred to the same one). But KubernetesClientApplication only uploads secrets declared in the driver, not in the executor. We could ensure they reference the same secret, but it's actually unnecessary to mount into executors in the first place: once you mount "spark.files" into the driver, the driver makes the files available to executors via SparkContext#addFile.
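
As a sketch of why driver-only mounting suffices (file name hypothetical, sc is the SparkContext): once this step rewrites spark.files to the mount path, distribution to executors happens through Spark's ordinary file server.

// On the driver: entries of spark.files are registered with the context.
sc.addFile("/var/data/spark-submitted-files/config.yml")
// On executors: the file is fetched from the driver and resolved locally.
val localPath = org.apache.spark.SparkFiles.get("config.yml")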

extends KubernetesFeatureConfigStep {

private val enabled = conf.get(KUBERNETES_SECRET_FILE_MOUNT_ENABLED)

private val mountPath = conf.get(KUBERNETES_SECRET_FILE_MOUNT_PATH)

private val secretName = s"${conf.resourceNamePrefix}-mounted-files"

def allFiles: Seq[String] = {
Utils.stringToSeq(conf.sparkConf.get(FILES.key, "")) ++
(conf.mainAppResource match {
case JavaMainAppResource(_) => Nil
case PythonMainAppResource(res) => Seq(res)
case RMainAppResource(res) => Seq(res)
})
}

override def configurePod(pod: SparkPod): SparkPod = {
if (!enabled) return pod

val resolvedPod = new PodBuilder(pod.pod)
.editOrNewSpec()
.addNewVolume()
.withName("submitted-files")
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()
val resolvedContainer = new ContainerBuilder(pod.container)
.addNewEnv()
.withName(ENV_MOUNTED_FILES_FROM_SECRET_DIR)
.withValue(mountPath)
.endEnv()
.addNewVolumeMount()
.withName("submitted-files")
.withMountPath(mountPath)
.endVolumeMount()
.build()
SparkPod(resolvedPod, resolvedContainer)
}

override def getAdditionalPodSystemProperties(): Map[String, String] = {
if (!enabled) return Map.empty

val resolvedFiles = allFiles
.map(file => {
val uri = Utils.resolveURI(file)
if (shouldMountFile(uri)) {
val fileName = Paths.get(uri.getPath).getFileName.toString
s"$mountPath/$fileName"
} else {
file
}
})
Map(FILES.key -> resolvedFiles.mkString(","))
}

override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
if (!enabled) return Nil

val localFiles = allFiles
.map(Utils.resolveURI)
.filter(shouldMountFile)
.map(_.getPath)
.map(new File(_))
val localFileBase64Contents = localFiles.map { file =>
val fileBase64 = BaseEncoding.base64().encode(Files.toByteArray(file))
(file.getName, fileBase64)
}.toMap
val localFilesSecret = new SecretBuilder()
.withNewMetadata()
.withName(secretName)
.endMetadata()
.withData(localFileBase64Contents.asJava)
.build()
Seq(localFilesSecret)
}

private def shouldMountFile(file: URI): Boolean = {
Option(file.getScheme) match {
case Some("file") => true
case None => true
case _ => false
}
}
}
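
To make the secret contents concrete, a hypothetical round trip of the encoding used in getAdditionalKubernetesResources above (the file content "a=1\n" is made up; in practice the kubelet performs the decode when it materialises the volume):

import java.nio.charset.StandardCharsets
import com.google.common.io.BaseEncoding

// Encode as the step does when building the secret's data map...
val encoded = BaseEncoding.base64().encode("a=1\n".getBytes(StandardCharsets.UTF_8))
// ...and decode as the kubelet does; the pod then sees the original bytes at
// /var/data/spark-submitted-files/config.yml.
val decoded = new String(BaseEncoding.base64().decode(encoded), StandardCharsets.UTF_8)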
@@ -48,7 +48,8 @@ private[spark] class KubernetesDriverBuilder {
new HadoopConfDriverFeatureStep(conf),
new KerberosConfDriverFeatureStep(conf),
new PodTemplateConfigMapStep(conf),
new LocalDirsFeatureStep(conf))
new LocalDirsFeatureStep(conf),
new MountLocalDriverFilesFeatureStep(conf))

val spec = KubernetesDriverSpec(
initialPod,
@@ -181,6 +181,23 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(configuredPythonPod.container.getImage === "spark-driver-py:latest")
}

test("(Palantir) Do not upload files if secret file mounting is enabled") {
val sparkConf = new SparkConf()
.set(CONTAINER_IMAGE, "spark-driver:latest")
.set(FILES, Seq("file://file.txt"))
.set(KUBERNETES_SECRET_FILE_MOUNT_ENABLED, true)
val kubernetesConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
labels = DRIVER_LABELS,
environment = DRIVER_ENVS,
annotations = DRIVER_ANNOTATIONS)

val additionalSystemProps = new BasicDriverFeatureStep(kubernetesConf)
.getAdditionalPodSystemProperties()

assert(!additionalSystemProps.contains(FILES.key))
}

// Memory overhead tests. Tuples are:
// test name, main resource, overhead factor, expected factor
Seq(
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.features

import java.io.File

import scala.collection.JavaConverters._

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.Secret
import org.scalatest.BeforeAndAfter

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
import org.apache.spark.util.Utils

class MountLocalDriverFilesFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {

private var kubernetesConf: KubernetesDriverConf = _
private var sparkFiles: Seq[String] = _
private var localFiles: Seq[File] = _
private var stepUnderTest: MountLocalDriverFilesFeatureStep = _

before {
val tempDir = Utils.createTempDir()
val firstLocalFile = new File(tempDir, "file1.txt")
Files.write("a", firstLocalFile, Charsets.UTF_8)
val secondLocalFile = new File(tempDir, "file2.txt")
Files.write("b", secondLocalFile, Charsets.UTF_8)
sparkFiles = Seq(
firstLocalFile.getAbsolutePath,
s"file://${secondLocalFile.getAbsolutePath}",
s"local://file3.txt",
"https://localhost:9000/file4.txt")
localFiles = Seq(firstLocalFile, secondLocalFile)
val sparkConf = new SparkConf(false)
.set("spark.files", sparkFiles.mkString(","))
.set(KUBERNETES_SECRET_FILE_MOUNT_ENABLED, true)
.set(KUBERNETES_SECRET_FILE_MOUNT_PATH, "/var/data/spark-submitted-files")
kubernetesConf = new KubernetesDriverConf(
sparkConf,
"test-app",
JavaMainAppResource(None),
"main",
Array.empty[String])
stepUnderTest = new MountLocalDriverFilesFeatureStep(kubernetesConf)
}

test("Attaches a secret volume and secret name") {
val configuredPod = stepUnderTest.configurePod(SparkPod.initialPod())
assert(configuredPod.pod.getSpec.getVolumes.size === 1)
val volume = configuredPod.pod.getSpec.getVolumes.get(0)
assert(volume.getName === "submitted-files")
assert(volume.getSecret.getSecretName === s"${kubernetesConf.resourceNamePrefix}-mounted-files")
assert(configuredPod.container.getVolumeMounts.size === 1)
val volumeMount = configuredPod.container.getVolumeMounts.get(0)
assert(volumeMount.getName === "submitted-files")
assert(volumeMount.getMountPath === "/var/data/spark-submitted-files")
assert(configuredPod.container.getEnv.size === 1)
val addedEnv = configuredPod.container.getEnv.get(0)
assert(addedEnv.getName === ENV_MOUNTED_FILES_FROM_SECRET_DIR)
assert(addedEnv.getValue === "/var/data/spark-submitted-files")
}

test("Maps submitted files in the system properties") {
val resolvedSystemProperties = stepUnderTest.getAdditionalPodSystemProperties()
val expectedSystemProperties = Map(
"spark.files" ->
Seq(
"/var/data/spark-submitted-files/file1.txt",
"/var/data/spark-submitted-files/file2.txt",
"local://file3.txt",
"https://localhost:9000/file4.txt"
).mkString(","))
assert(resolvedSystemProperties === expectedSystemProperties)
}

test("Additional Kubernetes resources includes the mounted files secret") {
val secrets = stepUnderTest.getAdditionalKubernetesResources()
assert(secrets.size === 1)
assert(secrets.head.isInstanceOf[Secret])
val secret = secrets.head.asInstanceOf[Secret]
assert(secret.getMetadata.getName === s"${kubernetesConf.resourceNamePrefix}-mounted-files")
val secretData = secret.getData.asScala
assert(secretData.size === 2)
assert(decodeToUtf8(secretData("file1.txt")) === "a")
assert(decodeToUtf8(secretData("file2.txt")) === "b")
}

private def decodeToUtf8(str: String): String = {
new String(BaseEncoding.base64().decode(str), Charsets.UTF_8)
}
}
@@ -66,6 +66,10 @@ if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
fi

if [ -n "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR" ]; then
cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .
fi

case "$1" in
driver)
shift 1