26 changes: 24 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -140,12 +140,21 @@ class SparkHadoopUtil extends Logging {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
} else {
logInfo("Attempting to login to Kerberos" +
s" using principal: ${principalName} and keytab: ${keytabFilename}")
logInfo("Attempting to login to Kerberos " +
s"using principal: ${principalName} and keytab: ${keytabFilename}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
}

/**
* Add or overwrite the current user's credentials with the given serialized delegation tokens,
* and also confirm that the correct Hadoop configuration is set.
*/
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
Contributor
Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.)

Contributor
Always forget this class is public. Add private[spark].

UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
addCurrentUserCredentials(deserialize(tokens))
}
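Taking the two comments above together, the method might end up roughly like this (a sketch only; the explanatory comment is a guess at the rationale the reviewer asked for, not something stated in the patch):

  /**
   * Overwrite the current user's credentials with the given serialized delegation tokens,
   * making sure the UGI sees a Hadoop configuration derived from the Spark configuration
   * first. Unlike on YARN, Mesos executors are not started with a Hadoop login, so the
   * authentication configuration has to be set here before the tokens are added.
   */
  private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf): Unit = {
    UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
    addCurrentUserCredentials(deserialize(tokens))
  }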

/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
@@ -462,6 +471,19 @@ object SparkHadoopUtil {
}
}

/**
* Given an expiration date (e.g. for Hadoop delegation tokens), return the date
* when a given fraction of the duration until the expiration date has passed.
* Formula: current time + (fraction * (time until expiration))
* @param expirationDate Drop-dead expiration date
* @param fraction Fraction of the time until expiration to wait before the next update
* @return Date when the given fraction of the time until expiration has passed
*/
def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
Contributor
Add private[spark].

val ct = System.currentTimeMillis
(ct + (fraction * (expirationDate - ct))).toLong
}
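As a worked example of the formula (hypothetical values): with the 0.75 fraction used by the Mesos renewer below, a token expiring eight hours from now gets its next update scheduled six hours from now.

  val eightHours = 8 * 60 * 60 * 1000L
  val expiration = System.currentTimeMillis() + eightHours
  // now + 0.75 * (expiration - now) == six hours from now
  val nextUpdate = SparkHadoopUtil.getDateOfNextUpdate(expiration, 0.75)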

/**
* Returns a Configuration object with Spark configuration applied on top. Unlike
* the instance method, this will always return a Configuration instance, and not a
@@ -109,6 +109,8 @@ private[spark] class HadoopDelegationTokenManager(
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @param hadoopConf hadoop Configuration
* @param creds Credentials that will be updated in place (overwritten)
* @return Time after which the fetched delegation tokens should be renewed.
*/
def obtainDelegationTokens(
@@ -125,3 +125,4 @@ private[spark] class HadoopDelegationTokenManager(
}.foldLeft(Long.MaxValue)(math.min)
}
}

@@ -123,6 +123,9 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.stop()
}
}.start()

case UpdateDelegationTokens(tokenBytes) =>
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
Contributor
Can you add a logInfo saying the tokens are being updated? This has always been helpful when debugging issues with this feature on YARN.

}
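With the requested log line, the handler could look like this (a sketch; the message wording is only a suggestion):

    case UpdateDelegationTokens(tokenBytes) =>
      logInfo(s"Received ${tokenBytes.length} bytes of new delegation tokens; updating credentials.")
      SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)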

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -219,9 +222,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
SparkHadoopUtil.get.addCurrentUserCredentials(creds)
cfg.hadoopDelegationCreds.foreach { tokens =>
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}

val env = SparkEnv.createExecutorEnv(
@@ -54,6 +54,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
with RegisterExecutorResponse

case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

// Executors to driver
case class RegisterExecutor(
executorId: String,
@@ -24,11 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -99,11 +95,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

// hadoop token manager used by some sub-classes (e.g. Mesos)
def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
Author
No longer needed because resource-manager backends (may) implement their own initializeHadoopDelegationTokens.


// Hadoop delegation tokens to be sent to the executors.
val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
private val hadoopDelegationTokens: () => Option[Array[Byte]] = fetchHadoopDelegationTokens

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -159,6 +151,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, replace = true, force = true)
}

case UpdateDelegationTokens(newDelegationTokens) =>
// Update the driver's delegation tokens in case new executors are added later.
Contributor
Stale comment?

executorDataMap.values.foreach { ed =>
ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
}
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -236,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
hadoopDelegationCreds)
hadoopDelegationTokens.apply())
Contributor
Can't you just call fetchHadoopDelegationTokens() directly?

context.reply(reply)
}
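If the suggestion is taken, the hadoopDelegationTokens indirection can go away and the reply can be built directly (a sketch):

      val reply = SparkAppConfig(
        sparkProperties,
        SparkEnv.get.securityManager.getIOEncryptionKey(),
        fetchHadoopDelegationTokens())
      context.reply(reply)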

@@ -686,18 +684,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
true
}

protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
Author
This method was only called once, and would discard the renewal time information, limiting its utility.

if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
hadoopDelegationTokenManager.map { manager =>
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
manager.obtainDelegationTokens(hadoopConf, creds)
SparkHadoopUtil.get.serialize(creds)
}
} else {
None
}
}
protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
}

private[spark] object CoarseGrainedSchedulerBackend {
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
Contributor
Change not needed?

Author
SparkException is unused, not sure why it was there in the first place

import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

@@ -22,15 +22,16 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.config
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.network.netty.SparkTransportConf
@@ -58,8 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler with MesosSchedulerUtils {

override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)

// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
@@ -772,6 +773,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
offer.getHostname
}
}

override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled) {
Some(hadoopDelegationTokenManager.getTokens())
} else {
None
}
}
}

private class Slave(val hostname: String) {
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.util.ThreadUtils


/**
* The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
* of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
* and similarly will renew the Credentials when 75% of the renewal interval has passed.
* The principal difference is that instead of writing the new credentials to HDFS and
* incrementing the timestamp of the file, the new credentials (called Tokens when they are
* serialized) are broadcast to all running executors. On the executor side, when new Tokens are
* received they overwrite the current credentials.
*/
private[spark] class MesosHadoopDelegationTokenManager(
conf: SparkConf, hadoopConfig: Configuration,
Contributor
One arg per line.

driverEndpoint: RpcEndpointRef)
extends Logging {

private val credentialRenewerThread: ScheduledExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")

private val tokenManager: HadoopDelegationTokenManager =
new HadoopDelegationTokenManager(conf, hadoopConfig)

private val principal: String = conf.get(config.PRINCIPAL).orNull

private val (secretFile: String, mode: String) = getSecretFile(conf)

private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
try {
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
(SparkHadoopUtil.get.serialize(creds), rt)
} catch {
case e: Exception =>
throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" +
s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
Contributor
Use e as the cause of the exception you're throwing.

}
}
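Passing e as the cause preserves the original stack trace instead of flattening it into the message (a sketch of the suggested change):

      case e: Exception =>
        throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" +
          s"\tPrincipal: $principal\n\tmode: $mode\n\tsecret file: $secretFile", e)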

scheduleTokenRenewal()

private def getSecretFile(conf: SparkConf): (String, String) = {
val keytab = conf.get(config.KEYTAB).orNull
val tgt = conf.getenv("KRB5CCNAME")
require(keytab != null || tgt != null, "A keytab or TGT is required.")
Contributor
Is that really the case? KRB5CCNAME is not a required env variable. It has a default value, and the UGI class will use the credentials from the default location if they're available (and reloading the cache periodically).

So I think you don't really need this, but just to track whether there's a principal and keytab. And you don't need to call getUGIFromTicketCache later on since I'm pretty sure UGI takes care of that for you.

// if both Keytab and TGT are detected we use the Keytab.
val (secretFile, mode) = if (keytab != null && tgt != null) {
logWarning(s"Keytab and TGT were detected, using keytab, " +
s"unset ${config.KEYTAB.key} to use TGT")
(keytab, "keytab")
} else {
val m = if (keytab != null) "keytab" else "tgt"
val sf = if (keytab != null) keytab else tgt
(sf, m)
}

if (principal == null) {
logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
Contributor
You should probably assert that mode is "tgt" in this case.

} else {
logInfo(s"Using principal: $principal with mode: $mode to retrieve Hadoop delegation tokens")
}

logDebug(s"secretFile is $secretFile")
(secretFile, mode)
}
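If the comment above is followed, the TGT-specific handling could shrink to tracking only whether a keytab is configured and letting UGI manage the default ticket cache itself (a rough sketch under that assumption; loginUGI is a hypothetical helper, not part of the patch):

  private val keytab: Option[String] = conf.get(config.KEYTAB)

  private def loginUGI(): UserGroupInformation = keytab match {
    case Some(file) =>
      // Explicit keytab login, as in the current patch.
      UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, file)
    case None =>
      // No keytab configured: rely on UGI picking up (and periodically reloading)
      // the default ticket cache.
      UserGroupInformation.getCurrentUser
  }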

private def scheduleTokenRenewal(): Unit = {
def scheduleRenewal(runnable: Runnable): Unit = {
val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
if (remainingTime <= 0) {
logInfo("Credentials have expired, creating new ones now.")
runnable.run()
} else {
logInfo(s"Scheduling login from keytab in $remainingTime millis.")
credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
}
}

val credentialRenewerRunnable =
new Runnable {
override def run(): Unit = {
try {
val tokensBytes = getNewDelegationTokens
broadcastDelegationTokens(tokensBytes)
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
logWarning("Couldn't broadcast tokens, trying again in an hour", e)
credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
return
}
scheduleRenewal(this)
}
}
scheduleRenewal(credentialRenewerRunnable)
}

private def getNewDelegationTokens: Array[Byte] = {
Contributor
Prefer to use () in methods that do non-trivial things.

logInfo(s"Attempting to login to KDC with ${conf.get(config.PRINCIPAL).orNull}")
// Get new delegation tokens by logging in with a new UGI
// inspired by AMCredentialRenewer.scala:L174
val ugi = if (mode == "keytab") {
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile)
} else {
UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
}
logInfo("Successfully logged into KDC")

val tempCreds = ugi.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
var nextRenewalTime = Long.MaxValue
Contributor
val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] { ... }

ugi.doAs(new PrivilegedExceptionAction[Void] {
override def run(): Void = {
nextRenewalTime = tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
null
}
})
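Per the comment above, the mutable nextRenewalTime can be dropped by letting doAs return the value (a sketch):

    val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
      override def run(): Long = tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
    })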

val currTime = System.currentTimeMillis()
timeOfNextRenewal = if (nextRenewalTime <= currTime) {
logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " +
s"current time ($currTime), which is unexpected, please check your credential renewal " +
"related configurations in the target services.")
currTime
} else {
SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
}
logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")

// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
SparkHadoopUtil.get.serialize(tempCreds)
}

private def broadcastDelegationTokens(tokens: Array[Byte]) = {
logDebug("Sending new tokens to all executors")
Contributor
I'd make this logInfo (similar message in YARN code has helped me a lot).

if (driverEndpoint == null) {
Contributor
Make this a require in the constructor?

throw new IllegalStateException("driver endpoint is Null!")
}
driverEndpoint.send(UpdateDelegationTokens(tokens))
}
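Folding in both comments (logInfo instead of logDebug, and a constructor-time require instead of the null check), the method might reduce to something like this (a sketch):

  // In the class body, once, instead of checking on every broadcast:
  require(driverEndpoint != null, "DriverEndpoint is not initialized")

  private def broadcastDelegationTokens(tokens: Array[Byte]): Unit = {
    logInfo("Sending new delegation tokens to all executors.")
    driverEndpoint.send(UpdateDelegationTokens(tokens))
  }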

def getTokens(): Array[Byte] = {
tokens
Contributor
tokens is never updated, so fetchHadoopDelegationTokens() will always return the initial set even after it's expired.

Author
@ArtRand ArtRand Nov 15, 2017
Thanks for catching this, tokens are now updated for late-joining executors. https://github.com/apache/spark/pull/19272/files#diff-765ac3c4db227cd2c5d796f00794016fR145

}
}
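Based on the author's follow-up, the fix presumably stores the freshly fetched tokens before broadcasting, so that getTokens() serves the latest set to late-joining executors. A sketch of what that could look like inside the renewal runnable (not the actual follow-up commit):

        override def run(): Unit = {
          try {
            val newTokens = getNewDelegationTokens
            // Keep the cached copy fresh so getTokens() hands current tokens to
            // executors that register after a renewal.
            tokens = newTokens
            broadcastDelegationTokens(newTokens)
          } catch {
            case e: Exception =>
              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
              return
          }
          scheduleRenewal(this)
        }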
