diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index dcfa70a85a97..3b6935560a57 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -127,16 +127,24 @@ Spark supports using TLS to encrypt the traffic in this bootstrapping process. I
whenever possible.
See the [security page](security.html) and [configuration](configuration.html) sections for more information on
-configuring TLS; use the prefix `spark.ssl.kubernetes.submission` in configuring the TLS-related fields in the context
+configuring TLS; use the prefix `spark.ssl.kubernetes.driversubmitserver` in configuring the TLS-related fields in the context
of submitting to Kubernetes. For example, to set the trustStore used when the local machine communicates with the driver
-pod in starting the application, set `spark.ssl.kubernetes.submission.trustStore`.
+pod in starting the application, set `spark.ssl.kubernetes.driversubmitserver.trustStore`.
One note about the keyStore is that it can be specified as either a file on the client machine or a file in the
-container image's disk. Thus `spark.ssl.kubernetes.submission.keyStore` can be a URI with a scheme of either `file:`
+container image's disk. Thus `spark.ssl.kubernetes.driversubmitserver.keyStore` can be a URI with a scheme of either `file:`
or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
`local:`, the file is assumed to already be on the container's disk at the appropriate path.
+Finally, the submission server and client can be configured to use PEM files instead of Java keyStores. When using
+this mode, set `spark.ssl.kubernetes.driversubmitserver.keyPem` and
+`spark.ssl.kubernetes.driversubmitserver.serverCertPem` to configure the key and certificate files on the driver
+submission server. These files can be uploaded from the submitter's machine if they have no scheme or a scheme of
+`file:`, or they can be located on the container's disk if they have the scheme `local:`. The client's certificate
+file should be provided via setting `spark.ssl.kubernetes.driversubmitserver.clientCertPem`, and this file must be
+located on the submitting machine's local disk.
+
### Submission of Local Files through Ingress/External controller
Kubernetes pods run with their own IP address space. If Spark is run in cluster mode, the driver pod may not be
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 985ffd08f3fc..6d2f1d0fd276 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -87,6 +87,10 @@
com.google.guava
guava
+
+ org.bouncycastle
+ bcpkix-jdk15on
+
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index e6b2e3156865..7e700b569a3f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -161,7 +161,7 @@ private[spark] class Client(
driverServiceManager.handleSubmissionError(
new SparkException("Submission shutting down early...")))
try {
- val sslConfigurationProvider = new SslConfigurationProvider(
+ val sslConfigurationProvider = new DriverSubmitSslConfigurationProvider(
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
@@ -182,7 +182,7 @@ private[spark] class Client(
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
- sslConfiguration.sslSecrets,
+ sslConfiguration.sslSecret,
driverPod,
driverService)
submitApplicationToDriverServer(
@@ -209,7 +209,6 @@ private[spark] class Client(
Utils.tryLogNonFatalError {
driverServiceManager.stop()
}
-
// Remove the shutdown hooks that would be redundant
Utils.tryLogNonFatalError {
ShutdownHookManager.removeShutdownHook(resourceCleanShutdownHook)
@@ -236,7 +235,7 @@ private[spark] class Client(
private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
- sslConfiguration: SslConfiguration,
+ sslConfiguration: DriverSubmitSslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String],
@@ -298,7 +297,7 @@ private[spark] class Client(
customLabels: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
- sslConfiguration: SslConfiguration): (Pod, Service) = {
+ sslConfiguration: DriverSubmitSslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
@@ -349,7 +348,7 @@ private[spark] class Client(
private def configureOwnerReferences(
kubernetesClient: KubernetesClient,
submitServerSecret: Secret,
- sslSecrets: Array[Secret],
+ sslSecret: Option[Secret],
driverPod: Pod,
driverService: Service): Service = {
val driverPodOwnerRef = new OwnerReferenceBuilder()
@@ -359,7 +358,7 @@ private[spark] class Client(
.withKind(driverPod.getKind)
.withController(true)
.build()
- sslSecrets.foreach(secret => {
+ sslSecret.foreach(secret => {
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
@@ -425,10 +424,10 @@ private[spark] class Client(
driverKubernetesSelectors: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
- sslConfiguration: SslConfiguration): Pod = {
+ sslConfiguration: DriverSubmitSslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
- .withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP")
+ .withScheme(if (sslConfiguration.enabled) "HTTPS" else "HTTP")
.withPath("/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
@@ -452,7 +451,7 @@ private[spark] class Client(
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume()
- .addToVolumes(sslConfiguration.sslPodVolumes: _*)
+ .addToVolumes(sslConfiguration.sslPodVolume.toSeq: _*)
.withServiceAccount(serviceAccount.getOrElse("default"))
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
@@ -463,7 +462,7 @@ private[spark] class Client(
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
- .addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*)
+ .addToVolumeMounts(sslConfiguration.sslPodVolumeMount.toSeq: _*)
.addNewEnv()
.withName(ENV_SUBMISSION_SECRET_LOCATION)
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
@@ -661,7 +660,7 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
service: Service,
- sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
+ sslConfiguration: DriverSubmitSslConfiguration): KubernetesSparkRestApi = {
val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service)
require(serviceUris.nonEmpty, "No uris found to contact the driver!")
HttpClientUtil.createClient[KubernetesSparkRestApi](
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala
new file mode 100644
index 000000000000..a83c9a9896a0
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/DriverSubmitSslConfigurationProvider.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes
+
+import java.io.{File, FileInputStream}
+import java.security.{KeyStore, SecureRandom}
+import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
+
+import com.google.common.base.Charsets
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
+import org.apache.spark.deploy.kubernetes.config._
+import org.apache.spark.deploy.kubernetes.constants._
+import org.apache.spark.deploy.rest.kubernetes.{KubernetesFileUtils, PemsToKeyStoreConverter}
+import org.apache.spark.util.Utils
+
+/**
+ * Raw SSL configuration as the user specified in SparkConf for setting up the driver
+ * submission server.
+ */
+private case class DriverSubmitSslConfigurationParameters(
+ storeBasedSslOptions: SSLOptions,
+ isKeyStoreLocalFile: Boolean,
+ driverSubmitServerKeyPem: Option[File],
+ isDriverSubmitKeyPemLocalFile: Boolean,
+ driverSubmitServerCertPem: Option[File],
+ isDriverSubmitServerCertPemLocalFile: Boolean,
+ submissionClientCertPem: Option[File])
+
+/**
+ * Resolved from translating options provided in
+ * {@link DriverSubmitSslConfigurationParameters} into Kubernetes volumes, environment variables
+ * for the driver pod, Kubernetes secrets, client-side trust managers, and the client-side SSL
+ * context. This is used for setting up the SSL connection for the submission server where the
+ * application local dependencies and configuration is provided from.
+ */
+private[spark] case class DriverSubmitSslConfiguration(
+ enabled: Boolean,
+ sslPodEnvVars: Array[EnvVar],
+ sslPodVolume: Option[Volume],
+ sslPodVolumeMount: Option[VolumeMount],
+ sslSecret: Option[Secret],
+ driverSubmitClientTrustManager: Option[X509TrustManager],
+ driverSubmitClientSslContext: SSLContext)
+
+/**
+ * Provides the SSL configuration for bootstrapping the driver pod to listen for the driver
+ * submission over SSL, and then supply the client-side configuration for establishing the
+ * SSL connection. This is done in two phases: first, interpreting the raw configuration
+ * values from the SparkConf object; then second, converting the configuration parameters
+ * into the appropriate Kubernetes constructs, namely the volume and volume mount to add to the
+ * driver pod, and the secret to create at the API server; and finally, constructing the
+ * client-side trust manager and SSL context for sending the local dependencies.
+ */
+private[spark] class DriverSubmitSslConfigurationProvider(
+ sparkConf: SparkConf,
+ kubernetesAppId: String,
+ kubernetesClient: KubernetesClient,
+ kubernetesResourceCleaner: KubernetesResourceCleaner) {
+ private val SECURE_RANDOM = new SecureRandom()
+ private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
+ private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR +
+ s"/$kubernetesAppId-ssl"
+
+ def getSslConfiguration(): DriverSubmitSslConfiguration = {
+ val sslConfigurationParameters = parseSslConfigurationParameters()
+ if (sslConfigurationParameters.storeBasedSslOptions.enabled) {
+ val storeBasedSslOptions = sslConfigurationParameters.storeBasedSslOptions
+ val keyStoreSecret = resolveFileToSecretMapping(
+ sslConfigurationParameters.isKeyStoreLocalFile,
+ SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
+ storeBasedSslOptions.keyStore,
+ "KeyStore")
+ val keyStorePathEnv = resolveFilePathEnv(
+ sslConfigurationParameters.isKeyStoreLocalFile,
+ ENV_SUBMISSION_KEYSTORE_FILE,
+ SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
+ storeBasedSslOptions.keyStore)
+ val storePasswordSecret = storeBasedSslOptions.keyStorePassword.map(password => {
+ val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8))
+ (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME, passwordBase64)
+ }).toMap
+ val storePasswordLocationEnv = storeBasedSslOptions.keyStorePassword.map(_ => {
+ new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
+ .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
+ .build()
+ })
+ val storeKeyPasswordSecret = storeBasedSslOptions.keyPassword.map(password => {
+ val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8))
+ (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME, passwordBase64)
+ }).toMap
+ val storeKeyPasswordEnv = storeBasedSslOptions.keyPassword.map(_ => {
+ new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
+ .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
+ .build()
+ })
+ val storeTypeEnv = storeBasedSslOptions.keyStoreType.map(storeType => {
+ new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_KEYSTORE_TYPE)
+ .withValue(storeType)
+ .build()
+ })
+ val keyPemSecret = resolveFileToSecretMapping(
+ sslConfigurationParameters.isDriverSubmitKeyPemLocalFile,
+ secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME,
+ secretType = "Key pem",
+ secretFile = sslConfigurationParameters.driverSubmitServerKeyPem)
+ val keyPemLocationEnv = resolveFilePathEnv(
+ sslConfigurationParameters.isDriverSubmitKeyPemLocalFile,
+ envName = ENV_SUBMISSION_KEY_PEM_FILE,
+ secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME,
+ maybeFile = sslConfigurationParameters.driverSubmitServerKeyPem)
+ val certPemSecret = resolveFileToSecretMapping(
+ sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile,
+ secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME,
+ secretType = "Cert pem",
+ secretFile = sslConfigurationParameters.driverSubmitServerCertPem)
+ val certPemLocationEnv = resolveFilePathEnv(
+ sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile,
+ envName = ENV_SUBMISSION_CERT_PEM_FILE,
+ secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME,
+ maybeFile = sslConfigurationParameters.driverSubmitServerCertPem)
+ val useSslEnv = new EnvVarBuilder()
+ .withName(ENV_SUBMISSION_USE_SSL)
+ .withValue("true")
+ .build()
+ val sslVolume = new VolumeBuilder()
+ .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
+ .withNewSecret()
+ .withSecretName(sslSecretsName)
+ .endSecret()
+ .build()
+ val sslVolumeMount = new VolumeMountBuilder()
+ .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
+ .withReadOnly(true)
+ .withMountPath(sslSecretsDirectory)
+ .build()
+ val allSecrets = keyStoreSecret ++
+ storePasswordSecret ++
+ storeKeyPasswordSecret ++
+ keyPemSecret ++
+ certPemSecret
+ val sslSecret = kubernetesClient.secrets().createNew()
+ .withNewMetadata()
+ .withName(sslSecretsName)
+ .endMetadata()
+ .withData(allSecrets.asJava)
+ .withType("Opaque")
+ .done()
+ kubernetesResourceCleaner.registerOrUpdateResource(sslSecret)
+ val allSslEnvs = keyStorePathEnv ++
+ storePasswordLocationEnv ++
+ storeKeyPasswordEnv ++
+ storeTypeEnv ++
+ keyPemLocationEnv ++
+ Array(useSslEnv) ++
+ certPemLocationEnv
+ val (driverSubmitClientTrustManager, driverSubmitClientSslContext) =
+ buildSslConnectionConfiguration(sslConfigurationParameters)
+ DriverSubmitSslConfiguration(
+ true,
+ allSslEnvs.toArray,
+ Some(sslVolume),
+ Some(sslVolumeMount),
+ Some(sslSecret),
+ driverSubmitClientTrustManager,
+ driverSubmitClientSslContext)
+ } else {
+ DriverSubmitSslConfiguration(
+ false,
+ Array[EnvVar](),
+ None,
+ None,
+ None,
+ None,
+ SSLContext.getDefault)
+ }
+ }
+
+ private def resolveFilePathEnv(
+ isLocal: Boolean,
+ envName: String,
+ secretName: String,
+ maybeFile: Option[File]): Option[EnvVar] = {
+ maybeFile.map(file => {
+ val pemPath = if (isLocal) {
+ s"$sslSecretsDirectory/$secretName"
+ } else {
+ file.getAbsolutePath
+ }
+ new EnvVarBuilder()
+ .withName(envName)
+ .withValue(pemPath)
+ .build()
+ })
+ }
+
+ private def resolveFileToSecretMapping(
+ isLocal: Boolean,
+ secretName: String,
+ secretFile: Option[File],
+ secretType: String): Map[String, String] = {
+ secretFile.filter(_ => isLocal).map(file => {
+ if (!file.isFile) {
+ throw new SparkException(s"$secretType specified at ${file.getAbsolutePath} is not" +
+ s" a file or does not exist.")
+ }
+ val keyStoreBytes = Files.toByteArray(file)
+ (secretName, BaseEncoding.base64().encode(keyStoreBytes))
+ }).toMap
+ }
+
+ private def parseSslConfigurationParameters(): DriverSubmitSslConfigurationParameters = {
+ val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE)
+ val maybeTrustStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE)
+ val maybeKeyPem = sparkConf.get(DRIVER_SUBMIT_SSL_KEY_PEM)
+ val maybeDriverSubmitServerCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM)
+ val maybeDriverSubmitClientCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM)
+ validatePemsDoNotConflictWithStores(
+ maybeKeyStore,
+ maybeTrustStore,
+ maybeKeyPem,
+ maybeDriverSubmitServerCertPem,
+ maybeDriverSubmitClientCertPem)
+ val resolvedSparkConf = sparkConf.clone()
+ val (isLocalKeyStore, resolvedKeyStore) = resolveLocalFile(maybeKeyStore, "keyStore")
+ resolvedKeyStore.foreach {
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, _)
+ }
+ val (isLocalDriverSubmitServerCertPem, resolvedDriverSubmitServerCertPem) =
+ resolveLocalFile(maybeDriverSubmitServerCertPem, "server cert PEM")
+ val (isLocalKeyPem, resolvedKeyPem) = resolveLocalFile(maybeKeyPem, "key PEM")
+ maybeTrustStore.foreach { trustStore =>
+ require(KubernetesFileUtils.isUriLocalFile(trustStore), s"Invalid trustStore URI" +
+ s" $trustStore; trustStore URI for submit server must have no scheme, or scheme file://")
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE,
+ Utils.resolveURI(trustStore).getPath)
+ }
+ val driverSubmitClientCertPem = maybeDriverSubmitClientCertPem.map { driverSubmitClientCert =>
+ require(KubernetesFileUtils.isUriLocalFile(driverSubmitClientCert),
+ "Invalid client certificate PEM URI $driverSubmitClientCert: client certificate URI must" +
+ " have no scheme, or scheme file://")
+ Utils.resolveURI(driverSubmitClientCert).getPath
+ }
+ val securityManager = new SparkSecurityManager(resolvedSparkConf)
+ val storeBasedSslOptions = securityManager.getSSLOptions(DRIVER_SUBMIT_SSL_NAMESPACE)
+ DriverSubmitSslConfigurationParameters(
+ storeBasedSslOptions,
+ isLocalKeyStore,
+ resolvedKeyPem.map(new File(_)),
+ isLocalKeyPem,
+ resolvedDriverSubmitServerCertPem.map(new File(_)),
+ isLocalDriverSubmitServerCertPem,
+ driverSubmitClientCertPem.map(new File(_)))
+ }
+
+ private def resolveLocalFile(file: Option[String],
+ fileType: String): (Boolean, Option[String]) = {
+ file.map { f =>
+ require(isValidSslFileScheme(f), s"Invalid $fileType URI $f, $fileType URI" +
+ s" for submit server must have scheme file:// or local:// (no scheme defaults to file://")
+ val isLocal = KubernetesFileUtils.isUriLocalFile(f)
+ (isLocal, Option.apply(Utils.resolveURI(f).getPath))
+ }.getOrElse(false, None)
+ }
+
+ private def validatePemsDoNotConflictWithStores(
+ maybeKeyStore: Option[String],
+ maybeTrustStore: Option[String],
+ maybeKeyPem: Option[String],
+ maybeDriverSubmitServerCertPem: Option[String],
+ maybeSubmitClientCertPem: Option[String]) = {
+ maybeKeyPem.orElse(maybeDriverSubmitServerCertPem).foreach { _ =>
+ require(maybeKeyStore.isEmpty,
+ "Cannot specify server PEM files and key store files; must specify only one or the other.")
+ }
+ maybeKeyPem.foreach { _ =>
+ require(maybeDriverSubmitServerCertPem.isDefined,
+ "When specifying the key PEM file, the server certificate PEM file must also be provided.")
+ }
+ maybeDriverSubmitServerCertPem.foreach { _ =>
+ require(maybeKeyPem.isDefined,
+ "When specifying the server certificate PEM file, the key PEM file must also be provided.")
+ }
+ maybeTrustStore.foreach { _ =>
+ require(maybeSubmitClientCertPem.isEmpty,
+ "Cannot specify client cert file and truststore file; must specify only one or the other.")
+ }
+ }
+
+ private def isValidSslFileScheme(rawUri: String): Boolean = {
+ val resolvedScheme = Option.apply(Utils.resolveURI(rawUri).getScheme).getOrElse("file")
+ resolvedScheme == "file" || resolvedScheme == "local"
+ }
+
+ private def buildSslConnectionConfiguration(
+ sslConfigurationParameters: DriverSubmitSslConfigurationParameters)
+ : (Option[X509TrustManager], SSLContext) = {
+ val maybeTrustStore = sslConfigurationParameters.submissionClientCertPem.map { certPem =>
+ PemsToKeyStoreConverter.convertCertPemToTrustStore(
+ certPem,
+ sslConfigurationParameters.storeBasedSslOptions.trustStoreType)
+ }.orElse(sslConfigurationParameters.storeBasedSslOptions.trustStore.map { trustStoreFile =>
+ if (!trustStoreFile.isFile) {
+ throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
+ s" does not exist or is not a file.")
+ }
+ val trustStore = KeyStore.getInstance(
+ sslConfigurationParameters
+ .storeBasedSslOptions
+ .trustStoreType
+ .getOrElse(KeyStore.getDefaultType))
+ Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
+ val trustStorePassword = sslConfigurationParameters
+ .storeBasedSslOptions
+ .trustStorePassword
+ .map(_.toCharArray)
+ .orNull
+ trustStore.load(trustStoreStream, trustStorePassword)
+ }
+ trustStore
+ })
+ maybeTrustStore.map { trustStore =>
+ val trustManagerFactory = TrustManagerFactory.getInstance(
+ TrustManagerFactory.getDefaultAlgorithm)
+ trustManagerFactory.init(trustStore)
+ val trustManagers = trustManagerFactory.getTrustManagers
+ val sslContext = SSLContext.getInstance("TLSv1.2")
+ sslContext.init(null, trustManagers, SECURE_RANDOM)
+ (Option.apply(trustManagers(0).asInstanceOf[X509TrustManager]), sslContext)
+ }.getOrElse((Option.empty[X509TrustManager], SSLContext.getDefault))
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala
deleted file mode 100644
index 4bbe3ed385a4..000000000000
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SslConfigurationProvider.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.kubernetes
-
-import java.io.FileInputStream
-import java.security.{KeyStore, SecureRandom}
-import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
-
-import com.google.common.base.Charsets
-import com.google.common.io.{BaseEncoding, Files}
-import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
-import io.fabric8.kubernetes.client.KubernetesClient
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf, SparkException, SSLOptions}
-import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.util.Utils
-
-private[spark] case class SslConfiguration(
- sslOptions: SSLOptions,
- isKeyStoreLocalFile: Boolean,
- sslPodEnvVars: Array[EnvVar],
- sslPodVolumes: Array[Volume],
- sslPodVolumeMounts: Array[VolumeMount],
- sslSecrets: Array[Secret],
- driverSubmitClientTrustManager: Option[X509TrustManager],
- driverSubmitClientSslContext: SSLContext)
-
-private[spark] class SslConfigurationProvider(
- sparkConf: SparkConf,
- kubernetesAppId: String,
- kubernetesClient: KubernetesClient,
- kubernetesResourceCleaner: KubernetesResourceCleaner) {
- private val SECURE_RANDOM = new SecureRandom()
- private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
- private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR +
- s"/$kubernetesAppId-ssl"
-
- def getSslConfiguration(): SslConfiguration = {
- val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
- if (driverSubmitSslOptions.enabled) {
- val sslSecretsMap = mutable.HashMap[String, String]()
- val sslEnvs = mutable.Buffer[EnvVar]()
- val secrets = mutable.Buffer[Secret]()
- driverSubmitSslOptions.keyStore.foreach(store => {
- val resolvedKeyStoreFile = if (isKeyStoreLocalFile) {
- if (!store.isFile) {
- throw new SparkException(s"KeyStore specified at $store is not a file or" +
- s" does not exist.")
- }
- val keyStoreBytes = Files.toByteArray(store)
- val keyStoreBase64 = BaseEncoding.base64().encode(keyStoreBytes)
- sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_SECRET_NAME -> keyStoreBase64)
- s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME"
- } else {
- store.getAbsolutePath
- }
- sslEnvs += new EnvVarBuilder()
- .withName(ENV_SUBMISSION_KEYSTORE_FILE)
- .withValue(resolvedKeyStoreFile)
- .build()
- })
- driverSubmitSslOptions.keyStorePassword.foreach(password => {
- val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8))
- sslSecretsMap += (SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME -> passwordBase64)
- sslEnvs += new EnvVarBuilder()
- .withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
- .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
- .build()
- })
- driverSubmitSslOptions.keyPassword.foreach(password => {
- val passwordBase64 = BaseEncoding.base64().encode(password.getBytes(Charsets.UTF_8))
- sslSecretsMap += (SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME -> passwordBase64)
- sslEnvs += new EnvVarBuilder()
- .withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
- .withValue(s"$sslSecretsDirectory/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
- .build()
- })
- driverSubmitSslOptions.keyStoreType.foreach(storeType => {
- sslEnvs += new EnvVarBuilder()
- .withName(ENV_SUBMISSION_KEYSTORE_TYPE)
- .withValue(storeType)
- .build()
- })
- sslEnvs += new EnvVarBuilder()
- .withName(ENV_SUBMISSION_USE_SSL)
- .withValue("true")
- .build()
- val sslVolume = new VolumeBuilder()
- .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
- .withNewSecret()
- .withSecretName(sslSecretsName)
- .endSecret()
- .build()
- val sslVolumeMount = new VolumeMountBuilder()
- .withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
- .withReadOnly(true)
- .withMountPath(sslSecretsDirectory)
- .build()
- val sslSecrets = kubernetesClient.secrets().createNew()
- .withNewMetadata()
- .withName(sslSecretsName)
- .endMetadata()
- .withData(sslSecretsMap.asJava)
- .withType("Opaque")
- .done()
- kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
- secrets += sslSecrets
- val (driverSubmitClientTrustManager, driverSubmitClientSslContext) =
- buildSslConnectionConfiguration(driverSubmitSslOptions)
- SslConfiguration(
- driverSubmitSslOptions,
- isKeyStoreLocalFile,
- sslEnvs.toArray,
- Array(sslVolume),
- Array(sslVolumeMount),
- secrets.toArray,
- driverSubmitClientTrustManager,
- driverSubmitClientSslContext)
- } else {
- SslConfiguration(
- driverSubmitSslOptions,
- isKeyStoreLocalFile,
- Array[EnvVar](),
- Array[Volume](),
- Array[VolumeMount](),
- Array[Secret](),
- None,
- SSLContext.getDefault)
- }
- }
-
- private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
- val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
- val resolvedSparkConf = sparkConf.clone()
- val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
- val keyStoreURI = Utils.resolveURI(keyStore)
- val isProvidedKeyStoreLocal = keyStoreURI.getScheme match {
- case "file" | null => true
- case "local" => false
- case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" +
- " for submit server must have scheme file:// or local:// (no scheme defaults" +
- " to file://)")
- }
- (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath))
- }).getOrElse((false, Option.empty[String]))
- resolvedKeyStore.foreach {
- resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
- }
- sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
- val trustStoreURI = Utils.resolveURI(trustStore)
- trustStoreURI.getScheme match {
- case "file" | null =>
- resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath)
- case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
- " for submit server must have no scheme, or scheme file://")
- }
- }
- val securityManager = new SparkSecurityManager(resolvedSparkConf)
- (securityManager.getSSLOptions(KUBERNETES_SUBMIT_SSL_NAMESPACE), isLocalKeyStore)
- }
-
- private def buildSslConnectionConfiguration(driverSubmitSslOptions: SSLOptions):
- (Option[X509TrustManager], SSLContext) = {
- driverSubmitSslOptions.trustStore.map(trustStoreFile => {
- val trustManagerFactory = TrustManagerFactory.getInstance(
- TrustManagerFactory.getDefaultAlgorithm)
- val trustStore = KeyStore.getInstance(
- driverSubmitSslOptions.trustStoreType.getOrElse(KeyStore.getDefaultType))
- if (!trustStoreFile.isFile) {
- throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
- s" does not exist or is not a file.")
- }
- Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
- driverSubmitSslOptions.trustStorePassword match {
- case Some(password) =>
- trustStore.load(trustStoreStream, password.toCharArray)
- case None => trustStore.load(trustStoreStream, null)
- }
- }
- trustManagerFactory.init(trustStore)
- val trustManagers = trustManagerFactory.getTrustManagers
- val sslContext = SSLContext.getInstance("TLSv1.2")
- sslContext.init(null, trustManagers, SECURE_RANDOM)
- (Option.apply(trustManagers(0).asInstanceOf[X509TrustManager]), sslContext)
- }).getOrElse((Option.empty[X509TrustManager], SSLContext.getDefault))
- }
-}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index e33c761ecc8d..3328809e186e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -197,26 +197,51 @@ package object config {
.timeConf(TimeUnit.SECONDS)
.createWithDefault(60L)
- private[spark] val KUBERNETES_DRIVER_SUBMIT_KEYSTORE =
- ConfigBuilder("spark.ssl.kubernetes.submission.keyStore")
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE =
+ ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyStore")
.doc("KeyStore file for the driver submission server listening on SSL. Can be pre-mounted" +
" on the driver container or uploaded from the submitting client.")
.stringConf
.createOptional
- private[spark] val KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE =
- ConfigBuilder("spark.ssl.kubernetes.submission.trustStore")
+ private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE =
+ ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.trustStore")
.doc("TrustStore containing certificates for communicating to the driver submission server" +
" over SSL.")
.stringConf
.createOptional
private[spark] val DRIVER_SUBMIT_SSL_ENABLED =
- ConfigBuilder("spark.ssl.kubernetes.submission.enabled")
+ ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.enabled")
.doc("Whether or not to use SSL when sending the application dependencies to the driver pod.")
.booleanConf
.createWithDefault(false)
+ private[spark] val DRIVER_SUBMIT_SSL_KEY_PEM =
+ ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyPem")
+ .doc("Key PEM file that the driver submission server will use when setting up TLS" +
+ " connections. Can be pre-mounted on the driver pod's disk or uploaded from the" +
+ " submitting client's machine.")
+ .stringConf
+ .createOptional
+
+ private[spark] val DRIVER_SUBMIT_SSL_SERVER_CERT_PEM =
+ ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.serverCertPem")
+ .doc("Certificate PEM file that is associated with the key PEM file" +
+ " the submission server uses to set up TLS connections. Can be pre-mounted" +
+ " on the driver pod's disk or uploaded from the submitting client's machine.")
+ .stringConf
+ .createOptional
+
+ private[spark] val DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM =
+ ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.clientCertPem")
+ .doc("Certificate pem file that the submission client uses to connect to the submission" +
+ " server over TLS. This should often be the same as the server certificate, but can be" +
+ " different if the submission client will contact the driver through a proxy instead of" +
+ " the driver service directly.")
+ .stringConf
+ .createOptional
+
private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
ConfigBuilder("spark.kubernetes.driver.service.name")
.doc("Kubernetes service that exposes the driver pod for external access.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 23d216e799ff..0e5fada30242 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -36,6 +36,8 @@ package object constants {
private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
+ private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem"
+ private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem"
// Default and fixed ports
private[spark] val SUBMISSION_SERVER_PORT = 7077
@@ -57,6 +59,8 @@ package object constants {
private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
"SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
+ private[spark] val ENV_SUBMISSION_KEY_PEM_FILE = "SPARK_SUBMISSION_KEY_PEM_FILE"
+ private[spark] val ENV_SUBMISSION_CERT_PEM_FILE = "SPARK_SUBMISSION_CERT_PEM_FILE"
private[spark] val ENV_SUBMISSION_USE_SSL = "SPARK_SUBMISSION_USE_SSL"
private[spark] val ENV_EXECUTOR_PORT = "SPARK_EXECUTOR_PORT"
private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL"
@@ -74,7 +78,7 @@ package object constants {
// Miscellaneous
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
- private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submission"
+ private[spark] val DRIVER_SUBMIT_SSL_NAMESPACE = "kubernetes.driversubmitserver"
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN = 384L
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index 4688521a59d3..4ca01b2f6bd3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest.kubernetes
import java.io.{File, FileOutputStream, StringReader}
import java.net.URI
import java.nio.file.Paths
+import java.security.SecureRandom
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
@@ -26,10 +27,11 @@ import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, ByteStreams, Files}
import org.apache.commons.codec.binary.Base64
+import org.apache.commons.lang3.RandomStringUtils
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions}
+import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.rest._
@@ -44,7 +46,9 @@ private case class KubernetesSparkRestServerArguments(
keyStoreFile: Option[String] = None,
keyStorePasswordFile: Option[String] = None,
keyStoreType: Option[String] = None,
- keyPasswordFile: Option[String] = None) {
+ keyPasswordFile: Option[String] = None,
+ keyPemFile: Option[String] = None,
+ certPemFile: Option[String] = None) {
def validate(): KubernetesSparkRestServerArguments = {
require(host.isDefined, "Hostname not set via --hostname.")
require(port.isDefined, "Port not set via --port")
@@ -83,6 +87,12 @@ private object KubernetesSparkRestServerArguments {
case "--keystore-key-password-file" :: value :: tail =>
args = tail
resolvedArguments.copy(keyPasswordFile = Some(value))
+ case "--key-pem-file" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(keyPemFile = Some(value))
+ case "--cert-pem-file" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(certPemFile = Some(value))
// TODO polish usage message
case Nil => resolvedArguments
case unknown => throw new IllegalStateException(s"Unknown argument(s) found: $unknown")
@@ -377,26 +387,43 @@ private[spark] class KubernetesSparkRestServer(
private[spark] object KubernetesSparkRestServer {
private val barrier = new CountDownLatch(1)
+ private val SECURE_RANDOM = new SecureRandom()
def main(args: Array[String]): Unit = {
val parsedArguments = KubernetesSparkRestServerArguments.fromArgsArray(args)
val secretFile = new File(parsedArguments.secretFile.get)
- if (!secretFile.isFile) {
- throw new IllegalArgumentException(s"Secret file specified by --secret-file" +
- " is not a file, or does not exist.")
- }
+ require(secretFile.isFile, "Secret file specified by --secret-file is not a file, or" +
+ " does not exist.")
val sslOptions = if (parsedArguments.useSsl) {
- val keyStorePassword = parsedArguments
- .keyStorePasswordFile
- .map(new File(_))
- .map(Files.toString(_, Charsets.UTF_8))
+ validateSslOptions(parsedArguments)
val keyPassword = parsedArguments
.keyPasswordFile
.map(new File(_))
.map(Files.toString(_, Charsets.UTF_8))
+ // If key password isn't set but we're using PEM files, generate a password
+ .orElse(parsedArguments.keyPemFile.map(_ => randomPassword()))
+ val keyStorePassword = parsedArguments
+ .keyStorePasswordFile
+ .map(new File(_))
+ .map(Files.toString(_, Charsets.UTF_8))
+ // If keystore password isn't set but we're using PEM files, generate a password
+ .orElse(parsedArguments.keyPemFile.map(_ => randomPassword()))
+ val resolvedKeyStore = parsedArguments.keyStoreFile.map(new File(_)).orElse(
+ parsedArguments.keyPemFile.map(keyPemFile => {
+ parsedArguments.certPemFile.map(certPemFile => {
+ PemsToKeyStoreConverter.convertPemsToTempKeyStoreFile(
+ new File(keyPemFile),
+ new File(certPemFile),
+ "provided-key",
+ keyStorePassword,
+ keyPassword,
+ parsedArguments.keyStoreType)
+ })
+ }).getOrElse(throw new SparkException("When providing PEM files to set up TLS for the" +
+ " submission server, both the key and the certificate must be specified.")))
new SSLOptions(
enabled = true,
- keyStore = parsedArguments.keyStoreFile.map(new File(_)),
+ keyStore = resolvedKeyStore,
keyStoreType = parsedArguments.keyStoreType,
keyStorePassword = keyStorePassword,
keyPassword = keyPassword)
@@ -425,5 +452,25 @@ private[spark] object KubernetesSparkRestServer {
barrier.await()
System.exit(exitCode.get())
}
+
+ private def validateSslOptions(parsedArguments: KubernetesSparkRestServerArguments): Unit = {
+ parsedArguments.keyStoreFile.foreach { _ =>
+ require(parsedArguments.keyPemFile.orElse(parsedArguments.certPemFile).isEmpty,
+ "Cannot provide both key/cert PEM files and a keyStore file; select one or the other" +
+ " for configuring SSL.")
+ }
+ parsedArguments.keyPemFile.foreach { _ =>
+ require(parsedArguments.certPemFile.isDefined,
+ "When providing the key PEM file, the certificate PEM file must also be provided.")
+ }
+ parsedArguments.certPemFile.foreach { _ =>
+ require(parsedArguments.keyPemFile.isDefined,
+ "When providing the certificate PEM file, the key PEM file must also be provided.")
+ }
+ }
+
+ private def randomPassword(): String = {
+ RandomStringUtils.random(1024, 0, Integer.MAX_VALUE, false, false, null, SECURE_RANDOM)
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala
new file mode 100644
index 000000000000..e5c43560eccb
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/PemsToKeyStoreConverter.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes
+
+import java.io.{File, FileInputStream, FileOutputStream, InputStreamReader}
+import java.nio.file.Paths
+import java.security.{KeyStore, PrivateKey}
+import java.security.cert.Certificate
+import java.util.UUID
+
+import com.google.common.base.Charsets
+import org.bouncycastle.asn1.pkcs.PrivateKeyInfo
+import org.bouncycastle.cert.X509CertificateHolder
+import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter
+import org.bouncycastle.openssl.{PEMKeyPair, PEMParser}
+import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter
+import scala.collection.mutable
+
+import org.apache.spark.SparkException
+import org.apache.spark.util.Utils
+
+private[spark] object PemsToKeyStoreConverter {
+
+ /**
+ * Loads the given key-cert pair into a temporary keystore file. Returns the File pointing
+ * to where the keyStore was written to disk.
+ */
+ def convertPemsToTempKeyStoreFile(
+ keyPemFile: File,
+ certPemFile: File,
+ keyAlias: String,
+ keyStorePassword: Option[String],
+ keyPassword: Option[String],
+ keyStoreType: Option[String]): File = {
+ require(keyPemFile.isFile, s"Key PEM file provided at ${keyPemFile.getAbsolutePath}" +
+ " does not exist or is not a file.")
+ require(certPemFile.isFile, s"Cert PEM file provided at ${certPemFile.getAbsolutePath}" +
+ " does not exist or is not a file.")
+ val privateKey = parsePrivateKeyFromPemFile(keyPemFile)
+ val certificates = parseCertificatesFromPemFile(certPemFile)
+ val resolvedKeyStoreType = keyStoreType.getOrElse(KeyStore.getDefaultType)
+ val keyStore = KeyStore.getInstance(resolvedKeyStoreType)
+ keyStore.load(null, null)
+ keyStore.setKeyEntry(
+ keyAlias,
+ privateKey,
+ keyPassword.map(_.toCharArray).orNull,
+ certificates)
+ val keyStoreOutputPath = Paths.get(s"keystore-${UUID.randomUUID()}.$resolvedKeyStoreType")
+ Utils.tryWithResource(new FileOutputStream(keyStoreOutputPath.toFile)) { storeStream =>
+ keyStore.store(storeStream, keyStorePassword.map(_.toCharArray).orNull)
+ }
+ keyStoreOutputPath.toFile
+ }
+
+ def convertCertPemToTrustStore(
+ certPemFile: File,
+ trustStoreType: Option[String]): KeyStore = {
+ require(certPemFile.isFile, s"Cert PEM file provided at ${certPemFile.getAbsolutePath}" +
+ " does not exist or is not a file.")
+ val trustStore = KeyStore.getInstance(trustStoreType.getOrElse(KeyStore.getDefaultType))
+ trustStore.load(null, null)
+ parseCertificatesFromPemFile(certPemFile).zipWithIndex.foreach { case (cert, index) =>
+ trustStore.setCertificateEntry(s"certificate-$index", cert)
+ }
+ trustStore
+ }
+
+ private def withPemParsedFromFile[T](pemFile: File)(f: (PEMParser => T)): T = {
+ Utils.tryWithResource(new FileInputStream(pemFile)) { pemStream =>
+ Utils.tryWithResource(new InputStreamReader(pemStream, Charsets.UTF_8)) { pemReader =>
+ Utils.tryWithResource(new PEMParser(pemReader))(f)
+ }
+ }
+ }
+
+ private def parsePrivateKeyFromPemFile(keyPemFile: File): PrivateKey = {
+ withPemParsedFromFile(keyPemFile) { keyPemParser =>
+ val converter = new JcaPEMKeyConverter
+ keyPemParser.readObject() match {
+ case privateKey: PrivateKeyInfo =>
+ converter.getPrivateKey(privateKey)
+ case keyPair: PEMKeyPair =>
+ converter.getPrivateKey(keyPair.getPrivateKeyInfo)
+ case _ =>
+ throw new SparkException(s"Key file provided at ${keyPemFile.getAbsolutePath}" +
+ s" is not a key pair or private key PEM file.")
+ }
+ }
+ }
+
+ private def parseCertificatesFromPemFile(certPemFile: File): Array[Certificate] = {
+ withPemParsedFromFile(certPemFile) { certPemParser =>
+ val certificates = mutable.Buffer[Certificate]()
+ var pemObject = certPemParser.readObject()
+ while (pemObject != null) {
+ pemObject match {
+ case certificate: X509CertificateHolder =>
+ val converter = new JcaX509CertificateConverter
+ certificates += converter.getCertificate(certificate)
+ case _ =>
+ }
+ pemObject = certPemParser.readObject()
+ }
+ if (certificates.isEmpty) {
+ throw new SparkException(s"No certificates found in ${certPemFile.getAbsolutePath}")
+ }
+ certificates.toArray
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 3bf6b50ff69c..1f35e7e5eb20 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -42,6 +42,8 @@ CMD SSL_ARGS="" && \
if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_TYPE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-type $SPARK_SUBMISSION_KEYSTORE_TYPE"; fi && \
if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-password-file $SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"; fi && \
if ! [ -z ${SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --keystore-key-password-file $SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"; fi && \
+ if ! [ -z ${SPARK_SUBMISSION_KEY_PEM_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --key-pem-file $SPARK_SUBMISSION_KEY_PEM_FILE"; fi && \
+ if ! [ -z ${SPARK_SUBMISSION_CERT_PEM_FILE+x} ]; then SSL_ARGS="$SSL_ARGS --cert-pem-file $SPARK_SUBMISSION_CERT_PEM_FILE"; fi && \
exec bin/spark-class org.apache.spark.deploy.rest.kubernetes.KubernetesSparkRestServer \
--hostname $HOSTNAME \
--port $SPARK_SUBMISSION_SERVER_PORT \
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 5c54d0e5e3aa..da78e783cac1 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -106,10 +106,6 @@
-
- org.bouncycastle
- bcpkix-jdk15on
-
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 16564ca746b4..0e55e64fd1d7 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -72,8 +72,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "")
private var minikubeKubernetesClient: KubernetesClient = _
private var clientConfig: Config = _
- private var keyStoreFile: File = _
- private var trustStoreFile: File = _
private var sparkConf: SparkConf = _
override def beforeAll(): Unit = {
@@ -86,13 +84,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.done()
minikubeKubernetesClient = Minikube.getKubernetesClient.inNamespace(NAMESPACE)
clientConfig = minikubeKubernetesClient.getConfiguration
- val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
- Minikube.getMinikubeIp,
- "changeit",
- "changeit",
- "changeit")
- keyStoreFile = keyStore
- trustStoreFile = trustStore
}
before {
@@ -182,9 +173,6 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}
test("Run a simple example") {
- // We'll make assertions based on spark rest api, so we need to turn on
- // spark.ui.enabled explicitly since the scalatest-maven-plugin would set it
- // to false by default.
new Client(
sparkConf = sparkConf,
mainClass = SPARK_PI_MAIN_CLASS,
@@ -265,11 +253,30 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
}
test("Enable SSL on the driver submit server") {
- sparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}")
- sparkConf.set("spark.ssl.kubernetes.submission.keyStorePassword", "changeit")
- sparkConf.set("spark.ssl.kubernetes.submission.keyPassword", "changeit")
- sparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE,
+ val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
+ Minikube.getMinikubeIp,
+ "changeit",
+ "changeit",
+ "changeit")
+ sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, s"file://${keyStoreFile.getAbsolutePath}")
+ sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyStorePassword", "changeit")
+ sparkConf.set("spark.ssl.kubernetes.driversubmitserver.keyPassword", "changeit")
+ sparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE,
s"file://${trustStoreFile.getAbsolutePath}")
+ sparkConf.set("spark.ssl.kubernetes.driversubmitserver.trustStorePassword", "changeit")
+ sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
+ new Client(
+ sparkConf = sparkConf,
+ mainClass = SPARK_PI_MAIN_CLASS,
+ mainAppResource = SUBMITTER_LOCAL_MAIN_APP_RESOURCE,
+ appArgs = Array.empty[String]).run()
+ }
+
+ test("Enable SSL on the driver submit server using PEM files") {
+ val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
+ sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
+ sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
+ sparkConf.set(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM, s"file://${certPem.getAbsolutePath}")
sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
new Client(
sparkConf = sparkConf,
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
index bde7b4322666..2078e0585e8f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/sslutil/SSLUtils.scala
@@ -16,15 +16,18 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest.sslutil
-import java.io.{File, FileOutputStream}
+import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.math.BigInteger
import java.nio.file.Files
-import java.security.{KeyPairGenerator, KeyStore, SecureRandom}
+import java.security.cert.X509Certificate
+import java.security.{KeyPair, KeyPairGenerator, KeyStore, SecureRandom}
import java.util.{Calendar, Random}
import javax.security.auth.x500.X500Principal
+import com.google.common.base.Charsets
import org.bouncycastle.asn1.x509.{Extension, GeneralName, GeneralNames}
import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder}
+import org.bouncycastle.openssl.jcajce.JcaPEMWriter
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder
import org.apache.spark.util.Utils
@@ -39,6 +42,58 @@ private[spark] object SSLUtils {
val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(512)
val keyPair = keyPairGenerator.generateKeyPair()
+ val certificate = generateCertificate(ipAddress, keyPair)
+ val keyStore = KeyStore.getInstance("JKS")
+ keyStore.load(null, null)
+ keyStore.setKeyEntry("key", keyPair.getPrivate,
+ keyPassword.toCharArray, Array(certificate))
+ val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile
+ tempDir.deleteOnExit()
+ val keyStoreFile = new File(tempDir, "keyStore.jks")
+ Utils.tryWithResource(new FileOutputStream(keyStoreFile)) {
+ keyStore.store(_, keyStorePassword.toCharArray)
+ }
+ val trustStore = KeyStore.getInstance("JKS")
+ trustStore.load(null, null)
+ trustStore.setCertificateEntry("key", certificate)
+ val trustStoreFile = new File(tempDir, "trustStore.jks")
+ Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
+ trustStore.store(_, trustStorePassword.toCharArray)
+ }
+ (keyStoreFile, trustStoreFile)
+ }
+
+ def generateKeyCertPemPair(ipAddress: String): (File, File) = {
+ val keyPairGenerator = KeyPairGenerator.getInstance("RSA")
+ keyPairGenerator.initialize(512)
+ val keyPair = keyPairGenerator.generateKeyPair()
+ val certificate = generateCertificate(ipAddress, keyPair)
+ val tempDir = Files.createTempDirectory("temp-ssl-pems").toFile
+ tempDir.deleteOnExit()
+ val keyPemFile = new File(tempDir, "key.pem")
+ val certPemFile = new File(tempDir, "cert.pem")
+ Utils.tryWithResource(new FileOutputStream(keyPemFile)) { keyPemStream =>
+ Utils.tryWithResource(
+ new OutputStreamWriter(keyPemStream, Charsets.UTF_8)) { streamWriter =>
+ Utils.tryWithResource(
+ new JcaPEMWriter(streamWriter)) { pemWriter =>
+ pemWriter.writeObject(keyPair.getPrivate)
+ }
+ }
+ }
+ Utils.tryWithResource(new FileOutputStream(certPemFile)) { keyPemStream =>
+ Utils.tryWithResource(
+ new OutputStreamWriter(keyPemStream, Charsets.UTF_8)) { streamWriter =>
+ Utils.tryWithResource(
+ new JcaPEMWriter(streamWriter)) { pemWriter =>
+ pemWriter.writeObject(certificate)
+ }
+ }
+ }
+ (keyPemFile, certPemFile)
+ }
+
+ private def generateCertificate(ipAddress: String, keyPair: KeyPair): X509Certificate = {
val selfPrincipal = new X500Principal(s"cn=$ipAddress")
val currentDate = Calendar.getInstance
val validForOneHundredYears = Calendar.getInstance
@@ -56,25 +111,6 @@ private[spark] object SSLUtils {
.setSecureRandom(new SecureRandom())
.build(keyPair.getPrivate)
val bcCertificate = certificateBuilder.build(signer)
- val jcaCertificate = new JcaX509CertificateConverter().getCertificate(bcCertificate)
- val keyStore = KeyStore.getInstance("JKS")
- keyStore.load(null, null)
- keyStore.setKeyEntry("key", keyPair.getPrivate,
- keyPassword.toCharArray, Array(jcaCertificate))
- val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile()
- tempDir.deleteOnExit()
- val keyStoreFile = new File(tempDir, "keyStore.jks")
- Utils.tryWithResource(new FileOutputStream(keyStoreFile)) {
- keyStore.store(_, keyStorePassword.toCharArray)
- }
- val trustStore = KeyStore.getInstance("JKS")
- trustStore.load(null, null)
- trustStore.setCertificateEntry("key", jcaCertificate)
- val trustStoreFile = new File(tempDir, "trustStore.jks")
- Utils.tryWithResource(new FileOutputStream(trustStoreFile)) {
- trustStore.store(_, trustStorePassword.toCharArray)
- }
- (keyStoreFile, trustStoreFile)
+ new JcaX509CertificateConverter().getCertificate(bcCertificate)
}
-
}