Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
only renew tokens when using keytab
  • Loading branch information
ArtRand committed Nov 14, 2017
commit 18d77ff5a3aa29c9e60538ae87b6d654c229bdfe
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,6 @@ object SparkHadoopUtil {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}

val TICKET_CACHE_ENVVAR = "KRB5CCNAME"

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ private[spark] class MesosHadoopDelegationTokenManager(

private val principal: String = conf.get(config.PRINCIPAL).orNull

private val (secretFile: Option[String], mode: String) = getSecretFile(conf)

private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
try {
val creds = UserGroupInformation.getCurrentUser.getCredentials
Expand All @@ -68,41 +66,24 @@ private[spark] class MesosHadoopDelegationTokenManager(
(SparkHadoopUtil.get.serialize(creds), rt)
} catch {
case e: Exception =>
logError("Failed to initialize Hadoop delegation tokens\n" +
s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
logError(s"Failed to fetch Hadoop delegation tokens $e")
throw e
}
}

scheduleTokenRenewal()
private val keytabFile: Option[String] = conf.get(config.KEYTAB)

private def getSecretFile(conf: SparkConf): (Option[String], String) = {
val keytab = conf.get(config.KEYTAB)
val tgt = Option(conf.getenv(SparkHadoopUtil.TICKET_CACHE_ENVVAR))
val (secretFile, mode) = if (keytab.isDefined && tgt.isDefined) {
// if a keytab and a specific ticket cache is specified use the keytab and log the behavior
logWarning(s"Keytab and TGT were detected, using keytab, ${keytab.get}, " +
s"unset ${config.KEYTAB.key} to use TGT (${tgt.get})")
(keytab, "keytab")
} else {
val m = if (keytab.isDefined) "keytab" else "tgt"
val sf = if (keytab.isDefined) keytab else tgt
(sf, m)
}
scheduleTokenRenewal()

if (principal == null) {
require(mode == "tgt", s"Must specify a principal when using a Keytab, was $principal")
logInfo(s"Using ticket cache to fetch Hadoop delegation tokens")
private def scheduleTokenRenewal(): Unit = {
if (keytabFile.isDefined) {
require(principal != null, "Principal is required for Keytab-based authentication")
logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
} else {
logInfo(s"Using principal: $principal with mode and keytab $keytab " +
s"to fetch Hadoop delegation tokens")
logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
return
}

logDebug(s"secretFile is $secretFile")
(secretFile, mode)
}

private def scheduleTokenRenewal(): Unit = {
def scheduleRenewal(runnable: Runnable): Unit = {
val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
if (remainingTime <= 0) {
Expand All @@ -118,7 +99,7 @@ private[spark] class MesosHadoopDelegationTokenManager(
new Runnable {
override def run(): Unit = {
try {
val tokensBytes = getNewDelegationTokens
val tokensBytes = getNewDelegationTokens()
broadcastDelegationTokens(tokensBytes)
} catch {
case e: Exception =>
Expand All @@ -134,21 +115,12 @@ private[spark] class MesosHadoopDelegationTokenManager(
}

private def getNewDelegationTokens(): Array[Byte] = {
logInfo(s"Attempting to login to KDC with ${conf.get(config.PRINCIPAL).orNull}")
logInfo(s"Attempting to login to KDC with principal ${principal}")
// Get new delegation tokens by logging in with a new UGI
// inspired by AMCredentialRenewer.scala:L174
val ugi = if (mode == "keytab") {
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile.get)
} else {
// if the ticket cache is not explicitly defined, use the default
if (secretFile.isEmpty) {
UserGroupInformation.getCurrentUser
} else {
UserGroupInformation.getUGIFromTicketCache(secretFile.get, principal)
}
}
// inspired by AMCredentialRenewer.scala:L174.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mentioning line numbers will make this stale very quickly.

// Don't protect against keytabFile being empty because it's guarded above.
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
logInfo("Successfully logged into KDC")

val tempCreds = ugi.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
Expand Down