Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -193,8 +190,7 @@ object YarnSparkHadoopUtil {
sparkConf: SparkConf,
hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
.map(new Path(_).getFileSystem(hadoopConf))
.toSet
val requestAllDelegationTokens = filesystemsToAccess.isEmpty

val stagingFS = sparkConf.get(STAGING_DIR)
.map(new Path(_).getFileSystem(hadoopConf))
Expand All @@ -203,8 +199,8 @@ object YarnSparkHadoopUtil {
// Add the list of available namenodes for all namespaces in HDFS federation.
// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
// namespaces.
val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
Set.empty
val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") {
filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
} else {
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
// Retrieving the filesystem for the nameservices where HA is not enabled
Expand All @@ -222,7 +218,7 @@ object YarnSparkHadoopUtil {
(filesystemsWithoutHA ++ filesystemsWithHA).toSet
}

filesystemsToAccess ++ hadoopFilesystems + stagingFS
hadoopFilesystems + stagingFS
}

}