@@ -52,11 +52,19 @@ private[spark] trait ExecutorAllocationClient {
* Request that the cluster manager kill the specified executors.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutors(executorIds: Seq[String]): Boolean
def killExecutors(executorIds: Seq[String], force: Boolean): Boolean

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String): Boolean = killExecutors(Seq(executorId))
private[spark] def killExecutor(executorId: String): Boolean =
killExecutors(Seq(executorId), force = false)

/**
* Request that the cluster manager kill the specified executor.
* @return whether the request is acknowledged by the cluster manager.
*/
def killExecutor(executorId: String, force: Boolean): Boolean =
killExecutors(Seq(executorId), force)
}
@@ -403,6 +403,8 @@ private[spark] class ExecutorAllocationManager(
// Send a request to the backend to kill this executor
val removeRequestAcknowledged = testing || client.killExecutor(executorId)
if (removeRequestAcknowledged) {
// Even if removeRequestAcknowledged is true, the executor may still not be killed;
// it can be rescued when an onTaskStart event arrives.
logInfo(s"Removing executor $executorId because it has been idle for " +
s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})")
executorsPendingToRemove.add(executorId)
@@ -509,6 +511,13 @@ private[spark] class ExecutorAllocationManager(
private def onExecutorBusy(executorId: String): Unit = synchronized {
logDebug(s"Clearing idle timer for $executorId because it is now running a task")
removeTimes.remove(executorId)

// The executor may have been marked for removal by mistake because the async
// listener reported it as idle; see SPARK-9552.
if (executorsPendingToRemove.contains(executorId)) {
Contributor

Instead of fixing the list of pending executors here, I wonder if it wouldn't be better to change removeExecutor to return false when the executors asked to be killed were busy? Then they wouldn't even be added to this list to start with.

That slightly changes the semantics of the return value, but it also sounds more correct. With your changes, killExecutors(..., force = false) will return true even if it didn't kill any executors, which sounds wrong.
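
For illustration only, a minimal self-contained sketch of the semantics being suggested here, assuming the backend can tell which executors it will actually ask to be killed; the helper names and hard-coded IDs are made up for the example and are not the real CoarseGrainedSchedulerBackend code:

```scala
import scala.collection.mutable

object KillSemanticsSketch {
  val registeredExecutors = mutable.Set("1", "2", "3")           // stand-in for executorDataMap keys
  val executorsPendingToRemove = mutable.Set.empty[String]
  def isExecutorBusy(id: String): Boolean = id == "2"            // pretend executor "2" is busy
  def doKillExecutors(ids: Seq[String]): Boolean = ids.nonEmpty  // pretend the RPC succeeds

  def killExecutors(executorIds: Seq[String], force: Boolean): Boolean = {
    val knownExecutors = executorIds.filter(registeredExecutors.contains)
    // Skip executors already pending removal, and busy ones unless force is set.
    val executorsToKill = knownExecutors
      .filter { id => !executorsPendingToRemove.contains(id) }
      .filter { id => force || !isExecutorBusy(id) }
    executorsPendingToRemove ++= executorsToKill
    // Key change: acknowledge only when something will actually be killed.
    executorsToKill.nonEmpty && doKillExecutors(executorsToKill)
  }
}
```

With this shape, KillSemanticsSketch.killExecutors(Seq("2"), force = false) returns false rather than true, which is the semantics change being discussed; the single-executor killExecutor would then inherit the more meaningful result.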

Contributor Author

@vanzin, in the original design I did change the return value of that function (killExecutors), but I moved away from that. It was not only because of the last round of review comments, but also because the semantics are still a little strange. For example, say you have 3 executors to kill with force = false and one of them turns out to be busy: it is hard to tell directly whether the kill succeeded or not (a tiny example follows below). If we only support a single executor here, it is much simpler and more straightforward.
Besides, this was changed according to the last round of review comments: per the documentation, killExecutors only returns the acknowledgement, which does not indicate the outcome of the kill action. Please let me know your further thoughts.
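
To make the ambiguity concrete, a tiny hypothetical example (the IDs and which executor is busy are made up):

```scala
// Ask to kill three executors without forcing; pretend "2" is busy.
val requested = Seq("1", "2", "3")
val busy      = Set("2")
val eligible  = requested.filterNot(busy.contains)  // Seq("1", "3")

// Only "1" and "3" can be killed. Should the single Boolean result be true
// (some executors were killed) or false (not all requested executors were killed)?
// Either answer hides part of what happened, which is the ambiguity described above.
```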

Contributor

While I agree with what you say, the current return value is both not very useful and really not in line with what the documentation says. It basically means "a message was sent to the cluster manager asking the executors to be killed". It doesn't mean the cluster manager received the message nor whether it successfully acted on it.

So IMO it should be fine to change the meaning of the return value of killExecutor slightly; it would make the return value slightly more useful.

Also, that makes me question whether your current code really works. If the executor ID is in the executorsPendingToRemove list, it means a request to kill that executor has already been sent to the cluster manager. Meaning that even if you remove the executor from this list, the cluster manager will still kill it. Which makes my suggestion of not sending the kill request even more important.

I see what the race is, but once the request is sent to the cluster manager, it's too late to try to fix things. So the only enhancement I see is if you're able to avoid sending the request in the first place.

Contributor Author

@vanzin Here is the code path:

  1. Prepare the full list of executor IDs to be killed (those that meet certain criteria).
  2. killExecutors filters out the non-eligible ones (so some of them may not actually be killed).
  3. No matter which executors were filtered out, if some of them are acknowledged (really killed), we add the entire executor ID list to executorsPendingToRemove. There is no way to tell which ones will actually be killed.

That is why we need this kind of rescue; a condensed sketch of the flow follows below. Please let me know if it makes sense.
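
As mentioned, a condensed, self-contained sketch of the rescue flow as it appears in this diff; the method names mirror ExecutorAllocationManager, but the bodies are simplified stand-ins, not the real code:

```scala
import scala.collection.mutable

object RescueFlowSketch {
  val executorsPendingToRemove = mutable.Set.empty[String]

  // Stand-in for the client call: the request is only acknowledged,
  // it does not mean the executor has actually been killed yet.
  def killExecutor(executorId: String): Boolean = true

  // Idle-timeout path: the executor looks idle, so we ask the cluster manager
  // to kill it and optimistically record it as pending removal.
  def removeExecutor(executorId: String): Unit = {
    if (killExecutor(executorId)) {
      executorsPendingToRemove += executorId
    }
  }

  // onTaskStart path: a task just landed on this executor, so the async
  // listener's "idle" judgment was stale and the executor is rescued.
  def onExecutorBusy(executorId: String): Unit = {
    if (executorsPendingToRemove.contains(executorId)) {
      executorsPendingToRemove -= executorId
    }
  }
}
```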

Contributor

> That is why we need this kind of rescue

But what are you rescuing? You're removing the executor from the "pending to remove" list; but the request to kill the executor has already been sent, otherwise it would not be there. So the executor will still be killed, even if you remove it from this list.

Contributor Author

@vanzin I am a little bit confused. If at least one executor is killed and the call returns true, then all of those executors are added to executorsPendingToRemove; see the killExecutors code.

Contributor

Look at that code again. That code is calling killExecutor, NOT killExecutors. There is a single executor involved!

Contributor Author

OK. I see. You mean change the killExecutor return value only, right?

Contributor

Yes, I mean changing the return value of killExecutor. But since killExecutor is implemented as a call to killExecutors, plural, with a list containing a single executor, you have to change the return value of killExecutors.

Contributor Author

OK, I will change that accordingly. It will also change the original semantics.

// Rescue the executor from the pending-to-remove list
executorsPendingToRemove.remove(executorId)
}
}

/**
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1448,10 +1448,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
override def killExecutors(executorIds: Seq[String], force: Boolean = true): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
b.killExecutors(executorIds, force)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -1470,7 +1470,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return whether the request is received.
*/
@DeveloperApi
override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
override def killExecutor(executorId: String, force: Boolean = true): Boolean =
super.killExecutor(executorId, force)

/**
* Request that the cluster manager kill the specified executor without adjusting the
@@ -1486,10 +1487,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @return whether the request is received.
*/
private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
private[spark] def killAndReplaceExecutor(executorId: String, force: Boolean = true): Boolean = {
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(Seq(executorId), replace = true)
b.killExecutors(Seq(executorId), replace = true, force)
case _ =>
logWarning("Killing executors is only supported in coarse-grained mode")
false
@@ -88,7 +88,8 @@ private[spark] class TaskSchedulerImpl(
val nextTaskId = new AtomicLong(0)

// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
// Each executor will record its running or launched task number
Contributor

nit: "task count" instead of "task number".

val activeExecutorIdsWithLoads = new HashMap[String, Int]
Contributor

nit: instead of WithLoads, WithTasks or WithTaskCount?


// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
@@ -254,6 +255,7 @@ private[spark] class TaskSchedulerImpl(
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
activeExecutorIdsWithLoads(execId) += 1
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
@@ -282,7 +284,7 @@ private[spark] class TaskSchedulerImpl(
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
activeExecutorIds += o.executorId
activeExecutorIdsWithLoads.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
@@ -331,7 +333,8 @@ private[spark] class TaskSchedulerImpl(
if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
// We lost this entire executor, so remember that it's gone
val execId = taskIdToExecutorId(tid)
if (activeExecutorIds.contains(execId)) {

if (activeExecutorIdsWithLoads.contains(execId)) {
removeExecutor(execId,
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
failedExecutor = Some(execId)
@@ -341,7 +344,10 @@
case Some(taskSet) =>
if (TaskState.isFinished(state)) {
taskIdToTaskSetManager.remove(tid)
taskIdToExecutorId.remove(tid)
taskIdToExecutorId.remove(tid) match {
  case Some(execId) => activeExecutorIdsWithLoads(execId) -= 1
  case None =>
}
Contributor

taskIdToExecutorId.remove(tid).foreach { execId =>
  if (executorIdToTaskCount.contains(execId)) {
    executorIdToTaskCount(execId) -= 1
  }
}

Contributor Author

I will change that.

}
if (state == TaskState.FINISHED) {
taskSet.removeRunningTask(tid)
@@ -462,7 +468,7 @@ private[spark] class TaskSchedulerImpl(
var failedExecutor: Option[String] = None

synchronized {
if (activeExecutorIds.contains(executorId)) {
if (activeExecutorIdsWithLoads.contains(executorId)) {
val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
removeExecutor(executorId, reason)
@@ -484,7 +490,8 @@

/** Remove an executor from all our data structures and mark it as lost */
private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
activeExecutorIds -= executorId
activeExecutorIdsWithLoads -= executorId

val host = executorIdToHost(executorId)
val execs = executorsByHost.getOrElse(host, new HashSet)
execs -= executorId
@@ -518,7 +525,11 @@
}

def isExecutorAlive(execId: String): Boolean = synchronized {
activeExecutorIds.contains(execId)
activeExecutorIdsWithLoads.contains(execId)
}

def isExecutorBusy(execId: String): Boolean = synchronized {
activeExecutorIdsWithLoads.getOrElse(execId, -1) > 0
}

// By default, rack is unknown
@@ -410,8 +410,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Request that the cluster manager kill the specified executors.
* @return whether the kill request is acknowledged.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
killExecutors(executorIds, replace = false)
final override def killExecutors(
executorIds: Seq[String],
force: Boolean): Boolean = synchronized {
killExecutors(executorIds, replace = false, force)
}

/**
@@ -421,15 +423,29 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* @param replace whether to replace the killed executors with new ones
* @return whether the kill request is acknowledged.
*/
final def killExecutors(executorIds: Seq[String], replace: Boolean): Boolean = synchronized {
final def killExecutors(
executorIds: Seq[String],
replace: Boolean,
force: Boolean): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
}

// If force is true, kill all executors, busy or idle;
// otherwise only idle executors are eligible to be killed.
val idleExecutors =
  if (force) {
    knownExecutors
  } else {
    knownExecutors.filter { id =>
      logWarning(s"Busy executor $id is not valid to be killed!")
      !scheduler.isExecutorBusy(id)
    }
  }
Contributor

I would do

// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
// If this executor is busy, do not kill it unless we are told to force kill it (SPARK-9552)
val executorsToKill = knownExecutors
  .filter { id => !executorsPendingToRemove.contains(id) }
  .filter { id => force || !scheduler.isExecutorBusy(id) }

no need to log a warning

Contributor Author

Nice suggestion. Will change that.


// If an executor is already pending to be removed, do not kill it again (SPARK-9795)
val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
val executorsToKill = idleExecutors.filter { id => !executorsPendingToRemove.contains(id) }
executorsPendingToRemove ++= executorsToKill

// If we do not wish to replace the executors we kill, sync the target number of executors
@@ -442,6 +458,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
numPendingExecutors += knownExecutors.size
}

// executorsToKill may be empty
Contributor

what does this comment mean? Is it a TODO? Do we need to check whether it's empty before passing it to doKillExecutors? We can probably just remove this comment and let the downstream code handle this.

Contributor Author

Sure, will do.

doKillExecutors(executorsToKill)
}
