26 changes: 24 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -140,12 +140,21 @@ class SparkHadoopUtil extends Logging {
if (!new File(keytabFilename).exists()) {
throw new SparkException(s"Keytab file: ${keytabFilename} does not exist")
} else {
logInfo("Attempting to login to Kerberos" +
s" using principal: ${principalName} and keytab: ${keytabFilename}")
logInfo("Attempting to login to Kerberos " +
s"using principal: ${principalName} and keytab: ${keytabFilename}")
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}
}

/**
* Add or overwrite the current user's credentials with serialized delegation tokens,
* and confirm that the correct Hadoop configuration is set.
*/
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
Contributor: Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.)

Contributor: Always forget this class is public. Add private[spark].

UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
addCurrentUserCredentials(deserialize(tokens))
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
@@ -462,6 +471,19 @@ object SparkHadoopUtil {
}
}

/**
* Given an expiration date (e.g. for Hadoop Delegation Tokens), return the date
* when a given fraction of the duration until the expiration date has passed.
* Formula: current time + (fraction * (time until expiration))
* @param expirationDate Drop-dead expiration date
* @param fraction Fraction of the time until expiration after which the date is returned
* @return Date when the given fraction of the time until expiration has passed
*/
def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
Contributor: Add private[spark].

val ct = System.currentTimeMillis
(ct + (fraction * (expirationDate - ct))).toLong
}
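A quick worked example of the formula above; the 10-hour expiration and the 0.75 fraction are illustrative values, not something taken from the patch:

  // A token expiring 10 hours from now, renewed once 75% of that window has elapsed.
  val tenHoursMs = 10 * 60 * 60 * 1000L
  val expiration = System.currentTimeMillis() + tenHoursMs
  val nextUpdate = SparkHadoopUtil.getDateOfNextUpdate(expiration, 0.75)
  // nextUpdate is roughly 7.5 hours (27,000,000 ms) after the current time.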

/**
* Returns a Configuration object with Spark configuration applied on top. Unlike
* the instance method, this will always return a Configuration instance, and not a
@@ -55,7 +55,9 @@ private[spark] class HadoopDelegationTokenManager(
logDebug(s"Using the following delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")

/** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
/**
 * Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem.
 */

Contributor: This is not really changing anything, so I'd just revert changes to this file. Or, if you really want to, just keep the new @params you're adding below.
def this(sparkConf: SparkConf, hadoopConf: Configuration) = {
this(
sparkConf,
@@ -109,6 +111,8 @@ private[spark] class HadoopDelegationTokenManager(
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @param hadoopConf hadoop Configuration
* @param creds Credentials that will be updated in place (overwritten)
* @return Time after which the fetched delegation tokens should be renewed.
*/
def obtainDelegationTokens(
@@ -125,3 +129,4 @@
}.foldLeft(Long.MaxValue)(math.min)
}
}

@@ -123,6 +123,9 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.stop()
}
}.start()

case UpdateDelegationTokens(tokenBytes) =>
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
Contributor: Can you add a logInfo saying the tokens are being updated? This has always been helpful when debugging issues with this feature on YARN.
}
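Following the reviewer's request above, a minimal sketch of how this case could look with a log line added; the wording of the log message is an assumption, not part of the patch:

  case UpdateDelegationTokens(tokenBytes) =>
    // Log the update so token refreshes are visible when debugging, as requested in review.
    logInfo(s"Received ${tokenBytes.length} bytes of new delegation tokens; updating credentials.")
    SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)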

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -219,9 +222,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

cfg.hadoopDelegationCreds.foreach { hadoopCreds =>
val creds = SparkHadoopUtil.get.deserialize(hadoopCreds)
SparkHadoopUtil.get.addCurrentUserCredentials(creds)
cfg.hadoopDelegationCreds.foreach { tokens =>
SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
}

val env = SparkEnv.createExecutorEnv(
@@ -54,6 +54,9 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
with RegisterExecutorResponse

case class UpdateDelegationTokens(tokens: Array[Byte])
extends CoarseGrainedClusterMessage

// Executors to driver
case class RegisterExecutor(
executorId: String,
@@ -24,10 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
@@ -99,11 +96,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// The num of current max ExecutorId used to re-register appMaster
@volatile protected var currentExecutorIdCounter = 0

// hadoop token manager used by some sub-classes (e.g. Mesos)
def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
Author: No longer needed because resource-manager backends (may) implement their own initializeHadoopDelegationTokens.


// Hadoop delegation tokens to be sent to the executors.
val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
// Hadoop delegation tokens to be sent to the executors, can be updated as necessary.
protected var hadoopDelegationTokens: Option[Array[Byte]] = initializeHadoopDelegationTokens()
Contributor: Why is this protected? There's no reason I can see for subclasses to need access to this field.


class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
@@ -159,6 +153,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, replace = true, force = true)
}

case UpdateDelegationTokens(newDelegationTokens) =>
// Update the driver's delegation tokens in case new executors are added later.
Contributor: Stale comment?

hadoopDelegationTokens = Some(newDelegationTokens)
executorDataMap.values.foreach { ed =>
ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
}
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -236,7 +237,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
hadoopDelegationCreds)
hadoopDelegationTokens)
context.reply(reply)
}

@@ -686,18 +687,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
true
}

protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
Author: This method was only called once, and it would discard the renewal time information, limiting its utility.

if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
hadoopDelegationTokenManager.map { manager =>
val creds = UserGroupInformation.getCurrentUser.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
manager.obtainDelegationTokens(hadoopConf, creds)
SparkHadoopUtil.get.serialize(creds)
}
} else {
None
}
}
protected def initializeHadoopDelegationTokens(): Option[Array[Byte]] = { None }
}

private[spark] object CoarseGrainedSchedulerBackend {
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
Contributor: Change not needed?

Author: SparkException is unused, not sure why it was there in the first place.

import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

@@ -22,12 +22,14 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
@@ -58,8 +60,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
with org.apache.mesos.Scheduler with MesosSchedulerUtils {

override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
private lazy val hadoopCredentialRenewer: MesosCredentialRenewer =
Author (@ArtRand, Nov 6, 2017): For Mesos, the credential renewer contains the tokens, the renewal time, and all logic to renew the tokens. Should never be evaluated (and tokens never initialized) if UserGroupInformation.isSecurityEnabled evaluates to false.

new MesosCredentialRenewer(
conf, new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
Contributor: Why pass in a HadoopDelegationTokenManager if it's not used by this class? The renewer can create one itself.


// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2
Expand Down Expand Up @@ -213,6 +216,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
)

// Check that the credentials are defined, then start the token renewer. (It's likely that
// auth would have failed already if you've made it this far.)
if (hadoopDelegationTokens.isDefined) {
Contributor: You shouldn't do this here, otherwise you need to keep that field protected in the parent class and that adds unnecessary coupling. Instead, do this in initializeHadoopDelegationTokens.

Author (@ArtRand, Nov 8, 2017): I agree that I shouldn't need to use the conditional hadoopDelegationTokens.isDefined, however there will need to be some check (UserGroupInformation.isSecurityEnabled or similar) to pass the driverEndpoint to the renewer/manager here. When the initial tokens are generated, driverEndpoint is still None because start() hasn't been called yet. So I could schedule the renewal, but I'll still have to at least update the driverEndpoint here.

I could initialize the driverEndpoint in initializeHadoopDelegationTokens for Mesos and change around the logic in start() (for the MesosCoarseGrainedSchedulerBackend), but then you're just switching one conditional for another...

Author: I may have spoken too soon, there might be a way.

Contributor: You could call initializeHadoopDelegationTokens in start() after everything that's needed is initialized. It would also better follow the scheduler's lifecycle.

Author (@ArtRand, Nov 8, 2017): Check out the patch now. hadoopDelegationTokens now calls initializeHadoopDelegationTokens (renamed fetchHadoopDelegationTokens) by name:

  private val hadoopDelegationTokens: () => Option[Array[Byte]] = fetchHadoopDelegationTokens

where

  override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
    if (UserGroupInformation.isSecurityEnabled) {
      Some(hadoopDelegationTokenManager.getTokens())
    } else {
      None
    }
  }

This has the effect of only generating the first set of delegation tokens once the first RetrieveSparkAppConfig message is received. At this point, everything has been initialized because the renewer (renamed MesosHadoopDelegationTokenManager) is evaluated lazily with the correct driverEndpoint:

  private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
    new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)

It's maybe a bit confusing to just avoid an extra conditional. WDYT?
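As a small standalone illustration of the deferred-evaluation pattern described in that comment (the names here are invented for the example, not part of the patch), assigning the method to a function value means nothing is fetched until the first caller invokes it:

  // Hypothetical sketch: tokens() re-runs fetchTokens on each invocation,
  // so the first fetch happens only when the first request arrives.
  class ExampleBackend {
    private val tokens: () => Option[Array[Byte]] = fetchTokens
    protected def fetchTokens(): Option[Array[Byte]] = { println("fetching tokens"); None }
    def onFirstRequest(): Option[Array[Byte]] = tokens()
  }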

// The driver endpoint isn't set when the initial tokens (and their expiration time) are
// generated, so we pass the driver endpoint to the renewer here.
hadoopCredentialRenewer.scheduleTokenRenewal(driverEndpoint)
}

launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
startScheduler(driver)
}
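For context, a rough sketch of what a renewer along these lines might do: fetch fresh tokens, push them to the driver endpoint (which forwards them to executors via UpdateDelegationTokens), and reschedule itself using getDateOfNextUpdate. The class name, the renewTokens-style recursion, and the 0.75 fraction are assumptions for illustration, not the actual MesosCredentialRenewer implementation:

  package org.apache.spark.scheduler.cluster.mesos

  import java.util.concurrent.{Executors, TimeUnit}

  import org.apache.hadoop.security.Credentials

  import org.apache.spark.SparkConf
  import org.apache.spark.deploy.SparkHadoopUtil
  import org.apache.spark.deploy.security.HadoopDelegationTokenManager
  import org.apache.spark.rpc.RpcEndpointRef
  import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens

  // Illustrative sketch only.
  class ExampleTokenRenewer(conf: SparkConf, tokenManager: HadoopDelegationTokenManager) {
    private val scheduler = Executors.newSingleThreadScheduledExecutor()

    def scheduleTokenRenewal(driverEndpoint: RpcEndpointRef): Unit = {
      // Fetch new delegation tokens from all registered providers.
      val creds = new Credentials()
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
      val expiration = tokenManager.obtainDelegationTokens(hadoopConf, creds)

      // Ship the serialized tokens to the driver, which broadcasts them to executors.
      driverEndpoint.send(UpdateDelegationTokens(SparkHadoopUtil.get.serialize(creds)))

      // Re-run when 75% of the time until expiration has passed (fraction is an assumption).
      val nextRenewal = SparkHadoopUtil.getDateOfNextUpdate(expiration, 0.75)
      val delay = math.max(0L, nextRenewal - System.currentTimeMillis())
      scheduler.schedule(new Runnable {
        override def run(): Unit = scheduleTokenRenewal(driverEndpoint)
      }, delay, TimeUnit.MILLISECONDS)
    }
  }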
@@ -772,6 +783,14 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
offer.getHostname
}
}

override def initializeHadoopDelegationTokens(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled) {
Some(hadoopCredentialRenewer.tokens)
Contributor (@vanzin, Nov 6, 2017): So, seems to me that your "renewer" is doing more than just renewing tokens; it's also being used to generate the initial set. So aside from my comments about initializing the renewer here, you should also probably make this API a little cleaner. Right now there's too much coupling.

The renewer should do renewals only, otherwise it should be called something different.

} else {
None
}
}
}

private class Slave(val hostname: String) {