-
Notifications
You must be signed in to change notification settings - Fork 29k
[Spark-21842][Mesos] Support Kerberos ticket renewal and creation in Mesos #19272
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
44a6098
781c5a7
18d2c6c
43ab547
f136eaa
488f72a
f5925fd
3f22efe
e522150
837157d
c95f80b
e8bbc9e
1596d80
864ab7e
7e0590a
f93c551
b2fbcf2
4558cea
5f254e5
8df7e37
45b46ed
18d77ff
049e4b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -142,6 +142,17 @@ class SparkHadoopUtil extends Logging { | |
| } | ||
| } | ||
|
|
||
| def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { | ||
|
||
| logInfo(s"Found delegation tokens of ${tokens.length} bytes") | ||
| val hadoopConf = newConfiguration(sparkConf) | ||
| hadoopConf.set("hadoop.security.authentication", "Token") | ||
| UserGroupInformation.setConfiguration(hadoopConf) | ||
| // decode tokens and add them to the credentials | ||
| val creds = deserialize(tokens) | ||
| uglyF(s"creds $creds id") | ||
| addCurrentUserCredentials(deserialize(tokens)) | ||
| } | ||
|
|
||
| /** | ||
| * Returns a function that can be called to find Hadoop FileSystem bytes read. If | ||
| * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,8 +125,11 @@ private[spark] class CoarseGrainedExecutorBackend( | |
| executor.stop() | ||
| } | ||
| }.start() | ||
|
|
||
| // This message is only sent by Mesos Drivers, and is not expected from other | ||
|
||
| // SchedulerBackends at this time | ||
| case UpdateDelegationTokens(tokens) => | ||
|
||
| CoarseGrainedExecutorBackend.addDelegationTokens(tokens, env.conf) | ||
| SparkHadoopUtil.get.addDelegationTokens(tokens, env.conf) | ||
| } | ||
|
|
||
| override def onDisconnected(remoteAddress: RpcAddress): Unit = { | ||
|
|
@@ -178,16 +181,6 @@ private[spark] class CoarseGrainedExecutorBackend( | |
|
|
||
| private[spark] object CoarseGrainedExecutorBackend extends Logging { | ||
|
|
||
| private def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) { | ||
| logInfo(s"Found delegation tokens of ${tokens.length} bytes") | ||
| val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) | ||
| hadoopConf.set("hadoop.security.authentication", "Token") | ||
| UserGroupInformation.setConfiguration(hadoopConf) | ||
| val creds = SparkHadoopUtil.get.deserialize(tokens) | ||
| // decode tokens and add them to the credentials | ||
| UserGroupInformation.getCurrentUser.addCredentials(SparkHadoopUtil.get.deserialize(tokens)) | ||
| } | ||
|
|
||
| private def run( | ||
| driverUrl: String, | ||
| executorId: String, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -165,7 +165,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp | |
| for ((x, executorData) <- executorDataMap) { | ||
|
||
| executorData.executorEndpoint.send(UpdateDelegationTokens(tokens)) | ||
| } | ||
|
|
||
| } | ||
|
|
||
| override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -198,16 +198,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( | |
| sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix) | ||
| ) | ||
|
|
||
| if (principal != null) { | ||
| // check that the credentials are defined, even though it's likely that auth would have failed | ||
| // adready if you've made it this far | ||
| if (principal != null && hadoopDelegationCreds.isDefined) { | ||
| logDebug(s"Principal found ($principal) starting token renewer") | ||
| val credentialRenewerThread = new Thread { | ||
| setName("MesosCredentialRenewer") | ||
| override def run(): Unit = { | ||
| val dummy: Option[Array[Byte]] = None | ||
|
||
| val credentialRenewer = | ||
| new MesosCredentialRenewer( | ||
| conf, | ||
| hadoopDelegationTokenManager.get, | ||
| MesosCredentialRenewer.getTokenRenewalInterval(hadoopDelegationCreds.get, conf), | ||
| MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf), | ||
|
||
| driverEndpoint) | ||
| credentialRenewer.scheduleTokenRenewal() | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -63,7 +63,8 @@ class MesosCredentialRenewer( | |||
|
|
||||
| def scheduleTokenRenewal(): Unit = { | ||||
| def scheduleRenewal(runnable: Runnable): Unit = { | ||||
| val remainingTime = timeOfNextRenewal - System.currentTimeMillis() | ||||
| // val remainingTime = timeOfNextRenewal - System.currentTimeMillis() | ||||
| val remainingTime = 5000 | ||||
|
||||
| if (remainingTime <= 0) { | ||||
| logInfo("Credentials have expired, creating new ones now.") | ||||
| runnable.run() | ||||
|
|
@@ -136,15 +137,15 @@ class MesosCredentialRenewer( | |||
| } | ||||
|
|
||||
| object MesosCredentialRenewer extends Logging { | ||||
| def getTokenRenewalInterval(bytes: Array[Byte], conf: SparkConf): Long = { | ||||
| def getTokenRenewalTime(bytes: Array[Byte], conf: SparkConf): Long = { | ||||
| val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) | ||||
| val creds = SparkHadoopUtil.get.deserialize(bytes) | ||||
| val intervals = creds.getAllTokens.asScala.flatMap { t => | ||||
| val renewalTimes = creds.getAllTokens.asScala.flatMap { t => | ||||
| Try { | ||||
|
||||
| private def getTokenRenewalInterval( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment about what this method is doing and why it's needed. (YARN never sets the authentication method, so it'd be good to know why Mesos needs to do it.)