[SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode #18962
@@ -330,18 +330,45 @@ object SparkSubmit extends CommandLineUtils {
     args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull

     // In client mode, download remote files.
+    var localPrimaryResource: String = null
+    var localJars: String = null
+    var localPyFiles: String = null
     if (deployMode == CLIENT) {
-      args.primaryResource = Option(args.primaryResource).map {
+      localPrimaryResource = Option(args.primaryResource).map {
        downloadFile(_, targetDir, args.sparkProperties, hadoopConf)
       }.orNull
-      args.jars = Option(args.jars).map {
+      localJars = Option(args.jars).map {
        downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
       }.orNull
-      args.pyFiles = Option(args.pyFiles).map {
+      localPyFiles = Option(args.pyFiles).map {
        downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
       }.orNull
     }

+    if (clusterManager == YARN) {
+      def isNoneFsFileExist(paths: String): Boolean = {
+        Option(paths).exists { p =>
+          p.split(",").map(_.trim).filter(_.nonEmpty).exists { path =>
+            val url = Utils.resolveURI(path)
+            url.getScheme match {
+              case "http" | "https" | "ftp" => true
+              case _ => false
+            }
+          }
+        }
+      }
+
+      // Spark on YARN doesn't support uploading remote resources from an http, https or ftp
+      // server directly to the distributed cache, so print a warning and exit the process.
+      if (isNoneFsFileExist(args.jars) ||
+          isNoneFsFileExist(args.files) ||
+          isNoneFsFileExist(args.primaryResource) ||
+          isNoneFsFileExist(args.pyFiles) ||
+          isNoneFsFileExist(args.archives)) {
+        printErrorAndExit(
+          "Spark on YARN doesn't support resources on remote http, https or ftp server.")
+      }
+    }

     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython && deployMode == CLIENT) {
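Not part of the diff: a minimal, hedged sketch of what the scheme check above does. It uses java.net.URI directly instead of Spark's private Utils.resolveURI helper, and the example paths are made up, so treat it as an approximation of isNoneFsFileExist rather than the exact helper.

```scala
import java.net.URI

object SchemeCheckSketch {
  // True if any entry in a comma-separated path list uses an http/https/ftp scheme,
  // mirroring the intent of isNoneFsFileExist in the hunk above.
  def hasNonFsUri(paths: String): Boolean = {
    Option(paths).exists { p =>
      p.split(",").map(_.trim).filter(_.nonEmpty).exists { path =>
        Option(new URI(path).getScheme).exists {
          case "http" | "https" | "ftp" => true
          case _ => false
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(hasNonFsUri("hdfs:///tmp/a.jar,local:///opt/b.jar")) // false: filesystem-style URIs
    println(hasNonFsUri("http://repo.example.com/c.jar"))        // true: would trigger the error
    println(hasNonFsUri(null))                                   // false: nothing supplied
  }
}
```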
@@ -351,7 +378,7 @@ object SparkSubmit extends CommandLineUtils {
       // If a python file is provided, add it to the child arguments and list of files to deploy.
       // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
       args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+      args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
       if (clusterManager != YARN) {
         // The YARN backend distributes the primary file differently, so don't merge it.
         args.files = mergeFileLists(args.files, args.primaryResource)

@@ -361,8 +388,8 @@ object SparkSubmit extends CommandLineUtils {
         // The YARN backend handles python files differently, so don't merge the lists.
         args.files = mergeFileLists(args.files, args.pyFiles)
       }
-      if (args.pyFiles != null) {
-        sysProps("spark.submit.pyFiles") = args.pyFiles
+      if (localPyFiles != null) {
+        sysProps("spark.submit.pyFiles") = localPyFiles
       }
     }

@@ -416,7 +443,7 @@ object SparkSubmit extends CommandLineUtils {
       // If an R file is provided, add it to the child arguments and list of files to deploy.
       // Usage: RRunner <main R file> [app arguments]
       args.mainClass = "org.apache.spark.deploy.RRunner"
-      args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs
+      args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
       args.files = mergeFileLists(args.files, args.primaryResource)
     }
   }

@@ -461,6 +488,7 @@ object SparkSubmit extends CommandLineUtils {
     OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
     OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
       sysProp = "spark.executor.instances"),
+    OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
|
Contributor (author):
"spark.submit.pyFiles" is used both by the Python runner and by YARN client mode. The Python runner requires local files, while in YARN client mode we can support remote files, so controlling both scenarios with a single configuration is hard. For YARN I use a new configuration, only for pyFiles, to add them to the distributed cache (see the sketch after this hunk).
     OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
     OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
     OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
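Not part of the diff: a hedged sketch of the property split described in the author's comment above, using hypothetical file paths. In yarn-client mode the original (possibly remote) --py-files URIs feed spark.yarn.dist.pyFiles for the distributed cache, while the locally downloaded copies feed spark.submit.pyFiles so the Python process can be started with them on PYTHONPATH.

```scala
import scala.collection.mutable

object PyFilesPropertySplitSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical submission: --py-files points at a remote HDFS path in yarn-client mode.
    val submittedPyFiles = "hdfs:///libs/deps.zip"      // what the user passed on the CLI
    val downloadedPyFiles = "/tmp/spark-1234/deps.zip"  // local copy made by downloadFileList

    val sysProps = mutable.Map[String, String]()
    // The original URI goes to YARN's distributed-cache config; YARN#client decides whether to upload.
    sysProps("spark.yarn.dist.pyFiles") = submittedPyFiles
    // The local copy goes to spark.submit.pyFiles so PythonRunner can put it on PYTHONPATH.
    sysProps("spark.submit.pyFiles") = downloadedPyFiles

    sysProps.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```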
@@ -484,15 +512,28 @@ object SparkSubmit extends CommandLineUtils {
       sysProp = "spark.driver.cores"),
     OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
       sysProp = "spark.driver.supervise"),
-    OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
+    OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
+
+    // An internal option used only for spark-shell to add user jars to the repl's classloader.
+    // Previously it used "spark.jars" or "spark.yarn.dist.jars", which may now point to
+    // remote jars, so add a new option to specify only local jars for spark-shell internally.
+    OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars")
   )

   // In client mode, launch the application main class directly
-  // In addition, add the main application jar and any added jars (if any) to the classpath
-  if (deployMode == CLIENT || isYarnCluster) {
+  // Also add the main application jar and any added jars to the classpath in case the YARN
+  // client requires these jars.
+  if (deployMode == CLIENT) {
     childMainClass = args.mainClass
+    if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
+      childClasspath += localPrimaryResource
+    }
+    if (localJars != null) { childClasspath ++= localJars.split(",") }
+  }
+  // Add the main application jar and any added jars to the classpath in case the YARN client
+  // requires these jars.
+  // This assumes both primaryResource and user jars are local jars, otherwise they will not be
+  // added to the classpath of the YARN client.
+  if (isYarnCluster) {
     if (isUserJar(args.primaryResource)) {
       childClasspath += args.primaryResource
     }
@@ -549,10 +590,6 @@ object SparkSubmit extends CommandLineUtils {
     if (args.isPython) {
       sysProps.put("spark.yarn.isPython", "true")
     }
-
-    if (args.pyFiles != null) {
-      sysProps("spark.submit.pyFiles") = args.pyFiles
-    }
   }

   // assure a keytab is available from any place in a JVM
I'm curious why we don't download files and archives here as well. I guess for archives we would also want to extract them, to keep the same behavior at least for YARN. I guess this was originally for standalone and Mesos mode, but it seems like for consistency we should download archives.
It looks like the original change did download files? Did that get dropped intentionally?
4af3781
The purpose of downloading is to make sure the resource is correctly added to the Java classpath or to PYTHONPATH at startup. Files, however, are not required to start the Spark driver process or the Python process, and Spark's internal file server will handle remote URIs, so in the previous PR I deliberately removed the support for downloading files.
As for archives, as far as I know they are only used when running on YARN, so a remote path should also be fine (YARN#client will figure out whether to upload or not). And previously we could not leverage archives on the driver side in yarn-client mode anyway, so here we keep the same behavior. Do we need to change the semantics to download archives?
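Not part of the thread: to make the "download" step discussed above concrete, here is a hedged sketch of copying a remote URI into a local temp directory with the standard Hadoop FileSystem API. It is an approximation of what a helper like downloadFile does, not the actual SparkSubmit code, and the hdfs path is hypothetical.

```scala
import java.nio.file.Files
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DownloadSketch {
  // Copy a remote (e.g. hdfs://) URI into a local temp directory and return the local path,
  // so the local JVM or Python process can reference it on its classpath / PYTHONPATH.
  def downloadToLocal(remote: String, hadoopConf: Configuration): String = {
    val src = new Path(remote)
    val fs: FileSystem = src.getFileSystem(hadoopConf)
    val localDir = Files.createTempDirectory("spark-download").toFile
    val dst = new Path(localDir.getAbsolutePath, src.getName)
    fs.copyToLocalFile(src, dst) // pulls the file down to the local filesystem
    dst.toString
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical remote jar; actually running this requires a reachable HDFS namenode.
    val local = downloadToLocal("hdfs:///apps/my-app.jar", new Configuration())
    println(s"Downloaded to: $local")
  }
}
```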
Hmm, so I guess the original intent may have been to add things to the classpath or Python path, or to fetch the actual Python file to run, but to me it seems inconsistent that we download some of these things and not others.
For example, my primary resource program that gets automatically downloaded from HDFS requires file X for the driver to read, but we don't download that automatically. So the user has to manually download X (and possibly archive Y) but doesn't have to do the jars or primary resource. Normally, if the user used --files they expect that file to be in ./filename. Of course, the way this download works, it wouldn't be in ./ either. The same applies to archives.
It seems to me this was missed in the design of this feature. Even in their example using Docker and Marathon to run spark-submit in client mode, it won't work if they specify --files now and their driver needs that file; to me this is the same as needing the jar. They would have to package it in the Docker image, thus defeating the purpose of the feature. I'm sure fewer people use --files in this case, but we shouldn't have inconsistent behavior.
If the user can't rely on Spark to download everything, they need to figure out which parts to download themselves, or just do as they do now and set up everything on the gateway, in which case the downloads happening here are just extra.
This to me just sounds like I need to explain things to users even more, and they are going to be confused, or I just tell them to run like they do now.
Anyway, I kind of think this feature shouldn't have gone in if we can't make it act just like it was running on the cluster. But I think making it run like on the cluster is really hard without changing the user's running directory, which again could break other things. If we download things to ./ then you could do bad things as well that the user doesn't expect.
Honestly, now I'm leaning towards making this configurable with default off. I see the one use case it was added for, but even that is incomplete in my opinion.
Adding the original author and reviewers: @loneknightpy @cloud-fan @gatorsmile
Thoughts?
Files are, strictly speaking, something that has never been exposed to the driver. They are exposed to the driver in the current working directory when running in YARN cluster mode, but that's the exception, not the rule.
It should be possible to make that work for any driver (e.g. download / copy them to a temp directory and return that path from SparkEnv.get.sparkFilesDir), but I'd count that as a new feature.
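Not part of the thread: for readers following the discussion, the public SparkFiles API is how application code usually resolves a file distributed via --files or SparkContext.addFile; it returns wherever Spark placed the file, which is generally not the process's working directory. A small hedged sketch (the file name is hypothetical):

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object FilesLookupSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("files-lookup"))
    // "config.json" stands in for a file passed via --files or SparkContext.addFile.
    // SparkFiles.get returns the absolute path Spark downloaded it to, not "./config.json".
    val resolved = SparkFiles.get("config.json")
    println(s"config.json resolved to: $resolved")
    sc.stop()
  }
}
```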
@tgravescs, if we also download files/archives to a local path, how do we leverage them? We don't expose the path to the user, and even with the previous code the downloaded files could never really be used by the driver. So for semantic completeness we would still need to change some code to support this feature, as @vanzin mentioned.
I agree with you that the current state of the code is confusing for users (some things are downloaded while others are not). I think we could fix it in a follow-up PR, what do you think?
OK, I guess YARN is special in that case. Actually, I see the help text for --files and --archives is wrong for YARN: it says the files go to the executors, when they really go to both the executors and the application master/driver. The original feature was for Mesos, which makes more sense there.
And really, I guess most people aren't even going to know about this feature, so unless they stumble into it running yarn-client mode, most will just continue to specify things on the local filesystem.
@jerryshao you are right; like I mentioned above, to take advantage of files/archives on YARN you would have to be in ./, which isn't trivial. Unless someone requests it, let's just drop it.
Sorry for the tangent, I'm fine with it as is.