diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 74158f266558..fa4e40adab48 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -44,7 +44,10 @@ private[spark] class BasicExecutorFeatureStep( .getOrElse(throw new SparkException("Must specify the executor container image")) private val blockManagerPort = kubernetesConf .sparkConf - .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) + .getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) + + require(blockManagerPort == 0 || (1024 <= blockManagerPort && blockManagerPort < 65536), + "port number must be 0 or in [1024, 65535]") private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix @@ -172,14 +175,17 @@ private[spark] class BasicExecutorFeatureStep( .replaceAll(ENV_EXECUTOR_ID, kubernetesConf.executorId)) } - val requiredPorts = Seq( - (BLOCK_MANAGER_PORT_NAME, blockManagerPort)) - .map { case (name, port) => - new ContainerPortBuilder() - .withName(name) - .withContainerPort(port) - .build() - } + // 0 is invalid as kubernetes containerPort request, we shall leave it unmounted + val requiredPorts = if (blockManagerPort != 0) { + Seq( + (BLOCK_MANAGER_PORT_NAME, blockManagerPort)) + .map { case (name, port) => + new ContainerPortBuilder() + .withName(name) + .withContainerPort(port) + .build() + } + } else Nil if (!isDefaultProfile) { if (pod.container != null && pod.container.getResources() != null) { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 82cbed7509b9..b7c97bb9a640 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -341,6 +341,35 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(!SecretVolumeUtils.podHasVolume(podConfigured.pod, SPARK_CONF_VOLUME_EXEC)) } + test("SPARK-35482: user correct block manager port for executor pods") { + try { + val initPod = SparkPod.initialPod() + val sm = new SecurityManager(baseConf) + val step1 = + new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile) + val containerPort1 = step1.configurePod(initPod).container.getPorts.get(0) + assert(containerPort1.getContainerPort === DEFAULT_BLOCKMANAGER_PORT, + s"should use port no. $DEFAULT_BLOCKMANAGER_PORT as default") + + baseConf.set(BLOCK_MANAGER_PORT, 12345) + val step2 = new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile) + val containerPort2 = step2.configurePod(initPod).container.getPorts.get(0) + assert(containerPort2.getContainerPort === 12345) + + baseConf.set(BLOCK_MANAGER_PORT, 1000) + val e = intercept[IllegalArgumentException] { + new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile) + } + assert(e.getMessage.contains("port number must be 0 or in [1024, 65535]")) + + baseConf.set(BLOCK_MANAGER_PORT, 0) + val step3 = new BasicExecutorFeatureStep(newExecutorConf(), sm, defaultProfile) + assert(step3.configurePod(initPod).container.getPorts.isEmpty, "random port") + } finally { + baseConf.remove(BLOCK_MANAGER_PORT) + } + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1)