Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
.
  • Loading branch information
Michael Gummelt committed Jul 3, 2017
commit 794d26e05a968a4562450d828ead731f97fa2536
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

cfg.hadoopCreds.foreach { hadoopCreds =>
cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
val creds = new CredentialsSerializer().deserialize(hadoopCreds)
addCredentials(creds)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private[spark] object CoarseGrainedClusterMessages {
case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
ioEncryptionKey: Option[Array[Byte]],
hadoopCreds: Option[Array[Byte]])
hadoopDelegationCreds: Option[Array[Byte]])
extends CoarseGrainedClusterMessage

case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,7 @@ class CoarseGrainedSchedulerBackend(
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

private val hadoopCreds = if (UserGroupInformation.isSecurityEnabled) {
hadoopDelegationTokenManager.map { manager =>
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
manager.obtainDelegationTokens(hadoopConf, creds)
new CredentialsSerializer().serialize(creds)
}
} else {
None
}
private val hadoopDelegationCreds = getHadoopDelegationCreds()

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
Expand Down Expand Up @@ -244,7 +235,7 @@ class CoarseGrainedSchedulerBackend(
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
hadoopCreds)
hadoopDelegationCreds)
context.reply(reply)
}

Expand Down Expand Up @@ -273,6 +264,7 @@ class CoarseGrainedSchedulerBackend(
"messages.")))
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not needed

// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Make sure no executor is killed while some task is launching on it
Expand Down Expand Up @@ -517,6 +509,7 @@ class CoarseGrainedSchedulerBackend(

/**
* Request an additional number of executors from the cluster manager.
*
* @return whether the request is acknowledged.
*/
final override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
Expand Down Expand Up @@ -551,6 +544,7 @@ class CoarseGrainedSchedulerBackend(
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
*
* @param numExecutors The total number of executors we'd like to have. The cluster manager
* shouldn't kill any running executor to reach this number, but,
* if all existing executors were to die, this is the number of executors
Expand Down Expand Up @@ -675,13 +669,15 @@ class CoarseGrainedSchedulerBackend(

/**
* Kill the given list of executors through the cluster manager.
*
* @return whether the kill request is acknowledged.
*/
protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
Future.successful(false)

/**
* Request that the cluster manager kill all executors on a given host.
*
* @return whether the kill request is acknowledged.
*/
final override def killExecutorsOnHost(host: String): Boolean = {
Expand All @@ -695,6 +691,19 @@ class CoarseGrainedSchedulerBackend(
driverEndpoint.send(KillExecutorsOnHost(host))
true
}

private def getHadoopDelegationCreds(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled) {
hadoopDelegationTokenManager.map { manager =>
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
manager.obtainDelegationTokens(hadoopConf, creds)
new CredentialsSerializer().serialize(creds)
}
} else {
None
}
}
}

private[spark] object CoarseGrainedSchedulerBackend {
Expand Down