@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
/**
* Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones, default false
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been killed
* @param countFailures if there are tasks running on the executors when they are killed, whether
Contributor:
I'm still a little confused about this parameter.

If force = false, it's a no-op. And all the call sites I've seen seem to set this parameter to false. So is there something I'm missing?

Contributor (author):
whoops, I was supposed to set countFailures = true in sc.killAndReplaceExecutors, thanks for catching that.

* to count those failures toward task failure limits
* @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
def killExecutors(
executorIds: Seq[String],
replace: Boolean = false,
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean = false): Seq[String]
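
For reference, a minimal usage sketch of the updated signature (the client handle and executor id below are illustrative, not taken from this PR): an idle-timeout removal keeps the already-lowered target and does not count failures, while a kill-and-replace call, per the thread above, leaves the target alone and does count failures.

// Hypothetical caller of the updated ExecutorAllocationClient API.
// Idle executor removed by dynamic allocation: the target was already lowered
// when the backlog shrank, and nothing is running on the executor.
client.killExecutors(Seq("executor-3"),
  adjustTargetNumExecutors = false, countFailures = false, force = false)
// Kill-and-replace (as in sc.killAndReplaceExecutor): keep the target so a
// replacement is requested, and count killed tasks toward failure limits.
client.killExecutors(Seq("executor-3"),
  adjustTargetNumExecutors = false, countFailures = true, force = true)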

/**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String): Boolean = {
val killedExecutors = killExecutors(Seq(executorId))
val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
countFailures = false)
killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
}
}
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

/**
@@ -81,7 +82,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf)
conf: SparkConf,
blockManagerMaster: BlockManagerMaster)
extends Logging {

allocationManager =>
@@ -151,7 +153,7 @@ private[spark] class ExecutorAllocationManager(
private var clock: Clock = new SystemClock()

// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
val listener = new ExecutorAllocationListener

// Executor that handles the scheduling task.
private val executor =
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(

// If the new target has not changed, avoid sending a message to the cluster manager
if (numExecutorsTarget < oldNumExecutorsTarget) {
// We lower the target number of executors but don't actively kill any yet. Killing is
// controlled separately by an idle timeout. It's still helpful to reduce the target number
// in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
// preempts it) -- in that case, there is no point in trying to immediately get a new
// executor, since we wouldn't even use it yet.
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
@@ -455,7 +462,10 @@ private[spark] class ExecutorAllocationManager(
val executorsRemoved = if (testing) {
executorIdsToBeRemoved
} else {
client.killExecutors(executorIdsToBeRemoved)
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
// [SPARK-21834] killExecutors api reduces the target number of executors.
// So we need to update the target with desired value.
@@ -575,7 +585,7 @@ private[spark] class ExecutorAllocationManager(
// Note that it is not necessary to query the executors since all the cached
// blocks we are concerned with are reported to the driver. Note that this
// does not include broadcast blocks.
val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
val now = clock.getTimeMillis()
val timeout = {
if (hasCachedBlocks) {
@@ -610,7 +620,7 @@ private[spark] class ExecutorAllocationManager(
* This class is intentionally conservative in its assumptions about the relative ordering
* and consistency of events returned by the listener.
*/
private class ExecutorAllocationListener extends SparkListener {
private[spark] class ExecutorAllocationListener extends SparkListener {

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
// Number of running tasks per stage including speculative tasks.
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -533,7 +533,8 @@ class SparkContext(config: SparkConf) extends Logging {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
_env.blockManager.master))
case _ =>
None
}
@@ -1632,6 +1633,8 @@ class SparkContext(config: SparkConf) extends Logging {
* :: DeveloperApi ::
* Request that the cluster manager kill the specified executors.
*
* This is not supported when dynamic allocation is turned on.
*
* @note This is an indication to the cluster manager that the application wishes to adjust
* its resource usage downwards. If the application wishes to replace the executors it kills
* through this method with new ones, it should follow up explicitly with a call to
@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
def killExecutors(executorIds: Seq[String]): Boolean = {
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.killExecutors(executorIds, replace = false, force = true).nonEmpty
require(executorAllocationManager.isEmpty,
Contributor:
This is a developer API, so it's probably OK, but this is a change in behavior. Is it just not possible to support this with dynamic allocation?

Contributor (author):
What would calling this mean with dynamic allocation on? Note this API explicitly says it's meant to adjust resource usage downwards. If you've got just one executor, and then you kill it, should your app sit with 0 executors? Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up? I can't think of clear, useful semantics for this.

(Though this is not necessary to fix the bug; I could pull this out and move the discussion to a new JIRA.)

Contributor:
I'm not sure why you'd use this with dynamic allocation, but it's been possible in the past. It's probably ok to change this though.

Member:
Hi @squito, I'm a bit puzzled about these cases:

> If you've got just one executor, and then you kill it, should your app sit with 0 executors?

If the app sits with 0 executors, then pending tasks increase, which leads ExecutorAllocationManager to increase the target number of executors. So the app will not sit with 0 executors forever.

> Or even if you've got 10 executors, and you kill one -- when is dynamic allocation allowed to bump the total back up?

For this case, to be honest, I really do not get your point -- but that may be down to my poor English.

And what happens if we use this method without ExecutorAllocationManager? Do we really need to adjust the target number of executors (i.e. set adjustTargetNumExecutors = true below) if we are not using ExecutorAllocationManager?

See these lines in killExecutors():

if (adjustTargetNumExecutors) {
  requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
  ...
  doRequestTotalExecutors(requestedTotalExecutors)
}

Setting adjustTargetNumExecutors = true will change requestedTotalExecutors. And IIUC, requestedTotalExecutors is only meaningful in dynamic allocation mode. So, if we are not using ExecutorAllocationManager, the allocation client will ask the cluster manager for requestedTotalExecutors = 0 executors (which is really terrible). But an app without ExecutorAllocationManager actually has no limit on the executors it can request (by default).

Actually, I think this family of methods, including killAndReplaceExecutor, requestExecutors, etc., was designed with dynamic allocation in mind. If we still want to use these methods when the app is not using ExecutorAllocationManager, we should not change requestedTotalExecutors, or perhaps should not request a specific number from the cluster manager at all.

WDYT?
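
To make the concern concrete, here is a small hypothetical walk-through (the values are made up; the names mirror the CoarseGrainedSchedulerBackend fields quoted above):

// Dynamic allocation off: requestedTotalExecutors starts at 0 and is never set,
// because ExecutorAllocationManager never calls requestTotalExecutors.
var requestedTotalExecutors = 0
val executorsToKill = Seq("1", "2")   // user calls sc.killExecutors(Seq("1", "2"))
// With adjustTargetNumExecutors = true, killExecutors() then does roughly:
requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
// => still 0, so doRequestTotalExecutors(0) asks the cluster manager for zero
// executors, even though this app never meant to cap its executors at all.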

Contributor (author):
My point in general is that the semantics of combining SparkContext.killExecutors() (which is a publicly visible function that the end user can call) with dynamic allocation aren't well defined, and I have no idea what the behavior really should be. I was giving some examples of the weird behavior.

> If you've got just one executor, and then you kill it, should your app sit with 0 executors?

> If the app sits with 0 executors, then pending tasks increase, which leads ExecutorAllocationManager to increase the target number of executors. So the app will not sit with 0 executors forever.

That's true -- but only when pending tasks increase. If you've got 0 executors, how do you expect pending tasks to increase? That would only happen when another taskset gets submitted, but with no executors your Spark program will probably just be blocked.

In the other case, I'm just trying to point out strange interactions between user control and dynamic allocation control. Imagine this sequence:

1. Dynamic allocation: 1000 tasks, so 1000 executors.
2. User: I only want 10 executors, so let me tell Spark to kill 990 of them.
3. ... another taskset is submitted, adding 1 more task ...
4. Dynamic allocation: 1001 tasks, so 1001 executors.
5. User: ??? I set the target to 10 executors, what happened?

> So, if we are not using ExecutorAllocationManager, the allocation client will ask the cluster manager for requestedTotalExecutors = 0 executors (which is really terrible)

Hmm, from a quick look, I think you're right -- it seems that sc.killExecutors() doesn't behave sensibly even with dynamic allocation off. I think CoarseGrainedSchedulerBackend should actually initialize requestedTotalExecutors with SchedulerBackendUtils.getInitialTargetExecutorNumber.
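
A sketch of what that suggestion could look like (this change is not part of the PR, and whether the conf is in scope at field-initialization time is an assumption):

// In CoarseGrainedSchedulerBackend: seed the target from the configured initial
// executor count instead of 0, so a later sc.killExecutors() call does not drive
// the requested total down to nothing.
private var requestedTotalExecutors =
  SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)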

Member:
Hi @squito, thanks for your reply.

> but only when pending tasks increase.

ExecutorAllocationManager checks pending (backlogged) tasks periodically, so we don't actually have to wait for an increase.

And for the dynamic allocation & user case, yeah, that's hard to define.

I also checked SchedulerBackendUtils.getInitialTargetExecutorNumber; it uses DEFAULT_NUMBER_EXECUTORS = 2. But that is not consistent with Master, which sets executorLimit to Int.MaxValue when we are not in dynamic allocation mode. Maybe we can just initialize requestedTotalExecutors to Int.MaxValue (only when we are not in dynamic allocation mode).
Or we could stop calling doRequestTotalExecutors from requestExecutors and killExecutors, and only call it from requestTotalExecutors (again, only when we are not in dynamic allocation mode).
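
A sketch of the first alternative (illustrative only; using Utils.isDynamicAllocationEnabled as the guard is an assumption about where the check would live):

// Only track a finite target under dynamic allocation; otherwise mirror the
// Master's behaviour of an effectively unlimited executorLimit.
private var requestedTotalExecutors =
  if (Utils.isDynamicAllocationEnabled(conf)) 0 else Int.MaxValue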

Member:
@squito any thoughts?

"killExecutors() unsupported with Dynamic Allocation turned on")
b.killExecutors(executorIds, adjustTargetNumExecutors = true, countFailures = false,
force = true).nonEmpty
case _ =>
logWarning("Killing executors is not supported by current scheduler.")
false
@@ -1681,7 +1687,8 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
schedulerBackend match {
case b: ExecutorAllocationClient =>
b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
force = true).nonEmpty
case _ =>
logWarning("Killing executors is not supported by current scheduler.")
false
@@ -152,7 +152,8 @@ private[scheduler] class BlacklistTracker (
case Some(a) =>
logInfo(s"Killing blacklisted executor id $exec " +
s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
a.killExecutors(Seq(exec), true, true)
a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
force = true)
case None =>
logWarning(s"Not attempting to kill blacklisted executor id $exec " +
s"since allocation client is not defined.")
@@ -147,7 +147,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

case KillExecutorsOnHost(host) =>
scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
killExecutors(exec.toSeq, replace = true, force = true)
killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
force = true)
}

case UpdateDelegationTokens(newDelegationTokens) =>
@@ -584,18 +585,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Request that the cluster manager kill the specified executors.
*
* When asking the executor to be replaced, the executor loss is considered a failure, and
* killed tasks that are running on the executor will count towards the failure limits. If no
* replacement is being requested, then the tasks will not count towards the limit.
*
* @param executorIds identifiers of executors to kill
* @param replace whether to replace the killed executors with new ones, default false
* @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
* after these executors have been killed
* @param countFailures if there are tasks running on the executors when they are killed, whether
* those failures count toward task failure limits
* @param force whether to force kill busy executors, default false
* @return the ids of the executors acknowledged by the cluster manager to be removed.
*/
final override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
val executorsToKill = knownExecutors
.filter { id => !executorsPendingToRemove.contains(id) }
.filter { id => force || !scheduler.isExecutorBusy(id) }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }

logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")

// If we do not wish to replace the executors we kill, sync the target number of executors
// with the cluster manager to avoid allocating new ones. When computing the new target,
// take into account executors that are pending to be added or removed.
val adjustTotalExecutors =
if (!replace) {
if (adjustTargetNumExecutors) {
requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
if (requestedTotalExecutors !=
(numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
logDebug(
s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
|Executor counts do not match:
|requestedTotalExecutors = $requestedTotalExecutors
|numExistingExecutors = $numExistingExecutors
|numPendingExecutors = $numPendingExecutors
@@ -19,13 +19,16 @@ package org.apache.spark

import scala.collection.mutable

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.{mock, never, verify, when}
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ExternalClusterManager
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.storage.BlockManagerMaster
import org.apache.spark.util.ManualClock

/**
@@ -1050,6 +1053,66 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager) === Map.empty)
}

test("SPARK-23365 Don't update target num executors when killing idle executors") {
val minExecutors = 1
val initialExecutors = 1
val maxExecutors = 2
val conf = new SparkConf()
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.shuffle.service.enabled", "true")
.set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
.set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
.set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
.set("spark.dynamicAllocation.schedulerBacklogTimeout", "1000ms")
.set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1000ms")
.set("spark.dynamicAllocation.executorIdleTimeout", s"3000ms")
val mockAllocationClient = mock(classOf[ExecutorAllocationClient])
val mockBMM = mock(classOf[BlockManagerMaster])
val manager = new ExecutorAllocationManager(
mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockBMM)
val clock = new ManualClock()
manager.setClock(clock)

when(mockAllocationClient.requestTotalExecutors(meq(2), any(), any())).thenReturn(true)
// test setup -- job with 2 tasks, scale up to two executors
assert(numExecutorsTarget(manager) === 1)
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
manager.listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 2)))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
assert(numExecutorsTarget(manager) === 2)
val taskInfo0 = createTaskInfo(0, 0, "executor-1")
manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo0))
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, Map.empty)))
val taskInfo1 = createTaskInfo(1, 1, "executor-2")
manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo1))
assert(numExecutorsTarget(manager) === 2)

// have one task finish -- we should adjust the target number of executors down
// but we should *not* kill any executors yet
manager.listener.onTaskEnd(SparkListenerTaskEnd(0, 0, null, Success, taskInfo0, null))
assert(maxNumExecutorsNeeded(manager) === 1)
assert(numExecutorsTarget(manager) === 2)
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
assert(numExecutorsTarget(manager) === 1)
verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any())

// now we cross the idle timeout for executor-1, so we kill it. the really important
// thing here is that we do *not* ask the executor allocation client to adjust the target
// number of executors down
when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false))
.thenReturn(Seq("executor-1"))
clock.advance(3000)
schedule(manager)
assert(maxNumExecutorsNeeded(manager) === 1)
assert(numExecutorsTarget(manager) === 1)
// here's the important verify -- we did kill the executors, but did not adjust the target count
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
}

private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
@@ -1268,7 +1331,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend

override def killExecutors(
executorIds: Seq[String],
replace: Boolean,
adjustTargetNumExecutors: Boolean,
countFailures: Boolean,
force: Boolean): Seq[String] = executorIds

override def start(): Unit = sb.start()
@@ -573,7 +573,8 @@ class StandaloneDynamicAllocationSuite
syncExecutors(sc)
sc.schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = false, force)
b.killExecutors(Seq(executorId), adjustTargetNumExecutors = true, countFailures = false,
force)
case _ => fail("expected coarse grained scheduler")
}
}