Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Provide unit tests for SslConfigurationProvider.
  • Loading branch information
mccheah committed Mar 10, 2017
commit e1ff66c3032a5bb318ab53d614b1de4a701f33e8
10 changes: 10 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder, Secret, SecretBuilder, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._
import scala.collection.mutable
Expand Down Expand Up @@ -112,13 +112,14 @@ private[spark] class SslConfigurationProvider(
.withReadOnly(true)
.withMountPath(sslSecretsDirectory)
.build()
val sslSecrets = kubernetesClient.secrets().createNew()
val sslSecretsModel = new SecretBuilder()
.withNewMetadata()
.withName(sslSecretsName)
.endMetadata()
.withData(sslSecretsMap.asJava)
.withType("Opaque")
.done()
.build()
val sslSecrets = kubernetesClient.secrets().create(sslSecretsModel)
kubernetesResourceCleaner.registerOrUpdateResource(sslSecrets)
secrets += sslSecrets
val (driverSubmitClientTrustManager, driverSubmitClientSslContext) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.sslutil
package org.apache.spark.deploy.kubernetes

import java.io.{File, FileOutputStream}
import java.math.BigInteger
Expand Down Expand Up @@ -61,7 +61,8 @@ private[spark] object SSLUtils {
keyStore.load(null, null)
keyStore.setKeyEntry("key", keyPair.getPrivate,
keyPassword.toCharArray, Array(jcaCertificate))
val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile()
val tempDir = Files.createTempDirectory("temp-ssl-stores").toFile
tempDir.deleteOnExit()
tempDir.deleteOnExit()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to call deleteOnExit twice at line 65 and 66?

val keyStoreFile = new File(tempDir, "keyStore.jks")
Utils.tryWithResource(new FileOutputStream(keyStoreFile)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.FileInputStream
import java.nio.file.Files
import java.security.KeyStore
import javax.net.ssl.SSLContext

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files => GuavaFiles}
import io.fabric8.kubernetes.api.model.{DoneableSecret, EnvVar, EnvVarBuilder, Secret, SecretBuilder, SecretList, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.{MixedOperation, Resource}
import org.mockito.Matchers.any
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.scalatest.mock.MockitoSugar.{mock => simpleMock}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.util.Utils

private[spark] class SslConfigurationProviderSuite extends SparkFunSuite with BeforeAndAfter {

private type SecretResource = Resource[Secret, DoneableSecret]
private type SecretsHandler = MixedOperation[Secret, SecretList, DoneableSecret, SecretResource]

private val APP_ID = "app-id"
private val KEYSTORE_PASSWORD = "keystore"
private val KEY_PASSWORD = "key"
private val TRUSTSTORE_PASSWORD = "truststore"
private val IP_ADDRESS = "192.168.99.100"
private val SSL_SECRET_DIR = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$APP_ID-ssl"

private val sslFolder = Files.createTempDirectory("ssl-configuration-provider-suite").toFile
Copy link
Author

@mccheah mccheah Mar 10, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't quite find an elegant Scalatest equivalent to JUnit's TemporaryFolder Rule. I wonder if there's a better option than manually setting up the directory. We do this in KubernetesSuite as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current approach looks fine to me. I see many tests in core uses Utils.createTempDir, which is similar to this code:

  /**
   * Create a temporary directory inside the given parent directory. The directory will be
   * automatically deleted when the VM shuts down.
   */
  def createTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File = {
    val dir = createDirectory(root, namePrefix)
    ShutdownHookManager.registerShutdownDeleteDir(dir)
    dir
  }

Here's code snippet from SparkContextSuite:

  test("basic case for addFile and listFiles") {
    val dir = Utils.createTempDir()

    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
    val absolutePath1 = file1.getAbsolutePath

Maybe we want to switch to this to be consistent. But, I don't have a strong preference here. i.e. I think it's your call.

sslFolder.deleteOnExit()
private val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
IP_ADDRESS, KEYSTORE_PASSWORD, KEY_PASSWORD, TRUSTSTORE_PASSWORD)

private var sparkConf: SparkConf = _
private var kubernetesClient: KubernetesClient = _
private var kubernetesResourceCleaner: KubernetesResourceCleaner = _
private var secretsHandler: SecretsHandler = _
private var sslConfigurationProvider: SslConfigurationProvider = _

before {
sparkConf = new SparkConf(false)
kubernetesClient = simpleMock[KubernetesClient]
kubernetesResourceCleaner = simpleMock[KubernetesResourceCleaner]
secretsHandler = simpleMock[SecretsHandler]
sslConfigurationProvider = new SslConfigurationProvider(sparkConf, APP_ID, kubernetesClient,
kubernetesResourceCleaner)
when(kubernetesClient.secrets()).thenReturn(secretsHandler)
when(secretsHandler.create(any())).thenAnswer(new Answer[Secret] {
override def answer(invocationOnMock: InvocationOnMock): Secret = {
invocationOnMock.getArgumentAt(0, classOf[Secret])
}
})
}

test("Disabling SSL should return empty components") {
sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, false)
val configuration = sslConfigurationProvider.getSslConfiguration()
assert(!configuration.sslOptions.enabled, "SSL should not be enabled.")
assert(configuration.driverSubmitClientSslContext == SSLContext.getDefault,
"Should have returned the default SSL context.")
assert(configuration.driverSubmitClientTrustManager.isEmpty, "Trust manager should be absent.")
assert(configuration.sslPodEnvVars.isEmpty, "No environment variables should be defined.")
assert(configuration.sslPodVolumes.isEmpty, "No SSL volumes should be defined.")
assert(configuration.sslPodVolumeMounts.isEmpty, "No SSL volume mounts should be defined.")
assert(configuration.sslSecrets.isEmpty, "No SSL secrets should be defined.")
}

test("Enabling SSL should load a keyStore and trustStore when provided.") {
sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
sparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, s"file://${keyStore.getAbsolutePath}")
sparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, s"file://${trustStore.getAbsolutePath}")
sparkConf.set("spark.ssl.kubernetes.submit.keyStorePassword", KEYSTORE_PASSWORD)
sparkConf.set("spark.ssl.kubernetes.submit.keyPassword", KEY_PASSWORD)
sparkConf.set("spark.ssl.kubernetes.submit.trustStorePassword", TRUSTSTORE_PASSWORD)
sparkConf.set("spark.ssl.kubernetes.submit.keyStoreType", "jks")
val configuration = sslConfigurationProvider.getSslConfiguration()
assert(configuration.sslOptions.enabled, "SSL should be enabled.")
assert(configuration.isKeyStoreLocalFile, "KeyStore should be treated as a local file.")
val maybeReturnedKeyStore = configuration.sslOptions.keyStore
assert(maybeReturnedKeyStore.isDefined, "KeyStore should be in the SSL Options.")
maybeReturnedKeyStore.foreach { returnedKeyStore =>
assert(returnedKeyStore.getAbsolutePath === keyStore.getAbsolutePath,
"KeyStore paths did not match.")
}
val maybeReturnedTrustStore = configuration.sslOptions.trustStore
assert(maybeReturnedTrustStore.isDefined, "TrustStore should be in the SSL Options.")
maybeReturnedTrustStore.foreach { returnedTrustStore =>
assert(returnedTrustStore.getAbsolutePath === trustStore.getAbsolutePath)
}
assertResult(configuration.sslPodEnvVars.toSet,
"Environment vars for SSL did not match,") {
Set[EnvVar](
new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_FILE)
.withValue(s"$SSL_SECRET_DIR/$SUBMISSION_SSL_KEYSTORE_SECRET_NAME")
.build(),
new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE)
.withValue(s"$SSL_SECRET_DIR/$SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME")
.build(),
new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE)
.withValue(s"$SSL_SECRET_DIR/$SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME")
.build(),
new EnvVarBuilder()
.withName(ENV_SUBMISSION_KEYSTORE_TYPE)
.withValue("jks")
.build(),
new EnvVarBuilder()
.withName(ENV_SUBMISSION_USE_SSL)
.withValue("true")
.build())
}
val keyStoreBase64 = BaseEncoding.base64().encode(GuavaFiles.toByteArray(keyStore))
val keyPasswordBase64 = BaseEncoding.base64().encode(KEY_PASSWORD.getBytes(Charsets.UTF_8))
val keyStorePasswordBase64 = BaseEncoding
.base64()
.encode(KEYSTORE_PASSWORD.getBytes(Charsets.UTF_8))
val expectedSecret = new SecretBuilder()
.withNewMetadata()
.withName(s"$SUBMISSION_SSL_SECRETS_PREFIX-$APP_ID")
.endMetadata()
.addToData(
SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME,
keyPasswordBase64)
.addToData(
SUBMISSION_SSL_KEYSTORE_SECRET_NAME,
keyStoreBase64)
.addToData(
SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME,
keyStorePasswordBase64)
.withType("Opaque")
.build()
assert(configuration.sslSecrets.toSeq === Seq(expectedSecret),
"SSL secret did not match.")
assertResult(configuration.sslPodVolumes.toSeq, "SSL Volumes are incorrect.") {
Seq[Volume](
new VolumeBuilder()
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withNewSecret()
.withSecretName(s"$SUBMISSION_SSL_SECRETS_PREFIX-$APP_ID")
.endSecret()
.build()
)
}
assertResult(configuration.sslPodVolumeMounts.toSeq, "SSL volume mounts are incorrect.") {
Seq[VolumeMount](
new VolumeMountBuilder()
.withName(SUBMISSION_SSL_SECRETS_VOLUME_NAME)
.withReadOnly(true)
.withMountPath(SSL_SECRET_DIR)
.build()
)
}
assert(configuration.driverSubmitClientTrustManager.isDefined, "Missing trust manager.")
configuration.driverSubmitClientTrustManager.foreach { trustManager =>
val acceptedIssuers = trustManager.getAcceptedIssuers
val inMemoryKeyStore = KeyStore.getInstance("jks")
Utils.tryWithResource(new FileInputStream(keyStore)) { keyStoreStream =>
inMemoryKeyStore.load(keyStoreStream, KEYSTORE_PASSWORD.toCharArray)
val certChain = inMemoryKeyStore.getCertificateChain("key")
val acceptedIssuersBytes = acceptedIssuers.map(_.getEncoded.toSeq)
val certChainEncoded = certChain.map(_.getEncoded.toSeq)
assert(acceptedIssuersBytes.toSeq === certChainEncoded.toSeq,
"Certificates did not match.")
}
}
assert(configuration.driverSubmitClientSslContext.getProtocol === "TLSv1.2",
"SSL context protocol is incorrect.")
verify(kubernetesClient).secrets()
verify(secretsHandler).create(expectedSecret)
}

