Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
More wip... probably borked
  • Loading branch information
Ian Hummel committed Feb 28, 2017
commit 25e7639af248bba4f648d13f5dc76a4fe8bfca34
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.{ThreadUtils, Utils}

import scala.util.control.NonFatal
Expand All @@ -37,7 +38,7 @@ private[spark] class CredentialUpdater(
@volatile private var lastCredentialsFileSuffix = 0

// TODO move to ConfigBuilder
private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
private val freshHadoopConf =
SparkHadoopUtil.get.getConfBypassingFSCache(
hadoopConf, new Path(credentialsFile).toUri.getScheme)
Expand All @@ -55,7 +56,7 @@ private[spark] class CredentialUpdater(
/** Start the credential updater task */
def start(): Unit = {
// TODO move to ConfigBuilder
val startTime = sparkConf.getTimeAsMs("spark.yarn.credentials.renewalTime")
val startTime = sparkConf.get(CREDENTIALS_RENEWAL_TIME)
val remainingTime = startTime - System.currentTimeMillis()
if (remainingTime <= 0) {
credentialUpdater.schedule(credentialUpdaterRunnable, 1, TimeUnit.MINUTES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ private[deploy] class ExecutorRunner(


/////////////////////////////
/////////////////// just a one-time token... it can be renewed by driver
/////////////////// but will only last 7 days
/////////////////// for longer, the credetial updater will be used

logInfo(s"APP DESC TOKENS: ${appDesc} tokens ${appDesc.tokens}")
appDesc.tokens.foreach { bytes =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.worker.WorkerWatcher
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CREDENTIALS_FILE_PATH
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
Expand Down Expand Up @@ -213,12 +213,18 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
if (driverConf.contains("spark.yarn.credentials.file")) {

//////////////////////////
////////////////////////// only useful if principal/keytab are specified

if (driverConf.contains(CREDENTIALS_FILE_PATH.key)) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
SparkHadoopUtil.get.startCredentialUpdater(driverConf)
}

//////////////////////////////

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,17 @@ private[spark] class StandaloneSchedulerBackend(
super.start()
launcherBackend.connect()



//////////////////////////////////////////////////////
//////////////////////////////////////////////////////

setupCredentials()


//////////////////////////////////////////////////////
//////////////////////////////////////////////////////

// The endpoint for executors to talk to us
val driverUrl = RpcEndpointAddress(
sc.conf.get("spark.driver.host"),
Expand Down