Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
dynamic allocation: changes to spark-core
  • Loading branch information
foxish committed May 14, 2017
commit 0e586fbc4ca504f25facd72b5fd504058d671a3c
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
new SecurityManager(executorConf),
clientMode = true)
val driver = fetcher.setupEndpointRefByURI(driverUrl)
val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig)
val cfg = driver.askWithRetry[SparkAppConfig](RetrieveSparkAppConfig(executorId))
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

private[spark] object CoarseGrainedClusterMessages {

case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage

case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
removeExecutor(executorId, reason)
context.reply(true)

case RetrieveSparkAppConfig =>
case RetrieveSparkAppConfig(executorId) =>
val reply = SparkAppConfig(sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey())
context.reply(reply)
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,14 @@ private[spark] class BlockManager(
blockManagerId = if (idFromMaster != null) idFromMaster else id

shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
val shuffleServerHostName = if (blockManagerId.isDriver) {
blockTransferService.hostName
} else {
conf.get("spark.shuffle.service.host", blockTransferService.hostName)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a new configuration you are introducing? We will definitely need documentation for this, to explain what it is for.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also please use the ConfigBuilder from config.scala and its .internal() marker if this is not meant to be set by users and is only used internally.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ash211 do you think we should add it to spark-core alongside spark.shuffle.service.port, which already exists there?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the ConfigBuilder for references to spark.shuffle.service.host within the Kubernetes package, and left it as-is here, conforming to the surrounding code in spark-core.

}
logInfo(s"external shuffle service host = $shuffleServerHostName, " +
s"port = $externalShuffleServicePort")
BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort)
} else {
blockManagerId
}
Expand Down