[SPARK-2849] Handle driver configs separately in client mode #1845
Changes from 1 commit: 250cb95
The eventual goal of this change is to move the current complex Bash logic into Scala. The new class should be invoked from `spark-class`. For simplicity, this does not yet handle SPARK-2914; that will likely be dealt with in a future PR instead.
New file `SparkClassLauncher.scala` (`@@ -0,0 +1,118 @@`):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.util.{RedirectThread, Utils}

/**
 * Wrapper of `bin/spark-class` that prepares the launch environment of the child JVM properly.
 */
object SparkClassLauncher {

  /**
   * Launch a Spark class with the given class paths, library paths, java options and memory.
   * If we are launching an application through Spark submit in client mode, we must also
   * take into account special `spark.driver.*` properties needed to start the driver JVM.
   */
  def main(args: Array[String]): Unit = {
    if (args.size < 8) {
      System.err.println(
        """
          |Usage: org.apache.spark.deploy.SparkClassLauncher
          |
          |  [properties file]    - path to your Spark properties file
          |  [java runner]        - command to launch the child JVM
          |  [java class paths]   - class paths to pass to the child JVM
          |  [java library paths] - library paths to pass to the child JVM
          |  [java opts]          - java options to pass to the child JVM
          |  [java memory]        - memory used to launch the child JVM
          |  [client mode]        - whether the child JVM will run the Spark driver
          |  [main class]         - main class to run in the child JVM
          |  <main args>          - arguments passed to this main class
          |
          |Example:
          |  org.apache.spark.deploy.SparkClassLauncher
          |    conf/spark-defaults.conf java /classpath1:/classpath2 /librarypath1:/librarypath2
          |    "-XX:-UseParallelGC -Dsome=property" 5g true org.apache.spark.deploy.SparkSubmit
          |    --master local --class org.apache.spark.examples.SparkPi 10
        """.stripMargin)
      System.exit(1)
    }
    val propertiesFile = args(0)
    val javaRunner = args(1)
    val clClassPaths = args(2)
    val clLibraryPaths = args(3)
    val clJavaOpts = args(4)
    val clJavaMemory = args(5)
    val clientMode = args(6) == "true"
    val mainClass = args(7)

    // In client deploy mode, parse the properties file for certain `spark.driver.*` configs.
    // These configs encode java options, class paths, and library paths needed to launch the JVM.
    val properties =
      if (clientMode) {
        SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
      } else {
        Map[String, String]()
      }
    val confDriverMemory = properties.get("spark.driver.memory")
    val confClassPaths = properties.get("spark.driver.extraClassPath")
    val confLibraryPaths = properties.get("spark.driver.extraLibraryPath")
    val confJavaOpts = properties.get("spark.driver.extraJavaOptions")

    // Merge relevant command line values with the config equivalents, if any
    val javaMemory =
      if (clientMode) {
        confDriverMemory.getOrElse(clJavaMemory)
      } else {
        clJavaMemory
      }
    val pathSeparator = sys.props("path.separator")
    val classPaths = clClassPaths + confClassPaths.map(pathSeparator + _).getOrElse("")
    val libraryPaths = clLibraryPaths + confLibraryPaths.map(pathSeparator + _).getOrElse("")
    val javaOpts = Utils.splitCommandString(clJavaOpts) ++
      confJavaOpts.map(Utils.splitCommandString).getOrElse(Seq.empty)
    val filteredJavaOpts = javaOpts.filterNot { opt =>
      opt.startsWith("-Djava.library.path") || opt.startsWith("-Xms") || opt.startsWith("-Xmx")
    }

    // Build up command
    val command: Seq[String] =
      Seq(javaRunner) ++
      { if (classPaths.nonEmpty) Seq("-cp", classPaths) else Seq.empty } ++
      { if (libraryPaths.nonEmpty) Seq(s"-Djava.library.path=$libraryPaths") else Seq.empty } ++
      filteredJavaOpts ++
      Seq(s"-Xms$javaMemory", s"-Xmx$javaMemory") ++
      Seq(mainClass) ++
      args.slice(8, args.size)
    command.foreach(println)

    val builder = new ProcessBuilder(command)
    val process = builder.start()
    new RedirectThread(System.in, process.getOutputStream, "redirect stdin").start()
    new RedirectThread(process.getInputStream, System.out, "redirect stdout").start()
    new RedirectThread(process.getErrorStream, System.err, "redirect stderr").start()
    System.exit(process.waitFor())
  }
}
```
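For illustration, here is a minimal, self-contained sketch of the merging rules implemented above (not part of the patch; all values are made up): in client mode `spark.driver.memory` overrides the command-line memory, extra class paths from the properties file are appended to the command-line ones, and java opts that set memory or the library path are filtered out because the launcher sets those itself from the merged values.

```scala
// Standalone illustration of the merging rules. The literal values are hypothetical;
// in the real launcher they come from the command line and the properties file.
object MergeRulesSketch {
  def main(args: Array[String]): Unit = {
    val pathSeparator = sys.props("path.separator")

    // Class paths: the config value is appended to the command-line value, if present.
    val clClassPaths = "/classpath1"
    val confClassPaths = Some("/classpath2") // from spark.driver.extraClassPath
    val classPaths = clClassPaths + confClassPaths.map(pathSeparator + _).getOrElse("")
    println(classPaths) // "/classpath1:/classpath2" on Unix-like systems

    // Memory: in client mode, spark.driver.memory wins over the command-line value.
    val clJavaMemory = "1g"
    val confDriverMemory = Some("5g") // from spark.driver.memory
    println(confDriverMemory.getOrElse(clJavaMemory)) // "5g"

    // Opts controlling memory or the library path are dropped, since the launcher
    // emits -Xms/-Xmx and -Djava.library.path itself from the merged values.
    val javaOpts = Seq("-XX:-UseParallelGC", "-Xms512m", "-Xmx512m", "-Dsome=property")
    val filtered = javaOpts.filterNot { opt =>
      opt.startsWith("-Djava.library.path") || opt.startsWith("-Xms") || opt.startsWith("-Xmx")
    }
    println(filtered.mkString(" ")) // "-XX:-UseParallelGC -Dsome=property"
  }
}
```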
`Utils.scala` (`@@ -1421,3 +1421,24 @@ private[spark] object Utils extends Logging {`):

```scala
  }
}

/**
 * A utility class to redirect the child process's stdout or stderr.
 */
private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
  extends Thread(name) {

  setDaemon(true)
  override def run() {
    scala.util.control.Exception.ignoring(classOf[IOException]) {
      // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
```
Contributor: Do you understand this comment? I don't.

Contributor (author): shrug

Contributor: I'm guessing the original author was reading `String`s before, which requires defining how to interpret the bytes as characters (an encoding like ASCII or UTF-8). There were probably byte sequences coming through that weren't valid characters in UTF-8, so exceptions were being thrown. Since this just reads from an `InputStream` and writes to an `OutputStream`, copying bytes is much more efficient than reading bytes, interpreting them as characters, converting back to bytes, and then sending those out the other side. Conveniently, Apache has an `IOUtils.copy` in Commons IO that does exactly this.
```scala
      val buf = new Array[Byte](1024)
      var len = in.read(buf)
      while (len != -1) {
        out.write(buf, 0, len)
        out.flush()
        len = in.read(buf)
      }
    }
  }
}
```
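As a side note on the byte-level copy discussed in the thread above, here is a minimal sketch (assuming `commons-io` is on the classpath; not part of the patch) showing that copying raw bytes round-trips data that is not valid text in any charset:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.commons.io.IOUtils

object ByteCopySketch {
  def main(args: Array[String]): Unit = {
    // 0xFF (-1) can never appear in well-formed UTF-8, so decoding this as text would
    // fail or corrupt it; copying at the byte level preserves it exactly.
    val bytes = Array[Byte](0, -1, 127, -128)
    val in = new ByteArrayInputStream(bytes)
    val out = new ByteArrayOutputStream()
    IOUtils.copy(in, out) // copies the stream without ever interpreting bytes as characters
    assert(out.toByteArray.sameElements(bytes))
  }
}
```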
To keep it simpler for now, rather than having a bunch of command line arguments, why not just directly read the environment variables set in `spark-class`? Then this script could take only the main class and its arguments, and directly read `RUNNER`, `PROPERTIES_FILE`, `CLASSPATH`, `SPARK_SUBMIT_LIBRARY_PATH`, `JAVA_OPTS`, and `OUR_JAVA_MEM`. This is one fewer level of interpretation/parsing to worry about for now and would make this patch smaller overall. A sketch of that suggestion follows.
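For illustration, a hypothetical sketch of that suggestion (the environment variable names are taken from the comment above; nothing here is part of the patch): the launcher reads its configuration from the environment that `spark-class` already sets, leaving only the main class and its arguments on the command line.

```scala
// Hypothetical: read what spark-class already exports instead of re-passing it as arguments.
object EnvLauncherSketch {
  def main(args: Array[String]): Unit = {
    val runner = sys.env.getOrElse("RUNNER", "java")
    val propertiesFile = sys.env.get("PROPERTIES_FILE")
    val classPath = sys.env.getOrElse("CLASSPATH", "")
    val libraryPath = sys.env.get("SPARK_SUBMIT_LIBRARY_PATH")
    val javaOpts = sys.env.getOrElse("JAVA_OPTS", "")
    val javaMem = sys.env.getOrElse("OUR_JAVA_MEM", "512m")

    // Only the main class and its arguments remain on the command line.
    val mainClass = args.headOption.getOrElse(sys.error("usage: EnvLauncherSketch <mainClass> [args...]"))
    val mainArgs = args.drop(1)

    // Print the command that would be launched; the real launcher would hand
    // this to a ProcessBuilder and still parse propertiesFile for spark.driver.*
    // overrides, exactly as in the patch above.
    val command =
      Seq(runner, "-cp", classPath) ++
      libraryPath.map(p => s"-Djava.library.path=$p") ++
      Seq(javaOpts, s"-Xms$javaMem", s"-Xmx$javaMem", mainClass) ++
      mainArgs
    println(command.mkString(" "))
    propertiesFile.foreach(f => println(s"(would parse driver overrides from $f)"))
  }
}
```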