Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Change back to use ask
  • Loading branch information
zsxwing committed Aug 31, 2016
commit 0cfc2829a5b14d726d5399e6c525acc544ee10a7
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

case RemoveExecutor(executorId, reason) =>
// We will remove the executor's state and cannot restore it. However, the connection
// between the driver and the executor may be still alive so that the executor won't exit
// automatically, so try to tell the executor to stop itself. See SPARK-13519.
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
Expand Down Expand Up @@ -202,6 +195,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
context.reply(true)

case RemoveExecutor(executorId, reason) =>
// We will remove the executor's state and cannot restore it. However, the connection
// between the driver and the executor may be still alive so that the executor won't exit
// automatically, so try to tell the executor to stop itself. See SPARK-13519.
executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
removeExecutor(executorId, reason)
context.reply(true)

case RetrieveSparkProps =>
context.reply(sparkProperties)
}
Expand Down Expand Up @@ -410,7 +411,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* at once.
*/
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
driverEndpoint.send(RemoveExecutor(executorId, reason))
// Only log the failure since we don't care about the result.
driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still need to use ask since there are some places using ask(RemoveExecutor).

logError(t.getMessage, t)
}
}

def sufficientResourcesRegistered(): Boolean = true
Expand Down