Skip to content
Next Next commit
Support executor bind address in Yarn executors
  • Loading branch information
Hendra Saputra committed Sep 20, 2023
commit b7763a804a85a4d317604a8a3ca9def08b126cf1
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ package object config {
private[spark] val EXECUTOR_ID =
ConfigBuilder("spark.executor.id").version("1.2.0").stringConf.createOptional

private[spark] val EXECUTOR_BIND_ADDRESS = ConfigBuilder("spark.executor.bindAddress")
.doc("Address where to bind network listen sockets on the executor.")
.stringConf
.createWithDefault(Utils.localHostName())

private[spark] val EXECUTOR_CLASS_PATH =
ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH)
.version("1.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ private[spark] class ApplicationMaster(
val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
val executorCores = _sparkConf.get(EXECUTOR_CORES)
val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
"<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
"<bindAddress>", "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
dummyRunner.launchContextDebugInfo()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ private[yarn] class ExecutorRunnable(
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
bindAddress: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
Expand Down Expand Up @@ -117,7 +118,7 @@ private[yarn] class ExecutorRunnable(
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
s" on host $hostname ($bindAddress)", ex)
}
}

Expand Down Expand Up @@ -189,6 +190,7 @@ private[yarn] class ExecutorRunnable(
Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--bindAddress", bindAddress,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -733,12 +733,13 @@ private[yarn] class YarnAllocator(
for (container <- containersToUse) {
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorBindAddress = sparkConf.get(EXECUTOR_BIND_ADDRESS.key, executorHostname)
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemorySize >= yarnResourceForRpId.getMemorySize)
logInfo(s"Launching container $containerId on host $executorHostname " +
logInfo(s"Launching container $containerId on host $executorHostname ($executorBindAddress) " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")

val rp = rpIdToResourceProfile(rpId)
Expand All @@ -763,6 +764,7 @@ private[yarn] class YarnAllocator(
sparkConf,
driverUrl,
executorId,
executorBindAddress,
executorHostname,
containerMem,
containerCores,
Expand Down