Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.
Merged
Prev Previous commit
Next Next commit
Cleanup log messages
  • Loading branch information
samvantran committed Jan 16, 2019
commit 70ba913b3b3d51f91ae03ea5793c6f8d2b92ad1f
Original file line number Diff line number Diff line change
Expand Up @@ -740,17 +740,9 @@ private[spark] class MesosClusterScheduler(
* Task state like TASK_ERROR are not relaunchable state since it wasn't able
* to be validated by Mesos.
*/
private def shouldRelaunch(state: MesosTaskState, subId: String): Boolean = {
private def shouldRelaunch(state: MesosTaskState): Boolean = {
state == MesosTaskState.TASK_FAILED ||
state == MesosTaskState.TASK_LOST && {
logInfo(s"Verifying $subId has not already been launched elsewhere")
if (launchedDriversState.fetch(subId).isEmpty) {
true
} else {
logInfo(s"SubmissionId: $subId has already been re-launched!")
false
}
}
state == MesosTaskState.TASK_LOST
}

override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
Expand All @@ -777,7 +769,6 @@ private[spark] class MesosClusterScheduler(
}
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
logTrace(s"Checking if $taskId is supervised and shouldRelaunch")
if (state.driverDescription.supervise && shouldRelaunch(status.getState, subId)) {
if (taskIsNotOutdated(taskId, state)) {
removeFromLaunchedDrivers(subId)
Expand Down Expand Up @@ -808,11 +799,9 @@ private[spark] class MesosClusterScheduler(
}

private def taskIsNotOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean = {
logInfo(s"Verifying $taskId is not outdated")
if (getRetryCountFromTaskId(taskId) >= getRetryCountFromTaskId(state.frameworkId)) {
true
} else {
logInfo(s"$taskId is outdated and should be disregarded")
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
var taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent1)
.setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
.setState(MesosTaskState.TASK_LOST)
.build()

Expand Down Expand Up @@ -484,6 +485,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent1)
.setState(MesosTaskState.TASK_FAILED)
.setMessage("Abnormal executor termination")
.setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
.build()

scheduler.statusUpdate(driver, taskStatus)
Expand Down