[SPARK-1395] Fix "local:" URI support in Yarn mode (again). #560
Recent changes ignored the fact that paths may be defined with "local:" URIs, which means they need to be explicitly added to the classpath everywhere a remote process is started. This change fixes that by:

- Using the correct methods to add paths to the classpath
- Creating SparkConf settings for the Spark jar itself and for the user's jar
- Propagating those two settings to the remote processes where needed

This ensures that, in both client and cluster mode, the driver has the information needed to build the executor's classpath, and that things still work when those paths contain "local:" references.

On the cleanup front, I removed the hacky way the log4j configuration was being propagated to handle the "local:" case. It is handled much more cleanly (and generically) through spark-submit arguments: --files to upload a config file, or spark.executor.extraJavaOptions to pass JVM arguments and use a local file.
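For the log4j part specifically, here is a minimal sketch of the spark-submit-based approach the description refers to (the config file path below is a hypothetical example, assumed to already exist on every node):

```scala
import org.apache.spark.SparkConf

// Hypothetical path: /etc/spark/log4j.properties is assumed to be present on all nodes.
// Alternatively, `--files log4j.properties` on spark-submit uploads a config file instead.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-Dlog4j.configuration=file:/etc/spark/log4j.properties")
```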
ClientBase.scala:
@@ -154,7 +154,7 @@ trait ClientBase extends Logging { | |
| } | ||
|
|
||
| /** Copy the file into HDFS if needed. */ | ||
| private def copyRemoteFile( | ||
| private[yarn] def copyRemoteFile( | ||
| dstDir: Path, | ||
| originalPath: Path, | ||
| replication: Short, | ||
|
|
@@ -213,10 +213,10 @@ trait ClientBase extends Logging { | |
|
|
||
| val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() | ||
|
|
||
| Map( | ||
| ClientBase.SPARK_JAR -> ClientBase.getSparkJar, ClientBase.APP_JAR -> args.userJar, | ||
| ClientBase.LOG4J_PROP -> System.getenv(ClientBase.LOG4J_CONF_ENV_KEY) | ||
| ).foreach { case(destName, _localPath) => | ||
| List( | ||
| (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR), | ||
| (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR) | ||
| ).foreach { case(destName, _localPath, confKey) => | ||
| val localPath: String = if (_localPath != null) _localPath.trim() else "" | ||
| if (! localPath.isEmpty()) { | ||
| val localURI = new URI(localPath) | ||
|
|
@@ -225,6 +225,8 @@ trait ClientBase extends Logging { | |
| val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions) | ||
| distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, | ||
| destName, statCache) | ||
| } else { | ||
| sparkConf.set(confKey, localPath) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -246,6 +248,8 @@ trait ClientBase extends Logging { | |
| if (addToClasspath) { | ||
| cachedSecondaryJarLinks += linkname | ||
| } | ||
| } else if (addToClasspath) { | ||
| cachedSecondaryJarLinks += file.trim() | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -265,14 +269,10 @@ trait ClientBase extends Logging { | |
| val env = new HashMap[String, String]() | ||
|
|
||
| val extraCp = sparkConf.getOption("spark.driver.extraClassPath") | ||
| val log4jConf = System.getenv(ClientBase.LOG4J_CONF_ENV_KEY) | ||
| ClientBase.populateClasspath(yarnConf, sparkConf, log4jConf, env, extraCp) | ||
| ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp) | ||
| env("SPARK_YARN_MODE") = "true" | ||
| env("SPARK_YARN_STAGING_DIR") = stagingDir | ||
| env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() | ||
| if (log4jConf != null) { | ||
| env(ClientBase.LOG4J_CONF_ENV_KEY) = log4jConf | ||
| } | ||
|
|
||
| // Set the environment variables to be passed on to the executors. | ||
| distCacheMgr.setDistFilesEnv(env) | ||
|
|
@@ -286,6 +286,7 @@ trait ClientBase extends Logging { | |
| env("SPARK_YARN_USER_ENV") = userEnvs | ||
| } | ||
|
|
||
| logInfo(s"ApplicationMaster environment: $env") | ||
| env | ||
| } | ||
|
|
||
|
|
@@ -364,7 +365,6 @@ trait ClientBase extends Logging { | |
| sys.props.get("spark.driver.extraJavaOptions").foreach(opts => javaOpts += opts) | ||
| sys.props.get("spark.driver.libraryPath").foreach(p => javaOpts += s"-Djava.library.path=$p") | ||
| } | ||
| javaOpts += ClientBase.getLog4jConfiguration(localResources) | ||
|
|
||
| // Command for the ApplicationMaster | ||
| val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ | ||
|
|
@@ -391,12 +391,31 @@ trait ClientBase extends Logging { | |
| object ClientBase extends Logging { | ||
| val SPARK_JAR: String = "__spark__.jar" | ||
| val APP_JAR: String = "__app__.jar" | ||
| val LOG4J_PROP: String = "log4j.properties" | ||
| val LOG4J_CONF_ENV_KEY: String = "SPARK_LOG4J_CONF" | ||
| val LOCAL_SCHEME = "local" | ||
| val CONF_SPARK_JAR = "spark.yarn.jar" | ||
| val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" | ||
|
Contributor: When exactly would this be used? You have to specify the jar with spark-submit.

Contributor: Never mind, I assume it's only meant to be used internally.

Contributor (Author): Yes, the user jar is an internal thing. It could probably be made unnecessary by having SparkSubmit add it to the head of the "addJars" list, though. I also had it on my list to check how SparkSubmit deals with a "local:" jar (I believe it doesn't), but both of those can be handled separately.

Contributor: Please add a comment explaining that this is meant to be used internally only.
||
| val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" | ||
|
Contributor: Adding a comment here about what spark.yarn.secondary.jars is would be nice too, if you don't mind.
||
| val ENV_SPARK_JAR = "SPARK_JAR" | ||
|
|
||
| def getSparkJar = sys.env.get("SPARK_JAR").getOrElse(SparkContext.jarOfClass(this.getClass).head) | ||
| /** | ||
| * Find the user-defined Spark jar if configured, or return the jar containing this | ||
| * class if not. | ||
| * | ||
| * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the | ||
| * user environment if that is not found (for backwards compatibility). | ||
| */ | ||
| def sparkJar(conf: SparkConf) = { | ||
| if (conf.contains(CONF_SPARK_JAR)) { | ||
| conf.get(CONF_SPARK_JAR) | ||
| } else if (System.getenv(ENV_SPARK_JAR) != null) { | ||
| logWarning( | ||
| s"$ENV_SPARK_JAR detected in the system environment. This variable has been deprecated " + | ||
| s"in favor of the $CONF_SPARK_JAR configuration variable.") | ||
| System.getenv(ENV_SPARK_JAR) | ||
| } else { | ||
| SparkContext.jarOfClass(this.getClass).head | ||
| } | ||
| } | ||
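As a hedged usage illustration of the new setting (the assembly path below is hypothetical), a deployment that already has the Spark jar installed on every node could point spark.yarn.jar at a "local:" URI so the client does not upload it:

```scala
import org.apache.spark.SparkConf

// Hypothetical path: the assembly jar is assumed to already exist at this
// location on every node, so only the reference (not the file) is shipped.
val sparkConf = new SparkConf()
  .set("spark.yarn.jar", "local:/opt/spark/lib/spark-assembly.jar")
```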
|
|
||
| def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = { | ||
| val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) | ||
|
|
@@ -469,71 +488,75 @@ object ClientBase extends Logging { | |
| triedDefault.toOption | ||
| } | ||
|
|
||
| def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf, | ||
| env: HashMap[String, String], extraClassPath: Option[String] = None) { | ||
| extraClassPath.foreach(addClasspathEntry(_, env)) | ||
| addClasspathEntry(Environment.PWD.$(), env) | ||
|
|
||
| // Normally the users app.jar is last in case conflicts with spark jars | ||
| if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { | ||
| addUserClasspath(args, sparkConf, env) | ||
| addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| } else { | ||
| addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| addUserClasspath(args, sparkConf, env) | ||
| } | ||
|
|
||
| // Append all class files and jar files under the working directory to the classpath. | ||
| addFileToClasspath("*", null, env) | ||
|
Contributor: PWD/* will never be added because it's not "local://" and the alternate name is null. We need to make sure * gets added.
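A minimal sketch of the fix the reviewer appears to be asking for, assuming the helpers defined elsewhere in this diff (addClasspathEntry, Environment, Path); it is not part of the PR as shown:

```scala
// Hypothetical sketch: append the working-directory wildcard unconditionally,
// instead of routing it through addFileToClasspath (which skips non-"local:"
// paths that have no alternate name).
addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
```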
||
| } | ||
|
|
||
| /** | ||
| * Returns the java command line argument for setting up log4j. If there is a log4j.properties | ||
| * in the given local resources, it is used, otherwise the SPARK_LOG4J_CONF environment variable | ||
| * is checked. | ||
| * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly | ||
| * to the classpath. | ||
| */ | ||
| def getLog4jConfiguration(localResources: HashMap[String, LocalResource]): String = { | ||
| var log4jConf = LOG4J_PROP | ||
| if (!localResources.contains(log4jConf)) { | ||
| log4jConf = System.getenv(LOG4J_CONF_ENV_KEY) match { | ||
| case conf: String => | ||
| val confUri = new URI(conf) | ||
| if (ClientBase.LOCAL_SCHEME.equals(confUri.getScheme())) { | ||
| "file://" + confUri.getPath() | ||
| } else { | ||
| ClientBase.LOG4J_PROP | ||
| } | ||
| case null => "log4j-spark-container.properties" | ||
| private def addUserClasspath(args: ClientArguments, conf: SparkConf, | ||
| env: HashMap[String, String]) = { | ||
| if (args != null) { | ||
| addFileToClasspath(args.userJar, APP_JAR, env) | ||
| if (args.addJars != null) { | ||
| args.addJars.split(",").foreach { case file: String => | ||
| addFileToClasspath(file, null, env) | ||
| } | ||
| } | ||
| } else { | ||
| val userJar = conf.getOption(CONF_SPARK_USER_JAR).getOrElse(null) | ||
| addFileToClasspath(userJar, APP_JAR, env) | ||
|
|
||
| val cachedSecondaryJarLinks = | ||
| conf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") | ||
| cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env)) | ||
| } | ||
| " -Dlog4j.configuration=" + log4jConf | ||
| } | ||
|
|
||
| def populateClasspath(conf: Configuration, sparkConf: SparkConf, log4jConf: String, | ||
| env: HashMap[String, String], extraClassPath: Option[String] = None) { | ||
|
|
||
| if (log4jConf != null) { | ||
| // If a custom log4j config file is provided as a local: URI, add its parent directory to the | ||
| // classpath. Note that this only works if the custom config's file name is | ||
| // "log4j.properties". | ||
| val localPath = getLocalPath(log4jConf) | ||
| if (localPath != null) { | ||
| val parentPath = new File(localPath).getParent() | ||
| YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, parentPath, | ||
| File.pathSeparator) | ||
| /** | ||
| * Adds the given path to the classpath, handling "local:" URIs correctly. | ||
| * | ||
| * If an alternate name for the file is given, and it's not a "local:" file, the alternate | ||
| * name will be added to the classpath (relative to the job's work directory). | ||
| * | ||
| * If not a "local:" file and no alternate name, the environment is not modified. | ||
| * | ||
| * @param path Path to add to classpath (optional). | ||
| * @param fileName Alternate name for the file (optional). | ||
| * @param env Map holding the environment variables. | ||
| */ | ||
| private def addFileToClasspath(path: String, fileName: String, | ||
| env: HashMap[String, String]) : Unit = { | ||
| if (path != null) { | ||
| scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { | ||
| val localPath = getLocalPath(path) | ||
| if (localPath != null) { | ||
| addClasspathEntry(localPath, env) | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** Add entry to the classpath. */ | ||
| def addClasspathEntry(path: String) = YarnSparkHadoopUtil.addToEnvironment(env, | ||
| Environment.CLASSPATH.name, path, File.pathSeparator) | ||
| /** Add entry to the classpath. Interpreted as a path relative to the working directory. */ | ||
| def addPwdClasspathEntry(entry: String) = | ||
| addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + entry) | ||
|
|
||
| extraClassPath.foreach(addClasspathEntry) | ||
|
|
||
| val cachedSecondaryJarLinks = | ||
| sparkConf.getOption(CONF_SPARK_YARN_SECONDARY_JARS).getOrElse("").split(",") | ||
| .filter(_.nonEmpty) | ||
| // Normally the users app.jar is last in case conflicts with spark jars | ||
| if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { | ||
| addPwdClasspathEntry(APP_JAR) | ||
| cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) | ||
| addPwdClasspathEntry(SPARK_JAR) | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| } else { | ||
| addPwdClasspathEntry(SPARK_JAR) | ||
| ClientBase.populateHadoopClasspath(conf, env) | ||
| addPwdClasspathEntry(APP_JAR) | ||
| cachedSecondaryJarLinks.foreach(addPwdClasspathEntry) | ||
| if (fileName != null) { | ||
| addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env); | ||
| } | ||
| // Append all class files and jar files under the working directory to the classpath. | ||
| addClasspathEntry(Environment.PWD.$()) | ||
| addPwdClasspathEntry("*") | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -547,4 +570,8 @@ object ClientBase extends Logging { | |
| null | ||
| } | ||
|
|
||
| private def addClasspathEntry(path: String, env: HashMap[String, String]) = | ||
| YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path, | ||
| File.pathSeparator) | ||
|
|
||
| } | ||
ClientBaseSuite.scala:
@@ -17,20 +17,29 @@ | |
|
|
||
| package org.apache.spark.deploy.yarn | ||
|
|
||
| import java.io.File | ||
| import java.net.URI | ||
|
|
||
| import com.google.common.io.Files | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.Path | ||
| import org.apache.hadoop.mapreduce.MRJobConfig | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration | ||
| import org.apache.hadoop.yarn.api.ApplicationConstants.Environment | ||
|
|
||
| import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse | ||
| import org.apache.hadoop.yarn.api.records.ContainerLaunchContext | ||
| import org.apache.hadoop.yarn.conf.YarnConfiguration | ||
| import org.mockito.Matchers._ | ||
| import org.mockito.Mockito._ | ||
| import org.scalatest.FunSuite | ||
| import org.scalatest.matchers.ShouldMatchers._ | ||
|
|
||
| import scala.collection.JavaConversions._ | ||
| import scala.collection.mutable.{ HashMap => MutableHashMap } | ||
| import scala.util.Try | ||
|
|
||
| import org.apache.spark.SparkConf | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| class ClientBaseSuite extends FunSuite { | ||
|
|
||
|
|
@@ -68,6 +77,65 @@ class ClientBaseSuite extends FunSuite { | |
| } | ||
| } | ||
|
|
||
| private val SPARK = "local:/sparkJar" | ||
| private val USER = "local:/userJar" | ||
| private val ADDED = "local:/addJar1,local:/addJar2,/addJar3" | ||
|
|
||
| test("Local jar URIs") { | ||
| val conf = new Configuration() | ||
| val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) | ||
| val env = new MutableHashMap[String, String]() | ||
| val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) | ||
|
|
||
| ClientBase.populateClasspath(args, conf, sparkConf, env, None) | ||
|
|
||
| val jars = env("CLASSPATH").split(File.pathSeparator) | ||
| s"$SPARK,$USER,$ADDED".split(",").foreach({ jar => | ||
| val uri = new URI(jar) | ||
| if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) { | ||
| jars should contain (uri.getPath()) | ||
| } else { | ||
| jars should not contain (uri.getPath()) | ||
| } | ||
| }) | ||
| jars should not contain (ClientBase.SPARK_JAR) | ||
| jars should not contain (ClientBase.APP_JAR) | ||
| } | ||
|
|
||
| test("Jar path propagation through SparkConf") { | ||
| val conf = new Configuration() | ||
| val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK) | ||
| val yarnConf = new YarnConfiguration() | ||
| val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) | ||
|
|
||
| val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) | ||
| doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]), | ||
| any(classOf[Path]), any(classOf[Short]), any(classOf[Boolean])) | ||
|
|
||
| var tempDir = Files.createTempDir(); | ||
| try { | ||
| client.prepareLocalResources(tempDir.getAbsolutePath()) | ||
| sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) | ||
|
|
||
| // The non-local path should be propagated by name only, since it will end up in the app's | ||
| // staging dir. | ||
| val expected = ADDED.split(",") | ||
| .map(p => { | ||
| val uri = new URI(p) | ||
| if (ClientBase.LOCAL_SCHEME == uri.getScheme()) { | ||
| p | ||
| } else { | ||
| Option(uri.getFragment()).getOrElse(new File(p).getName()) | ||
| } | ||
| }) | ||
| .mkString(",") | ||
|
|
||
| sparkConf.getOption(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS) should be (Some(expected)) | ||
| } finally { | ||
| Utils.deleteRecursively(tempDir) | ||
| } | ||
| } | ||
|
|
||
|
Contributor: Can you please add a test for PWD/*?
||
| object Fixtures { | ||
|
|
||
| val knownDefYarnAppCP: Seq[String] = | ||
|
|
@@ -109,4 +177,18 @@ class ClientBaseSuite extends FunSuite { | |
| def getFieldValue[A, B](clazz: Class[_], field: String, defaults: => B)(mapTo: A => B): B = | ||
| Try(clazz.getField(field)).map(_.get(null).asInstanceOf[A]).toOption.map(mapTo).getOrElse(defaults) | ||
|
|
||
| private class DummyClient( | ||
| val args: ClientArguments, | ||
| val conf: Configuration, | ||
| val sparkConf: SparkConf, | ||
| val yarnConf: YarnConfiguration) extends ClientBase { | ||
|
|
||
| override def calculateAMMemory(newApp: GetNewApplicationResponse): Int = | ||
| throw new UnsupportedOperationException() | ||
|
|
||
| override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = | ||
| throw new UnsupportedOperationException() | ||
|
|
||
| } | ||
|
|
||
| } | ||
Can you document the config?