Go back to propagating secret using credentials.
Marcelo Vanzin committed Dec 1, 2017
commit c752453b2a379f301d52692bfc639bb631520069
36 changes: 19 additions & 17 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -19,12 +19,15 @@ package org.apache.spark

 import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
+import java.nio.charset.StandardCharsets.UTF_8
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._

 import com.google.common.hash.HashCodes
 import com.google.common.io.Files
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}

 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -498,7 +501,10 @@
    */
   def getSecretKey(): String = {
     if (isAuthenticationEnabled) {
-      Option(sparkConf.getenv(ENV_AUTH_SECRET))
+      val creds = UserGroupInformation.getCurrentUser().getCredentials()
+      Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
+        .map { bytes => new String(bytes, UTF_8) }
+        .orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
         .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
         .getOrElse {
           throw new IllegalArgumentException(
@@ -510,12 +516,11 @@
   }

   /**
-   * Initialize the configuration object held by this class for authentication.
+   * Initialize the authentication secret.
    *
    * If authentication is disabled, do nothing.
    *
-   * In YARN mode, generate a secret key and store it in the configuration object, setting it up to
-   * also be propagated to executors using an env variable.
+   * In YARN mode, generate a new secret and store it in the current user's credentials.
    *
    * In other modes, assert that the auth secret is set in the configuration.
    */
@@ -530,19 +535,14 @@
       return
     }

-    // In YARN, force creation of a new secret if this is client mode. This ensures each
-    // YARN app uses a different secret. For cluster mode, this relies on YARN's client to
-    // not propagate the secret to the driver, which will then generate a new one.
-    val deployMode = sparkConf.get(SparkLauncher.DEPLOY_MODE, "client")
-    if (!sparkConf.contains(SPARK_AUTH_SECRET_CONF) || deployMode == "client") {
-      val rnd = new SecureRandom()
-      val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
-      val secretBytes = new Array[Byte](length)
-      rnd.nextBytes(secretBytes)
-
-      val secret = HashCodes.fromBytes(secretBytes).toString()
-      sparkConf.set(SPARK_AUTH_SECRET_CONF, secret)
-    }
+    val rnd = new SecureRandom()
+    val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
+    val secretBytes = new Array[Byte](length)
+    rnd.nextBytes(secretBytes)
+
+    val creds = new Credentials()
+    creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes)
+    UserGroupInformation.getCurrentUser().addCredentials(creds)
   }

   // Default SecurityManager only has a single secret key, so ignore appId.
@@ -558,4 +558,6 @@ private[spark] object SecurityManager {
   // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"

+  // key used to store the spark secret in the Hadoop UGI
+  val SECRET_LOOKUP_KEY = new Text("sparkCookie")
 }
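
The heart of the change is Hadoop's Credentials/UserGroupInformation (UGI) API: a secret key added to the current user's credentials is attached to that user's subject, and any later code running as the same user can read it back, which is exactly what the new getSecretKey() does. A minimal, self-contained sketch of that round trip, using only the Hadoop API (the key and secret values here are illustrative, not from the patch):

import java.nio.charset.StandardCharsets.UTF_8

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

object CredentialsRoundTrip {
  // Hypothetical key name for illustration; Spark uses "sparkCookie".
  val key = new Text("demoSecret")

  def main(args: Array[String]): Unit = {
    // Write: attach a secret key to the current user's credentials.
    val creds = new Credentials()
    creds.addSecretKey(key, "s3cret".getBytes(UTF_8))
    UserGroupInformation.getCurrentUser().addCredentials(creds)

    // Read: anything running as the same UGI can fetch it back.
    val stored = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(key)
    println(new String(stored, UTF_8))  // prints "s3cret"
  }
}

Because YARN ships a submitted application's credentials to its containers, the same lookup works in the AM and executors without the env-variable plumbing this commit removes below.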
18 changes: 16 additions & 2 deletions core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -18,6 +18,10 @@
 package org.apache.spark

 import java.io.File
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
+
+import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
@@ -441,8 +445,18 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
       .set(NETWORK_AUTH_ENABLED, true)
       .set(SparkLauncher.SPARK_MASTER, "yarn")
     val mgr = new SecurityManager(conf)
-    mgr.initializeAuth()
-    assert(mgr.getSecretKey() != null)
+
+    UserGroupInformation.createUserForTesting("authTest", Array()).doAs(
+      new PrivilegedExceptionAction[Unit]() {
+        override def run(): Unit = {
+          mgr.initializeAuth()
+          val creds = UserGroupInformation.getCurrentUser().getCredentials()
+          val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY)
+          assert(secret != null)
+          assert(new String(secret, UTF_8) === mgr.getSecretKey())
+        }
+      }
+    )
   }

 }
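
The doAs wrapper matters here because secret keys are scoped to the UGI subject rather than to the JVM, so running as a throwaway test user keeps the generated secret out of the login user shared by other tests. A small sketch of that isolation, assuming it runs in the same JVM after the test above (the user name is arbitrary):

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

// A different UGI has its own, initially empty credential store, so the
// secret written by the "authTest" user above is invisible to it.
val stranger = UserGroupInformation.createUserForTesting("someOtherUser", Array[String]())
stranger.doAs(new PrivilegedExceptionAction[Unit] {
  override def run(): Unit = {
    val creds = UserGroupInformation.getCurrentUser().getCredentials()
    assert(creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) == null)
  }
})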
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -65,12 +65,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     }
   }

-  // Initialize the security manager for authentication, if enabled. This needs to be done
-  // before the config is propagated to the system properties.
   private val securityMgr = new SecurityManager(sparkConf)
-  if (isClusterMode) {
-    securityMgr.initializeAuth()
-  }

   // Set system properties for each config entry. This covers two use cases:
   // - The default configuration stored by the SparkHadoopUtil class
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -758,9 +758,7 @@ private[spark] class Client(
     // Save Spark configuration to a file in the archive, but filter out the app's secret.
     val props = new Properties()
     sparkConf.getAll.foreach { case (k, v) =>
-      if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) {
-        props.setProperty(k, v)
-      }
+      props.setProperty(k, v)
     }
     // Override spark.yarn.key to point to the location in distributed cache which will be used
     // by AM.
@@ -834,11 +832,6 @@
         }
       }
       sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
-    } else {
-      // Only propagate the auth secret to the AM in client mode, using the environment, if set.
-      sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF).foreach { secret =>
-        env(SecurityManager.ENV_AUTH_SECRET) = secret
-      }
     }

     sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -245,10 +245,6 @@ private[yarn] class ExecutorRunnable(
       }
     }

-    if (securityMgr.getSecretKey() != null) {
-      env(SecurityManager.ENV_AUTH_SECRET) = securityMgr.getSecretKey()
-    }
-
     System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
       .foreach { case (k, v) => env(k) = v }
     env
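
The env-variable plumbing removed from Client and ExecutorRunnable is safe to drop because YARN already carries credentials to containers over a secure channel: at launch time the submitter's Credentials (delegation tokens and secret keys alike) are serialized into the ContainerLaunchContext, and the NodeManager materializes them into the container's UGI. A rough sketch of that mechanism using the standard Hadoop/YARN client API (the ctx value and helper name are assumptions for illustration):

import java.nio.ByteBuffer

import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

// Serialize the current user's credentials and attach them to a container
// launch context; writeTokenStorageToStream includes secret keys, which is
// what lets the Spark secret reach the AM and executors.
def attachCredentials(ctx: ContainerLaunchContext): Unit = {
  val creds = UserGroupInformation.getCurrentUser().getCredentials()
  val dob = new DataOutputBuffer()
  creds.writeTokenStorageToStream(dob)
  ctx.setTokens(ByteBuffer.wrap(dob.getData, 0, dob.getLength))
}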