Skip to content

Commit f20f1b3

Browse files
committed
[SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unused offers from acceptedOffers
- Added code for declining unused offers among acceptedOffers - Edited testCase for checking declining unused offers
1 parent e216ffa commit f20f1b3

File tree

2 files changed

+24
-9
lines changed

2 files changed

+24
-9
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,10 @@ private[spark] class MesosSchedulerBackend(
257257
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
258258
}
259259

260+
for (o <- acceptedOffers if !slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
261+
d.declineOffer(o.getId)
262+
}
263+
260264
declinedOffers.foreach(o => d.declineOffer(o.getId))
261265
}
262266
}

core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import org.apache.mesos.SchedulerDriver
2525
import org.apache.mesos.Protos._
2626
import org.scalatest.mock.EasyMockSugar
2727
import org.apache.mesos.Protos.Value.Scalar
28-
import org.easymock.{Capture, EasyMock}
28+
import org.easymock.{IAnswer, Capture, EasyMock}
2929
import java.nio.ByteBuffer
3030
import java.util.Collections
3131
import java.util
3232
import scala.collection.mutable
33+
import scala.collection.mutable.ArrayBuffer
34+
import scala.util.Random
3335

3436
class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
3537
test("mesos resource offer is launching tasks") {
@@ -43,8 +45,8 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
4345
.setName("cpus")
4446
.setType(Value.Type.SCALAR)
4547
.setScalar(Scalar.newBuilder().setValue(cpu))
46-
builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
47-
.setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build()
48+
builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
49+
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
4850
}
4951

5052
val driver = EasyMock.createMock(classOf[SchedulerDriver])
@@ -61,11 +63,18 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
6163
val minCpu = 4
6264
val offers = new java.util.ArrayList[Offer]
6365
offers.add(createOffer(1, minMem, minCpu))
64-
offers.add(createOffer(1, minMem - 1, minCpu))
66+
offers.add(createOffer(2, minMem - 1, minCpu))
67+
offers.add(createOffer(3, minMem, minCpu))
6568
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
66-
val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer(
67-
o.getSlaveId.getValue,
68-
o.getHostname,
69+
val workerOffers = new ArrayBuffer[WorkerOffer](2)
70+
workerOffers.append(new WorkerOffer(
71+
offers.get(0).getSlaveId.getValue,
72+
offers.get(0).getHostname,
73+
2
74+
))
75+
workerOffers.append(new WorkerOffer(
76+
offers.get(2).getSlaveId.getValue,
77+
offers.get(2).getHostname,
6978
2
7079
))
7180
val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
@@ -79,10 +88,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
7988
EasyMock.capture(capture),
8089
EasyMock.anyObject(classOf[Filters])
8190
)
82-
).andReturn(Status.valueOf(1))
83-
EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1))
91+
).andReturn(Status.valueOf(1)).once
92+
EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)).times(1)
93+
EasyMock.expect(driver.declineOffer(offers.get(2).getId)).andReturn(Status.valueOf(1)).times(1)
8494
EasyMock.replay(driver)
8595
backend.resourceOffers(driver, offers)
96+
EasyMock.verify(driver)
8697
assert(capture.getValue.size() == 1)
8798
val taskInfo = capture.getValue.iterator().next()
8899
assert(taskInfo.getName.equals("n1"))

0 commit comments

Comments
 (0)