diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 709cffda9b0a..eece12164684 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -499,15 +499,6 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
   <td>3.3.0</td>
 </tr>
-<tr>
-  <td><code>spark.yarn.executor.failuresValidityInterval</code></td>
-  <td>(none)</td>
-  <td>
-  Defines the validity interval for executor failure tracking.
-  Executor failures which are older than the validity interval will be ignored.
-  </td>
-  <td>2.0.0</td>
-</tr>
 <tr>
   <td><code>spark.yarn.submit.waitAppCompletion</code></td>
   <td><code>true</code></td>
@@ -528,6 +519,25 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
   <td>1.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.yarn.executor.bindAddress</code></td>
+  <td>(executor hostname)</td>
+  <td>
+    Hostname or IP address where to bind listening sockets in YARN cluster mode.
+    It also allows a different address from the local one to be advertised to other
+    executors or external systems.
+  </td>
+  <td>4.0.0</td>
+</tr>
+<tr>
+  <td><code>spark.yarn.executor.failuresValidityInterval</code></td>
+  <td>(none)</td>
+  <td>
+  Defines the validity interval for executor failure tracking.
+  Executor failures which are older than the validity interval will be ignored.
+  </td>
+  <td>2.0.0</td>
+</tr>
 <tr>
   <td><code>spark.yarn.executor.nodeLabelExpression</code></td>
   <td>(none)</td>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4fa7b66c9e5a..78a511b8e215 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -460,8 +460,9 @@ private[spark] class ApplicationMaster(
     logInfo {
       val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
       val executorCores = _sparkConf.get(EXECUTOR_CORES)
-      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "",
-        "", executorMemory, executorCores, appId, securityMgr, localResources,
+      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl,
+        "", "", "",
+        executorMemory, executorCores, appId, securityMgr, localResources,
         ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
       dummyRunner.launchContextDebugInfo()
     }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index e3fcf5472f54..484fe275ab53 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -49,6 +49,7 @@ private[yarn] class ExecutorRunnable(
     sparkConf: SparkConf,
     masterAddress: String,
     executorId: String,
+    bindAddress: String,
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
@@ -117,7 +118,7 @@ private[yarn] class ExecutorRunnable(
     } catch {
       case ex: Exception =>
         throw new SparkException(s"Exception while starting container ${container.get.getId}" +
-          s" on host $hostname", ex)
+          s" on host $hostname ($bindAddress)", ex)
     }
   }

@@ -189,6 +190,7 @@ private[yarn] class ExecutorRunnable(
       Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
         "--driver-url", masterAddress,
         "--executor-id", executorId,
+        "--bind-address", bindAddress,
         "--hostname", hostname,
         "--cores", executorCores.toString,
         "--app-id", appId,
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 19c06f957318..9df9063fe640 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -85,6 +85,9 @@ private[yarn] class YarnAllocator(
   @GuardedBy("this")
   val allocatedContainerToHostMap = new HashMap[ContainerId, String]

+  @GuardedBy("this")
+  val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]
+
   // Containers that we no longer care about. We've either already told the RM to release them or
   // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
   // completed.
@@ -169,6 +172,8 @@ private[yarn] class YarnAllocator(

   private val isPythonApp = sparkConf.get(IS_PYTHON_APP)

+  private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS)
+
   private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)

   private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
@@ -734,12 +739,13 @@ private[yarn] class YarnAllocator(
       val rpId = getResourceProfileIdFromPriority(container.getPriority)
       executorIdCounter += 1
       val executorHostname = container.getNodeId.getHost
+      val executorBindAddress = bindAddress.getOrElse(executorHostname)
       val containerId = container.getId
       val executorId = executorIdCounter.toString
       val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
       assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
       logInfo(s"Launching container $containerId on host $executorHostname " +
-        s"for executor with ID $executorId for ResourceProfile Id $rpId")
+        s"($executorBindAddress) for executor with ID $executorId for ResourceProfile Id $rpId")

       val rp = rpIdToResourceProfile(rpId)
       val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
@@ -763,6 +769,7 @@ private[yarn] class YarnAllocator(
             sparkConf,
             driverUrl,
             executorId,
+            executorBindAddress,
             executorHostname,
             containerMem,
             containerCores,
@@ -812,6 +819,7 @@ private[yarn] class YarnAllocator(
             new HashSet[ContainerId])
           containerSet += containerId
           allocatedContainerToHostMap.put(containerId, executorHostname)
+          allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
           launchingExecutorContainerIds.remove(containerId)
         }
         getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
@@ -918,6 +926,7 @@ private[yarn] class YarnAllocator(
         }

         allocatedContainerToHostMap.remove(containerId)
+        allocatedContainerToBindAddressMap.remove(containerId)
       }

       containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index c11961008019..46eaa9e2b6a9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -339,6 +339,13 @@ package object config extends Logging {
     .stringConf
     .createOptional

+  private[spark] val EXECUTOR_BIND_ADDRESS =
+    ConfigBuilder("spark.yarn.executor.bindAddress")
+      .doc("Address where to bind network listen sockets on the executor.")
+      .version("4.0.0")
+      .stringConf
+      .createOptional
+
   /* Unmanaged AM configuration. */

   private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
index 1ef3c9c410af..70c6c2474b60 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala
@@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite {
       "yarn",
       "exec-1",
       "localhost",
+      "localhost",
       1,
       1,
       "application_123_1",
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index f6f2e1b11d58..ec26e6df807b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -172,7 +172,6 @@ class YarnAllocatorSuite extends SparkFunSuite
     ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
   }

-
   test("single container allocated") {
     // request a single container and receive it
     val (handler, _) = createAllocator(1)
@@ -185,6 +184,7 @@ class YarnAllocatorSuite extends SparkFunSuite

     handler.getNumExecutorsRunning should be (1)
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")

     val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
     hostTocontainer.get("host1").get should contain(container.getId)
@@ -362,7 +362,7 @@ class YarnAllocatorSuite extends SparkFunSuite
     }
   }

-  test("container should not be created if requested number if met") {
+  test("container should not be created if requested number is met") {
     // request a single container and receive it
     val (handler, _) = createAllocator(1)
     handler.updateResourceRequests()
@@ -868,4 +868,17 @@ class YarnAllocatorSuite extends SparkFunSuite
     handler.getNumExecutorsRunning should be(0)
     handler.getNumExecutorsStarting should be(0)
   }
+
+  test("use requested bind-address") {
+    val (handler, _) = createAllocator(maxExecutors = 1,
+      additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0"))
+    handler.updateResourceRequests()
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
+  }
+
 }
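
For context on what the patch does at allocation time: each allocated container binds its listening sockets to spark.yarn.executor.bindAddress when that config is set, and falls back to the YARN-reported container hostname otherwise; the hostname is still what gets advertised to the driver via --hostname. Below is a minimal standalone sketch of that resolution rule, mirroring the bindAddress.getOrElse(executorHostname) call in YarnAllocator. The object and method names here are illustrative only, not Spark APIs.

  // Sketch only: reproduces the patch's fallback logic outside Spark.
  object BindAddressResolution {
    // When the optional config is unset, bind to the YARN container hostname.
    def resolve(configured: Option[String], executorHostname: String): String =
      configured.getOrElse(executorHostname)

    def main(args: Array[String]): Unit = {
      // Default case: spark.yarn.executor.bindAddress unset -> bind to the host.
      assert(resolve(None, "host1") == "host1")
      // Multi-homed or containerized case: listen on all interfaces while
      // "host1" is still advertised to the driver.
      assert(resolve(Some("0.0.0.0"), "host1") == "0.0.0.0")
    }
  }

Keeping bind and advertised addresses separate is what makes the feature useful in NAT'd or overlay-network deployments: the executor can listen on 0.0.0.0 while peers keep connecting to the routable hostname that YARN reports.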