@@ -25,14 +25,15 @@ import scala.language.postfixOps
2525
2626import org .json4s ._
2727import org .json4s .jackson .JsonMethods ._
28- import org .scalatest .Matchers
28+ import org .scalatest .{ Matchers , PrivateMethodTester }
2929import org .scalatest .concurrent .Eventually
3030import other .supplier .{CustomPersistenceEngine , CustomRecoveryModeFactory }
3131
32- import org .apache .spark .{SparkConf , SparkFunSuite }
32+ import org .apache .spark .{SecurityManager , SparkConf , SparkFunSuite }
3333import org .apache .spark .deploy ._
34+ import org .apache .spark .rpc .RpcEnv
3435
35- class MasterSuite extends SparkFunSuite with Matchers with Eventually {
36+ class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
3637
3738 test(" can use a custom recovery mode factory" ) {
3839 val conf = new SparkConf (loadDefaults = false )
@@ -142,4 +143,196 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
142143 }
143144 }
144145
146+ test(" basic scheduling - spread out" ) {
147+ testBasicScheduling(spreadOut = true )
148+ }
149+
150+ test(" basic scheduling - no spread out" ) {
151+ testBasicScheduling(spreadOut = false )
152+ }
153+
154+ test(" scheduling with max cores - spread out" ) {
155+ testSchedulingWithMaxCores(spreadOut = true )
156+ }
157+
158+ test(" scheduling with max cores - no spread out" ) {
159+ testSchedulingWithMaxCores(spreadOut = false )
160+ }
161+
162+ test(" scheduling with cores per executor - spread out" ) {
163+ testSchedulingWithCoresPerExecutor(spreadOut = true )
164+ }
165+
166+ test(" scheduling with cores per executor - no spread out" ) {
167+ testSchedulingWithCoresPerExecutor(spreadOut = false )
168+ }
169+
170+ test(" scheduling with cores per executor AND max cores - spread out" ) {
171+ testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true )
172+ }
173+
174+ test(" scheduling with cores per executor AND max cores - no spread out" ) {
175+ testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false )
176+ }
177+
178+ private def testBasicScheduling (spreadOut : Boolean ): Unit = {
179+ val master = makeMaster()
180+ val appInfo = makeAppInfo(1024 )
181+ val workerInfo = makeWorkerInfo(4096 , 10 )
182+ val workerInfos = Array (workerInfo, workerInfo, workerInfo)
183+ val scheduledCores = master.invokePrivate(
184+ _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
185+ assert(scheduledCores.length === 3 )
186+ assert(scheduledCores(0 ) === 10 )
187+ assert(scheduledCores(1 ) === 10 )
188+ assert(scheduledCores(2 ) === 10 )
189+ }
190+
191+ private def testSchedulingWithMaxCores (spreadOut : Boolean ): Unit = {
192+ val master = makeMaster()
193+ val appInfo1 = makeAppInfo(1024 , maxCores = Some (8 ))
194+ val appInfo2 = makeAppInfo(1024 , maxCores = Some (16 ))
195+ val workerInfo = makeWorkerInfo(4096 , 10 )
196+ val workerInfos = Array (workerInfo, workerInfo, workerInfo)
197+ var scheduledCores = master.invokePrivate(
198+ _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
199+ assert(scheduledCores.length === 3 )
200+ // With spreading out, each worker should be assigned a few cores
201+ if (spreadOut) {
202+ assert(scheduledCores(0 ) === 3 )
203+ assert(scheduledCores(1 ) === 3 )
204+ assert(scheduledCores(2 ) === 2 )
205+ } else {
206+ // Without spreading out, the cores should be concentrated on the first worker
207+ assert(scheduledCores(0 ) === 8 )
208+ assert(scheduledCores(1 ) === 0 )
209+ assert(scheduledCores(2 ) === 0 )
210+ }
211+ // Now test the same thing with max cores > cores per worker
212+ scheduledCores = master.invokePrivate(
213+ _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
214+ assert(scheduledCores.length === 3 )
215+ if (spreadOut) {
216+ assert(scheduledCores(0 ) === 6 )
217+ assert(scheduledCores(1 ) === 5 )
218+ assert(scheduledCores(2 ) === 5 )
219+ } else {
220+ // Without spreading out, the first worker should be fully booked,
221+ // and the leftover cores should spill over to the second worker only.
222+ assert(scheduledCores(0 ) === 10 )
223+ assert(scheduledCores(1 ) === 6 )
224+ assert(scheduledCores(2 ) === 0 )
225+ }
226+ }
227+
228+ private def testSchedulingWithCoresPerExecutor (spreadOut : Boolean ): Unit = {
229+ val master = makeMaster()
230+ val appInfo1 = makeAppInfo(1024 , coresPerExecutor = Some (2 ))
231+ val appInfo2 = makeAppInfo(256 , coresPerExecutor = Some (2 ))
232+ val appInfo3 = makeAppInfo(256 , coresPerExecutor = Some (3 ))
233+ val workerInfo = makeWorkerInfo(4096 , 10 )
234+ val workerInfos = Array (workerInfo, workerInfo, workerInfo)
235+ // Each worker should end up with 4 executors with 2 cores each
236+ // This should be 4 because of the memory restriction on each worker
237+ var scheduledCores = master.invokePrivate(
238+ _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
239+ assert(scheduledCores.length === 3 )
240+ assert(scheduledCores(0 ) === 8 )
241+ assert(scheduledCores(1 ) === 8 )
242+ assert(scheduledCores(2 ) === 8 )
243+ // Now test the same thing without running into the worker memory limit
244+ // Each worker should now end up with 5 executors with 2 cores each
245+ scheduledCores = master.invokePrivate(
246+ _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
247+ assert(scheduledCores.length === 3 )
248+ assert(scheduledCores(0 ) === 10 )
249+ assert(scheduledCores(1 ) === 10 )
250+ assert(scheduledCores(2 ) === 10 )
251+ // Now test the same thing with a cores per executor that 10 is not divisible by
252+ scheduledCores = master.invokePrivate(
253+ _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
254+ assert(scheduledCores.length === 3 )
255+ assert(scheduledCores(0 ) === 9 )
256+ assert(scheduledCores(1 ) === 9 )
257+ assert(scheduledCores(2 ) === 9 )
258+ }
259+
260+ // Sorry for the long method name!
261+ private def testSchedulingWithCoresPerExecutorAndMaxCores (spreadOut : Boolean ): Unit = {
262+ val master = makeMaster()
263+ val appInfo1 = makeAppInfo(256 , coresPerExecutor = Some (2 ), maxCores = Some (4 ))
264+ val appInfo2 = makeAppInfo(256 , coresPerExecutor = Some (2 ), maxCores = Some (20 ))
265+ val appInfo3 = makeAppInfo(256 , coresPerExecutor = Some (3 ), maxCores = Some (20 ))
266+ val workerInfo = makeWorkerInfo(4096 , 10 )
267+ val workerInfos = Array (workerInfo, workerInfo, workerInfo)
268+ // We should only launch two executors, each with exactly 2 cores
269+ var scheduledCores = master.invokePrivate(
270+ _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
271+ assert(scheduledCores.length === 3 )
272+ if (spreadOut) {
273+ assert(scheduledCores(0 ) === 2 )
274+ assert(scheduledCores(1 ) === 2 )
275+ assert(scheduledCores(2 ) === 0 )
276+ } else {
277+ assert(scheduledCores(0 ) === 4 )
278+ assert(scheduledCores(1 ) === 0 )
279+ assert(scheduledCores(2 ) === 0 )
280+ }
281+ // Test max cores > number of cores per worker
282+ scheduledCores = master.invokePrivate(
283+ _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
284+ assert(scheduledCores.length === 3 )
285+ if (spreadOut) {
286+ assert(scheduledCores(0 ) === 8 )
287+ assert(scheduledCores(1 ) === 6 )
288+ assert(scheduledCores(2 ) === 6 )
289+ } else {
290+ assert(scheduledCores(0 ) === 10 )
291+ assert(scheduledCores(1 ) === 10 )
292+ assert(scheduledCores(2 ) === 0 )
293+ }
294+ // Test max cores > number of cores per worker AND
295+ // a cores per executor that is 10 is not divisible by
296+ scheduledCores = master.invokePrivate(
297+ _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
298+ assert(scheduledCores.length === 3 )
299+ if (spreadOut) {
300+ assert(scheduledCores(0 ) === 6 )
301+ assert(scheduledCores(1 ) === 6 )
302+ assert(scheduledCores(2 ) === 6 )
303+ } else {
304+ assert(scheduledCores(0 ) === 9 )
305+ assert(scheduledCores(1 ) === 9 )
306+ assert(scheduledCores(2 ) === 0 )
307+ }
308+ }
309+
310+ // ===============================
311+ // | Utility methods for testing |
312+ // ===============================
313+
314+ private val _scheduleExecutorsOnWorkers = PrivateMethod [Array [Int ]](' scheduleExecutorsOnWorkers )
315+
316+ private def makeMaster (conf : SparkConf = new SparkConf ): Master = {
317+ val securityMgr = new SecurityManager (conf)
318+ val rpcEnv = RpcEnv .create(Master .SYSTEM_NAME , " localhost" , 7077 , conf, securityMgr)
319+ val master = new Master (rpcEnv, rpcEnv.address, 8080 , securityMgr, conf)
320+ master
321+ }
322+
323+ private def makeAppInfo (
324+ memoryPerExecutorMb : Int ,
325+ coresPerExecutor : Option [Int ] = None ,
326+ maxCores : Option [Int ] = None ): ApplicationInfo = {
327+ val desc = new ApplicationDescription (
328+ " test" , maxCores, memoryPerExecutorMb, null , " " , None , None , coresPerExecutor)
329+ val appId = System .currentTimeMillis.toString
330+ new ApplicationInfo (0 , appId, desc, new Date , null , Int .MaxValue )
331+ }
332+
333+ private def makeWorkerInfo (memoryMb : Int , cores : Int ): WorkerInfo = {
334+ val workerId = System .currentTimeMillis.toString
335+ new WorkerInfo (workerId, " host" , 100 , cores, memoryMb, null , 101 , " address" )
336+ }
337+
145338}
0 commit comments