Use send for removeExecutor
zsxwing committed Aug 31, 2016
commit dcd15e8e818d006956b97af7264fc4622847674e
CoarseGrainedSchedulerBackend.scala
@@ -143,6 +143,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -196,14 +203,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)

-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
-        context.reply(true)
-
       case RetrieveSparkProps =>
         context.reply(sparkProperties)
     }
@@ -407,20 +406,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

// Called by subclasses when notified of a lost worker
/**
* Called by subclasses when notified of a lost worker. It just fires the message and returns
* at once.
*/
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
try {
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
} catch {
case e: Exception =>
throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
}
}

protected def removeExecutorAsync(
executorId: String,
reason: ExecutorLossReason): Future[Boolean] = {
driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason))
// Only log the failure since we don't care about the result.
driverEndpoint.send(RemoveExecutor(executorId, reason))
}

def sufficientResourcesRegistered(): Boolean = true
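For context: in Spark's RPC layer, a message delivered with send is dispatched to the endpoint's receive handler and no reply is ever produced, while ask/askWithRetry are dispatched to receiveAndReply and hand the caller a result. The commit above moves RemoveExecutor onto the one-way path. The following is a minimal, self-contained sketch of that dispatch pattern, not Spark's actual RpcEnv; ToyEndpoint, the simplified message types, and Demo are illustrative names, not Spark API.

// A toy model of the send/ask split, assuming nothing beyond the Scala
// standard library. A sketch of the pattern, not Spark's real implementation.
import scala.concurrent.{Future, Promise}

sealed trait Message
final case class RemoveExecutor(executorId: String, reason: String) extends Message
case object RetrieveSparkProps extends Message

class ToyEndpoint {
  // Fire-and-forget handler, analogous to RpcEndpoint.receive.
  def receive: PartialFunction[Message, Unit] = {
    case RemoveExecutor(executorId, reason) =>
      println(s"removing executor $executorId: $reason") // no reply is produced
  }

  // Request-reply handler, analogous to RpcEndpoint.receiveAndReply.
  def receiveAndReply(reply: Any => Unit): PartialFunction[Message, Unit] = {
    case RetrieveSparkProps =>
      reply(Seq("spark.app.name" -> "demo"))
  }

  // send(): deliver the message and return immediately; the caller cannot block.
  def send(msg: Message): Unit =
    receive.applyOrElse(msg, (_: Message) => ())

  // ask(): deliver the message and expose the eventual reply as a Future.
  def ask[T](msg: Message): Future[T] = {
    val p = Promise[T]()
    receiveAndReply(r => p.success(r.asInstanceOf[T])).applyOrElse(
      msg, (m: Message) => { p.failure(new IllegalArgumentException(s"unsupported: $m")); () })
    p.future
  }
}

object Demo extends App {
  val endpoint = new ToyEndpoint
  // After this commit, removeExecutor takes the send() path: there is no Future
  // to wait on and no retry loop, so the caller returns at once.
  endpoint.send(RemoveExecutor("3", "worker lost"))
  // ask() remains for messages whose answer matters, e.g. RetrieveSparkProps.
  println(endpoint.ask[Seq[(String, String)]](RetrieveSparkProps).value)
}

The trade-off the sketch makes visible: send cannot report success or failure back to the caller. That is acceptable here because every call site ignored the Boolean reply anyway, and it removes the risk of askWithRetry blocking the calling thread.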
StandaloneSchedulerBackend.scala
@@ -148,17 +148,13 @@ private[spark] class StandaloneSchedulerBackend(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }

+  /** Note: this method should not block. See [[StandaloneAppClientListener]] */
   override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
-    // Only log the failure since we don't care about the result.
-    removeExecutorAsync(fullId.split("/")(1), reason).onFailure { case t =>
-      logError(t.getMessage, t)
-    }(ThreadUtils.sameThread)
+    removeExecutor(fullId.split("/")(1), reason)
   }

   override def sufficientResourcesRegistered(): Boolean = {
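The standalone master identifies executors by a fullId of the form <appId>/<executorId> (an assumption based on the surrounding code; the concrete id below is invented). A tiny illustration of the fullId.split("/")(1) parsing that feeds the now fire-and-forget removeExecutor:

// Hypothetical values, for illustration only.
object FullIdDemo extends App {
  val fullId = "app-20160831120000-0001/3" // "<appId>/<executorId>", made-up example
  val executorId = fullId.split("/")(1)    // keep only the executor id
  assert(executorId == "3")
  println(s"would fire RemoveExecutor($executorId, reason) via driverEndpoint.send")
}

Because removeExecutor now just fires the message, this callback satisfies the non-blocking contract stated in the new scaladoc note, and the onFailure/ThreadUtils.sameThread plumbing becomes unnecessary.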