-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-1808] Route bin/pyspark through Spark submit #799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
a371d26
f04aaa4
afe47bf
fe4c8a7
6fba412
a823661
05879fa
06eb138
b7ba0d8
7eebda8
456d844
1866f85
c8cb3bf
01066fa
bf37e36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR" | |
| SCALA_VERSION=2.10 | ||
|
|
||
| if [[ "$@" == *--help* ]]; then | ||
| echo "Usage: ./bin/pyspark [python file] [options]" | ||
| echo "Usage: ./bin/pyspark [options]" | ||
| ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 | ||
| exit 0 | ||
| fi | ||
|
|
@@ -57,15 +57,31 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH | |
| # Load the PySpark shell.py script when ./pyspark is used interactively: | ||
| export OLD_PYTHONSTARTUP=$PYTHONSTARTUP | ||
| export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py | ||
| export PYSPARK_SUBMIT_ARGS="$@" | ||
|
|
||
| # If IPython options are specified, assume user wants to run IPython | ||
| if [ -n "$IPYTHON_OPTS" ]; then | ||
| IPYTHON=1 | ||
| fi | ||
|
|
||
| # If a python file is provided, directly run spark-submit | ||
| # Build up arguments list manually to preserve quotes. We export Spark submit arguments as an | ||
| # environment variable because shell.py must run as a PYTHONSTARTUP script, which does not take | ||
| # in arguments. This is required mainly for IPython notebooks. | ||
|
|
||
| PYSPARK_SUBMIT_ARGS="" | ||
| whitespace="[[:space:]]" | ||
| for i in "$@"; do | ||
| if [[ $i =~ $whitespace ]]; then | ||
| i=\"$i\" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this work if the argument contains quote characters?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually I tried it and it works
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It won't work if it contains a backslashed quote character, because bash doesn't preserve backslashes.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (If you try it with the latest commit,) |
||
| fi | ||
| PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i" | ||
| done | ||
| export PYSPARK_SUBMIT_ARGS | ||
|
|
||
| # If a python file is provided, directly run spark-submit. | ||
| if [[ "$1" =~ \.py$ ]]; then | ||
| exec $FWDIR/bin/spark-submit $PYSPARK_SUBMIT_ARGS | ||
| echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." | ||
| echo -e "Use ./bin/spark-submit <python file>\n" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These should go to stderr |
||
| exec $FWDIR/bin/spark-submit "$@" | ||
| else | ||
| # Only use ipython if no command line arguments were provided [SPARK-1134] | ||
| if [[ "$IPYTHON" = "1" ]]; then | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -134,10 +134,10 @@ object SparkSubmit { | |
| // Usage: PythonAppRunner <main python file> <extra python files> [app arguments] | ||
| args.mainClass = "org.apache.spark.deploy.PythonRunner" | ||
| args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs | ||
| args.files = Utils.mergeFileLists(args.files, args.primaryResource) | ||
| args.files = mergeFileLists(args.files, args.primaryResource) | ||
| } | ||
| val pyFiles = Option(args.pyFiles).getOrElse("") | ||
| args.files = Utils.mergeFileLists(args.files, pyFiles) | ||
| args.files = mergeFileLists(args.files, pyFiles) | ||
| sysProps("spark.submit.pyFiles") = pyFiles | ||
| } | ||
|
|
||
|
|
@@ -300,7 +300,7 @@ object SparkSubmit { | |
|
|
||
| private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) { | ||
| val localJarFile = new File(new URI(localJar).getPath) | ||
| if (!localJarFile.exists) { | ||
| if (!localJarFile.exists()) { | ||
| printWarning(s"Jar $localJar does not exist, skipping.") | ||
| } | ||
|
|
||
|
|
@@ -328,6 +328,18 @@ object SparkSubmit { | |
| private[spark] def isPython(primaryResource: String): Boolean = { | ||
| primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL | ||
| } | ||
|
|
||
| /** | ||
| * Merge a sequence of comma-separated file lists, some of which may be null to indicate | ||
| * no files, into a single comma-separated string. | ||
| */ | ||
| private[spark] def mergeFileLists(lists: String*): String = { | ||
| lists | ||
| .filter(_ != null) | ||
| .filter(_ != "") | ||
| .flatMap(_.split(",")) | ||
| .mkString(",") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This will now set files to the empty string instead of null even if there are no files present. To be conservative, would it make sense to change it back? I think there are null checks on this downstream when the option assigner is used. Could this change the behavior in some cases, e.g. by setting |
||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,6 @@ | |
| from threading import Thread | ||
| from py4j.java_gateway import java_import, JavaGateway, GatewayClient | ||
|
|
||
|
|
||
| def launch_gateway(): | ||
| SPARK_HOME = os.environ["SPARK_HOME"] | ||
|
|
||
|
|
@@ -36,10 +35,7 @@ def launch_gateway(): | |
| on_windows = platform.system() == "Windows" | ||
| script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit" | ||
| submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS") | ||
| if submit_args is not None: | ||
| submit_args = submit_args.split(" ") | ||
| else: | ||
| submit_args = [] | ||
| submit_args = split_preserve_quotes(submit_args) | ||
| command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args | ||
| if not on_windows: | ||
| # Don't send ctrl-c / SIGINT to the Java gateway: | ||
|
|
@@ -80,3 +76,30 @@ def run(self): | |
| java_import(gateway.jvm, "scala.Tuple2") | ||
|
|
||
| return gateway | ||
|
|
||
| def split_preserve_quotes(args): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do you have some tests for this? E.g. does it work if some of the arguments contain quotes?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually it probably doesn't work if you have escaped quotes (e.g.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I just found out about a nice python library that does this: |
||
| """ | ||
| Given a string of space-delimited arguments with quotes, | ||
| split it into a list while preserving the quote boundaries. | ||
| """ | ||
| if args is None: | ||
| return [] | ||
| split_list = [] | ||
| quoted_string = "" | ||
| wait_for_quote = False | ||
| for arg in args.split(" "): | ||
| if not wait_for_quote: | ||
| if arg.startswith("\""): | ||
| wait_for_quote = True | ||
| quoted_string = arg | ||
| else: | ||
| split_list.append(arg) | ||
| else: | ||
| quoted_string += " " + arg | ||
| if quoted_string.endswith("\""): | ||
| # Strip quotes | ||
| quoted_string = quoted_string[1:-1] | ||
| split_list.append(quoted_string) | ||
| quoted_string = "" | ||
| wait_for_quote = False | ||
| return split_list | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this recognize `-h` also.