Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Removed traits that have only a single implementation
  • Loading branch information
liyinan926 committed Dec 23, 2017
commit c21fdcf277f031d8a2d754df22e18f5d81881eaf
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._
* This is separated out from the init-container steps API because this component can be reused to
* set up the init-container for executors as well.
*/
private[spark] trait InitContainerBootstrap {
/**
* Bootstraps an init-container that downloads dependencies to be used by a main container.
*/
def bootstrapInitContainer(original: PodWithDetachedInitContainer): PodWithDetachedInitContainer
}

private[spark] class InitContainerBootstrapImpl(
private[spark] class InitContainerBootstrap(
initContainerImage: String,
imagePullPolicy: String,
jarsDownloadPath: String,
Expand All @@ -44,10 +37,12 @@ private[spark] class InitContainerBootstrapImpl(
configMapName: String,
configMapKey: String,
sparkRole: String,
sparkConf: SparkConf)
extends InitContainerBootstrap {
sparkConf: SparkConf) {

override def bootstrapInitContainer(
/**
* Bootstraps an init-container that downloads dependencies to be used by a main container.
*/
def bootstrapInitContainer(
original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
val sharedVolumeMounts = Seq[VolumeMount](
new VolumeMountBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBui
/**
* Bootstraps a driver or executor container or an init-container with needed secrets mounted.
*/
private[spark] trait MountSecretsBootstrap {
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this class correctly, it seems like what you're trying to do here is to inject this logic into different steps that require this functionality. So the code that instantiates those steps needs to know about this dependency, and needs to know how to create both objects. Then the step implementation has to call this code.

Instead, wouldn't it be cleaner to make the step inherit this functionality?

e.g.

trait MountSecretsBootstrap(args) {
  def mountSecrets(...) { }
}

class InitContainerMountSecretsStep extends InitContainerConfigurationStep with MountSecretsBootstrap {

}

The same comment could be made about the init container boostrap.

But in both cases, I'm not sure this would work right now on the executor side, because as I mentioned it doesn't really use the same abstraction as the driver side. Which is kinda one of the problems with the current class hierarchy here...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created https://issues.apache.org/jira/browse/SPARK-22839 to keep track of the refactoring work.


/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
Expand All @@ -30,13 +30,7 @@ private[spark] trait MountSecretsBootstrap {
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container)
}

private[spark] class MountSecretsBootstrapImpl(
secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {

override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.UUID
import com.google.common.primitives.Longs

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{ConfigurationUtils, MountSecretsBootstrapImpl}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.steps._
Expand Down Expand Up @@ -148,8 +148,7 @@ private[spark] class DriverConfigOrchestrator(
}

val mayBeMountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
Some(new DriverMountSecretsStep(mountSecretsBootstrap))
Some(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit.steps.initcontainer

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerBootstrapImpl, MountSecretsBootstrapImpl}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerBootstrap, MountSecretsBootstrap}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

Expand Down Expand Up @@ -45,7 +45,7 @@ private[spark] class InitContainerConfigOrchestrator(
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)

def getAllConfigurationSteps: Seq[InitContainerConfigurationStep] = {
val initContainerBootstrap = new InitContainerBootstrapImpl(
val initContainerBootstrap = new InitContainerBootstrap(
initContainerImage,
imagePullPolicy,
jarsDownloadPath,
Expand All @@ -71,8 +71,7 @@ private[spark] class InitContainerConfigOrchestrator(
// because the init-container is sort of an implementation details and this sharing
// avoids introducing a dedicated configuration property just for the init-container.
val maybeMountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
Some(new InitContainerMountSecretsStep(mountSecretsBootstrap))
Some(new InitContainerMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,11 @@ import org.apache.spark.util.Utils
/**
* A factory class for configuring and creating executor pods.
*/
private[spark] trait ExecutorPodFactory {

/**
* Configure and construct an executor pod with the given parameters.
*/
def createExecutorPod(
executorId: String,
applicationId: String,
driverUrl: String,
executorEnvs: Seq[(String, String)],
driverPod: Pod,
nodeToLocalTaskCount: Map[String, Int]): Pod
}

private[spark] class ExecutorPodFactoryImpl(
private[spark] class ExecutorPodFactory(
sparkConf: SparkConf,
mountSecretsBootstrap: Option[MountSecretsBootstrap],
initContainerBootstrap: Option[InitContainerBootstrap],
initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap])
extends ExecutorPodFactory {
initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) {

private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH)

Expand Down Expand Up @@ -99,7 +84,10 @@ private[spark] class ExecutorPodFactoryImpl(
private val executorCores = sparkConf.getDouble("spark.executor.cores", 1)
private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES)

override def createExecutorPod(
/**
* Configure and construct an executor pod with the given parameters.
*/
def createExecutorPod(
executorId: String,
applicationId: String,
driverUrl: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerBootstrapImpl, MountSecretsBootstrapImpl, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerBootstrap, MountSecretsBootstrap, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -69,7 +69,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
.get(INIT_CONTAINER_IMAGE)
.getOrElse(throw new SparkException(
"Must specify the init-container image when there are remote dependencies"))
new InitContainerBootstrapImpl(
new InitContainerBootstrap(
initContainerImage,
sparkConf.get(CONTAINER_IMAGE_PULL_POLICY),
sparkConf.get(JARS_DOWNLOAD_LOCATION),
Expand All @@ -84,7 +84,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val mayBeMountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
Expand All @@ -95,7 +95,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
// avoids introducing a dedicated configuration property just for the init-container.
val mayBeInitContainerMountSecretsBootstrap = if (maybeInitContainerBootstrap.nonEmpty &&
executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
Expand All @@ -108,7 +108,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))

val executorPodFactory = new ExecutorPodFactoryImpl(
val executorPodFactory = new ExecutorPodFactory(
sparkConf,
mayBeMountSecretBootstrap,
maybeInitContainerBootstrap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.spark.deploy.k8s.submit.steps

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.MountSecretsBootstrapImpl
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils}

class DriverMountSecretsStepSuite extends SparkFunSuite {
Expand All @@ -32,7 +32,7 @@ class DriverMountSecretsStepSuite extends SparkFunSuite {
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val mountSecretsStep = new DriverMountSecretsStep(mountSecretsBootstrap)
val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.deploy.k8s.submit.steps.initcontainer
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.MountSecretsBootstrapImpl
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils

class InitContainerMountSecretsStepSuite extends SparkFunSuite {
Expand All @@ -40,7 +40,7 @@ class InitContainerMountSecretsStepSuite extends SparkFunSuite {
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
baseInitContainerSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrapImpl, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._

Expand Down Expand Up @@ -58,7 +58,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
}

test("basic executor pod has reasonable defaults") {
val factory = new ExecutorPodFactoryImpl(baseConf, None, None, None)
val factory = new ExecutorPodFactory(baseConf, None, None, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

Expand Down Expand Up @@ -89,7 +89,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX,
"loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple")

val factory = new ExecutorPodFactoryImpl(conf, None, None, None)
val factory = new ExecutorPodFactory(conf, None, None, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())

Expand All @@ -101,7 +101,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz")

val factory = new ExecutorPodFactoryImpl(conf, None, None, None)
val factory = new ExecutorPodFactory(conf, None, None, None)
val executor = factory.createExecutorPod(
"1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())

Expand All @@ -115,8 +115,8 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
test("executor secrets get mounted") {
val conf = baseConf.clone()

val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1"))
val factory = new ExecutorPodFactoryImpl(
val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))
val factory = new ExecutorPodFactory(
conf,
Some(secretsBootstrap),
None,
Expand Down Expand Up @@ -144,7 +144,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
when(initContainerBootstrap.bootstrapInitContainer(
any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())

val factory = new ExecutorPodFactoryImpl(
val factory = new ExecutorPodFactory(
conf,
None,
Some(initContainerBootstrap),
Expand All @@ -161,9 +161,9 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
val initContainerBootstrap = mock(classOf[InitContainerBootstrap])
when(initContainerBootstrap.bootstrapInitContainer(
any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg())
val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1"))
val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1"))

val factory = new ExecutorPodFactoryImpl(
val factory = new ExecutorPodFactory(
conf,
None,
Some(initContainerBootstrap),
Expand Down