From b7763a804a85a4d317604a8a3ca9def08b126cf1 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Thu, 7 Sep 2023 23:08:55 +0100 Subject: [PATCH 01/11] Support executor bind address in Yarn executors --- .../scala/org/apache/spark/internal/config/package.scala | 5 +++++ .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 4 +++- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 4 +++- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 05b2624b4037..188c1bd7c716 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -253,6 +253,11 @@ package object config { private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").version("1.2.0").stringConf.createOptional + private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress") + .doc("Address where to bind network listen sockets on the executor.") + .stringConf + .createWithDefault(Utils.localHostName()) + private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH) .version("1.0.0") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4fa7b66c9e5a..987ac1b1d047 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -461,7 +461,7 @@ private[spark] class ApplicationMaster( val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = _sparkConf.get(EXECUTOR_CORES) val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "", - "", executorMemory, executorCores, appId, securityMgr, localResources, + "", "", executorMemory, executorCores, appId, securityMgr, localResources, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) dummyRunner.launchContextDebugInfo() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index e3fcf5472f54..a2194b6fdd76 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -49,6 +49,7 @@ private[yarn] class ExecutorRunnable( sparkConf: SparkConf, masterAddress: String, executorId: String, + bindAddress: String, hostname: String, executorMemory: Int, executorCores: Int, @@ -117,7 +118,7 @@ private[yarn] class ExecutorRunnable( } catch { case ex: Exception => throw new SparkException(s"Exception while starting container ${container.get.getId}" + - s" on host $hostname", ex) + s" on host $hostname ($bindAddress)", ex) } } @@ -189,6 +190,7 @@ private[yarn] class ExecutorRunnable( Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend", "--driver-url", masterAddress, "--executor-id", executorId, + "--bindAddress", bindAddress, "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId, diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 19c06f957318..d58953e5ec3e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -733,12 +733,13 @@ private[yarn] class YarnAllocator( for (container <- containersToUse) { val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 + val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname) val executorHostname = container.getNodeId.getHost val containerId = container.getId val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize) - logInfo(s"Launching container $containerId on host $executorHostname " + + logInfo(s"Launching container $containerId on host $executorHostname ($executorBindAddress) " + s"for executor with ID $executorId for ResourceProfile Id $rpId") val rp = rpIdToResourceProfile(rpId) @@ -763,6 +764,7 @@ private[yarn] class YarnAllocator( sparkConf, driverUrl, executorId, + executorBindAddress, executorHostname, containerMem, containerCores, From 0d0e77dd3391d57f2d8a46b966a062a4a84fa70c Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Thu, 7 Sep 2023 23:10:01 +0100 Subject: [PATCH 02/11] Add documentation for spark.executor.bindAddress --- docs/configuration.md | 15 ++++++++++++++- .../apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 8fda9317bc77..56f8afb9d670 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -349,6 +349,19 @@ of the most common options to set are: 3.0.0 + +spark.executor.bindAddress + (local hostname) + + Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP + environment variable (see below). +
It also allows a different address from the local one to be advertised to other + executors or external systems. This is useful, for example, when running containers with bridged networking. + For this to properly work, the different ports used by the executor (RPC and block manager) need to be + forwarded from the container's host. + + 4.0.0 + spark.extraListeners (none) @@ -3028,7 +3041,7 @@ Apart from these, the following properties are also available, and may be useful For more detail, see the description here.

- This requires one of the following conditions: + This requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled, or 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled, or 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled, or diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d58953e5ec3e..409dcc79b386 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -733,8 +733,8 @@ private[yarn] class YarnAllocator( for (container <- containersToUse) { val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 - val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname) val executorHostname = container.getNodeId.getHost + val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname) val containerId = container.getId val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) From 912dee7489d1df37361889f8fc8da5dbd7da4af3 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Fri, 8 Sep 2023 12:16:42 +0100 Subject: [PATCH 03/11] Update ExecutorRunnable test suite --- .../org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala index 1ef3c9c410af..70c6c2474b60 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ExecutorRunnableSuite.scala @@ -42,6 +42,7 @@ class ExecutorRunnableSuite extends SparkFunSuite { "yarn", "exec-1", "localhost", + "localhost", 1, 1, "application_123_1", From 168f32dab26febaa4ad73bd2456b0056aba1b613 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Fri, 8 Sep 2023 20:36:03 +0100 Subject: [PATCH 04/11] Fix scala style --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 5 +++-- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 987ac1b1d047..78a511b8e215 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -460,8 +460,9 @@ private[spark] class ApplicationMaster( logInfo { val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = _sparkConf.get(EXECUTOR_CORES) - val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "", - "", "", executorMemory, executorCores, appId, securityMgr, localResources, + val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, + "", "", "", + executorMemory, executorCores, appId, securityMgr, localResources, 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) dummyRunner.launchContextDebugInfo() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 409dcc79b386..88c1fd0bbcec 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -739,8 +739,8 @@ private[yarn] class YarnAllocator( val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize) - logInfo(s"Launching container $containerId on host $executorHostname ($executorBindAddress) " + - s"for executor with ID $executorId for ResourceProfile Id $rpId") + logInfo(s"Launching container $containerId on host $executorHostname " + + s"($executorBindAddress) for executor with ID $executorId for ResourceProfile Id $rpId") val rp = rpIdToResourceProfile(rpId) val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf) From fee951d338e2902a74c5086cc70292f4f378722d Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Sun, 10 Sep 2023 20:23:01 +0100 Subject: [PATCH 05/11] Fix wrong argument name for --bind-address --- .../scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index a2194b6fdd76..484fe275ab53 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -190,7 +190,7 @@ private[yarn] class ExecutorRunnable( Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend", "--driver-url", masterAddress, "--executor-id", executorId, - "--bindAddress", bindAddress, + "--bind-address", bindAddress, "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId, From cb91f03545215b0f6ef5a482bcb0deb29cf47126 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Sun, 10 Sep 2023 22:35:07 +0100 Subject: [PATCH 06/11] Move config specific to Yarn cluster mode and add test --- .../spark/internal/config/package.scala | 5 ---- docs/configuration.md | 13 --------- docs/running-on-yarn.md | 28 +++++++++++++------ .../spark/deploy/yarn/YarnAllocator.scala | 9 +++++- .../org/apache/spark/deploy/yarn/config.scala | 7 +++++ .../deploy/yarn/YarnAllocatorSuite.scala | 17 +++++++++-- 6 files changed, 49 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 188c1bd7c716..05b2624b4037 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -253,11 +253,6 @@ package object config { private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").version("1.2.0").stringConf.createOptional - private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress") - .doc("Address where to bind network listen sockets on the executor.") - .stringConf - .createWithDefault(Utils.localHostName()) - private[spark] val 
EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH) .version("1.0.0") diff --git a/docs/configuration.md b/docs/configuration.md index 56f8afb9d670..23ed755ed609 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -349,19 +349,6 @@ of the most common options to set are: 3.0.0 - -spark.executor.bindAddress - (local hostname) - - Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP - environment variable (see below). -
It also allows a different address from the local one to be advertised to other - executors or external systems. This is useful, for example, when running containers with bridged networking. - For this to properly work, the different ports used by the executor (RPC and block manager) need to be - forwarded from the container's host. - - 4.0.0 - spark.extraListeners (none) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 709cffda9b0a..eece12164684 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -499,15 +499,6 @@ To use a custom metrics.properties for the application master and executors, upd 3.3.0 - - spark.yarn.executor.failuresValidityInterval - (none) - - Defines the validity interval for executor failure tracking. - Executor failures which are older than the validity interval will be ignored. - - 2.0.0 - spark.yarn.submit.waitAppCompletion true @@ -528,6 +519,25 @@ To use a custom metrics.properties for the application master and executors, upd 1.6.0 + + spark.yarn.executor.bindAddress + (executor hostname) + + Hostname or IP address where to bind listening sockets in YARN cluster mode. +
It also allows a different address from the local one to be advertised to other + executors or external systems. + + 4.0.0 + + + spark.yarn.executor.failuresValidityInterval + (none) + + Defines the validity interval for executor failure tracking. + Executor failures which are older than the validity interval will be ignored. + + 2.0.0 + spark.yarn.executor.nodeLabelExpression (none) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 88c1fd0bbcec..9df9063fe640 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -85,6 +85,9 @@ private[yarn] class YarnAllocator( @GuardedBy("this") val allocatedContainerToHostMap = new HashMap[ContainerId, String] + @GuardedBy("this") + val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String] + // Containers that we no longer care about. We've either already told the RM to release them or // will on the next heartbeat. Containers get removed from this map after the RM tells us they've // completed. @@ -169,6 +172,8 @@ private[yarn] class YarnAllocator( private val isPythonApp = sparkConf.get(IS_PYTHON_APP) + private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS) + private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR) private val launcherPool = ThreadUtils.newDaemonCachedThreadPool( @@ -734,7 +739,7 @@ private[yarn] class YarnAllocator( val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 val executorHostname = container.getNodeId.getHost - val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname) + val executorBindAddress = bindAddress.getOrElse(executorHostname) val containerId = container.getId val executorId = executorIdCounter.toString val yarnResourceForRpId = rpIdToYarnResource.get(rpId) @@ -814,6 +819,7 @@ private[yarn] class YarnAllocator( new HashSet[ContainerId]) containerSet += containerId allocatedContainerToHostMap.put(containerId, executorHostname) + allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname)) launchingExecutorContainerIds.remove(containerId) } getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() @@ -920,6 +926,7 @@ private[yarn] class YarnAllocator( } allocatedContainerToHostMap.remove(containerId) + allocatedContainerToBindAddressMap.remove(containerId) } containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) => diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index c11961008019..46eaa9e2b6a9 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -339,6 +339,13 @@ package object config extends Logging { .stringConf .createOptional + private[spark] val EXECUTOR_BIND_ADDRESS = + ConfigBuilder("spark.yarn.executor.bindAddress") + .doc("Address where to bind network listen sockets on the executor.") + .version("4.0.0") + .stringConf + .createOptional + /* Unmanaged AM configuration. 
*/ private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled") diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index f6f2e1b11d58..ec26e6df807b 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -172,7 +172,6 @@ class YarnAllocatorSuite extends SparkFunSuite ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus) } - test("single container allocated") { // request a single container and receive it val (handler, _) = createAllocator(1) @@ -185,6 +184,7 @@ class YarnAllocatorSuite extends SparkFunSuite handler.getNumExecutorsRunning should be (1) handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1") val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId) hostTocontainer.get("host1").get should contain(container.getId) @@ -362,7 +362,7 @@ class YarnAllocatorSuite extends SparkFunSuite } } - test("container should not be created if requested number if met") { + test("container should not be created if requested number is met") { // request a single container and receive it val (handler, _) = createAllocator(1) handler.updateResourceRequests() @@ -868,4 +868,17 @@ class YarnAllocatorSuite extends SparkFunSuite handler.getNumExecutorsRunning should be(0) handler.getNumExecutorsStarting should be(0) } + + test("use requested bind-address") { + val (handler, _) = createAllocator(maxExecutors = 1, + additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0")) + handler.updateResourceRequests() + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0") + } + } From 9345a9c4aa87a4a2e7da48281eba1c17d7577290 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Mon, 18 Sep 2023 22:34:56 +0100 Subject: [PATCH 07/11] Restore extra whitespace in configuration.md --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 23ed755ed609..8fda9317bc77 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -3028,7 +3028,7 @@ Apart from these, the following properties are also available, and may be useful For more detail, see the description here.

- This requires one of the following conditions: + This requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled, or 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled, or 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled, or From 2499b1d077f8d1f2c26d9e40fe275553a60cd4a7 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Thu, 7 Sep 2023 23:08:55 +0100 Subject: [PATCH 08/11] Support executor bind address in Yarn executors --- .../scala/org/apache/spark/internal/config/package.scala | 5 +++++ .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 1 + 2 files changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 05b2624b4037..188c1bd7c716 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -253,6 +253,11 @@ package object config { private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").version("1.2.0").stringConf.createOptional + private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress") + .doc("Address where to bind network listen sockets on the executor.") + .stringConf + .createWithDefault(Utils.localHostName()) + private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH) .version("1.0.0") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 9df9063fe640..13f11b5a2135 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -738,6 +738,7 @@ private[yarn] class YarnAllocator( for (container <- containersToUse) { val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 + val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname) val executorHostname = container.getNodeId.getHost val executorBindAddress = bindAddress.getOrElse(executorHostname) val containerId = container.getId From 79a2c969adc7702efdedb18fe48cc23aadfa2f74 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Thu, 7 Sep 2023 23:10:01 +0100 Subject: [PATCH 09/11] Add documentation for spark.executor.bindAddress --- docs/configuration.md | 15 ++++++++++++++- .../apache/spark/deploy/yarn/YarnAllocator.scala | 1 - 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index 8fda9317bc77..56f8afb9d670 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -349,6 +349,19 @@ of the most common options to set are: 3.0.0 + +spark.executor.bindAddress + (local hostname) + + Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP + environment variable (see below). +
It also allows a different address from the local one to be advertised to other + executors or external systems. This is useful, for example, when running containers with bridged networking. + For this to properly work, the different ports used by the executor (RPC and block manager) need to be + forwarded from the container's host. + + 4.0.0 + spark.extraListeners (none) @@ -3028,7 +3041,7 @@ Apart from these, the following properties are also available, and may be useful For more detail, see the description here.

- This requires one of the following conditions: + This requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled, or 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled, or 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled, or diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 13f11b5a2135..9df9063fe640 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -738,7 +738,6 @@ private[yarn] class YarnAllocator( for (container <- containersToUse) { val rpId = getResourceProfileIdFromPriority(container.getPriority) executorIdCounter += 1 - val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname) val executorHostname = container.getNodeId.getHost val executorBindAddress = bindAddress.getOrElse(executorHostname) val containerId = container.getId From b40b2348d55db2d029ad933b9249c4a35e862498 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Sun, 10 Sep 2023 22:35:07 +0100 Subject: [PATCH 10/11] Move config specific to Yarn cluster mode and add test --- .../org/apache/spark/internal/config/package.scala | 5 ----- docs/configuration.md | 13 ------------- 2 files changed, 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 188c1bd7c716..05b2624b4037 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -253,11 +253,6 @@ package object config { private[spark] val EXECUTOR_ID = ConfigBuilder("spark.executor.id").version("1.2.0").stringConf.createOptional - private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress") - .doc("Address where to bind network listen sockets on the executor.") - .stringConf - .createWithDefault(Utils.localHostName()) - private[spark] val EXECUTOR_CLASS_PATH = ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH) .version("1.0.0") diff --git a/docs/configuration.md b/docs/configuration.md index 56f8afb9d670..23ed755ed609 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -349,19 +349,6 @@ of the most common options to set are: 3.0.0 - -spark.executor.bindAddress - (local hostname) - - Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP - environment variable (see below). -
It also allows a different address from the local one to be advertised to other - executors or external systems. This is useful, for example, when running containers with bridged networking. - For this to properly work, the different ports used by the executor (RPC and block manager) need to be - forwarded from the container's host. - - 4.0.0 - spark.extraListeners (none) From dad8c5d610e57c2bb2e130b20c06e3448404a7b5 Mon Sep 17 00:00:00 2001 From: Hendra Saputra Date: Mon, 18 Sep 2023 22:34:56 +0100 Subject: [PATCH 11/11] Restore extra whitespace in configuration.md --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 23ed755ed609..8fda9317bc77 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -3028,7 +3028,7 @@ Apart from these, the following properties are also available, and may be useful For more detail, see the description here.

- This requires one of the following conditions: + This requires one of the following conditions: 1) enabling external shuffle service through spark.shuffle.service.enabled, or 2) enabling shuffle tracking through spark.dynamicAllocation.shuffleTracking.enabled, or 3) enabling shuffle blocks decommission through spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled, or
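Usage sketch (not part of the patch series): once these patches are applied, spark.yarn.executor.bindAddress is read by YarnAllocator and handed to each executor as --bind-address, while --hostname still carries the container host. A minimal way to exercise it from application code in YARN cluster mode; the application name and the 0.0.0.0 value are illustrative, not mandated by the patches:

    import org.apache.spark.sql.SparkSession

    // Bind executor listen sockets to all interfaces (illustrative value for
    // bridged/container networking); YARN still advertises the container host
    // to the driver and other executors via --hostname.
    val spark = SparkSession.builder()
      .appName("yarn-bind-address-demo")
      .config("spark.yarn.executor.bindAddress", "0.0.0.0")
      .getOrCreate()

    spark.range(100).count() // any action that launches executors
    spark.stop()

Setting the value at submit time (--conf spark.yarn.executor.bindAddress=0.0.0.0) should work equally well, since the allocator resolves the value from SparkConf before launching containers and falls back to the container hostname when it is unset.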