Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-9352] [SPARK-9353] Add tests for standalone scheduling code
This also fixes a small issue in the standalone Master that was uncovered by the new tests. For more detail, read the description of SPARK-9353.

Author: Andrew Or <[email protected]>

Closes apache#7668 from andrewor14/standalone-scheduling-tests and squashes the following commits:

d852faf [Andrew Or] Add tests + fix scheduling with memory limits

(cherry picked from commit 1cf1976)
Signed-off-by: Andrew Or <[email protected]>

Conflicts:
	core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
  • Loading branch information
Andrew Or committed Jul 26, 2015
commit a4b80e6dbd007c3d0cc012bacc19361d9eb00b02
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ private[master] class Master(
* allocated at a time, 12 cores from each worker would be assigned to each executor.
* Since 12 < 16, no executors would launch [SPARK-8881].
*/
private[master] def scheduleExecutorsOnWorkers(
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
Expand All @@ -577,7 +577,11 @@ private[master] class Master(
while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
coresToAssign -= coresPerExecutor
assignedCores(pos) += coresPerExecutor
assignedMemory(pos) += memoryPerExecutor
// If cores per executor is not set, we are assigning 1 core at a time
// without actually meaning to launch 1 executor for each core assigned
if (app.desc.coresPerExecutor.isDefined) {
assignedMemory(pos) += memoryPerExecutor
}

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
Expand Down
199 changes: 196 additions & 3 deletions core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import scala.language.postfixOps
import akka.actor.Address
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.Matchers
import org.scalatest.{Matchers, PrivateMethodTester}
import org.scalatest.concurrent.Eventually
import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy._
import org.apache.spark.rpc.RpcEnv

class MasterSuite extends SparkFunSuite with Matchers with Eventually {
class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {

test("toAkkaUrl") {
val conf = new SparkConf(loadDefaults = false)
Expand Down Expand Up @@ -184,4 +185,196 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually {
}
}

// Each scheduling scenario is registered twice, once with spreadOut = true and once
// with spreadOut = false. The resulting test names are "<scenario> - spread out" and
// "<scenario> - no spread out", registered in that order for every scenario.
Seq[(String, Boolean => Unit)](
  ("basic scheduling", testBasicScheduling _),
  ("scheduling with max cores", testSchedulingWithMaxCores _),
  ("scheduling with cores per executor", testSchedulingWithCoresPerExecutor _),
  ("scheduling with cores per executor AND max cores",
    testSchedulingWithCoresPerExecutorAndMaxCores _)
).foreach { case (scenario, runScenario) =>
  Seq((true, "spread out"), (false, "no spread out")).foreach { case (spreadOut, label) =>
    test(s"$scenario - $label") {
      runScenario(spreadOut)
    }
  }
}

/**
 * Schedules an application that sets neither cores-per-executor nor max cores on three
 * workers (10 cores, 4096 MB each) and expects all 10 cores of every worker to be assigned.
 *
 * @param spreadOut forwarded unchanged to the scheduling routine under test
 */
private def testBasicScheduling(spreadOut: Boolean): Unit = {
  val master = makeMaster()
  val app = makeAppInfo(1024)
  val worker = makeWorkerInfo(4096, 10)
  // The same WorkerInfo instance is deliberately reused for all three slots.
  val workers = Array.fill(3)(worker)
  val assigned = master.invokePrivate(
    _scheduleExecutorsOnWorkers(app, workers, spreadOut))
  assert(assigned.length === 3)
  assigned.indices.foreach { i =>
    assert(assigned(i) === 10)
  }
}

/**
 * Checks how cores are distributed over three workers (10 cores, 4096 MB each) when the
 * application caps its total cores: first with a cap below one worker's core count (8),
 * then with a cap above it (16).
 *
 * @param spreadOut forwarded unchanged to the scheduling routine under test
 */
private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
  val master = makeMaster()
  val cappedAt8 = makeAppInfo(1024, maxCores = Some(8))
  val cappedAt16 = makeAppInfo(1024, maxCores = Some(16))
  val worker = makeWorkerInfo(4096, 10)
  val workers = Array.fill(3)(worker)

  // Invokes the private scheduling routine for one application on the shared workers.
  def schedule(app: ApplicationInfo): Array[Int] =
    master.invokePrivate(_scheduleExecutorsOnWorkers(app, workers, spreadOut))

  // Cap of 8: spreading out gives every worker a few cores; otherwise all cores
  // are concentrated on the first worker.
  val expectedFor8 = if (spreadOut) Seq(3, 3, 2) else Seq(8, 0, 0)
  assert(schedule(cappedAt8).toSeq === expectedFor8)

  // Cap of 16 (more than one worker holds): without spreading out the first worker is
  // fully booked and the leftover cores spill over to the second worker only.
  val expectedFor16 = if (spreadOut) Seq(6, 5, 5) else Seq(10, 6, 0)
  assert(schedule(cappedAt16).toSeq === expectedFor16)
}

/**
 * Checks scheduling on three workers (10 cores, 4096 MB each) when the application fixes
 * the number of cores per executor. The expected assignment is the same on every worker
 * and does not depend on `spreadOut`.
 *
 * @param spreadOut forwarded unchanged to the scheduling routine under test
 */
private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
  val master = makeMaster()
  val memoryBoundApp = makeAppInfo(1024, coresPerExecutor = Some(2))
  val twoCoreApp = makeAppInfo(256, coresPerExecutor = Some(2))
  val threeCoreApp = makeAppInfo(256, coresPerExecutor = Some(3))
  val worker = makeWorkerInfo(4096, 10)
  val workers = Array.fill(3)(worker)

  // Each entry pairs an application with the cores expected on every worker.
  val cases = Seq(
    // 4096 MB / 1024 MB per executor limits each worker to 4 executors of 2 cores
    memoryBoundApp -> 8,
    // Memory is no longer the limit: 5 executors of 2 cores fit on each worker
    twoCoreApp -> 10,
    // 10 is not divisible by 3, so only 3 executors of 3 cores fit on each worker
    threeCoreApp -> 9)

  cases.foreach { case (app, coresPerWorker) =>
    val assigned = master.invokePrivate(
      _scheduleExecutorsOnWorkers(app, workers, spreadOut))
    assert(assigned.toSeq === Seq.fill(3)(coresPerWorker))
  }
}

// Sorry for the long method name!
/**
 * Checks scheduling on three workers (10 cores, 4096 MB each) when the application sets
 * both a fixed number of cores per executor and a cap on its total cores.
 *
 * @param spreadOut forwarded unchanged to the scheduling routine under test; it also
 *                  selects which expected distribution is asserted
 */
private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
  val master = makeMaster()
  val twoCoresMax4 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
  val twoCoresMax20 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
  val threeCoresMax20 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
  val worker = makeWorkerInfo(4096, 10)
  val workers = Array.fill(3)(worker)

  // Each entry: (application, expected cores when spreading out, expected cores otherwise).
  val cases = Seq(
    // Only two executors should launch, each with exactly 2 cores
    (twoCoresMax4, Seq(2, 2, 0), Seq(4, 0, 0)),
    // Max cores exceeds the number of cores on a single worker
    (twoCoresMax20, Seq(8, 6, 6), Seq(10, 10, 0)),
    // Max cores exceeds a single worker AND 10 is not divisible by the cores per executor
    (threeCoresMax20, Seq(6, 6, 6), Seq(9, 9, 0)))

  cases.foreach { case (app, whenSpreadOut, whenNotSpreadOut) =>
    val assigned = master.invokePrivate(
      _scheduleExecutorsOnWorkers(app, workers, spreadOut))
    val expected = if (spreadOut) whenSpreadOut else whenNotSpreadOut
    assert(assigned.toSeq === expected)
  }
}

// ===============================
// | Utility methods for testing |
// ===============================

// Handle passed to `invokePrivate` to call the method named `scheduleExecutorsOnWorkers`
// on a Master instance; the result is typed as Array[Int].
private val _scheduleExecutorsOnWorkers =
  PrivateMethod[Array[Int]](Symbol("scheduleExecutorsOnWorkers"))

/**
 * Builds a [[Master]] on top of a newly created [[RpcEnv]] bound to localhost.
 *
 * The RPC environment is created on port 0 (any free port) rather than a fixed port,
 * so that repeated calls within one suite, or a standalone Master already running on
 * the machine, cannot cause a port collision.
 *
 * NOTE(review): the RpcEnv created here is not shut down by this helper; callers that
 * need cleanup must do it themselves.
 *
 * @param conf configuration used for the security manager, the RPC environment and the Master
 * @return a Master constructed with the new RPC environment's address and web UI port 8080
 */
private def makeMaster(conf: SparkConf = new SparkConf): Master = {
  val securityMgr = new SecurityManager(conf)
  // Port 0 requests an ephemeral port; the address actually bound is read back below.
  val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
  new Master(rpcEnv, rpcEnv.address, 8080, securityMgr, conf)
}

/**
 * Creates an [[ApplicationInfo]] named "test" for the scheduling tests.
 *
 * The application ID is the current time in milliseconds rendered as a string.
 * NOTE(review): the two `null` arguments look like placeholders for collaborators the
 * scheduling tests are not expected to touch — confirm against the constructors.
 *
 * @param memoryPerExecutorMb memory requested per executor, passed to the description
 * @param coresPerExecutor optional fixed number of cores per executor (default: None)
 * @param maxCores optional cap on the application's total cores (default: None)
 */
private def makeAppInfo(
    memoryPerExecutorMb: Int,
    coresPerExecutor: Option[Int] = None,
    maxCores: Option[Int] = None): ApplicationInfo = {
  val description = new ApplicationDescription(
    "test", maxCores, memoryPerExecutorMb, null, "", None, None, coresPerExecutor)
  new ApplicationInfo(
    0, System.currentTimeMillis.toString, description, new Date, null, Int.MaxValue)
}

/**
 * Creates a [[WorkerInfo]] with the given resources for the scheduling tests.
 *
 * The worker ID is the current time in milliseconds rendered as a string; the remaining
 * constructor arguments are fixed placeholder values.
 *
 * @param memoryMb memory passed to the WorkerInfo constructor
 * @param cores number of cores passed to the WorkerInfo constructor
 */
private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo =
  new WorkerInfo(
    System.currentTimeMillis.toString, "host", 100, cores, memoryMb, null, 101, "address")

}