Commit c44ef8714c6cde70404043e065e81330746f7881 (Andrew Or, Nov 13, 2015)
@@ -87,7 +87,7 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Number of tasks runing on each executor
+  // Number of tasks running on each executor
   private val executorIdToTaskCount = new HashMap[String, Int]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
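For context on how this counter is used: the backend treats an executor as busy while this map holds a nonzero count for it. A minimal, self-contained sketch of that bookkeeping (the map name mirrors the diff; the method bodies are illustrative, not the actual Spark source):

```scala
import scala.collection.mutable.HashMap

// Illustrative sketch: track the number of running tasks per executor,
// in the way TaskSchedulerImpl's executorIdToTaskCount map is used.
class TaskCountBookkeeping {
  private val executorIdToTaskCount = new HashMap[String, Int]

  // Called when a task launches on an executor.
  def taskLaunched(executorId: String): Unit =
    executorIdToTaskCount(executorId) =
      executorIdToTaskCount.getOrElse(executorId, 0) + 1

  // Called when a task finishes or fails.
  def taskEnded(executorId: String): Unit =
    executorIdToTaskCount.get(executorId).foreach { count =>
      if (count <= 1) executorIdToTaskCount -= executorId
      else executorIdToTaskCount(executorId) = count - 1
    }

  // An executor is "busy" if it still has tasks running on it; this is
  // the property the backend consults before a non-forced kill.
  def isExecutorBusy(executorId: String): Boolean =
    executorIdToTaskCount.getOrElse(executorId, 0) > 0
}
```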
@@ -437,9 +437,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors
       .filter { id => !executorsPendingToRemove.contains(id) }
       .filter { id => force || !scheduler.isExecutorBusy(id) }
-      // for test only
-      .filter { id => force ||
-        !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)}
     executorsPendingToRemove ++= executorsToKill
 
     // If we do not wish to replace the executors we kill, sync the target number of executors
@@ -452,7 +449,7 @@
       numPendingExecutors += knownExecutors.size
     }
 
-    (force || !executorsToKill.isEmpty) && doKillExecutors(executorsToKill)
+    doKillExecutors(executorsToKill)
   }
 
   /**
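The second hunk changes what the method returns: previously a non-forced request whose targets were all filtered out as busy short-circuited to `false`; now the (possibly empty) kill list is always handed to `doKillExecutors`. A simplified model of the resulting behavior, with the backend's state reduced to plain parameters (assumed types, not Spark's actual signature):

```scala
object KillExecutorsModel {
  // Simplified model of the filtering in killExecutors after this commit:
  // busy executors are silently dropped from a non-forced kill request,
  // and whatever remains (possibly nothing) is passed along.
  def selectExecutorsToKill(
      requested: Seq[String],
      pendingToRemove: Set[String],
      busy: String => Boolean,
      force: Boolean): Seq[String] = {
    requested
      .filter { id => !pendingToRemove.contains(id) }
      .filter { id => force || !busy(id) }
  }

  def main(args: Array[String]): Unit = {
    // A non-forced kill of a busy executor now yields an empty kill list
    // (and doKillExecutors is still invoked) rather than an early `false`.
    val toKill = selectExecutorsToKill(
      requested = Seq("exec-1"),
      pendingToRemove = Set.empty,
      busy = _ == "exec-1",
      force = false)
    assert(toKill.isEmpty)

    // With force = true the busy executor is selected for killing.
    val forced = selectExecutorsToKill(Seq("exec-1"), Set.empty, _ == "exec-1", force = true)
    assert(forced == Seq("exec-1"))
  }
}
```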
@@ -17,10 +17,11 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 
@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
 class StandaloneDynamicAllocationSuite
   extends SparkFunSuite
   with LocalSparkContext
-  with BeforeAndAfterAll {
+  with BeforeAndAfterAll
+  with PrivateMethodTester {
 
   private val numWorkers = 2
   private val conf = new SparkConf()
@@ -405,7 +408,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("disable force kill for busy executors (SPARK-9552)") {
-    sc = new SparkContext(appConf.set("spark.dynamicAllocation.testing", "true"))
+    sc = new SparkContext(appConf)
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -417,17 +420,21 @@
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
-    var executors = getExecutorIds(sc)
+    val executors = getExecutorIds(sc)
     assert(executors.size === 2)
-    // force kill busy executor
-    assert(killExecutorWithForce(sc, executors.head))
+    assert(killExecutor(sc, executors.head, force = true))
     var apps = getApplications()
-    // kill executor successfully
     assert(apps.head.executors.size === 1)
-    // try to kill busy executor but it should be failed
-    assert(killExecutorWithForce(sc, executors.head, false) === false)
+
+    // simulate running a task on the executor
+    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+    val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+    val executorIdToTaskCount = taskScheduler invokePrivate getMap()
+    executorIdToTaskCount(executors.head) = 1
+
+    // kill the busy executor without force; this should fail
+    assert(killExecutor(sc, executors.head, force = false))
     apps = getApplications()
-    // won't kill busy executor
     assert(apps.head.executors.size === 1)
   }
 
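The test now simulates a busy executor by writing directly into the scheduler's private map, using ScalaTest's `PrivateMethodTester`. A standalone sketch of that reflection pattern (the `Counter` class below is invented for illustration; the real test targets `TaskSchedulerImpl.executorIdToTaskCount`):

```scala
import scala.collection.mutable
import org.scalatest.PrivateMethodTester

object PrivateAccessDemo extends PrivateMethodTester {
  // Stand-in for a class with private state we want to manipulate in a test.
  class Counter {
    private val executorIdToTaskCount = new mutable.HashMap[String, Int]
    def isBusy(id: String): Boolean = executorIdToTaskCount.getOrElse(id, 0) > 0
  }

  def main(args: Array[String]): Unit = {
    val counter = new Counter
    // PrivateMethod is keyed by the private member's name as a Symbol;
    // invokePrivate calls the generated accessor reflectively and casts
    // the result to the given type.
    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
    val map = counter invokePrivate getMap()
    map("exec-1") = 1 // simulate a running task
    assert(counter.isBusy("exec-1"))
  }
}
```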
@@ -482,16 +489,13 @@ class StandaloneDynamicAllocationSuite
     sc.killExecutors(getExecutorIds(sc).take(n))
   }
 
-  private def killExecutorWithForce(
-      sc: SparkContext,
-      executorId: String,
-      force: Boolean = true): Boolean = {
+  /** Kill the given executor, specifying whether to force kill it. */
+  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
     syncExecutors(sc)
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(Seq(executorId), replace = false, force)
-      case _ =>
-        logWarning("Killing executors is only supported in coarse-grained mode")
-        false
+      case _ => fail("expected coarse grained scheduler")
    }
  }
 
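One detail worth noting in the last hunk: the helper still declares a `Boolean` result, yet the default branch no longer returns `false`. This type-checks because ScalaTest's `fail` (like any throwing expression) has type `Nothing`, which conforms to every expected type. A minimal illustration of the same idiom:

```scala
// A throwing branch has type Nothing, so it does not widen the type
// of the surrounding match expression away from Boolean.
def parseFlag(s: String): Boolean = s match {
  case "true"  => true
  case "false" => false
  case other   => throw new IllegalArgumentException(s"not a flag: $other")
}
```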