[SPARK-21842][Mesos] Support Kerberos ticket renewal and creation in Mesos (#19272)
Changes from 6 commits
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala:

@@ -142,6 +142,13 @@ class SparkHadoopUtil extends Logging {
     }
   }

+  def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
+    val hadoopConf = newConfiguration(sparkConf)
+    hadoopConf.set("hadoop.security.authentication", "Token")
+    UserGroupInformation.setConfiguration(hadoopConf)
+    addCurrentUserCredentials(deserialize(tokens))
+  }
+
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
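For context, the `tokens` byte array consumed by addDelegationTokens is in Hadoop's standard token-storage wire format. A minimal sketch of how such a byte array can be produced and read back, using only stock Hadoop APIs (the helper object and method names here are illustrative, not part of this PR):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.security.Credentials

// Illustrative helpers: round-trip a Credentials object through the
// byte-array form that addDelegationTokens consumes.
object TokenBytes {
  def serialize(creds: Credentials): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    creds.writeTokenStorageToStream(out) // Hadoop's token-storage wire format
    out.flush()
    bytes.toByteArray
  }

  def deserialize(tokenBytes: Array[Byte]): Credentials = {
    val creds = new Credentials()
    creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
    creds
  }
}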
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:

@@ -123,6 +123,11 @@ private[spark] class CoarseGrainedExecutorBackend(
         executor.stop()
       }
     }.start()

+    // This message is only sent by Mesos Drivers, and is not expected from other
+    // SchedulerBackends at this time
+    case UpdateDelegationTokens(tokens) =>
+      SparkHadoopUtil.get.addDelegationTokens(tokens, env.conf)
   }

   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
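The UpdateDelegationTokens message handled above is introduced alongside these changes, but its definition is not shown in this diff. A sketch of what it presumably looks like, inferred from the import of CoarseGrainedClusterMessages.UpdateDelegationTokens later in the PR (the surrounding declarations are assumptions):

package org.apache.spark.scheduler.cluster

// Sketch, assuming the message lives with the other cluster messages.
sealed trait CoarseGrainedClusterMessage extends Serializable

object CoarseGrainedClusterMessages {
  // Carries serialized Hadoop delegation tokens from the driver to executors.
  case class UpdateDelegationTokens(tokens: Array[Byte])
    extends CoarseGrainedClusterMessage
}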
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala:

@@ -159,6 +159,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
         killExecutors(exec.toSeq, replace = true, force = true)
       }

+    case UpdateDelegationTokens(tokens) =>
+      executorDataMap.values.foreach {
+        ed => ed.executorEndpoint.send(UpdateDelegationTokens(tokens))
+      }
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala:

@@ -17,7 +17,7 @@

 package org.apache.spark.scheduler.cluster.mesos

-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

Review comment on the import change:

Contributor: Change not needed?
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala:

@@ -22,12 +22,13 @@ import java.util.{Collections, List => JList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock

-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.SchedulerDriver
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.concurrent.Future

+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
+
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager

@@ -39,6 +40,7 @@ import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils

+
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
  * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever

@@ -60,6 +62,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] =
     Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))

+  private val principal = conf.get("spark.yarn.principal", null)
+
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2

@@ -194,6 +198,26 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
     )

+    // check that the credentials are defined, even though it's likely that auth would have failed
+    // already if you've made it this far
+    if (principal != null && hadoopDelegationCreds.isDefined) {
+      logDebug(s"Principal found ($principal) starting token renewer")
+      val credentialRenewerThread = new Thread {
+        setName("MesosCredentialRenewer")
+        override def run(): Unit = {
+          val credentialRenewer =
+            new MesosCredentialRenewer(
+              conf,
+              hadoopDelegationTokenManager.get,
+              MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf),
+              driverEndpoint)
+          credentialRenewer.scheduleTokenRenewal()
+        }
+      }
+      credentialRenewerThread.start()
+      credentialRenewerThread.join()
+    }
Review thread on the credentialRenewerThread block:

Reviewer: There's a comment explaining why that thread exists right above the code you linked. Did you look at it? Also, you're calling join() on the thread, so it's obviously going away.

Author: Yes, sorry, for some reason I understood you to mean the credential renewer itself. I added a comment to the same effect as the YARN analogue.

Reviewer: I don't think you really understood why the YARN code needs a thread and why I'm telling you this code does not. Read the comment you added here again; what makes you think the current thread does not have access to those classes?

Author: Ok, you're probably right. It appears that the YARN code uses setContextClassLoader(userClassLoader), whereas Mesos doesn't have a userClassLoader or anything like it. Therefore we don't need the separate thread in the Mesos code. Do I have this correct? Thanks for showing me this!

Reviewer: That's the gist of it.
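To make the class-loader point concrete, here is a minimal sketch paraphrasing the YARN pattern under discussion (the method and the userClassLoader parameter are illustrative, not quoted from Spark): YARN resolves user classes through a dedicated class loader, so it must run renewal setup on a thread whose context class loader is that loader; Mesos has no equivalent loader, so the same work can run directly on the current thread.

// Illustrative only: why YARN-style code needs a dedicated thread.
def runWithUserClassLoader(userClassLoader: ClassLoader)(body: => Unit): Unit = {
  val t = new Thread {
    // Classes touched inside run() now resolve against userClassLoader.
    setContextClassLoader(userClassLoader)
    override def run(): Unit = body
  }
  t.start()
  t.join() // the caller blocks until setup completes, so the thread is short-lived
}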
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala (new file, @@ -0,0 +1,150 @@; the capture ends partway through the file, as noted below):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.mesos

import java.security.PrivilegedExceptionAction
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.util.ThreadUtils

class MesosCredentialRenewer(
    conf: SparkConf,
    tokenManager: HadoopDelegationTokenManager,
    nextRenewal: Long,
    de: RpcEndpointRef) extends Logging {

  private val credentialRenewerThread =
    Executors.newSingleThreadScheduledExecutor(
      ThreadUtils.namedThreadFactory("Credential Refresh Thread"))

  @volatile private var timeOfNextRenewal = nextRenewal

  private val principal = conf.get("spark.yarn.principal")

  private val (secretFile, mode) = getSecretFile(conf)

  private def getSecretFile(conf: SparkConf): (String, String) = {
    val keytab64 = conf.get("spark.yarn.keytab", null)
    val tgt64 = System.getenv("KRB5CCNAME")
    require(keytab64 != null || tgt64 != null, "keytab or tgt required")
    require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be used at the same time")
    val mode = if (keytab64 != null) "keytab" else "tgt"
    val secretFile = if (keytab64 != null) keytab64 else tgt64
    logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS delegation tokens")
    logDebug(s"secretFile is $secretFile")
    (secretFile, mode)
  }

  def scheduleTokenRenewal(): Unit = {
    def scheduleRenewal(runnable: Runnable): Unit = {
      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
      if (remainingTime <= 0) {
        logInfo("Credentials have expired, creating new ones now.")
        runnable.run()
      } else {
        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
      }
    }

    val credentialRenewerRunnable =
      new Runnable {
        override def run(): Unit = {
          try {
            val creds = getRenewedDelegationTokens(conf)
            broadcastDelegationTokens(creds)
          } catch {
            case e: Exception =>
              // Log the error and try to write new tokens back in an hour
              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
              return
          }
          scheduleRenewal(this)
        }
      }
    scheduleRenewal(credentialRenewerRunnable)
  }

  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
    logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}")
    // Get new delegation tokens by logging in with a new UGI
    // inspired by AMCredentialRenewer.scala:L174
    val ugi = if (mode == "keytab") {
      UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile)
    } else {
      UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
    }
    val tempCreds = ugi.getCredentials
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    var nextRenewalTime = Long.MaxValue
    // ... [the rest of the 150-line file was not captured; the review thread
    //      below is anchored on a later line referencing
    //      "spark.yarn.credentials.renewalTime"]
Review thread on the line referencing "spark.yarn.credentials.renewalTime":

Reviewer: Does the code here cover this?

Author: Right now, when the MesosCredentialRenewer is initialized, it renews the current tokens and sets the renewal time to whatever the expiration time of those tokens is. On a driver restart, the same thing would happen. We could add spark.yarn.credentials.renewalTime as an override, but if the driver restarts, say, 2 days later, spark.yarn.credentials.renewalTime is no longer relevant and it will just immediately renew anyway.

Relevant code:
https://github.com/mesosphere/spark/blob/spark-21842-450-kerberos-ticket-renewal/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L210 (where the initial renewal time is set)
https://github.com/mesosphere/spark/blob/spark-21842-450-kerberos-ticket-renewal/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala#L66 (where we initialize the renewal time if the renewal time has passed)

Reviewer: Ok, so we always renew when we start by fetching the tokens, got it.
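For reference, MesosCredentialRenewer.getTokenRenewalTime (called from the scheduler backend but not captured above) is where that initial renewal time comes from. The real method takes the serialized token bytes and a SparkConf; this minimal sketch shows only the core computation on deserialized Credentials, and the earliest-deadline logic is an assumption based on the discussion above:

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

// Sketch only: pick the soonest renewal deadline across all tokens so the
// renewer wakes up before the first one expires.
object MesosCredentialRenewerSketch {
  def getTokenRenewalTime(creds: Credentials, hadoopConf: Configuration): Long = {
    val renewalTimes = creds.getAllTokens.asScala.flatMap { token =>
      // Token.renew() returns the token's next expiration timestamp (ms).
      scala.util.Try(token.renew(hadoopConf)).toOption
    }
    if (renewalTimes.isEmpty) Long.MaxValue else renewalTimes.min
  }
}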
Reviewer (on an outdated diff): Comment is redundant.
Reviewer (on an outdated diff): t -> token. Also, this method does not return an interval; it just returns the new expiration time. Compare with getTokenRenewalInterval in core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala (line 102 at commit b9ab791):

    private def getTokenRenewalInterval(
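The naming distinction the reviewer is drawing, as a tiny illustration (the values are made up):

// An *interval* is a duration; a *renewal/expiration time* is a timestamp.
val renewalInterval: Long = 24L * 60 * 60 * 1000 // 86400000 ms, a duration
val nextExpirationTime: Long = System.currentTimeMillis() + renewalInterval // a point in time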
Reviewer: Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.)
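One way the requested comment could read, applied to the addDelegationTokens method from the first hunk. The stated rationale is an assumption, not the author's wording: unlike YARN containers, Mesos executors start without any Kerberos login, so UserGroupInformation must be told to authenticate with tokens instead.

/**
 * Runs on executors to install the delegation tokens shipped by the driver.
 * Mesos executors have no Kerberos login of their own, so before the tokens
 * are added, authentication is switched to token-based mode; otherwise
 * UserGroupInformation would attempt a Kerberos handshake that cannot
 * succeed on the executor. (Rationale is an assumption, not from the PR.)
 */
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
  val hadoopConf = newConfiguration(sparkConf)
  hadoopConf.set("hadoop.security.authentication", "Token")
  UserGroupInformation.setConfiguration(hadoopConf)
  addCurrentUserCredentials(deserialize(tokens))
}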