14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -356,16 +356,19 @@ object SparkSubmit {
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
if (clusterManager != YARN) {
// The YARN backend distributes the primary file differently, so don't merge it.
args.files = mergeFileLists(args.files, args.primaryResource)
args.files = mergeFileLists(args.files, args.primaryResource, args.pyRequirements)
}
}
if (clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
args.files = mergeFileLists(args.files, args.pyFiles, args.pyRequirements)
}
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
if (args.pyRequirements != null) {
sysProps("spark.submit.pyRequirements") = args.pyRequirements
}
}

// In YARN mode for an R app, add the SparkR package archive and the R package
@@ -542,6 +545,10 @@ object SparkSubmit {
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}

if (args.pyRequirements != null) {
sysProps("spark.submit.pyRequirements") = args.pyRequirements
}
}

// assure a keytab is available from any place in a JVM
@@ -593,6 +600,9 @@ object SparkSubmit {
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
if (args.pyRequirements != null) {
sysProps("spark.submit.pyRequirements") = args.pyRequirements
}
} else {
childArgs += (args.primaryResource, args.mainClass)
}
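The requirements file is distributed by piggybacking on args.files: the mergeFileLists helper in SparkSubmit.scala (not shown in this excerpt) joins whatever non-empty comma-separated lists it is given, so adding args.pyRequirements as an extra argument ships the file like any other --files entry, while spark.submit.pyRequirements records its name for the Python side. A rough Python rendering of that merging behavior, for illustration only:

def merge_file_lists(*lists):
    # Sketch of SparkSubmit.mergeFileLists: ignore null/empty lists and join the rest.
    entries = [item for lst in lists if lst for item in lst.split(",") if item]
    return ",".join(entries) if entries else None

# e.g. merge_file_lists("app.py,libs.zip", None, "requirements.txt")
#   -> "app.py,libs.zip,requirements.txt"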
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -64,6 +64,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var verbose: Boolean = false
var isPython: Boolean = false
var pyFiles: String = null
var pyRequirements: String = null
var isR: Boolean = false
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
@@ -304,6 +305,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| numExecutors $numExecutors
| files $files
| pyFiles $pyFiles
| pyRequirements $pyRequirements
| archives $archives
| mainClass $mainClass
| primaryResource $primaryResource
@@ -395,6 +397,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PY_FILES =>
pyFiles = Utils.resolveURIs(value)

case PY_REQUIREMENTS =>
pyRequirements = Utils.resolveURIs(value)

case ARCHIVES =>
archives = Utils.resolveURIs(value)

@@ -505,6 +510,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| search for the maven coordinates given with --packages.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --py-requirements REQS Pip requirements file whose dependencies will be fetched and
| placed on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
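With the new option wired into the argument parser and the usage text, a PySpark job could be submitted with it roughly as below; the file and application names are illustrative rather than taken from this PR, and the snippet assumes spark-submit is on the PATH:

# Illustrative usage sketch of the new flag (names are made up).
import subprocess

subprocess.run([
    "spark-submit",
    "--master", "yarn",
    "--deploy-mode", "cluster",
    "--py-requirements", "requirements.txt",  # pip requirements file to distribute
    "my_app.py",
], check=True)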
31 changes: 31 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -570,6 +570,37 @@ class SparkSubmitSuite
appArgs.executorMemory should be ("2.3g")
}
}

test("py-requirements will be distributed") {
val pyReqs = "requirements.txt"

val clArgsYarn = Seq(
"--master", "yarn",
"--deploy-mode", "cluster",
"--py-requirements", pyReqs,
"mister.py"
)

val appArgsYarn = new SparkSubmitArguments(clArgsYarn)
val sysPropsYarn = SparkSubmit.prepareSubmitEnvironment(appArgsYarn)._3
appArgsYarn.pyRequirements should be (Utils.resolveURIs(pyReqs))
sysPropsYarn("spark.yarn.dist.files") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
sysPropsYarn("spark.submit.pyRequirements") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))

val clArgs = Seq(
"--master", "local",
"--py-requirements", pyReqs,
"mister.py"
)

val appArgs = new SparkSubmitArguments(clArgs)
val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs)._3
appArgs.pyRequirements should be (Utils.resolveURIs(pyReqs))
sysProps("spark.submit.pyRequirements") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyReqs)).mkString(","))
}
// scalastyle:on println

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -55,6 +55,7 @@ class SparkSubmitOptionParser {
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
protected final String PY_FILES = "--py-files";
protected final String PY_REQUIREMENTS = "--py-requirements";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
6 changes: 6 additions & 0 deletions python/pyspark/context.py
@@ -209,6 +209,12 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment,
self._python_includes.append(filename)
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))

# Apply the requirements file set by spark-submit, if any.
for path in self._conf.get("spark.submit.pyRequirements", "").split(","):
if path != "":
(dirname, filename) = os.path.split(path)
self.addRequirementsFile(os.path.join(SparkFiles.getRootDirectory(), filename))

# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
self._temp_dir = \
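The addRequirementsFile method called above is introduced elsewhere in this PR, so its body does not appear in this excerpt. Purely as a hypothetical sketch, the handler below fetches the listed packages with pip into a private directory and makes them importable; it is not the PR's actual implementation:

# Hypothetical sketch only; the real addRequirementsFile lives elsewhere in this PR.
import subprocess
import sys
import tempfile

def add_requirements_file(path):
    # Download and unpack every requirement listed in the file into a private directory.
    target = tempfile.mkdtemp(prefix="pyspark-requirements-")
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "--target", target, "-r", path,
    ])
    # Make the fetched packages importable in this process.
    sys.path.insert(1, target)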