Changes from 3 commits
SparkHadoopUtil.scala
@@ -142,6 +142,13 @@ class SparkHadoopUtil extends Logging {
     }
   }

+  def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
Contributor: Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.)

Contributor: Always forget this class is public. Add private[spark].
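A minimal sketch of the method with both comments addressed; the body is taken from the diff below, while the doc-comment wording and rationale are illustrative, not from the PR:

```scala
/**
 * Deserializes the given tokens and adds them to the current user's credentials.
 * Unlike on YARN, a Mesos executor starts without a Kerberos login, so the Hadoop
 * configuration must be switched to token-based authentication before the
 * delegation tokens can be used.
 */
private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf): Unit = {
  val hadoopConf = newConfiguration(sparkConf)
  hadoopConf.set("hadoop.security.authentication", "Token")
  UserGroupInformation.setConfiguration(hadoopConf)
  addCurrentUserCredentials(deserialize(tokens))
}
```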

+    val hadoopConf = newConfiguration(sparkConf)
+    hadoopConf.set("hadoop.security.authentication", "Token")
+    UserGroupInformation.setConfiguration(hadoopConf)
+    addCurrentUserCredentials(deserialize(tokens))
+  }
+
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
CoarseGrainedExecutorBackend.scala
@@ -26,8 +26,6 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal

-import org.apache.hadoop.security.UserGroupInformation
-
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -125,8 +123,11 @@ private[spark] class CoarseGrainedExecutorBackend(
         executor.stop()
       }
     }.start()
+
+    // This message is only sent by Mesos Drivers, and is not expected from other
+    // SchedulerBackends at this time
     case UpdateDelegationTokens(tokens) =>
-      CoarseGrainedExecutorBackend.addDelegationTokens(tokens, env.conf)
+      SparkHadoopUtil.get.addDelegationTokens(tokens, env.conf)
   }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {

Contributor (on the new comment): No need to add this comment.

@skonto (Contributor), Sep 19, 2017 (on the UpdateDelegationTokens case): Let's add a comment that this is received only in the mesos case, since CoarseGrainedExecutorBackend is used by both yarn and standalone.
@@ -178,16 +179,6 @@ private[spark] class CoarseGrainedExecutorBackend(

 private[spark] object CoarseGrainedExecutorBackend extends Logging {

-  private def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
-    logInfo(s"Found delegation tokens of ${tokens.length} bytes")
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
-    hadoopConf.set("hadoop.security.authentication", "Token")
-    UserGroupInformation.setConfiguration(hadoopConf)
-    val creds = SparkHadoopUtil.get.deserialize(tokens)
-    // decode tokens and add them to the credentials
-    UserGroupInformation.getCurrentUser.addCredentials(SparkHadoopUtil.get.deserialize(tokens))
-  }
-
   private def run(
       driverUrl: String,
       executorId: String,
CoarseGrainedSchedulerBackend.scala
@@ -161,11 +161,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }

     case UpdateDelegationTokens(tokens) =>
-      logDebug("Asking each executor to update HDFS delegation tokens")
-      for ((x, executorData) <- executorDataMap) {
-        executorData.executorEndpoint.send(UpdateDelegationTokens(tokens))
+      executorDataMap.values.foreach {
+        ed => ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
       }
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

Contributor (on the ed => line): ed => goes in previous line. (The whole thing might fit in one line, too.)
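A sketch of the style the reviewer is suggesting, in the context of the receive handler above (illustrative only):

```scala
// Lambda parameter on the same line as the opening brace:
executorDataMap.values.foreach { ed =>
  ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
}
// Or the whole thing on one line:
executorDataMap.values.foreach(_.executorEndpoint.send(UpdateDelegationTokens(tokens)))
```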
MesosClusterManager.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.scheduler.cluster.mesos

-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
Contributor: Change not needed?

Author: SparkException is unused, not sure why it was there in the first place.

 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

MesosCoarseGrainedSchedulerBackend.scala
@@ -198,16 +198,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )

-    if (principal != null) {
+    // check that the credentials are defined, even though it's likely that auth would have failed
+    // already if you've made it this far
+    if (principal != null && hadoopDelegationCreds.isDefined) {
       logDebug(s"Principal found ($principal) starting token renewer")
       val credentialRenewerThread = new Thread {
         setName("MesosCredentialRenewer")
         override def run(): Unit = {
+          val dummy: Option[Array[Byte]] = None
Contributor (on val dummy): What is this for?

Author: whoops!

           val credentialRenewer =
             new MesosCredentialRenewer(
               conf,
               hadoopDelegationTokenManager.get,
-              MesosCredentialRenewer.getTokenRenewalInterval(hadoopDelegationCreds.get, conf),
+              MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf),

Contributor (on the renewal-time argument): This sets the first renewal time to be the expiration time of the token. It should be similar to the way the next renewal time is calculated in the MesosCredentialRenewer class, so that the first token is renewed after 75% of the expiration time has passed:

    val currTime = System.currentTimeMillis()
    val renewTime = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
    val rt = 0.75 * (renewTime - currTime)

    val credentialRenewer =
      new MesosCredentialRenewer(
        conf,
        hadoopDelegationTokenManager.get,
        (currTime + rt).toLong,
        driverEndpoint)
    credentialRenewer.scheduleTokenRenewal()

               driverEndpoint)
           credentialRenewer.scheduleTokenRenewal()
         }
MesosCredentialRenewer.scala
@@ -63,7 +63,8 @@ class MesosCredentialRenewer(

   def scheduleTokenRenewal(): Unit = {
     def scheduleRenewal(runnable: Runnable): Unit = {
-      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      // val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+      val remainingTime = 5000
Contributor: Why 5000?

Author: well that's embarrassing, just a debugging tool that I forgot to remove.

       if (remainingTime <= 0) {
         logInfo("Credentials have expired, creating new ones now.")
         runnable.run()
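Presumably the fix is just to restore the commented-out computation; a sketch, inferred from the diff above:

```scala
// Time until the next scheduled renewal, in milliseconds; a non-positive value
// means the tokens are already due for renewal.
val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
```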
@@ -82,8 +83,8 @@
     } catch {
       case e: Exception =>
         // Log the error and try to write new tokens back in an hour
Contributor: Comment says "an hour" but code has 20 seconds.

Author: good catch, I changed the code to match the YARN equivalent.

logWarning("Couldn't broadcast tokens, trying agin in 20 seconds", e)
credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS)
logWarning("Couldn't broadcast tokens, trying again in an hour", e)
credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
         return
       }
       scheduleRenewal(this)

@@ -136,15 +137,15 @@
     }
   }

 object MesosCredentialRenewer extends Logging {
-  def getTokenRenewalInterval(bytes: Array[Byte], conf: SparkConf): Long = {
+  def getTokenRenewalTime(bytes: Array[Byte], conf: SparkConf): Long = {
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     val creds = SparkHadoopUtil.get.deserialize(bytes)
-    val intervals = creds.getAllTokens.asScala.flatMap { t =>
+    val renewalTimes = creds.getAllTokens.asScala.flatMap { t =>
       Try {
Contributor: This method does not return an interval, it just returns the new expiration time.
         t.renew(hadoopConf)
       }.toOption
     }
-    if (intervals.isEmpty) Long.MaxValue else intervals.min
+    if (renewalTimes.isEmpty) Long.MaxValue else renewalTimes.min
   }
 }
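For context, a sketch of why the rename matters (Hadoop's Token.renew returns an absolute expiration timestamp, not a duration; the variable names here are illustrative):

```scala
// renew() returns the token's new expiration time in epoch milliseconds...
val nextExpiration: Long = token.renew(hadoopConf)
// ...so a renewal interval (a duration) has to be derived from it explicitly.
val renewalInterval: Long = nextExpiration - System.currentTimeMillis()
```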