Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
no more conditionals, start renewer automatically
  • Loading branch information
ArtRand committed Nov 8, 2017
commit 5f254e5369238903cce0ac012f8f7ff48ea70afb
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ private[spark] class HadoopDelegationTokenManager(
logDebug(s"Using the following delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")

/**
* Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem.
*/
/** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
this(
sparkConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
Expand Down Expand Up @@ -96,8 +95,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

// Hadoop delegation tokens to be sent to the executors, can be updated as necessary.
protected var hadoopDelegationTokens: Option[Array[Byte]] = initializeHadoopDelegationTokens()
private val hadoopDelegationTokens: () => Option[Array[Byte]] = fetchHadoopDelegationTokens

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
Expand Down Expand Up @@ -156,7 +154,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

case UpdateDelegationTokens(newDelegationTokens) =>
// Update the driver's delegation tokens in case new executors are added later.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stale comment?

hadoopDelegationTokens = Some(newDelegationTokens)
executorDataMap.values.foreach { ed =>
ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
}
Expand Down Expand Up @@ -237,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
hadoopDelegationTokens)
hadoopDelegationTokens.apply())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you just call fetchHadoopDelegationTokens() directly?

context.reply(reply)
}

Expand Down Expand Up @@ -687,7 +684,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
true
}

protected def initializeHadoopDelegationTokens(): Option[Array[Byte]] = { None }
protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
}

private[spark] object CoarseGrainedSchedulerBackend {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
Expand Down Expand Up @@ -60,9 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler with MesosSchedulerUtils {

private lazy val hadoopCredentialRenewer: MesosCredentialRenewer =
new MesosCredentialRenewer(
conf, new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)

// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
Expand Down Expand Up @@ -216,14 +214,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
)

// check that the credentials are defined, even though it's likely that auth would have failed
// already if you've made it this far, then start the token renewer
if (hadoopDelegationTokens.isDefined) {
// the driver endpoint isn't set when the initial tokens are generated (nor is their
// expiration time), so we pass the driver endpoint here.
hadoopCredentialRenewer.scheduleTokenRenewal(driverEndpoint)
}

launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
startScheduler(driver)
}
Expand Down Expand Up @@ -784,9 +774,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
}

override def initializeHadoopDelegationTokens(): Option[Array[Byte]] = {
override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled) {
Some(hadoopCredentialRenewer.tokens)
Some(hadoopDelegationTokenManager.getTokens())
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
Expand All @@ -32,39 +33,45 @@ import org.apache.spark.util.ThreadUtils


/**
* The MesosCredentialRenewer will update the Hadoop credentials for Spark drivers accessing
* secured services using Kerberos authentication. It is modeled after the YARN AMCredential
* renewer, and similarly will renew the Credentials when 75% of the renewal interval has passed.
* The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on the behalf
* of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
* and similarly will renew the Credentials when 75% of the renewal interval has passed.
* The principal difference is that instead of writing the new credentials to HDFS and
* incrementing the timestamp of the file, the new credentials (called Tokens when they are
* serialized) are broadcast to all running executors. On the executor side, when new Tokens are
* recieved they overwrite the current credentials.
* received they overwrite the current credentials.
*/
class MesosCredentialRenewer(
conf: SparkConf,
tokenManager: HadoopDelegationTokenManager) extends Logging {
private[spark] class MesosHadoopDelegationTokenManager(
conf: SparkConf, hadoopConfig: Configuration,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One arg per line.

driverEndpoint: RpcEndpointRef)
extends Logging {

private val credentialRenewerThread: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")

private val principal = conf.get(config.PRINCIPAL).orNull
private val tokenManager: HadoopDelegationTokenManager =
new HadoopDelegationTokenManager(conf, hadoopConfig)

private val (secretFile, mode) = getSecretFile(conf)
private val principal: String = conf.get(config.PRINCIPAL).orNull

var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
private val (secretFile: String, mode: String) = getSecretFile(conf)

private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
try {
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
(SparkHadoopUtil.get.serialize(creds), rt)
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" +
s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use e as the cause of the exception you're throwing.

}

}

scheduleTokenRenewal()

private def getSecretFile(conf: SparkConf): (String, String) = {
val keytab = conf.get(config.KEYTAB).orNull
val tgt = conf.getenv("KRB5CCNAME")
Expand All @@ -90,7 +97,7 @@ class MesosCredentialRenewer(
(secretFile, mode)
}

def scheduleTokenRenewal(driverEndpoint: RpcEndpointRef): Unit = {
private def scheduleTokenRenewal(): Unit = {
def scheduleRenewal(runnable: Runnable): Unit = {
val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
if (remainingTime <= 0) {
Expand All @@ -107,7 +114,7 @@ class MesosCredentialRenewer(
override def run(): Unit = {
try {
val tokensBytes = getNewDelegationTokens
broadcastDelegationTokens(tokensBytes, driverEndpoint)
broadcastDelegationTokens(tokensBytes)
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment says "an hour" but code has 20 seconds.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I changed the code to match the YARN equivalent.

Expand Down Expand Up @@ -151,16 +158,23 @@ class MesosCredentialRenewer(
} else {
SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
}
logInfo(s"Time of next renewal is $timeOfNextRenewal")
logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")

// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
SparkHadoopUtil.get.serialize(tempCreds)
}

private def broadcastDelegationTokens(tokens: Array[Byte], driverEndpoint: RpcEndpointRef) = {
logInfo("Sending new tokens to all executors")
private def broadcastDelegationTokens(tokens: Array[Byte]) = {
logDebug("Sending new tokens to all executors")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make this logInfo (similar message in YARN code has helped me a lot).

if (driverEndpoint == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a require in the constructor?

throw new IllegalStateException("driver endpoint is Null!")
}
driverEndpoint.send(UpdateDelegationTokens(tokens))
}

def getTokens(): Array[Byte] = {
tokens
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tokens is never updated, so fetchHadoopDelegationTokens() will always return the initial set even after it's expired.

Copy link
Author

@ArtRand ArtRand Nov 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this, tokens are now updated for late-joining executors. https://github.com/apache/spark/pull/19272/files#diff-765ac3c4db227cd2c5d796f00794016fR145

}
}