Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
Ian Hummel committed Feb 26, 2017
commit 539cc6cf630e9429e7131e755d8e9fa12479cd0c
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.deploy

import java.net.URI
import java.nio.ByteBuffer

private[spark] case class ApplicationDescription(
name: String,
Expand All @@ -32,7 +33,8 @@ private[spark] case class ApplicationDescription(
// number of executors this application wants to start with,
// only used if dynamic allocation is enabled
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>")) {
user: String = System.getProperty("user.name", "<unknown>"),
tokens: Option[ByteBuffer] = None) {

override def toString: String = "ApplicationDescription(" + name + ")"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
package org.apache.spark.scheduler.cluster

import java.io.File
import java.nio.ByteBuffer
import java.util.UUID
import java.util.concurrent.Semaphore

import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
import org.apache.spark.deploy.security.ConfigurableCredentialManager
import org.apache.spark.deploy.{ApplicationDescription, Command, SparkHadoopUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}
import org.apache.spark.internal.config.{CREDENTIALS_RENEWAL_TIME, CREDENTIALS_UPDATE_TIME, KEYTAB, PRINCIPAL}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler._
Expand Down Expand Up @@ -65,6 +68,8 @@ private[spark] class StandaloneSchedulerBackend(
private var principal: String = null
private var keytab: String = null
private var credentials: Credentials = null
private val credentialManager = new ConfigurableCredentialManager(sc.conf, sc.hadoopConfiguration)


def setupCredentials(): Unit = {
loginFromKeytab = sc.conf.contains(PRINCIPAL.key)
Expand All @@ -84,6 +89,7 @@ private[spark] class StandaloneSchedulerBackend(
}
// Defensive copy of the credentials
credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
logInfo("Credentials loaded: " + UserGroupInformation.getCurrentUser)
}


Expand Down Expand Up @@ -143,7 +149,62 @@ private[spark] class StandaloneSchedulerBackend(
}
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)




////////////////////////////////////////////
////////////////////////////////////////////
// Merge credentials obtained from registered providers
val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(sc.hadoopConfiguration, credentials)

if (credentials != null) {
logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n"))
}

// If we use principal and keytab to login, also credentials can be renewed some time
// after current time, we should pass the next renewal and updating time to credential
// renewer and updater.
if (loginFromKeytab && nearestTimeOfNextRenewal > System.currentTimeMillis() &&
nearestTimeOfNextRenewal != Long.MaxValue) {

// Valid renewal time is 75% of next renewal time, and the valid update time will be
// slightly later then renewal time (80% of next renewal time). This is to make sure
// credentials are renewed and updated before expired.
val currTime = System.currentTimeMillis()
val renewalTime = (nearestTimeOfNextRenewal - currTime) * 0.75 + currTime
val updateTime = (nearestTimeOfNextRenewal - currTime) * 0.8 + currTime

sc.conf.set(CREDENTIALS_RENEWAL_TIME, renewalTime.toLong)
sc.conf.set(CREDENTIALS_UPDATE_TIME, updateTime.toLong)
}


def setupSecurityToken(appDesc: ApplicationDescription): ApplicationDescription = {
val dob = new DataOutputBuffer
credentials.writeTokenStorageToStream(dob)
appDesc.copy(tokens = Some(ByteBuffer.wrap(dob.getData)))
}

val secureAppDesc = setupSecurityToken(appDesc)

// If we passed in a keytab, make sure we copy the keytab to the staging directory on
// HDFS, and setup the relevant environment vars, so the AM can login again.
// if (loginFromKeytab) {
// logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
// " via the YARN Secure Distributed Cache.")
// val (_, localizedPath) = distribute(keytab,
// destName = sparkConf.get(KEYTAB),
// appMasterOnly = true)
// require(localizedPath != null, "Keytab file already distributed.")
// }
////////////////////////////////////////////
////////////////////////////////////////////




client = new StandaloneAppClient(sc.env.rpcEnv, masters, secureAppDesc, this, conf)
client.start()
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
waitForRegistration()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkException, SparkFunSuite}
import org.scalatest.{Matchers, PrivateMethodTester}

class HadoopFSCredentialProviderSuite
class HadoopFSCredentialProviderSuite
extends SparkFunSuite
with PrivateMethodTester
with Matchers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,7 @@ private[spark] class Client(
amContainer
}

// TODO - doesn't actually login from keytab! That's done in SparkSubmit!
def setupCredentials(): Unit = {
loginFromKeytab = sparkConf.contains(PRINCIPAL.key)
if (loginFromKeytab) {
Expand Down