@@ -0,0 +1,2 @@
org.apache.spark.deploy.security.HadoopFSCredentialProvider
org.apache.spark.deploy.security.HBaseCredentialProvider
45 changes: 35 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,26 +17,28 @@

package org.apache.spark.deploy

import java.io.IOException
import java.io.{File, FileOutputStream, IOException}
import java.security.PrivilegedExceptionAction
import java.text.DateFormat
import java.util.{Arrays, Comparator, Date, Locale}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.google.common.primitives.Longs
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.{SparkConf, SparkException}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialUpdater}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkException}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

/**
* :: DeveloperApi ::
@@ -48,6 +50,9 @@ class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

private var credentialUpdater: CredentialUpdater = _

/**
* Runs the given function with a Hadoop UserGroupInformation as a thread local variable
* (distributed to child threads), used for authenticating HDFS and YARN calls.
@@ -290,12 +295,21 @@ class SparkHadoopUtil extends Logging {
* Start a thread to periodically update the current user's credentials with new credentials so
* that access to secured services does not fail.
*/
private[spark] def startCredentialUpdater(conf: SparkConf) {}
private[spark] def startCredentialUpdater(sparkConf: SparkConf): Unit = {
credentialUpdater =
new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
credentialUpdater.start()
}

/**
* Stop the thread that does the credential updates.
*/
private[spark] def stopCredentialUpdater() {}
private[spark] def stopCredentialUpdater(): Unit = {
if (credentialUpdater != null) {
credentialUpdater.stop()
credentialUpdater = null
}
}
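These two hooks replace the previous no-op stubs with real implementations backed by ConfigurableCredentialManager. A minimal usage sketch of how a secured deployment might pair them around its work loop (illustrative only, not code from this change):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    val sparkConf = new SparkConf()
    val hadoopUtil = SparkHadoopUtil.get
    hadoopUtil.startCredentialUpdater(sparkConf)  // begin background token refresh
    try {
      // ... run work while tokens are kept fresh ...
    } finally {
      hadoopUtil.stopCredentialUpdater()  // null-checked, so safe even if start failed
    }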

/**
* Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
@@ -353,6 +367,17 @@ class SparkHadoopUtil extends Logging {
}
buffer.toString
}

private[spark] def decodeAndWriteToFile(
    env: collection.Map[String, String],
    key: String,
    where: File): Unit = {
  env.get(key).foreach { base64 =>
    // Write the decoded bytes out, closing the stream even on failure.
    val out = new FileOutputStream(where)
    try {
      IOUtils.write(Base64.decodeBase64(base64), out)
    } finally {
      out.close()
    }
  }
}
}

object SparkHadoopUtil {
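decodeAndWriteToFile above is the receiving half of a base64 round trip: the submission client packs the keytab bytes into an environment variable, and the worker writes them back to disk. A self-contained sketch of that round trip using the same commons libraries (helper names are illustrative):

    import java.io.{File, FileInputStream, FileOutputStream}
    import java.nio.charset.StandardCharsets
    import org.apache.commons.codec.binary.Base64
    import org.apache.commons.io.IOUtils

    // Encode a binary file into a string that survives a string-only env-var channel.
    def encodeFile(file: File): String = {
      val in = new FileInputStream(file)
      try new String(Base64.encodeBase64(IOUtils.toByteArray(in)), StandardCharsets.UTF_8)
      finally in.close()
    }

    // Decode the string back into a file on the receiving side.
    def decodeToFile(base64: String, where: File): Unit = {
      val out = new FileOutputStream(where)
      try IOUtils.write(Base64.decodeBase64(base64), out)
      finally out.close()
    }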
@@ -552,7 +552,7 @@ object SparkSubmit extends CommandLineUtils {
}

// ensure a keytab is available from any place in the JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == LOCAL || clusterManager == STANDALONE) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
@@ -17,24 +17,29 @@

package org.apache.spark.deploy.rest

import java.io.{DataOutputStream, FileNotFoundException}
import java.io.{DataOutputStream, File, FileInputStream, FileNotFoundException}
import java.net.{ConnectException, HttpURLConnection, SocketException, URL}
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.TimeoutException
import javax.servlet.http.HttpServletResponse

import com.fasterxml.jackson.core.JsonProcessingException
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkException, SPARK_VERSION => sparkVersion}

import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.io.Source
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonProcessingException

import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
* A client that submits applications to a [[RestSubmissionServer]].
*
@@ -182,6 +187,38 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
message.appArgs = appArgs
message.sparkProperties = sparkProperties
message.environmentVariables = environmentVariables

    // Encode whatever is written to a DataOutputBuffer as a base64 string.
    def base64EncodedValue(fn: DataOutputBuffer => Unit): String = {
      val dob = new DataOutputBuffer
      fn(dob)
      dob.close()
      // Encode only the bytes actually written: getData returns the whole backing array.
      new String(Base64.encodeBase64(dob.getData.take(dob.getLength)), StandardCharsets.UTF_8)
    }

// Propagate kerberos credentials if necessary
if (sparkProperties.contains(PRINCIPAL.key)) {
val principal = sparkProperties.get(PRINCIPAL.key).get
val keytab = sparkProperties.get(KEYTAB.key).orNull
require(keytab != null, "Keytab must be specified when principal is specified.")
logInfo("Attempting to login to the Kerberos" +
s" using principal: $principal and keytab: $keytab")
val f = new File(keytab)
// Generate a file name for the keytab file that does not conflict with any user file.
val keytabFileName = f.getName + "-" + UUID.randomUUID().toString

logInfo("To enable the driver to login from keytab, credentials are are being copied" +
" to the Master inside the CreateSubmissionRequest")

    val in = new FileInputStream(f)
    val keytabContent = try {
      base64EncodedValue { dob =>
        io.IOUtils.copy(in, dob)
      }
    } finally {
      in.close()
    }

message.environmentVariables += BOOTSTRAP_TOKENS -> keytabContent
// overwrite with localized version
message.sparkProperties += KEYTAB.key -> keytabFileName
}

message.validate()
message
}
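The randomized keytab name generated above keeps the shipped file from colliding with anything the user placed in the driver directory, and the overwritten spark property points the driver at the localized copy. A sketch of the naming scheme (hypothetical helper):

    import java.io.File
    import java.util.UUID

    // Original name plus a UUID: recognizable in logs, unique per submission.
    def localizedKeytabName(keytab: File): String =
      keytab.getName + "-" + UUID.randomUUID().toString
    // e.g. "alice.keytab-6f1c..." later resolved relative to the driver's work dir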
@@ -14,19 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.security.PrivilegedExceptionAction
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.ThreadUtils
@@ -51,7 +48,7 @@ import org.apache.spark.util.ThreadUtils
* appeared, it will read the credentials and update the currently running UGI with it. This
* process happens again once 80% of the new tokens' validity has elapsed.
*/
private[yarn] class AMCredentialRenewer(
private[spark] class AMCredentialRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration,
credentialManager: ConfigurableCredentialManager) extends Logging {
@@ -62,7 +59,7 @@ private[yarn] class AMCredentialRenewer(
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.namedThreadFactory("Credential Refresh Thread"))

private val hadoopUtil = YarnSparkHadoopUtil.get
private val hadoopUtil = SparkHadoopUtil.get

private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION)
@@ -15,19 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

import scala.collection.JavaConverters._

/**
* A ConfigurableCredentialManager to manage all the registered credential providers and offer
* APIs for other modules to obtain credentials as well as renewal time. By default
@@ -41,7 +40,7 @@ import org.apache.spark.util.Utils
* For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
* the configuration spark.yarn.security.credentials.hive.enabled.
*/
private[yarn] final class ConfigurableCredentialManager(
private[spark] final class ConfigurableCredentialManager(
sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
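As the class comment explains, providers are discovered through Java's ServiceLoader and can be disabled per service via spark.yarn.security.credentials.<service>.enabled. A sketch of that filtering under an assumed minimal provider trait:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.SparkConf

    trait Provider { def serviceName: String }  // stand-in for ServiceCredentialProvider

    def enabledProviders(conf: SparkConf, loader: ClassLoader): Map[String, Provider] = {
      ServiceLoader.load(classOf[Provider], loader).asScala
        .filter { p =>
          // A provider is enabled unless its flag is explicitly set to false.
          conf.getOption(s"spark.yarn.security.credentials.${p.serviceName}.enabled")
            .forall(_.toBoolean)
        }
        .map(p => p.serviceName -> p)
        .toMap
    }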
@@ -15,29 +15,29 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.util.concurrent.{Executors, TimeUnit}

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.{ThreadUtils, Utils}

import scala.util.control.NonFatal

private[spark] class CredentialUpdater(
sparkConf: SparkConf,
hadoopConf: Configuration,
credentialManager: ConfigurableCredentialManager) extends Logging {

@volatile private var lastCredentialsFileSuffix = 0

// TODO move to ConfigBuilder
private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH)
private val freshHadoopConf =
SparkHadoopUtil.get.getConfBypassingFSCache(
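The updater polls the credentials directory and only reloads when it sees a file carrying a larger numeric suffix than the last one applied, which is what lastCredentialsFileSuffix tracks. A sketch of that comparison (the name-<n> suffix convention is assumed):

    // "credentials-3" => 3; anything not matching the convention => -1.
    def fileSuffix(name: String): Int = {
      val idx = name.lastIndexOf('-')
      if (idx < 0) -1 else scala.util.Try(name.substring(idx + 1).toInt).getOrElse(-1)
    }

    // Reload only for strictly newer files, so the same file is never applied twice.
    def shouldReload(name: String, lastSuffix: Int): Boolean = fileSuffix(name) > lastSuffix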
@@ -15,19 +15,18 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

import scala.reflect.runtime.universe
import scala.util.control.NonFatal

private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {

override def serviceName: String = "hbase"
@@ -15,21 +15,19 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security

import scala.collection.JavaConverters._
import scala.util.Try
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.{SparkConf, SparkException}

import scala.collection.JavaConverters._
import scala.util.Try

private[security] class HadoopFSCredentialProvider
extends ServiceCredentialProvider with Logging {
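For filesystems, fetching tokens is a plain Hadoop API call; the provider essentially walks the configured filesystems and collects their delegation tokens. A minimal sketch for a single filesystem (the path and renewer are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.security.Credentials

    def obtainTokensFor(path: Path, renewer: String, hadoopConf: Configuration): Credentials = {
      val creds = new Credentials()
      val fs = path.getFileSystem(hadoopConf)
      // Adds this filesystem's delegation token(s) to creds; a no-op on insecure clusters.
      fs.addDelegationTokens(renewer, creds)
      creds
    }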
@@ -15,11 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf

/**
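The two META-INF/services entries at the top of this diff are what make these providers discoverable: implementations of ServiceCredentialProvider are looked up by class name on the classpath. A sketch of a third-party provider, with member signatures assumed from the surrounding code:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.Credentials
    import org.apache.spark.SparkConf

    class MyCredentialProvider /* extends ServiceCredentialProvider */ {
      def serviceName: String = "myservice"
      def credentialsRequired(hadoopConf: Configuration): Boolean = true
      // Fetch tokens into creds; return the next renewal time, if any.
      def obtainCredentials(hadoopConf: Configuration, sparkConf: SparkConf,
          creds: Credentials): Option[Long] = None
    }

Registering it means listing the class name in a META-INF/services file named after the trait, as the first file in this diff does for the two built-in providers.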
@@ -21,18 +21,19 @@ import java.io._
import java.net.URI
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

import com.google.common.io.Files

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages.DriverStateChanged
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.{DriverDescription, SparkHadoopUtil}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.KEYTAB
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.BOOTSTRAP_TOKENS
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.{Clock, ShutdownHookManager, SystemClock, Utils}
import org.apache.spark.{SecurityManager, SparkConf}

import scala.collection.JavaConverters._

/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
@@ -181,6 +182,15 @@ private[deploy] class DriverRunner(
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

// We only support propagation of credentials if a keytab is passed,
// since we cannot renew tokens indefinitely without one.
if (driverDesc.command.environment.contains(BOOTSTRAP_TOKENS) && conf.contains(KEYTAB)) {
val keytab = conf.get(KEYTAB).get
val keytabFile = new File(driverDir, keytab)
SparkHadoopUtil.get.decodeAndWriteToFile(driverDesc.command.environment,
BOOTSTRAP_TOKENS, keytabFile)
}

runDriver(builder, driverDir, driverDesc.supervise)
}
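Once the block above has materialized the keytab inside the driver directory, the driver JVM can log in from it with the standard UGI call (a sketch; the principal comes from the driver's own configuration):

    import org.apache.hadoop.security.UserGroupInformation

    def loginFromLocalizedKeytab(principal: String, keytabPath: String): Unit = {
      UserGroupInformation.loginUserFromKeytab(principal, keytabPath)
    }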
