Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
update tokens, make SparkHadoopUtil methods private
  • Loading branch information
ArtRand committed Nov 15, 2017
commit 049e4b554b38f12bd1a2bd6855fcbb6a937fbf99
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class SparkHadoopUtil extends Logging {
* Add or overwrite current user's credentials with serialized delegation tokens,
* also confirms correct hadoop configuration is set.
*/
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
val creds = deserialize(tokens)
logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
Expand Down Expand Up @@ -481,7 +481,7 @@ object SparkHadoopUtil {
* @param fraction fraction of the time until expiration return
* @return Date when the fraction of the time until expiration has passed
*/
def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
val ct = System.currentTimeMillis
(ct + (fraction * (expirationDate - ct))).toLong
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ private[spark] class MesosHadoopDelegationTokenManager(
new Runnable {
override def run(): Unit = {
try {
val tokensBytes = getNewDelegationTokens()
broadcastDelegationTokens(tokensBytes)
getNewDelegationTokens()
broadcastDelegationTokens(tokens)
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
Expand All @@ -114,10 +114,9 @@ private[spark] class MesosHadoopDelegationTokenManager(
scheduleRenewal(credentialRenewerRunnable)
}

private def getNewDelegationTokens(): Array[Byte] = {
private def getNewDelegationTokens(): Unit = {
logInfo(s"Attempting to login to KDC with principal ${principal}")
// Get new delegation tokens by logging in with a new UGI
// inspired by AMCredentialRenewer.scala:L174.
// Get new delegation tokens by logging in with a new UGI inspired by AMCredentialRenewer.scala
// Don't protect against keytabFile being empty because it's guarded above.
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
logInfo("Successfully logged into KDC")
Expand All @@ -142,7 +141,8 @@ private[spark] class MesosHadoopDelegationTokenManager(

// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
SparkHadoopUtil.get.serialize(tempCreds)
// update tokens for late or dynamically added executors
tokens = SparkHadoopUtil.get.serialize(tempCreds)
}

private def broadcastDelegationTokens(tokens: Array[Byte]) = {
Expand Down