Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use delegation tokens to authenticate metastore connection
Change-Id: I84897d0b14fc69a68a70a6341e64c4c0a8188cba
  • Loading branch information
jerryshao committed Mar 21, 2017
commit 85a4220a0838f2e8e33d44db28a34cfb4b3453a1
20 changes: 0 additions & 20 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.deploy

import java.io.IOException
import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
Expand Down Expand Up @@ -354,25 +353,6 @@ class SparkHadoopUtil extends Logging {
}
buffer.toString
}

/**
* Run some code as the real logged in user (which may differ from the current user, for
* example, when using proxying).
*/
private[spark] def doAsRealUser[T](fn: => T): T = {
  val ugi = UserGroupInformation.getCurrentUser()
  // When proxying, getRealUser() returns the authenticated (real) user;
  // otherwise it is null and we fall back to the current user itself.
  val executor = Option(ugi.getRealUser()).getOrElse(ugi)

  val action = new PrivilegedExceptionAction[T] {
    override def run(): T = fn
  }

  // The Scala-generated anonymous class surfaces checked exceptions wrapped
  // in UndeclaredThrowableException (even with @throws); unwrap and rethrow
  // the underlying cause so callers see the original failure.
  try executor.doAs(action)
  catch {
    case wrapped: UndeclaredThrowableException =>
      throw Option(wrapped.getCause()).getOrElse(wrapped)
  }
}
}

object SparkHadoopUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ private[spark] class Client(

// Merge credentials obtained from registered providers
val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
// Add credentials to current user's UGI, so that following operations don't need to use the
// Kerberos tgt to get delegations again in the client side.
UserGroupInformation.getCurrentUser.addCredentials(credentials)

if (credentials != null) {
logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.spark.deploy.yarn.security

import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction

import scala.reflect.runtime.universe
import scala.util.control.NonFatal

Expand All @@ -27,7 +30,6 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -85,7 +87,7 @@ private[security] class HiveCredentialProvider extends ServiceCredentialProvider
classOf[String], classOf[String])
val getHive = hiveClass.getMethod("get", hiveConfClass)

SparkHadoopUtil.get.doAsRealUser {
doAsRealUser {
val hive = getHive.invoke(null, conf)
val tokenStr = getDelegationToken.invoke(hive, currentUser.getUserName(), principal)
.asInstanceOf[String]
Expand All @@ -106,5 +108,22 @@ private[security] class HiveCredentialProvider extends ServiceCredentialProvider
None
}


/**
 * Run some code as the real logged in user (which may differ from the current user, for
 * example, when using proxying).
 */
private def doAsRealUser[T](fn: => T): T = {
  val current = UserGroupInformation.getCurrentUser()
  val real = current.getRealUser() match {
    // Proxy scenario: execute as the underlying authenticated user.
    case null  => current
    case other => other
  }

  // Wrapping fn in a PrivilegedExceptionAction via a Scala anonymous class
  // causes checked exceptions to arrive as UndeclaredThrowableException,
  // even when the method is annotated with @throws. Unwrap before rethrowing.
  try {
    real.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = fn
    })
  } catch {
    case e: UndeclaredThrowableException =>
      throw Option(e.getCause()).getOrElse(e)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,7 @@ private[hive] class HiveClientImpl(
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}

// When Security is enabled, using real user to initialize Hive SessionState to avoid tgt
// not found issue with proxy user.
if (UserGroupInformation.isSecurityEnabled) {
SparkHadoopUtil.get.doAsRealUser {
SessionState.start(state)
}
} else {
SessionState.start(state)
}

SessionState.start(state)
state.out = new PrintStream(outputBuffer, true, "UTF-8")
state.err = new PrintStream(outputBuffer, true, "UTF-8")
state
Expand Down