Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
Ian Hummel committed Mar 8, 2017
commit f743e6b207b7f71034fe617a402f54e0121b13a2
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,26 @@

package org.apache.spark.deploy

import java.io.IOException
import java.io.{File, FileOutputStream, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.BOOTSTRAP_TOKENS
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -350,6 +351,17 @@ class SparkHadoopUtil extends Logging {
}
buffer.toString
}

/**
 * If `key` is present in `env`, Base64-decode its value and write the raw
 * bytes to `where`. Does nothing when the key is absent.
 *
 * Used in standalone mode to materialize shipped credentials (bootstrap
 * delegation tokens, keytab content) that were passed to the driver/executor
 * as Base64-encoded environment variables.
 *
 * @param env   environment-variable map to read from
 * @param key   name of the entry holding Base64-encoded bytes
 * @param where destination file (truncated/overwritten if it exists)
 */
private[spark] def decodeAndWriteToFile(env: collection.Map[String, String],
    key: String, where: File): Unit = {
  // Single Option lookup instead of contains + .get (avoids the unsafe .get).
  env.get(key).foreach { base64 =>
    val out = new FileOutputStream(where)
    try {
      // decodeBase64 is lenient about whitespace and missing padding.
      IOUtils.write(Base64.decodeBase64(base64), out)
    } finally {
      // Close even when decoding/writing throws — the original leaked the
      // stream on failure.
      out.close()
    }
  }
}
}

object SparkHadoopUtil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,17 @@ package org.apache.spark.deploy.rest
import java.io.{DataOutputStream, File, FileInputStream, FileNotFoundException}
import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.TimeoutException
import javax.servlet.http.HttpServletResponse

import com.fasterxml.jackson.core.JsonProcessingException
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io
import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer, IOUtils}
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{KEYTAB, PRINCIPAL}
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion}

Expand Down Expand Up @@ -217,22 +216,25 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
io.IOUtils.copy(new FileInputStream(f), dob)
}

message.sparkProperties ++= Map(
KEYTAB.key -> keytabFileName,
KEYTAB.key + ".content" -> keytabContent,
PRINCIPAL.key -> principal
)
message.environmentVariables += KEYTAB_CONTENT.key -> keytabContent
// overwrite with localized version
message.environmentVariables += KEYTAB.key -> keytabFileName
message.sparkProperties += KEYTAB.key -> keytabFileName
}

// Add credentials - works in the case of a user submitting a job
// that completes < YARN max token renewal
// The reason this is here is so you don't strictly need a keytab/principal
// you only need that if you want to run a long running job... but maybe get rid of it?
val credentials = new Credentials(UserGroupInformation.getCurrentUser.getCredentials)
val bootstrapCredentails = base64EncodedValue { dob =>
credentials.writeTokenStorageToStream(dob)
}
if (credentials.getAllTokens.size() > 0) {
val bootstrapCredentails = base64EncodedValue { dob =>
credentials.writeTokenStorageToStream(dob)
}

message.sparkProperties ++= Map(
"spark.yarn.credentials.bootstrap" -> bootstrapCredentails)
logInfo("Security tokens will be sent to driver and executors")
message.environmentVariables += BOOTSTRAP_TOKENS.key -> bootstrapCredentails
}
//////////


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.IOUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.security.{AMCredentialRenewer, ConfigurableCredentialManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CREDENTIALS_FILE_PATH
import org.apache.spark.internal.config.{BOOTSTRAP_TOKENS, CREDENTIALS_FILE_PATH, KEYTAB, KEYTAB_CONTENT}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}

