Clean up and simplify Spark configuration #299
File: SparkSubmit.scala

```diff
@@ -17,11 +17,13 @@
 package org.apache.spark.deploy

-import java.io.{PrintStream, File}
+import java.io.{FileInputStream, PrintStream, File}
 import java.net.URL
+import java.util.Properties

 import org.apache.spark.executor.ExecutorURLClassLoader

 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.Map
```
```diff
@@ -108,6 +110,21 @@ object SparkSubmit {
     val sysProps = new HashMap[String, String]()
     var childMainClass = ""

+    // Load system properties by default from the file, if present
+    if (appArgs.verbose) printStream.println(s"Using properties file: ${appArgs.propertiesFile}")
+    Option(appArgs.propertiesFile).map { filename =>
+      val file = new File(filename)
+      getDefaultProperties(file).foreach { case (k, v) =>
+        if (k.startsWith("spark")) {
+          sysProps(k) = v
+          if (appArgs.verbose) printStream.println(s"Adding default property: $k=$v")
+        }
+        else {
+          printWarning(s"Ignoring non-spark config property: $k=$v")
+        }
+      }
+    }
+
     if (clusterManager == MESOS && deployOnCluster) {
       printErrorAndExit("Mesos does not support running the driver on the cluster")
     }
```
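For illustration, here is what a minimal properties file accepted by this loader could look like (the file contents are hypothetical, not from the PR); any key that does not start with `spark` takes the warning path above:

```properties
spark.master           spark://host:7077
spark.executor.memory  2g
spark.eventLog.enabled true
# The key below would be rejected with "Ignoring non-spark config property"
my.app.debug           true
```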
```diff
@@ -191,11 +208,11 @@
       sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) {

     if (verbose) {
-      System.err.println(s"Main class:\n$childMainClass")
-      System.err.println(s"Arguments:\n${childArgs.mkString("\n")}")
-      System.err.println(s"System properties:\n${sysProps.mkString("\n")}")
-      System.err.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
-      System.err.println("\n")
+      printStream.println(s"Main class:\n$childMainClass")
+      printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
+      printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
+      printStream.println(s"Classpath elements:\n${childClasspath.mkString("\n")}")
+      printStream.println("\n")
     }

     val loader = new ExecutorURLClassLoader(new Array[URL](0),
```
```diff
@@ -224,6 +241,13 @@
       val url = localJarFile.getAbsoluteFile.toURI.toURL
       loader.addURL(url)
     }

+  private def getDefaultProperties(file: File): Seq[(String, String)] = {
+    val inputStream = new FileInputStream(file)
+    val properties = new Properties()
+    properties.load(inputStream)
+    properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
+  }
+
 }

 private[spark] class OptionAssigner(val value: String,
```

**Contributor** (on `getDefaultProperties`): Would be good to add a try/catch here (or just throw a nice exception).
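One possible shape for that suggestion, as a sketch only: wrapping in `SparkException` and closing the stream in `finally` are my assumptions, not something the PR commits to. This would replace the helper inside `object SparkSubmit`:

```scala
import java.io.{File, FileInputStream, IOException}
import java.util.Properties

import scala.collection.JavaConversions._

import org.apache.spark.SparkException

// Sketch: load the defaults file defensively, always closing the stream
// and rethrowing I/O failures with a message that names the file.
private def getDefaultProperties(file: File): Seq[(String, String)] = {
  val inputStream = new FileInputStream(file)
  try {
    val properties = new Properties()
    properties.load(inputStream)
    properties.stringPropertyNames().toSeq.map(k => (k, properties(k)))
  } catch {
    case e: IOException =>
      throw new SparkException(s"Failed to load properties file: ${file.getPath}", e)
  } finally {
    inputStream.close()
  }
}
```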
File: SparkSubmitArguments.scala

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy

 import scala.collection.mutable.ArrayBuffer
+import java.io.File

 /**
  * Parses and encapsulates arguments from the spark-submit script.
```
```diff
@@ -28,6 +29,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
   var executorMemory: String = null
   var executorCores: String = null
   var totalExecutorCores: String = null
+  var propertiesFile: String = null
   var driverMemory: String = null
   var driverCores: String = null
   var supervise: Boolean = false
```
```diff
@@ -49,6 +51,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     if (args.length == 0) printUsageAndExit(-1)
     if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
     if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+    if (propertiesFile == null) {
+      val sparkHome = sys.env("SPARK_HOME") // defined via `spark-class`
+      val sep = File.separator
+      val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.properties"
+      val file = new File(defaultPath)
+      if (file.exists()) {
+        propertiesFile = file.getAbsolutePath
+      }
+    }

   override def toString = {
     s"""Parsed arguments:
```
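One caveat: `sys.env("SPARK_HOME")` throws a `NoSuchElementException` when the variable is unset (e.g. when this class is exercised outside `spark-class`, such as in tests). A more defensive variant, purely as a sketch:

```scala
// Sketch: only probe for the default file when SPARK_HOME is actually set.
sys.env.get("SPARK_HOME").foreach { sparkHome =>
  val defaultPath = Seq(sparkHome, "conf", "spark-defaults.properties").mkString(File.separator)
  val file = new File(defaultPath)
  if (file.exists()) {
    propertiesFile = file.getAbsolutePath
  }
}
```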
```diff
@@ -57,8 +68,9 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     |  executorMemory $executorMemory
     |  executorCores $executorCores
     |  totalExecutorCores $totalExecutorCores
+    |  propertiesFile $propertiesFile
     |  driverMemory $driverMemory
-    |  drivercores $driverCores
+    |  driverCores $driverCores
     |  supervise $supervise
     |  queue $queue
     |  numExecutors $numExecutors
```
```diff
@@ -122,6 +134,10 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
         driverCores = value
         parseOpts(tail)

+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parseOpts(tail)
+
       case ("--supervise") :: tail =>
         supervise = true
         parseOpts(tail)
```

**Contributor** (on `--properties-file`): This option isn't being printed in the usage output of the spark-submit script.
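For reference, a hypothetical invocation exercising the new flag (the class name, paths, and jar are placeholders, not from the PR):

```
./bin/spark-submit \
  --class org.example.MyApp \
  --properties-file /path/to/custom.properties \
  my-app.jar
```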
File: ClientBase.scala (YARN)

```diff
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.util.{Records, Apps}
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.ExecutorLauncher
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
```
```diff
@@ -340,8 +341,19 @@ trait ClientBase extends Logging {
       JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
     }

-    if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
-      JAVA_OPTS += " " + env("SPARK_JAVA_OPTS")
+    if (args.amClass == classOf[ExecutorLauncher].getName) {
+      // If we are being launched in client mode, forward the spark-conf options
+      // onto the executor launcher
+      for ((k, v) <- sparkConf.getAll) {
+        JAVA_OPTS += s"-D$k=$v"
+      }
+    } else {
+      // If we are being launched in standalone mode, capture and forward any spark
+      // system properties (e.g. set by spark-class).
+      for ((k, v) <- sys.props.filterKeys(_.startsWith("spark"))) {
+        JAVA_OPTS += s"-D$k=$v"
+      }
     }

     if (!localResources.contains(ClientBase.LOG4J_PROP)) {
```

**Contributor:** Removing support for this is going to fail too many jobs that are currently run via cron; this is going to make things very messy.

**Contributor (Author):** @mridulm we could add this back to make it backwards-compatible and give a warning. Would that make sense? Can you give examples of what people are setting in SPARK_JAVA_OPTS? Just curious how people are using it. Also, what does cron have to do with it?

**Contributor:** Warning the user would be great, just don't remove support for it :-)

**Contributor:** cron as in periodically run via Oozie, or just normal cron.

**Contributor (Author):** Okay, sounds good. If you have examples of what values this is being used for, that would be helpful (e.g. are they setting GC settings, application-specific system properties, or what).

**Contributor:** Application-specific defines, -X* config values, etc.

**Contributor (Author):** Thinking a bit more, we have two options here: (a) make a backwards-incompatible change and people have to rewrite their jobs. I guess we can do (a), but I might give a loud error message here so that users change this.

**Contributor (Author):** Hm, actually I'm not so sure. The existing behavior is really confusing, because if SPARK_JAVA_OPTS is set on both the executors and the driver, the behavior is basically undefined. It might be worth it to bite the bullet here rather than continue to support this unpredictable behavior for a long time.
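To make the forwarding concrete, here is a small self-contained sketch of the `-D` translation both branches perform (the property map is made up for illustration):

```scala
object ForwardSparkProps extends App {
  // Stand-in for sparkConf.getAll / sys.props: a few sample pairs.
  val props = Map(
    "spark.executor.memory" -> "2g",
    "spark.app.name" -> "demo",
    "java.version" -> "1.7" // non-spark key, filtered out below
  )
  // Same shape as the else-branch above: keep spark.* keys, render -D flags.
  val javaOpts = props.filterKeys(_.startsWith("spark"))
    .map { case (k, v) => s"-D$k=$v" }
    .mkString(" ")
  println(javaOpts) // -Dspark.executor.memory=2g -Dspark.app.name=demo
}
```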
File: YarnClientClusterScheduler.scala

```diff
@@ -29,7 +29,7 @@ import org.apache.spark.util.Utils
  */
 private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {

-  def this(sc: SparkContext) = this(sc, new Configuration())
+  def this(sc: SparkContext) = this(sc, sc.getConf)

   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
```
**Contributor:** foreach