Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
[SPARK-4525] MesosSchedulerBackend.resourceOffers cannot decline unus…
…ed offers from acceptedOffers

- Fix a case that unused node cannot be declined when slaveIdsWithExecutors has already that node.
  • Loading branch information
jongyoul committed Nov 25, 2014
commit 63855bf5c9327f863d6535c17e4c1053ed2a3c06
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,10 @@ private[spark] class MesosSchedulerBackend(

mesosTasks.foreach { case (slaveId, tasks) =>
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
acceptedOffers -= slaveIdToOffer(slaveId)
}

for (o <- acceptedOffers if !slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
d.declineOffer(o.getId)
}
acceptedOffers.foreach(o => d.declineOffer(o.getId))

declinedOffers.foreach(o => d.declineOffer(o.getId))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,37 @@
package org.apache.spark.scheduler.mesos

import org.scalatest.FunSuite
import org.apache.spark.{scheduler, SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.{SparkConf, SparkContext, LocalSparkContext}
import org.apache.spark.scheduler.{TaskDescription, WorkerOffer, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, MesosSchedulerBackend}
import org.apache.mesos.SchedulerDriver
import org.apache.mesos.{Protos, SchedulerDriver}
import org.apache.mesos.Protos._
import org.scalatest.mock.EasyMockSugar
import org.apache.mesos.Protos.Value.Scalar
import org.easymock.{IAnswer, Capture, EasyMock}
import org.easymock.{Capture, EasyMock}
import java.nio.ByteBuffer
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import java.util
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
test("mesos resource offer is launching tasks") {
def createOffer(id: Int, mem: Int, cpu: Int) = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(mem))
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
}
def createOffer(id: Int, mem: Int, cpu: Int) = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(mem))
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build()
}

test("mesos resource offer is launching tasks") {
val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])

Expand Down Expand Up @@ -102,4 +102,44 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
assert(cpus.getScalar.getValue.equals(2.0))
assert(taskInfo.getSlaveId.getValue.equals("s1"))
}

test("control with multiple task in one slave") {
val driver = EasyMock.createMock(classOf[SchedulerDriver])
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])

val sc = EasyMock.createMock(classOf[SparkContext])

EasyMock.expect(sc.executorMemory).andReturn(1000).anyTimes
EasyMock.expect(sc.getSparkHome).andReturn(Option("/path")).anyTimes
EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes
EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes
EasyMock.replay(sc)

val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val offers = new JArrayList[Offer]
offers.add(createOffer(0, minMem, 16))

val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
val workerOffersFirst = new ArrayBuffer[WorkerOffer](1)
workerOffersFirst.append(new WorkerOffer(offers.get(0).getSlaveId.getValue, offers.get(0).getHostname, 14))
val workerOffersSecond = new ArrayBuffer[WorkerOffer](1)
workerOffersSecond.append(new WorkerOffer(offers.get(0).getSlaveId.getValue, offers.get(0).getHostname, 16))
val taskDesc = new TaskDescription(1L, "s0", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffersFirst))).andReturn(Seq(Seq(taskDesc))).once
EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffersSecond))).andReturn(Seq(Nil)).once
EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes
EasyMock.replay(taskScheduler)

EasyMock.expect(
driver.launchTasks(
EasyMock.anyObject(classOf[util.Collection[Protos.OfferID]]),
EasyMock.anyObject(classOf[util.Collection[Protos.TaskInfo]]),
EasyMock.anyObject(classOf[Filters]))
).andReturn(Status.valueOf(1)).once
EasyMock.expect(driver.declineOffer(offers.get(0).getId)).andReturn(Status.valueOf(1)).times(1)
EasyMock.replay(driver)
backend.resourceOffers(driver, offers)
backend.resourceOffers(driver, offers)
EasyMock.verify(driver)
}
}