1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+
18+ package org .apache .spark .deploy
19+
20+ import java .io .File
21+ import java .net .URL
22+ import java .net .URLClassLoader
23+
24+ import org .apache .spark .executor .ExecutorURLClassLoader
25+
26+ import scala .collection .mutable .ArrayBuffer
27+ import scala .collection .mutable .HashMap
28+ import scala .collection .mutable .Map
29+
30+ object SparkSubmit {
31+ val YARN = 1
32+ val STANDALONE = 2
33+ val MESOS = 4
34+ val LOCAL = 8
35+ val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
36+
37+ var clusterManager : Int = LOCAL
38+
39+ def main (args : Array [String ]) {
40+ val appArgs = new SparkSubmitArguments (args)
41+ val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
42+ launch(childArgs, classpath, sysProps, mainClass)
43+ }
44+
45+ /**
46+ * @return
47+ * a tuple containing the arguments for the child, a list of classpath
48+ * entries for the child, and the main class for the child
49+ */
50+ def createLaunchEnv (appArgs : SparkSubmitArguments ): (ArrayBuffer [String ],
51+ ArrayBuffer [String ], Map [String , String ], String ) = {
52+ if (appArgs.master.startsWith(" yarn" )) {
53+ clusterManager = YARN
54+ } else if (appArgs.master.startsWith(" spark" )) {
55+ clusterManager = STANDALONE
56+ } else if (appArgs.master.startsWith(" mesos" )) {
57+ clusterManager = MESOS
58+ } else if (appArgs.master.startsWith(" local" )) {
59+ clusterManager = LOCAL
60+ } else {
61+ System .err.println(" master must start with yarn, mesos, spark, or local" )
62+ System .exit(1 )
63+ }
64+
65+ // Because "yarn-standalone" and "yarn-client" encapsulate both the master
66+ // and deploy mode, we have some logic to infer the master and deploy mode
67+ // from each other if only one is specified, or exit early if they are at odds.
68+ if (appArgs.deployMode == null && appArgs.master == " yarn-standalone" ) {
69+ appArgs.deployMode = " cluster"
70+ }
71+ if (appArgs.deployMode == " cluster" && appArgs.master == " yarn-client" ) {
72+ System .err.println(" Deploy mode \" cluster\" and master \" yarn-client\" are at odds" )
73+ System .exit(1 )
74+ }
75+ if (appArgs.deployMode == " client" && appArgs.master == " yarn-standalone" ) {
76+ System .err.println(" Deploy mode \" client\" and master \" yarn-standalone\" are at odds" )
77+ System .exit(1 )
78+ }
79+ if (appArgs.deployMode == " cluster" && appArgs.master.startsWith(" yarn" )) {
80+ appArgs.master = " yarn-standalone"
81+ }
82+ if (appArgs.deployMode != " cluster" && appArgs.master.startsWith(" yarn" )) {
83+ appArgs.master = " yarn-client"
84+ }
85+
86+ val deployOnCluster = Option (appArgs.deployMode).getOrElse(" client" ) == " cluster"
87+
88+ val childClasspath = new ArrayBuffer [String ]()
89+ val childArgs = new ArrayBuffer [String ]()
90+ val sysProps = new HashMap [String , String ]()
91+ var childMainClass = " "
92+
93+ if (clusterManager == MESOS && deployOnCluster) {
94+ System .err.println(" Mesos does not support running the driver on the cluster" )
95+ System .exit(1 )
96+ }
97+
98+ if (! deployOnCluster) {
99+ childMainClass = appArgs.mainClass
100+ childClasspath += appArgs.primaryResource
101+ } else if (clusterManager == YARN ) {
102+ childMainClass = " org.apache.spark.deploy.yarn.Client"
103+ childArgs += (" --jar" , appArgs.primaryResource)
104+ childArgs += (" --class" , appArgs.mainClass)
105+ }
106+
107+ val options = List [OptionAssigner ](
108+ new OptionAssigner (appArgs.master, ALL_CLUSTER_MGRS , false , sysProp = " spark.master" ),
109+ new OptionAssigner (appArgs.driverMemory, YARN , true , clOption = " --master-memory" ),
110+ new OptionAssigner (appArgs.name, YARN , true , clOption = " --name" ),
111+ new OptionAssigner (appArgs.queue, YARN , true , clOption = " --queue" ),
112+ new OptionAssigner (appArgs.queue, YARN , false , sysProp = " spark.yarn.queue" ),
113+ new OptionAssigner (appArgs.numExecutors, YARN , true , clOption = " --num-workers" ),
114+ new OptionAssigner (appArgs.numExecutors, YARN , false , sysProp = " spark.worker.instances" ),
115+ new OptionAssigner (appArgs.executorMemory, YARN , true , clOption = " --worker-memory" ),
116+ new OptionAssigner (appArgs.executorMemory, STANDALONE | MESOS | YARN , false , sysProp = " spark.executor.memory" ),
117+ new OptionAssigner (appArgs.driverMemory, STANDALONE , true , clOption = " --memory" ),
118+ new OptionAssigner (appArgs.executorCores, YARN , true , clOption = " --worker-cores" ),
119+ new OptionAssigner (appArgs.executorCores, YARN , false , sysProp = " spark.executor.cores" ),
120+ new OptionAssigner (appArgs.driverCores, STANDALONE , true , clOption = " --cores" ),
121+ new OptionAssigner (appArgs.totalExecutorCores, STANDALONE | MESOS , true , sysProp = " spark.cores.max" ),
122+ new OptionAssigner (appArgs.files, YARN , false , sysProp = " spark.yarn.dist.files" ),
123+ new OptionAssigner (appArgs.files, YARN , true , clOption = " --files" ),
124+ new OptionAssigner (appArgs.archives, YARN , false , sysProp = " spark.yarn.dist.archives" ),
125+ new OptionAssigner (appArgs.archives, YARN , true , clOption = " --archives" ),
126+ new OptionAssigner (appArgs.moreJars, YARN , true , clOption = " --addJars" )
127+ )
128+
129+ // more jars
130+ if (appArgs.moreJars != null && ! deployOnCluster) {
131+ for (jar <- appArgs.moreJars.split(" ," )) {
132+ childClasspath += jar
133+ }
134+ }
135+
136+ for (opt <- options) {
137+ if (opt.value != null && deployOnCluster == opt.deployOnCluster &&
138+ (clusterManager & opt.clusterManager) != 0 ) {
139+ if (opt.clOption != null ) {
140+ childArgs += (opt.clOption, opt.value)
141+ } else if (opt.sysProp != null ) {
142+ sysProps.put(opt.sysProp, opt.value)
143+ }
144+ }
145+ }
146+
147+ if (deployOnCluster && clusterManager == STANDALONE ) {
148+ if (appArgs.supervise) {
149+ childArgs += " --supervise"
150+ }
151+
152+ childMainClass = " org.apache.spark.deploy.Client"
153+ childArgs += " launch"
154+ childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass)
155+ }
156+
157+ // args
158+ if (appArgs.childArgs != null ) {
159+ if (! deployOnCluster || clusterManager == STANDALONE ) {
160+ childArgs ++= appArgs.childArgs
161+ } else if (clusterManager == YARN ) {
162+ for (arg <- appArgs.childArgs) {
163+ childArgs += (" --args" , arg)
164+ }
165+ }
166+ }
167+
168+ (childArgs, childClasspath, sysProps, childMainClass)
169+ }
170+
171+ def launch (childArgs : ArrayBuffer [String ], childClasspath : ArrayBuffer [String ],
172+ sysProps : Map [String , String ], childMainClass : String ) {
173+ val loader = new ExecutorURLClassLoader (new Array [URL ](0 ),
174+ Thread .currentThread.getContextClassLoader)
175+ Thread .currentThread.setContextClassLoader(loader)
176+
177+ for (jar <- childClasspath) {
178+ addJarToClasspath(jar, loader)
179+ }
180+
181+ for ((key, value) <- sysProps) {
182+ System .setProperty(key, value)
183+ }
184+
185+ val mainClass = Class .forName(childMainClass, true , loader)
186+ val mainMethod = mainClass.getMethod(" main" , new Array [String ](0 ).getClass)
187+ mainMethod.invoke(null , childArgs.toArray)
188+ }
189+
190+ def addJarToClasspath (localJar : String , loader : ExecutorURLClassLoader ) {
191+ val localJarFile = new File (localJar)
192+ if (! localJarFile.exists()) {
193+ System .err.println(" Jar does not exist: " + localJar + " . Skipping." )
194+ }
195+
196+ val url = localJarFile.getAbsoluteFile.toURI.toURL
197+ loader.addURL(url)
198+ }
199+ }
200+
201+ private [spark] class OptionAssigner (val value : String ,
202+ val clusterManager : Int ,
203+ val deployOnCluster : Boolean ,
204+ val clOption : String = null ,
205+ val sysProp : String = null
206+ ) { }
0 commit comments