@@ -142,6 +142,13 @@ class SparkHadoopUtil extends Logging {
}
}

def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
Contributor: Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.)

Contributor: Always forget this class is public. Add private[spark].
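
Taken together, the fix would presumably look like this sketch (the doc-comment wording is an assumption based on this thread, not the author's text):

    /**
     * Sets the Hadoop security authentication mode to token-based and adds
     * the given serialized delegation tokens to the current user. Used by
     * Mesos executors, which receive tokens from the driver over RPC rather
     * than through the cluster manager.
     */
    private[spark] def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {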

val hadoopConf = newConfiguration(sparkConf)
hadoopConf.set("hadoop.security.authentication", "Token")
UserGroupInformation.setConfiguration(hadoopConf)
addCurrentUserCredentials(deserialize(tokens))
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
@@ -123,6 +123,11 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.stop()
}
}.start()

// This message is only sent by Mesos Drivers, and is not expected from other
// SchedulerBackends at this time

Contributor: No need to add this comment.

case UpdateDelegationTokens(tokens) =>
Contributor (@skonto, Sep 19, 2017): Let's add a comment that this is received only in the Mesos case, since CoarseGrainedExecutorBackend is used by both YARN and standalone.

SparkHadoopUtil.get.addDelegationTokens(tokens, env.conf)
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -54,6 +54,8 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
with RegisterExecutorResponse

case class UpdateDelegationTokens(tokens: Array[Byte]) extends CoarseGrainedClusterMessage

// Executors to driver
case class RegisterExecutor(
executorId: String,
@@ -159,6 +159,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, replace = true, force = true)
}

case UpdateDelegationTokens(tokens) =>
executorDataMap.values.foreach {
ed => ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
Contributor: ed => goes in previous line. (The whole thing might fit in one line, too.)
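
That is, presumably something like:

    executorDataMap.values.foreach { ed =>
      ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
    }

or all on one line if it fits within the line-length limit.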

}
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.SparkContext
Contributor: Change not needed?

Author: SparkException is unused; not sure why it was there in the first place.

import org.apache.spark.internal.config._
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

@@ -22,12 +22,13 @@ import java.util.{Collections, List => JList}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
@@ -39,6 +40,7 @@ import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils


/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
* onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
@@ -60,6 +62,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))

private val principal = conf.get("spark.yarn.principal", null)
Contributor: This config is defined in core already (PRINCIPAL).
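
Presumably meaning something like the following sketch, assuming the PRINCIPAL constant from org.apache.spark.internal.config is imported:

    private val principal = conf.get(PRINCIPAL).orNull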


// Blacklist a slave after this many failures
private val MAX_SLAVE_FAILURES = 2

@@ -194,6 +198,27 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
)

// check that the credentials are defined, even though it's likely that auth would have failed
// already if you've made it this far
if (principal != null && hadoopDelegationCreds.isDefined) {
logDebug(s"Principal found ($principal) starting token renewer")
val credentialRenewerThread = new Thread {
setName("MesosCredentialRenewer")
override def run(): Unit = {
val dummy: Option[Array[Byte]] = None
Contributor: What is this for?

Author: whoops!

val credentialRenewer =
new MesosCredentialRenewer(
conf,
hadoopDelegationTokenManager.get,
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf),

Contributor: This sets the first renewal time to be the expiration time of the token.

It should be similar to the way the next renewal time in the MesosCredentialRenewer class is calculated, so that it renews the first token after 75% of the expiration time has passed:

    val currTime = System.currentTimeMillis()
    val renewTime = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
    val rt = 0.75 * (renewTime - currTime)

    val credentialRenewer =
      new MesosCredentialRenewer(
        conf,
        hadoopDelegationTokenManager.get,
        (currTime + rt).toLong,
        driverEndpoint)
    credentialRenewer.scheduleTokenRenewal()

driverEndpoint)
credentialRenewer.scheduleTokenRenewal()
}
}

credentialRenewerThread.start()
credentialRenewerThread.join()
Contributor: Why do you need this thread if it's going to be short-lived?

Author (@ArtRand, Oct 18, 2017): Yes, I believe so. If you look here (…) it's the same pattern. The thread needs to run as long as the application driver, correct?

Contributor: There's a comment explaining why that thread exists right above the code you linked. Did you look at it?

Also, you're calling join() on the thread, so it's obviously going away.

Author: Yes, sorry, for some reason I understood you to mean the credential renewer itself. I added a comment to the same effect as the YARN analogue.

Contributor: I don't think you really understood why the YARN code needs a thread and why I'm telling you this code does not. Read the comment you added here again; what makes you think the current thread does not have access to those classes?

Author (@ArtRand, Oct 21, 2017): Ok, you're probably right. It appears that the YARN code uses setContextClassLoader(userClassLoader), whereas Mesos doesn't have userClassLoader or anything like it. Therefore we don't need the separate thread in the Mesos code. Do I have this correct? Thanks for showing me this!

Contributor: That's the gist of it.
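
For reference, a minimal sketch of the contrast being drawn here; userClassLoader and the comment bodies are illustrative placeholders, not the actual YARN code:

    // Stand-in for YARN's user class loader, so the sketch compiles:
    val userClassLoader: ClassLoader = Thread.currentThread().getContextClassLoader

    // YARN wraps this setup in a thread so it can run with the user class
    // loader as its context class loader, letting user-supplied credential
    // providers resolve:
    val setupThread = new Thread {
      setContextClassLoader(userClassLoader)
      override def run(): Unit = {
        // create and schedule the credential renewer here
      }
    }
    setupThread.start()

    // Mesos has no separate user class loader, so the same work can simply
    // run inline on the current thread, with no wrapper thread and no join().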

}
startScheduler(driver)
}

@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import java.security.PrivilegedExceptionAction
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.util.ThreadUtils


class MesosCredentialRenewer(
Contributor: private[spark]

conf: SparkConf,
tokenManager: HadoopDelegationTokenManager,
nextRenewal: Long,
de: RpcEndpointRef) extends Logging {
Contributor: How about a more descriptive variable name?

Author: fixed.

private val credentialRenewerThread =
Executors.newSingleThreadScheduledExecutor(
Contributor: ThreadUtils.newDaemonSingleThreadScheduledExecutor?

Author: I also changed AMCredentialRenewer to the same.
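
That would presumably be:

    private val credentialRenewerThread =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Refresh Thread")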

ThreadUtils.namedThreadFactory("Credential Refresh Thread"))

@volatile private var timeOfNextRenewal = nextRenewal

private val principal = conf.get("spark.yarn.principal")
Contributor: Use the PRINCIPAL constant.


private val (secretFile, mode) = getSecretFile(conf)

private def getSecretFile(conf: SparkConf): (String, String) = {
val keytab64 = conf.get("spark.yarn.keytab", null)
Contributor: Similarly there's a KEYTAB constant. Also why 64?

val tgt64 = System.getenv("KRB5CCNAME")
Contributor: 64?

Also, using conf.getenv would allow tests to be written.
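
For example, a sketch of the testable variant (SparkConf.getenv delegates to System.getenv but can be stubbed by tests):

    val tgt = conf.getenv("KRB5CCNAME")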

require(keytab64 != null || tgt64 != null, "keytab or tgt required")
require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time")
Contributor: KRB5CCNAME is something that people might have in their environment for various reasons, so I'd avoid this requirement.
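
A sketch of a relaxed check that keeps requiring at least one credential source but prefers an explicit keytab instead of failing when both are present:

    require(keytab64 != null || tgt64 != null, "keytab or tgt required")
    // If both are set, prefer the explicit keytab over an ambient ticket cache.
    val mode = if (keytab64 != null) "keytab" else "tgt"
    val secretFile = if (keytab64 != null) keytab64 else tgt64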

val mode = if (keytab64 != null) "keytab" else "tgt"
val secretFile = if (keytab64 != null) keytab64 else tgt64
logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens")
logDebug(s"secretFile is $secretFile")
(secretFile, mode)
}

def scheduleTokenRenewal(): Unit = {
def scheduleRenewal(runnable: Runnable): Unit = {
// val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
val remainingTime = 5000
Contributor: Why 5000?

Author: Well, that's embarrassing; just a debugging tool that I forgot to remove.
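
That is, the line commented out just above is presumably the one intended to remain:

    val remainingTime = timeOfNextRenewal - System.currentTimeMillis()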

if (remainingTime <= 0) {
logInfo("Credentials have expired, creating new ones now.")
runnable.run()
} else {
logInfo(s"Scheduling login from keytab in $remainingTime millis.")
credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
}
}

val credentialRenewerRunnable =
new Runnable {
override def run(): Unit = {
try {
val creds = getRenewedDelegationTokens(conf)
broadcastDelegationTokens(creds)
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
Contributor: Comment says "an hour" but code has 20 seconds.

Author: Good catch; I changed the code to match the YARN equivalent.

logWarning("Couldn't broadcast tokens, trying again in an hour", e)
credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
return
}
scheduleRenewal(this)
}
}
scheduleRenewal(credentialRenewerRunnable)
}

private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}")
// Get new delegation tokens by logging in with a new UGI
// inspired by AMCredentialRenewer.scala:L174
val ugi = if (mode == "keytab") {

Commenter (@kalvinnchau): I don't see where it refreshes the delegation tokens until the max-lifetime, then re-logs in with the keytab to get new delegation tokens that'll last until the max-lifetime.

Does this skip over the potential issues with expiring delegation tokens (after the max-lifetime, 7 days default) by just re-logging in with the keytab every time the delegation tokens need to refresh, and then grabbing a new set of delegation tokens?

Author: Hello @kalvinnchau, you are correct; all this does is keep track of when the tokens will expire and renew them at that time. Part of my motivation for doing this is to avoid writing any files to disk (like new TGTs, if that's what you're suggesting). We can simply mount the keytab via the Mesos secrets primitive, then renew the tokens every so often. In order to be consistent I tried to keep this solution as close to YARN as possible.

Contributor: The correct way would be for the credential management code to differentiate between token creation and token renewal; that way it would renew tokens at the renewal interval and create new ones after the max lifetime.

But it seems the original implementation took a shortcut and just creates new ones instead of renewing existing ones; changing that would require changes in the credential provider interfaces, so this is enough for now.
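
A minimal sketch of that renew-versus-recreate split using the Hadoop token API; the names hadoopConf, principal, and secretFile are assumed from the surrounding class, and the max-lifetime bookkeeping is illustrative:

    import org.apache.hadoop.security.token.{Token, TokenIdentifier}

    def renewOrRecreate(token: Token[_ <: TokenIdentifier], maxLifetime: Long): Unit = {
      if (System.currentTimeMillis() < maxLifetime) {
        // Renewal just extends the expiry, and is only allowed up to the
        // token's max lifetime (7 days by default).
        token.renew(hadoopConf)
      } else {
        // Past the max lifetime the token cannot be renewed; log in again and
        // let the providers obtain brand-new tokens, which is what
        // getRenewedDelegationTokens does on every pass.
        val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile)
        // ...ugi.doAs { obtain new tokens via tokenManager }...
      }
    }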

UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile)
} else {
UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
}
val tempCreds = ugi.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
var nextRenewalTime = Long.MaxValue
Contributor: Same as spark.yarn.credentials.renewalTime; shouldn't we have a common value somewhere?

Contributor: When the driver is restarted in the YARN case, the old renewalTime is restored: (…) Does the code here cover this?

Author: Right now when the MesosCredentialRenewer is initialized, it renews the current tokens and sets the renewal time to whatever the expiration time of those tokens is. On a driver restart, the same thing would happen. We could add spark.yarn.credentials.renewalTime as an override, but if the driver restarts, say 2 days later, spark.yarn.credentials.renewalTime is no longer relevant and it'll just immediately renew anyways.

Relevant code:
https://github.com/mesosphere/spark/blob/spark-21842-450-kerberos-ticket-renewal/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L210
^^ where the initial renewal time is set
https://github.com/mesosphere/spark/blob/spark-21842-450-kerberos-ticket-renewal/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala#L66
^^ where we initialize the renewal time if the renewal time has passed

Contributor: Ok, so we always renew when we start by fetching the tokens, got it.

ugi.doAs(new PrivilegedExceptionAction[Void] {
override def run(): Void = {
nextRenewalTime = tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
null
}
})

val currTime = System.currentTimeMillis()
timeOfNextRenewal = if (nextRenewalTime <= currTime) {
logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " +
s"current time ($currTime), which is unexpected, please check your credential renewal " +
"related configurations in the target services.")
currTime
} else {
val rt = 0.75 * (nextRenewalTime - currTime)
(currTime + rt).toLong
}
logInfo(s"Time of next renewal is $timeOfNextRenewal")

// Add the temp credentials back to the original ones.
UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
SparkHadoopUtil.get.serialize(tempCreds)
}

private def broadcastDelegationTokens(tokens: Array[Byte]): Unit = {
// send token to existing executors
Contributor: Comment is redundant.

logInfo("Sending new tokens to all executors")
de.send(UpdateDelegationTokens(tokens))
}
}

object MesosCredentialRenewer extends Logging {
def getTokenRenewalTime(bytes: Array[Byte], conf: SparkConf): Long = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val creds = SparkHadoopUtil.get.deserialize(bytes)
val renewalTimes = creds.getAllTokens.asScala.flatMap { t =>
Try {
Contributor: t -> token

This method does not return an interval, it just returns the new expiration time. Compare with: (…)

t.renew(hadoopConf)
}.toOption
}
if (renewalTimes.isEmpty) Long.MaxValue else renewalTimes.min
}
}
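
Folding the review comments above into the helper, it would presumably end up along these lines (a sketch, not the merged code):

    object MesosCredentialRenewer extends Logging {
      def getTokenRenewalTime(bytes: Array[Byte], conf: SparkConf): Long = {
        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
        val creds = SparkHadoopUtil.get.deserialize(bytes)
        // Token.renew returns the new absolute expiration time, not an
        // interval, so the minimum below is a timestamp.
        val renewalTimes = creds.getAllTokens.asScala.flatMap { token =>
          Try(token.renew(hadoopConf)).toOption
        }
        if (renewalTimes.isEmpty) Long.MaxValue else renewalTimes.min
      }
    }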