Avoid re-uploading resources
Change-Id: I2cb667aedd53b228e6dbfed5725cd8b268a498e9
jerryshao committed Aug 16, 2017
commit c2cb5f7d816c87cdc3e906131d1c1c3bb9f80b04
41 changes: 28 additions & 13 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -330,19 +330,21 @@ object SparkSubmit extends CommandLineUtils {
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull

// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
Contributor:
I'm curious why we don't download files and archives here as well. For archives we would presumably want to extract them too, to keep the same behavior at least for YARN. I guess this was originally for standalone and Mesos mode, but for consistency it seems like we should download archives.

It looks like the original one did download files? Did that get intentionally dropped?
4af3781

Contributor (Author):

The purpose of downloading is to make sure the resource is correctly added to the JVM classpath or to PYTHONPATH at startup. But files are not required to start the Spark driver process or the Python process, and Spark's internal file server will handle remote URIs, so in the previous PR I deliberately removed the support for files.

As for archives, as far as I know they are only used when running on YARN, so a remote path should also be fine (YARN#client will figure out whether to upload or not). And previously we could not leverage them on the driver side in YARN client mode, so here we keep the same behavior. Do we need to change the semantics to download archives?
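
To make the above concrete, here is a minimal, hypothetical sketch of what a client-mode download boils down to: copying a remote URI into a local temp directory via the Hadoop FileSystem API so that the resulting local path can go onto the driver's classpath or PYTHONPATH. The object and method names and the temp-dir prefix are illustrative only, not the actual SparkSubmit.downloadFile implementation.

```scala
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object ClientModeDownloadSketch {
  /** Copy a (possibly remote) URI into a local temp dir and return the local path. */
  def downloadToLocal(uri: String, hadoopConf: Configuration): String = {
    val source = new Path(uri)
    // Resolve the filesystem for the URI's scheme (hdfs://, file://, ...).
    val fs = source.getFileSystem(hadoopConf)
    val targetDir = Files.createTempDirectory("spark-client-deps").toFile
    val dest = new Path(targetDir.getAbsolutePath, source.getName)
    // Pull the bytes down locally; the local copy is what can be put on the
    // driver's classpath or PYTHONPATH.
    fs.copyToLocalFile(false, source, dest)
    dest.toUri.getPath
  }
}
```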

Contributor:

Hmm, so I guess the original intent may have been to add things to the classpath or PYTHONPATH, or to fetch the actual Python file to run, but to me it seems inconsistent that we download some of these things and not others.

For example, my primary-resource program that gets automatically downloaded from HDFS requires file X for the driver to read, but we don't download that automatically. So the user has to manually download X (and possibly archive Y) but doesn't have to handle the jars or the primary resource. Normally, if the user specified --files they expect that file to be at ./filename. Of course, the way this downloading works it wouldn't end up in ./ either, and the same goes for archives.

It seems to me this was missed in the design of this feature. Even their example of using Docker and Marathon to run spark-submit in client mode won't work if they specify --files and their driver needs that file; to me this is the same as needing the jar. They would have to package it in the Docker image, thus defeating the purpose of the feature. I'm sure fewer people use --files in this case, but we shouldn't have inconsistent behavior.

If the user can't rely on Spark to download everything, they need to figure out which parts to download themselves, or just do as they do now and set up everything on the gateway, in which case the downloads happening here are just extra.

To me this just sounds like I need to explain even more to users and they are going to be confused, or I tell them to keep running the way they do now.

Anyway, I kind of think this feature shouldn't have gone in if we can't make it act just like running on the cluster. But making it run like on the cluster is really hard without changing the user's working directory, which again could break other things. And if we download things to ./, that could also do unexpected things the user doesn't want.

Honestly, now I'm leaning towards making this configurable with the default off. I see the one use case it was added for, but even that is incomplete in my opinion.

Adding the original author and reviewers: @loneknightpy @cloud-fan @gatorsmile

thoughts?

Contributor:

Files are, strictly speaking, something that never has been exposed to the driver. They are exposed to the driver in the current working directory when running in YARN cluster mode, but that's the exception, not the rule.

It should be possible to make that work for any driver (e.g. download / copy them to a temp directory and return that path from SparkEnv.get.sparkFilesDir), but I'd count that as a new feature.
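
A rough sketch of that idea (hypothetical names, not an existing Spark API): download every --files entry into one temp directory up front and let driver code resolve entries by file name relative to it.

```scala
import java.io.File
import java.nio.file.Files

object DriverFilesSketch {
  // One shared directory that every --files entry would be downloaded into,
  // using the same kind of FileSystem copy shown in the earlier sketch.
  val filesDir: File = Files.createTempDirectory("spark-submit-files").toFile

  // Driver code would resolve a --files entry by name relative to that directory
  // instead of assuming it sits in the current working directory.
  def driverFile(name: String): File = new File(filesDir, name)
}

// Hypothetical usage in driver code:
//   scala.io.Source.fromFile(DriverFilesSketch.driverFile("lookup.csv"))
```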

Contributor (Author):

@tgravescs, if we also download files/archives to a local path, how do we leverage them, since we don't expose that path to the user? Even with the previous code, downloaded files could never actually be used by the driver. So for semantic completeness we would still need to change some code to support this feature, as @vanzin mentioned.

I agree that the current state of the code is confusing for users (some resources are downloaded while others are not). I think we could fix it in a follow-up PR, what do you think?

Contributor:

OK, I guess YARN is special in that case. Actually, I see the help text for --files and --archives is wrong for YARN: it says the files go to the executors when they really go to both the executors and the application master/driver. The original feature was for Mesos, which makes more sense there.

And really, I guess most people aren't even going to know about this feature, so unless they stumble into it running YARN client mode, most will just continue to specify things on the local filesystem.

@jerryshao you are right; like I mentioned above, to take advantage of files/archives on YARN you would have to be in ./, which isn't trivial. Unless someone requests it, let's just drop it.

Sorry for the tangent, I'm fine with it as is.

if (deployMode == CLIENT) {
args.primaryResource = Option(args.primaryResource).map {
localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, args.sparkProperties, hadoopConf)
}.orNull
args.jars = Option(args.jars).map {
localJars = Option(args.jars).map {
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
}.orNull
args.pyFiles = Option(args.pyFiles).map {
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
}.orNull
}


// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
@@ -351,7 +353,7 @@ object SparkSubmit extends CommandLineUtils {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
@@ -361,8 +363,8 @@ object SparkSubmit extends CommandLineUtils {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
if (localPyFiles != null) {
sysProps("spark.submit.pyFiles") = localPyFiles
}
}

@@ -416,7 +418,7 @@ object SparkSubmit extends CommandLineUtils {
// If an R file is provided, add it to the child arguments and list of files to deploy.
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
@@ -484,15 +486,28 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.driver.cores"),
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
sysProp = "spark.driver.supervise"),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),

// An internal option used only for spark-shell to add user jars to repl's classloader,
// previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
// remote jars, so adding a new option to only specify local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
)

// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
// Also add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
if (deployMode == CLIENT || isYarnCluster) {
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, eitherwise it will not be
Contributor:

otherwise

// added to the classpath of YARN client.
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
@@ -551,7 +566,7 @@
}

if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
sysProps("spark.yarn.dist.pyFiles") = args.pyFiles
}
}

@@ -87,7 +87,7 @@ package object config {
.intConf
.createOptional

private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles")
.internal()
.stringConf
.toSequence
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2594,18 +2594,23 @@ private[spark] object Utils extends Logging {
}

/**
* In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the
* "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by
* only the "spark.jars" property.
* Return the jar files pointed by the "spark.jars" property. Spark internally will distribute
* these jars through file server. In the YARN mode, it will return an empty list, since YARN
* has its own mechanism to distribute jars.
*/
def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = {
def getUserJars(conf: SparkConf): Seq[String] = {
val sparkJars = conf.getOption("spark.jars")
if (conf.get("spark.master") == "yarn" && isShell) {
val yarnJars = conf.getOption("spark.yarn.dist.jars")
unionFileLists(sparkJars, yarnJars).toSeq
} else {
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}

/**
* Return the local jar files which will be added to REPL's classpath. These jar files are
* specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by
* SparkSubmit at first.
*/
def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = {
val localJars = conf.getOption("spark.repl.local.jars")
localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}

private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)"
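
To make the split concrete, here is a hedged illustration with made-up paths (note that Utils is private[spark], so a snippet like this only compiles inside the org.apache.spark package, e.g. from a test): "spark.jars" may now carry remote URIs, while "spark.repl.local.jars" carries only the local copies that SparkSubmit has already downloaded.

```scala
// Utils is private[spark], so an illustration like this only compiles from inside
// the org.apache.spark package (e.g. in a test suite).
package org.apache.spark

import org.apache.spark.util.Utils

object UserJarsIllustration {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // "spark.jars" may now point at remote URIs...
      .set("spark.jars", "hdfs://nn:8020/libs/dep.jar,/opt/app/local.jar")
      // ...while "spark.repl.local.jars" holds only the local copies SparkSubmit downloaded.
      .set("spark.repl.local.jars", "/tmp/spark-1234/dep.jar,/opt/app/local.jar")

    // Returns both entries of "spark.jars", remote URI included.
    println(Utils.getUserJars(conf))
    // Returns only local paths, safe to hand to the REPL's classloader.
    println(Utils.getLocalUserJarsForShell(conf))
  }
}
```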
@@ -762,7 +762,7 @@ class SparkSubmitSuite
(Set(jar1.toURI.toString, jar2.toURI.toString))
sysProps("spark.yarn.dist.files").split(",").toSet should be
(Set(file1.toURI.toString, file2.toURI.toString))
sysProps("spark.submit.pyFiles").split(",").toSet should be
sysProps("spark.yarn.dist.pyFiles").split(",").toSet should be
(Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath))
sysProps("spark.yarn.dist.archives").split(",").toSet should be
(Set(archive1.toURI.toString, archive2.toURI.toString))
@@ -57,7 +57,7 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
val jars = Utils.getUserJars(conf, isShell = true)
val jars = Utils.getLocalUserJarsForShell(conf)
// Remove file:///, file:// or file:/ scheme if exists for each jar
.map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x }
.mkString(File.pathSeparator)