
Commit c4f0697

Additional clean-up and fixes on top of existing fix

1 parent: f20f1b3

File tree

2 files changed (+30, -13 lines)

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 17 additions & 7 deletions
@@ -208,22 +208,25 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      val (acceptedOffers, declinedOffers) = offers.partition { o =>
+      // Fail-fast on offers we know will be rejected
+      val (usableOffers, unUsableOffers) = offers.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
+        // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
         (mem >= MemoryUtils.calculateTotalMemory(sc) &&
           // need at least 1 for executor, 1 for task
           cpus >= 2 * scheduler.CPUS_PER_TASK) ||
           (slaveIdsWithExecutors.contains(slaveId) &&
             cpus >= scheduler.CPUS_PER_TASK)
       }

-      val offerableWorkers = acceptedOffers.map { o =>
+      val workerOffers = usableOffers.map { o =>
         val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
         } else {
           // If the executor doesn't exist yet, subtract CPU for executor
+          // TODO(pwendell): Should below just subtract "1"?
           getResource(o.getResourcesList, "cpus").toInt -
             scheduler.CPUS_PER_TASK
         }
@@ -233,17 +236,21 @@
           cpus)
       }

-      val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap
+      val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap

       val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]

+
+      val slavesIdsOfAcceptedOffers = HashSet[String]()
+
       // Call into the TaskSchedulerImpl
-      scheduler.resourceOffers(offerableWorkers)
-        .filter(!_.isEmpty)
+      val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
+      acceptedOffers
         .foreach { offer =>
           offer.foreach { taskDesc =>
             val slaveId = taskDesc.executorId
             slaveIdsWithExecutors += slaveId
+            slavesIdsOfAcceptedOffers += slaveId
             taskIdToSlaveId(taskDesc.taskId) = slaveId
             mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
               .add(createMesosTask(taskDesc, slaveId))
@@ -257,11 +264,14 @@
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }

-      for (o <- acceptedOffers if !slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
+      // Decline offers that weren't used
+      // NOTE: This logic assumes that we only get a single offer for each host in a given batch
+      for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
         d.declineOffer(o.getId)
       }

-      declinedOffers.foreach(o => d.declineOffer(o.getId))
+      // Decline offers we ruled out immediately
+      unUsableOffers.foreach(o => d.declineOffer(o.getId))
     }
   }

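Note on the change above: offers are now split up front into ones that could ever run a task and ones that never could, with the latter declined immediately and the rest handed to TaskSchedulerImpl. The following standalone sketch illustrates that fail-fast partition outside of Spark; SimpleOffer, minMemPerExecutor, and cpusPerTask are illustrative stand-ins, not the Spark or Mesos types and values used in MesosSchedulerBackend.

// Standalone sketch of the fail-fast offer partition (illustrative only).
object OfferPartitionSketch {
  // Illustrative stand-in for the fields of a Mesos offer that matter here.
  case class SimpleOffer(slaveId: String, mem: Double, cpus: Double)

  val minMemPerExecutor = 512.0 // assumed per-executor memory requirement (MB)
  val cpusPerTask = 1.0         // assumed CPUs needed per task

  def partitionOffers(
      offers: Seq[SimpleOffer],
      slavesWithExecutors: Set[String]): (Seq[SimpleOffer], Seq[SimpleOffer]) = {
    offers.partition { o =>
      // A fresh slave needs memory for an executor plus CPUs for both the
      // executor and at least one task; a slave that already has an executor
      // only needs CPUs for one more task.
      (o.mem >= minMemPerExecutor && o.cpus >= 2 * cpusPerTask) ||
        (slavesWithExecutors.contains(o.slaveId) && o.cpus >= cpusPerTask)
    }
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(
      SimpleOffer("s1", mem = 1024, cpus = 4), // usable: fresh slave, enough resources
      SimpleOffer("s2", mem = 256, cpus = 4),  // unusable: too little memory
      SimpleOffer("s3", mem = 256, cpus = 1))  // usable only because s3 already has an executor
    val (usable, unusable) = partitionOffers(offers, slavesWithExecutors = Set("s3"))
    println(s"usable=${usable.map(_.slaveId)}, unusable=${unusable.map(_.slaveId)}")
  }
}

In the patched backend, the "unusable" group corresponds to the offers declined immediately at the end of resourceOffers, while usable offers that the scheduler does not accept are declined after the scheduling pass via the new slavesIdsOfAcceptedOffers bookkeeping.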
core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala

Lines changed: 13 additions & 6 deletions
@@ -34,7 +34,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random

 class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
-  test("mesos resource offer is launching tasks") {
+
+  test("mesos resource offers result in launching tasks") {
     def createOffer(id: Int, mem: Int, cpu: Int) = {
       val builder = Offer.newBuilder()
       builder.addResourcesBuilder()
@@ -53,34 +54,38 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])

     val sc = EasyMock.createMock(classOf[SparkContext])
-
     EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes()
     EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes()
     EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes()
     EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes()
     EasyMock.replay(sc)
+
     val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
     val minCpu = 4
+
     val offers = new java.util.ArrayList[Offer]
     offers.add(createOffer(1, minMem, minCpu))
     offers.add(createOffer(2, minMem - 1, minCpu))
     offers.add(createOffer(3, minMem, minCpu))
+
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-    val workerOffers = new ArrayBuffer[WorkerOffer](2)
-    workerOffers.append(new WorkerOffer(
+
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
+    expectedWorkerOffers.append(new WorkerOffer(
       offers.get(0).getSlaveId.getValue,
       offers.get(0).getHostname,
       2
     ))
-    workerOffers.append(new WorkerOffer(
+    expectedWorkerOffers.append(new WorkerOffer(
       offers.get(2).getSlaveId.getValue,
       offers.get(2).getHostname,
       2
     ))
     val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
-    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc)))
+    EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc)))
     EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes()
     EasyMock.replay(taskScheduler)
+
     val capture = new Capture[util.Collection[TaskInfo]]
     EasyMock.expect(
       driver.launchTasks(
@@ -95,8 +100,10 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
     backend.resourceOffers(driver, offers)
     EasyMock.verify(driver)
     assert(capture.getValue.size() == 1)
+
     val taskInfo = capture.getValue.iterator().next()
     assert(taskInfo.getName.equals("n1"))
+
     val cpus = taskInfo.getResourcesList.get(0)
     assert(cpus.getName.equals("cpus"))
     assert(cpus.getScalar.getValue.equals(2.0))

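Note on the test above: it relies on EasyMock's expect/capture/replay/verify cycle to assert exactly what the backend passes to launchTasks. Below is a minimal, self-contained sketch of that same pattern, assuming the EasyMock 3.x API the suite already imports; the Launcher trait is a hypothetical stand-in for SchedulerDriver, not a Spark or Mesos interface.

import org.easymock.{Capture, EasyMock}

// Hypothetical collaborator used only for this sketch.
trait Launcher {
  def launch(tasks: java.util.Collection[String]): Unit
}

object CaptureSketch {
  def main(args: Array[String]): Unit = {
    val launcher = EasyMock.createMock(classOf[Launcher])
    val captured = new Capture[java.util.Collection[String]]

    // Expect exactly one launch(...) call and capture whatever argument is
    // passed, mirroring how the suite captures the TaskInfo collection.
    launcher.launch(EasyMock.capture(captured))
    EasyMock.expectLastCall().once()
    EasyMock.replay(launcher)

    // The code under test would call the mock; here we call it directly.
    launcher.launch(java.util.Arrays.asList("task-1"))

    EasyMock.verify(launcher)
    assert(captured.getValue.size() == 1)
    println(s"captured: ${captured.getValue}")
  }
}

As in the suite, the captured argument is inspected only after verify(), once the expected interaction has actually happened.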