diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 2a92ef99b9f3..6d507d85331b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy
-import java.io.{File, IOException}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -147,14 +147,18 @@ class SparkHadoopUtil extends Logging {
def isYarnMode(): Boolean = { false }
- def getCurrentUserCredentials(): Credentials = { null }
-
- def addCurrentUserCredentials(creds: Credentials) {}
-
def addSecretKeyToUserCredentials(key: String, secret: String) {}
def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
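+ /** Return the current user's Hadoop credentials (delegation tokens and secret keys) from the UGI. */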
+ def getCurrentUserCredentials(): Credentials = {
+ UserGroupInformation.getCurrentUser().getCredentials()
+ }
+
+ def addCurrentUserCredentials(creds: Credentials): Unit = {
+ UserGroupInformation.getCurrentUser.addCredentials(creds)
+ }
+
def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
@@ -425,6 +429,21 @@ class SparkHadoopUtil extends Logging {
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}
+
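+ /** Serialize Hadoop [[Credentials]] to a byte array using Hadoop's token storage format. */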
+ def serialize(creds: Credentials): Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream
+ val dataStream = new DataOutputStream(byteStream)
+ creds.writeTokenStorageToStream(dataStream)
+ byteStream.toByteArray
+ }
+
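+ /** Deserialize a byte array produced by [[serialize]] back into Hadoop [[Credentials]]. */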
+ def deserialize(tokenBytes: Array[Byte]): Credentials = {
+ val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+ val creds = new Credentials()
+ creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+ creds
+ }
}
object SparkHadoopUtil {
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 6d744a084a0f..e7e8fbc25d0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
@@ -49,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._
@@ -556,19 +558,25 @@ object SparkSubmit extends CommandLineUtils {
}
// assure a keytab is available from any place in a JVM
- if (clusterManager == YARN || clusterManager == LOCAL) {
+ if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
if (args.principal != null) {
- require(args.keytab != null, "Keytab must be specified when principal is specified")
- SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
- // Add keytab and principal configurations in sysProps to make them available
- // for later use; e.g. in spark sql, the isolated class loader used to talk
- // to HiveMetastore will use these settings. They will be set as Java system
- // properties and then loaded by SparkConf
- sysProps.put("spark.yarn.keytab", args.keytab)
- sysProps.put("spark.yarn.principal", args.principal)
+ if (args.keytab != null) {
+ require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
+ // Add keytab and principal configurations in sysProps to make them available
+ // for later use; e.g. in spark sql, the isolated class loader used to talk
+ // to HiveMetastore will use these settings. They will be set as Java system
+ // properties and then loaded by SparkConf
+ sysProps.put("spark.yarn.keytab", args.keytab)
+ sysProps.put("spark.yarn.principal", args.principal)
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ }
}
}
+ if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+ setRMPrincipal(sysProps)
+ }
+
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -653,6 +661,18 @@ object SparkSubmit extends CommandLineUtils {
(childArgs, childClasspath, sysProps, childMainClass)
}
+ // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
+ // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
+ // must trick it into thinking we're YARN.
+ private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
+ val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+ val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
+ // scalastyle:off println
+ printStream.println(s"Setting ${key} to ${shortUserName}")
+ // scalastyle:on println
+ sysProps.put(key, shortUserName)
+ }
+
/**
* Run the main method of the child class using the provided launch environment.
*
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 01cbfe1ee6ae..c317c4fe3d82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -55,6 +55,14 @@ private[spark] class HadoopDelegationTokenManager(
logDebug(s"Using the following delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")
+ /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
+ def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
+ this(
+ sparkConf,
+ hadoopConf,
+ hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
+ }
+
private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
new HiveDelegationTokenProvider,
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a2f1aa22b006..a5d60e90210f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,6 +26,8 @@ import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
@@ -219,6 +221,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}
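+ // If the driver sent serialized Hadoop delegation tokens, add them to this executor's
+ // current UGI so Hadoop clients (e.g. HDFS, Hive) can authenticate with them.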
+ cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
+ val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
+ SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+ }
+
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 89a9ad6811e1..5d65731dfc30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
- ioEncryptionKey: Option[Array[Byte]])
+ ioEncryptionKey: Option[Array[Byte]],
+ hadoopDelegationCreds: Option[Array[Byte]])
extends CoarseGrainedClusterMessage
case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a46824a0c6fa..a0ef20977930 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
+import org.apache.hadoop.security.UserGroupInformation
+
import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -42,8 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
- extends ExecutorAllocationClient with SchedulerBackend with Logging
-{
+ extends ExecutorAllocationClient with SchedulerBackend with Logging {
+
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
@@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0
+ // Hadoop delegation token manager used by some subclasses (e.g. Mesos).
+ def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
+
+ // Hadoop delegation tokens to be sent to the executors.
+ val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
+
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -223,8 +233,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)
case RetrieveSparkAppConfig =>
- val reply = SparkAppConfig(sparkProperties,
- SparkEnv.get.securityManager.getIOEncryptionKey())
+ val reply = SparkAppConfig(
+ sparkProperties,
+ SparkEnv.get.securityManager.getIOEncryptionKey(),
+ hadoopDelegationCreds)
context.reply(reply)
}
@@ -675,6 +687,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
driverEndpoint.send(KillExecutorsOnHost(host))
true
}
+
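+ // Fetch Hadoop delegation tokens for the current user, if security is enabled and the
+ // backend provides a token manager, and return them in serialized form.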
+ protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
+ if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
+ hadoopDelegationTokenManager.map { manager =>
+ val creds = UserGroupInformation.getCurrentUser.getCredentials
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+ manager.obtainDelegationTokens(hadoopConf, creds)
+ SparkHadoopUtil.get.serialize(creds)
+ }
+ } else {
+ None
+ }
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/resource-managers/mesos/pom.xml b/resource-managers/mesos/pom.xml
index 20b53f2d8f98..2aa3228af79d 100644
--- a/resource-managers/mesos/pom.xml
+++ b/resource-managers/mesos/pom.xml
@@ -74,6 +74,17 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-exec</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-metastore</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e6b09572121d..5ecd466194d8 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -22,15 +22,15 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
-
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -55,8 +55,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
master: String,
securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
- with org.apache.mesos.Scheduler
- with MesosSchedulerUtils {
+ with org.apache.mesos.Scheduler with MesosSchedulerUtils {
+
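+ // Providing a token manager lets the base class obtain Hadoop delegation tokens and ship
+ // them to executors (see CoarseGrainedSchedulerBackend.getHadoopDelegationCreds).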
+ override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
+ Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4fef4394bb3f..3d9f99f57bed 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -74,14 +74,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
- override def getCurrentUserCredentials(): Credentials = {
- UserGroupInformation.getCurrentUser().getCredentials()
- }
-
- override def addCurrentUserCredentials(creds: Credentials) {
- UserGroupInformation.getCurrentUser().addCredentials(creds)
- }
-
override def addSecretKeyToUserCredentials(key: String, secret: String) {
val creds = new Credentials()
creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))