Expand All @@ -57,15 +59,6 @@ private[deploy] class DriverRunner(
@volatile private[worker] var finalState: Option[DriverState] = None
@volatile private[worker] var finalException: Option[Exception] = None


///////////////////////
///////////////////////
private var credentialRenewer: AMCredentialRenewer = _
///////////////////////
///////////////////////



// Timeout to wait for when trying to terminate a driver.
private val DRIVER_TERMINATE_TIMEOUT_MS = 10 * 1000

Expand Down Expand Up @@ -191,38 +184,22 @@ private[deploy] class DriverRunner(
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)



////////////////////
////////////////////
/////////////////////////////
/////////////////// just a one-time token... it can be renewed by driver
/////////////////// but will only last 7 days
/////////////////// for longer, the credential updater will be used

logInfo(s"Driver description: ${driverDesc} tokens ${driverDesc.tokens}")
driverDesc.tokens.foreach { bytes =>
val creds = new File(driverDir, "driver-credentials-" + driverId)
logInfo("Writing out delegation tokens to " + creds.toString)
Utils.writeByteBuffer(ByteBuffer.wrap(bytes), new FileOutputStream(creds)) // TODO - duh
logInfo(s"Delegation Tokens written out successfully to $creds")
builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", creds.toString)
///////////////
////////////////
////
if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) {
val tokenFile = new File(driverDir, "driver-credentials-" + driverId)
SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment,
BOOTSTRAP_TOKENS.key, tokenFile)
builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString)
}

/// SHOULD BOTH RENEW AND UPDATE CREDENTIALS

// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
if (conf.contains(CREDENTIALS_FILE_PATH.key)) {
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
val newConf = SparkHadoopUtil.get.newConfiguration(conf)
credentialRenewer =
new ConfigurableCredentialManager(conf, newConf).credentialRenewer()
credentialRenewer.scheduleLoginFromKeytab()
if (driverDesc.command.environment.contains(KEYTAB_CONTENT.key)) {
val keytab = driverDesc.command.environment.get(KEYTAB.key).get
val keytabFile = new File(driverDir, keytab)
SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment,
KEYTAB_CONTENT.key, keytabFile)
}


/////////////////////////////
////////////////////
////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ import scala.collection.JavaConverters._
import com.google.common.io.Files
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.BOOTSTRAP_TOKENS
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.logging.FileAppender
Expand Down Expand Up @@ -155,30 +156,17 @@ private[deploy] class ExecutorRunner(
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")



/////////////////////////////
/////////////////// just a one-time token... it can be renewed by driver
/////////////////// but will only last 7 days
/////////////////// for longer, the credential updater will be used

logInfo(s"APP DESC TOKENS: ${appDesc} tokens ${appDesc.tokens}")
appDesc.tokens.foreach { bytes =>
val creds = new File(executorDir, "credentials-" + appId)
logInfo("Writing out delegation tokens to " + creds.toString)
Utils.writeByteBuffer(ByteBuffer.wrap(bytes), new FileOutputStream(creds)) // TODO - duh
logInfo(s"Delegation Tokens written out successfully to $creds")
builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", creds.toString)
/////////////////////////////
if (appDesc.command.environment.contains(BOOTSTRAP_TOKENS.key)) {
val tokenFile = new File(executorDir, "executor-credentials-" + appId)
SparkHadoopUtil.get.decodeAndWriteToFile(appDesc.command.environment,
BOOTSTRAP_TOKENS.key, tokenFile)
builder.environment.put("HADOOP_TOKEN_FILE_LOCATION", tokenFile.toString)
}



/////////////////////////////






/////////////////////////////

// Add webUI log urls
val baseUrl =
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -306,4 +306,15 @@ package object config {
"that hosts fs.defaultFS does not need to be listed here.")
.fallbackConf(NAMENODES_TO_ACCESS)

// Internal config carrying the Base64-encoded bytes of the keytab referenced
// by spark.yarn.keytab, so the keytab file itself can be shipped with a
// submission and written back to disk on the receiving side (see
// SparkHadoopUtil.decodeAndWriteToFile usage in DriverRunner).
private[spark] val KEYTAB_CONTENT = ConfigBuilder("spark.yarn.keytab.content")
.doc("Base64 encoded content of spark.yarn.keytab")
.internal()
.stringConf
.createOptional

// Internal config carrying Base64-encoded Hadoop delegation tokens that are
// handed to drivers/executors launched in standalone mode; decoded and written
// to a HADOOP_TOKEN_FILE_LOCATION file by DriverRunner/ExecutorRunner.
private[spark] val BOOTSTRAP_TOKENS = ConfigBuilder("spark.deploy.bootstrap.tokens")
  // Fixed typos in the user-visible doc string: "propogate" -> "propagate",
  // "laucnhed" -> "launched".
  .doc("Base64 encoded tokens to propagate to driver/executors launched in standalone mode")
  .internal()
  .stringConf
  .createOptional
}
Loading