Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ private[spark] class SparkSubmit extends Logging {
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

/**
 * Decides whether a resource with the given URI scheme has to be downloaded.
 *
 * @param scheme URI scheme of the resource, e.g. "http" or "s3"
 * @return true if `forceDownloadSchemes` contains the wildcard "*" or `scheme` itself, or if
 *         resolving a Hadoop FileSystem class for `scheme` fails
 */
def shouldDownload(scheme: String): Boolean = {
  // The membership checks short-circuit, so the FileSystem lookup only runs when the
  // scheme is not force-listed.
  forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
    Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
}

Expand Down Expand Up @@ -578,7 +578,8 @@ private[spark] class SparkSubmit extends Logging {
}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, otherwise it will not be
// This assumes both primaryResource and user jars are local jars, or have already been
// downloaded locally by configuring "spark.yarn.dist.forceDownloadSchemes"; otherwise they
// will not be added to the classpath of the YARN client.
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,10 +483,11 @@ package object config {

// Config entry for "spark.yarn.dist.forceDownloadSchemes": a comma-separated list of schemes,
// parsed into a sequence of strings and defaulting to an empty list.
private[spark] val FORCE_DOWNLOAD_SCHEMES =
  ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
    .doc("Comma-separated list of schemes for which resources will be downloaded to the " +
      "local disk prior to being added to YARN's distributed cache. For use in cases " +
      "where the YARN service does not support schemes that are supported by Spark, like http, " +
      "https and ftp, or jars required to be in the local YARN client's classpath. Wildcard " +
      "'*' is denoted to download resources for all the schemes.")
    .stringConf
    .toSequence
    .createWithDefault(Nil)
Expand Down
29 changes: 19 additions & 10 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -995,20 +995,24 @@ class SparkSubmitSuite
}

test("download remote resource if it is not supported by yarn service") {
testRemoteResources(enableHttpFs = false, blacklistHttpFs = false)
testRemoteResources(enableHttpFs = false)
}

test("avoid downloading remote resource if it is supported by yarn service") {
testRemoteResources(enableHttpFs = true, blacklistHttpFs = false)
testRemoteResources(enableHttpFs = true)
}

test("force download from blacklisted schemes") {
testRemoteResources(enableHttpFs = true, blacklistHttpFs = true)
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("http"))
}

test("force download for all the schemes") {
testRemoteResources(enableHttpFs = true, blacklistSchemes = Seq("*"))
}

private def testRemoteResources(
enableHttpFs: Boolean,
blacklistHttpFs: Boolean): Unit = {
blacklistSchemes: Seq[String] = Nil): Unit = {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
if (enableHttpFs) {
Expand All @@ -1025,8 +1029,8 @@ class SparkSubmitSuite
val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"

val forceDownloadArgs = if (blacklistHttpFs) {
Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http")
val forceDownloadArgs = if (blacklistSchemes.nonEmpty) {
Seq("--conf", s"spark.yarn.dist.forceDownloadSchemes=${blacklistSchemes.mkString(",")}")
} else {
Nil
}
Expand All @@ -1044,14 +1048,19 @@ class SparkSubmitSuite

val jars = conf.get("spark.yarn.dist.jars").split(",").toSet

// The URI of remote S3 resource should still be remote.
assert(jars.contains(tmpS3JarPath))
def isSchemeBlacklisted(scheme: String) = {
blacklistSchemes.contains("*") || blacklistSchemes.contains(scheme)
}

if (!isSchemeBlacklisted("s3")) {
assert(jars.contains(tmpS3JarPath))
}

if (enableHttpFs && !blacklistHttpFs) {
if (enableHttpFs && blacklistSchemes.isEmpty) {
// If Http FS is supported by yarn service, the URI of remote http resource should
// still be remote.
assert(jars.contains(tmpHttpJarPath))
} else {
} else if (!enableHttpFs || isSchemeBlacklisted("http")) {
// If Http FS is not supported by yarn service, or http scheme is configured to be force
// downloading, the URI of remote http resource should be changed to a local one.
val jarName = new File(tmpHttpJar.toURI).getName
Expand Down
5 changes: 3 additions & 2 deletions docs/running-on-yarn.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,10 @@ To use a custom metrics.properties for the application master and executors, upd
<td><code>spark.yarn.dist.forceDownloadSchemes</code></td>
<td><code>(none)</code></td>
<td>
Comma-separated list of schemes for which files will be downloaded to the local disk prior to
Comma-separated list of schemes for which resources will be downloaded to the local disk prior to
being added to YARN's distributed cache. For use in cases where the YARN service does not
support schemes that are supported by Spark, like http, https and ftp.
support schemes that are supported by Spark, like http, https and ftp, or where jars are required to be in the
local YARN client's classpath. The wildcard '*' forces download of resources for all schemes.
</td>
</tr>
<tr>
Expand Down