4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -523,8 +523,8 @@ private[spark] class SecurityManager(
}

/**
- * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
- */
+ * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
+ */
def getFileBasedSecret(): Option[String] = {
sparkConf
.getOption(SPARK_AUTH_SECRET_CONF)
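The getFileBasedSecret() body is truncated by the hunk above. As a hedged, illustrative sketch only (assuming, per the scaladoc, that the value stored under SPARK_AUTH_SECRET_CONF is a path to a file whose contents are the shared secret; this is not necessarily the fork's actual implementation), such a lookup could look like:

import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}

import org.apache.spark.SparkConf

object FileBasedSecretSketch {
  // Hypothetical stand-in for the constant defined in SecurityManager.
  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"

  // Read the secret from the file whose path is stored under the config key, if set.
  def getFileBasedSecret(sparkConf: SparkConf): Option[String] =
    sparkConf
      .getOption(SPARK_AUTH_SECRET_CONF)
      .map(path => new String(Files.readAllBytes(Paths.get(path)), UTF_8).trim)
}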
@@ -346,7 +346,7 @@ private[spark] class MesosClusterScheduler(
this.masterInfo = Some(masterInfo)
this.schedulerDriver = driver

- if (!pendingRecover.isEmpty) {
+ if (pendingRecover.nonEmpty) {
// Start task reconciliation if we need to recover.
val statuses = pendingRecover.collect {
case (taskId, slaveId) =>
@@ -766,6 +766,10 @@ private[spark] class MesosClusterScheduler(
val state = launchedDrivers(subId)
// Check if the driver is supervise enabled and can be relaunched.
if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
if (taskIsOutdated(taskId, state)) {
// Prevent outdated task from overwriting a more recent status
return
}
removeFromLaunchedDrivers(subId)
state.finishDate = Some(new Date())
val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -788,6 +792,15 @@
}
}

/**
* Check if the task has already been launched or is pending
* If neither, the taskId is outdated and should be ignored
* This is to avoid scenarios where an outdated status update arrives
* after a supervised driver has already been relaunched
*/
private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
taskId != state.taskId.getValue && !pendingRetryDrivers.contains(state.driverDescription)
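For illustration, a standalone sketch that mirrors the predicate above (the object, case class, and sample task IDs below are hypothetical, not the scheduler's own types): a late status update is outdated when it refers to neither the currently launched task nor a driver queued for retry.

object TaskGuardSketch {
  // Hypothetical, self-contained stand-in for MesosClusterSubmissionState.
  final case class DriverState(currentTaskId: String, pendingRetry: Boolean)

  // Outdated = matches neither the live task nor a driver awaiting retry.
  def taskIsOutdated(incomingTaskId: String, state: DriverState): Boolean =
    incomingTaskId != state.currentTaskId && !state.pendingRetry

  def main(args: Array[String]): Unit = {
    // The supervised driver has already been relaunched as a "-retry-1" task.
    val relaunched = DriverState(currentTaskId = "driver-001-retry-1", pendingRetry = false)
    println(taskIsOutdated("driver-001", relaunched))         // true: late update from the old attempt is ignored
    println(taskIsOutdated("driver-001-retry-1", relaunched)) // false: update for the live task is processed
  }
}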

private def retireDriver(
submissionId: String,
state: MesosClusterSubmissionState) = {
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
- import java.util.{Collections, UUID, List => JList}
+ import java.util.{Collections, List => JList, UUID}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.ReentrantLock

@@ -421,6 +421,84 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(state.finishedDrivers.size == 1)
}

test("does not restart outdated supervised drivers") {
// Covers scenario where:
// - agent goes down
// - supervised job is relaunched on another agent
// - first agent re-registers and sends status update: TASK_FAILED
// - job should NOT be relaunched again
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("SparkMesosDriverRetries")
setScheduler(conf.getAll.toMap)

val mem = 1000
val cpu = 1
val offers = List(
Utils.createOffer("o1", "s1", mem, cpu, None),
Utils.createOffer("o2", "s2", mem, cpu, None),
Utils.createOffer("o3", "s1", mem, cpu, None))

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
assert(response.success)

// Offer a resource to launch the submitted driver
scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
var state = scheduler.getSchedulerState()
assert(state.launchedDrivers.size == 1)

// Signal agent lost with a TASK_LOST status update
val agent1 = SlaveID.newBuilder().setValue("s1").build()
var taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent1)
.setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
.setState(MesosTaskState.TASK_LOST)
.build()

scheduler.statusUpdate(driver, taskStatus)
state = scheduler.getSchedulerState()
assert(state.pendingRetryDrivers.size == 1)
assert(state.launchedDrivers.isEmpty)

// Offer new resource to retry driver on a new agent
Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))

val agent2 = SlaveID.newBuilder().setValue("s2").build()
taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent2)
.setState(MesosTaskState.TASK_RUNNING)
.build()

scheduler.statusUpdate(driver, taskStatus)
state = scheduler.getSchedulerState()
assert(state.pendingRetryDrivers.isEmpty)
assert(state.launchedDrivers.size == 1)
assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))

// Agent1 comes back online and sends status update: TASK_FAILED
taskStatus = TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
.setSlaveId(agent1)
.setState(MesosTaskState.TASK_FAILED)
.setMessage("Abnormal executor termination")
.setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
.build()

scheduler.statusUpdate(driver, taskStatus)
scheduler.resourceOffers(driver, Collections.singletonList(offers.last))

// Assert the driver is not restarted a 2nd time
state = scheduler.getSchedulerState()
assert(state.pendingRetryDrivers.isEmpty)
assert(state.launchedDrivers.size == 1)
assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
}

test("Declines offer with refuse seconds = 120.") {
setScheduler()