test("Providing a KeyStore with a local scheme should not mount it in a secret") {
sparkConf.set(DRIVER_SUBMIT_SSL_ENABLED, true)
sparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, s"local:///opt/spark/mykeystore.jks")
val configuration = sslConfigurationProvider.getSslConfiguration()
assert(!configuration.isKeyStoreLocalFile, "KeyStore should not be a local file.")
assert(configuration.sslOptions.keyStore.isDefined, "KeyStore file should be defined.")
configuration.sslOptions.keyStore.foreach { returnedStore =>
assert(returnedStore.getAbsolutePath === "/opt/spark/mykeystore.jks",
"Resolved KeyStore path should match the path of the input URI.")
}
val expectedSecret = new SecretBuilder()
.withNewMetadata()
.withName(s"$SUBMISSION_SSL_SECRETS_PREFIX-$APP_ID")
.endMetadata()
.withType("Opaque")
.build()
assert(configuration.sslSecrets.toSeq === Seq(expectedSecret),
"SSL Secret was incorrect (it should have no data).")
verify(kubernetesClient).secrets()
verify(secretsHandler).create(expectedSecret)
}
}
11 changes: 7 additions & 4 deletions resource-managers/kubernetes/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-kubernetes_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -106,10 +113,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.deploy.kubernetes.Client
import org.apache.spark.deploy.kubernetes.{Client, SSLUtils}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
import org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils
Expand Down