54 changes: 22 additions & 32 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
@@ -56,17 +57,13 @@ private[spark] class SecurityManager(
   private val WILDCARD_ACL = "*"

   private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
-  // keep spark.ui.acls.enable for backwards compatibility with 1.0
-  private var aclsOn =
-    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
+  private var aclsOn = sparkConf.get(ACLS_ENABLE)

   // admin acls should be set before view or modify acls
-  private var adminAcls: Set[String] =
-    stringToSet(sparkConf.get("spark.admin.acls", ""))
+  private var adminAcls: Set[String] = sparkConf.get(ADMIN_ACLS).toSet

   // admin group acls should be set before view or modify group acls
-  private var adminAclsGroups : Set[String] =
-    stringToSet(sparkConf.get("spark.admin.acls.groups", ""))
+  private var adminAclsGroups: Set[String] = sparkConf.get(ADMIN_ACLS_GROUPS).toSet

   private var viewAcls: Set[String] = _

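Note: the plain `.toSet` above works because the new entries already parse the comma-separated value into a `Seq[String]`. A minimal sketch of how such a Seq-valued entry is presumably declared with Spark's internal `ConfigBuilder` (the builder chain is an assumption, not part of this diff):

    import org.apache.spark.internal.config.ConfigBuilder

    // Assumption: ACL entries are declared Seq-valued, so call sites no
    // longer split comma-separated strings by hand.
    val ADMIN_ACLS = ConfigBuilder("spark.admin.acls")
      .stringConf             // read the raw value as a String
      .toSequence             // split on commas into a Seq[String]
      .createWithDefault(Nil) // an unset key yields an empty list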
@@ -82,11 +79,11 @@ private[spark] class SecurityManager(
   private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
     Utils.getCurrentUserName())

-  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
-  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
+  setViewAcls(defaultAclUsers, sparkConf.get(UI_VIEW_ACLS))
+  setModifyAcls(defaultAclUsers, sparkConf.get(MODIFY_ACLS))

-  setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
-  setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
+  setViewAclsGroups(sparkConf.get(UI_VIEW_ACLS_GROUPS))
+  setModifyAclsGroups(sparkConf.get(MODIFY_ACLS_GROUPS))

   private var secretKey: String = _
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
@@ -127,32 +124,25 @@
     opts
   }

-  /**
-   * Split a comma separated String, filter out any empty items, and return a Set of strings
-   */
-  private def stringToSet(list: String): Set[String] = {
-    list.split(',').map(_.trim).filter(!_.isEmpty).toSet
-  }
-
   /**
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setViewAcls(defaultUsers: Set[String], allowedUsers: String) {
-    viewAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+  def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+    viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
     logInfo("Changing view acls to: " + viewAcls.mkString(","))
   }

-  def setViewAcls(defaultUser: String, allowedUsers: String) {
+  def setViewAcls(defaultUser: String, allowedUsers: Seq[String]) {
     setViewAcls(Set[String](defaultUser), allowedUsers)
   }

   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
    */
-  def setViewAclsGroups(allowedUserGroups: String) {
-    viewAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
+  def setViewAclsGroups(allowedUserGroups: Seq[String]) {
+    viewAclsGroups = adminAclsGroups ++ allowedUserGroups
     logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
   }

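With the `Seq[String]` signatures the union semantics are unchanged; callers just pass pre-parsed lists. A hedged usage sketch (assumes spark-internal scope, since `SecurityManager` is `private[spark]`):

    import org.apache.spark.{SecurityManager, SparkConf}

    val sm = new SecurityManager(new SparkConf)
    sm.setAdminAcls(Seq("alice"))                     // admin acls first
    sm.setViewAcls(Set("spark"), Seq("bob", "carol")) // view acls then include them
    // viewAcls is now adminAcls ++ defaultUsers ++ allowedUsers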
@@ -179,17 +169,17 @@
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setModifyAcls(defaultUsers: Set[String], allowedUsers: String) {
-    modifyAcls = (adminAcls ++ defaultUsers ++ stringToSet(allowedUsers))
+  def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]) {
+    modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
     logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
   }

   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
    */
-  def setModifyAclsGroups(allowedUserGroups: String) {
-    modifyAclsGroups = (adminAclsGroups ++ stringToSet(allowedUserGroups));
+  def setModifyAclsGroups(allowedUserGroups: Seq[String]) {
+    modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
     logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
   }

@@ -216,17 +206,17 @@
    * Admin acls should be set before the view or modify acls. If you modify the admin
    * acls you should also set the view and modify acls again to pick up the changes.
    */
-  def setAdminAcls(adminUsers: String) {
-    adminAcls = stringToSet(adminUsers)
+  def setAdminAcls(adminUsers: Seq[String]) {
+    adminAcls = adminUsers.toSet
     logInfo("Changing admin acls to: " + adminAcls.mkString(","))
   }

   /**
    * Admin acls groups should be set before the view or modify acls groups. If you modify the admin
    * acls groups you should also set the view and modify acls groups again to pick up the changes.
    */
-  def setAdminAclsGroups(adminUserGroups: String) {
-    adminAclsGroups = stringToSet(adminUserGroups)
+  def setAdminAclsGroups(adminUserGroups: Seq[String]) {
+    adminAclsGroups = adminUserGroups.toSet
     logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
   }

@@ -416,7 +406,7 @@ private[spark] object SecurityManager {

   val k8sRegex = "k8s.*".r
   val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
-  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
+  val SPARK_AUTH_SECRET_CONF = AUTH_SECRET.key
   // This is used to set auth secret to an executor's env variable. It should have the same
   // value as SPARK_AUTH_SECRET_CONF set in SparkConf
   val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
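Deriving `SPARK_AUTH_SECRET_CONF` from `AUTH_SECRET.key` keeps the constant and the entry in sync. `AUTH_SECRET` is presumably an optional entry for the same `spark.authenticate.secret` key, along these lines (an assumption, not shown in this diff):

    import org.apache.spark.internal.config.ConfigBuilder

    val AUTH_SECRET = ConfigBuilder("spark.authenticate.secret")
      .stringConf
      .createOptional // conf.get returns Option[String]; no default secret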
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -46,6 +46,7 @@ import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream,
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -440,7 +441,7 @@ class SparkContext(config: SparkConf) extends Logging {
     }

     _ui =
-      if (conf.getBoolean("spark.ui.enabled", true)) {
+      if (conf.get(UI_ENABLED)) {
         Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
           startTime))
       } else {
@@ -510,7 +511,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
+    if (_conf.get(UI_REVERSE_PROXY)) {
       System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
     }
     _ui.foreach(_.setAppId(_applicationId))
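`conf.get(UI_ENABLED)` and `_conf.get(UI_REVERSE_PROXY)` return `Boolean` directly, so the inline defaults of the old `getBoolean` calls move into the entry definitions, presumably like this (keys and defaults taken from the replaced calls; the builder chain is an assumption):

    import org.apache.spark.internal.config.ConfigBuilder

    val UI_ENABLED = ConfigBuilder("spark.ui.enabled")
      .booleanConf
      .createWithDefault(true)

    val UI_REVERSE_PROXY = ConfigBuilder("spark.ui.reverseProxy")
      .booleanConf
      .createWithDefault(false)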
@@ -51,8 +51,8 @@ class LocalSparkCluster(

   // Disable REST server on Master in this mode unless otherwise specified
   val _conf = conf.clone()
-    .setIfMissing("spark.master.rest.enabled", "false")
-    .set(config.SHUFFLE_SERVICE_ENABLED.key, "false")
+    .setIfMissing(config.MASTER_REST_SERVER_ENABLED, false)
+    .set(config.SHUFFLE_SERVICE_ENABLED, false)

   /* Start the Master */
   val (rpcEnv, webUiPort, _) = Master.startRpcEnvAndEndpoint(localHostname, 0, 0, _conf)
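The typed `set`/`setIfMissing` overloads pair each entry with a value of its declared type, so a mismatch fails at compile time rather than at parse time. A brief sketch:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    val conf = new SparkConf()
      .setIfMissing(config.MASTER_REST_SERVER_ENABLED, false) // Boolean, not "false"
      .set(config.SHUFFLE_SERVICE_ENABLED, false)
    // conf.set(config.SHUFFLE_SERVICE_ENABLED, "false") would not type-check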
@@ -50,6 +50,7 @@ import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.util._

@@ -42,10 +42,11 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.internal.config.Tests.IS_TESTING
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
@@ -105,12 +106,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

   private val logDir = conf.get(History.HISTORY_LOG_DIR)

-  private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
-  private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
-  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
-  logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
-    "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
-    "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
+  private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE)
+  private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS)
+  private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS)
+  logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") +
+    "; users with admin permissions: " + historyUiAdminAcls.mkString(",") +
+    "; groups with admin permissions" + historyUiAdminAclsGroups.mkString(","))

   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
   // Visible for testing
@@ -314,6 +315,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

   override def getLastUpdatedTime(): Long = lastScanTime.get()

+  /**
+   * Split a comma separated String, filter out any empty items, and return a Sequence of strings
+   */
+  private def stringToSeq(list: String): Seq[String] = {
+    list.split(',').map(_.trim).filter(!_.isEmpty)
+  }
+
   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
     val app = try {
       load(appId)
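For reference, the helper trims entries and drops empties, so stray whitespace or doubled commas in persisted ACL strings are harmless:

    stringToSeq("alice, bob,,carol ") // Seq("alice", "bob", "carol")
    stringToSeq("")                   // Seq(): the single empty item is filtered out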
@@ -330,13 +338,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val conf = this.conf.clone()
     val secManager = new SecurityManager(conf)

-    secManager.setAcls(HISTORY_UI_ACLS_ENABLE)
+    secManager.setAcls(historyUiAclsEnable)
     // make sure to set admin acls before view acls so they are properly picked up
-    secManager.setAdminAcls(HISTORY_UI_ADMIN_ACLS + "," + attempt.adminAcls.getOrElse(""))
-    secManager.setViewAcls(attempt.info.sparkUser, attempt.viewAcls.getOrElse(""))
-    secManager.setAdminAclsGroups(HISTORY_UI_ADMIN_ACLS_GROUPS + "," +
-      attempt.adminAclsGroups.getOrElse(""))
-    secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
+    secManager.setAdminAcls(historyUiAdminAcls ++ stringToSeq(attempt.adminAcls.getOrElse("")))
+    secManager.setViewAcls(attempt.info.sparkUser, stringToSeq(attempt.viewAcls.getOrElse("")))
+    secManager.setAdminAclsGroups(historyUiAdminAclsGroups ++
+      stringToSeq(attempt.adminAclsGroups.getOrElse("")))
+    secManager.setViewAclsGroups(stringToSeq(attempt.viewAclsGroups.getOrElse("")))

     val kvstore = try {
       diskManager match {
@@ -1187,11 +1195,16 @@ private[history] class AppListingListener(
     // Only parse the first env update, since any future changes don't have any effect on
     // the ACLs set for the UI.
     if (!gotEnvUpdate) {
+      def emptyStringToNone(strOption: Option[String]): Option[String] = strOption match {
+        case Some("") => None
+        case _ => strOption
+      }
+
       val allProperties = event.environmentDetails("Spark Properties").toMap
-      attempt.viewAcls = allProperties.get("spark.ui.view.acls")
-      attempt.adminAcls = allProperties.get("spark.admin.acls")
-      attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
-      attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+      attempt.viewAcls = emptyStringToNone(allProperties.get(UI_VIEW_ACLS.key))
+      attempt.adminAcls = emptyStringToNone(allProperties.get(ADMIN_ACLS.key))
+      attempt.viewAclsGroups = emptyStringToNone(allProperties.get(UI_VIEW_ACLS_GROUPS.key))
+      attempt.adminAclsGroups = emptyStringToNone(allProperties.get(ADMIN_ACLS_GROUPS.key))

       gotEnvUpdate = true
       checkProgress()
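Mapping `Some("")` to `None` makes "property present but empty" equivalent to "property absent", which matters because event logs can record empty strings for unset ACLs. Expected behavior of the helper:

    emptyStringToNone(Some(""))      // None
    emptyStringToNone(Some("alice")) // Some("alice")
    emptyStringToNone(None)          // None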
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
@@ -302,11 +303,10 @@ object HistoryServer extends Logging {
       config.set(SecurityManager.SPARK_AUTH_CONF, "false")
     }

-    if (config.getBoolean("spark.acls.enable", config.getBoolean("spark.ui.acls.enable", false))) {
-      logInfo("Either spark.acls.enable or spark.ui.acls.enable is configured, clearing it and " +
-        "only using spark.history.ui.acl.enable")
-      config.set("spark.acls.enable", "false")
-      config.set("spark.ui.acls.enable", "false")
+    if (config.get(ACLS_ENABLE)) {
+      logInfo(s"${ACLS_ENABLE.key} is configured, " +
+        s"clearing it and only using ${History.HISTORY_SERVER_UI_ACLS_ENABLE.key}")
+      config.set(ACLS_ENABLE, false)
     }

     new SecurityManager(config)
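The old two-key fallback chain collapses into a single typed read. For the deprecated `spark.ui.acls.enable` to keep working, `ACLS_ENABLE` would presumably carry it as an alternative key, e.g. (an assumption about the entry, not shown here):

    import org.apache.spark.internal.config.ConfigBuilder

    val ACLS_ENABLE = ConfigBuilder("spark.acls.enable")
      .withAlternative("spark.ui.acls.enable") // assumption: deprecated key as fallback
      .booleanConf
      .createWithDefault(false)

Under that assumption, setting the primary key to false overrides whatever the alternative key says, which is why a single `config.set(ACLS_ENABLE, false)` replaces the two string writes.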
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,6 +33,8 @@ import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.deploy.rest.StandaloneRestServer
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.serializer.{JavaSerializer, Serializer}
@@ -115,13 +117,13 @@ private[deploy] class Master(

   // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
   private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
-  val reverseProxy = conf.getBoolean("spark.ui.reverseProxy", false)
+  val reverseProxy = conf.get(UI_REVERSE_PROXY)
   if (defaultCores < 1) {
     throw new SparkException("spark.deploy.defaultCores must be positive")
   }

   // Alternative application submission gateway that is stable across Spark versions
-  private val restServerEnabled = conf.getBoolean("spark.master.rest.enabled", false)
+  private val restServerEnabled = conf.get(MASTER_REST_SERVER_ENABLED)
   private var restServer: Option[StandaloneRestServer] = None
   private var restServerBoundPort: Option[Int] = None

@@ -140,7 +142,7 @@ private[deploy] class Master(
     webUi.bind()
     masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
     if (reverseProxy) {
-      masterWebUiUrl = conf.get("spark.ui.reverseProxyUrl", masterWebUiUrl)
+      masterWebUiUrl = conf.get(UI_REVERSE_PROXY_URL).orElse(Some(masterWebUiUrl)).get
       webUi.addProxy()
       logInfo(s"Spark Master is acting as a reverse proxy. Master, Workers and " +
         s"Applications UIs are available at $masterWebUiUrl")
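`UI_REVERSE_PROXY_URL` is evidently an optional entry here (`conf.get` returns `Option[String]`), so the `.orElse(Some(masterWebUiUrl)).get` chain works but reads awkwardly; the idiomatic equivalent would be:

    masterWebUiUrl = conf.get(UI_REVERSE_PROXY_URL).getOrElse(masterWebUiUrl)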
@@ -152,7 +154,7 @@ private[deploy] class Master(
     }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

     if (restServerEnabled) {
-      val port = conf.getInt("spark.master.rest.port", 6066)
+      val port = conf.get(MASTER_REST_SERVER_PORT)
       restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
     }
     restServerBoundPort = restServer.map(_.start())
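The key and default of the old `getInt` call migrate into the entry, presumably declared as (key and default taken from the replaced call; the builder chain is an assumption):

    import org.apache.spark.internal.config.ConfigBuilder

    val MASTER_REST_SERVER_PORT = ConfigBuilder("spark.master.rest.port")
      .intConf
      .createWithDefault(6066)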
@@ -21,6 +21,7 @@ import scala.annotation.tailrec

 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.MASTER_UI_PORT
 import org.apache.spark.util.{IntParam, Utils}

 /**
@@ -53,8 +54,8 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) extends Logging {
   // This mutates the SparkConf, so all accesses to it must be made after this line
   propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

-  if (conf.contains("spark.master.ui.port")) {
-    webUiPort = conf.get("spark.master.ui.port").toInt
+  if (conf.contains(MASTER_UI_PORT.key)) {
+    webUiPort = conf.get(MASTER_UI_PORT)
   }

   @tailrec
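Note the two access styles above: `conf.contains(MASTER_UI_PORT.key)` probes the raw key, while `conf.get(MASTER_UI_PORT)` returns `Int` directly, eliminating the manual `.toInt`. A sketch of the presumed entry (the 8080 default is an assumption matching the standalone Master's usual web UI port, not shown in this diff):

    import org.apache.spark.internal.config.ConfigBuilder

    val MASTER_UI_PORT = ConfigBuilder("spark.master.ui.port")
      .intConf
      .createWithDefault(8080) // assumed default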
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master.ui
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_KILL_ENABLED
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._

@@ -34,7 +35,7 @@ class MasterWebUI(
     requestedPort, master.conf, name = "MasterUI") with Logging {

   val masterEndpointRef = master.self
-  val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
+  val killEnabled = master.conf.get(UI_KILL_ENABLED)

   initialize()

@@ -146,7 +146,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     // the driver.
     val masters = sparkProperties.get("spark.master")
     val (_, masterPort) = Utils.extractHostPortFromSparkUrl(masterUrl)
-    val masterRestPort = this.conf.getInt("spark.master.rest.port", 6066)
+    val masterRestPort = this.conf.get(config.MASTER_REST_SERVER_PORT)
     val updatedMasters = masters.map(
       _.replace(s":$masterRestPort", s":$masterPort")).getOrElse(masterUrl)
     val appArgs = request.appArgs
@@ -28,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
@@ -160,7 +161,7 @@ private[deploy] class ExecutorRunner(

     // Add webUI log urls
     val baseUrl =
-      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
+      if (conf.get(UI_REVERSE_PROXY)) {
         s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
       } else {
         s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="