29 changes: 24 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,7 +17,7 @@

package org.apache.spark.deploy

import java.io.{File, IOException}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream, File, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}
@@ -147,14 +147,18 @@ class SparkHadoopUtil extends Logging {

def isYarnMode(): Boolean = { false }

def getCurrentUserCredentials(): Credentials = { null }

def addCurrentUserCredentials(creds: Credentials) {}

def addSecretKeyToUserCredentials(key: String, secret: String) {}

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

def getCurrentUserCredentials(): Credentials = {
UserGroupInformation.getCurrentUser().getCredentials()
}

def addCurrentUserCredentials(creds: Credentials): Unit = {
UserGroupInformation.getCurrentUser.addCredentials(creds)
}

def loginUserFromKeytab(principalName: String, keytabFilename: String): Unit = {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
@@ -425,6 +429,21 @@ class SparkHadoopUtil extends Logging {
s"${if (status.isDirectory) "d" else "-"}$perm")
false
}

def serialize(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

def deserialize(tokenBytes: Array[Byte]): Credentials = {
val tokensBuf = new ByteArrayInputStream(tokenBytes)

val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(tokensBuf))
creds
}
}
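
The two new helpers are symmetric, so credentials survive a byte-array round trip. A minimal driver-side sketch of their use (hypothetical spark-shell usage, not part of this diff; key and secret are made up):

    import java.nio.charset.StandardCharsets.UTF_8

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.security.Credentials

    import org.apache.spark.deploy.SparkHadoopUtil

    // Build a Credentials object holding a secret key, encode it to bytes the way
    // the scheduler backend will, then decode it the way the executor backend will.
    val creds = new Credentials()
    creds.addSecretKey(new Text("example-key"), "example-secret".getBytes(UTF_8))

    val bytes: Array[Byte] = SparkHadoopUtil.get.serialize(creds)
    val restored: Credentials = SparkHadoopUtil.get.deserialize(bytes)
    assert(restored.getSecretKey(new Text("example-key")) != null)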

object SparkHadoopUtil {
38 changes: 29 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
@@ -49,6 +50,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._

@@ -556,19 +558,25 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
SparkHadoopUtil.get.loginUserFromKeytab(args.principal, args.keytab)
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)
if (args.keytab != null) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
// Add keytab and principal configurations in sysProps to make them available
// for later use; e.g. in spark sql, the isolated class loader used to talk
// to HiveMetastore will use these settings. They will be set as Java system
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}

if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
setRMPrincipal(sysProps)
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
@@ -653,6 +661,18 @@
(childArgs, childClasspath, sysProps, childMainClass)
}

// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
// must trick it into thinking we're YARN.
private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
// scalastyle:off println
printStream.println(s"Setting ${key} to ${shortUserName}")
Review comment (Contributor): Do you want this to be printed out every time someone runs spark-submit? Sounds a bit noisy.

Reply: It only prints when UserGroupInformation.isSecurityEnabled and I think it's useful information whenever a job is run.

// scalastyle:on println
sysProps.put(key, shortUserName)
}
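
For reference, YarnConfiguration.RM_PRINCIPAL is the Hadoop key yarn.resourcemanager.principal, so the workaround simply rides Spark's spark.hadoop.* passthrough into the Hadoop Configuration. An illustrative sketch of the system properties a kerberized Mesos submission would end up with after this block (the principal, keytab path, and user name below are made up):

    // Hypothetical effective properties; actual values depend on --principal/--keytab
    // and on the Kerberos identity of the submitting user.
    val effectiveSysProps = Map(
      "spark.yarn.keytab" -> "/etc/security/keytabs/alice.keytab",    // from --keytab
      "spark.yarn.principal" -> "alice@EXAMPLE.COM",                   // from --principal
      // Added by setRMPrincipal() only when UserGroupInformation.isSecurityEnabled:
      "spark.hadoop.yarn.resourcemanager.principal" -> "alice"         // current short user name
    )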

/**
* Run the main method of the child class using the provided launch environment.
*
@@ -55,6 +55,14 @@ private[spark] class HadoopDelegationTokenManager(
logDebug(s"Using the following delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")

/** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
this(
sparkConf,
hadoopConf,
hadoopConf => Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf)))
}
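
With this overload, callers that only need tokens for the default filesystem no longer have to pass the fileSystems function themselves. A rough Spark-internal, driver-side sketch of how the constructor would be used (assumes a kerberized Hadoop configuration; not code from this diff):

    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager

    // Obtain delegation tokens for the default filesystem (plus other providers,
    // such as Hive, when present) into the current user's credentials.
    val sparkConf = new SparkConf()
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
    val tokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf)

    val creds: Credentials = UserGroupInformation.getCurrentUser.getCredentials
    tokenManager.obtainDelegationTokens(hadoopConf, creds)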

private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
new HiveDelegationTokenProvider,
@@ -26,6 +26,8 @@ import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
@@ -219,6 +221,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
SparkHadoopUtil.get.addCurrentUserCredentials(creds)
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

@@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages {

case class SparkAppConfig(
sparkProperties: Seq[(String, String)],
ioEncryptionKey: Option[Array[Byte]])
ioEncryptionKey: Option[Array[Byte]],
hadoopDelegationCreds: Option[Array[Byte]])
extends CoarseGrainedClusterMessage

case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
@@ -24,7 +24,11 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -42,8 +46,8 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
*/
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
extends ExecutorAllocationClient with SchedulerBackend with Logging
{
extends ExecutorAllocationClient with SchedulerBackend with Logging {

// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
@@ -95,6 +99,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

// hadoop token manager used by some sub-classes (e.g. Mesos)
def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None

// Hadoop delegation tokens to be sent to the executors.
val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

@@ -223,8 +233,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)

case RetrieveSparkAppConfig =>
val reply = SparkAppConfig(sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey())
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
hadoopDelegationCreds)
context.reply(reply)
}

@@ -675,6 +687,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
driverEndpoint.send(KillExecutorsOnHost(host))
true
}

protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
hadoopDelegationTokenManager.map { manager =>
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
manager.obtainDelegationTokens(hadoopConf, creds)
SparkHadoopUtil.get.serialize(creds)
}
} else {
None
}
}
}
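
Subclasses opt in simply by overriding hadoopDelegationTokenManager; the Mesos backend further down in this diff does exactly that. A skeleton for some hypothetical cluster manager (class name and constructor are made up for illustration):

    package org.apache.spark.scheduler.cluster

    import org.apache.spark.SparkContext
    import org.apache.spark.deploy.security.HadoopDelegationTokenManager
    import org.apache.spark.scheduler.TaskSchedulerImpl

    // Overriding the token manager is enough for getHadoopDelegationCreds() to
    // obtain, serialize, and ship tokens to executors when security is enabled.
    private[spark] class FooSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext)
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

      override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
        Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
    }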

private[spark] object CoarseGrainedSchedulerBackend {
11 changes: 11 additions & 0 deletions resource-managers/mesos/pom.xml
@@ -74,6 +74,17 @@
<scope>test</scope>
</dependency>

<dependency>
Review comment (Contributor): Is this really needed? I don't see you adding specific tests for this, so wonder why you need the explicit dependency when other modules that depend on spark-core don't.

Reply: Yes, MesosClusterManagerSuite creates a MesosCoarseGrainedSchedulerBackend which contains a HadoopDelegationTokenManager... etc.

Reply (Contributor): Hmm, ok... the credential manager code should be safe when Hive classes aren't present, but if there's a problem in that area it's not your fault.


<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
<scope>provided</scope>
</dependency>

<!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
<dependency>
<groupId>com.google.guava</groupId>
@@ -22,15 +22,15 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -55,8 +55,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
master: String,
securityManager: SecurityManager)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler
with MesosSchedulerUtils {
with org.apache.mesos.Scheduler with MesosSchedulerUtils {

override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))

// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
@@ -74,14 +74,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}

override def getCurrentUserCredentials(): Credentials = {
UserGroupInformation.getCurrentUser().getCredentials()
}

override def addCurrentUserCredentials(creds: Credentials) {
UserGroupInformation.getCurrentUser().addCredentials(creds)
}

override def addSecretKeyToUserCredentials(key: String, secret: String) {
val creds = new Credentials()
creds.addSecretKey(new Text(key), secret.getBytes(UTF_8))