[SPARK-1808] Route bin/pyspark through Spark submit #799
Changes from 14 commits: a371d26, f04aaa4, afe47bf, fe4c8a7, 6fba412, a823661, 05879fa, 06eb138, b7ba0d8, 7eebda8, 456d844, 1866f85, c8cb3bf, 01066fa, bf37e36
**bin/spark-shell**

```diff
@@ -28,7 +28,7 @@ esac
 # Enter posix mode for bash
 set -o posix
 
-if [[ "$@" == *--help* ]]; then
+if [[ "$@" = *--help ]] || [[ "$@" = *--h ]]; then
   echo "Usage: ./bin/spark-shell [options]"
   ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
```

> **Contributor:** What is the second condition for? It doesn't seem to match `-h`.
```diff
@@ -46,11 +46,11 @@ function main(){
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+    $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
     stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
-    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+    $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
   fi
 }
```
**core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala**

```diff
@@ -41,10 +41,10 @@ object SparkSubmit {
   private var clusterManager: Int = LOCAL
 
   /**
-   * A special jar name that indicates the class being run is inside of Spark itself,
-   * and therefore no user jar is needed.
+   * Special primary resource names that represent shells rather than application jars.
    */
-  private val RESERVED_JAR_NAME = "spark-internal"
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
```
```diff
@@ -71,8 +71,8 @@ object SparkSubmit {
    * entries for the child, a list of system properties, a list of env vars
    * and the main class for the child
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
-      ArrayBuffer[String], Map[String, String], String) = {
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
     if (args.master.startsWith("local")) {
       clusterManager = LOCAL
     } else if (args.master.startsWith("yarn")) {
```
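For orientation, the doc comment above enumerates what each slot of the returned 4-tuple holds. A minimal sketch of how a caller might destructure it, using variable names that are assumptions rather than part of this diff:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in with the same result shape as createLaunchEnv above:
// (arguments for the child, classpath entries, system properties, child main class).
object CreateLaunchEnvSketch {
  def createLaunchEnv(primaryResource: String)
      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) =
    (ArrayBuffer("--input", "data.txt"),
     ArrayBuffer(primaryResource),
     Map("spark.master" -> "local"),
     "com.example.Main")

  def main(args: Array[String]): Unit = {
    // Callers destructure the tuple positionally.
    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv("app.jar")
    println(s"launch $mainClass args=$childArgs cp=$classpath props=$sysProps")
  }
}
```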
```diff
@@ -121,24 +121,30 @@ object SparkSubmit {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
-    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
-    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    // If we're running a python app, set the main class to our specific python runner
     if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
-      args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      if (args.primaryResource == PYSPARK_SHELL) {
+        args.mainClass = "py4j.GatewayServer"
+        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+      } else {
+        // If a python file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.PythonRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.files = mergeFileLists(args.files, args.primaryResource)
+      }
       val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
-      args.primaryResource = RESERVED_JAR_NAME
+      args.files = mergeFileLists(args.files, pyFiles)
       sysProps("spark.submit.pyFiles") = pyFiles
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = args.mainClass
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
```
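To make the new dispatch concrete: the shell sentinel launches a bare Py4J gateway, while a `.py` file goes through `PythonRunner`. Below is a minimal, self-contained sketch of that branch; `route` is a hypothetical helper, but the main classes and child arguments are taken from the diff:

```scala
object PySparkRouting {
  val PYSPARK_SHELL = "pyspark-shell"

  // Returns (main class, child args) for a python primary resource,
  // mirroring the if/else in the hunk above.
  def route(primaryResource: String, pyFiles: String, appArgs: Seq[String])
      : (String, Seq[String]) =
    if (primaryResource == PYSPARK_SHELL) {
      // Interactive shell: a bare Py4J gateway the python REPL attaches to.
      ("py4j.GatewayServer", Seq("--die-on-broken-pipe", "0"))
    } else {
      // Script: PythonRunner <main python file> <extra python files> [app arguments]
      ("org.apache.spark.deploy.PythonRunner", Seq(primaryResource, pyFiles) ++ appArgs)
    }

  def main(args: Array[String]): Unit = {
    println(route("pyspark-shell", "", Nil))
    println(route("my_app.py", "deps.py", Seq("--input", "data.txt")))
  }
}
```

If I read the py4j CLI correctly, the trailing `0` asks the gateway for an ephemeral port and `--die-on-broken-pipe` ties its lifetime to the launching process; neither is explained in the diff itself.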
```diff
@@ -219,7 +225,7 @@
     // For python files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
```
```diff
@@ -293,7 +299,7 @@
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(new URI(localJar).getPath())
+    val localJarFile = new File(new URI(localJar).getPath)
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
```
```diff
@@ -302,15 +308,37 @@
     loader.addURL(url)
   }
 
+  /**
+   * Return whether the given primary resource represents a user jar.
+   */
+  private def isUserJar(primaryResource: String): Boolean = {
+    !isShell(primaryResource) && !isPython(primaryResource)
+  }
+
+  /**
+   * Return whether the given primary resource represents a shell.
+   */
+  private def isShell(primaryResource: String): Boolean = {
+    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running python.
+   */
+  private[spark] def isPython(primaryResource: String): Boolean = {
+    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
+  }
+
   /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
    */
   private[spark] def mergeFileLists(lists: String*): String = {
-    val merged = lists.filter(_ != null)
-      .flatMap(_.split(","))
-      .mkString(",")
-    if (merged == "") null else merged
+    lists
+      .filter(_ != null)
+      .filter(_ != "")
+      .flatMap(_.split(","))
+      .mkString(",")
   }
 }
```
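A quick self-contained check of the helper semantics introduced above; the definitions are copied from the diff, and the assertions are illustrative:

```scala
object PrimaryResourceExamples {
  val SPARK_SHELL = "spark-shell"
  val PYSPARK_SHELL = "pyspark-shell"

  def isShell(r: String): Boolean = r == SPARK_SHELL || r == PYSPARK_SHELL
  def isPython(r: String): Boolean = r.endsWith(".py") || r == PYSPARK_SHELL
  def isUserJar(r: String): Boolean = !isShell(r) && !isPython(r)

  def mergeFileLists(lists: String*): String =
    lists.filter(_ != null).filter(_ != "").flatMap(_.split(",")).mkString(",")

  def main(args: Array[String]): Unit = {
    assert(isUserJar("app.jar"))       // an ordinary jar is a user jar
    assert(!isUserJar(SPARK_SHELL))    // shell sentinels are not
    assert(isPython("job.py"))         // .py scripts run through python
    assert(isPython(PYSPARK_SHELL))    // and so does the pyspark shell
    // nulls and empty strings are dropped before joining:
    assert(mergeFileLists("a.py,b.py", null, "", "c.py") == "a.py,b.py,c.py")
    // unlike the old version, an all-empty merge now yields "" instead of null:
    assert(mergeFileLists(null, "") == "")
  }
}
```

Note the behavioral change in `mergeFileLists`: the old version returned null when the merge was empty, while the new one returns the empty string.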
> this doesn't seem to match `-h` for me... not sure if that was the intention of the second condition.
> I think maybe you want this to say `*-h`. One thing is this will only detect if `-h` or `--help` is the _last_ argument, but I think anything other than that is pretty tricky.
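For the pattern question in this thread, a small Scala model of the bash behavior may help. Inside `[[ ]]`, `"$@"` effectively joins the arguments with spaces, and a pattern with a leading `*` amounts to a suffix match on that string; the `glob` helper below is a hypothetical stand-in for that rule:

```scala
object HelpFlagMatching {
  // Models bash's [[ "$@" = *SUFFIX ]]: args joined by spaces, suffix-anchored.
  def glob(args: Seq[String], suffix: String): Boolean =
    args.mkString(" ").endsWith(suffix)

  def main(args: Array[String]): Unit = {
    val cli = Seq("--master", "local", "-h")
    println(glob(cli, "--help"))           // false: *--help never sees -h
    println(glob(cli, "--h"))              // false: *--h needs a literal "--h" suffix
    println(glob(Seq("-h"), "-h"))         // true: a *-h pattern would catch -h ...
    println(glob(Seq("-h", "foo"), "-h"))  // ... but only as the last argument
  }
}
```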