core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (77 changes: 17 additions & 60 deletions)
@@ -324,55 +324,20 @@ object SparkSubmit {
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
         args.mainClass = "org.apache.spark.deploy.PythonRunner"
         args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        if (clusterManager != YARN) {
+          // The YARN backend distributes the primary file differently, so don't merge it.
+          args.files = mergeFileLists(args.files, args.primaryResource)
+        }
       }
+      if (clusterManager != YARN) {
+        // The YARN backend handles python files differently, so don't merge the lists.
+        args.files = mergeFileLists(args.files, args.pyFiles)
+      }
-      args.files = mergeFileLists(args.files, args.pyFiles)
       if (args.pyFiles != null) {
         sysProps("spark.submit.pyFiles") = args.pyFiles
       }
     }
-
-    // In yarn mode for a python app, add pyspark archives to files
-    // that can be distributed with the job
-    if (args.isPython && clusterManager == YARN) {
-      var pyArchives: String = null
-      val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH")
-      if (pyArchivesEnvOpt.isDefined) {
-        pyArchives = pyArchivesEnvOpt.get
-      } else {
-        if (!sys.env.contains("SPARK_HOME")) {
-          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
-        }
-        val pythonPath = new ArrayBuffer[String]
-        for (sparkHome <- sys.env.get("SPARK_HOME")) {
-          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
-          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
-          if (!pyArchivesFile.exists()) {
-            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
-          }
-          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
-          if (!py4jFile.exists()) {
-            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
-              "in yarn mode.")
-          }
-          pythonPath += pyArchivesFile.getAbsolutePath()
-          pythonPath += py4jFile.getAbsolutePath()
-        }
-        pyArchives = pythonPath.mkString(",")
-      }
-
-      pyArchives = pyArchives.split(",").map { localPath =>
-        val localURI = Utils.resolveURI(localPath)
-        if (localURI.getScheme != "local") {
-          args.files = mergeFileLists(args.files, localURI.toString)
-          new Path(localPath).getName
-        } else {
-          localURI.getPath
-        }
-      }.mkString(File.pathSeparator)
-      sysProps("spark.submit.pyArchives") = pyArchives
-    }

     // If we're running a R app, set the main class to our specific R runner
     if (args.isR && deployMode == CLIENT) {
       if (args.primaryResource == SPARKR_SHELL) {
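
Note on the deletion above: the removed client-side logic distinguished `local:` URIs, assumed to be present on every node, from everything else, which had to be shipped via `args.files`. A minimal, self-contained sketch of that decision; the `resolveURI` helper is a simplified stand-in for `Utils.resolveURI`, and the archive paths are hypothetical:

```scala
import java.io.File
import java.net.URI

object PyArchivesPathSketch {
  // Simplified stand-in for Utils.resolveURI: scheme-less paths become file: URIs.
  def resolveURI(path: String): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri else new File(path).getAbsoluteFile.toURI
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical archives: one already on every node, one that must be shipped.
    val archives = Seq("local:/opt/spark/python/lib/pyspark.zip", "/tmp/py4j-src.zip")
    val pythonPath = archives.map { localPath =>
      val uri = resolveURI(localPath)
      if (uri.getScheme != "local") {
        // Shipped files land in the container's working directory,
        // so only the bare file name goes on PYTHONPATH.
        new File(uri.getPath).getName
      } else {
        // local: archives are used in place.
        uri.getPath
      }
    }.mkString(File.pathSeparator)
    // Prints "/opt/spark/python/lib/pyspark.zip:py4j-src.zip" on Unix-like systems.
    println(pythonPath)
  }
}
```
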
@@ -386,19 +351,10 @@
       }
     }
 
-    if (isYarnCluster) {
-      // In yarn-cluster mode for a python app, add primary resource and pyFiles to files
-      // that can be distributed with the job
-      if (args.isPython) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-        args.files = mergeFileLists(args.files, args.pyFiles)
-      }
-
+    if (isYarnCluster && args.isR) {
       // In yarn-cluster mode for a R app, add primary resource to files
       // that can be distributed with the job
-      if (args.isR) {
-        args.files = mergeFileLists(args.files, args.primaryResource)
-      }
+      args.files = mergeFileLists(args.files, args.primaryResource)
     }
 
     // Special flag to avoid deprecation warnings at the client
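
Both the removed and the surviving call sites funnel through `mergeFileLists`. The diff doesn't touch that helper, but its null-tolerant contract is what makes the collapsed form safe. Roughly, as a sketch of the assumed contract rather than the actual Spark implementation:

```scala
// Sketch of the contract the call sites above rely on: a null-safe
// union of comma-separated file lists, returning null when empty.
def mergeFileLists(lists: String*): String = {
  val merged = lists.filter(_ != null).flatMap(_.split(",")).mkString(",")
  if (merged.isEmpty) null else merged
}

// mergeFileLists("a.py,b.py", null, "c.py")  // "a.py,b.py,c.py"
// mergeFileLists(null, null)                 // null
```
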
@@ -515,17 +471,18 @@
       }
     }
 
+    // Let YARN know it's a pyspark app, so it distributes needed libraries.
+    if (clusterManager == YARN && args.isPython) {
+      sysProps.put("spark.yarn.isPython", "true")
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
       if (args.isPython) {
-        val mainPyFile = new Path(args.primaryResource).getName
-        childArgs += ("--primary-py-file", mainPyFile)
+        childArgs += ("--primary-py-file", args.primaryResource)
         if (args.pyFiles != null) {
-          // These files will be distributed to each machine's working directory, so strip the
-          // path prefix
-          val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
-          childArgs += ("--py-files", pyFilesNames)
+          childArgs += ("--py-files", args.pyFiles)
         }
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
       } else if (args.isR) {
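
Net effect of the SparkSubmit changes in yarn-cluster mode: `yarn.Client` now receives the original URIs rather than bare names stripped with `new Path(...).getName`. A sketch of the resulting child arguments for a hypothetical submission of `/home/me/app.py` with `--py-files hdfs:///libs/deps.zip`:

```scala
import scala.collection.mutable.ArrayBuffer

// Before this change the values would have been "app.py" and "deps.zip";
// Client.scala is now responsible for localizing the full URIs.
val childArgs = ArrayBuffer[String]()
childArgs += ("--primary-py-file", "/home/me/app.py")
childArgs += ("--py-files", "hdfs:///libs/deps.zip")
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
```
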
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.rpc._
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
-import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -46,6 +46,14 @@ private[spark] class ApplicationMaster(
     client: YarnRMClient)
   extends Logging {
 
+  // Load the properties file with the Spark configuration and set entries as system properties,
+  // so that user code run inside the AM also has access to them.
+  if (args.propertiesFile != null) {
+    Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
+      sys.props(k) = v
+    }
+  }
+
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
 
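
The new block above relies on `Utils.getPropertiesFromFile`. Its effect is roughly the following JDK-only equivalent; the file name here is hypothetical, standing in for whatever `--properties-file` points at:

```scala
import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

// Roughly what the AM does with --properties-file: load a Java
// properties file and mirror every entry into system properties.
val props = new Properties()
val in = new FileInputStream("spark-conf.properties") // hypothetical path
try props.load(in) finally in.close()
props.stringPropertyNames.asScala.foreach { key =>
  sys.props(key) = props.getProperty(key)
}
```
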
@@ -490,9 +498,11 @@ private[spark] class ApplicationMaster(
     new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
   }
 
+    var userArgs = args.userArgs
     if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
-      System.setProperty("spark.submit.pyFiles",
-        PythonRunner.formatPaths(args.pyFiles).mkString(","))
+      // When running pyspark, the app is run using PythonRunner. The second argument is the list
+      // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
+      userArgs = Seq(args.primaryPyFile, "") ++ userArgs

    [Review thread on the line above]
    Contributor: why the empty string?
    Author: Because PythonRunner needs that argument to be present.
    Contributor: can you add comment

     }
     if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) {
       // TODO(davies): add R dependencies here
@@ -503,9 +513,7 @@
     val userThread = new Thread {
       override def run() {
         try {
-          val mainArgs = new Array[String](args.userArgs.size)
-          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
-          mainMethod.invoke(null, mainArgs)
+          mainMethod.invoke(null, userArgs.toArray)
           finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
           logDebug("Done running users class")
         } catch {
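
Regarding the review question about the empty string: `PythonRunner` reads its arguments positionally, as the usage comment in the SparkSubmit diff spells out (`<main python file> <extra python files> [app arguments]`), so the second slot must be present even when there are no extra Python files. A sketch of that positional contract, not PythonRunner's actual code:

```scala
// Positional layout the AM assumes when it builds
// Seq(args.primaryPyFile, "") ++ userArgs above.
def parsePythonRunnerArgs(args: Array[String]): (String, List[String], List[String]) = {
  val primaryFile = args(0)                                   // main python file
  val pyFiles = args(1).split(",").filter(_.nonEmpty).toList  // "" => no extra files
  val userArgs = args.drop(2).toList                          // passed to the script
  (primaryFile, pyFiles, userArgs)
}

// parsePythonRunnerArgs(Array("app.py", "", "--input", "data.txt"))
// => ("app.py", List(), List("--input", "data.txt"))
```
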
yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,11 +26,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
   var userClass: String = null
   var primaryPyFile: String = null
   var primaryRFile: String = null
-  var pyFiles: String = null
-  var userArgs: Seq[String] = Seq[String]()
+  var userArgs: Seq[String] = Nil
   var executorMemory = 1024
   var executorCores = 1
   var numExecutors = DEFAULT_NUMBER_EXECUTORS
+  var propertiesFile: String = null
 
   parseArgs(args.toList)
 
@@ -59,10 +59,6 @@ class ApplicationMasterArguments(val args: Array[String]) {
         primaryRFile = value
         args = tail
 
-      case ("--py-files") :: value :: tail =>
-        pyFiles = value
-        args = tail
-
       case ("--args" | "--arg") :: value :: tail =>
         userArgsBuffer += value
         args = tail
@@ -79,6 +75,10 @@
         executorCores = value
         args = tail
 
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        args = tail
+
       case _ =>
         printUsageAndExit(1, args)
     }
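
The parser touched above is the usual Scala pattern match over a `List[String]` of options: `--py-files` disappears and `--properties-file` joins it. A compact, self-contained variant of the same idea, written tail-recursively instead of with the original's `while` loop over a mutable `args`:

```scala
object ArgParseSketch {
  var primaryPyFile: String = null
  var propertiesFile: String = null
  val userArgs = scala.collection.mutable.ArrayBuffer[String]()

  @annotation.tailrec
  def parse(args: List[String]): Unit = args match {
    case "--primary-py-file" :: value :: tail =>
      primaryPyFile = value
      parse(tail)
    case "--properties-file" :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case ("--args" | "--arg") :: value :: tail =>
      userArgs += value
      parse(tail)
    case Nil => // done
    case unknown :: _ =>
      sys.error(s"Unknown argument: $unknown")
  }

  def main(rawArgs: Array[String]): Unit = {
    parse(rawArgs.toList)
    println(s"primaryPyFile=$primaryPyFile, propertiesFile=$propertiesFile, userArgs=$userArgs")
  }
}
```
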