diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 0400a15240e9..3a6923a606cf 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -523,8 +523,8 @@ private[spark] class SecurityManager(
   }
 
   /**
-    * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
-    */
+   * Trying to find a File Based Secret with path specified in SPARK_AUTH_SECRET_CONF
+   */
   def getFileBasedSecret(): Option[String] = {
     sparkConf
       .getOption(SPARK_AUTH_SECRET_CONF)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index bc78f1b1edb3..51d92a80618b 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -346,7 +346,7 @@ private[spark] class MesosClusterScheduler(
     this.masterInfo = Some(masterInfo)
     this.schedulerDriver = driver
 
-    if (!pendingRecover.isEmpty) {
+    if (pendingRecover.nonEmpty) {
       // Start task reconciliation if we need to recover.
       val statuses = pendingRecover.collect {
         case (taskId, slaveId) =>
@@ -766,6 +766,10 @@ private[spark] class MesosClusterScheduler(
         val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
        if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
+          if (taskIsOutdated(taskId, state)) {
+            // Prevent outdated task from overwriting a more recent status
+            return
+          }
          removeFromLaunchedDrivers(subId)
          state.finishDate = Some(new Date())
          val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
@@ -788,6 +792,15 @@
     }
   }
 
+  /**
+   * Check if the task has already been launched or is pending.
+   * If neither, the taskId is outdated and should be ignored.
+   * This is to avoid scenarios where an outdated status update arrives
+   * after a supervised driver has already been relaunched.
+   */
+  private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean =
+    taskId != state.taskId.getValue && !pendingRetryDrivers.contains(state.driverDescription)
+
   private def retireDriver(
       submissionId: String,
       state: MesosClusterSubmissionState) = {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 60880ab4c316..4eb8b822fd43 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, UUID, List => JList}
+import java.util.{Collections, List => JList, UUID}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 import java.util.concurrent.locks.ReentrantLock
 
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 22803d038491..36e7d6dfd9b5 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -421,6 +421,84 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.finishedDrivers.size == 1)
   }
 
+  test("does not restart outdated supervised drivers") {
+    // Covers scenario where:
+    // - agent goes down
+    // - supervised job is relaunched on another agent
+    // - first agent re-registers and sends status update: TASK_FAILED
+    // - job should NOT be relaunched again
+    val conf = new SparkConf()
+    conf.setMaster("mesos://localhost:5050")
+    conf.setAppName("SparkMesosDriverRetries")
+    setScheduler(conf.getAll.toMap)
+
+    val mem = 1000
+    val cpu = 1
+    val offers = List(
+      Utils.createOffer("o1", "s1", mem, cpu, None),
+      Utils.createOffer("o2", "s2", mem, cpu, None),
+      Utils.createOffer("o3", "s1", mem, cpu, None))
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+        Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "sub1", new Date()))
+    assert(response.success)
+
+    // Offer a resource to launch the submitted driver
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.head))
+    var state = scheduler.getSchedulerState()
+    assert(state.launchedDrivers.size == 1)
+
+    // Signal agent lost with a TASK_LOST status update
+    val agent1 = SlaveID.newBuilder().setValue("s1").build()
+    var taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setReason(TaskStatus.Reason.REASON_SLAVE_REMOVED)
+      .setState(MesosTaskState.TASK_LOST)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.size == 1)
+    assert(state.launchedDrivers.isEmpty)
+
+    // Offer new resource to retry driver on a new agent
+    Thread.sleep(1500) // sleep to cover nextRetry's default wait time of 1s
+    scheduler.resourceOffers(driver, Collections.singletonList(offers(1)))
+
+    val agent2 = SlaveID.newBuilder().setValue("s2").build()
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent2)
+      .setState(MesosTaskState.TASK_RUNNING)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+
+    // Agent1 comes back online and sends status update: TASK_FAILED
+    taskStatus = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+      .setSlaveId(agent1)
+      .setState(MesosTaskState.TASK_FAILED)
+      .setMessage("Abnormal executor termination")
+      .setReason(TaskStatus.Reason.REASON_EXECUTOR_TERMINATED)
+      .build()
+
+    scheduler.statusUpdate(driver, taskStatus)
+    scheduler.resourceOffers(driver, Collections.singletonList(offers.last))
+
+    // Assert the driver does not restart a second time
+    state = scheduler.getSchedulerState()
+    assert(state.pendingRetryDrivers.isEmpty)
+    assert(state.launchedDrivers.size == 1)
+    assert(state.launchedDrivers.head.taskId.getValue.endsWith("-retry-1"))
+  }
+
   test("Declines offer with refuse seconds = 120.") {
     setScheduler()
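The heart of the patch is the staleness guard that statusUpdate now applies before relaunching a supervised driver. The following is a standalone sketch of that check, not the Spark code itself: DriverState and the pendingRetry set below are simplified stand-ins for MesosClusterSubmissionState and pendingRetryDrivers, and the object name is made up for illustration.

// Standalone sketch of the guard added to MesosClusterScheduler.statusUpdate.
// DriverState and pendingRetry are simplified stand-ins, not Spark/Mesos types.
object OutdatedTaskCheckSketch {

  // Minimal stand-in for MesosClusterSubmissionState: the submission id plus the
  // task id of the most recent launch attempt.
  final case class DriverState(submissionId: String, currentTaskId: String)

  // A status update is outdated when its task id no longer matches the latest
  // launch attempt and the driver is not waiting in the retry queue; such an
  // update refers to an earlier attempt and must not trigger another relaunch.
  def taskIsOutdated(
      incomingTaskId: String,
      state: DriverState,
      pendingRetry: Set[String]): Boolean =
    incomingTaskId != state.currentTaskId && !pendingRetry.contains(state.submissionId)

  def main(args: Array[String]): Unit = {
    // Driver "sub1" has already been relaunched, so its current attempt is "sub1-retry-1".
    val state = DriverState("sub1", "sub1-retry-1")

    // A late TASK_FAILED from the original attempt is recognized as stale and ignored...
    assert(taskIsOutdated("sub1", state, pendingRetry = Set.empty))

    // ...while an update for the current attempt is processed normally.
    assert(!taskIsOutdated("sub1-retry-1", state, pendingRetry = Set.empty))

    println("staleness guard behaves as expected")
  }
}

In the real scheduler the second condition is pendingRetryDrivers.contains(state.driverDescription), so a driver that is still queued for retry continues to accept terminal updates from its failed attempt.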