From ff55b893371a4bd2a3ecd4256d68ae3679092339 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 13:05:20 -0700 Subject: [PATCH 01/21] Mesos Kerberos support --- .../org/apache/spark/deploy/SparkSubmit.scala | 16 +++++++++-- .../CoarseGrainedExecutorBackend.scala | 14 ++++++++++ .../cluster/CoarseGrainedClusterMessage.scala | 3 ++- .../CoarseGrainedSchedulerBackend.scala | 27 ++++++++++++++++--- .../cluster/StandaloneSchedulerBackend.scala | 2 +- .../apache/spark/HeartbeatReceiverSuite.scala | 2 +- .../MesosCoarseGrainedSchedulerBackend.scala | 20 +++++++++++--- .../cluster/YarnSchedulerBackend.scala | 2 +- 8 files changed, 73 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index d13fb4193970..db1bbcf4cab0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -32,6 +32,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.ivy.Ivy import org.apache.ivy.core.LogOptions import org.apache.ivy.core.module.descriptor._ @@ -47,6 +48,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl import org.apache.spark._ import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util._ @@ -65,7 +67,7 @@ private[deploy] object SparkSubmitAction extends Enumeration { * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ -object SparkSubmit extends CommandLineUtils { +object SparkSubmit extends CommandLineUtils with Logging { // Cluster managers private val YARN = 1 @@ -563,7 +565,7 @@ object SparkSubmit extends CommandLineUtils { } // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL) { + if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) { if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when principal is specified") if (!new File(args.keytab).exists()) { @@ -581,6 +583,16 @@ object SparkSubmit extends CommandLineUtils { } } + // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with + // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we + // must trick it into thinking we're YARN. + if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) { + val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName + val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" + logDebug(s"Setting ${key} to ${shortUserName}") + sysProps.put(key, shortUserName) + } + // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { childMainClass = "org.apache.spark.deploy.yarn.Client" diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index a2f1aa22b006..c69f3211e73f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -26,9 +26,12 @@ import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal +import org.apache.hadoop.security.UserGroupInformation + import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.CredentialsSerializer import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.rpc._ @@ -219,6 +222,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.startCredentialUpdater(driverConf) } + cfg.ugiTokens.foreach(addDelegationTokens(_, driverConf)) + val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false) @@ -232,6 +237,15 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } } + private def addDelegationTokens(tokenBytes: Array[Byte], driverConf: SparkConf): Unit = { + val creds = new CredentialsSerializer().deserializeTokens(tokenBytes) + + logInfo(s"Adding ${creds.numberOfTokens()} tokens and ${creds.numberOfSecretKeys()} secret" + + s"keys to the current user's credentials.") + + UserGroupInformation.getCurrentUser().addCredentials(creds) + } + def main(args: Array[String]) { var driverUrl: String = null var executorId: String = null diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 89a9ad6811e1..52b5ce2a06cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -32,7 +32,8 @@ private[spark] object CoarseGrainedClusterMessages { case class SparkAppConfig( sparkProperties: Seq[(String, String)], - ioEncryptionKey: Option[Array[Byte]]) + ioEncryptionKey: Option[Array[Byte]], + ugiTokens: Option[Array[Byte]]) extends CoarseGrainedClusterMessage case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 0b396b794ddc..d8e20ce76e62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -23,9 +23,12 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import scala.concurrent.duration.Duration + +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.{CredentialsSerializer, HadoopDelegationTokenManager} import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -42,7 +45,10 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} * (spark.deploy.*). */ private[spark] -class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv) +class CoarseGrainedSchedulerBackend( + scheduler: TaskSchedulerImpl, + val rpcEnv: RpcEnv, + hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager]) extends ExecutorAllocationClient with SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed @@ -96,6 +102,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + private val userTokens = if (UserGroupInformation.isSecurityEnabled) { + hadoopDelegationTokenManager.map { manager => + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + manager.obtainDelegationTokens(hadoopConf, creds) + new CredentialsSerializer().serializeTokens(creds) + } + } else { + None + } + class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -224,8 +241,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case RetrieveSparkAppConfig => - val reply = SparkAppConfig(sparkProperties, - SparkEnv.get.securityManager.getIOEncryptionKey()) + val reply = SparkAppConfig( + sparkProperties, + SparkEnv.get.securityManager.getIOEncryptionKey(), + userTokens) context.reply(reply) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index a4e2a7434128..a4509c75f2d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -38,7 +38,7 @@ private[spark] class StandaloneSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String]) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None) with StandaloneAppClientListener with Logging { diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 88916488c0de..14d0575a13cc 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -268,7 +268,7 @@ private class FakeSchedulerBackend( scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv, clusterManagerEndpoint: RpcEndpointRef) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv, None) { protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { clusterManagerEndpoint.ask[Boolean]( diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 7dd42c41aa7c..a8b7dff2cfd7 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -25,10 +25,13 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future +import org.apache.hadoop.fs.FileSystem import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -52,9 +55,20 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc: SparkContext, master: String, securityManager: SecurityManager) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) - with org.apache.mesos.Scheduler - with MesosSchedulerUtils { + extends CoarseGrainedSchedulerBackend( + scheduler, + sc.env.rpcEnv, + Some(new HadoopDelegationTokenManager( + sc.conf, + SparkHadoopUtil.get.newConfiguration(sc.conf), + Set(FileSystem.get( + SparkHadoopUtil.get.newConfiguration(sc.conf)) + .getHomeDirectory + .getFileSystem( + SparkHadoopUtil.get.newConfiguration(sc.conf))) + ))) + with org.apache.mesos.Scheduler + with MesosSchedulerUtils { // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index cbc6e60e839c..12158d0f2d86 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils} private[spark] abstract class YarnSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None) { override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { From 64ab5d590a2de7a7e5499a554825b25c9805e594 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 14:02:21 -0700 Subject: [PATCH 02/21] . --- .../org/apache/spark/deploy/SparkSubmit.scala | 18 +++++++++++------- .../CoarseGrainedExecutorBackend.scala | 14 +++++++------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index db1bbcf4cab0..80f5c7d4c2a5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -583,14 +583,8 @@ object SparkSubmit extends CommandLineUtils with Logging { } } - // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with - // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we - // must trick it into thinking we're YARN. if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) { - val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName - val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" - logDebug(s"Setting ${key} to ${shortUserName}") - sysProps.put(key, shortUserName) + setRMPrincipal(sysProps) } // In yarn-cluster mode, use yarn.Client as a wrapper around the user class @@ -677,6 +671,16 @@ object SparkSubmit extends CommandLineUtils with Logging { (childArgs, childClasspath, sysProps, childMainClass) } + // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with + // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we + // must trick it into thinking we're YARN. + private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = { + val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName + val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" + logDebug(s"Setting ${key} to ${shortUserName}") + sysProps.put(key, shortUserName) + } + /** * Run the main method of the child class using the provided launch environment. * diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index c69f3211e73f..1be9bbeaf887 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -25,9 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal - -import org.apache.hadoop.security.UserGroupInformation - +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil @@ -222,7 +220,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.startCredentialUpdater(driverConf) } - cfg.ugiTokens.foreach(addDelegationTokens(_, driverConf)) + cfg.ugiTokens.foreach { ugiTokens => + val creds = new CredentialsSerializer().deserializeTokens(ugiTokens) + addCredentials(creds) + } val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false) @@ -237,9 +238,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } } - private def addDelegationTokens(tokenBytes: Array[Byte], driverConf: SparkConf): Unit = { - val creds = new CredentialsSerializer().deserializeTokens(tokenBytes) - + /** Add Credentials to the currently logged in user. */ + private def addCredentials(creds: Credentials): Unit = { logInfo(s"Adding ${creds.numberOfTokens()} tokens and ${creds.numberOfSecretKeys()} secret" + s"keys to the current user's credentials.") From b1a90a925fdf1cf2e292c2a801b9cae3ce65ade0 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 14:10:23 -0700 Subject: [PATCH 03/21] . --- .../spark/executor/CoarseGrainedExecutorBackend.scala | 4 ++-- .../scheduler/cluster/CoarseGrainedClusterMessage.scala | 2 +- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 1be9bbeaf887..fbdc09ebba6c 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -220,8 +220,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.startCredentialUpdater(driverConf) } - cfg.ugiTokens.foreach { ugiTokens => - val creds = new CredentialsSerializer().deserializeTokens(ugiTokens) + cfg.hadoopCreds.foreach { hadoopCreds => + val creds = new CredentialsSerializer().deserialize(hadoopCreds) addCredentials(creds) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 52b5ce2a06cc..071de213b667 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -33,7 +33,7 @@ private[spark] object CoarseGrainedClusterMessages { case class SparkAppConfig( sparkProperties: Seq[(String, String)], ioEncryptionKey: Option[Array[Byte]], - ugiTokens: Option[Array[Byte]]) + hadoopCreds: Option[Array[Byte]]) extends CoarseGrainedClusterMessage case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d8e20ce76e62..ec662fc1dc84 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -102,12 +102,12 @@ class CoarseGrainedSchedulerBackend( // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - private val userTokens = if (UserGroupInformation.isSecurityEnabled) { + private val hadoopCreds = if (UserGroupInformation.isSecurityEnabled) { hadoopDelegationTokenManager.map { manager => val creds = UserGroupInformation.getCurrentUser.getCredentials val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) manager.obtainDelegationTokens(hadoopConf, creds) - new CredentialsSerializer().serializeTokens(creds) + new CredentialsSerializer().serialize(creds) } } else { None @@ -244,7 +244,7 @@ class CoarseGrainedSchedulerBackend( val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - userTokens) + hadoopCreds) context.reply(reply) } From 794d26e05a968a4562450d828ead731f97fa2536 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 14:19:14 -0700 Subject: [PATCH 04/21] . --- .../CoarseGrainedExecutorBackend.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 2 +- .../CoarseGrainedSchedulerBackend.scala | 31 ++++++++++++------- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index fbdc09ebba6c..3ac5c9c172dd 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -220,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { SparkHadoopUtil.get.startCredentialUpdater(driverConf) } - cfg.hadoopCreds.foreach { hadoopCreds => + cfg.hadoopDelegationCreds.foreach { hadoopCreds => val creds = new CredentialsSerializer().deserialize(hadoopCreds) addCredentials(creds) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 071de213b667..5d65731dfc30 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -33,7 +33,7 @@ private[spark] object CoarseGrainedClusterMessages { case class SparkAppConfig( sparkProperties: Seq[(String, String)], ioEncryptionKey: Option[Array[Byte]], - hadoopCreds: Option[Array[Byte]]) + hadoopDelegationCreds: Option[Array[Byte]]) extends CoarseGrainedClusterMessage case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ec662fc1dc84..87f7568e27ce 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -102,16 +102,7 @@ class CoarseGrainedSchedulerBackend( // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - private val hadoopCreds = if (UserGroupInformation.isSecurityEnabled) { - hadoopDelegationTokenManager.map { manager => - val creds = UserGroupInformation.getCurrentUser.getCredentials - val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) - manager.obtainDelegationTokens(hadoopConf, creds) - new CredentialsSerializer().serialize(creds) - } - } else { - None - } + private val hadoopDelegationCreds = getHadoopDelegationCreds() class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -244,7 +235,7 @@ class CoarseGrainedSchedulerBackend( val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), - hadoopCreds) + hadoopDelegationCreds) context.reply(reply) } @@ -273,6 +264,7 @@ class CoarseGrainedSchedulerBackend( "messages."))) } + // Make fake resource offers on just one executor private def makeOffers(executorId: String) { // Make sure no executor is killed while some task is launching on it @@ -517,6 +509,7 @@ class CoarseGrainedSchedulerBackend( /** * Request an additional number of executors from the cluster manager. + * * @return whether the request is acknowledged. */ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = { @@ -551,6 +544,7 @@ class CoarseGrainedSchedulerBackend( /** * Update the cluster manager on our scheduling needs. Three bits of information are included * to help it make decisions. + * * @param numExecutors The total number of executors we'd like to have. The cluster manager * shouldn't kill any running executor to reach this number, but, * if all existing executors were to die, this is the number of executors @@ -675,6 +669,7 @@ class CoarseGrainedSchedulerBackend( /** * Kill the given list of executors through the cluster manager. + * * @return whether the kill request is acknowledged. */ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = @@ -682,6 +677,7 @@ class CoarseGrainedSchedulerBackend( /** * Request that the cluster manager kill all executors on a given host. + * * @return whether the kill request is acknowledged. */ final override def killExecutorsOnHost(host: String): Boolean = { @@ -695,6 +691,19 @@ class CoarseGrainedSchedulerBackend( driverEndpoint.send(KillExecutorsOnHost(host)) true } + + private def getHadoopDelegationCreds(): Option[Array[Byte]] = { + if (UserGroupInformation.isSecurityEnabled) { + hadoopDelegationTokenManager.map { manager => + val creds = UserGroupInformation.getCurrentUser.getCredentials + val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) + manager.obtainDelegationTokens(hadoopConf, creds) + new CredentialsSerializer().serialize(creds) + } + } else { + None + } + } } private[spark] object CoarseGrainedSchedulerBackend { From 860351de86d9c802bce6365496f09ed7fbd381bd Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 15:35:04 -0700 Subject: [PATCH 05/21] . --- .../deploy/security/HadoopDelegationTokenManager.scala | 7 +++++++ .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 8 +------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 89b6f52ba4bc..5920685e3c3a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -55,6 +55,13 @@ private[spark] class HadoopDelegationTokenManager( logDebug(s"Using the following delegation token providers: " + s"${delegationTokenProviders.keys.mkString(", ")}.") + def this(sparkConf: SparkConf, hadoopConf: Configuration) = { + this( + sparkConf, + hadoopConf, + Set(FileSystem.get(hadoopConf).getHomeDirectory.getFileSystem(hadoopConf))) + } + private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = List(new HadoopFSDelegationTokenProvider(fileSystems), new HiveDelegationTokenProvider, diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index a8b7dff2cfd7..fc93f992333d 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -60,13 +60,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.env.rpcEnv, Some(new HadoopDelegationTokenManager( sc.conf, - SparkHadoopUtil.get.newConfiguration(sc.conf), - Set(FileSystem.get( - SparkHadoopUtil.get.newConfiguration(sc.conf)) - .getHomeDirectory - .getFileSystem( - SparkHadoopUtil.get.newConfiguration(sc.conf))) - ))) + SparkHadoopUtil.get.newConfiguration(sc.conf)))) with org.apache.mesos.Scheduler with MesosSchedulerUtils { From 7e12dea984781952ff0e93a46e9207f78937f072 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 16:15:38 -0700 Subject: [PATCH 06/21] comments --- .../spark/deploy/security/HadoopDelegationTokenManager.scala | 1 + .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 5920685e3c3a..5d1dd6eef9ce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -55,6 +55,7 @@ private[spark] class HadoopDelegationTokenManager( logDebug(s"Using the following delegation token providers: " + s"${delegationTokenProviders.keys.mkString(", ")}.") + /** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */ def this(sparkConf: SparkConf, hadoopConf: Configuration) = { this( sparkConf, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 87f7568e27ce..1eae59d5a7d9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -102,6 +102,7 @@ class CoarseGrainedSchedulerBackend( // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + // Hadoop delegation tokens to be sent to the executors. private val hadoopDelegationCreds = getHadoopDelegationCreds() class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) From 973dce240b77ac6ff2bcc8c90158d81b91d71a55 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 16:16:25 -0700 Subject: [PATCH 07/21] Add CredentialsSerializer --- .../security/CredentialsSerializer.scala | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala new file mode 100644 index 000000000000..549309e8cb9e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import java.io.{ByteArrayOutputStream, DataOutputStream} + +import org.apache.hadoop.security.Credentials + +class CredentialsSerializer { + def serialize(creds: Credentials): Array[Byte] = { + val byteStream = new ByteArrayOutputStream + val dataStream = new DataOutputStream(byteStream) + creds.writeTokenStorageToStream(dataStream) + byteStream.toByteArray + } + + def deserialize(tokenBytes: Array[Byte]): Credentials = { + val tokensBuf = new java.io.ByteArrayInputStream(tokenBytes) + + val creds = new Credentials() + creds.readTokenStorageStream(new java.io.DataInputStream(tokensBuf)) + creds + } +} + From a9d8998595dbcd8933740ee9474d0d4a11b0ffe4 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 16:19:56 -0700 Subject: [PATCH 08/21] remove extra comments --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1eae59d5a7d9..1c5c082bb2d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -510,7 +510,6 @@ class CoarseGrainedSchedulerBackend( /** * Request an additional number of executors from the cluster manager. - * * @return whether the request is acknowledged. */ final override def requestExecutors(numAdditionalExecutors: Int): Boolean = { @@ -545,7 +544,6 @@ class CoarseGrainedSchedulerBackend( /** * Update the cluster manager on our scheduling needs. Three bits of information are included * to help it make decisions. - * * @param numExecutors The total number of executors we'd like to have. The cluster manager * shouldn't kill any running executor to reach this number, but, * if all existing executors were to die, this is the number of executors @@ -591,7 +589,6 @@ class CoarseGrainedSchedulerBackend( * of requesting a delta of executors risks double counting new executors when there are * insufficient resources to satisfy the first request. We make the assumption here that the * cluster manager will eventually fulfill all requests when resources free up. - * * @return a future whose evaluation indicates whether the request is acknowledged. */ protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = @@ -603,7 +600,6 @@ class CoarseGrainedSchedulerBackend( * When asking the executor to be replaced, the executor loss is considered a failure, and * killed tasks that are running on the executor will count towards the failure limits. If no * replacement is being requested, then the tasks will not count towards the limit. - * * @param executorIds identifiers of executors to kill * @param replace whether to replace the killed executors with new ones, default false * @param force whether to force kill busy executors, default false @@ -670,7 +666,6 @@ class CoarseGrainedSchedulerBackend( /** * Kill the given list of executors through the cluster manager. - * * @return whether the kill request is acknowledged. */ protected def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = @@ -678,7 +673,6 @@ class CoarseGrainedSchedulerBackend( /** * Request that the cluster manager kill all executors on a given host. - * * @return whether the kill request is acknowledged. */ final override def killExecutorsOnHost(host: String): Boolean = { From 5c59daa63a91a9c9547e519a8336f07f80aed4c2 Mon Sep 17 00:00:00 2001 From: Michael Gummelt Date: Mon, 3 Jul 2017 16:37:23 -0700 Subject: [PATCH 09/21] style --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 3ac5c9c172dd..f70b8380084b 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -25,7 +25,9 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} import scala.util.control.NonFatal + import org.apache.hadoop.security.{Credentials, UserGroupInformation} + import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil From 5848a78cb522562242c7a52f6473df9ab8a75056 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Thu, 20 Jul 2017 15:54:20 -0700 Subject: [PATCH 10/21] addressed comments --- .../apache/spark/deploy/SparkHadoopUtil.scala | 29 +++++++++++--- .../org/apache/spark/deploy/SparkSubmit.scala | 6 ++- .../security/CredentialsSerializer.scala | 40 ------------------- .../CoarseGrainedExecutorBackend.scala | 10 +---- .../CoarseGrainedSchedulerBackend.scala | 14 +++---- .../MesosCoarseGrainedSchedulerBackend.scala | 2 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 8 ---- 7 files changed, 37 insertions(+), 72 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6afe58bff522..e8bdecda8890 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy -import java.io.IOException +import java.io._ import java.security.PrivilegedExceptionAction import java.text.DateFormat import java.util.{Arrays, Comparator, Date, Locale} @@ -128,14 +128,18 @@ class SparkHadoopUtil extends Logging { def isYarnMode(): Boolean = { false } - def getCurrentUserCredentials(): Credentials = { null } - - def addCurrentUserCredentials(creds: Credentials) {} - def addSecretKeyToUserCredentials(key: String, secret: String) {} def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null } + def getCurrentUserCredentials(): Credentials = { + UserGroupInformation.getCurrentUser().getCredentials() + } + + def addCurrentUserCredentials(creds: Credentials): Unit = { + UserGroupInformation.getCurrentUser.addCredentials(creds) + } + def loginUserFromKeytab(principalName: String, keytabFilename: String) { UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename) } @@ -392,6 +396,21 @@ class SparkHadoopUtil extends Logging { s"${if (status.isDirectory) "d" else "-"}$perm") false } + + def serialize(creds: Credentials): Array[Byte] = { + val byteStream = new ByteArrayOutputStream + val dataStream = new DataOutputStream(byteStream) + creds.writeTokenStorageToStream(dataStream) + byteStream.toByteArray + } + + def deserialize(tokenBytes: Array[Byte]): Credentials = { + val tokensBuf = new ByteArrayInputStream(tokenBytes) + + val creds = new Credentials() + creds.readTokenStorageStream(new DataInputStream(tokensBuf)) + creds + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 80f5c7d4c2a5..b585a7205d76 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -67,7 +67,7 @@ private[deploy] object SparkSubmitAction extends Enumeration { * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ -object SparkSubmit extends CommandLineUtils with Logging { +object SparkSubmit extends CommandLineUtils { // Cluster managers private val YARN = 1 @@ -677,7 +677,9 @@ object SparkSubmit extends CommandLineUtils with Logging { private def setRMPrincipal(sysProps: HashMap[String, String]): Unit = { val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}" - logDebug(s"Setting ${key} to ${shortUserName}") + // scalastyle:off println + printStream.println(s"Setting ${key} to ${shortUserName}") + // scalastyle:off println sysProps.put(key, shortUserName) } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala deleted file mode 100644 index 549309e8cb9e..000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.security - -import java.io.{ByteArrayOutputStream, DataOutputStream} - -import org.apache.hadoop.security.Credentials - -class CredentialsSerializer { - def serialize(creds: Credentials): Array[Byte] = { - val byteStream = new ByteArrayOutputStream - val dataStream = new DataOutputStream(byteStream) - creds.writeTokenStorageToStream(dataStream) - byteStream.toByteArray - } - - def deserialize(tokenBytes: Array[Byte]): Credentials = { - val tokensBuf = new java.io.ByteArrayInputStream(tokenBytes) - - val creds = new Credentials() - creds.readTokenStorageStream(new java.io.DataInputStream(tokensBuf)) - creds - } -} - diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index f70b8380084b..8d02048e3d7a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -224,7 +224,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { cfg.hadoopDelegationCreds.foreach { hadoopCreds => val creds = new CredentialsSerializer().deserialize(hadoopCreds) - addCredentials(creds) + SparkHadoopUtil.get.addCurrentUserCredentials(creds) } val env = SparkEnv.createExecutorEnv( @@ -240,14 +240,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } } - /** Add Credentials to the currently logged in user. */ - private def addCredentials(creds: Credentials): Unit = { - logInfo(s"Adding ${creds.numberOfTokens()} tokens and ${creds.numberOfSecretKeys()} secret" + - s"keys to the current user's credentials.") - - UserGroupInformation.getCurrentUser().addCredentials(creds) - } - def main(args: Array[String]) { var driverUrl: String = null var executorId: String = null diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1c5c082bb2d4..07a34ffdd9d4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.{CredentialsSerializer, HadoopDelegationTokenManager} +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -46,10 +46,10 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} */ private[spark] class CoarseGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - val rpcEnv: RpcEnv, - hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager]) - extends ExecutorAllocationClient with SchedulerBackend with Logging + scheduler: TaskSchedulerImpl, + val rpcEnv: RpcEnv, + hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager]) + extends ExecutorAllocationClient with SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed protected val totalCoreCount = new AtomicInteger(0) @@ -265,7 +265,6 @@ class CoarseGrainedSchedulerBackend( "messages."))) } - // Make fake resource offers on just one executor private def makeOffers(executorId: String) { // Make sure no executor is killed while some task is launching on it @@ -600,6 +599,7 @@ class CoarseGrainedSchedulerBackend( * When asking the executor to be replaced, the executor loss is considered a failure, and * killed tasks that are running on the executor will count towards the failure limits. If no * replacement is being requested, then the tasks will not count towards the limit. + * * @param executorIds identifiers of executors to kill * @param replace whether to replace the killed executors with new ones, default false * @param force whether to force kill busy executors, default false @@ -693,7 +693,7 @@ class CoarseGrainedSchedulerBackend( val creds = UserGroupInformation.getCurrentUser.getCredentials val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) manager.obtainDelegationTokens(hadoopConf, creds) - new CredentialsSerializer().serialize(creds) + SparkHadoopUtil.get.serialize(creds) } } else { None diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index fc93f992333d..1d145961cc51 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -60,7 +60,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.env.rpcEnv, Some(new HadoopDelegationTokenManager( sc.conf, - SparkHadoopUtil.get.newConfiguration(sc.conf)))) + sc.hadoopConfiguration))) with org.apache.mesos.Scheduler with MesosSchedulerUtils { diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4522071bd92e..d8f0609c8d2c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -71,14 +71,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } - override def getCurrentUserCredentials(): Credentials = { - UserGroupInformation.getCurrentUser().getCredentials() - } - - override def addCurrentUserCredentials(creds: Credentials) { - UserGroupInformation.getCurrentUser().addCredentials(creds) - } - override def addSecretKeyToUserCredentials(key: String, secret: String) { val creds = new Credentials() creds.addSecretKey(new Text(key), secret.getBytes(UTF_8)) From f903e6f5ab51e50ce18ca49dc81f3f6a0543270a Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 21 Jul 2017 10:54:15 -0700 Subject: [PATCH 11/21] style --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 89e9b83990b7..1f95c98ceff5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -25,14 +25,12 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future -import org.apache.hadoop.fs.FileSystem import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.deploy.mesos.config._ +import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient From e6a73573c9276d9bd68ae23f38eef15f9897ffef Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 21 Jul 2017 11:11:21 -0700 Subject: [PATCH 12/21] fixed old dep --- .../apache/spark/executor/CoarseGrainedExecutorBackend.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8d02048e3d7a..a5d60e90210f 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.security.CredentialsSerializer import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.rpc._ @@ -223,7 +222,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } cfg.hadoopDelegationCreds.foreach { hadoopCreds => - val creds = new CredentialsSerializer().deserialize(hadoopCreds) + val creds = SparkHadoopUtil.get.deserialize(hadoopCreds) SparkHadoopUtil.get.addCurrentUserCredentials(creds) } From 641bdadabdb070e0308fb00625cff2ab5efd064f Mon Sep 17 00:00:00 2001 From: ArtRand Date: Thu, 10 Aug 2017 13:05:31 -0700 Subject: [PATCH 13/21] addressed comments --- .../cluster/CoarseGrainedSchedulerBackend.scala | 14 ++++++++------ .../cluster/StandaloneSchedulerBackend.scala | 2 +- .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 8 ++++---- .../scheduler/cluster/YarnSchedulerBackend.scala | 2 +- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index a698138416ff..aa4e5d6a2325 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -46,11 +46,10 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} */ private[spark] class CoarseGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - val rpcEnv: RpcEnv, - hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager]) - extends ExecutorAllocationClient with SchedulerBackend with Logging -{ + scheduler: TaskSchedulerImpl, + val rpcEnv: RpcEnv) + extends ExecutorAllocationClient with SchedulerBackend with Logging { + // Use an atomic variable to track total number of cores in the cluster for simplicity and speed protected val totalCoreCount = new AtomicInteger(0) // Total number of executors that are currently registered @@ -102,6 +101,9 @@ class CoarseGrainedSchedulerBackend( // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + // hadoop token manager used by some sub-classes (e.g. Mesos) + protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None + // Hadoop delegation tokens to be sent to the executors. private val hadoopDelegationCreds = getHadoopDelegationCreds() @@ -689,7 +691,7 @@ class CoarseGrainedSchedulerBackend( } private def getHadoopDelegationCreds(): Option[Array[Byte]] = { - if (UserGroupInformation.isSecurityEnabled) { + if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) { hadoopDelegationTokenManager.map { manager => val creds = UserGroupInformation.getCurrentUser.getCredentials val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index a4509c75f2d3..a4e2a7434128 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -38,7 +38,7 @@ private[spark] class StandaloneSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext, masters: Array[String]) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with StandaloneAppClientListener with Logging { diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 1f95c98ceff5..4476dea72708 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -56,13 +56,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( securityManager: SecurityManager) extends CoarseGrainedSchedulerBackend( scheduler, - sc.env.rpcEnv, - Some(new HadoopDelegationTokenManager( - sc.conf, - sc.hadoopConfiguration))) + sc.env.rpcEnv) with org.apache.mesos.Scheduler with MesosSchedulerUtils { + override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = + Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 10cfd4b8a10a..8452f4377419 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -40,7 +40,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils} private[spark] abstract class YarnSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None) { + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { private val stopped = new AtomicBoolean(false) From 797fe4bdb7f91bcda4c96716c3aa071566950275 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Thu, 10 Aug 2017 13:17:20 -0700 Subject: [PATCH 14/21] missed one.. --- .../test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 14d0575a13cc..88916488c0de 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -268,7 +268,7 @@ private class FakeSchedulerBackend( scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv, clusterManagerEndpoint: RpcEndpointRef) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv, None) { + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { clusterManagerEndpoint.ask[Boolean]( From cdd3030fabcecaa00fdf78be489187a5eb1001e9 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Thu, 10 Aug 2017 16:07:05 -0700 Subject: [PATCH 15/21] fixed npe when using kerberos --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 4 ++-- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index aa4e5d6a2325..1bf4a5488b2d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -105,7 +105,7 @@ class CoarseGrainedSchedulerBackend( protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None // Hadoop delegation tokens to be sent to the executors. - private val hadoopDelegationCreds = getHadoopDelegationCreds() + protected val hadoopDelegationCreds: Option[Array[Byte]] = None class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { @@ -690,7 +690,7 @@ class CoarseGrainedSchedulerBackend( true } - private def getHadoopDelegationCreds(): Option[Array[Byte]] = { + protected def getHadoopDelegationCreds(): Option[Array[Byte]] = { if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) { hadoopDelegationTokenManager.map { manager => val creds = UserGroupInformation.getCurrentUser.getCredentials diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 4476dea72708..751f09b642ff 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -21,13 +21,12 @@ import java.io.File import java.util.{Collections, List => JList} import java.util.concurrent.locks.ReentrantLock +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import org.apache.mesos.SchedulerDriver import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.mesos.SchedulerDriver - import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} import org.apache.spark.deploy.mesos.config._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager @@ -63,6 +62,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + override val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds() + // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 From 63ca4db195caf3b1f1b56614f0387da6936cb513 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 11 Aug 2017 12:55:41 -0700 Subject: [PATCH 16/21] wip, addressed comments and added tgt support --- .../org/apache/spark/deploy/SparkSubmit.scala | 34 +++++++++++++------ .../spark/deploy/SparkSubmitArguments.scala | 5 +++ .../CoarseGrainedSchedulerBackend.scala | 8 ++--- .../launcher/SparkSubmitOptionParser.java | 2 ++ .../MesosCoarseGrainedSchedulerBackend.scala | 6 ++-- 5 files changed, 36 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 56315dcfde9d..3b0415bdef5a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -582,18 +582,30 @@ object SparkSubmit extends CommandLineUtils { // assure a keytab is available from any place in a JVM if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) { if (args.principal != null) { - require(args.keytab != null, "Keytab must be specified when principal is specified") - if (!new File(args.keytab).exists()) { - throw new SparkException(s"Keytab file: ${args.keytab} does not exist") - } else { - // Add keytab and principal configurations in sysProps to make them available - // for later use; e.g. in spark sql, the isolated class loader used to talk - // to HiveMetastore will use these settings. They will be set as Java system - // properties and then loaded by SparkConf - sysProps.put("spark.yarn.keytab", args.keytab) + require(args.keytab != null || args.tgt != null, + "Keytab or TGT must be specified when principal is specified") + if (args.keytab != null) { + require(args.tgt == null, "Keytab and TGT cannot be used at the same time") + if (!new File(args.keytab).exists()) { + throw new SparkException(s"Keytab file: ${args.keytab} does not exist") + } else { + // Add keytab and principal configurations in sysProps to make them available + // for later use; e.g. in spark sql, the isolated class loader used to talk + // to HiveMetastore will use these settings. They will be set as Java system + // properties and then loaded by SparkConf + sysProps.put("spark.yarn.keytab", args.keytab) + sysProps.put("spark.yarn.principal", args.principal) + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) + } + } + if (args.tgt != null) { + if (!new File(args.tgt).exists()) { + throw new SparkException(s"TGT file: ${args.tgt} does not exist") + } + sysProps.put("spark.kerberos.tgt", args.tgt) sysProps.put("spark.yarn.principal", args.principal) - - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) + UserGroupInformation.setLoginUser( + UserGroupInformation.getUGIFromTicketCache(args.tgt, args.principal)) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index fd1521193fde..5b7f0871b5ce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -73,6 +73,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var proxyUser: String = null var principal: String = null var keytab: String = null + var tgt: String = null // Standalone cluster mode only var supervise: Boolean = false @@ -197,6 +198,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .getOrElse(sparkProperties.get("spark.executor.instances").orNull) queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull + tgt = Option(tgt).orElse(sparkProperties.get("spark.kerberos.tgt")).orNull principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull // Try to set main class from JAR if no --class argument is given @@ -453,6 +455,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case KEYTAB => keytab = value + case TGT => + tgt = value + case HELP => printUsageAndExit(0) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 1bf4a5488b2d..a54bf95c1cc9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -45,9 +45,7 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} * (spark.deploy.*). */ private[spark] -class CoarseGrainedSchedulerBackend( - scheduler: TaskSchedulerImpl, - val rpcEnv: RpcEnv) +class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv) extends ExecutorAllocationClient with SchedulerBackend with Logging { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed @@ -102,10 +100,10 @@ class CoarseGrainedSchedulerBackend( @volatile protected var currentExecutorIdCounter = 0 // hadoop token manager used by some sub-classes (e.g. Mesos) - protected val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None + protected var hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None // Hadoop delegation tokens to be sent to the executors. - protected val hadoopDelegationCreds: Option[Array[Byte]] = None + protected var hadoopDelegationCreds: Option[Array[Byte]] = None class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index 6767cc507964..0c444c11da24 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -72,6 +72,7 @@ class SparkSubmitOptionParser { protected final String ARCHIVES = "--archives"; protected final String EXECUTOR_CORES = "--executor-cores"; protected final String KEYTAB = "--keytab"; + protected final String TGT = "--tgt"; protected final String NUM_EXECUTORS = "--num-executors"; protected final String PRINCIPAL = "--principal"; protected final String QUEUE = "--queue"; @@ -101,6 +102,7 @@ class SparkSubmitOptionParser { { FILES }, { JARS }, { KEYTAB }, + { TGT }, { KILL_SUBMISSION }, { MASTER }, { NAME }, diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 751f09b642ff..54d9b6cbae6a 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -59,10 +59,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( with org.apache.mesos.Scheduler with MesosSchedulerUtils { - override val hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = - Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration)) + this.hadoopDelegationTokenManager = Some(new HadoopDelegationTokenManager( + sc.conf, sc.hadoopConfiguration)) - override val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds() + this.hadoopDelegationCreds = getHadoopDelegationCreds() // Blacklist a slave after this many failures private val MAX_SLAVE_FAILURES = 2 From 4a861865531a41f085d2dd6371d3b85617afe714 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 11 Aug 2017 13:14:21 -0700 Subject: [PATCH 17/21] added some docs for tgt --- docs/configuration.md | 9 +++++++++ docs/security.md | 3 ++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index d3df923c4269..33eaf8e04518 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -510,6 +510,15 @@ Apart from these, the following properties are also available, and may be useful Python binary executable to use for PySpark in both driver and executors. + + spark.kerberos.tgt + + + Full path to a ticket-granting ticket (TGT) for the principal specified with --principal. + + + + ### Shuffle Behavior diff --git a/docs/security.md b/docs/security.md index 9eda42888637..aaabe0bed109 100644 --- a/docs/security.md +++ b/docs/security.md @@ -68,7 +68,8 @@ SSL must be configured on each node and configured for each component involved i ### YARN mode The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. -For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. +### Secure HDFS with YARN and Mesos +For long-running apps like Spark Streaming apps to be able to write to secure HDFS, it is possible to pass a principal and keytab or ticket cache to `spark-submit` via the `--principal`, `--keytab`, and `--tgt` parameters, respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. ### Standalone mode The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors. From 1f3ad3534297e9d1c619b523ab066a5adf7e8c92 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 11 Aug 2017 20:23:04 -0700 Subject: [PATCH 18/21] reverted tgt changes --- .../org/apache/spark/deploy/SparkSubmit.scala | 29 +++++-------------- .../spark/deploy/SparkSubmitArguments.scala | 5 ---- docs/configuration.md | 9 ------ docs/security.md | 5 +--- .../launcher/SparkSubmitOptionParser.java | 2 -- 5 files changed, 8 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 3b0415bdef5a..45badf858c20 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -582,30 +582,15 @@ object SparkSubmit extends CommandLineUtils { // assure a keytab is available from any place in a JVM if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == MESOS) { if (args.principal != null) { - require(args.keytab != null || args.tgt != null, - "Keytab or TGT must be specified when principal is specified") if (args.keytab != null) { - require(args.tgt == null, "Keytab and TGT cannot be used at the same time") - if (!new File(args.keytab).exists()) { - throw new SparkException(s"Keytab file: ${args.keytab} does not exist") - } else { - // Add keytab and principal configurations in sysProps to make them available - // for later use; e.g. in spark sql, the isolated class loader used to talk - // to HiveMetastore will use these settings. They will be set as Java system - // properties and then loaded by SparkConf - sysProps.put("spark.yarn.keytab", args.keytab) - sysProps.put("spark.yarn.principal", args.principal) - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) - } - } - if (args.tgt != null) { - if (!new File(args.tgt).exists()) { - throw new SparkException(s"TGT file: ${args.tgt} does not exist") - } - sysProps.put("spark.kerberos.tgt", args.tgt) + require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist") + // Add keytab and principal configurations in sysProps to make them available + // for later use; e.g. in spark sql, the isolated class loader used to talk + // to HiveMetastore will use these settings. They will be set as Java system + // properties and then loaded by SparkConf + sysProps.put("spark.yarn.keytab", args.keytab) sysProps.put("spark.yarn.principal", args.principal) - UserGroupInformation.setLoginUser( - UserGroupInformation.getUGIFromTicketCache(args.tgt, args.principal)) + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 5b7f0871b5ce..fd1521193fde 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -73,7 +73,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S var proxyUser: String = null var principal: String = null var keytab: String = null - var tgt: String = null // Standalone cluster mode only var supervise: Boolean = false @@ -198,7 +197,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S .getOrElse(sparkProperties.get("spark.executor.instances").orNull) queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull - tgt = Option(tgt).orElse(sparkProperties.get("spark.kerberos.tgt")).orNull principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull // Try to set main class from JAR if no --class argument is given @@ -455,9 +453,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case KEYTAB => keytab = value - case TGT => - tgt = value - case HELP => printUsageAndExit(0) diff --git a/docs/configuration.md b/docs/configuration.md index 33eaf8e04518..d3df923c4269 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -510,15 +510,6 @@ Apart from these, the following properties are also available, and may be useful Python binary executable to use for PySpark in both driver and executors. - - spark.kerberos.tgt - - - Full path to a ticket-granting ticket (TGT) for the principal specified with --principal. - - - - ### Shuffle Behavior diff --git a/docs/security.md b/docs/security.md index aaabe0bed109..57cd90e775d9 100644 --- a/docs/security.md +++ b/docs/security.md @@ -66,10 +66,7 @@ The full breakdown of available SSL options can be found on the [configuration SSL must be configured on each node and configured for each component involved in communication using the particular protocol. ### YARN mode -The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. - -### Secure HDFS with YARN and Mesos -For long-running apps like Spark Streaming apps to be able to write to secure HDFS, it is possible to pass a principal and keytab or ticket cache to `spark-submit` via the `--principal`, `--keytab`, and `--tgt` parameters, respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. +The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. ### Standalone mode The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors. diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index 0c444c11da24..6767cc507964 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -72,7 +72,6 @@ class SparkSubmitOptionParser { protected final String ARCHIVES = "--archives"; protected final String EXECUTOR_CORES = "--executor-cores"; protected final String KEYTAB = "--keytab"; - protected final String TGT = "--tgt"; protected final String NUM_EXECUTORS = "--num-executors"; protected final String PRINCIPAL = "--principal"; protected final String QUEUE = "--queue"; @@ -102,7 +101,6 @@ class SparkSubmitOptionParser { { FILES }, { JARS }, { KEYTAB }, - { TGT }, { KILL_SUBMISSION }, { MASTER }, { NAME }, From 857cf31b8b42177033b6d0553cb5a6f3550f417d Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 11 Aug 2017 20:25:21 -0700 Subject: [PATCH 19/21] re-added space in security.md --- docs/security.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/security.md b/docs/security.md index 57cd90e775d9..50f64d69cd0a 100644 --- a/docs/security.md +++ b/docs/security.md @@ -66,7 +66,9 @@ The full breakdown of available SSL options can be found on the [configuration SSL must be configured on each node and configured for each component involved in communication using the particular protocol. ### YARN mode -The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. +The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. + +For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. ### Standalone mode The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors. From 1d7ddbddea165508c4799a0ed0afdefaa884c340 Mon Sep 17 00:00:00 2001 From: ArtRand Date: Fri, 11 Aug 2017 20:26:51 -0700 Subject: [PATCH 20/21] whitespace --- docs/security.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/security.md b/docs/security.md index 50f64d69cd0a..9eda42888637 100644 --- a/docs/security.md +++ b/docs/security.md @@ -66,7 +66,7 @@ The full breakdown of available SSL options can be found on the [configuration SSL must be configured on each node and configured for each component involved in communication using the particular protocol. ### YARN mode -The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. +The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. From 4c77d54d8479035dbd8fbedbe05ec7c7b79cccaf Mon Sep 17 00:00:00 2001 From: ArtRand Date: Sat, 12 Aug 2017 08:33:05 -0700 Subject: [PATCH 21/21] small formatting change --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 54d9b6cbae6a..4c0096b1da0b 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -53,11 +53,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc: SparkContext, master: String, securityManager: SecurityManager) - extends CoarseGrainedSchedulerBackend( - scheduler, - sc.env.rpcEnv) - with org.apache.mesos.Scheduler - with MesosSchedulerUtils { + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) + with org.apache.mesos.Scheduler with MesosSchedulerUtils { this.hadoopDelegationTokenManager = Some(new HadoopDelegationTokenManager( sc.conf, sc.hadoopConfiguration))