Move config specific to Yarn cluster mode and add test
Hendra Saputra committed Sep 20, 2023
commit cb91f03545215b0f6ef5a482bcb0deb29cf47126
@@ -253,11 +253,6 @@ package object config {
private[spark] val EXECUTOR_ID =
ConfigBuilder("spark.executor.id").version("1.2.0").stringConf.createOptional

private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress")
.doc("Address where to bind network listen sockets on the executor.")
.stringConf
.createWithDefault(Utils.localHostName())

private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH)
.version("1.0.0")
13 changes: 0 additions & 13 deletions docs/configuration.md
@@ -349,19 +349,6 @@ of the most common options to set are:
</td>
<td>3.0.0</td>
</tr>
<tr>
<td><code>spark.executor.bindAddress</code></td>
<td>(local hostname)</td>
<td>
Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP
environment variable (see below).
<br />It also allows a different address from the local one to be advertised to other
executors or external systems. This is useful, for example, when running containers with bridged networking.
For this to properly work, the different ports used by the driver (RPC, block manager and UI) need to be
forwarded from the container's host.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.extraListeners</code></td>
<td>(none)</td>
28 changes: 19 additions & 9 deletions docs/running-on-yarn.md
@@ -499,15 +499,6 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>3.3.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.failuresValidityInterval</code></td>
<td>(none)</td>
<td>
Defines the validity interval for executor failure tracking.
Executor failures which are older than the validity interval will be ignored.
</td>
<td>2.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.submit.waitAppCompletion</code></td>
<td><code>true</code></td>
@@ -528,6 +519,25 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
<td>1.6.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.bindAddress</code></td>
<td>(executor hostname)</td>
<td>
Hostname or IP address where to bind listening sockets in YARN cluster mode.

[review comment] Why YARN cluster mode? The change affects client mode as well, doesn't it?
[author reply] Hmm true, will benefit client mode as well

<br />It also allows a different address from the local one to be advertised to other
executors or external systems.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.failuresValidityInterval</code></td>
<td>(none)</td>
<td>
Defines the validity interval for executor failure tracking.
Executor failures which are older than the validity interval will be ignored.
</td>
<td>2.0.0</td>
</tr>
<tr>
<td><code>spark.yarn.executor.nodeLabelExpression</code></td>
<td>(none)</td>
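As a minimal usage sketch (not part of this commit), the `spark.yarn.executor.bindAddress` row added above could be exercised as follows; the app name and the `0.0.0.0` value are illustrative assumptions, not defaults:

```scala
import org.apache.spark.SparkConf

// Illustrative sketch only: bind executor listen sockets to all interfaces
// while YARN still advertises the container's hostname to the driver and peers.
// The master is normally passed to spark-submit, e.g.
//   spark-submit --master yarn --conf spark.yarn.executor.bindAddress=0.0.0.0 ...
object BindAddressConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("bind-address-demo")                     // hypothetical app name
      .set("spark.yarn.executor.bindAddress", "0.0.0.0")   // listen on all interfaces
    println(conf.get("spark.yarn.executor.bindAddress"))
  }
}
```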
@@ -85,6 +85,9 @@ private[yarn] class YarnAllocator(
@GuardedBy("this")
val allocatedContainerToHostMap = new HashMap[ContainerId, String]

@GuardedBy("this")
val allocatedContainerToBindAddressMap = new HashMap[ContainerId, String]

// Containers that we no longer care about. We've either already told the RM to release them or
// will on the next heartbeat. Containers get removed from this map after the RM tells us they've
// completed.
@@ -169,6 +172,8 @@

private val isPythonApp = sparkConf.get(IS_PYTHON_APP)

private val bindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS)

private val memoryOverheadFactor = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD_FACTOR)

private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
@@ -734,7 +739,7 @@
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname)
val executorBindAddress = bindAddress.getOrElse(executorHostname)
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
@@ -814,6 +819,7 @@
new HashSet[ContainerId])
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
allocatedContainerToBindAddressMap.put(containerId, bindAddress.getOrElse(executorHostname))
launchingExecutorContainerIds.remove(containerId)
}
getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
@@ -920,6 +926,7 @@
}

allocatedContainerToHostMap.remove(containerId)
allocatedContainerToBindAddressMap.remove(containerId)
}

containerIdToExecutorIdAndResourceProfileId.remove(containerId).foreach { case (eid, _) =>
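The YarnAllocator changes above reduce to one fallback: the configured bind address, when set, replaces the per-container hostname for listen sockets, while allocatedContainerToHostMap still tracks the real host. A standalone sketch of that resolution (not the actual allocator code):

```scala
// Standalone sketch of the fallback used above: prefer the configured
// spark.yarn.executor.bindAddress, otherwise bind to the host YARN reports
// for the container.
object BindAddressResolution {
  def resolve(configuredBindAddress: Option[String], containerHost: String): String =
    configuredBindAddress.getOrElse(containerHost)

  def main(args: Array[String]): Unit = {
    assert(resolve(Some("0.0.0.0"), "host1") == "0.0.0.0") // config wins when set
    assert(resolve(None, "host1") == "host1")              // default: container host
  }
}
```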
@@ -339,6 +339,13 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val EXECUTOR_BIND_ADDRESS =
ConfigBuilder("spark.yarn.executor.bindAddress")
.doc("Address where to bind network listen sockets on the executor.")
.version("4.0.0")
.stringConf
.createOptional

/* Unmanaged AM configuration. */

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
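The new entry uses createOptional, whereas the removed core spark.executor.bindAddress used createWithDefault(Utils.localHostName()); unset values therefore surface as None and the fallback moves to the caller (see bindAddress.getOrElse(executorHostname) above). A rough stand-in, not Spark's internal ConfigBuilder, to illustrate the difference:

```scala
// Rough stand-in for the createOptional vs. createWithDefault distinction;
// Spark's real ConfigBuilder/ConfigEntry machinery is internal and richer.
object OptionalEntrySketch {
  final case class OptionalEntry(key: String) {
    def readFrom(settings: Map[String, String]): Option[String] = settings.get(key)
  }

  def main(args: Array[String]): Unit = {
    val entry = OptionalEntry("spark.yarn.executor.bindAddress")
    assert(entry.readFrom(Map.empty).isEmpty)                                 // unset -> None, no implicit default
    assert(entry.readFrom(Map(entry.key -> "10.0.0.5")).contains("10.0.0.5")) // set -> Some(value)
  }
}
```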
@@ -172,7 +172,6 @@ class YarnAllocatorSuite extends SparkFunSuite
ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
}


test("single container allocated") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
@@ -185,6 +184,7 @@

handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("host1")
val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(defaultRPId)
hostTocontainer.get("host1").get should contain(container.getId)

@@ -362,7 +362,7 @@
}
}

test("container should not be created if requested number if met") {
test("container should not be created if requested number is met") {
// request a single container and receive it
val (handler, _) = createAllocator(1)
handler.updateResourceRequests()
@@ -868,4 +868,17 @@
handler.getNumExecutorsRunning should be(0)
handler.getNumExecutorsStarting should be(0)
}

test("use requested bind-address") {
val (handler, _) = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_BIND_ADDRESS.key -> "0.0.0.0"))
handler.updateResourceRequests()

val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))

handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedContainerToBindAddressMap.get(container.getId).get should be ("0.0.0.0")
}

}