7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -700,8 +700,6 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0")),
"spark.yarn.access.hadoopFileSystems" -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2")),
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
@@ -715,7 +713,10 @@ private[spark] object SparkConf extends Logging {
PRINCIPAL.key -> Seq(
AlternateConfig("spark.yarn.principal", "3.0")),
KERBEROS_RELOGIN_PERIOD.key -> Seq(
AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2"),
AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
)

/**
@@ -39,7 +39,6 @@ private[security] class HBaseDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -144,7 +144,7 @@ private[spark] class HadoopDelegationTokenManager(
def obtainDelegationTokens(creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds)
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
@@ -181,14 +181,6 @@ private[spark] class HadoopDelegationTokenManager(
.getOrElse(isEnabledDeprecated)
}

/**
* List of file systems for which to obtain delegation tokens. The base implementation
* returns just the default file system in the given Hadoop configuration.
*/
protected def fileSystemsToAccess(): Set[FileSystem] = {
Set(FileSystem.get(hadoopConf))
}

private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
@@ -44,13 +44,11 @@ private[spark] trait HadoopDelegationTokenProvider {
* Obtain delegation tokens for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param creds Credentials to add tokens and security keys to.
* @param fileSystems List of file systems for which to obtain delegation tokens.
* @return If the returned tokens are renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long]
}
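
For context, a minimal provider written against the updated three-argument signature could look like the sketch below; the service name and the no-op logic are placeholders for illustration, not part of this change:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Sketch of a provider compiled against the new signature (no fileSystems parameter).
// "my-service" and the no-op behaviour are purely illustrative.
class NoOpDelegationTokenProvider extends HadoopDelegationTokenProvider {
  override def serviceName: String = "my-service"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = false

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // A real provider would add tokens to `creds` and return the next renewal time.
    None
  }
}
```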
@@ -22,7 +22,7 @@ import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
@@ -44,9 +44,9 @@ private[deploy] class HadoopFSDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)

// Get the token renewal interval if it is not set. It will only be called once.
@@ -133,3 +133,45 @@ private[deploy] class HadoopFSDelegationTokenProvider
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}

private[deploy] object HadoopFSDelegationTokenProvider {
def hadoopFSsToAccess(
sparkConf: SparkConf,
hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = sparkConf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)

val defaultFS = FileSystem.get(hadoopConf)
val master = sparkConf.get("spark.master", null)
val stagingFS = if (master != null && master.contains("yarn")) {
sparkConf.get(STAGING_DIR)
.map(new Path(_).getFileSystem(hadoopConf))
.getOrElse(defaultFS)
} else {
defaultFS
}

// Add the list of available namenodes for all namespaces in HDFS federation.
// This is skipped when extra filesystems are configured explicitly, and when ViewFS is
// enabled, since ViewFS already handles delegation tokens for its mounted namespaces.
val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || stagingFS.getScheme == "viewfs") {
filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
} else {
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
// Retrieving the filesystem for the nameservices where HA is not enabled
val filesystemsWithoutHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
}
}
// Retrieving the filesystem for the nameservices where HA is enabled
val filesystemsWithHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
}
}
(filesystemsWithoutHA ++ filesystemsWithHA).toSet
}

hadoopFilesystems + stagingFS + defaultFS
}
}
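
A rough usage sketch of the new helper (host names and ports are made up): when extra filesystems are listed, automatic federation discovery is bypassed and the result is the listed URIs plus the staging and default filesystems.

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf

// Hypothetical YARN job that also reads from a second, explicitly listed cluster.
val sparkConf = new SparkConf()
  .set("spark.master", "yarn")
  .set("spark.kerberos.access.hadoopFileSystems", "hdfs://other-nn:8020")

val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://default-nn:8020")

// Expected to contain the default FS (which also serves as the staging FS here,
// since spark.yarn.stagingDir is unset) plus hdfs://other-nn:8020.
val fss = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
```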
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -364,6 +364,14 @@ package object config {
.checkValues(Set("keytab", "ccache"))
.createWithDefault("keytab")

private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS =
ConfigBuilder("spark.kerberos.access.hadoopFileSystems")
.doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
"that hosts fs.defaultFS does not need to be listed here.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
@@ -1253,4 +1261,9 @@ package object config {
ConfigBuilder("spark.speculation.quantile")
.doubleConf
.createWithDefault(0.75)

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.stringConf
.createOptional
}
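
As a minimal read-back sketch (placeholder URIs): the new entry parses its value as a comma-separated list, and the deprecated `spark.yarn.access.*` keys resolve to the same entry via the alternate-config mapping in SparkConf above, as the SparkConfSuite change below exercises.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.KERBEROS_FILESYSTEMS_TO_ACCESS

// Placeholder URIs; the key takes a comma-separated list of filesystem URIs.
val conf = new SparkConf()
  .set("spark.kerberos.access.hadoopFileSystems", "hdfs://nn1:8020,webhdfs://nn3:50070")

// Reads back as Seq("hdfs://nn1:8020", "webhdfs://nn3:50070").
val fss: Seq[String] = conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)
```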
6 changes: 6 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -264,6 +264,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)

conf.set("spark.yarn.access.namenodes", "testNode")
assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))

conf.set("spark.yarn.access.hadoopFileSystems", "testNode")
assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
}

test("akka deprecated configs") {
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -36,7 +35,6 @@ private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationT
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = throw new IllegalArgumentException
}

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.STAGING_DIR

class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers {
test("hadoopFSsToAccess should return defaultFS even if not configured") {
val sparkConf = new SparkConf()
val defaultFS = "hdfs://localhost:8020"
val stagingDir = "hdfs://localhost:8021"
sparkConf.set("spark.master", "yarn-client")
sparkConf.set(STAGING_DIR, stagingDir)
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", defaultFS)
val expected = Set(
new Path(defaultFS).getFileSystem(hadoopConf),
new Path(stagingDir).getFileSystem(hadoopConf)
)
val result = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
result should be (expected)
}

test("SPARK-24149: retrieve all namenodes from HDFS") {
val sparkConf = new SparkConf()
sparkConf.set("spark.master", "yarn-client")
val basicFederationConf = new Configuration()
basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
basicFederationConf.set("dfs.nameservices", "ns1,ns2")
basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
val basicFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
val basicFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, basicFederationConf)
basicFederationResult should be (basicFederationExpected)

// when ViewFS is enabled, it handles its mounted namespaces itself, so no extra tokens are needed
val viewFsConf = new Configuration()
viewFsConf.addResource(basicFederationConf)
viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
HadoopFSDelegationTokenProvider
.hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)

// invalid config should not throw NullPointerException
val invalidFederationConf = new Configuration()
invalidFederationConf.addResource(basicFederationConf)
invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
val invalidFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
val invalidFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, invalidFederationConf)
invalidFederationResult should be (invalidFederationExpected)

// no nameservices defined, i.e. the pre-federation case
val noFederationConf = new Configuration()
noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
val noFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
val noFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf,
noFederationConf)
noFederationResult should be (noFederationExpected)

// federation and HA enabled
val federationAndHAConf = new Configuration()
federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

val federationAndHAExpected = Set(
new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
val federationAndHAResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, federationAndHAConf)
federationAndHAResult should be (federationAndHAExpected)
}
}
8 changes: 4 additions & 4 deletions docs/running-on-yarn.md
@@ -492,7 +492,7 @@ for:

If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
explicitly provided to Spark at launch time. This is done by listing them in the
`spark.yarn.access.hadoopFileSystems` property, described in the configuration section below.
`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below.

The YARN integration also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
@@ -528,11 +528,11 @@ providers can be disabled individually by setting `spark.security.credentials.{s
</td>
</tr>
<tr>
<td><code>spark.yarn.access.hadoopFileSystems</code></td>
<td><code>spark.kerberos.access.hadoopFileSystems</code></td>
<td>(none)</td>
<td>
A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For
example, <code>spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
example, <code>spark.kerberos.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
and Kerberos must be properly configured to be able to access them (either in the same realm
or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
@@ -644,7 +644,7 @@ spark.security.credentials.hive.enabled false
spark.security.credentials.hbase.enabled false
```

The configuration option `spark.yarn.access.hadoopFileSystems` must be unset.
The configuration option `spark.kerberos.access.hadoopFileSystems` must be unset.

# Using the Spark History Server to replace the Spark Web UI

@@ -21,7 +21,6 @@ import scala.language.existentials
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}

@@ -38,7 +37,6 @@ private[spark] class KafkaDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
logDebug("Attempting to fetch Kafka security token.")