Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address some missed comments
  • Loading branch information
mccheah committed Mar 17, 2017
commit 36b677565b1818523219d4b3555dffee823a0645
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private[spark] class Client(
driverServiceManager.handleSubmissionError(
new SparkException("Submission shutting down early...")))
try {
val sslConfigurationProvider = new SslConfigurationProvider(
val sslConfigurationProvider = new DriverSubmitSslConfigurationProvider(
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
Expand All @@ -182,7 +182,7 @@ private[spark] class Client(
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslConfiguration.sslSecrets,
sslConfiguration.sslSecret,
driverPod,
driverService)
submitApplicationToDriverServer(
Expand Down Expand Up @@ -233,13 +233,13 @@ private[spark] class Client(
}

private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
sslConfiguration: SslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String],
driverPodKubernetesCredentials: KubernetesCredentials): Unit = {
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
sslConfiguration: DriverSubmitSslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String],
driverPodKubernetesCredentials: KubernetesCredentials): Unit = {
sparkConf.getOption("spark.app.id").foreach { id =>
logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
s" overridden as $kubernetesAppId")
Expand Down Expand Up @@ -297,7 +297,7 @@ private[spark] class Client(
customLabels: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): (Pod, Service) = {
sslConfiguration: DriverSubmitSslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
Expand Down Expand Up @@ -348,7 +348,7 @@ private[spark] class Client(
private def configureOwnerReferences(
kubernetesClient: KubernetesClient,
submitServerSecret: Secret,
sslSecrets: Option[Secret],
sslSecret: Option[Secret],
driverPod: Pod,
driverService: Service): Service = {
val driverPodOwnerRef = new OwnerReferenceBuilder()
Expand All @@ -358,7 +358,7 @@ private[spark] class Client(
.withKind(driverPod.getKind)
.withController(true)
.build()
sslSecrets.foreach(secret => {
sslSecret.foreach(secret => {
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
Expand Down Expand Up @@ -424,7 +424,7 @@ private[spark] class Client(
driverKubernetesSelectors: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): Pod = {
sslConfiguration: DriverSubmitSslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (sslConfiguration.enabled) "HTTPS" else "HTTP")
Expand Down Expand Up @@ -660,7 +660,7 @@ private[spark] class Client(
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
service: Service,
sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
sslConfiguration: DriverSubmitSslConfiguration): KubernetesSparkRestApi = {
val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service)
require(serviceUris.nonEmpty, "No uris found to contact the driver!")
HttpClientUtil.createClient[KubernetesSparkRestApi](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,44 @@ import org.apache.spark.deploy.rest.kubernetes.{KubernetesFileUtils, PemsToKeySt
import org.apache.spark.util.Utils

/**
* Raw SSL configuration as the user specified in SparkConf.
* Raw SSL configuration as the user specified in SparkConf for setting up the driver
* submission server.
*/
private case class SslConfigurationParameters(
storeBasedSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean,
keyPem: Option[File],
isKeyPemLocalFile: Boolean,
serverCertPem: Option[File],
isServerCertPemLocalFile: Boolean,
clientCertPem: Option[File])
private case class DriverSubmitSslConfigurationParameters(
storeBasedSslOptions: SSLOptions,
isKeyStoreLocalFile: Boolean,
driverSubmitServerKeyPem: Option[File],
isDriverSubmitKeyPemLocalFile: Boolean,
driverSubmitServerCertPem: Option[File],
isDriverSubmitServerCertPemLocalFile: Boolean,
submissionClientCertPem: Option[File])

/**
* Resolved from translating options provided in {@link SslConfigurationParameters} into
* Kubernetes volumes, environment variables for the driver pod, client-side trust managers,
* and the client-side SSL context.
* Resolved from translating options provided in
* {@link DriverSubmitSslConfigurationParameters} into Kubernetes volumes, environment variables
* for the driver pod, client-side trust managers, and the client-side SSL context. This is used
* for setting up the SSL connection for the submission server where the application local
* dependencies and configuration is provided from.
*/
private[spark] case class SslConfiguration(
enabled: Boolean,
sslPodEnvVars: Array[EnvVar],
sslPodVolume: Option[Volume],
sslPodVolumeMount: Option[VolumeMount],
sslSecrets: Option[Secret],
driverSubmitClientTrustManager: Option[X509TrustManager],
driverSubmitClientSslContext: SSLContext)
private[spark] case class DriverSubmitSslConfiguration(
enabled: Boolean,
sslPodEnvVars: Array[EnvVar],
sslPodVolume: Option[Volume],
sslPodVolumeMount: Option[VolumeMount],
sslSecret: Option[Secret],
driverSubmitClientTrustManager: Option[X509TrustManager],
driverSubmitClientSslContext: SSLContext)

private[spark] class SslConfigurationProvider(
/**
* Provides the SSL configuration for bootstrapping the driver pod to listen for the driver
* submission over SSL, and then supply the client-side configuration for establishing the
* SSL connection. This is done in two phases: first, interpreting the raw configuration
* values from the SparkConf object; then second, converting the configuration into the
* appropriate Kubernetes constructs, namely the volume and volume mount to add to the
* driver pod, and the secret to create at the API server; and finally, constructing the
* client-side trust manager and SSL context for sending the local dependencies.
*/
private[spark] class DriverSubmitSslConfigurationProvider(
sparkConf: SparkConf,
kubernetesAppId: String,
kubernetesClient: KubernetesClient,
Expand All @@ -68,17 +80,17 @@ private[spark] class SslConfigurationProvider(
private val sslSecretsDirectory = DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR +
s"/$kubernetesAppId-ssl"

def getSslConfiguration(): SslConfiguration = {
val driverSubmitSslOptions = parseDriverSubmitSslOptions()
if (driverSubmitSslOptions.storeBasedSslOptions.enabled) {
val storeBasedSslOptions = driverSubmitSslOptions.storeBasedSslOptions
def getSslConfiguration(): DriverSubmitSslConfiguration = {
val sslConfigurationParameters = parseSslConfigurationParameters()
if (sslConfigurationParameters.storeBasedSslOptions.enabled) {
val storeBasedSslOptions = sslConfigurationParameters.storeBasedSslOptions
val keyStoreSecret = resolveFileToSecretMapping(
driverSubmitSslOptions.isKeyStoreLocalFile,
sslConfigurationParameters.isKeyStoreLocalFile,
SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
storeBasedSslOptions.keyStore,
"KeyStore")
val keyStorePathEnv = resolveFilePathEnv(
driverSubmitSslOptions.isKeyStoreLocalFile,
sslConfigurationParameters.isKeyStoreLocalFile,
ENV_SUBMISSION_KEYSTORE_FILE,
SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
storeBasedSslOptions.keyStore)
Expand Down Expand Up @@ -109,25 +121,25 @@ private[spark] class SslConfigurationProvider(
.build()
})
val keyPemSecret = resolveFileToSecretMapping(
driverSubmitSslOptions.isKeyPemLocalFile,
sslConfigurationParameters.isDriverSubmitKeyPemLocalFile,
secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME,
secretType = "Key pem",
secretFile = driverSubmitSslOptions.keyPem)
secretFile = sslConfigurationParameters.driverSubmitServerKeyPem)
val keyPemLocationEnv = resolveFilePathEnv(
driverSubmitSslOptions.isKeyPemLocalFile,
sslConfigurationParameters.isDriverSubmitKeyPemLocalFile,
envName = ENV_SUBMISSION_KEY_PEM_FILE,
secretName = SUBMISSION_SSL_KEY_PEM_SECRET_NAME,
maybeFile = driverSubmitSslOptions.keyPem)
maybeFile = sslConfigurationParameters.driverSubmitServerKeyPem)
val certPemSecret = resolveFileToSecretMapping(
driverSubmitSslOptions.isServerCertPemLocalFile,
sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile,
secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME,
secretType = "Cert pem",
secretFile = driverSubmitSslOptions.serverCertPem)
secretFile = sslConfigurationParameters.driverSubmitServerCertPem)
val certPemLocationEnv = resolveFilePathEnv(
driverSubmitSslOptions.isServerCertPemLocalFile,
sslConfigurationParameters.isDriverSubmitServerCertPemLocalFile,
envName = ENV_SUBMISSION_CERT_PEM_FILE,
secretName = SUBMISSION_SSL_CERT_PEM_SECRET_NAME,
maybeFile = driverSubmitSslOptions.serverCertPem)
maybeFile = sslConfigurationParameters.driverSubmitServerCertPem)
val useSslEnv = new EnvVarBuilder()
.withName(ENV_SUBMISSION_USE_SSL)
.withValue("true")
Expand Down Expand Up @@ -164,8 +176,8 @@ private[spark] class SslConfigurationProvider(
Array(useSslEnv) ++
certPemLocationEnv
val (driverSubmitClientTrustManager, driverSubmitClientSslContext) =
buildSslConnectionConfiguration(driverSubmitSslOptions)
SslConfiguration(
buildSslConnectionConfiguration(sslConfigurationParameters)
DriverSubmitSslConfiguration(
true,
allSslEnvs.toArray,
Some(sslVolume),
Expand All @@ -174,7 +186,7 @@ private[spark] class SslConfigurationProvider(
driverSubmitClientTrustManager,
driverSubmitClientSslContext)
} else {
SslConfiguration(
DriverSubmitSslConfiguration(
false,
Array[EnvVar](),
None,
Expand Down Expand Up @@ -218,47 +230,48 @@ private[spark] class SslConfigurationProvider(
}).toMap
}

private def parseDriverSubmitSslOptions(): SslConfigurationParameters = {
private def parseSslConfigurationParameters(): DriverSubmitSslConfigurationParameters = {
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE)
val maybeTrustStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE)
val maybeKeyPem = sparkConf.get(DRIVER_SUBMIT_SSL_KEY_PEM)
val maybeServerCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM)
val maybeClientCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM)
val maybeDriverSubmitServerCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_SERVER_CERT_PEM)
val maybeDriverSubmitClientCertPem = sparkConf.get(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM)
validatePemsDoNotConflictWithStores(
maybeKeyStore,
maybeTrustStore,
maybeKeyPem,
maybeServerCertPem,
maybeClientCertPem)
maybeDriverSubmitServerCertPem,
maybeDriverSubmitClientCertPem)
val resolvedSparkConf = sparkConf.clone()
val (isLocalKeyStore, resolvedKeyStore) = resolveLocalFile(maybeKeyStore, "keyStore")
resolvedKeyStore.foreach {
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE, _)
}
val (isLocalServerCertPem, resolvedServerCertPem): (Boolean, Option[String]) =
resolveLocalFile(maybeServerCertPem, "server cert PEM")
val (isLocalDriverSubmitServerCertPem, resolvedDriverSubmitServerCertPem) =
resolveLocalFile(maybeDriverSubmitServerCertPem, "server cert PEM")
val (isLocalKeyPem, resolvedKeyPem) = resolveLocalFile(maybeKeyPem, "key PEM")
maybeTrustStore.foreach { trustStore =>
require(KubernetesFileUtils.isUriLocalFile(trustStore), s"Invalid trustStore URI" +
s"$trustStore; trustStore URI for submit server must have no scheme, or scheme file://")
s" $trustStore; trustStore URI for submit server must have no scheme, or scheme file://")
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE,
Utils.resolveURI(trustStore).getPath)
}
val clientCertPem: Option[String] = maybeClientCertPem.map { clientCert =>
require(KubernetesFileUtils.isUriLocalFile(clientCert), "Invalid client certificate PEM URI" +
s" $clientCert: client certificate URI must have no scheme, or scheme file://")
Utils.resolveURI(clientCert).getPath
val driverSubmitClientCertPem = maybeDriverSubmitClientCertPem.map { driverSubmitClientCert =>
require(KubernetesFileUtils.isUriLocalFile(driverSubmitClientCert),
"Invalid client certificate PEM URI $driverSubmitClientCert: client certificate URI must" +
" have no scheme, or scheme file://")
Utils.resolveURI(driverSubmitClientCert).getPath
}
val securityManager = new SparkSecurityManager(resolvedSparkConf)
val storeBasedSslOptions = securityManager.getSSLOptions(DRIVER_SUBMIT_SSL_NAMESPACE)
SslConfigurationParameters(
DriverSubmitSslConfigurationParameters(
storeBasedSslOptions,
isLocalKeyStore,
resolvedKeyPem.map(new File(_)),
isLocalKeyPem,
resolvedServerCertPem.map(new File(_)),
isLocalServerCertPem,
clientCertPem.map(new File(_)))
resolvedDriverSubmitServerCertPem.map(new File(_)),
isLocalDriverSubmitServerCertPem,
driverSubmitClientCertPem.map(new File(_)))
}

private def resolveLocalFile(file: Option[String],
Expand All @@ -275,22 +288,22 @@ private[spark] class SslConfigurationProvider(
maybeKeyStore: Option[String],
maybeTrustStore: Option[String],
maybeKeyPem: Option[String],
maybeServerCertPem: Option[String],
maybeClientCertPem: Option[String]) = {
maybeKeyPem.orElse(maybeServerCertPem).foreach { _ =>
maybeDriverSubmitServerCertPem: Option[String],
maybeSubmitClientCertPem: Option[String]) = {
maybeKeyPem.orElse(maybeDriverSubmitServerCertPem).foreach { _ =>
require(maybeKeyStore.isEmpty,
"Cannot specify server PEM files and key store files; must specify only one or the other.")
}
maybeKeyPem.foreach { _ =>
require(maybeServerCertPem.isDefined,
require(maybeDriverSubmitServerCertPem.isDefined,
"When specifying the key PEM file, the server certificate PEM file must also be provided.")
}
maybeServerCertPem.foreach { _ =>
maybeDriverSubmitServerCertPem.foreach { _ =>
require(maybeKeyPem.isDefined,
"When specifying the server certificate PEM file, the key PEM file must also be provided.")
}
maybeTrustStore.foreach { _ =>
require(maybeClientCertPem.isEmpty,
require(maybeSubmitClientCertPem.isEmpty,
"Cannot specify client cert file and truststore file; must specify only one or the other.")
}
}
Expand All @@ -300,24 +313,25 @@ private[spark] class SslConfigurationProvider(
resolvedScheme == "file" || resolvedScheme == "local"
}

private def buildSslConnectionConfiguration(driverSubmitSslOptions: SslConfigurationParameters):
(Option[X509TrustManager], SSLContext) = {
val maybeTrustStore = driverSubmitSslOptions.clientCertPem.map { certPem =>
private def buildSslConnectionConfiguration(
sslConfigurationParameters: DriverSubmitSslConfigurationParameters)
: (Option[X509TrustManager], SSLContext) = {
val maybeTrustStore = sslConfigurationParameters.submissionClientCertPem.map { certPem =>
PemsToKeyStoreConverter.convertCertPemToTrustStore(
certPem,
driverSubmitSslOptions.storeBasedSslOptions.trustStoreType)
}.orElse(driverSubmitSslOptions.storeBasedSslOptions.trustStore.map { trustStoreFile =>
sslConfigurationParameters.storeBasedSslOptions.trustStoreType)
}.orElse(sslConfigurationParameters.storeBasedSslOptions.trustStore.map { trustStoreFile =>
if (!trustStoreFile.isFile) {
throw new SparkException(s"TrustStore file at ${trustStoreFile.getAbsolutePath}" +
s" does not exist or is not a file.")
}
val trustStore = KeyStore.getInstance(
driverSubmitSslOptions
sslConfigurationParameters
.storeBasedSslOptions
.trustStoreType
.getOrElse(KeyStore.getDefaultType))
Utils.tryWithResource(new FileInputStream(trustStoreFile)) { trustStoreStream =>
val trustStorePassword = driverSubmitSslOptions
val trustStorePassword = sslConfigurationParameters
.storeBasedSslOptions
.trustStorePassword
.map(_.toCharArray)
Expand Down