4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*")
"Please use the new blacklisting options, spark.blacklist.*"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
)

Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
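For readers unfamiliar with the pattern in the hunk above: the DeprecatedConfig entries are collected into a map keyed by config name, so that a later lookup by key can surface the deprecation message. The following is a minimal, self-contained sketch of that idea; the simplified DeprecatedConfig case class and the printed warning are illustrative stand-ins for Spark's internal types and logging, not the actual implementation.

```scala
// Simplified stand-in for Spark's internal DeprecatedConfig (illustrative only).
case class DeprecatedConfig(key: String, version: String, deprecationMessage: String)

val configs = Seq(
  DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
  DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more"))

// Same construction as in the diff: key each entry by its config name.
val deprecated: Map[String, DeprecatedConfig] = Map(configs.map { cfg => (cfg.key -> cfg) }: _*)

// A lookup by key can then emit a deprecation warning for a removed setting.
deprecated.get("spark.executor.port").foreach { cfg =>
  println(s"${cfg.key} was deprecated in ${cfg.version}: ${cfg.deprecationMessage}")
}
```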
14 changes: 4 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -177,7 +177,7 @@ object SparkEnv extends Logging {
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
port,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
@@ -194,7 +194,6 @@
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
@@ -203,7 +202,7 @@
executorId,
hostname,
hostname,
port,
None,
isLocal,
numCores,
ioEncryptionKey
@@ -220,7 +219,7 @@
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
@@ -243,17 +242,12 @@
}

val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, clientMode = !isDriver)

// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
}

// Create an instance of the class with the given name, possibly initializing it with our conf
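A minimal sketch of the pattern the SparkEnv changes above introduce (the helper name here is illustrative, not Spark's): the driver passes its configured port wrapped in Option, executors pass None now that spark.executor.port is gone, and getOrElse(-1) turns the absent case into the sentinel the RPC layer treats as "no fixed server port".

```scala
// Illustrative helper (not part of Spark): map an optional requested port to the
// sentinel used by the RPC layer, where -1 means "no fixed port / pick one or stay client-only".
def resolveRpcPort(requested: Option[Int]): Int = requested.getOrElse(-1)

val driverPort   = resolveRpcPort(Option(7078)) // driver side: Option(port); 7078 is a made-up value
val executorPort = resolveRpcPort(None)         // executor side: no fixed port any more => -1
```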
@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
port,
-1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
@@ -221,7 +220,7 @@
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
2 changes: 1 addition & 1 deletion docs/running-on-mesos.md
@@ -209,7 +209,7 @@ provide such guarantees on the offer stream.

In this mode spark executors will honor port allocation if such is
provided from the user. Specifically if the user defines
`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
`spark.blockManager.port` in Spark configuration,
the mesos scheduler will check the available offers for a valid port
range containing the port numbers. If no such range is available it will
not launch any task. If no restriction is imposed on port numbers by the
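To make the updated doc text concrete, here is a hedged example of the one user-pinnable executor port the Mesos scheduler still manages after this change; the key is the real spark.blockManager.port, while the app name and port value are placeholders chosen for illustration.

```scala
import org.apache.spark.SparkConf

// With spark.executor.port removed, spark.blockManager.port is the remaining
// executor-side port the Mesos scheduler must find inside an offer's port range.
// The value 31000 below is purely illustrative.
val conf = new SparkConf()
  .setAppName("mesos-port-pinning-example") // placeholder name
  .set("spark.blockManager.port", "31000")
```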
7 changes: 0 additions & 7 deletions docs/running-on-yarn.md
@@ -239,13 +239,6 @@ To use a custom metrics.properties for the application master and executors, upd
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.port</code></td>
<td>(random)</td>
<td>
Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
</td>
</tr>
<tr>
<td><code>spark.yarn.queue</code></td>
<td><code>default</code></td>
@@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
val conf = new SparkConf(loadDefaults = true).setAll(properties)
val port = conf.getInt("spark.executor.port", 0)
val env = SparkEnv.createExecutorEnv(
conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)

executor = new Executor(
executorId,
@@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
}
}

val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
val managedPortNames = List(BLOCK_MANAGER_PORT.key)

/**
* The values of the non-zero ports to be used by the executor process.
@@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS

test("Port reservation is done correctly with user specified ports only") {
val conf = new SparkConf()
conf.set("spark.executor.port", "3000" )
conf.set(BLOCK_MANAGER_PORT, 4000)
val portResource = createTestPortResource((3000, 5000), Some("my_role"))

val (resourcesLeft, resourcesToBeUsed) = utils
.partitionPortResources(List(3000, 4000), List(portResource))
resourcesToBeUsed.length shouldBe 2
.partitionPortResources(List(4000), List(portResource))
resourcesToBeUsed.length shouldBe 1

val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray

portsToUse.length shouldBe 2
arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
portsToUse.length shouldBe 1
arePortsEqual(portsToUse, Array(4000L)) shouldBe true

val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)

val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
val expectedUSed = Array((4000L, 4000L))

arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
}

test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
val conf = new SparkConf()
conf.set("spark.executor.port", "3100" )
val portResource = createTestPortResource((3000, 5000), Some("my_role"))

val (resourcesLeft, resourcesToBeUsed) = utils
.partitionPortResources(List(3100), List(portResource))

val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}

portsToUse.length shouldBe 1
portsToUse.contains(3100) shouldBe true
}

test("Port reservation is done correctly with all random ports") {
val conf = new SparkConf()
val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS

test("Port reservation is done correctly with user specified ports only - multiple ranges") {
val conf = new SparkConf()
conf.set("spark.executor.port", "2100" )
conf.set("spark.blockManager.port", "4000")
val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
createTestPortResource((2000, 2500), Some("other_role")))
val (resourcesLeft, resourcesToBeUsed) = utils
.partitionPortResources(List(2100, 4000), portResourceList)
.partitionPortResources(List(4000), portResourceList)
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}

portsToUse.length shouldBe 2
portsToUse.length shouldBe 1
val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)

val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
val expectedUsed = Array((4000L, 4000L))

arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
}

@@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
}

private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.get(AM_PORT)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
Review comment from @vanzin (Contributor), May 5, 2017:
It might be better to replace the two create parameters (port and clientMode) with a single serverPort: Option[Int] now; if it's set, a server is started; if it's not, it operates in client-only mode.
Probably ok to punt on that one though, since it will touch a lot more places I think.

Reply from the author (Contributor):
This will touch a lot of places; I'm inclined to leave that create call as it was.

clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()
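Below is a hedged sketch of the refactoring @vanzin floats in the comment above, which the author deferred: a single serverPort: Option[Int] parameter standing in for the current (port, clientMode) pair. The wrapper name and signature are hypothetical, not Spark's actual API; the delegation simply reuses the seven-argument RpcEnv.create call visible elsewhere in this diff.

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

// Hypothetical wrapper (it would have to live inside the org.apache.spark package,
// since RpcEnv is Spark-internal): Some(port) starts a server, None means client-only mode.
def createRpcEnv(
    systemName: String,
    bindAddress: String,
    advertiseAddress: String,
    serverPort: Option[Int],
    conf: SparkConf,
    securityManager: SecurityManager): RpcEnv = {
  // -1 mirrors the port.getOrElse(-1) sentinel introduced in SparkEnv by this PR.
  RpcEnv.create(systemName, bindAddress, advertiseAddress, serverPort.getOrElse(-1), conf,
    securityManager, clientMode = serverPort.isEmpty)
}
```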
@@ -40,11 +40,6 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val AM_PORT =
ConfigBuilder("spark.yarn.am.port")
.intConf
.createWithDefault(0)

private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
.doc("Interval after which Executor failures will be considered independent and not " +