[SPARK-2849] Handle driver configs separately in client mode #1845
bin/spark-submit

```diff
@@ -17,14 +17,18 @@
 # limitations under the License.
 #
 
+# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+
 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
 ORIG_ARGS=("$@")
 
 while (($#)); do
   if [ "$1" = "--deploy-mode" ]; then
-    DEPLOY_MODE=$2
+    SPARK_SUBMIT_DEPLOY_MODE=$2
+  elif [ "$1" = "--properties-file" ]; then
+    SPARK_SUBMIT_PROPERTIES_FILE=$2
   elif [ "$1" = "--driver-memory" ]; then
-    DRIVER_MEMORY=$2
+    export SPARK_SUBMIT_DRIVER_MEMORY=$2
   elif [ "$1" = "--driver-library-path" ]; then
     export SPARK_SUBMIT_LIBRARY_PATH=$2
   elif [ "$1" = "--driver-class-path" ]; then
```
Contributor: I believe `SPARK_SUBMIT_OPTS` has become a slightly public API, since some people have used it when scripting to avoid exactly the problem this PR is trying to solve. So it might be good to initialize `SPARK_SUBMIT_JAVA_OPTS` to the value of `SPARK_SUBMIT_OPTS`, with a comment that it's for compatibility.
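A minimal sketch of the suggested shim, assuming the reviewer's proposed variable name `SPARK_SUBMIT_JAVA_OPTS` (which is not part of this PR):

```bash
# For backwards compatibility: SPARK_SUBMIT_OPTS is a de facto public API,
# so seed the new (hypothetical) variable from it when the latter is unset.
export SPARK_SUBMIT_JAVA_OPTS="${SPARK_SUBMIT_JAVA_OPTS:-$SPARK_SUBMIT_OPTS}"
```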
```diff
@@ -35,10 +39,24 @@ while (($#)); do
   shift
 done
 
-DEPLOY_MODE=${DEPLOY_MODE:-"client"}
+DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
+export SPARK_SUBMIT_DEPLOY_MODE=${SPARK_SUBMIT_DEPLOY_MODE:-"client"}
+export SPARK_SUBMIT_PROPERTIES_FILE=${SPARK_SUBMIT_PROPERTIES_FILE:-"$DEFAULT_PROPERTIES_FILE"}
 
-if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
-  export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
+# For client mode, the driver will be launched in the same JVM that launches
+# SparkSubmit, so we may need to read the properties file for any extra class
+# paths, library paths, java options and memory early on. Otherwise, it will
+# be too late by the time the driver JVM has started.
+
+if [[ "$SPARK_SUBMIT_DEPLOY_MODE" == "client" && -f "$SPARK_SUBMIT_PROPERTIES_FILE" ]]; then
+  # Parse the properties file only if the special configs exist
+  contains_special_configs=$(
+    grep -e "spark.driver.extra*\|spark.driver.memory" "$SPARK_SUBMIT_PROPERTIES_FILE" | \
+      grep -v "^[[:space:]]*#"
+  )
+  if [ -n "$contains_special_configs" ]; then
+    export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+  fi
 fi
 
 exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
```

Contributor (on the `grep` over the properties file): If `SPARK_SUBMIT_PROPERTIES_FILE` does not exist, will this line cause the script to fail?
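For reference, `grep` against a nonexistent file prints an error and exits non-zero, which is why the hunk above guards the whole block behind a `-f` test. A standalone sketch of that guard, with an illustrative file path:

```bash
# Only scan for the special driver configs if the properties file exists;
# this avoids a grep error when the file is missing. Path is illustrative.
props="$SPARK_HOME/conf/spark-defaults.conf"
if [ -f "$props" ]; then
  special=$(grep -e "spark.driver.extra*\|spark.driver.memory" "$props" | grep -v "^[[:space:]]*#")
  [ -n "$special" ] && export SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
fi
```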
core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.util.{RedirectThread, Utils}

/**
 * Launch an application through Spark submit in client mode with the appropriate classpath,
 * library paths, java options and memory. These properties of the JVM must be set before the
 * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
 * of parsing the properties file for such relevant configs in Bash.
 *
 * Usage: org.apache.spark.deploy.SparkSubmitDriverBootstrapper <submit args>
 */
private[spark] object SparkSubmitDriverBootstrapper {

  // Note: This class depends on the behavior of `bin/spark-class` and `bin/spark-submit`.
  // Any changes made there must be reflected in this file.

  def main(args: Array[String]): Unit = {

    // This should be called only from `bin/spark-class`
    if (!sys.env.contains("SPARK_CLASS")) {
      System.err.println("SparkSubmitDriverBootstrapper must be called from `bin/spark-class`!")
      System.exit(1)
    }

    val submitArgs = args
    val runner = sys.env("RUNNER")
    val classpath = sys.env("CLASSPATH")
    val javaOpts = sys.env("JAVA_OPTS")
    val defaultDriverMemory = sys.env("OUR_JAVA_MEM")

    // Spark submit specific environment variables
    val deployMode = sys.env("SPARK_SUBMIT_DEPLOY_MODE")
    val propertiesFile = sys.env("SPARK_SUBMIT_PROPERTIES_FILE")
    val bootstrapDriver = sys.env("SPARK_SUBMIT_BOOTSTRAP_DRIVER")
    val submitDriverMemory = sys.env.get("SPARK_SUBMIT_DRIVER_MEMORY")
    val submitLibraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
    val submitClasspath = sys.env.get("SPARK_SUBMIT_CLASSPATH")
    val submitJavaOpts = sys.env.get("SPARK_SUBMIT_OPTS")

    assume(runner != null, "RUNNER must be set")
    assume(classpath != null, "CLASSPATH must be set")
    assume(javaOpts != null, "JAVA_OPTS must be set")
    assume(defaultDriverMemory != null, "OUR_JAVA_MEM must be set")
    assume(deployMode == "client", "SPARK_SUBMIT_DEPLOY_MODE must be \"client\"!")
    assume(propertiesFile != null, "SPARK_SUBMIT_PROPERTIES_FILE must be set")
    assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")

    // Parse the properties file for the equivalent spark.driver.* configs
    val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
    val confDriverMemory = properties.get("spark.driver.memory")
    val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
    val confClasspath = properties.get("spark.driver.extraClassPath")
    val confJavaOpts = properties.get("spark.driver.extraJavaOptions")

    // Favor Spark submit arguments over the equivalent configs in the properties file.
    // Note that we do not actually use the Spark submit values for library path, classpath,
    // and Java opts here, because we have already captured them in Bash.

    val newDriverMemory = submitDriverMemory
      .orElse(confDriverMemory)
      .getOrElse(defaultDriverMemory)

    val newLibraryPath =
      if (submitLibraryPath.isDefined) {
        // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
        ""
      } else {
        confLibraryPath.map("-Djava.library.path=" + _).getOrElse("")
      }

    val newClasspath =
      if (submitClasspath.isDefined) {
        // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
        classpath
      } else {
        classpath + confClasspath.map(sys.props("path.separator") + _).getOrElse("")
      }

    val newJavaOpts =
      if (submitJavaOpts.isDefined) {
        // SPARK_SUBMIT_OPTS is already captured in JAVA_OPTS
        javaOpts
      } else {
        javaOpts + confJavaOpts.map(" " + _).getOrElse("")
      }

    val filteredJavaOpts = Utils.splitCommandString(newJavaOpts)
      .filterNot(_.startsWith("-Xms"))
      .filterNot(_.startsWith("-Xmx"))

    // Build up command
    val command: Seq[String] =
      Seq(runner) ++
      Seq("-cp", newClasspath) ++
      Seq(newLibraryPath) ++
      filteredJavaOpts ++
      Seq(s"-Xms$newDriverMemory", s"-Xmx$newDriverMemory") ++
      Seq("org.apache.spark.deploy.SparkSubmit") ++
      submitArgs

    // Print the launch command. This follows closely the format used in `bin/spark-class`.
    if (sys.env.contains("SPARK_PRINT_LAUNCH_COMMAND")) {
      System.err.print("Spark Command: ")
      System.err.println(command.mkString(" "))
      System.err.println("========================================\n")
    }

    // Start the driver JVM
    val filteredCommand = command.filter(_.nonEmpty)
    val builder = new ProcessBuilder(filteredCommand)
    val process = builder.start()

    // Redirect stdin, stdout, and stderr to/from the child JVM
    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
    val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
    val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
    stdinThread.start()
    stdoutThread.start()
    stderrThread.start()

    // Terminate on broken pipe, which signals that the parent process has exited. This is
    // important for the PySpark shell, where Spark submit itself is a python subprocess.
    stdinThread.join()
    process.destroy()
  }

}
```
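Since the bootstrapper reuses the launch-command logging format of `bin/spark-class`, the exact driver JVM command it builds can be inspected the same way. A usage sketch, where the application class and jar are placeholders:

```bash
# Print the driver JVM command before it is launched
# (the --class value and jar path below are examples only):
SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-submit \
  --deploy-mode client \
  --driver-memory 2g \
  --class com.example.MyApp \
  path/to/app.jar
```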
Review comment: Below we should change this to advise people to set `spark.driver.memory` instead of `SPARK_DRIVER_MEMORY`. Also, we should remove the references in `spark-env.sh`.

Reply: Ok
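A sketch of that advice in practice, with an illustrative `2g` value and placeholder application details:

```bash
# Preferred: set driver memory through Spark configuration rather than the
# SPARK_DRIVER_MEMORY environment variable. The value "2g" is an example.
echo "spark.driver.memory 2g" >> "$SPARK_HOME/conf/spark-defaults.conf"

# Or per application, on the spark-submit command line:
./bin/spark-submit --driver-memory 2g --class com.example.MyApp path/to/app.jar
```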