Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-35482][K8S] Use spark.blockManager.port not wrong spark.blockm…
…anger.port
  • Loading branch information
yaooqinn committed May 21, 2021
commit 5e91b1e776851e7cb50901092417a79144db03e7
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ private[spark] class BasicExecutorFeatureStep(
.getOrElse(throw new SparkException("Must specify the executor container image"))
private val blockManagerPort = kubernetesConf
.sparkConf
.getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
.getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)

require(blockManagerPort == 0 || (1024 <= blockManagerPort && blockManagerPort < 65536),
"port number must be 0 or in [1024, 65535]")

private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix

Expand Down Expand Up @@ -172,14 +175,17 @@ private[spark] class BasicExecutorFeatureStep(
.replaceAll(ENV_EXECUTOR_ID, kubernetesConf.executorId))
}

val requiredPorts = Seq(
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
new ContainerPortBuilder()
.withName(name)
.withContainerPort(port)
.build()
}
// 0 is invalid as kubernetes contain port request, we shall leave it
val requiredPorts = if (blockManagerPort != 0) {
Seq(
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
new ContainerPortBuilder()
.withName(name)
.withContainerPort(port)
.build()
}
} else Nil

if (!isDefaultProfile) {
if (pod.container != null && pod.container.getResources() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,35 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
assert(!SecretVolumeUtils.podHasVolume(podConfigured.pod, SPARK_CONF_VOLUME_EXEC))
}

test("SPARK-35482: block manager port") {
try {
val initPod = SparkPod.initialPod()
val sm = new SecurityManager(baseConf)
val step1 =
new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
val containerPort1 = step1.configurePod(initPod).container.getPorts.get(0)
assert(containerPort1.getContainerPort === DEFAULT_BLOCKMANAGER_PORT,
s"should use port no. $DEFAULT_BLOCKMANAGER_PORT as default")

baseConf.set(BLOCK_MANAGER_PORT, 12345)
val step2 = new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
val containerPort2 = step2.configurePod(initPod).container.getPorts.get(0)
assert(containerPort2.getContainerPort === 12345)

baseConf.set(BLOCK_MANAGER_PORT, 1000)
val e = intercept[IllegalArgumentException] {
new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
}
assert(e.getMessage.contains("port number must be 0 or in [1024, 65535]"))

baseConf.set(BLOCK_MANAGER_PORT, 0)
val step3 = new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile)
assert(step3.configurePod(initPod).container.getPorts.isEmpty, "random port")
} finally {
baseConf.remove(BLOCK_MANAGER_PORT)
}
}

// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)
Expand Down