Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,20 @@ class SparkProcessBuilder(
buffer += s"${convertConfigKey(k)}=$v"
}

// For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user
def setSparkUserName(userName: String): Unit = {
buffer += CONF
buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName"
buffer += CONF
buffer += s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$userName"
}

// iff the keytab is specified, PROXY_USER is not supported
if (!useKeytab()) {
val shortUserName = useKeytab()
if (shortUserName.nonEmpty) {
setSparkUserName(shortUserName.get)
} else {
setSparkUserName(proxyUser)
buffer += PROXY_USER
buffer += proxyUser
}
Expand All @@ -104,26 +116,27 @@ class SparkProcessBuilder(

override protected def module: String = "kyuubi-spark-sql-engine"

private def useKeytab(): Boolean = {
private def useKeytab(): Option[String] = {
val principal = conf.getOption(PRINCIPAL)
val keytab = conf.getOption(KEYTAB)
if (principal.isEmpty || keytab.isEmpty) {
false
None
} else {
try {
val ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
val keytabEnabled = ugi.getShortUserName == proxyUser
if (!keytabEnabled) {
if (ugi.getShortUserName != proxyUser) {
warn(s"The session proxy user: $proxyUser is not same with " +
s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " +
s"Fallback to use proxy user.")
None
} else {
Some(ugi.getShortUserName)
}
keytabEnabled
} catch {
case e: IOException =>
error(s"Failed to login for ${principal.get}", e)
false
None
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,26 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
assert(!pb.toString.contains(engineRefId2))
assert(pb.toString.contains(engineRefId))
}

test("SparkProcessBuilder build spark engine with SPARK_USER_NAME") {
val proxyName = "kyuubi"
val conf1 = KyuubiConf(false)
val b1 = new SparkProcessBuilder(proxyName, conf1)
val c1 = b1.toString.split(' ')
assert(c1.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$proxyName"))
assert(c1.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$proxyName"))

tryWithSecurityEnabled {
val conf2 = conf.set("spark.kerberos.principal", testPrincipal)
.set("spark.kerberos.keytab", testKeytab)
val name = ServiceUtils.getShortName(testPrincipal)
val b2 = new SparkProcessBuilder(name, conf2)
val c2 = b2.toString.split(' ')
assert(c2.contains(s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$name"))
assert(c2.contains(s"spark.kubernetes.executorEnv.SPARK_USER_NAME=$name"))
assert(!c2.contains(s"--proxy-user $name"))
}
}
}

class FakeSparkProcessBuilder(config: KyuubiConf)
Expand Down